Issue with `HttpServerResponse.stream` and Filter in SSE API
Hi, I am using `HttpServerResponse.stream` to return a stream in an SSE API. However, when a filter is piped into the stream, the stream is not consumed and the request blocks. Here is a reproduction:

test('test PubSub subscription stream', () => {
// Reproduction: a forked publisher pushes an incrementing counter into a PubSub
// every 200 ms, while this generator reads the PubSub as a filtered stream
// through a ReadableStream wrapped in a Response.
return Effect.gen(function* () {
const pubSub = yield* PubSub.unbounded<number>();
const ref = yield* Ref.make<number>(0);
// Publisher loop: started with Effect.runFork and not yielded here, so this
// generator continues without waiting for it.
Effect.repeat(
Effect.gen(function* () {
// Publish the current counter value, then increment the counter.
const c = yield* Ref.get(ref);
yield* pubSub.publish(c);
const count = yield* Ref.getAndUpdate(ref, (n) => n + 1);
// Shut the PubSub down once the value returned by getAndUpdate exceeds 10.
if (count > 10) {
yield* PubSub.shutdown(pubSub);
}
}),
Schedule.spaced('200 millis')
).pipe(Effect.runFork);
// Stream under test: logs 'stream end' via Stream.onEnd, keeps values > 2,
// and renders each value as a newline-terminated string.
const stream = Stream.fromPubSub(pubSub).pipe(
Stream.onEnd(Effect.log('stream end')),
Stream.filter((m) => m > 2), // Reported problem: with this filter added, the stream is not consumed
Stream.map((m) => `${m}\n`)
);
// Alternative consumer left disabled by the author (`stdout` is not defined in this snippet).
// yield* stream.pipe(stdout);
// Bridge the stream to a ReadableStream using the default runtime and wrap it
// in a Response, presumably mirroring what the SSE endpoint does — TODO confirm.
const readable = Stream.toReadableStreamRuntime(stream, Runtime.defaultRuntime);
const response = new Response(readable);
// Drain the response body, logging each chunk as it arrives.
yield* Effect.promise(async () => {
for await (const chunk of response.body!) {
console.log('chunk: ' + chunk);
}
});
}).pipe(Effect.runPromise);
});test('test PubSub subscription stream', () => {
// Reproduction: a forked publisher pushes an incrementing counter into a PubSub
// every 200 ms, while this generator reads the PubSub as a filtered stream
// through a ReadableStream wrapped in a Response.
return Effect.gen(function* () {
const pubSub = yield* PubSub.unbounded<number>();
const ref = yield* Ref.make<number>(0);
// Publisher loop: started with Effect.runFork and not yielded here, so this
// generator continues without waiting for it.
Effect.repeat(
Effect.gen(function* () {
// Publish the current counter value, then increment the counter.
const c = yield* Ref.get(ref);
yield* pubSub.publish(c);
const count = yield* Ref.getAndUpdate(ref, (n) => n + 1);
// Shut the PubSub down once the value returned by getAndUpdate exceeds 10.
if (count > 10) {
yield* PubSub.shutdown(pubSub);
}
}),
Schedule.spaced('200 millis')
).pipe(Effect.runFork);
// Stream under test: logs 'stream end' via Stream.onEnd, keeps values > 2,
// and renders each value as a newline-terminated string.
const stream = Stream.fromPubSub(pubSub).pipe(
Stream.onEnd(Effect.log('stream end')),
Stream.filter((m) => m > 2), // Reported problem: with this filter added, the stream is not consumed
Stream.map((m) => `${m}\n`)
);
// Alternative consumer left disabled by the author (`stdout` is not defined in this snippet).
// yield* stream.pipe(stdout);
// Bridge the stream to a ReadableStream using the default runtime and wrap it
// in a Response, presumably mirroring what the SSE endpoint does — TODO confirm.
const readable = Stream.toReadableStreamRuntime(stream, Runtime.defaultRuntime);
const response = new Response(readable);
// Drain the response body, logging each chunk as it arrives.
yield* Effect.promise(async () => {
for await (const chunk of response.body!) {
console.log('chunk: ' + chunk);
}
});
}).pipe(Effect.runPromise);
});