Effect Community · 3y ago
44 replies
bsluther

Creating a Stream for Database Entity Updates

I'm trying to create a stream that first runs an effect to get a database entity, then subscribes to updates on that entity. Does this look like the right idea? Type annotations are abbreviated to R and E to de-clutter things.

type objectForPrimaryKey = <A>(entityType: string, id: BSON.ObjectId) => Effect.Effect<R, E, A>
type wrapCallbackInStream = <A>(identifier: string, fn: (cb: (val: A) => void) => () => void) => Stream.Stream<never, never, A>

  // userActivityStream: (id: BSON.ObjectId) => Stream.Stream<R, E, UserActivity>
  const userActivityStream = (id: BSON.ObjectId) => pipe(
    Stream.fromEffect(dbWrapper.objectForPrimaryKey<UserActivityRO>('UserActivity', id)),
    Stream.flatMap(act =>
      wrapCallbackInStream<UserActivity>(
        `UserActivity/${id}`,
        cb => {
          act.addListener(cb)
          return () => act.removeListener(cb)
        }
      )
    )
  )


From what I can tell by inserting a log, the effect inside Stream.fromEffect is being run whenever the stream updates. What I would like is for that effect to run only once, to set up the stream.
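To make the intended behaviour concrete, here is a library-free sketch of "look up once, then forward updates". It does not use Effect or the real database wrapper: FakeEntity, lookupEntity, and makeStream are hypothetical stand-ins invented for illustration. The sketch models a cold stream, where the lookup happens once per run of the stream and not once per update. Under that model, a lookup log that fires repeatedly would suggest the stream itself is being run again (for example by a second consumer), though that is only an assumption about the setup above, not a confirmed diagnosis.

```typescript
// Hypothetical stand-ins; none of these names come from Effect or the database wrapper.
type Listener<A> = (val: A) => void

class FakeEntity<A> {
  private listeners = new Set<Listener<A>>()
  addListener(cb: Listener<A>): void { this.listeners.add(cb) }
  removeListener(cb: Listener<A>): void { this.listeners.delete(cb) }
  emit(val: A): void { this.listeners.forEach(cb => cb(val)) }
  listenerCount(): number { return this.listeners.size }
}

let lookups = 0
const entity = new FakeEntity<number>()

// Stand-in for objectForPrimaryKey: counts how often the lookup runs.
const lookupEntity = (): FakeEntity<number> => {
  lookups += 1
  return entity
}

// A "cold" stream: nothing happens until run() is called.
// Each run performs the lookup once, subscribes, and returns a cleanup function.
const makeStream = () => ({
  run(onValue: Listener<number>): () => void {
    const e = lookupEntity()
    e.addListener(onValue)
    return () => e.removeListener(onValue)
  }
})

const stream = makeStream()
const received: number[] = []

const stop = stream.run(v => { received.push(v) })
entity.emit(1)
entity.emit(2)
entity.emit(3)
const lookupsAfterUpdates = lookups // 1: three updates, one lookup

const stop2 = stream.run(() => {})
const lookupsAfterSecondRun = lookups // 2: running the stream again repeats the lookup

stop()
stop2()
```

If the real stream behaves like this model, logging at the point where the stream is consumed, as well as inside the lookup, should show whether the extra lookups line up with updates or with additional runs.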