import {Console, Effect, PubSub, Schedule, Stream, Take} from "effect";
import {NodeRuntime} from "@effect/platform-node";
import {broadcastDynamic} from "effect/Stream";

// NOTE: this script previously appeared twice in the file. Repeating the
// import bindings and the top-level `const` declarations in one ES module is a
// SyntaxError ("Identifier ... has already been declared"), so only a single
// copy is kept here.

/**
 * Effect that, when run, yields a stream fed by a source ticking on a
 * 500 ms spaced schedule. Every source element is logged with the 'Source'
 * label before being handed to `broadcastDynamic`.
 *
 * NOTE(review): per the Effect docs, `broadcastDynamic(99)` returns a scoped
 * effect producing a stream that can be subscribed to several times, with 99
 * as the maximum lag between the source and the slowest subscriber — confirm
 * against the installed `effect` version.
 */
const intervalStream = Stream.fromSchedule(Schedule.spaced('500 millis'))
  .pipe(
    Stream.tap((x) => Console.log('Source', x)),
    broadcastDynamic(99)
  );

/**
 * Effect that sets up the module and returns the stream handed to the caller.
 *
 * Steps, in order:
 *  1. Obtain the broadcast stream from `intervalStream`.
 *  2. Fork a daemon fiber that drains the stream, logging a side effect for
 *     each element with a 1 second spacing between elements.
 *  3. Register a finalizer that awaits the daemon fiber and logs its exit.
 *  4. Register a finalizer that logs 'module shut down' and clears `running`.
 *  5. Return the broadcast stream, gated by the same `running` flag.
 */
const module = Effect.gen(function* () {
  const stream = yield* (intervalStream);

  // Shared mutable flag: both the daemon consumer and the returned stream stop
  // taking elements once this becomes false.
  let running = true;

  const backgroundTask = yield* stream.pipe(
    Stream.takeWhile(() => running),
    // Unbounded buffer between the source and the consumer, which is paced by
    // the 1 second schedule on the next line.
    Stream.buffer({capacity: "unbounded"}),
    Stream.schedule(Schedule.spaced("1 second")),
    Stream.tap((x) => Console.log(`Side-effect executed`, x)),
    Stream.ensuring(Console.log(`Daemon killed`)),
    Stream.runDrain,
    // NOTE(review): per the Effect docs a daemon fiber is not tied to the
    // lifetime of the fiber that forked it; here it is only stopped through
    // the `running` flag — confirm this is the intended shutdown path.
    Effect.forkDaemon
  );

  // NOTE(review): Effect documents that scope finalizers run in reverse order
  // of registration, so this await presumably executes AFTER the finalizer
  // below has cleared `running` — confirm.
  yield* Effect.addFinalizer(() => backgroundTask.await.pipe(Effect.tap((x) => Console.log(x))));

  yield* Effect.addFinalizer(() => Effect.sync(() => {
    console.log('module shut down');
    running = false;
  }));

  return stream.pipe(
    Stream.takeWhile(() => running)
  );
});

/**
 * Main program: turns the effect-of-a-stream produced by `module` into a
 * stream, takes the first 10 elements, logs each with the 'main' label and
 * finally counts them.
 */
const program = module.pipe(
  Stream.unwrap,
  Stream.take(10),
  Stream.tap((x) => Console.log('main', x)),
  Stream.runCount
);

// `Effect.scoped` wraps the program before it is run; the finalizers above are
// registered with `Effect.addFinalizer` inside it.
NodeRuntime.runMain(Effect.scoped(program));