Creating a Queue-based runtime in Effect Typescript involves setting up services that manage the ...
I'm attempting to create a runtime with
I have a very simple express app like this
QueueQueue in it that I can send data to and it will perform some operation on as it comes in. My services look like this at the moment.class MyQueue extends Effect.Service<MyQueue>()("MyQueue", {
effect: Effect.gen(function* () {
yield* Effect.log("Starting the Queue Runtime");
const q = yield* Queue.unbounded();
return q;
}),
}) {}
export class Send extends Effect.Service<Send>()("Send", {
effect: Effect.gen(function* () {
const q = yield* MyQueue;
return {
send: (a) =>
Effect.gen(function* () {
yield* Effect.log("SEND", a);
yield* Queue.offer(q, a);
}),
};
}),
dependencies: [MyQueue.Default],
}) {}
export class Take extends Effect.Service<Take>()("Take", {
effect: Effect.gen(function* () {
const q = yield* MyQueue;
return {
take: () =>
Effect.gen(function* () {
const elem = yield* Queue.take(q);
yield* Effect.log(elem);
}),
};
}),
dependencies: [MyQueue.Default],
}) {}class MyQueue extends Effect.Service<MyQueue>()("MyQueue", {
effect: Effect.gen(function* () {
yield* Effect.log("Starting the Queue Runtime");
const q = yield* Queue.unbounded();
return q;
}),
}) {}
export class Send extends Effect.Service<Send>()("Send", {
effect: Effect.gen(function* () {
const q = yield* MyQueue;
return {
send: (a) =>
Effect.gen(function* () {
yield* Effect.log("SEND", a);
yield* Queue.offer(q, a);
}),
};
}),
dependencies: [MyQueue.Default],
}) {}
export class Take extends Effect.Service<Take>()("Take", {
effect: Effect.gen(function* () {
const q = yield* MyQueue;
return {
take: () =>
Effect.gen(function* () {
const elem = yield* Queue.take(q);
yield* Effect.log(elem);
}),
};
}),
dependencies: [MyQueue.Default],
}) {}I have a very simple express app like this
const MainLayer = Layer.mergeAll(Send.Default, Take.Default);
const QueueRuntime = ManagedRuntime.make(MainLayer);
app.get("/", (req, res) => {
QueueRuntime.runPromise(
Effect.gen(function* () {
const t = yield* Take;
yield* t.take();
}),
);
res.send("Taking from Queue");
});
app.post("/", (req, res) => {
QueueRuntime.runPromise(
Effect.gen(function* () {
Effect.log("from app.post");
const send = yield* Send;
yield* send.send(req.body);
}),
);
res.send("recieved POST");
});const MainLayer = Layer.mergeAll(Send.Default, Take.Default);
const QueueRuntime = ManagedRuntime.make(MainLayer);
app.get("/", (req, res) => {
QueueRuntime.runPromise(
Effect.gen(function* () {
const t = yield* Take;
yield* t.take();
}),
);
res.send("Taking from Queue");
});
app.post("/", (req, res) => {
QueueRuntime.runPromise(
Effect.gen(function* () {
Effect.log("from app.post");
const send = yield* Send;
yield* send.send(req.body);
}),
);
res.send("recieved POST");
});