Combining Stream Mapping and Aggregation with Conditional Emission in TypeScript

Is there an API that allows me to do this?

Currently, I have a stream of ParsedEvent values, and I'm mapping over it to return a stream of my own custom tagged enum, StreamEvent:

import { Stream } from "effect"

getParsedEventStream.pipe(
  // Stream.map (not Effect.map) transforms each element of a stream
  Stream.map(toStreamEvent)
)


...basically. However, I also need to aggregate some contents of each ParsedEvent and occasionally emit an additional type of StreamEvent when I've aggregated enough data.

In other words, I need something like Stream.map and Stream.runFold combined, except that the fold has to reset its state mid-stream, because I may collect and emit several aggregate events over the life of the stream.
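
To make the shape of the problem concrete, here is a minimal sketch of "map plus a fold that resets", written as a pure step function so it runs without any library. The fields on ParsedEvent and StreamEvent, the BATCH_SIZE threshold, and the runAll driver are all hypothetical, since the post does not define them. The closing comment shows how such a step function could presumably be plugged into Stream.mapAccum followed by Stream.mapConcat; treat that wiring as an assumption to check against the Effect docs, not a confirmed answer.

```typescript
// Hypothetical event shapes; the original post does not specify them.
type ParsedEvent = { readonly id: number; readonly text: string }

type StreamEvent =
  | { readonly _tag: "Mapped"; readonly id: number }
  | { readonly _tag: "Aggregate"; readonly texts: ReadonlyArray<string> }

// Assumed rule for "enough data": emit an aggregate every 3 events.
const BATCH_SIZE = 3

// Stand-in for the toStreamEvent function mentioned in the post.
const toStreamEvent = (e: ParsedEvent): StreamEvent => ({ _tag: "Mapped", id: e.id })

// Accumulator state: the texts collected since the last aggregate.
type State = ReadonlyArray<string>

// One step: always emit the mapped event; when the threshold is reached,
// also emit an aggregate event and reset the state to empty.
const step = (
  state: State,
  event: ParsedEvent
): readonly [State, ReadonlyArray<StreamEvent>] => {
  const mapped = toStreamEvent(event)
  const collected: State = [...state, event.text]
  if (collected.length >= BATCH_SIZE) {
    const aggregate: StreamEvent = { _tag: "Aggregate", texts: collected }
    return [[], [mapped, aggregate]]
  }
  return [collected, [mapped]]
}

// Library-free driver that threads the state through, for illustration only.
const runAll = (events: ReadonlyArray<ParsedEvent>): StreamEvent[] => {
  let state: State = []
  const out: StreamEvent[] = []
  for (const e of events) {
    const [next, emitted] = step(state, e)
    state = next
    out.push(...emitted)
  }
  return out
}

// Assumed Effect wiring (unverified against a specific version):
//   getParsedEventStream.pipe(
//     Stream.mapAccum([] as State, step),
//     Stream.mapConcat((events) => events)
//   )
```

Note that this sketch never flushes a partially filled batch when the stream ends; if the trailing data matters, that needs separate handling.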