Correctness of Unbounded PubSub Broadcasting in Streams
Is this a correct way to broadcast a stream with an unbounded PubSub, or is there a better way to do it?
/**
 * Broadcasts an ever-increasing counter through an unbounded `PubSub`.
 *
 * Despite its name, this value is not a stream itself: it is a scoped effect
 * that, when evaluated, creates the PubSub, starts a background producer that
 * feeds the counter stream into it, and returns a stream reading from that
 * PubSub. The effect needs a `Scope` (introduced by `Effect.acquireRelease`);
 * closing that scope shuts the PubSub down.
 *
 * The returned stream is built with `Stream.fromPubSub`, so consumers read
 * the `Take` values published by the producer, unwrapped by
 * `Stream.flattenTake`.
 *
 * NOTE(review): elements published before a consumer starts running the
 * returned stream are presumably not delivered to that consumer, since the
 * producer is forked before anything subscribes — confirm against the PubSub
 * subscription semantics if early elements matter.
 *
 * NOTE(review): `Stream.broadcastDynamic` / `Stream.toPubSub` appear to
 * package this same acquire + fork + fromPubSub pattern; consider using them
 * if the installed Effect version supports an unbounded capacity there.
 */
const intervalStream = Stream.async((emit: Emit<never, never, number, void>) => {
  let counter = 0;
  // No delay is passed to setInterval, so the callback fires as often as the
  // runtime's timer allows.
  const intervalId = setInterval(() => {
    counter++;
    // The promise returned by `emit` is intentionally not awaited.
    void emit(Effect.succeed(Chunk.of(counter)));
  });
  // Cleanup effect handed back to Stream.async: stops the timer.
  return Effect.sync(() => clearInterval(intervalId));
}).pipe((stream) =>
  Effect.gen(function* () {
    // The PubSub is tied to the surrounding scope and shut down on release.
    const pubsub = yield* Effect.acquireRelease(
      PubSub.unbounded<Take.Take<number>>(),
      (pubsub) => PubSub.shutdown(pubsub)
    );
    // Fork the producer into the same scope that owns the PubSub, so it lives
    // exactly as long as the PubSub does instead of being tied to whichever
    // fiber happened to evaluate this effect.
    yield* Effect.forkScoped(Stream.runIntoPubSub(stream, pubsub));
    return Stream.flattenTake(Stream.fromPubSub(pubsub));
  })
);