/**
 * Builds the application effect.
 *
 * The visible part of the program consumes `aggregator.messagesStream`,
 * partitions it by `message.userId`, and runs every message of a partition
 * through `MessageProcessor.processMessage`, draining the resulting stream.
 *
 * For each message the processor effect is given `SalonToolkitHandlers` and a
 * `MessageState` service carrying the `userId` of the partition the message
 * belongs to.
 *
 * NOTE(review): `concurrency: 1` presumably keeps messages of one user strictly
 * ordered while different users' partitions progress independently — confirm
 * against the Effect `Stream.groupByKey` / `GroupBy.evaluate` documentation.
 *
 * @returns whatever `Effect.gen` returns for the generator below.
 */
export const app = () =>
  Effect.gen(function* () {
    /// *** some code ***/
    yield* aggregator.messagesStream.pipe(
      // One sub-stream per user; each group is created with a buffer size of 256.
      Stream.groupByKey((message) => message.userId, {
        bufferSize: 256,
      }),
      GroupBy.evaluate((userId, userStream) =>
        userStream.pipe(
          Stream.mapEffect(
            (message) =>
              MessageProcessor.pipe(
                Effect.provide(SalonToolkitHandlers),
                // Scope the message state to the user owning this sub-stream.
                Effect.provideService(MessageState, { userId }),
                Effect.flatMap((processor) => processor.processMessage(message)),
              ),
            { concurrency: 1 },
          ),
        ),
      ),
      Stream.runDrain,
    );
    // some more code
  });