Implementing a Concurrency-Enabled Queue Job Processor in TypeScript

I'm trying to create a Queue Job Processor with concurrency for parallel consumption of jobs. I have already researched here and in docs but I don't know how to achieve what I want on the Consumer Side.

What I have done so far:
1. I have seen that Queue.isEmpty will use more cpu so I used unique symbol named EOQ for indicating the queue is empty.
2. I have created Queue as Layer named JobQueue
const EOQ = Symbol('EOQ')

export type ConvertJob = {
  source: 'directory',
  path: string
} | {
  source: 'file',
  path: string
} | typeof EOQ

const make = Effect.gen(function* () {
  return yield* Queue.unbounded<ConvertJob>()
})

export class JobQueue extends Context.Tag('JobQueue')<
  JobQueue,
  Effect.Effect.Success<typeof make>
>() {
  static readonly Live = Layer.effect(this, make)
  static readonly EOQ: typeof EOQ = EOQ
}

What I want to achieve is that I want to create a Effect function as consumer which will run forever until it got EOQ and interrupt using Effect.interrupt, till then it needs to run forever for that I think I need to use Effect.forever and I want it to be forked so
Effect.fork
. I want the consumer to be awaited until they finish/interrupted and I want them to finish their job when they interrupted externally such as CTRL+C or SIGINT.

 const queue = yield* JobQueue
 const consumer = (index: number) => Effect.gen(function* () {
    const job = yield* queue.take
    if (job === JobQueue.EOQ) {
      yield* Effect.logInfo(`[${index}] Job Queue Empty`)
      return yield* Effect.interrupt
    }
    if (job.source === 'file') {
      yield* Effect.logInfo(`[${index}] Converting File: "${job.path}"`)
    }
    if (job.source === 'directory') {
      yield* Effect.logInfo(`[${index}] Converting Directory: "${job.path}"`)
    }
  }).pipe(
    Effect.forever,
    Effect.forkDaemon
  )


How can I create X amount of consumer function that consumes the queue concurrently and the code awaits all of them to be finished with their job?
Was this page helpful?