C#•3d ago
AllMight

Channels usage for serial chunked async calls

I get a list of ids and need to call one service with them in chunks, in parallel. As soon as a chunk is done, I want to start calling a second service with the results, in chunks of a different size, also in parallel. I used channels, but I'm not sure whether that is the correct approach or whether there is an easier way to do it. (I'm also not sure whether writing to the channel needs to be async.)
12 Replies
AllMightOP•3d ago
Here is the code:
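public async Task<IReadOnlyList<string>> Execute(List<string> ids)
{
    // Thread-safe collection: several Service 2 calls add results concurrently.
    ConcurrentBag<string> results = [];

    // Up to two producer tasks write at the same time, so SingleWriter must be false.
    var channel = Channel.CreateUnbounded<string>(
        new UnboundedChannelOptions() { SingleReader = true, SingleWriter = false }
    );

    // Run both stages concurrently; the consumer ends once the writer is completed.
    await Task.WhenAll(
        Producer(ids, channel.Writer),
        Consumer(channel.Reader, results)
    );

    return [.. results];
}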
private async Task Producer(List<string> ids, ChannelWriter<string> writer)
{
    // At most 2 Service 1 calls in flight at once.
    using SemaphoreSlim semaphoreSlim = new(2);

    try
    {
        await Task.WhenAll(
            ids
                .Chunk(10_000)
                .Select(async (chunk) =>
                {
                    await semaphoreSlim.WaitAsync();

                    try
                    {
                        foreach (var resultItem in await GetService1Data([.. chunk]))
                        {
                            // On an unbounded channel TryWrite only fails after completion.
                            writer.TryWrite(resultItem);
                        }
                    }
                    finally
                    {
                        semaphoreSlim.Release();
                    }
                })
        );
    }
    finally
    {
        // Complete even if a call failed; otherwise the consumer would wait forever.
        writer.Complete();
    }
}

private async Task Consumer(
    ChannelReader<string> reader,
    ConcurrentBag<string> results)
{
    // HashSet drops duplicate items within the current batch.
    HashSet<string> currentItems = [];
    List<Task> service2Tasks = [];
    // At most 5 Service 2 calls in flight at once.
    using SemaphoreSlim semaphoreSlim = new(5);

    async Task Execute(List<string> items)
    {
        await semaphoreSlim.WaitAsync();

        try
        {
            foreach (var item in await GetService2Data(items))
            {
                results.Add(item);
            }
        }
        finally
        {
            semaphoreSlim.Release();
        }
    }

    await foreach (var item in reader.ReadAllAsync())
    {
        currentItems.Add(item);

        // Send a batch to Service 2 as soon as 1,000 items are collected.
        if (currentItems.Count == 1_000)
        {
            service2Tasks.Add(Execute([.. currentItems]));
            currentItems.Clear();
        }
    }

    // Flush the last, partially filled batch.
    if (currentItems.Count > 0)
    {
        service2Tasks.Add(Execute([.. currentItems]));
    }

    await Task.WhenAll(service2Tasks);
}
Unknown User•2d ago
Message Not Public
AllMightOP•2d ago
@TeBeCo So just let me make sure I understand correctly:
1. Channels ARE the correct solution here?
2. After the GetService2Data call, instead of adding to the ConcurrentBag, send the results to a new channel. The new channel will handle the "concurrency problem" and I can just add to a list from the reader.
3. If I should avoid IAsyncEnumerable, what's your alternative suggestion? Also, why is it a bad idea? Mind explaining?
4. Should I use TryWrite or WriteAsync?
5. Currently the service 2 semaphore is used after adding to the list. Maybe I should WaitAsync inside the async enumerable? (Unless of course I get rid of it.)
Ah, got it. You mean that since there are multiple tasks writing to the results channel, I need multiple producers in the results channel. Makes sense.
So you mean to say (relating to my question number 2) that it is better than the ConcurrentBag in terms of performance?
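On question 4, a minimal sketch of how the two calls differ, assuming the default `BoundedChannelFullMode.Wait` for bounded channels. `WriteDemo` and its method names are made up for illustration and are not part of the thread's code. On an unbounded channel `TryWrite` succeeds until the writer is completed; on a full bounded channel it returns false, while `WriteAsync` waits for space.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class WriteDemo
{
    // Attempts two TryWrite calls on each channel and counts how many were accepted.
    public static (int Unbounded, int Bounded) CountAcceptedWrites()
    {
        var unbounded = Channel.CreateUnbounded<string>();
        var bounded = Channel.CreateBounded<string>(capacity: 1);

        int unboundedAccepted = 0, boundedAccepted = 0;
        foreach (var item in new[] { "a", "b" })
        {
            if (unbounded.Writer.TryWrite(item)) unboundedAccepted++;
            // The second TryWrite finds the single slot taken and returns false.
            if (bounded.Writer.TryWrite(item)) boundedAccepted++;
        }

        return (unboundedAccepted, boundedAccepted);
    }

    // WriteAsync waits for free space instead of dropping the item.
    public static async Task<List<string>> WriteWithBackpressureAsync()
    {
        var bounded = Channel.CreateBounded<string>(capacity: 1);

        var writing = Task.Run(async () =>
        {
            foreach (var item in new[] { "a", "b", "c" })
            {
                await bounded.Writer.WriteAsync(item);
            }
            bounded.Writer.Complete();
        });

        List<string> received = new();
        await foreach (var item in bounded.Reader.ReadAllAsync())
        {
            received.Add(item);
        }

        await writing;
        return received;
    }
}
```

With the unbounded channels used in this thread, either call should behave the same; the difference only matters once the channel is bounded.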
Unknown User•2d ago
Message Not Public
MODiX•2d ago
see $channel
Unknown User•2d ago
Message Not Public
AllMightOP•23h ago
@TeBeCo I read the links, but some of my questions still stand. Also, it seems IAsyncEnumerable is actually preferred in most cases, if I understood correctly.
@TeBeCo I made some changes and will post the code again. I'm still waiting for answers to some of my questions, if you have them 🙂
Unknown User•22h ago
Message Not Public
AllMightOP•22h ago
@TeBeCo First of all, sleeping is important and we can continue tomorrow 🙂 Concerning the semaphore: I used it to limit the parallel calls. The thing is, I chunk the incoming items, so I am not entirely sure whether I need a bounded channel, or a semaphore to limit how much is read, or should just read it all and let the semaphore and the task list handle the limiting. Anyway, I guess the example is a bit complex. I will post an updated one and hopefully you will be more "sharp" tomorrow and can help. Good night!
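For the "limit the parallel calls" part, one possible alternative to a hand-rolled SemaphoreSlim plus Task.WhenAll is `Parallel.ForEachAsync` with `MaxDegreeOfParallelism` (available from .NET 6). The sketch below is only an illustration under that assumption: `ThrottleSketch`, `RunAsync` and `FakeService1Call` are hypothetical names, and the fake call stands in for GetService1Data. The in-flight counter exists only to show that the limit holds.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleSketch
{
    // Hypothetical stand-in for GetService1Data: tags every id after a short delay.
    private static async Task<List<string>> FakeService1Call(string[] chunk)
    {
        await Task.Delay(20);
        return chunk.Select(id => id + "-s1").ToList();
    }

    public static async Task<(List<string> Results, int MaxInFlight)> RunAsync(
        List<string> ids, int chunkSize, int maxParallel)
    {
        ConcurrentBag<string> results = new();
        object gate = new();
        int inFlight = 0, maxInFlight = 0;

        await Parallel.ForEachAsync(
            ids.Chunk(chunkSize),
            new ParallelOptions { MaxDegreeOfParallelism = maxParallel },
            async (chunk, cancellationToken) =>
            {
                // Track how many chunk calls run at the same time.
                lock (gate)
                {
                    inFlight++;
                    maxInFlight = Math.Max(maxInFlight, inFlight);
                }

                try
                {
                    foreach (var item in await FakeService1Call(chunk))
                    {
                        results.Add(item);
                    }
                }
                finally
                {
                    lock (gate) { inFlight--; }
                }
            });

        return (results.OrderBy(x => x).ToList(), maxInFlight);
    }
}
```

This only throttles a single stage. Handing items from one stage to the next while both run is still what the channel is for, and a bounded channel would add backpressure between the stages on top of that.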
AllMightOP•22h ago
Btw that is the ReadAllAsync source (pretty similar):
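The screenshot is not preserved here. As far as I know, the default `ReadAllAsync` boils down to a `WaitToReadAsync` / `TryRead` loop; the sketch below reproduces that shape as an extension method with the made-up name `ReadAllSketchAsync`, and should not be read as a verbatim copy of the library source.

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelReaderSketch
{
    public static async IAsyncEnumerable<T> ReadAllSketchAsync<T>(
        this ChannelReader<T> reader,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // WaitToReadAsync returns false once the writer is completed and the buffer is empty.
        while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
        {
            // Drain everything that is currently buffered before waiting again.
            while (reader.TryRead(out var item))
            {
                yield return item;
            }
        }
    }
}
```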
AllMightOP•22h ago
Lastly, here is an updated version. Please let me know how it looks tomorrow:
public async Task<IReadOnlyList<string>> Execute(List<string> ids)
{
    // Up to two Service 1 tasks write at the same time, so SingleWriter must be false.
    var service1DataChannel = Channel.CreateUnbounded<string>(
        new UnboundedChannelOptions() { SingleReader = true, SingleWriter = false }
    );
    // Several Service 2 tasks write results concurrently.
    var resultsDataChannel = Channel.CreateUnbounded<string>(
        new UnboundedChannelOptions() { SingleReader = true, SingleWriter = false }
    );

    await Task.WhenAll(
        Service1DataProducer(ids, service1DataChannel.Writer),
        Service1ConsumerToResultsProducer(service1DataChannel.Reader, resultsDataChannel.Writer)
    );

    // Results are only drained after both stages finish; this works because the channel is unbounded.
    return await ResultsConsumer(resultsDataChannel.Reader);
}

private async Task Service1DataProducer(List<string> ids, ChannelWriter<string> service1DataWriter)
{
    // At most 2 Service 1 calls in flight at once.
    using SemaphoreSlim semaphoreSlim = new(2);

    try
    {
        await Task.WhenAll(
            ids
                .Chunk(10_000)
                .Select(async (chunk) =>
                {
                    await semaphoreSlim.WaitAsync();

                    try
                    {
                        foreach (var resultItem in await GetService1Data([.. chunk]))
                        {
                            await service1DataWriter.WriteAsync(resultItem);
                        }
                    }
                    finally
                    {
                        semaphoreSlim.Release();
                    }
                })
        );
    }
    finally
    {
        // Complete even if a call failed; otherwise the consumer would wait forever.
        service1DataWriter.Complete();
    }
}
private async Task Service1ConsumerToResultsProducer(
    ChannelReader<string> service1DataReader,
    ChannelWriter<string> resultsWriter)
{
    // HashSet drops duplicate items within the current batch.
    HashSet<string> currentItems = [];
    List<Task> service2Tasks = [];
    // At most 5 Service 2 calls in flight at once.
    using SemaphoreSlim semaphoreSlim = new(5);

    // The caller acquires the semaphore before starting this; it is released here.
    async Task Execute(List<string> items)
    {
        try
        {
            foreach (var item in await GetService2Data(items))
            {
                await resultsWriter.WriteAsync(item);
            }
        }
        finally
        {
            semaphoreSlim.Release();
        }
    }

    await foreach (var item in service1DataReader.ReadAllAsync())
    {
        currentItems.Add(item);

        if (currentItems.Count == 1_000)
        {
            // Waiting here pauses reading while 5 batches are already in flight.
            await semaphoreSlim.WaitAsync();
            service2Tasks.Add(Execute([.. currentItems]));
            currentItems.Clear();
        }
    }

    // Flush the last, partially filled batch.
    if (currentItems.Count > 0)
    {
        await semaphoreSlim.WaitAsync();
        service2Tasks.Add(Execute([.. currentItems]));
    }

    await Task.WhenAll(service2Tasks);

    resultsWriter.Complete();
}

private async Task<List<string>> ResultsConsumer(ChannelReader<string> resultsReader)
{
    // Single reader, so a plain List is enough here.
    List<string> results = [];

    await foreach (var result in resultsReader.ReadAllAsync())
    {
        results.Add(result);
    }

    return results;
}