Trouble Implementing Schedule in a Stream Processing Setup

Hi all, I am struggling with Schedule. What I am trying to achieve is the following:
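import { Chunk, Effect, Stream } from "effect"

// Process one chunk, handling up to two elements concurrently.
const use = (n: Chunk.Chunk<number>) =>
  Effect.forEach(n, (e) => Effect.succeed(`${e} is processed`), { concurrency: 2 })

const test = Effect.gen(function* (_) {
  // Assumption: one element per chunk, so the stream yields 10 chunks.
  const stream = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).pipe(Stream.rechunk(1))
  const getChunk = yield* _(Stream.toPull(stream))
  // Chunk is immutable, so the accumulator has to be reassigned on each pass.
  let result = Chunk.empty<string>()
  // The loop ends when the pull fails with Option.none() at the end of the stream.
  while (true) {
    const chunk = yield* _(getChunk)
    result = yield* _(
      Effect.acquireUseRelease(
        Effect.succeed(Chunk.empty<number>()),
        () => use(chunk),
        () => Effect.succeed('closing'),
      ),
      // Attempts so far (neither worked):
      //   Effect.scheduleFrom((a) => a, Schedule.spaced(Duration.seconds(1)),
      //   Schedule.delayedSchedule(Schedule.spaced(Duration.seconds(1)))
      Effect.map((e) => Chunk.appendAll(result, Chunk.fromIterable(e))),
    )
  }
})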

Now I need to add a Schedule so that, e.g., only 4 chunks are sent every minute, which means the above would need more than 2 minutes to complete.
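
One possible way to get this pacing, as a rough sketch rather than a definitive answer: instead of pulling chunks manually, group the stream into batches with `Stream.grouped` and pace the batches with `Stream.schedule` and `Schedule.spaced`. The helper name `processInBatches` and its parameters are made up for illustration and are not part of the original post; the sketch also assumes a recent `effect` release where these functions accept a duration string such as `"1 minute"`.

```typescript
import { Chunk, Duration, Effect, Schedule, Stream } from "effect"

// Same per-chunk worker as in the question: up to two elements at a time.
const use = (n: Chunk.Chunk<number>) =>
  Effect.forEach(n, (e) => Effect.succeed(`${e} is processed`), { concurrency: 2 })

// Hypothetical helper (not from the original post): groups the input into
// batches of `batchSize` and lets one batch through per `interval`.
const processInBatches = (
  items: Iterable<number>,
  batchSize: number,
  interval: Duration.DurationInput
) =>
  Stream.fromIterable(items).pipe(
    // Emit Chunk<number> values holding at most `batchSize` elements each.
    Stream.grouped(batchSize),
    // Pace the batches: the schedule is consulted once per emitted batch.
    Stream.schedule(Schedule.spaced(interval)),
    // Run the worker on each batch as it is released.
    Stream.mapEffect(use),
    Stream.runCollect,
    // Return a plain array with one inner array per batch.
    Effect.map((batches) => Chunk.toReadonlyArray(batches))
  )
```

For the numbers in the question this would be run as `Effect.runPromise(processInBatches([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 4, "1 minute"))`, which yields three batches (4, 4, and 2 elements). The total runtime should land somewhere around two to three minutes, depending on whether the schedule also delays the first batch. Grouping before scheduling is a design choice here: the schedule then spaces whole batches rather than individual elements.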