Help with Stream Processing and Effect Consumption in Effect Typescript Library

Please can I ask for some help with repeating an effect into a stream.

I have an effect that produces chunks of items (numbers for simplicity) with 0-10 items each time its called. I (think I) want to consume this in a stream, flatten the chunks and the use groupedWithin to set an output chunk size and maximum wait time before emitting whatever it has. The stream is consumed with runForEach so that each batch is processed. The stream should stop when the effect produces an empty chunk, emitting the last partial chunk. When I use repeatEffect and log when the source effect is called vs when the output occurs, it seems to be too eager and gets ahead of the output (it reads more items than needed to satisfy the grouped count. How can I make the stream only run the effect as needed, outputting regrouped chunks or what it has if it waits too long.

AKA read small batches, terminating when empty, output in batches that are typically larger than the source size.

(Playground - https://effect.website/play#90406f35099d).

FWIW, this is intended to be for processing AWS SQS message batches (larger batches than the SQS batch limit), with each regrouped batch going to a lambda invocation which might support concurrent processing of batches. I have this as as async iterators at present, so trying to evaluate how I'd acheive the same in Effect.

Thanks.
Was this page helpful?