Effect CommunityEC
Effect Community4w ago
1 reply
Baptiste

Error with tRPC Subscription Handler: All Fibers Interrupted Without Errors

Getting this error on my app server (tRPC subscription handler). Am I doing something obviously wrong here? 🙏
timestamp=2025-12-12T11:40:01.310Z level=INFO fiber=#20 message="STARTING WORKFLOW"
RPC stream failed:
All fibers interrupted without errors.
FATAL
InterruptedException: Interrupted by fibers: #28

import { WorkflowsAppConfig } from "@typebot.io/config";
import { createId } from "@typebot.io/lib/createId";
import { ResultsWorkflowsRpcClient } from "@typebot.io/results/workflows/rpc";
import { z } from "@typebot.io/zod";
import { Cause, Effect, Layer, Stream } from "effect";
import { authenticatedProcedure } from "@/helpers/server/trpc";

const MainLayer = Layer.provide(
  ResultsWorkflowsRpcClient.Default,
  WorkflowsAppConfig.layer,
);

export const startExportWorkflow = authenticatedProcedure
  .input(
    z.object({
      typebotId: z.string(),
    }),
  )
  .subscription(async function* ({ input: { typebotId } }) {
    const program = Effect.gen(function* () {
      const rpcClient = yield* ResultsWorkflowsRpcClient;

      yield* Effect.log("STARTING WORKFLOW");

      const stream = rpcClient.ExecuteExportResultsWorkflow({
        id: createId(),
        typebotId,
      });

      return yield* stream.pipe(
        Stream.tap((chunk) => Effect.logDebug("RPC chunk", chunk)),
        Stream.tapErrorCause((cause) =>
          Effect.sync(() => {
            console.error(
              "RPC stream failed:\n" +
                Cause.pretty(cause, { renderErrorCause: true }),
            );
          }),
        ),
        Stream.toAsyncIterableEffect,
      );
    });

    try {
      const asyncIterable = await Effect.runPromise(
        program.pipe(Effect.provide(MainLayer)),
      );
      for await (const update of asyncIterable) {
        yield update;
      }
    } catch (err) {
      console.log("FATAL");
      console.error(err);
    }
  });

export class ResultsWorkflowsRpc extends RpcGroup.make(
  Rpc.make("ExecuteExportResultsWorkflow", {
    success: ExportResultsWorkflowStatusChunk,
    error: Schema.Union(RedisSubscribeError, ExportResultsWorkflow.errorSchema),
    stream: true,
    payload: ExportResultsWorkflow.payloadSchema,
  })
) {}

export const ResultsWorkflowsRpcLayer = ResultsWorkflowsRpc.toLayer(
  Effect.succeed({
    ExecuteExportResultsWorkflow: (payload) =>
      Effect.gen(function* () {
        const redis = yield* RedisClient;

        yield* Effect.log("SUBSCRIBING TO PROGRESS STREAM");

        const progressStream = redis.subscribe(
          `${EXPORT_PROGRESS_CHANNEL_PREFIX}${payload.id}`,
        );

        yield* Effect.log("EXECUTING WORKFLOW");

        const workflowFiber = yield* ExportResultsWorkflow.execute(
          payload,
        ).pipe(Effect.fork);

        yield* Effect.log("FORKED WORKFLOW");

        return Stream.concat(
          Stream.succeed({
            status: "starting" as const,
            workflowId: payload.id,
          }),
          Stream.concat(
            Stream.map(progressStream, (message) => ({
              status: "in_progress" as const,
              progress: Number.parseFloat(message),
            })).pipe(Stream.takeWhile((message) => message.progress !== 100)),
            Stream.fromEffect(
              Fiber.join(workflowFiber).pipe(
                Effect.map((result) => ({
                  status: "completed" as const,
                  fileUrl: result.fileUrl.toString(),
                })),
              ),
            ),
          ),
        );
      }).pipe(Stream.unwrap)
  }),
);
Was this page helpful?