/**
 * Builds a stream that releases items pulled from `self` according to the
 * numeric limits emitted by `gate`.
 *
 * `self` is materialised into a queue (capacity 2) inside the surrounding
 * scope, and a `Ref` tracks how many queue items have been taken so far.
 * For each `limit` emitted by `gate`, an inner stream repeatedly takes
 * batches from the queue until the running total reaches `limit`, at which
 * point that inner stream ends. `gate` is flat-mapped with `{ switch: true }`,
 * and the resulting stream is halted once the queue is shut down.
 *
 * NOTE(review): queue items are presumably `Exit` values produced by
 * `Stream.toQueueOfElements`, which is why each inner stream is passed
 * through `Stream.flattenExitOption` — confirm against the Effect docs.
 *
 * @param self - Source stream whose elements are gated.
 * @param gate - Stream of numeric limits compared against the running total.
 * @returns A stream carrying the errors and requirements of both inputs.
 */
<A, E1, R1, E2, R2>(
  self: Stream.Stream<A, E1, R1>,
  gate: Stream.Stream<number, E2, R2>
): Stream.Stream<A, E1 | E2, R1 | R2> =>
  Stream.unwrapScoped(
    Effect.gen(function* () {
      // Running total of items taken from the queue, shared by all inner streams.
      const taken = yield* Ref.make(0);
      const elements = yield* Stream.toQueueOfElements(self, { capacity: 2 });

      // One pull step: fail with `Option.none()` once `limit` has been reached,
      // otherwise take at least one and at most the remaining number of items.
      const pullUpTo = (limit: number) =>
        Effect.gen(function* () {
          const soFar = yield* Ref.get(taken);
          const remaining = limit - soFar;
          if (remaining <= 0) {
            return yield* Effect.fail(Option.none());
          }
          const batch = yield* Queue.takeBetween(elements, 1, remaining);
          yield* Ref.set(taken, soFar + batch.length);
          return batch;
        });

      const gated = Stream.flatMap(
        gate,
        (limit) =>
          Stream.flattenExitOption(
            Stream.repeatEffectChunkOption(pullUpTo(limit))
          ),
        { switch: true }
      );

      // Stop the output as soon as the backing queue has been shut down.
      return Stream.haltWhen(gated, Queue.awaitShutdown(elements));
    })
  )
/**
 * Builds a stream that releases items pulled from `self` according to the
 * numeric limits emitted by `gate`.
 *
 * `self` is materialised into a queue (capacity 2) inside the surrounding
 * scope, and a `Ref` tracks how many queue items have been taken so far.
 * For each `limit` emitted by `gate`, an inner stream repeatedly takes
 * batches from the queue until the running total reaches `limit`, at which
 * point that inner stream ends. `gate` is flat-mapped with `{ switch: true }`,
 * and the resulting stream is halted once the queue is shut down.
 *
 * NOTE(review): queue items are presumably `Exit` values produced by
 * `Stream.toQueueOfElements`, which is why each inner stream is passed
 * through `Stream.flattenExitOption` — confirm against the Effect docs.
 *
 * @param self - Source stream whose elements are gated.
 * @param gate - Stream of numeric limits compared against the running total.
 * @returns A stream carrying the errors and requirements of both inputs.
 */
<A, E1, R1, E2, R2>(
  self: Stream.Stream<A, E1, R1>,
  gate: Stream.Stream<number, E2, R2>
): Stream.Stream<A, E1 | E2, R1 | R2> =>
  Stream.unwrapScoped(
    Effect.gen(function* () {
      // Running total of items taken from the queue, shared by all inner streams.
      const taken = yield* Ref.make(0);
      const elements = yield* Stream.toQueueOfElements(self, { capacity: 2 });

      // One pull step: fail with `Option.none()` once `limit` has been reached,
      // otherwise take at least one and at most the remaining number of items.
      const pullUpTo = (limit: number) =>
        Effect.gen(function* () {
          const soFar = yield* Ref.get(taken);
          const remaining = limit - soFar;
          if (remaining <= 0) {
            return yield* Effect.fail(Option.none());
          }
          const batch = yield* Queue.takeBetween(elements, 1, remaining);
          yield* Ref.set(taken, soFar + batch.length);
          return batch;
        });

      const gated = Stream.flatMap(
        gate,
        (limit) =>
          Stream.flattenExitOption(
            Stream.repeatEffectChunkOption(pullUpTo(limit))
          ),
        { switch: true }
      );

      // Stop the output as soon as the backing queue has been shut down.
      return Stream.haltWhen(gated, Queue.awaitShutdown(elements));
    })
  )