© 2026 Hedgehog Software, LLC
HttpServerResponse.stream
// Exercises a PubSub-backed Stream that is converted into a web ReadableStream,
// wrapped in a `Response`, and then drained chunk by chunk.
//
// A background producer publishes an increasing counter every 200 ms and shuts
// the PubSub down once the counter value it just published is greater than 10.
test('test PubSub subscription stream', () => {
  return Effect.gen(function* () {
    const pubSub = yield* PubSub.unbounded<number>();
    const ref = yield* Ref.make<number>(0);

    // Fork the producer as a child of this test's fiber (instead of a detached
    // `Effect.runFork` root fiber) so it is interrupted automatically when the
    // test effect completes or fails, rather than leaking its 200 ms timer.
    yield* Effect.fork(
      Effect.repeat(
        Effect.gen(function* () {
          const c = yield* Ref.get(ref);
          yield* pubSub.publish(c);
          // `count` is the counter value from before the increment.
          const count = yield* Ref.getAndUpdate(ref, (n) => n + 1);
          if (count > 10) {
            // Presumably this also ends the subscriber stream below — confirm
            // against the Stream.fromPubSub documentation.
            yield* PubSub.shutdown(pubSub);
          }
        }),
        Schedule.spaced('200 millis')
      )
    );

    const stream = Stream.fromPubSub(pubSub).pipe(
      Stream.onEnd(Effect.log('stream end')),
      // NOTE: when this filter is added, the stream is reportedly not consumed.
      Stream.filter((m) => m > 2),
      Stream.map((m) => `${m}\n`)
    );

    // yield* stream.pipe(stdout);
    const readable = Stream.toReadableStreamRuntime(stream, Runtime.defaultRuntime);
    const response = new Response(readable);

    yield* Effect.promise(async () => {
      // Guard explicitly instead of using a non-null assertion on the body.
      const body = response.body;
      if (body === null) {
        throw new Error('Response body is null');
      }
      for await (const chunk of body) {
        console.log('chunk: ' + chunk);
      }
    });
  }).pipe(Effect.runPromise);
});