Effect CommunityEC
Effect Community3y ago
7 replies
bsluther

Streaming Data with Callback API and Stream Operations

In this
In the following I'm trying to stream data from one callback API, then use the data to (maybe) create a new stream. If either stream updates, I want to see the result, but that means that if the first stream updates, the second stream needs to recreated. I thought Stream.flatMap was what I was looking for, but the first stream stops receiving elements after the flatMap. merge doesn't seem quite right, because the second stream depends on the first. Any ideas?
const preferredProfileStream = (activityId: BSON.ObjectId) => pipe(
    dbWrapper.objectForPrimaryKey<UserActivity>('UserActivity', id), // Effect.Effect<R, E, UserActivity>
    Stream.flatMap(act =>
      Stream.asyncInterrupt<never, never, UserActivity>(emit => {
        const callback = (newAct: UserActivity) => emit.single(newAct)
        act.addListener(callback)
        return Either.left(Effect.sync(() => act.removeListener(callback)))
      })
    ),
    Stream.map(act => act.preferredProfile),
    Stream.flatMap(profileId => {
      return Stream.suspend((): Stream.Stream<
          RuntimeR, 
          extractStreamError<ReturnType<typeof entityStream.userActivity>> | EntityDoesNotExistError,
          ActivityProfileData
        > => 
        profileId
          ? entityStream.activityProfile(profileId)
          : Stream.fail(new EntityDoesNotExistError({
            message: `No preferredProfile for UserActivity ${activityId.toHexString()}`
          }))
      )
    })
  )
Was this page helpful?