Effect Community • 3mo ago
Stephen Bluck

Issue with streams

I seem to be doing something wrong when converting the code below to a Stream. As written, it correctly logs "received message" when a message is published.

yield* Effect.async<Rabbit.Consumer>(resume => {
  const consumer = rabbit.createConsumer(
    settings,
    msg => console.log('received message', msg)
  );

  consumer.on('ready', () => {
    console.log('ready');
    resume(Effect.succeed(consumer));
  });
});

yield* Effect.promise(() =>
  pub.send(envelope, { id: 1, name: 'Alan Turing' })
);


However, when I convert the above code to a Stream, messages stop coming through: the "received message" log never appears.

const sub = Stream.async<Rabbit.AsyncMessage>(emit => {
  return Effect.async<Rabbit.Consumer>(resume => {
    const consumer = rabbit.createConsumer(
      settings,
      msg => {
        console.log('received message', msg);
        return emit.single(msg);
      }
    );

    consumer.on('ready', () => {
      console.log('ready');
      resume(Effect.succeed(consumer));
    });
  })
});

yield* sub.pipe(Stream.runDrain, Effect.fork);

yield* Effect.promise(() =>
  pub.send(envelope, { id: 1, name: 'Alan Turing' })
);

Any ideas?
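
One possible direction, offered as a guess rather than a confirmed diagnosis: an Effect value is only a description of work, so the Effect.async returned from the Stream.async callback creates the consumer only if something actually runs it. If Stream.async treats a returned effect as cleanup for when the stream ends (that is my reading of its signature, so it is worth checking against the docs), the consumer would never be created while the stream is live. The sketch below is a library-free toy model of that situation. ToyBroker, toyStreamAsync, runBroken and runFixed are hypothetical names made up for illustration, not Effect or RabbitMQ APIs.

```typescript
// Toy model only: none of these names come from Effect or the RabbitMQ client.

// A description of work; nothing happens until it is called.
type LazyEffect = () => void;
type Emit = (msg: string) => void;

// Hypothetical stand-in for a broker: consumers subscribe, publish fans out.
class ToyBroker {
  private handlers: Emit[] = [];

  createConsumer(onMessage: Emit): { close: () => void } {
    this.handlers.push(onMessage);
    return {
      close: () => {
        this.handlers = this.handlers.filter(h => h !== onMessage);
      },
    };
  }

  publish(msg: string): void {
    for (const h of [...this.handlers]) h(msg);
  }
}

// Hypothetical stand-in for a callback-based stream constructor. It calls
// `register` immediately and keeps any returned LazyEffect as cleanup only.
function toyStreamAsync(register: (emit: Emit) => LazyEffect | void) {
  const received: string[] = [];
  const cleanup = register(msg => {
    received.push(msg);
  });
  return {
    received,
    close: () => {
      if (typeof cleanup === 'function') cleanup();
    },
  };
}

// Mirrors the shape of the failing code: consumer creation is wrapped in a
// lazy value and returned, so it only runs at close time.
function runBroken(): string[] {
  const broker = new ToyBroker();
  const stream = toyStreamAsync(emit => {
    return () => {
      broker.createConsumer(emit);
    };
  });
  broker.publish('hello');
  stream.close();
  return stream.received;
}

// Creates the consumer eagerly inside the callback and returns only cleanup.
function runFixed(): string[] {
  const broker = new ToyBroker();
  const stream = toyStreamAsync(emit => {
    const consumer = broker.createConsumer(emit);
    return () => consumer.close();
  });
  broker.publish('hello');
  stream.close();
  broker.publish('sent after close');
  return stream.received;
}
```

In this model, runBroken never receives the published message, while runFixed does. If the same thing is happening in the real code, one thing to try would be creating the consumer directly inside the Stream.async callback and returning only the cleanup effect. I believe Stream.asyncEffect also exists for cases where the setup itself has to be an effect, but please verify that against the current Effect documentation.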