Combining infinite streams with concurrency in the Effect library involves transforming the strea...

Hello! I'm new to Effect and a bit lost: I can't find a way to combine infinite streams with concurrency. I'm basing my work on two examples from the documentation. I don't have a specific goal; I'm just testing and experimenting with the library:

Concurrency N:

import { Effect, Duration } from "effect"

// Helper function to simulate a task with a delay
const makeTask = (n: number, delay: Duration.DurationInput) =>
  Effect.promise(
    () =>
      new Promise<void>((resolve) => {
        console.log(`start task${n}`) // Logs when the task starts
        setTimeout(() => {
          console.log(`task${n} done`) // Logs when the task finishes
          resolve()
        }, Duration.toMillis(delay))
      })
  )

const task1 = makeTask(1, "300 millis")
const task2 = makeTask(2, "400 millis")
const task3 = makeTask(3, "310 millis")
const task4 = makeTask(4, "410 millis")
const task5 = makeTask(5, "550 millis")

const numbered = Effect.all([task1, task2, task3, task4, task5], {
  concurrency: 3
})

Effect.runPromise(numbered)


Stream of infinite numbers:

import { Stream, Effect, Console } from "effect"

const infiniteNumberStream = Stream.iterate(1, (n) => n + 1)

const effect = infiniteNumberStream.pipe(
  Stream.runForEach((n) => Console.log(n))
)

Effect.runPromise(effect).then(console.log)


However, I can't find an operator or function that processes an infinite stream with a bounded level of concurrency. In other libraries, this is what mergeMap/orderedMergeMap does.

Can anyone shed some light on this?
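
One possible direction, offered as a minimal sketch rather than a definitive answer: Stream.mapEffect accepts an options object with a concurrency field, which seems to be the closest counterpart to mergeMap with a concurrency limit. The processItem helper below is hypothetical and only simulates work with a short sleep. Stream.take is added solely so the demo terminates; the assumption is that you would drop it and use Stream.runForEach to consume the infinite stream.

```typescript
import { Effect, Stream } from "effect"

// Hypothetical task (not from the docs): waits briefly, then returns the input doubled.
const processItem = (n: number) =>
  Effect.sleep("50 millis").pipe(Effect.as(n * 2))

const program = Stream.iterate(1, (n) => n + 1).pipe(
  // Bound the infinite stream so this demo finishes; remove for a real infinite run.
  Stream.take(10),
  // Run up to 3 processItem effects at the same time.
  Stream.mapEffect(processItem, { concurrency: 3 }),
  // Collect the results into a Chunk, then convert to a plain array.
  Stream.runCollect,
  Effect.map((chunk) => Array.from(chunk))
)

Effect.runPromise(program).then((results) => console.log(results))
```

As far as I know, results are emitted in input order by default, and passing unordered: true alongside concurrency lets them through as soon as each one finishes, which would be closer to a plain mergeMap. Worth confirming against the Stream API reference for your version.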