Implementing Producer-Consumer Pattern with Effect and Fibers in TypeScript

Hello!

Using Effect, I'm trying to have an Effect in a Fiber take a queue to add element in it

In an other Fiber, I want a consumer to get elements in the queue, and process them

Once the Effect that produce is done, I want to wait for the queue to be empty, then interrupt without an error the consumer and shutdown the program

So far, I have done this, but it doesn't seem like an effective way to use Effect

const test = Effect.gen(function* (_) {
  const queue = yield* Queue.unbounded<number>();

  const producerFiber = yield* _(
    produce({ queue }),
    Effect.schedule(
      Schedule.fixed("2 seconds").pipe(Schedule.compose(Schedule.recurs(5)))
    ),
    Effect.fork
  );

  const consumerFiber = yield* _(
    consume({ queue }),
    Effect.forever,
    Effect.fork
  );

  yield* Fiber.join(producerFiber);
  yield* Effect.log("producer stopped");

  yield* Effect.repeat(
    Effect.gen(function* (_) {
      const isEmpty = yield* _(Queue.isEmpty(queue));
      if (isEmpty) {
        yield* Effect.log("queue is empty");
        yield* Fiber.interrupt(consumerFiber);
      }
      return isEmpty;
    }),
    { until: (r) => r }
  );
});

const produce = ({ queue }: { queue: Queue.Queue<number> }) =>
  Effect.gen(function* (_) {
    yield* Effect.log("producing");
    yield* queue.offerAll([1, 2, 3]);
  });

const consume = ({ queue }: { queue: Queue.Queue<number> }) =>
  Effect.gen(function* (_) {
    const value = yield* _(Queue.poll(queue));
    if (Option.isNone(value)) {
      return;
    }
    yield* Effect.log(`Consumed value '${value.value}'`);
  });


Any hint would be appreciated 🙏
Was this page helpful?