**Clarifying the Behavior of `Stream.aggregate` in Effect Streams**

I'm confused by this clause in the `Stream.aggregate` documentation: "Whenever the downstream fiber is busy processing elements, the upstream fiber will feed elements into the sink until it signals completion." I thought `Stream.aggregate` behaved like `Stream.peel`, except that it would peel repeatedly, but that does not seem to be entirely the case.

Here is the simplest example I could make for what confuses me:

```ts
import * as NodeContext from "@effect/platform-node/NodeContext";
import * as NodeRuntime from "@effect/platform-node/NodeRuntime";
import * as Console from "effect/Console";
import * as Effect from "effect/Effect";
import * as Function from "effect/Function";
import * as Schedule from "effect/Schedule";
import * as Sink from "effect/Sink";
import * as Stream from "effect/Stream";

// Run the stream through Stream.aggregate with a sink that takes two
// elements at a time, then collect every aggregated output.
const processStream = <E1, R1>(stream: Stream.Stream<number, E1, R1>) =>
    Function.pipe(stream, Stream.aggregate(Sink.take(2)), Stream.runCollect);

const program = Effect.gen(function* () {
    // Emit 1 and 2 on a 1 ms schedule to mimic a source with I/O delay.
    const stream1 = Stream.make(1, 2).pipe(Stream.schedule(Schedule.spaced("1 millis")));
    const result1 = yield* processStream(stream1);

    // Print the size of each chunk produced by the aggregation.
    yield* Effect.log("results1:");
    for (const result of result1) {
        yield* Console.log(result.length);
    }
});

program.pipe(Effect.provide(NodeContext.layer)).pipe(NodeRuntime.runMain);
```

In my head, even though the stream emits a value every millisecond (to me this is like a network or file stream with some I/O delay), I expected both numbers to be grouped into one chunk by `Stream.aggregate`. That is not what happens. Instead, the sink seems to be ignored. Removing the schedule from the stream produces the result I expected, but I don't know if I can really do that, since my real input is a filesystem stream.

I guess my question is: how can I get the result I expected (both numbers grouped into one chunk by the aggregate function) without having to change the input stream?
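
For comparison, here is a minimal sketch of one way to get a single chunk of two without touching the input stream. It assumes that plain count-based grouping is all that is needed, and it swaps `Stream.aggregate(Sink.take(2))` for `Stream.grouped(2)`, which is a different operator rather than a fix to `aggregate` itself. The names `delayed`, `groupedProgram`, and `runGrouped` are hypothetical and exist only for this illustration.

```typescript
import * as Effect from "effect/Effect";
import * as Schedule from "effect/Schedule";
import * as Stream from "effect/Stream";

// Same input as in the question: two elements on a 1 ms schedule.
const delayed = Stream.make(1, 2).pipe(
    Stream.schedule(Schedule.spaced("1 millis"))
);

// Stream.grouped(2) buffers until two elements have arrived (or the stream
// ends) before emitting, so the delay between elements does not split them.
const groupedProgram = delayed.pipe(
    Stream.grouped(2),
    Stream.runCollect,
    // Chunk is iterable, so convert to plain nested arrays for inspection.
    Effect.map((chunks) => Array.from(chunks, (chunk) => Array.from(chunk)))
);

// Hypothetical helper: runs the sketch and resolves with the grouped values.
export const runGrouped = (): Promise<number[][]> =>
    Effect.runPromise(groupedProgram);
```

Calling `runGrouped()` should resolve to one group containing both numbers. Whether this is appropriate depends on whether the sink-based behaviour of `aggregate` (batching while downstream is busy) is actually wanted; if a sink is required, `Stream.transduce` may be worth a look, though I have not checked how it treats the end of the stream.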