Support for `Effect` Producing a `Stream` in `effect-rx`

The current implementations of effect-rx's `Rx.make` and `RxRuntime.proto.rx` support `Effect`s and `Stream`s as parameters (or functions returning such types). However, a common use case I can envision is passing in (a function returning) an `Effect` producing a `Stream`, i.e. `Effect<Stream>`.

For instance, if I have a `const layer: Layer<MyService>` with `Context.Tag.Service(typeof MyService) === { stream: Stream<number> }`, and I create `const runtime = Rx.runtime(layer)`, I want to be able to define `const myRxObservingStream: Rx<Result<number>> = runtime.rx(MyService.pipe(a => a.s))`.

Right now, `myRxObservingStream` would have the type `Rx<Result<Stream<number>>>`, which totally misses the point.
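For comparison, here is a minimal sketch of how an `Effect<Stream>` can be flattened into a plain `Stream` with `Stream.unwrap` from `effect`, which might serve as an interim approach before handing the result to `runtime.rx`. The service shape follows the example above; the tag identifier `"MyService"`, the fixed stream `1, 2, 3`, and the names `effectOfStream`, `flattened` and `collected` are assumptions made for illustration only, and the effect-rx call itself is left out so the snippet depends on nothing but `effect`.

```typescript
import { Chunk, Context, Effect, Layer, Stream } from "effect"

// Hypothetical service mirroring the example above: it exposes a Stream<number>.
class MyService extends Context.Tag("MyService")<
  MyService,
  { readonly stream: Stream.Stream<number> }
>() {}

// Hypothetical layer providing a fixed stream, for illustration only.
const layer = Layer.succeed(MyService, { stream: Stream.make(1, 2, 3) })

// Effect<Stream<number>, never, MyService>: the shape this issue is about.
const effectOfStream = Effect.map(MyService, (service) => service.stream)

// Stream<number, never, MyService>: Stream.unwrap removes the outer Effect.
const flattened = Stream.unwrap(effectOfStream)

// Run the flattened stream to check that it emits the service's elements.
const collected: Promise<ReadonlyArray<number>> = flattened.pipe(
  Stream.runCollect,
  Effect.map(Chunk.toReadonlyArray),
  Effect.provide(layer),
  Effect.runPromise
)
```

Since `flattened` is an ordinary `Stream` that requires `MyService`, it should be usable wherever a `Stream` is accepted today, although I have not verified this against every effect-rx version.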

The stream that I want to expose is actually the `changes` `Stream` of a `SubscriptionRef`. So the workaround is for `MyService` to expose the `SubscriptionRef` directly and to use `runtime.subscriptionRef(MyService.ref)`. However, I would like to manipulate the `changes` stream inside `MyService` (e.g. splitting it up into multiple streams, caching/buffering each of them individually), rather than inside the "effect-rx layer", for reasons like data hiding and CQRS.
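To make the data-hiding argument concrete, here is a rough sketch of the kind of service I have in mind: the `SubscriptionRef` stays private to the layer, and only its `changes` stream plus a command are exposed. The `increment` command, the initial value `0`, and the names `layer` and `firstValues` are hypothetical and chosen only for this example.

```typescript
import { Chunk, Context, Effect, Layer, Stream, SubscriptionRef } from "effect"

// Hypothetical service: consumers see a read-only stream and a command,
// but never the underlying SubscriptionRef.
class MyService extends Context.Tag("MyService")<
  MyService,
  {
    readonly stream: Stream.Stream<number>
    readonly increment: Effect.Effect<void>
  }
>() {}

const layer = Layer.effect(
  MyService,
  Effect.gen(function* () {
    // The ref is created inside the layer and is not part of the service API.
    const ref = yield* SubscriptionRef.make(0)
    return {
      // `changes` emits the current value first, then every update.
      stream: ref.changes,
      increment: SubscriptionRef.update(ref, (n) => n + 1)
    }
  })
)

// Read the first element of the exposed stream (the ref's initial value).
const firstValues: Promise<ReadonlyArray<number>> = Effect.gen(function* () {
  const service = yield* MyService
  const chunk = yield* Stream.runCollect(Stream.take(service.stream, 1))
  return Chunk.toReadonlyArray(chunk)
}).pipe(Effect.provide(layer), Effect.runPromise)
```

Any splitting, caching or buffering of `ref.changes` would then live next to the ref inside the layer, and accessing `stream` from outside is exactly what yields the `Effect<Stream>` described above.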

I could have a go at making `RxRuntime.proto.rx` and `Rx.make` also support `Effect<Stream>`s. However, I wanted to ask first whether this was already attempted by the authors of effect-rx and abandoned for a good reason that I do not yet see.