Creating a Queue-based runtime in Effect Typescript involves setting up services that manage the ...

I'm attempting to create a runtime with Queue in it that I can send data to and it will perform some operation on as it comes in. My services look like this at the moment.

class MyQueue extends Effect.Service<MyQueue>()("MyQueue", {
  effect: Effect.gen(function* () {
    yield* Effect.log("Starting the Queue Runtime");
    const q = yield* Queue.unbounded();
    return q;
  }),
}) {}

export class Send extends Effect.Service<Send>()("Send", {
  effect: Effect.gen(function* () {
    const q = yield* MyQueue;
    return {
      send: (a) =>
        Effect.gen(function* () {
          yield* Effect.log("SEND", a);
          yield* Queue.offer(q, a);
        }),
    };
  }),
  dependencies: [MyQueue.Default],
}) {}

export class Take extends Effect.Service<Take>()("Take", {
  effect: Effect.gen(function* () {
    const q = yield* MyQueue;
    return {
      take: () =>
        Effect.gen(function* () {
          const elem = yield* Queue.take(q);
          yield* Effect.log(elem);
        }),
    };
  }),
  dependencies: [MyQueue.Default],
}) {}


I have a very simple express app like this

const MainLayer = Layer.mergeAll(Send.Default, Take.Default);

const QueueRuntime = ManagedRuntime.make(MainLayer);

app.get("/", (req, res) => {
  QueueRuntime.runPromise(
    Effect.gen(function* () {
      const t = yield* Take;
      yield* t.take();
    }),
  );
  res.send("Taking from Queue");
});

app.post("/", (req, res) => {
  QueueRuntime.runPromise(
    Effect.gen(function* () {
      Effect.log("from app.post");
      const send = yield* Send;
      yield* send.send(req.body);
    }),
  );
  res.send("recieved POST");
});
Was this page helpful?