C
C#•4w ago
Merlin

Recommended way to implement a Channel consumer with async handlers & backpressure?

Hey everyone 👋 I’m working on a microservice (DDD style) where aggregates communicate through an in-memory Channel (System.Threading.Channels). Producers push domain messages/events into it, and a background consumer runs handlers for each message. I want to keep backpressure (so producers block if the consumer is behind) but still handle multiple messages asynchronously — without serializing everything or flooding the system. Here’s the trade-off I’m running into:
await foreach (var msg in reader.ReadAllAsync())
{
await HandleAsync(msg); // preserves backpressure but no parallelism
}


await foreach (var msg in reader.ReadAllAsync())
{
_ = Task.Run(() => HandleAsync(msg)); // loses backpressure
}
await foreach (var msg in reader.ReadAllAsync())
{
await HandleAsync(msg); // preserves backpressure but no parallelism
}


await foreach (var msg in reader.ReadAllAsync())
{
_ = Task.Run(() => HandleAsync(msg)); // loses backpressure
}
20 Replies
Jimmacle
Jimmacle•4w ago
seems like you want to implement a concurrency limit so only N messages can be handled at once
Merlin
MerlinOP•4w ago
yes, but to set static amount of workers seems limitting given if its deployed to cloud it can scale and i wont be able to optimize the ideal amount of workers
Jimmacle
Jimmacle•4w ago
then make it dynamic
Merlin
MerlinOP•4w ago
okay and would u advise to do something like Environment.ProcessorCount and multiply that by some constant ? to get roughly the amount of threads we want
Jimmacle
Jimmacle•4w ago
you're not controlling threads here, you're controlling tasks they use whatever thread pool threads are available when there is CPU bound work to be done
Merlin
MerlinOP•4w ago
okay so that means i could just use the _ = Task.Run(() => HandleAsync(msg)); and when threads are available they do the work ? my worry here is that if we enqueue too many tasks wont there be an issue with context switching ?
Unknown User
Unknown User•3w ago
Message Not Public
Sign In & Join Server To View
Merlin
MerlinOP•3w ago
so dont use in mem channel for communication inside of a microservice but use external message bus also ? meaning that microservice would consume its own events?
Unknown User
Unknown User•3w ago
Message Not Public
Sign In & Join Server To View
Merlin
MerlinOP•3w ago
okay i mean that can actually simplify my code logic, i ditch the channel completely and subscribe it to its own events ; because for communication between microservices i still use service bus so its easier to use it for internal stuff also
Unknown User
Unknown User•3w ago
Message Not Public
Sign In & Join Server To View
Merlin
MerlinOP•3w ago
i get what u mnea stuff like paymentprocessed shouldnt be stored in memory channel because if the app goes down i am screwed
Unknown User
Unknown User•3w ago
Message Not Public
Sign In & Join Server To View
Merlin
MerlinOP•3w ago
might do that or just directly from handler commit to outbox and send to service bus
Unknown User
Unknown User•3w ago
Message Not Public
Sign In & Join Server To View
Merlin
MerlinOP•3w ago
via bg service what do u mean
Unknown User
Unknown User•3w ago
Message Not Public
Sign In & Join Server To View
Unknown User
Unknown User•3w ago
Message Not Public
Sign In & Join Server To View
Merlin
MerlinOP•3w ago
thanks for the help, i will be using external busses even for in proc communication then given i already use them

Did you find this page helpful?