Interrupting Effects in a Stream with New Queue Items

I have an unbounded Queue that feeds a stream, and the stream runs an effect for each item added to the queue. I want the effect that is currently running to be interrupted whenever a new item is added before it finishes. Any ideas on how to accomplish this?

Here's what I have so far:

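import { Effect, Queue, Stream } from "effect"

// (inside an Effect.gen body)
const messageQueue = yield* Queue.unbounded<string>()
yield* Stream.fromQueue(messageQueue).pipe(
  // Runs the effect for each message, one after another (nothing is interrupted yet)
  Stream.runForEach((message) => Effect.gen(function*() {
    yield* Effect.sleep("10 seconds")
    yield* Effect.log("Done sleeping", message)
  })),
  Effect.forkScoped,
)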

yield* messageQueue.offer("hello")
yield* messageQueue.offer("world") // desired: this interrupts the effect for "hello", so only "Done sleeping" for "world" gets logged
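
One possible direction, offered as a sketch rather than a confirmed answer: `Stream.flatMap` accepts a `{ switch: true }` option, which is meant to cancel the inner stream that is still running when a newer element arrives. The version below wraps each effect in `Stream.fromEffect` and drains the result. The `completed` array, the `program` name, and the shortened 100 ms / 500 ms durations are illustrative assumptions added here so the behavior can be observed quickly; they are not part of the original snippet.

```typescript
import { Effect, Queue, Stream } from "effect"

// Hypothetical wrapper: returns the messages whose effect ran to completion.
const program = Effect.gen(function* () {
  const completed: Array<string> = [] // illustrative bookkeeping only
  const messageQueue = yield* Queue.unbounded<string>()

  yield* Stream.fromQueue(messageQueue).pipe(
    Stream.flatMap(
      (message) =>
        Stream.fromEffect(
          Effect.gen(function* () {
            // Shortened from "10 seconds" so the sketch finishes quickly
            yield* Effect.sleep("100 millis")
            yield* Effect.log("Done sleeping", message)
            completed.push(message)
          })
        ),
      // Switch semantics: a newer element cancels the inner stream still running
      { switch: true }
    ),
    Stream.runDrain,
    Effect.forkScoped
  )

  yield* messageQueue.offer("hello")
  yield* messageQueue.offer("world") // intended to interrupt the effect for "hello"

  // Give the last effect time to finish before the scope closes
  yield* Effect.sleep("500 millis")
  return completed
}).pipe(Effect.scoped)
```

Running it with `Effect.runPromise(program)` lets you check which messages made it into `completed`. The design choice here is to let the stream operator own the interruption instead of tracking and interrupting fibers by hand.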