Effect CommunityEC
Effect Community10mo ago
2 replies
for1988

Issue with `HttpServerResponse.stream` and Filter in SSE API

Hi, I am using HttpServerResponse.stream to return a stream in a sse API. But the stream isn't consumed with pipe a filter, and the request will be blocked.

test('test PubSub subscription stream', () => {
  return Effect.gen(function* () {
    const pubSub = yield* PubSub.unbounded<number>();
    const ref = yield* Ref.make<number>(0);
    Effect.repeat(
      Effect.gen(function* () {
        const c = yield* Ref.get(ref);
        yield* pubSub.publish(c);
        const count = yield* Ref.getAndUpdate(ref, (n) => n + 1);
        if (count > 10) {
          yield* PubSub.shutdown(pubSub);
        }
      }),
      Schedule.spaced('200 millis')
    ).pipe(Effect.runFork);
    const stream = Stream.fromPubSub(pubSub).pipe(
      Stream.onEnd(Effect.log('stream end')),
      Stream.filter((m) => m > 2), // If add a filter, the stream will not be customed
      Stream.map((m) => `${m}\n`)
    );
    // yield* stream.pipe(stdout);
    const readable = Stream.toReadableStreamRuntime(stream, Runtime.defaultRuntime);
    const response = new Response(readable);
    yield* Effect.promise(async () => {
      for await (const chunk of response.body!) {
        console.log('chunk: ' + chunk);
      }
    });
  }).pipe(Effect.runPromise);
});
Was this page helpful?