/**
 * Acquisition step of the message-queue resource.
 *
 * Reads the subscription from the `PubsubSubscription` service, creates an
 * unbounded queue of `Message`, and registers `message` and `error`
 * listeners on the subscription.
 *
 * The returned record carries the queue together with the subscription and
 * the exact listener functions that were registered, so the release step can
 * detach those same references.
 */
const acquire = Effect.gen(function* () {
  const { subscription } = yield* PubsubSubscription.PubsubSubscription
  const queue = yield* Queue.unbounded<Message>()

  // Invoked as a plain callback by the subscription, so the offer is forked
  // onto its own fiber instead of being yielded.
  function messageListener(message: GcpsMessage) {
    Effect.runFork(
      Queue.offer(queue, {
        // ack/nack are wrapped in Effect.sync so they only run when the
        // consumer of the queue executes them.
        ack: Effect.sync(() => message.ack()),
        nack: Effect.sync(() => message.nack()),
        read: decodeIngestionAttempted(message.data),
      })
    )
  }

  // Surface subscription errors in the logs rather than dropping them.
  // The queue is intentionally left open: consumers are not failed here.
  function errorListener(error: Error) {
    Effect.runFork(
      Effect.logError('Pub/Sub subscription emitted an error', error)
    )
  }

  yield* Effect.sync(() => {
    subscription.on('message', messageListener)
    subscription.on('error', errorListener)
  })

  return { queue, subscription, messageListener, errorListener }
})
/**
 * Release step of the message-queue resource.
 *
 * Removes the `message` and `error` listeners that `acquire` registered on
 * the subscription, then shuts the queue down.
 *
 * @param resource - The record produced by `acquire`.
 * @returns An effect that performs the detach followed by the queue shutdown.
 */
function release(resource: Effect.Effect.Success<typeof acquire>) {
  const { subscription: source, messageListener, errorListener, queue } =
    resource

  const detachListeners = Effect.sync(() => {
    source.removeListener('message', messageListener)
    source.removeListener('error', errorListener)
  })

  // Listeners are detached first, then the queue is shut down.
  return detachListeners.pipe(Effect.zipRight(Queue.shutdown(queue)))
}
/**
 * Scoped constructor for the `MessageQueue` service: pairs `acquire` with
 * `release` and exposes only the queue through `MessageQueue.of`.
 */
const make = Effect.map(
  Effect.acquireRelease(acquire, release),
  (resource) => MessageQueue.of({ queue: resource.queue })
)
/**
 * Layer produced by `PubsubSubscription.layer` from the
 * `PUBSUB_SUBSCRIPTION_NAME` string config.
 */
const subscription = Config.string('PUBSUB_SUBSCRIPTION_NAME').pipe(
  (subscriptionName) => PubsubSubscription.layer(subscriptionName)
)
/**
 * Layer providing `MessageQueue`.
 *
 * Builds the service from the scoped `make` effect and supplies it with the
 * `subscription` layer, leaving `PubsubClient` as the remaining requirement.
 */
export const layer: Layer.Layer<
  MessageQueue,
  ConfigError.ConfigError,
  PubsubClient.PubsubClient
> = Layer.provide(Layer.scoped(MessageQueue, make), subscription)
/**
 * Acquisition step of the message-queue resource.
 *
 * Reads the subscription from the `PubsubSubscription` service, creates an
 * unbounded queue of `Message`, and registers `message` and `error`
 * listeners on the subscription.
 *
 * The returned record carries the queue together with the subscription and
 * the exact listener functions that were registered, so the release step can
 * detach those same references.
 */
const acquire = Effect.gen(function* () {
  const { subscription } = yield* PubsubSubscription.PubsubSubscription
  const queue = yield* Queue.unbounded<Message>()

  // Invoked as a plain callback by the subscription, so the offer is forked
  // onto its own fiber instead of being yielded.
  function messageListener(message: GcpsMessage) {
    Effect.runFork(
      Queue.offer(queue, {
        // ack/nack are wrapped in Effect.sync so they only run when the
        // consumer of the queue executes them.
        ack: Effect.sync(() => message.ack()),
        nack: Effect.sync(() => message.nack()),
        read: decodeIngestionAttempted(message.data),
      })
    )
  }

  // Surface subscription errors in the logs rather than dropping them.
  // The queue is intentionally left open: consumers are not failed here.
  function errorListener(error: Error) {
    Effect.runFork(
      Effect.logError('Pub/Sub subscription emitted an error', error)
    )
  }

  yield* Effect.sync(() => {
    subscription.on('message', messageListener)
    subscription.on('error', errorListener)
  })

  return { queue, subscription, messageListener, errorListener }
})
/**
 * Release step of the message-queue resource.
 *
 * Removes the `message` and `error` listeners that `acquire` registered on
 * the subscription, then shuts the queue down.
 *
 * @param resource - The record produced by `acquire`.
 * @returns An effect that performs the detach followed by the queue shutdown.
 */
function release(resource: Effect.Effect.Success<typeof acquire>) {
  const { subscription: source, messageListener, errorListener, queue } =
    resource

  const detachListeners = Effect.sync(() => {
    source.removeListener('message', messageListener)
    source.removeListener('error', errorListener)
  })

  // Listeners are detached first, then the queue is shut down.
  return detachListeners.pipe(Effect.zipRight(Queue.shutdown(queue)))
}
/**
 * Scoped constructor for the `MessageQueue` service: pairs `acquire` with
 * `release` and exposes only the queue through `MessageQueue.of`.
 */
const make = Effect.map(
  Effect.acquireRelease(acquire, release),
  (resource) => MessageQueue.of({ queue: resource.queue })
)
/**
 * Layer produced by `PubsubSubscription.layer` from the
 * `PUBSUB_SUBSCRIPTION_NAME` string config.
 */
const subscription = Config.string('PUBSUB_SUBSCRIPTION_NAME').pipe(
  (subscriptionName) => PubsubSubscription.layer(subscriptionName)
)
/**
 * Layer providing `MessageQueue`.
 *
 * Builds the service from the scoped `make` effect and supplies it with the
 * `subscription` layer, leaving `PubsubClient` as the remaining requirement.
 */
export const layer: Layer.Layer<
  MessageQueue,
  ConfigError.ConfigError,
  PubsubClient.PubsubClient
> = Layer.provide(Layer.scoped(MessageQueue, make), subscription)