Error with tRPC Subscription Handler: All Fibers Interrupted Without Errors
Getting this error on my app server (tRPC subscription handler). Am I doing something obviously wrong here? 
timestamp=2025-12-12T11:40:01.310Z level=INFO fiber=#20 message="STARTING WORKFLOW"
RPC stream failed:
All fibers interrupted without errors.
FATAL
InterruptedException: Interrupted by fibers: #28timestamp=2025-12-12T11:40:01.310Z level=INFO fiber=#20 message="STARTING WORKFLOW"
RPC stream failed:
All fibers interrupted without errors.
FATAL
InterruptedException: Interrupted by fibers: #28import { WorkflowsAppConfig } from "@typebot.io/config";
import { createId } from "@typebot.io/lib/createId";
import { ResultsWorkflowsRpcClient } from "@typebot.io/results/workflows/rpc";
import { z } from "@typebot.io/zod";
import { Cause, Effect, Layer, Stream } from "effect";
import { authenticatedProcedure } from "@/helpers/server/trpc";
const MainLayer = Layer.provide(
ResultsWorkflowsRpcClient.Default,
WorkflowsAppConfig.layer,
);
export const startExportWorkflow = authenticatedProcedure
.input(
z.object({
typebotId: z.string(),
}),
)
.subscription(async function* ({ input: { typebotId } }) {
const program = Effect.gen(function* () {
const rpcClient = yield* ResultsWorkflowsRpcClient;
yield* Effect.log("STARTING WORKFLOW");
const stream = rpcClient.ExecuteExportResultsWorkflow({
id: createId(),
typebotId,
});
return yield* stream.pipe(
Stream.tap((chunk) => Effect.logDebug("RPC chunk", chunk)),
Stream.tapErrorCause((cause) =>
Effect.sync(() => {
console.error(
"RPC stream failed:\n" +
Cause.pretty(cause, { renderErrorCause: true }),
);
}),
),
Stream.toAsyncIterableEffect,
);
});
try {
const asyncIterable = await Effect.runPromise(
program.pipe(Effect.provide(MainLayer)),
);
for await (const update of asyncIterable) {
yield update;
}
} catch (err) {
console.log("FATAL");
console.error(err);
}
});import { WorkflowsAppConfig } from "@typebot.io/config";
import { createId } from "@typebot.io/lib/createId";
import { ResultsWorkflowsRpcClient } from "@typebot.io/results/workflows/rpc";
import { z } from "@typebot.io/zod";
import { Cause, Effect, Layer, Stream } from "effect";
import { authenticatedProcedure } from "@/helpers/server/trpc";
const MainLayer = Layer.provide(
ResultsWorkflowsRpcClient.Default,
WorkflowsAppConfig.layer,
);
export const startExportWorkflow = authenticatedProcedure
.input(
z.object({
typebotId: z.string(),
}),
)
.subscription(async function* ({ input: { typebotId } }) {
const program = Effect.gen(function* () {
const rpcClient = yield* ResultsWorkflowsRpcClient;
yield* Effect.log("STARTING WORKFLOW");
const stream = rpcClient.ExecuteExportResultsWorkflow({
id: createId(),
typebotId,
});
return yield* stream.pipe(
Stream.tap((chunk) => Effect.logDebug("RPC chunk", chunk)),
Stream.tapErrorCause((cause) =>
Effect.sync(() => {
console.error(
"RPC stream failed:\n" +
Cause.pretty(cause, { renderErrorCause: true }),
);
}),
),
Stream.toAsyncIterableEffect,
);
});
try {
const asyncIterable = await Effect.runPromise(
program.pipe(Effect.provide(MainLayer)),
);
for await (const update of asyncIterable) {
yield update;
}
} catch (err) {
console.log("FATAL");
console.error(err);
}
});export class ResultsWorkflowsRpc extends RpcGroup.make(
Rpc.make("ExecuteExportResultsWorkflow", {
success: ExportResultsWorkflowStatusChunk,
error: Schema.Union(RedisSubscribeError, ExportResultsWorkflow.errorSchema),
stream: true,
payload: ExportResultsWorkflow.payloadSchema,
})
) {}
export const ResultsWorkflowsRpcLayer = ResultsWorkflowsRpc.toLayer(
Effect.succeed({
ExecuteExportResultsWorkflow: (payload) =>
Effect.gen(function* () {
const redis = yield* RedisClient;
yield* Effect.log("SUBSCRIBING TO PROGRESS STREAM");
const progressStream = redis.subscribe(
`${EXPORT_PROGRESS_CHANNEL_PREFIX}${payload.id}`,
);
yield* Effect.log("EXECUTING WORKFLOW");
const workflowFiber = yield* ExportResultsWorkflow.execute(
payload,
).pipe(Effect.fork);
yield* Effect.log("FORKED WORKFLOW");
return Stream.concat(
Stream.succeed({
status: "starting" as const,
workflowId: payload.id,
}),
Stream.concat(
Stream.map(progressStream, (message) => ({
status: "in_progress" as const,
progress: Number.parseFloat(message),
})).pipe(Stream.takeWhile((message) => message.progress !== 100)),
Stream.fromEffect(
Fiber.join(workflowFiber).pipe(
Effect.map((result) => ({
status: "completed" as const,
fileUrl: result.fileUrl.toString(),
})),
),
),
),
);
}).pipe(Stream.unwrap)
}),
);export class ResultsWorkflowsRpc extends RpcGroup.make(
Rpc.make("ExecuteExportResultsWorkflow", {
success: ExportResultsWorkflowStatusChunk,
error: Schema.Union(RedisSubscribeError, ExportResultsWorkflow.errorSchema),
stream: true,
payload: ExportResultsWorkflow.payloadSchema,
})
) {}
export const ResultsWorkflowsRpcLayer = ResultsWorkflowsRpc.toLayer(
Effect.succeed({
ExecuteExportResultsWorkflow: (payload) =>
Effect.gen(function* () {
const redis = yield* RedisClient;
yield* Effect.log("SUBSCRIBING TO PROGRESS STREAM");
const progressStream = redis.subscribe(
`${EXPORT_PROGRESS_CHANNEL_PREFIX}${payload.id}`,
);
yield* Effect.log("EXECUTING WORKFLOW");
const workflowFiber = yield* ExportResultsWorkflow.execute(
payload,
).pipe(Effect.fork);
yield* Effect.log("FORKED WORKFLOW");
return Stream.concat(
Stream.succeed({
status: "starting" as const,
workflowId: payload.id,
}),
Stream.concat(
Stream.map(progressStream, (message) => ({
status: "in_progress" as const,
progress: Number.parseFloat(message),
})).pipe(Stream.takeWhile((message) => message.progress !== 100)),
Stream.fromEffect(
Fiber.join(workflowFiber).pipe(
Effect.map((result) => ({
status: "completed" as const,
fileUrl: result.fileUrl.toString(),
})),
),
),
),
);
}).pipe(Stream.unwrap)
}),
);