class EventQueue extends Context.Tag("EventQueue")<EventQueue, Queue.Queue<Event>>() {}
type Agent = (input: string) => Effect.Effect<string, never, EventQueue>
const agent: Agent = Effect.fn("agent")(function* (name: string, input: string) {
const queue = yield* EventQueue
yield* Queue.offer(queue, Events.AgentStarted({ agent: name }))
const response = yield* streamResponseFromModel(input).pipe(
Stream.tap((chunk) => Queue.offer(queue, Events.AgentDelta({ chunk }))),
Stream.runFold("", accumulateTextChunks)
)
yield* Queue.offer(queue, Events.AgentCompleted({ output: response }))
return response
})
const flow = Effect.fn("agentWorkflow")(function*(userMessage: string) {
const outputA = yield* agent("A", userMessage)
const outputB = yield* agentB("B", outputA)
const outputC = yield* agentC("C", outputA + outputB)
return { outputA, outputB, outputC }
})
const respond = (userMessage: string): Stream.Stream<Event, never, never> => {
return Stream.unwrapScoped(
Effect.gen(function*() {
const queue = yield* Queue.unbounded<Event>()
yield* Effect.fork(
flow(userMessage).pipe(
Effect.tap((result) => Queue.offer(queue, Events.WorkflowCompleted(result))),
Effect.provideService(EventQueue, queue),
),
)
return Stream.fromQueue(queue, { shutdown: true })
}),
)
}
class EventQueue extends Context.Tag("EventQueue")<EventQueue, Queue.Queue<Event>>() {}
type Agent = (input: string) => Effect.Effect<string, never, EventQueue>
const agent: Agent = Effect.fn("agent")(function* (name: string, input: string) {
const queue = yield* EventQueue
yield* Queue.offer(queue, Events.AgentStarted({ agent: name }))
const response = yield* streamResponseFromModel(input).pipe(
Stream.tap((chunk) => Queue.offer(queue, Events.AgentDelta({ chunk }))),
Stream.runFold("", accumulateTextChunks)
)
yield* Queue.offer(queue, Events.AgentCompleted({ output: response }))
return response
})
const flow = Effect.fn("agentWorkflow")(function*(userMessage: string) {
const outputA = yield* agent("A", userMessage)
const outputB = yield* agentB("B", outputA)
const outputC = yield* agentC("C", outputA + outputB)
return { outputA, outputB, outputC }
})
const respond = (userMessage: string): Stream.Stream<Event, never, never> => {
return Stream.unwrapScoped(
Effect.gen(function*() {
const queue = yield* Queue.unbounded<Event>()
yield* Effect.fork(
flow(userMessage).pipe(
Effect.tap((result) => Queue.offer(queue, Events.WorkflowCompleted(result))),
Effect.provideService(EventQueue, queue),
),
)
return Stream.fromQueue(queue, { shutdown: true })
}),
)
}