Effect CommunityEC
Effect Community2y ago
12 replies
jrmdayn

Managing Effect Completion in PubSub Subscription Termination

In the example below I fork a subscription to a pubsub and run an effect on each message. I simulate the termination of this subscription using scope closure. As it stands, the effect itself does not complete since it is interrupted in the middle. How can I wait until current effects are done before shutting things down? is my only option to use Effect.uninterruptible around the
mapEffect
?

Effect.gen(function* (_) {
  const scope = yield* _(Scope.make());
  const pubsub = yield* _(PubSub.unbounded());
  const queue = yield* _(PubSub.subscribe(pubsub), Scope.extend(scope));

  yield* _(
    Stream.fromQueue(queue),
    Stream.mapEffect(() =>
      Effect.gen(function* (_) {
        yield* _(Effect.addFinalizer(() => Console.log('Finalizer')));
        yield* _(Console.log('start work...'));
        yield* _(Effect.sleep('1 seconds'));
        yield* _(Console.log('done work !'));
      }),
    ),
    Stream.runDrain,
    Effect.forkScoped,
    Scope.extend(scope),
  );

  yield* _(PubSub.publish(pubsub, 1));
  yield* _(Effect.sleep('100 millis'));

  yield* _(Scope.close(scope, Exit.unit));

  yield* _(Effect.sleep('2 seconds'));

  yield* _(Console.log('exit'));
}).pipe(Effect.scoped, Effect.runPromise);
Was this page helpful?