/**
 * Asserts the queue named `key` on a freshly created channel and returns a
 * stream of the JSON-decoded messages consumed from it.
 *
 * Each delivery whose payload parses to a truthy value is acknowledged and
 * emitted as a single-element chunk. The stream is ended (and the delivery is
 * NOT acknowledged) when:
 *  - the consume callback is invoked with a `null` message,
 *  - the payload is not valid JSON, or
 *  - the payload parses to a falsy value.
 *
 * @typeParam ConsumeEventType - Shape the caller expects each decoded payload
 *   to have. The payload is only cast, not validated, against this type.
 * @param key - Name of the queue to assert and consume from.
 * @returns An effect that fails with `RabbitMQError` when asserting the queue
 *   rejects, and otherwise yields the stream of decoded events. Failures from
 *   `createChannel` propagate as-is.
 */
consumeMessage: <ConsumeEventType>(key: string) =>
  Effect.gen(function* () {
    const channel = yield* createChannel();

    yield* Effect.tryPromise({
      try: () => channel.assertQueue(key),
      catch: (error) => new RabbitMQError({ error, message: "Error while asserting Queue" }),
    });

    return Stream.async((emit: Emit<never, never, ConsumeEventType, void>) => {
      // Signals end-of-stream to downstream consumers.
      const endStream = (): void => {
        void emit(Effect.fail(Option.none()));
      };

      const subscribe = Effect.tryPromise({
        try: () =>
          channel.consume(key, (message) => {
            // Guard before touching the payload: parsing an absent message
            // would mean JSON.parse(""), which throws instead of ending the stream.
            if (!message) {
              endStream();
              return;
            }

            let event: ConsumeEventType | undefined;
            try {
              event = JSON.parse(message.content.toString()) as ConsumeEventType;
            } catch {
              // Malformed payload: handled like a falsy payload below rather
              // than throwing out of the consume callback.
              event = undefined;
            }

            if (!event) {
              endStream();
              return;
            }

            // NOTE(review): `consolEffect` is defined outside this view; if `log`
            // returns an Effect instead of logging eagerly, it is never run here — confirm.
            consolEffect.log("CONSUMED", event);
            channel.ack(message);
            void emit(Effect.succeed(Chunk.of(event)));
          }),
        catch: (error) => new RabbitMQError({ error, message: "Error while consuming from Queue" }),
      });

      // The subscription is promise-based, so it must be forked rather than run
      // synchronously. A failure to subscribe is deliberately ignored (best
      // effort); in that case the stream simply never emits.
      Effect.runFork(subscribe.pipe(Effect.catchAll(() => Effect.void)));
    });
  }),