Creating Feedback Loops in Effect Streams

What is the best way to create feedback loops in effect streams?
if I have a stream Sa = [1,2,3,4,...]
how can I express something like the attached circuit?
things are well defined because at index 0 the value of delaying any stream is 0.

I tried using queues, but I'm not getting anywhere, would appreciate some pointers.
my example is for addition, but eventually I will need something that works generically over arbitrary operations.

    Effect.gen(function*() {
      const delayNumber = (stream: Stream.Stream<number>) =>
        Stream.zipWithPrevious(stream).pipe(
          Stream.map(([prev, _]) =>
            Option.match(prev, {
              onNone: () => 0,
              onSome: (prev) => prev
            })
          )
        )

      const queue = yield* Queue.bounded<number>(2)
      queue.offer(0)

      const sA = Stream.fromIterable([1, 2, 3])
      const sC = delayNumber(Stream.fromQueue(queue))
      const result = Stream.zipWith(sA, sC, (a, b) => a + b)

      yield* Stream.runForEach(result, (elem) => queue.offer(elem))

      return result
    })
Screenshot_2025-05-19_at_14.57.16.png
Was this page helpful?