Correctness of Unbounded PubSub Broadcasting in Streams

Is this a correct way to broadcast a stream using an unbounded PubSub, or is there a better way to do it?

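import { Chunk, Effect, PubSub, Stream, Take } from "effect";
import type { Emit } from "effect/StreamEmit";

// Emits an incrementing counter on every timer tick.
const intervalStream = Stream.async((emit: Emit<never, never, number, void>) => {
    let i = 0;
    const intervalId = setInterval(() => {
        i++;
        emit(Effect.succeed(Chunk.of(i)));
    });
    // Finalizer: stop the timer when the stream is closed.
    return Effect.sync(() => clearInterval(intervalId));
}).pipe((stream) =>
    Effect.gen(function* () {
        // Scoped resource: the PubSub is shut down when the enclosing Scope closes.
        const pubsub = yield* Effect.acquireRelease(PubSub.unbounded<Take.Take<number>>(), (pubsub) => PubSub.shutdown(pubsub));
        // Publish the source into the PubSub in a background fiber.
        yield* Effect.fork(Stream.runIntoPubSub(stream, pubsub));
        // Each run of the returned stream subscribes to the PubSub.
        return Stream.flattenTake(Stream.fromPubSub(pubsub));
    }));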
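
One property worth checking before relying on the snippet above is what a subscriber sees when it attaches late. The sketch below is a library-free model of unbounded pub/sub broadcasting, written only to illustrate that behaviour. `MiniPubSub` and its methods are hypothetical names invented for this example and are not Effect's PubSub API; the assumption being modelled is that a subscriber only receives values published after it subscribed.

```typescript
// Hypothetical, library-free model of an unbounded pub/sub (not Effect's PubSub).
class MiniPubSub<A> {
    // One unbounded buffer per active subscriber.
    private readonly queues = new Set<A[]>();

    // Register a subscriber; it only sees values published from now on.
    subscribe(): { take: () => A[]; unsubscribe: () => void } {
        const queue: A[] = [];
        this.queues.add(queue);
        return {
            // Drain everything buffered for this subscriber so far.
            take: () => queue.splice(0, queue.length),
            unsubscribe: () => {
                this.queues.delete(queue);
            },
        };
    }

    // Copy the value into every subscriber's buffer; never blocks, never drops.
    publish(value: A): void {
        this.queues.forEach((queue) => queue.push(value));
    }
}

const pubsub = new MiniPubSub<number>();
pubsub.publish(1); // no subscribers yet, so nobody receives this value
const early = pubsub.subscribe();
pubsub.publish(2);
const late = pubsub.subscribe();
pubsub.publish(3);

const earlyValues = early.take(); // values published after `early` subscribed
const lateValues = late.take(); // values published after `late` subscribed
console.log(earlyValues, lateValues);
```

In this model the early subscriber receives 2 and 3, while the late one receives only 3. If the same holds for the original snippet, values produced between forking the producer and running the returned stream would not reach that consumer, and with an unbounded buffer a slow consumer's backlog can grow without limit.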