Effect Community · 9mo ago
4 replies
정제훈

How can I improve this Stream Broadcasting and Fiber Management code?

Hi Effect community! 👋
I'm working on understanding Effect streams and concurrency patterns, and I've put together a small example. I'd appreciate any feedback on how this code could be improved or written more idiomatically using Effect.

Playground Link: https://effect.website/play#c0a24bb689fa

I've set up a stream that generates random numbers until a specific condition is met.
I then use Stream.broadcast to split this stream into two concurrent consumers: one (mainStream) immediately collects all the numbers into an array, while the other (reportStream) simulates a slower, throttled process like sending reports to a server.
I achieve concurrency using Effect.fork, wait for the collection to finish with Fiber.join, and then explicitly Fiber.interrupt the reporting task.

It works as expected, but the code feels messy, and I'd like to improve it wherever possible.

Instead of wiring up Stream.broadcast just to create a second downstream, I thought I could give Stream.throttle a cost function that returns 0 for chunks meeting a certain condition, so those chunks would never be throttled. However, this didn't behave as I expected.
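To make the zero-cost idea concrete, here is a minimal, library-free sketch of a generic token bucket that drops chunks it cannot afford. It is only a model of the idea and not how Stream.throttle is implemented; `throttleWithCost`, `TimedChunk`, `isSpecial`, and the `n > 90` condition are all made up for illustration.

```typescript
// Generic token bucket, dropping variant. NOT Stream.throttle's implementation;
// every name below is hypothetical and exists only for illustration.

interface TimedChunk {
  atMs: number     // arrival time of the chunk
  values: number[] // elements in the chunk
}

function throttleWithCost(
  chunks: TimedChunk[],
  cost: (values: number[]) => number, // 0 means "never throttle this chunk"
  units: number,                      // tokens granted per window
  durationMs: number                  // window length in milliseconds
): TimedChunk[] {
  const passed: TimedChunk[] = []
  // Budget is tracked in token-milliseconds so all arithmetic stays in integers.
  const capacity = units * durationMs
  let budget = capacity
  let lastMs = 0
  for (const chunk of chunks) {
    // Refill in proportion to elapsed time, capped at the bucket capacity.
    budget = Math.min(capacity, budget + (chunk.atMs - lastMs) * units)
    lastMs = chunk.atMs
    const price = cost(chunk.values) * durationMs
    if (price <= budget) {
      budget -= price
      passed.push(chunk)
    }
    // Otherwise the chunk is dropped in this model.
  }
  return passed
}

// Hypothetical condition: values above 90 should bypass the throttle.
const isSpecial = (n: number): boolean => n > 90
const costOf = (values: number[]): number =>
  values.some(isSpecial) ? 0 : values.length

// One single-element chunk every 100ms.
const samples = [5, 20, 95, 30, 40, 50, 60, 70, 80, 85, 10, 99]
const chunks: TimedChunk[] = samples.map((value, i) => ({
  atMs: i * 100,
  values: [value]
}))

const passedValues = throttleWithCost(chunks, costOf, 1, 1000).reduce<number[]>(
  (acc, c) => acc.concat(c.values),
  []
)
// passedValues is [5, 95, 10, 99]
```

In this model the zero-cost chunks (95 and 99) always get through, while the ordinary chunks that get through are the first ones to fit the budget in each window (5 and 10), not the most recent ones.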

What I want is for generateRandomNumber() to execute strictly every 100ms (even if a previous execution hasn't finished yet), and for sendToMainServer() to execute only once per second, using the most recent value from the stream at that time.
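To pin down the timing I'm after, here is a small library-free simulation (plain TypeScript, not Effect code). It assumes a fixed 100ms tick and a 1000ms report interval; `simulate`, `tickMs`, `reportMs`, `maxTicks`, and the deterministic generator in the usage line are hypothetical stand-ins, not the helpers from the playground.

```typescript
// Library-free model of the desired timing; all names are hypothetical.

interface SimulationResult {
  collected: number[]                         // every generated value, in order
  reported: { atMs: number; value: number }[] // what the reporter would send
}

function simulate(
  generate: (tick: number) => number,   // stand-in for generateRandomNumber()
  stopWhen: (value: number) => boolean, // the condition that ends the stream
  tickMs: number = 100,
  reportMs: number = 1000,
  maxTicks: number = 10000              // safety bound so the sketch terminates
): SimulationResult {
  const collected: number[] = []
  const reported: { atMs: number; value: number }[] = []
  let latest: number | undefined = undefined
  let nextReportAt = reportMs

  for (let tick = 0; tick < maxTicks; tick++) {
    const now = tick * tickMs
    // The reporter wakes up once per reportMs and sends only the newest value
    // seen so far. It runs before this tick's value is generated, so at 1000ms
    // it sees the value produced at 900ms.
    if (now >= nextReportAt && latest !== undefined) {
      reported.push({ atMs: now, value: latest })
      nextReportAt += reportMs
    }
    // The generator fires on every tick, regardless of the reporter.
    const value = generate(tick)
    collected.push(value)
    latest = value
    if (stopWhen(value)) break
  }
  return { collected, reported }
}

// Deterministic stand-in generator: 0, 10, 20, ... until the value reaches 250.
const result = simulate((tick) => tick * 10, (v) => v >= 250)
// result.collected has 26 values (0 through 250)
// result.reported is [{ atMs: 1000, value: 90 }, { atMs: 2000, value: 190 }]
```

The collector sees every value, while the reporter skips everything except the latest value at each one-second mark. That is the behavior I would like to express with Effect streams.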

Thanks in advance for any insights or suggestions!