Effect CommunityEC
Effect Community · 14mo ago
5 replies
kristo

Implementing retries in a Stream

I'm struggling to wrap my head around how to implement retries in a stream built like this:

// Build the Jetstream client, retrying on failure.
// Anything thrown by the SWJetstream constructor is wrapped in a
// JetstreamError and logged, then construction is attempted again on an
// exponential schedule whose base delay is one second. The retry options
// below configure only the schedule (no attempt limit is set here).
const connectToJetstream = Effect.retry(
  Effect.tapError(
    Effect.try({
      try: () => new SWJetstream({ wantedCollections: ["app.bsky.feed.post"] }),
      catch: (cause) =>
        new JetstreamError({ message: "Failed to connect to Jetstream", cause }),
    }),
    Effect.logError,
  ),
  { schedule: Schedule.exponential(Duration.seconds(1)) },
)

/**
 * Obtains a Jetstream client, waits for its "open" event, and exposes
 * "app.bsky.feed.post" commits as two independent streams.
 *
 * Create and delete events are kept in separate streams so that they can be
 * processed separately (e.g. into separate DB transactions).
 *
 * @returns An object holding `createStream` and `deleteStream`.
 */
const JetstreamService: Effect.Effect<JetstreamImpl> = Effect.gen(function* () {
  yield* Effect.log("Connecting to jetstream...")

  const jetstream = yield* connectToJetstream

  // The client invokes these callbacks outside of this fiber, so the log
  // effects have to be run explicitly; `Effect.runFork` uses the default
  // runtime.
  jetstream.on("error", (err, cursor) => {
    // The error is only reported here instead of being silently dropped.
    // TODO: reconnecting (and redoing this whole setup) when the websocket
    // emits an error is still not implemented.
    Effect.runFork(Effect.logError("Jetstream websocket error", err, cursor))
  })

  jetstream.on("close", () => {
    Effect.runFork(Effect.log("Jetstream connection closed."))
  })

  // Suspend this fiber until the client emits "open", so that the
  // "Connected" message below is only logged once the socket is really open.
  // NOTE(review): nothing in this block starts the client; presumably it
  // connects on construction or is started elsewhere — confirm, otherwise
  // this wait never completes.
  yield* Effect.async<void>((resume) => {
    jetstream.on("open", () => {
      // Complete with void. Wrapping it in `Effect.succeed` would instead
      // produce an Effect value as the result.
      resume(Effect.void)
    })
  })
  yield* Effect.log("Connected to jetstream.")

  // Both streams use an unbounded buffer so that neither of them drops
  // events when the consumer is slower than the producer.
  const createStream = Stream.asyncPush<
    CommitCreateEvent<"app.bsky.feed.post">
  >(
    (emit) =>
      Effect.sync(() => {
        jetstream.onCreate("app.bsky.feed.post", (ev) => {
          emit.single(ev)
        })
      }),
    {
      bufferSize: "unbounded",
    },
  )
  const deleteStream = Stream.asyncPush<
    CommitDeleteEvent<"app.bsky.feed.post">
  >(
    (emit) =>
      Effect.sync(() => {
        jetstream.onDelete("app.bsky.feed.post", (ev) => {
          emit.single(ev)
        })
      }),
    {
      bufferSize: "unbounded",
    },
  )

  return {
    createStream,
    deleteStream,
  }
})
Was this page helpful?