Issue with `Effect.onExit` Triggering Early in Queue Processing with Semaphore

I got an issue with Queues and Effect.onExit triggering too early:
const concurrencyLimit = yield* Effect.makeSemaphore(1);
const queue = yield* Queue.unbounded<QueuedTransaction>();

yield* Effect.forkScoped(Stream.fromQueue(queue).pipe(
  Stream.takeUntilEffect(() => Queue.isEmpty(queue)),
  Stream.runForEach((innerTr) => concurrencyLimit.withPermits(1)(handleTransaction(queueId, innerTr))),
  Effect.onExit(() => Effect.gen(function* () {
    console.log('Project queue done');
    yield* updateSyncStatus();
  })),
));

For some reason, the above code is triggering the Effect.onExit too early, when I have a thing I am waiting on (in this case, a Latch) inside the handleTransaction effect.

When I put the exit effect on the forkScoped effect instead, the callback instead never gets called at all?

The idea is that I want to put things into a queue that has a shared concurrency with other queues in the system.
Was this page helpful?