Issue with processing multiple event emitters in a queue using Effect and Stream

Could anybody explain why this only works for the first event emitter in the queue? In my real code the eventemitter is a websocket.

import { Effect, Stream, StreamEmit, Chunk, Queue, pipe } from "effect";
import { NodeRuntime } from "@effect/platform-node";
import { EventEmitter } from "events";

const program = Effect.gen(function* () {
  const q = yield* Queue.unbounded<EventEmitter>();

  yield* pipe(
    Stream.fromQueue(q),
    Stream.flatMap((ee) => {
      return Stream.async(
        (emit: StreamEmit.Emit<never, never, string, void>) => {
          ee.on("event", (data: string) => {
            emit(Effect.succeed(Chunk.of(data)));
          });
        },
      );
    }),
    Stream.tap(Effect.log),
    Stream.runDrain,
    Effect.fork,
  );

  let c = 0;

  const ee1 = new EventEmitter();
  yield* q.offer(ee1);

  yield* pipe(
    Effect.sync(() => ee1.emit("event", "data-" + c++)),
    Effect.delay(10),
    Effect.repeatN(4),
  );

  yield* Effect.sleep(250);

  const ee2 = new EventEmitter();
  yield* q.offer(ee2);

  yield* pipe(
    Effect.sync(() => ee2.emit("event", "data-" + c++)),
    Effect.delay(10),
    Effect.repeatN(4),
  );
});

pipe(program, NodeRuntime.runMain);


https://effect.website/play/#7adf6257f945
Was this page helpful?