Implementing retries in a Stream
I'm struggling to wrap my head around how to implement retries in a stream built like this:
// connect or retry
const connectToJetstream = Effect.try({
try: () =>
new SWJetstream({
wantedCollections: ["app.bsky.feed.post"],
}),
catch: (e) =>
new JetstreamError({
message: "Failed to connect to Jetstream",
cause: e,
}),
}).pipe(
Effect.tapError(Effect.logError),
Effect.retry({
schedule: Schedule.exponential(Duration.seconds(1)),
}),
)
const JetstreamService: Effect.Effect<JetstreamImpl> = Effect.gen(function* () {
yield* Effect.log("Connecting to jetstream...")
const jetstream = yield* connectToJetstream
jetstream.on("error", (err, cursor) => {
// When the websocket emits an error, we want to retry the connection
// and redo this whole process probably
})
jetstream.on("close", () => {
// log the closing
})
yield* Effect.async((resume) => {
jetstream.on("open", () => {
resume(Effect.succeed(Effect.void))
})
})
// i actually want to log this after the "open" event is emitted
yield* Effect.log("Connected to jetstream.")
// create two separate streams for create and delete events
// so they can be processed separately into DB transactions
const createStream = Stream.asyncPush<
CommitCreateEvent<"app.bsky.feed.post">
>(
(emit) =>
Effect.gen(function* () {
jetstream.onCreate("app.bsky.feed.post", (ev) => {
emit.single(ev)
})
}),
{
bufferSize: "unbounded",
},
)
const deleteStream = Stream.asyncPush<
CommitDeleteEvent<"app.bsky.feed.post">
>((emit) =>
Effect.gen(function* () {
jetstream.onDelete("app.bsky.feed.post", (ev) => {
emit.single(ev)
})
}),
)
return {
createStream,
deleteStream,
}
})// connect or retry
const connectToJetstream = Effect.try({
try: () =>
new SWJetstream({
wantedCollections: ["app.bsky.feed.post"],
}),
catch: (e) =>
new JetstreamError({
message: "Failed to connect to Jetstream",
cause: e,
}),
}).pipe(
Effect.tapError(Effect.logError),
Effect.retry({
schedule: Schedule.exponential(Duration.seconds(1)),
}),
)
const JetstreamService: Effect.Effect<JetstreamImpl> = Effect.gen(function* () {
yield* Effect.log("Connecting to jetstream...")
const jetstream = yield* connectToJetstream
jetstream.on("error", (err, cursor) => {
// When the websocket emits an error, we want to retry the connection
// and redo this whole process probably
})
jetstream.on("close", () => {
// log the closing
})
yield* Effect.async((resume) => {
jetstream.on("open", () => {
resume(Effect.succeed(Effect.void))
})
})
// i actually want to log this after the "open" event is emitted
yield* Effect.log("Connected to jetstream.")
// create two separate streams for create and delete events
// so they can be processed separately into DB transactions
const createStream = Stream.asyncPush<
CommitCreateEvent<"app.bsky.feed.post">
>(
(emit) =>
Effect.gen(function* () {
jetstream.onCreate("app.bsky.feed.post", (ev) => {
emit.single(ev)
})
}),
{
bufferSize: "unbounded",
},
)
const deleteStream = Stream.asyncPush<
CommitDeleteEvent<"app.bsky.feed.post">
>((emit) =>
Effect.gen(function* () {
jetstream.onDelete("app.bsky.feed.post", (ev) => {
emit.single(ev)
})
}),
)
return {
createStream,
deleteStream,
}
})