Effect CommunityEC
Effect Community2y ago
8 replies
bowzee

Possible Infinite Effect in Effect-TS Stream `broadcastDynamic`

Why this effect never ends? There is some problem with the broadcastDynamic(), i am pretty sure it is because of this line

https://github.com/Effect-TS/effect/blob/48a543e0e4c0dd856916d352ffb24fec4acd044e/packages/effect/src/internal/stream.ts#L6455

If i use runIntoPubSub (non-scoped), the process terminates as expected by me. Is it expected behavior, or did i missed something?

import {Console, Effect, PubSub, Schedule, Stream, Take} from "effect";
import {NodeRuntime} from "@effect/platform-node";
import {broadcastDynamic} from "effect/Stream";

const intervalStream = Stream.fromSchedule(Schedule.spaced('500 millis'))
    .pipe(
        Stream.tap((x) => Console.log('Source', x)),
        broadcastDynamic(99)
    )

const module = Effect.gen(function* () {
    const stream =  yield* (intervalStream);

    let running = true
    const backgroundTask = yield* stream.pipe(
        Stream.takeWhile(() => running),
        Stream.buffer({capacity: "unbounded"}),
        Stream.schedule(Schedule.spaced("1 second")),
        Stream.tap(x => Console.log(`Side-effect executed`, x)),
        Stream.ensuring(Console.log(`Daemon killed`)),
        Stream.runDrain,
        Effect.forkDaemon
    );

    yield* Effect.addFinalizer(() => backgroundTask.await.pipe(Effect.tap(x => Console.log(x))))
    yield* Effect.addFinalizer(() => Effect.sync(() => {
        console.log('module shut down')
        running = false
    }))

    return stream.pipe(
        Stream.takeWhile(() => running),
    )
})

const program = module.pipe(
    Stream.unwrap,
    Stream.take(10),
    Stream.tap((x) => Console.log('main', x)),
    Stream.runCount
);
NodeRuntime.runMain(Effect.scoped((program)))
Was this page helpful?