**Modeling a Reactive AI Enrichment Service with Effect-Atom**
effect-atomeffect-atom a spin and I'm not fully sure how I'd model following:- A service when I can put in documents that get processed (stream/queue)
- I want a reactive way in react to read the state like
{
processing: 3,
waiting: 100,
done: 5
}{
processing: 3,
waiting: 100,
done: 5
}This is what I have so far - I'm struggling to understand how I'd write either atoms within the service
class AIEnrichmentService extends Effect.Service<AIEnrichmentService>()('app/AIEnrichmentService', {
// Define how to create the service
effect: Effect.gen(function* () {
const queue = yield* Queue.unbounded<Document>()
yield* Effect.log('Starting AI enrichment service')
yield* pipe(
Stream.fromQueue(queue),
Stream.mapEffect(
(document) =>
Effect.gen(function* () {
// do AI stuff
yield* Effect.log('Processing document AI', document._id)
yield* Effect.sleep('1 second')
}),
{ concurrency: 5 }
),
Stream.runDrain,
Effect.forkDaemon
)
yield* Effect.log('AI enrichment service started')
return {
queue,
processDocumentAI: (document: Document) => Queue.offer(queue, document),
getAIEnrichmentStatus: () => ({
toProcess: Queue.size(queue)
})
} as const
}),
dependencies: []
}) {}
const runtimeAtom = Atom.runtime(AIEnrichmentService.Default)
const proccessAIOnDocument = runtimeAtom.fn(
Effect.fnUntraced(function* (document: Document) {
const aiEnrichmentService = yield* AIEnrichmentService
return yield* aiEnrichmentService.processDocumentAI(document)
})
)class AIEnrichmentService extends Effect.Service<AIEnrichmentService>()('app/AIEnrichmentService', {
// Define how to create the service
effect: Effect.gen(function* () {
const queue = yield* Queue.unbounded<Document>()
yield* Effect.log('Starting AI enrichment service')
yield* pipe(
Stream.fromQueue(queue),
Stream.mapEffect(
(document) =>
Effect.gen(function* () {
// do AI stuff
yield* Effect.log('Processing document AI', document._id)
yield* Effect.sleep('1 second')
}),
{ concurrency: 5 }
),
Stream.runDrain,
Effect.forkDaemon
)
yield* Effect.log('AI enrichment service started')
return {
queue,
processDocumentAI: (document: Document) => Queue.offer(queue, document),
getAIEnrichmentStatus: () => ({
toProcess: Queue.size(queue)
})
} as const
}),
dependencies: []
}) {}
const runtimeAtom = Atom.runtime(AIEnrichmentService.Default)
const proccessAIOnDocument = runtimeAtom.fn(
Effect.fnUntraced(function* (document: Document) {
const aiEnrichmentService = yield* AIEnrichmentService
return yield* aiEnrichmentService.processDocumentAI(document)
})
)