import * as NodeContext from "@effect/platform-node/NodeContext";
import * as NodeRuntime from "@effect/platform-node/NodeRuntime";
import * as Console from "effect/Console";
import * as Effect from "effect/Effect";
import * as Function from "effect/Function";
import * as Schedule from "effect/Schedule";
import * as Sink from "effect/Sink";
import * as Stream from "effect/Stream";
/**
 * Runs `stream` through `Stream.aggregate` with a sink built by `Sink.take(2)`
 * and collects everything the aggregated stream emits.
 *
 * @param stream - Source stream of numbers; its error and requirement types
 * are passed through unchanged.
 * @returns An effect yielding the collected outputs of the aggregation.
 */
const processStream = <E1, R1>(stream: Stream.Stream<number, E1, R1>) => {
  return stream.pipe(Stream.aggregate(Sink.take(2)), Stream.runCollect);
};
/**
 * Builds a two-element stream paced by a 1 ms spaced schedule, feeds it to
 * `processStream`, then logs a header followed by the `length` of each
 * collected entry.
 */
const program = Effect.gen(function* () {
  const pacing = Schedule.spaced("1 millis");
  const pacedNumbers = Stream.make(1, 2).pipe(Stream.schedule(pacing));

  const collected = yield* processStream(pacedNumbers);

  yield* Effect.log("results1:");
  // One console line per aggregated entry, reporting how many items it holds.
  for (const entry of collected) {
    const size = entry.length;
    yield* Console.log(size);
  }
});
// Entry point: supply the Node platform layer, then hand the effect to the Node runtime.
Function.pipe(program, Effect.provide(NodeContext.layer), NodeRuntime.runMain);
// A second, byte-identical copy of the script above was removed from here:
// repeating the same namespace imports and redeclaring the `processStream` and
// `program` constants in one module is a duplicate-declaration error.