Issue with processing multiple event emitters in a queue using Effect and Stream
Could anybody explain why this only works for the first event emitter in the queue? In my real code the eventemitter is a websocket.
https://effect.website/play/#7adf6257f945
import { Effect, Stream, StreamEmit, Chunk, Queue, pipe } from "effect";
import { NodeRuntime } from "@effect/platform-node";
import { EventEmitter } from "events";
const program = Effect.gen(function* () {
const q = yield* Queue.unbounded<EventEmitter>();
yield* pipe(
Stream.fromQueue(q),
Stream.flatMap((ee) => {
return Stream.async(
(emit: StreamEmit.Emit<never, never, string, void>) => {
ee.on("event", (data: string) => {
emit(Effect.succeed(Chunk.of(data)));
});
},
);
}),
Stream.tap(Effect.log),
Stream.runDrain,
Effect.fork,
);
let c = 0;
const ee1 = new EventEmitter();
yield* q.offer(ee1);
yield* pipe(
Effect.sync(() => ee1.emit("event", "data-" + c++)),
Effect.delay(10),
Effect.repeatN(4),
);
yield* Effect.sleep(250);
const ee2 = new EventEmitter();
yield* q.offer(ee2);
yield* pipe(
Effect.sync(() => ee2.emit("event", "data-" + c++)),
Effect.delay(10),
Effect.repeatN(4),
);
});
pipe(program, NodeRuntime.runMain);import { Effect, Stream, StreamEmit, Chunk, Queue, pipe } from "effect";
import { NodeRuntime } from "@effect/platform-node";
import { EventEmitter } from "events";
const program = Effect.gen(function* () {
const q = yield* Queue.unbounded<EventEmitter>();
yield* pipe(
Stream.fromQueue(q),
Stream.flatMap((ee) => {
return Stream.async(
(emit: StreamEmit.Emit<never, never, string, void>) => {
ee.on("event", (data: string) => {
emit(Effect.succeed(Chunk.of(data)));
});
},
);
}),
Stream.tap(Effect.log),
Stream.runDrain,
Effect.fork,
);
let c = 0;
const ee1 = new EventEmitter();
yield* q.offer(ee1);
yield* pipe(
Effect.sync(() => ee1.emit("event", "data-" + c++)),
Effect.delay(10),
Effect.repeatN(4),
);
yield* Effect.sleep(250);
const ee2 = new EventEmitter();
yield* q.offer(ee2);
yield* pipe(
Effect.sync(() => ee2.emit("event", "data-" + c++)),
Effect.delay(10),
Effect.repeatN(4),
);
});
pipe(program, NodeRuntime.runMain);https://effect.website/play/#7adf6257f945
