Effect Community · 14mo ago
5 replies
kristo

Stream Batching

I am running an indexer that consumes messages from a websocket, processes them, and inserts the results into the DB. So far I have modeled this as a stream: raw messages come in, and filtered, processed messages come out. I now need to batch these messages for bulk insertion into the DB.

I see Stream.buffer, but it seems to limit the output to the defined spacing without accumulating messages. How can I accumulate messages and emit them only once every x milliseconds?
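For reference, one combinator that appears to fit this is Stream.groupedWithin, which collects elements into a Chunk and emits it when either a maximum size is reached or a time window elapses, whichever comes first. Below is a minimal sketch, not a definitive implementation. The source stream, the batch size of 3, the 500 ms window, and the insertBatch helper are all hypothetical stand-ins for the real websocket stream and DB call.

```typescript
import { Chunk, Effect, Stream } from "effect"

// Hypothetical stand-in for the filtered, processed websocket messages.
const processed = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7])

// Records the size of each batch handed to the (fake) bulk insert,
// purely so the behaviour can be inspected in this sketch.
const inserted: Array<number> = []

// Hypothetical bulk insert; replace the body with the real DB call.
const insertBatch = (rows: ReadonlyArray<number>) =>
  Effect.sync(() => {
    inserted.push(rows.length)
  })

// Emit a batch once 3 elements have accumulated or 500 ms have passed,
// whichever happens first.
const batched = processed.pipe(Stream.groupedWithin(3, "500 millis"))

// Run one bulk insert per emitted batch.
const program = batched.pipe(
  Stream.runForEach((chunk) => insertBatch(Chunk.toReadonlyArray(chunk)))
)

// Kick off the pipeline; `done` resolves when the source stream ends.
const done = Effect.runPromise(program)
```

If batching should be driven by time alone, passing a very large size as the first argument should make the time window the effective trigger. If a fixed batch size with no time limit is enough, Stream.grouped(n) is the simpler option.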