...
const make = Effect.gen(function* (_) {
const workerService = yield* BullMQWorkerService;
const userService = yield* UserService;
const processNotifyUserJob = (job: Job<AddNotifyUserJobData, void, UserJobName>) => {
return Effect.gen(function* () {
const users = yield* userService.getUsers();
yield* Effect.log(`This works! Found ${users.length} users`);
}).pipe(Effect.withSpan("UserJobConsumer.processNotifyUserJob"));
};
...
const jobProcessors: Record<UserJobName, ...> = {
[UserJobName.NOTIFY_USER]: processNotifyUserJob,
...
};
const processJob = (job: Job<UserJobData, UserJobResult, UserJobName>) => {
return Effect.gen(function* () {
const jobProcessor = jobProcessors[job.name];
if (!jobProcessor) {
yield* Effect.fail( ... );
}
yield* jobProcessor(job);
}).pipe(
Effect.catchAll((error) =>
...
)
);
};
yield* workerService.createWorker<UserJobData, UserJobResult, UserJobName>(
QueueName.USER,
(job) => Effect.runPromise(processJob(job))
);
});
export class UserJobConsumer extends ... {
static readonly Live = Layer.scoped(this, make).pipe(
Layer.provide(BullMQWorkerService.Live),
Layer.provide(UserService.Live)
);
}
...
const make = Effect.gen(function* (_) {
const workerService = yield* BullMQWorkerService;
const userService = yield* UserService;
const processNotifyUserJob = (job: Job<AddNotifyUserJobData, void, UserJobName>) => {
return Effect.gen(function* () {
const users = yield* userService.getUsers();
yield* Effect.log(`This works! Found ${users.length} users`);
}).pipe(Effect.withSpan("UserJobConsumer.processNotifyUserJob"));
};
...
const jobProcessors: Record<UserJobName, ...> = {
[UserJobName.NOTIFY_USER]: processNotifyUserJob,
...
};
const processJob = (job: Job<UserJobData, UserJobResult, UserJobName>) => {
return Effect.gen(function* () {
const jobProcessor = jobProcessors[job.name];
if (!jobProcessor) {
yield* Effect.fail( ... );
}
yield* jobProcessor(job);
}).pipe(
Effect.catchAll((error) =>
...
)
);
};
yield* workerService.createWorker<UserJobData, UserJobResult, UserJobName>(
QueueName.USER,
(job) => Effect.runPromise(processJob(job))
);
});
export class UserJobConsumer extends ... {
static readonly Live = Layer.scoped(this, make).pipe(
Layer.provide(BullMQWorkerService.Live),
Layer.provide(UserService.Live)
);
}