Interruptions and `Stream.toReadableStream`

Is there a way to interrupt effects running inside a stream that was passed to Stream.toReadableStream?

I figured maybe calling reader.cancel() would interrupt, but that doesn't seem to work.

My use case is that I have an existing app that relies on standard web streaming APIs, and I'd like to incrementally introduce Effect into the codebase. I receive some ReadableStream of values, and I need to produce a ReadableStream that applies a (cancellable) async operation on each element. There's also limits I'm trying to place on concurrency and buffer capacity but that's not directly relevant to my question.

Here is an attempt I made at creating this interface with Effect's streams. Although the stream correctly stops creating new jobs, "Finished job for input 1" prints even after the reader has been cancelled indicating that the AbortSignal provided by Effect.promise was not aborted.

Playground: https://effect.website/play#KABQchk5GyAdGAQhQVwOw
Try Effect in your browser and share your code!
Effect Playground
Was this page helpful?