const preferredProfileStream = (activityId: BSON.ObjectId) => pipe(
dbWrapper.objectForPrimaryKey<UserActivity>('UserActivity', id), // Effect.Effect<R, E, UserActivity>
Stream.flatMap(act =>
Stream.asyncInterrupt<never, never, UserActivity>(emit => {
const callback = (newAct: UserActivity) => emit.single(newAct)
act.addListener(callback)
return Either.left(Effect.sync(() => act.removeListener(callback)))
})
),
Stream.map(act => act.preferredProfile),
Stream.flatMap(profileId => {
return Stream.suspend((): Stream.Stream<
RuntimeR,
extractStreamError<ReturnType<typeof entityStream.userActivity>> | EntityDoesNotExistError,
ActivityProfileData
> =>
profileId
? entityStream.activityProfile(profileId)
: Stream.fail(new EntityDoesNotExistError({
message: `No preferredProfile for UserActivity ${activityId.toHexString()}`
}))
)
})
)
const preferredProfileStream = (activityId: BSON.ObjectId) => pipe(
dbWrapper.objectForPrimaryKey<UserActivity>('UserActivity', id), // Effect.Effect<R, E, UserActivity>
Stream.flatMap(act =>
Stream.asyncInterrupt<never, never, UserActivity>(emit => {
const callback = (newAct: UserActivity) => emit.single(newAct)
act.addListener(callback)
return Either.left(Effect.sync(() => act.removeListener(callback)))
})
),
Stream.map(act => act.preferredProfile),
Stream.flatMap(profileId => {
return Stream.suspend((): Stream.Stream<
RuntimeR,
extractStreamError<ReturnType<typeof entityStream.userActivity>> | EntityDoesNotExistError,
ActivityProfileData
> =>
profileId
? entityStream.activityProfile(profileId)
: Stream.fail(new EntityDoesNotExistError({
message: `No preferredProfile for UserActivity ${activityId.toHexString()}`
}))
)
})
)