Effect CommunityEC
Effect Community•2mo ago•
7 replies
tobiaslins

**Modeling a Reactive AI Enrichment Service with Effect-Atom**

👋 I wanted to give effect-atom a spin and I'm not fully sure how I'd model following:
- A service when I can put in documents that get processed (stream/queue)
- I want a reactive way in react to read the state like
{
   processing: 3,
   waiting: 100,
   done: 5
}



This is what I have so far - I'm struggling to understand how I'd write either atoms within the service

class AIEnrichmentService extends Effect.Service<AIEnrichmentService>()('app/AIEnrichmentService', {
  // Define how to create the service
  effect: Effect.gen(function* () {
    const queue = yield* Queue.unbounded<Document>()

    yield* Effect.log('Starting AI enrichment service')
    yield* pipe(
      Stream.fromQueue(queue),
      Stream.mapEffect(
        (document) =>
          Effect.gen(function* () {
            // do AI stuff
            yield* Effect.log('Processing document AI', document._id)
            yield* Effect.sleep('1 second')
          }),
        { concurrency: 5 }
      ),
      Stream.runDrain,
      Effect.forkDaemon
    )

    yield* Effect.log('AI enrichment service started')

    return {
      queue,
      processDocumentAI: (document: Document) => Queue.offer(queue, document),

      getAIEnrichmentStatus: () => ({
        toProcess: Queue.size(queue)
      })
    } as const
  }),
  dependencies: []
}) {}

const runtimeAtom = Atom.runtime(AIEnrichmentService.Default)

const proccessAIOnDocument = runtimeAtom.fn(
  Effect.fnUntraced(function* (document: Document) {
    const aiEnrichmentService = yield* AIEnrichmentService
    return yield* aiEnrichmentService.processDocumentAI(document)
  })
)
Was this page helpful?