Building Pub-Sub with Durable Objects
Can anyone suggest a potential architecture using Durable Objects in order to create a "BuddyList" functionality? The goal here is to be able to propagate status updates ( online / offline ) at scale to all users belonging to the BuddyList.
Our current design uses a DO instance per user, and our setStatus() DO method utilizes a fan-out with batch concurrency in order to iterate through each "buddy" of a user, fetching the buddy's remote DO instance in order to push the status state.
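For context, a minimal sketch of roughly what such a per-user fan-out looks like; the USER_DO binding, the buddies table, and the batch size are assumptions for illustration, not the actual code:

```ts
// Sketch of a per-user DO whose setStatus() fans out to each buddy's DO in batches.
// Binding name (USER_DO), table layout, and batch size are illustrative assumptions.
import { DurableObject } from "cloudflare:workers";

interface Env {
  USER_DO: DurableObjectNamespace<UserDO>;
}

export class UserDO extends DurableObject<Env> {
  async setStatus(status: string): Promise<void> {
    // Read this user's buddies (capped at 1,000 by utime) from local SQLite.
    const buddies = this.ctx.storage.sql
      .exec<{ buddy_id: string }>("SELECT buddy_id FROM buddies ORDER BY utime DESC LIMIT 1000")
      .toArray();

    const BATCH_SIZE = 100;
    for (let i = 0; i < buddies.length; i += BATCH_SIZE) {
      const batch = buddies.slice(i, i + BATCH_SIZE);
      // allSettled so one slow or failed buddy DO doesn't reject the whole batch,
      // but every rejection still has to be tracked and retried somewhere.
      const results = await Promise.allSettled(
        batch.map((b) => {
          const stub = this.env.USER_DO.get(this.env.USER_DO.idFromName(b.buddy_id));
          return stub.receiveStatus(this.ctx.id.toString(), status);
        }),
      );
      const failed = results.filter((r) => r.status === "rejected");
      if (failed.length > 0) {
        // The brittle part: retry with back-off, queue for later, or give up.
        console.warn(`setStatus fan-out: ${failed.length}/${batch.length} buddy updates failed`);
      }
    }
  }

  // Called via RPC by a buddy's DO; store the status, push it to this user's client, etc.
  async receiveStatus(fromUserId: string, status: string): Promise<void> {}
}
```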
Does that make sense? We've found the fan-out is brittle: timeouts happen, and there are probable issues with fan-out storms. Retries with back-offs aren't great either. We don't really have the luxury of a status update failing.
I think the question is generalized as how could we design a pub-sub using DOs? I'm at a bit of a cross-roads here trying to figure out the best solution. In our previous system ( before CF ), we used a Redis cluster to maintain buddylist state and this solution worked well for buddylists up to 1,000 members in size.
I think the best approach may be to refactor the (n) RPC fan-out of the BuddyListDO to instead use a hub-and-spoke approach with a new Durable Object class, ServerDO.
This would entail each client connecting to a single ServerDO instance via websocket for updates, instead of each client connecting to their DO instance to perform fan-outs. This would be a hub-and-spoke model instead of a full peer-to-peer fan-out.
The downside is that we are capping our concurrent users for the entire system to a single DO instance; however, I don't see this being a problem we'll need to deal with in the short term.
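A minimal sketch of what such a ServerDO hub could look like using the WebSocket Hibernation API; the class name, the broadcast method, and the message shape are assumptions:

```ts
// Sketch of a single hub-and-spoke ServerDO: every client holds one websocket to this
// instance, and a status change becomes one RPC plus a local loop over the sockets.
// Names and message shape are assumptions, not the actual implementation.
import { DurableObject } from "cloudflare:workers";

export class ServerDO extends DurableObject {
  async fetch(request: Request): Promise<Response> {
    const { 0: client, 1: server } = new WebSocketPair();
    // acceptWebSocket() uses the Hibernation API, so the DO can be evicted from
    // memory while idle sockets stay open.
    this.ctx.acceptWebSocket(server);
    return new Response(null, { status: 101, webSocket: client });
  }

  // Called (e.g. via RPC from a Worker or a user's own DO) when someone's status changes.
  async broadcastStatus(userId: string, status: string): Promise<void> {
    const payload = JSON.stringify({ type: "status", userId, status });
    // The fan-out is now an in-memory loop instead of N cross-DO RPCs.
    for (const ws of this.ctx.getWebSockets()) {
      try {
        ws.send(payload);
      } catch {
        // Socket already closed; ignore and let close handling clean it up.
      }
    }
  }
}
```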
I think the question is generalized as how could we design a pub-sub using DOs?
This is a problem. Designing a general pub-sub system without explicit constraints is just very broad, with a million variations and solutions. There are simpler approaches once you put some concrete constraints into the equation. For example:
1. how many users are in each BuddyList on average/p99/p100?
2. how fast do you need the status propagation to happen?
3. how many BuddyLists can a user be part of? And how many of those need the status updates to be always synced?
4. How many users do you need to support, how many BuddyLists?
Depending on these numbers, there are solutions that are easier to do than a "generic pub-sub to support any use-case". Just to elaborate with an example below, although I won't go into specifics until you answer the above.
Our current design uses a DO instance per user and our setStatus() DO method utilizes a fan-out with batch concurrency in order to iterate through each "buddy" of a user, fetching the buddy's remote DO instance in order to push the status state.
This has a very high degree of fan-out. If you had a DO per Buddylist, the users should only update their own status in their own DO, and then the user DO should only update the BuddyList DOs. Then, the updates could flow from the BuddyList DOs down to the users periodically, batching updates if needed for all users of that buddylist, or users could poll the buddylist DO. You could even have the Buddylist DOs publish into KV an item with the entire buddylist status every N seconds (10, 30, 60), and the users would just read those few KV items, so your RPC calls are now orders of magnitude fewer. Again, without numbers this might or might not be a good solution.
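For illustration, a minimal sketch of that aggregation idea, assuming a hypothetical BUDDY_KV binding, a statuses table keyed by user_id, and a ~30-second publish interval:

```ts
// Sketch of a per-BuddyList DO that collects member statuses and publishes one
// aggregated KV item every ~30s, so readers do a single KV get instead of N DO RPCs.
// The BUDDY_KV binding, table layout, and interval are illustrative assumptions.
import { DurableObject } from "cloudflare:workers";

interface Env {
  BUDDY_KV: KVNamespace;
}

export class BuddyListDO extends DurableObject<Env> {
  // Called by each member's own user DO when that member's status changes.
  async setMemberStatus(userId: string, status: string): Promise<void> {
    this.ctx.storage.sql.exec(
      "INSERT INTO statuses (user_id, status, updated_at) VALUES (?, ?, ?) " +
        "ON CONFLICT(user_id) DO UPDATE SET status = excluded.status, updated_at = excluded.updated_at",
      userId, status, Date.now(),
    );
    // Schedule a publish if one isn't already pending.
    if ((await this.ctx.storage.getAlarm()) === null) {
      await this.ctx.storage.setAlarm(Date.now() + 30_000);
    }
  }

  // Alarm fires at most every ~30s and writes one snapshot of the whole list to KV.
  async alarm(): Promise<void> {
    const rows = this.ctx.storage.sql
      .exec<{ user_id: string; status: string }>("SELECT user_id, status FROM statuses")
      .toArray();
    await this.env.BUDDY_KV.put(`buddylist:${this.ctx.id.toString()}`, JSON.stringify(rows));
  }
}
```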
Thanks for the reply @lambrospetrou
how many users are in each BuddyList on average/p99/p100?
1000 capped, average might be under 100
how fast do you need the status propagation to happen?
under 5 seconds
how many BuddyLists can a user be part of? And how many of those need the status updates to be always synced?
(N), where N is the total number of users in the system, for example my personal account is added to every BuddyList
How many users do you need to support, how many BuddyLists?
As many as possible. 20k to start as a compromise. The current design with the fan-out should work for (N), assuming a capped amount of buddies per list.
___
I'd prefer to keep the fan-out strategy in place, but it's throwing a lot of timeout errors. We haven't been able to get the setStatus() loop working consistently. Updating 500 buddies at once using batches of various sizes routinely ends with timeout errors when calling the remote DO instances. The timeout errors are relatively random. It seems brittle to depend on invoking hundreds of DOs per status update.
We think using a hub-and-spoke approach to broadcast the buddylist will be the best solution, capping our max user count to under 20k concurrent users for the short term. KV feels like it will be too slow, and any strategy that negates websocket hibernation will cost too much.
If you had a DO per Buddylist, the users should only update their own status in their own DO, and then the user DO should only update the BuddyList DOs.
This is how the current setup works. We can't get consistent results performing the batch updates, see above. Timeout errors are happening, inconsistent results from processing the batch updates.
how many users are in each BuddyList on average/p99/p100? 1000 capped, average might be under 100 ... (N), where N is the total number of users in the system, for example my personal account is added to every BuddyList
Maybe I am missing the point of the BuddyList, this means you can have millions of Buddylists and thousands of users being in most of those (even with the cap per Buddylist), so you would need to be propagating millions and millions of status updates every few seconds.
KV feels like it will be too slow
What do you mean slow? Another question: how often does a user change their status? Is your status like a post or like an online/offline signal?
Seems brittle to depend on invoking hundreds of DOs per status update
Unless I am missing something, your most popular users will need many thousands of DOs to be updated each time, not just a few hundred, and at that point it's almost guaranteed that some of those will be failing.
I think we have a disconnect here. The max amount of push updates for the fan-out would be capped at the limit of users on the buddylist, 1,000. The max amount of push updates for a user would be 1,000.
Yes, but you said a user can be in all the buddylists, so that's ALL*1000, unless you do a layer of aggregation and batching considering the target users.
When I make a status update, it only needs to propagate to the 1,000 users in my list. Not everyone gets the update unless they are 1,000 most recently active.
You are in all the Buddylists, right? So is each status PER Buddylist, or is your status going to be broadcast to all the Buddylists you are part of?
In that case I push my status update and the system will create a batch of 1,000 of the most recent users in my list, so not everyone gets the update at the larger caps. Not ideal, but it was a deliberate design decision for the early build.
I push my status update and the system will create a batch of 1,000 of the most recent users in my list
What is "my list"? You said you are in many Buddylists.
KV was considered, but my understanding is it may take 30-60 seconds to propagate updates.
Everyone gets a buddylist. When you sign up for a new account it auto-adds my account, and then my list auto-adds you. In this case the new user needs to push updates to (1) account, and my list ( being "Marak" ) needs to push updates to all buddies. Do you understand how a buddylist works? Like a friends list on Discord?
Status changes may happen frequently, but usually they are only associated with login / logout.
I am still seeing some contradiction in the previous answers. At least it's still not clear a) how many Buddylists a user can be part of, and b) what the scope of each status update is: all the Buddylists I am part of, or do I choose only one or a few?
A user could be on (N) buddylists, I've capped the fan-out to only consider the last 1,000 by utime.
The scope of the status update is currently 2 small text fields. Status text and profile picture.
When a user updates their status, it's pushed to every single buddy on that specific users list, capped at 1,000.
I've capped the fan-out to only consider the last 1,000 by utime
Here do you refer to 1,000 "users" or 1,000 Buddylists out of those N that I am part of?
The scope of the status update is currently 2 small text fields. Status text and profile picture.
By scope I meant: which Buddylists of the N I am part of should be updated. All, or some based on my choice?
When attempting the fan-out update, we see random DOs just time out. In this screenshot we can see two random users, "ok" and "Tom", failed to update.
Let me make an example:
- User: Marak
- Marak is part of 10K Buddylists.
- Each of those 10K buddylists has 1000 users (10% overlap let's assume among buddylists)
Now Marak posts a status update. Where does it go? Is it to all those 10K buddylists, or do you choose one?
We run a query to generate the batch update for "Marak" that fetches the last 1,000 users on his list by utime.
Not ideal, but was considered a stop-gap for early versions of the system.
You keep referring to "his list", but Marak is part of many buddylists.
I am honestly not following the application model.
Yeah, it's okay. We are having a disconnect here.
Each user gets their own DO which contains their own buddylist in separate SQLite storage.
It's not a single source of truth for all the buddylists, they are separately stored.
The same design worked fantastically when we had it backed by a Redis cluster. We were able to easily make 1,000 updates at once very quickly. There is a degree of duplication to the data to ensure scalability. If that helps contextualize the design.
If I am getting this right, then, you have "friendship" defined as 2 edges if it was modeled as a graph. Each edge is stored in a separate DO (DO1 stores "A is friend of B", DO2 stores "B is friend of A"), instead of having one edge indicating "A and B are friends".
Yes
OK, then the confusion was that when you said a user can be part of many buddylists, it just meant they can have a lot of friends. You don't care about the contents of those other Buddylists. Anyway, now it's clear.
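To make that model concrete, a hypothetical schema for the per-user DO's buddylist table; the names and columns are illustrative, not the actual schema:

```ts
// Hypothetical per-user DO schema for the "two directed edges" friendship model:
// "A is friend of B" lives only in A's DO, and B's DO holds the mirror row.
const BUDDIES_SCHEMA = `
  CREATE TABLE IF NOT EXISTS buddies (
    buddy_id TEXT PRIMARY KEY,  -- the other user's ID
    utime    INTEGER NOT NULL   -- last-activity time; fan-out is capped to the latest 1,000 by utime
  );
  CREATE INDEX IF NOT EXISTS idx_buddies_utime ON buddies (utime DESC);
`;
// Inside the DO constructor (sketch): this.ctx.storage.sql.exec(BUDDIES_SCHEMA);
```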
🙂
The crux of the issue seems to be around how much batch processing of DO instances is viable. I was under the assumption we could do at least 500 updates in batches of 100. I can reduce batch size to 25 updates. It seems batch updates at scale are problematic. Earlier discussions about this approach in the chat seemed to indicate we would be okay. Production usage is having some issues.
I personally would avoid such big RPC fan-outs if you need at-least-once delivery. Especially if those target DOs are spread around, you will almost always be hitting a bad one.
Exactly! Yes. This is what we have found.
I suspect hub and spoke is going to be much more reliable, and eventually we will need to migrate to KV or a more performant queuing system or an actual pub-sub.
It's almost impossible to do this without having a higher level aggregation in the fan out logic. It won't work blasting out millions of RPCs all the time.
You can reduce these RPCs a lot by grouping users into "shards", with one DO managing that group of users, aggregating updates of those, and sending them further down only to the users necessary.
Something like:
the usergroups don't even need to be based on the friendships, although if you can be a bit smart and find common friends and put them in the same group, then you drastically reduce the RPCs needed
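A rough sketch of that shard idea, with all names hypothetical: one ShardDO per group of users, receiving a single RPC per status update and delivering it locally only to the tagged websocket connections of the targeted members:

```ts
// Sketch of a shard DO: clients in the same usergroup connect here over websockets,
// and a status update arrives as one RPC per shard rather than one RPC per buddy.
// Class name, URL parameter, and payload shape are illustrative assumptions.
import { DurableObject } from "cloudflare:workers";

export class ShardDO extends DurableObject {
  async fetch(request: Request): Promise<Response> {
    // The connecting user's ID is carried in the URL for illustration.
    const userId = new URL(request.url).searchParams.get("userId") ?? "anonymous";
    const { 0: client, 1: server } = new WebSocketPair();
    // Tagging the hibernatable socket with the userId lets us target it later.
    this.ctx.acceptWebSocket(server, [userId]);
    return new Response(null, { status: 101, webSocket: client });
  }

  // One RPC per shard: the caller passes only the buddies that live in this shard.
  async pushStatus(fromUserId: string, status: string, targetUserIds: string[]): Promise<void> {
    const payload = JSON.stringify({ type: "status", fromUserId, status });
    for (const target of targetUserIds) {
      // getWebSockets(tag) returns only the sockets tagged with that userId.
      for (const ws of this.ctx.getWebSockets(target)) {
        ws.send(payload);
      }
    }
  }
}
```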
That's the current idea for moving forward. We'll create a single shard using a separate ws connection from the client itself. Capping at 20k concurrent users. Once we start exceeding that limit we'll figure out the more intelligent sharding strategies or migrate systems.
So, no need to perform batch updates fetching DOs. Only a single DO with subscriptions.
You mean having literally one DO managing all the users???
There will be one DO for managing status updates for all users. Each user still gets their own DO.
So like if you think in terms of Discord, we'll have a "Server" that manages all status updates for each member of the "Server"
That's not going to work either, once you get over a few hundred/low thousands users
Why not use Y of those DOs?
How many concurrent websocket connections can a single DO handle? I thought it could do 20k?
Also, if that one DO is down your whole system is down. The benefit of DOs is that they are decoupled, and you should not depend on a single DO for the whole system.
That's the main bottleneck for D1 for example, since it's one DO per DB.
In theory, but as always, it depends on load, throughput, latency per call, etc.
Not really. It would just be status updates that are down. Buddy list would still load, messaging would still work, etc.
Well, sure, I meant the whole feature.
I think we are on the same page. I'm building towards sharding and starting with a single shard.
Always do at least 2-3. Otherwise your code will inevitably have some assumption that will break once you go above 1. 😅
Where would the choice of shard come into play? The client would need to determine where to connect to? Perhaps a single worker that would return a unique endpoint for the most available shard? Does that make sense?
You can start with Y (2-10) shards initially, and hashing your user ID to those.
If you want to make it future-proof from the start, you can use something like Rendezvous Hashing, or Jump Consistent Hashing, which hash in a predictable manner given the number of shards, and you can migrate a user from one shard to another according to the Y historical values.
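As a sketch of the jump-hashing idea, here is an integer-arithmetic variant of Lamping & Veach's jump consistent hash, plus how a Y history like [10, 100] could locate the previous shard for migration; the way the user ID is reduced to a 64-bit key is an assumption:

```ts
// Jump consistent hash: maps a 64-bit key to one of `numShards` buckets, with minimal
// movement of keys when numShards changes. Integer-arithmetic sketch of Lamping & Veach.
function jumpConsistentHash(key: bigint, numShards: number): number {
  let b = -1n;
  let j = 0n;
  let k = key;
  while (j < BigInt(numShards)) {
    b = j;
    k = (k * 2862933555777941757n + 1n) & 0xffffffffffffffffn; // 64-bit LCG step
    j = ((b + 1n) * (1n << 31n)) / ((k >> 33n) + 1n);
  }
  return Number(b);
}

// Reduce an arbitrary user ID string to a 64-bit key (FNV-1a here; an assumption).
function keyFromUserId(userId: string): bigint {
  let h = 0xcbf29ce484222325n;
  for (const ch of userId) {
    h ^= BigInt(ch.codePointAt(0)!);
    h = (h * 0x100000001b3n) & 0xffffffffffffffffn;
  }
  return h;
}

// Usage with a Y history of shard counts, e.g. [10, 100]: the latest Y gives the
// current shard, and the previous Y values tell you where state may have lived before.
const yHistory = [10, 100];
const key = keyFromUserId("user-123");
const currentShard = jumpConsistentHash(key, yHistory[yHistory.length - 1]);
const previousShard = jumpConsistentHash(key, yHistory[yHistory.length - 2]);
console.log(`connect to shard-${currentShard}; check shard-${previousShard} for state to migrate`);
```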
Example:
1. A fresh user A in your system starts their session. You have Y=10 as the initial number of shards, so you go to the shard that handles the hashed user ID.
2. Store some info in its local storage.
3. Now you decide to increase Y to 100, and the Y history of values is [10, 100].
4. User A disconnects.
5. User A now reconnects. You go to the shard based on Y=100, you don't find anything locally in its storage, so now you need to ask the previous shard the user would be mapped to based on Y=10. This is to migrate this user's state, if there is any, and clean up the previous shard for this user.
Again, mostly as an optimization in case you have persistent state in these shard DOs. Static sharding works for a while if you pick a high number (Y=20-50) and manage to group users more efficiently over time.
There are a lot of ways to optimize the above, even entirely different approaches, but the original RPC approach for each user or the single DO as bottleneck should be avoided.
To simplify the thinking around this, imagine that your previous design with Redis was split into multiple clusters. Each cluster is represented by one of these Shard DOs.
Not as simple as a single Redis cluster, obviously, but that's the way I am thinking about it.
Which of course might be different from other solutions 😺
Yeah, thanks. I think I understand well. This should be relatively easy to implement. The source of truth for the buddylist is D1, replicated into the DO. It should be easy to create an elastic sharding pool and stick users to specific shards, with each shard passing state to the others via persistent connections.
BTW, this exact use-case of fanout is something we are starting to think about how to make much easier to implement with DOs.
So, if you implement the above, post here with the results or improvements you saw.
Or regressions 😅
Getting confirmation that the batch DO RPC is not viable was good to hear.
I appreciate your time today @lambrospetrou. I will keep you posted. Our system has been in production for 6+ months now. User count grows each day.
I mean, don't get me wrong, the simple RPC approach works very well with a small degree of fan-out. But once you go above 100, in my experience it never works well.
Yes. Thank you.
We may be silent, but I am sure I am not the only one craving and enjoying such deep-in-the-details discussions about strategies with DOs. This was fun to read on my lunch break, thanks for sharing!
@lambrospetrou -
I have powered through and deployed the new service here using a distributed stateful pub-sub.
We’ve built a replicated state mesh on top of Durable Objects, where each DO instance (hub-0, hub-1, hub-2, etc.) acts as a realtime hub for client connections. This pattern could be generalized for any kind of data; our current build focuses on buddylist data.
Each hub maintains a full copy of the buddylist state in local SQLite for low-latency fan-out, while D1 serves as the global commit log and source of truth.
On updates (e.g. addBuddy(), setStatus()), we write to D1, update the local SQLite, and broadcast the change across the mesh (which writes to each node's SQLite) so all hubs converge on the same state. The result is essentially a stateful pub-sub cluster. Clients can connect to any available hub, get the full state instantly, and stay synced through realtime replication.
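Purely as an illustration of that update path (not the actual code), a simplified hub sketch with assumed binding names (DB, HUB, HUB_COUNT) and hub naming (hub-0 … hub-N):

```ts
// Simplified sketch of the update path: D1 as the commit log, the hub's local SQLite
// as the serving copy, then replication to peer hubs and connected clients.
// Binding names (DB, HUB, HUB_COUNT), hub naming, and payload shapes are assumptions.
import { DurableObject } from "cloudflare:workers";

interface Env {
  DB: D1Database;
  HUB: DurableObjectNamespace<HubDO>;
  HUB_COUNT: string;
}

export class HubDO extends DurableObject<Env> {
  async setStatus(userId: string, status: string): Promise<void> {
    // 1. Commit to D1, the global source of truth.
    await this.env.DB
      .prepare("UPDATE users SET status = ?, updated_at = ? WHERE id = ?")
      .bind(status, Date.now(), userId)
      .run();

    // 2. Apply to this hub's local SQLite replica and push to its websocket clients.
    this.applyLocally(userId, status);

    // 3. Replicate to the other hubs so they converge on the same state.
    const hubCount = Number(this.env.HUB_COUNT);
    const me = this.ctx.id.toString();
    await Promise.allSettled(
      Array.from({ length: hubCount }, (_, i) => {
        const id = this.env.HUB.idFromName(`hub-${i}`);
        if (id.toString() === me) return Promise.resolve();
        return this.env.HUB.get(id).replicate(userId, status);
      }),
    );
  }

  // Called by peer hubs; applies the change without re-writing D1 or re-broadcasting.
  async replicate(userId: string, status: string): Promise<void> {
    this.applyLocally(userId, status);
  }

  private applyLocally(userId: string, status: string): void {
    this.ctx.storage.sql.exec(
      "UPDATE buddylist SET status = ? WHERE user_id = ?", status, userId,
    );
    const payload = JSON.stringify({ type: "status", userId, status });
    for (const ws of this.ctx.getWebSockets()) ws.send(payload);
  }
}
```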
Discovery is based on finding the most available hub; clients are currently not routed by hashing or any stickiness.
Current count is (1) hub; we haven't fully enabled the mesh yet. It's not much more code, and the mechanisms are in place to scale out, assuming our design assumptions hold up. Planning to fully deploy the mesh within a month, depending on our site traffic.