Best way to implement a thread-safe reactive collection?

I have a backend service that performs a long-running task. The task is multi-threaded and produces Results. Every time a Result is produced, it gets pushed into a collection. My frontend should be able to view these Results as they are submitted, but it must also be able to get the previously submitted Results, in the order they were submitted.

My plan is to use server-sent events or WebSockets to send the Results to the frontend. On connection, a client should receive a "catch up" of all previous Results, and then handle new ones as they come in. I think I can accomplish this using something like reactor-core's Sinks.many().replay().all(), but I'd prefer to avoid bringing that in.

My problem is that I'm not sure how to make the previous Results safely (and simply) "observable" in this manner, ensuring that a new client never receives a duplicate Result (if a Result is submitted while the client is in the "catch up" phase) and that it gets events in the correct order. Is something like a custom SubmissionPublisher a good starting point? Does anyone have any suggestions for how to go about this?
10 Replies
Unknown User, 3y ago
[Message not public]
undoublethink (OP), 3y ago
Thanks for the response. Are you able to expand on that? My collection is already a concurrent linked queue but I don't think it solves the problem in this case?
Unknown User, 3y ago
[Message not public]
undoublethink (OP), 3y ago
I see thread-safety issues in two cases:
1. A result arrives while I'm sending the old results to a new client.
2. A result arrives after I've sent the old results but before I've created a "subscription" for new results.
I suppose for that to work I'd have to additionally synchronize everything (prevent additions to the queue) while a new client is subscribing? That is fine and roughly my plan, but I was hoping for a better or more standardised solution (this seems like something that is relatively common?).
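The plan described above (block additions while a new client is subscribing) can be sketched roughly as follows. This is only a minimal illustration using the JDK, not an established pattern from any library: the class name `ReplayingResults` and its methods `submit` and `subscribe` are hypothetical. One lock covers both appending and subscribing, so the replay and the registration happen as a single step and neither of the two cases can occur.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical helper: keeps every result and replays history to new subscribers. */
final class ReplayingResults<T> {
    // All results so far, in submission order. Guarded by "this".
    private final List<T> history = new ArrayList<>();
    // Copy-on-write so a subscriber may cancel itself from inside its own callback.
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

    /** Called by worker threads. The lock gives all results a single global order. */
    public synchronized void submit(T result) {
        history.add(result);
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(result);
        }
    }

    /**
     * Replays the history and registers the subscriber under the same lock,
     * so no submit() can run between the catch-up and the registration.
     * Returns a handle that cancels the subscription.
     */
    public synchronized Runnable subscribe(Consumer<T> subscriber) {
        for (T old : history) {
            subscriber.accept(old);
        }
        subscribers.add(subscriber);
        return () -> subscribers.remove(subscriber);
    }
}
```

Because callbacks run while the lock is held, they should be cheap, for example handing the result to a per-client outbound queue rather than writing to the network directly. A slow callback would stall every worker thread that calls `submit`.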
Unknown User, 3y ago
[Message not public]
undoublethink (OP), 3y ago
Could do but then it could receive new events before it's finished receiving all old events?
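One way to sidestep this ordering concern is to give each client a cursor into an append-only log, so that catch-up and live delivery use the same read path. The sketch below is an assumption-laden illustration rather than something proposed in the thread: `ResultLog`, `append`, `pollFrom`, and `awaitFrom` are hypothetical names, and the whole history is kept in memory.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical append-only log that clients read by position. */
final class ResultLog<T> {
    // Results in submission order. Guarded by "this".
    private final List<T> items = new ArrayList<>();

    /** Called by worker threads; wakes any client blocked in awaitFrom(). */
    public synchronized void append(T item) {
        items.add(item);
        notifyAll();
    }

    /** Returns a copy of everything at or after the cursor, or an empty list. */
    public synchronized List<T> pollFrom(int cursor) {
        if (cursor >= items.size()) {
            return List.of();
        }
        return new ArrayList<>(items.subList(cursor, items.size()));
    }

    /** Like pollFrom(), but blocks until at least one new item is available. */
    public synchronized List<T> awaitFrom(int cursor) throws InterruptedException {
        while (items.size() <= cursor) {
            wait();
        }
        return pollFrom(cursor);
    }
}
```

A client handler would start with a cursor of 0, call `awaitFrom(cursor)`, send the returned batch, advance the cursor by the batch size, and repeat. Since each client only ever reads forward from its own position, it cannot see a new result before an older one, and it cannot see the same result twice. The lock is held only while copying, not while sending.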
Unknown User, 3y ago
[Message not public]