Effect CommunityEC
Effect Community9mo ago
14 replies
The Aleks

Managing Entity Lifecycle and State Recovery in Effect Typescript

What decides the lifecycle of an entity? When does it die? And how do you recover state if it's interrupted and need to be spun up again?

In the code below the messageProcessor is interrupted - the goal is to keep it running as long as the request is open.

AND if the function crashed while processing a message it should attempt to process it again when it boots up.

export const TransactionAgentLive = TransactionAgent.toLayer(
  Effect.gen(function* () {
    const address = yield* Entity.CurrentAddress;
    const generateResponse = yield* GenerateResponse();
    const mailbox = yield* Mailbox.make<Message>();

    // Ref to hold the currently active generation fiber (running agent.respond())
    const generationFiberRef = yield* Ref.make<
      Option.Option<Fiber.RuntimeFiber<unknown, any>>
    >(Option.none()); // Error type might be more specific

    // Really don't like this...
    const requestId = RequestId.make(address.entityId);
    const requestScope = yield* Effect.scope;
    const { callDb } = yield* Database;

    yield* Effect.logInfo(
      `TransactionAgent entity started at ${address.entityId}`,
    );

    const cancelCurrent = Effect.gen(function* () {
      const oldFiberOpt = yield* Ref.getAndSet(
        generationFiberRef,
        Option.none(),
      );
      yield* Option.match(oldFiberOpt, {
        onNone: () =>
          Effect.logInfo(`No existing fiber to interrupt for ${requestId}`),
        onSome: (fiber) =>
          Effect.logInfo(`Interrupting previous fiber for ${requestId}`).pipe(
            Effect.zipRight(Fiber.interrupt(fiber)), // Interrupt (triggers onInterrupt status update)
          ),
      });
    });

    const messageProcessor = Effect.gen(function* () {
      yield* Effect.logInfo("Waiting for message");
      const { messageId, recipient } = yield* mailbox.take;

      yield* Effect.logInfo(`Received message: ${messageId}`);

     

      yield* Effect.logInfo(`Starting new fiber for ${requestId}`);

    }).pipe(
      Effect.tapError((error) =>
        Effect.logError(`Error processing message: ${error}`),
      ),
      Effect.forever,
      Effect.scoped,
      Effect.onInterrupt((cause) =>
        Effect.logError(`Message processing interrupted: ${cause}`),
      ),
    );

    // Start the message processor and tie to the entity scope
    yield* Effect.fork(messageProcessor);

    return {
      GenerateResponse: Effect.fnUntraced(
        function* (envelope) {
          // Add message to the mailbox
          return yield* Effect.logInfo(
            `Offering message to mailbox: ${envelope.payload.messageId}`,
          ).pipe(
            Effect.tap(
              mailbox.size.pipe(
                Effect.andThen((size) =>
                  Effect.logInfo(`Mailbox size: ${size}`),
                ),
              ),
            ),
            Effect.andThen(mailbox.offer(envelope.payload)),
          );
        },
        (effect, { payload }) =>
          Effect.annotateLogs(effect, {
            address,
            pid: process.pid,
            messageId: payload.messageId,
            agentName: payload.recipient,
          }),
      ),

      GetTransactionStatus: Effect.fnUntraced(
        function* (envelope) {
          yield* Effect.logInfo(
            `Checking status for transaction ${envelope.address.entityId}`,
          );

          // Simulate retrieving transaction status
          const isProcessed = Math.random() > 0.3;

          return yield* Effect.succeed({
            status: isProcessed ? "COMPLETED" : "PENDING",
            timestamp: Date.now(),
            processed: isProcessed,
          });
        },
        (effect, { payload }) =>
          Effect.annotateLogs(effect, {
            address,
            pid: process.pid,
            transactionId: payload,
          }),
      ),
    };
  }),
).pipe(Layer.provideMerge(Dependencies));
Was this page helpful?