C#
CodeMan · 3mo ago

Dynamic channel/queue system where only one unit of a particular group can process at any given time

Hello all. For a specific requirement, I need to build a channel/queue system for units of work, where each unit has a "group ID". No more than one unit per group may be processing at any given time. The number of groups is not known ahead of time and may quickly scale into the hundreds as units arrive. Each unit, once it starts processing, resolves within a second. I have experimented with dynamically creating one channel per group, but frequently standing up and tearing down channels seems inefficient, and each channel also consumes a task for processing. A single channel will not suffice, as I need to process many units in parallel without them being stuck behind a waiting group. I have also experimented with letting the units exist as simple tasks on the scheduler, each waiting on a TaskCompletionSource, but I fear I could starve the thread pool. I was wondering if anybody had some ideas or suggestions. I have attached a simple diagram. I do not have any code I can share at this time.
[Attached diagram; no description available]
8 Replies
Sossenbinder · 3mo ago
Would a ConcurrentDictionary storing the active task per group ID be an option? Best case, you can distribute tasks piece by piece with little collision. Worst case, you need to wait sequentially if the initial queue contains a run of items with the same ID. Or you could re-enqueue the blocked item if order is not as important.
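One way to read the "active task per group ID" suggestion is to remember the last queued task for each group and chain every new unit behind it. The sketch below is only an illustration under assumptions: `GroupTaskChain` and `Enqueue` are made-up names, and it uses a plain `Dictionary` under a lock instead of a `ConcurrentDictionary`, because reading the previous task and storing the new one have to happen as one atomic step.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper (not from the thread): keeps the most recently queued
// task per group ID and chains each new unit behind it.
public sealed class GroupTaskChain
{
    private readonly object _gate = new();
    private readonly Dictionary<string, Task> _tails = new();

    // Queues work for a group. Units of one group run one at a time, in the
    // order they were queued; units of different groups may overlap.
    public Task Enqueue(string groupId, Func<Task> work)
    {
        lock (_gate)
        {
            // An idle group has no entry, so its unit can start right away.
            if (!_tails.TryGetValue(groupId, out var previous))
            {
                previous = Task.CompletedTask;
            }

            // Without ExecuteSynchronously the continuation is queued to the
            // scheduler rather than run inline, so no unit executes under the
            // lock. It also runs if the previous unit failed. Unwrap makes the
            // result complete only when the async work itself has finished.
            Task current = previous
                .ContinueWith(_ => work(), TaskScheduler.Default)
                .Unwrap();

            _tails[groupId] = current;

            // Drop the entry once the group goes idle so short-lived groups
            // do not pile up in the dictionary.
            current.ContinueWith(_ => RemoveIfTail(groupId, current), TaskScheduler.Default);

            return current;
        }
    }

    private void RemoveIfTail(string groupId, Task finished)
    {
        lock (_gate)
        {
            // Only remove the entry if no later unit has been chained behind it.
            if (_tails.TryGetValue(groupId, out var tail) && ReferenceEquals(tail, finished))
            {
                _tails.Remove(groupId);
            }
        }
    }
}
```

Waiting units here are pending continuations rather than blocked threads, so hundreds of idle groups cost dictionary entries at most. The trade-off is one task object per unit, and the returned task reflects only that unit's own outcome.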
CodeMan · 3mo ago
Order is unfortunately important. I have considered a few variations of a concurrent dictionary as well, for example ConcurrentDictionary<string, Channel<UnitOfWork>>. One of my problems is that I do not want to eat memory by spinning up new channels/queues only to discard them shortly thereafter, as they are most likely short-lived. I considered a shared pool of channels, but that seems like it could get me into trouble. An even wilder idea is to use something like Orleans and dedicate a grain to each group ID. I do not have a lot of experience with this kind of setup, so I am hoping there is a pattern or library that makes this kind of problem simpler to work with. Thinking about going with something like this.
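using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// MyWorker, Item, LogMessage, LogSeverity and EmitLog come from the surrounding project.
internal class MyReceivingWorker : MyWorker
{
    // Latest "turn" per group ID. Every access happens under _gate,
    // so a plain Dictionary is enough.
    private readonly Dictionary<string, TaskCompletionSource<bool>> _turns = new();
    private readonly object _gate = new();

    protected override Task DoWork(Item item)
    {
        // Fire and forget: the caller is not held up while the unit waits for its turn.
        _ = EnqueueItem(item);

        return Task.CompletedTask;
    }

    private async Task EnqueueItem(Item item)
    {
        // RunContinuationsAsynchronously keeps the next unit's work from
        // running inline inside SetResult below.
        var thisTurn = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        var previousTurn = GetTurnToAwait(item.GroupId, thisTurn);

        if (previousTurn != null)
        {
            await previousTurn.Task;
        }

        try
        {
            // Do stuff.
        }
        catch (Exception ex)
        {
            await EmitLog(new LogMessage(
                LogSeverity.Error,
                GetType().Name,
                ex.Message,
                ex));
        }
        finally
        {
            // Release the next unit of this group, then drop the entry if nobody is waiting.
            thisTurn.SetResult(true);
            CleanupTurn(item.GroupId, thisTurn);
        }
    }

    // Registers nextTurn as the group's latest turn and returns the turn it has to wait for, if any.
    private TaskCompletionSource<bool>? GetTurnToAwait(string groupId, TaskCompletionSource<bool> nextTurn)
    {
        lock (_gate)
        {
            _turns.TryGetValue(groupId, out var currentTurn);

            _turns[groupId] = nextTurn;

            return currentTurn;
        }
    }

    // Removes the group's entry only if no later unit has queued behind the finished turn.
    // This replaces the scan over every entry on each completion.
    private void CleanupTurn(string groupId, TaskCompletionSource<bool> finishedTurn)
    {
        lock (_gate)
        {
            if (_turns.TryGetValue(groupId, out var latest) && ReferenceEquals(latest, finishedTurn))
            {
                _turns.Remove(groupId);
            }
        }
    }
}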
DaVinki · 3mo ago
When there are new units of work, is it first come, first served, so that whichever unit appears first gets processed first? If so, you could use a sorted set for pending units. When it's a unit's turn, pop it out of the sorted set and push it into a channel that is now dedicated to that specific unit.
CodeMan · 3mo ago
When a unit arrives, it is first come, first served. The only caveat is the group condition: a unit must wait for any actively processing unit of the same group.
DaVinki · 3mo ago
The sorted set idea would be more about the incoming units and how they are sent to a queue for processing with the rest of their group
DaVinki · 3mo ago
You would just need something to forward units to queues actively working on unit group N if it's "late" or something. When a consumer is done with the queue, it takes in a unit group with another ID and repeats. SortedSet isn't array-backed, so it'll only be as large as it needs to be, with no need to create more queues.
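A rough sketch of the sorted-set idea, as one possible reading of it: pending units are ordered by an arrival sequence number, and a consumer takes the oldest unit whose group is not currently being processed. `PendingUnit`, `PendingBuffer`, `TryTakeNext` and `Complete` are names invented for this example, and the string payload stands in for the real unit of work.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical types for illustration; Payload stands in for the real unit of work.
public sealed record PendingUnit(long Seq, string GroupId, string Payload);

public sealed class PendingBuffer
{
    private readonly object _gate = new();
    // Ordered by arrival sequence, so enumeration is first come, first served.
    private readonly SortedSet<PendingUnit> _pending =
        new(Comparer<PendingUnit>.Create((a, b) => a.Seq.CompareTo(b.Seq)));
    private readonly HashSet<string> _activeGroups = new();
    private long _nextSeq;

    public void Add(string groupId, string payload)
    {
        lock (_gate)
        {
            _pending.Add(new PendingUnit(_nextSeq++, groupId, payload));
        }
    }

    // Returns the oldest pending unit whose group is idle and marks that
    // group as active, or null if every pending unit is blocked.
    public PendingUnit? TryTakeNext()
    {
        lock (_gate)
        {
            PendingUnit? found = null;
            foreach (var candidate in _pending)
            {
                if (!_activeGroups.Contains(candidate.GroupId))
                {
                    found = candidate;
                    break;
                }
            }

            if (found is null)
            {
                return null;
            }

            _pending.Remove(found);
            _activeGroups.Add(found.GroupId);
            return found;
        }
    }

    // Called by a consumer once it has finished a unit of the given group.
    public void Complete(string groupId)
    {
        lock (_gate)
        {
            _activeGroups.Remove(groupId);
        }
    }
}
```

The scan in `TryTakeNext` is linear in the number of pending units in the worst case, which is probably fine for a few hundred short-lived groups but would need rethinking for a deep backlog concentrated in one group.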
CodeMan · 3mo ago
Ah, I think I see what you mean. There might be something I can do on that end to optimize it. Generally speaking, the units and their group IDs are incoming messages from a third party. I do not know much ahead of time; I simply handle the messages as they come. What you're saying just clicked with me. That is an interesting idea. The "unit groups" are fairly short-lived: a group may see 2 or 3 messages before needing to be discarded. There is a high occurrence of new group IDs and few items belonging to each group. The work generally resolves in no more than a second or two. In your example, I would just need a way to ensure the work is completed as fast as possible. I would probably raise an event to let the consumers know that there is a new group to pick up.
DaVinki · 3mo ago
Yeah, you could do something like a manual reset event, where an incoming unit notifies threads that there's something new and available.
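Pulling the last few messages together, here is a minimal sketch of a fixed pool of consumers that pick up groups as they become ready. It is not what either poster wrote: it swaps the manual reset event for a `Channel<string>` of ready group IDs (writing to the channel is the notification, and waiting consumers hold no thread), and it keeps a small per-group backlog instead of a sorted set. `GroupedWorkQueue`, `Enqueue` and `StopAsync` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public sealed class GroupedWorkQueue<T>
{
    private readonly object _gate = new();
    // A group has an entry exactly while it is scheduled or running.
    private readonly Dictionary<string, Queue<T>> _backlog = new();
    // Group IDs with a unit ready to run; each group appears at most once.
    private readonly Channel<string> _ready = Channel.CreateUnbounded<string>();
    private readonly Func<T, Task> _handler;
    private readonly Task[] _consumers;

    public GroupedWorkQueue(int consumerCount, Func<T, Task> handler)
    {
        _handler = handler;
        _consumers = new Task[consumerCount];
        for (int i = 0; i < consumerCount; i++)
        {
            _consumers[i] = Task.Run(() => ConsumeAsync());
        }
    }

    public void Enqueue(string groupId, T unit)
    {
        lock (_gate)
        {
            if (_backlog.TryGetValue(groupId, out var existing))
            {
                // Group is already scheduled or running: wait behind it.
                existing.Enqueue(unit);
                return;
            }

            var queue = new Queue<T>();
            queue.Enqueue(unit);
            _backlog[groupId] = queue;
        }

        // New group: tell the consumers there is something to pick up.
        _ready.Writer.TryWrite(groupId);
    }

    private async Task ConsumeAsync()
    {
        await foreach (var groupId in _ready.Reader.ReadAllAsync())
        {
            T unit;
            lock (_gate) { unit = _backlog[groupId].Dequeue(); }

            try { await _handler(unit); }
            catch (Exception) { /* log here; a failure must not stall the group */ }

            bool more;
            lock (_gate)
            {
                more = _backlog[groupId].Count > 0;
                if (!more) _backlog.Remove(groupId); // group is idle again
            }

            // Put the group at the back of the line so other groups get a turn.
            if (more) _ready.Writer.TryWrite(groupId);
        }
    }

    // Stops the consumers. Units still waiting in a backlog are dropped, so
    // call this only once the queue has gone quiet.
    public Task StopAsync()
    {
        _ready.Writer.Complete();
        return Task.WhenAll(_consumers);
    }
}
```

Because a group ID is in the channel or in a consumer's hands at most once, two units of the same group never overlap, and the number of tasks stays fixed at `consumerCount` no matter how many groups come and go.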