Workflow Snapshots
I am basically trying to reconnect to a running workflow. I use watch and pass it the runId, but when I reconnect I don't know the current state of the workflow, so my UI cannot reflect that state until the next stream event comes through. As a workaround, I tried fetching the snapshot so I could show the current state initially, but from what I saw the snapshots do not handle parallel steps well.
I ran this test to find the problem:
Here was the test workflow:
export const testSimpleWorkflow = createWorkflow({
  id: "testSimpleWorkflow",
  description: "Test workflow mirroring refreshUrlWorkflow structure",
  inputSchema: TestInputSchema,
  outputSchema: TestOutputSchema,
})
  .then(getUrlOutlineWorkflowTest)
  .parallel([
    keywordAnalysisWorkflowTest,
    serpResearchWorkflowTest,
    refreshURLRedditResearchWorkflowTest,
  ])
  .then(comprehensiveSynthesisStepTest)
  .commit();

async function fetchSnapshot(runId: string) {
  const store = mastra.getStorage();
  if (!store) {
    throw new Error("Storage not configured");
  }
  const workflowRun = await store.getWorkflowRunById({
    runId,
    workflowName: "testSimpleWorkflow",
  });
  if (!workflowRun || !workflowRun.snapshot) {
    throw new Error("Workflow run not found");
  }
  return workflowRun.snapshot as {
    status: string;
    context?: Record<string, any>;
  };
}
Below you can see the snapshots that were returned using the above method at different points in the workflow:
The overarching issue is that I assumed the snapshots would contain all the currently running workflows. However, they only return one of the parallel ones, not all of them. I did some more digging and the steps are running in parallel, but the workflow snapshots do not show all 3 steps as active.
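To compare snapshots taken at different points, it can help to reduce each one to a list of step IDs and statuses. The following is a minimal sketch, not Mastra's own API: summarizeStepStatuses is a hypothetical helper, and it assumes each step's entry in the snapshot context is an object carrying a string status field. The example snapshot is made up for illustration.

```typescript
// Shape returned by fetchSnapshot above; the per-step `status` field is an assumption.
type Snapshot = {
  status: string;
  context?: Record<string, any>;
};

type StepStatus = { stepId: string; status: string };

// Keep every context entry that looks like a step result (an object with a string `status`).
function summarizeStepStatuses(snapshot: Snapshot): StepStatus[] {
  return Object.entries(snapshot.context ?? {})
    .filter(
      ([, value]) =>
        value !== null && typeof value === "object" && typeof value.status === "string",
    )
    .map(([stepId, value]) => ({ stepId, status: value.status as string }));
}

// Hypothetical snapshot taken while the parallel block is executing.
const exampleSnapshot: Snapshot = {
  status: "running",
  context: {
    input: { url: "https://example.com" }, // no `status`, so it is skipped
    getUrlOutlineWorkflowTest: { status: "success" },
    serpResearchWorkflowTest: { status: "running" },
  },
};

console.log(summarizeStepStatuses(exampleSnapshot));
```

If all three parallel steps were recorded, the summary taken during the parallel block would list three entries after getUrlOutlineWorkflowTest instead of one.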



Created GitHub issue: https://github.com/mastra-ai/mastra/issues/9027
Hi @wiz2202! You cannot currently resume a "running" workflow, but that is something we will allow eventually.
Hi @Romain, thanks. So we can resume watching it by calling .watch, right? There's just no way for us to get the state that the workflow is currently in?
Oh, well, actually, you can use
await workflow.getWorkflowRunExecutionResult(runId)
in that case. Not sure .watch(...) works if it's already running.
(If you're using the client SDK, it's await workflow.runExecutionResult(runId);)
Oh @Romain, that's interesting. This is how I was using watch right now:
import type { NextRequest } from "next/server";
// mastra, prisma, and requireAuth come from the app's own modules (not shown here).

export async function GET(
  _: NextRequest,
  { params }: { params: Promise<{ workflowId: string; runId: string }> },
) {
  const { workflowId, runId } = await params;
  const session = await requireAuth();

  // Only let users watch runs they own.
  const userRun = await prisma.userWorkflowRun.findFirst({
    where: { userId: session.user.id, runId, workflowName: workflowId },
  });
  if (!userRun) return new Response("Workflow run not found", { status: 404 });

  const workflow = mastra.getWorkflow(workflowId);
  const run = await workflow.createRunAsync({ runId });
  const encoder = new TextEncoder();

  const stream = new ReadableStream({
    start(controller) {
      let isClosed = false;

      // Enqueue one SSE "data:" frame; stop quietly once the stream has been closed.
      const send = (data: object) => {
        if (isClosed) return;
        try {
          controller.enqueue(encoder.encode(`data: ${JSON.stringify(data)}\n\n`));
        } catch (e) {
          if (e instanceof TypeError && e.message.includes("closed")) isClosed = true;
          else throw e;
        }
      };

      // Forward every watch event, plus step-level events when a current step is present.
      run.watch((record) => {
        if (record.type === "watch" && record.payload?.currentStep) {
          const step = record.payload.currentStep;
          const stepName = step.id?.split(".").pop() || "unknown";
          send({ type: "workflow-step-start", payload: { stepName } });
          if (step.status === "success") {
            send({ type: "workflow-step-result", payload: { stepName, status: "success", output: step.output } });
          }
        }
        send({ type: "transition", record });
      });

      // Heartbeat every 20 seconds to keep the connection open.
      const hb = setInterval(() => {
        if (!isClosed) controller.enqueue(encoder.encode(`event: ping\ndata: keep-alive\n\n`));
        else clearInterval(hb);
      }, 20000);
    },
  });

  return new Response(stream, {
    headers: { "Content-Type": "text/event-stream", "Cache-Control": "no-cache", Connection: "keep-alive" },
  });
}
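For reference, the two strings the handler writes to the stream are Server-Sent Events frames, and in TypeScript they need to be template literals wrapped in backticks. A minimal sketch of just that framing follows; formatSseData and formatSsePing are hypothetical helper names, not part of Next.js or Mastra.

```typescript
// One SSE message: a "data:" line holding the JSON payload, terminated by a blank line.
function formatSseData(data: object): string {
  return `data: ${JSON.stringify(data)}\n\n`;
}

// A named "ping" event used as a heartbeat, matching the keep-alive in the handler above.
function formatSsePing(): string {
  return `event: ping\ndata: keep-alive\n\n`;
}

const frame = formatSseData({
  type: "workflow-step-start",
  payload: { stepName: "serpResearchWorkflowTest" },
});

// JSON.stringify makes the trailing newlines visible when logging.
console.log(JSON.stringify(frame));
console.log(JSON.stringify(formatSsePing()));
```

In the handler, each frame would be passed through encoder.encode(...) before controller.enqueue(...), as the original code does.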
@Romain Sorry, just to confirm: right now we should not expect to be able to let a user start a workflow, go to a different page, and then come back and check in on the progress of the workflow and resume streaming the progress?
Right now you can't really "resume" the stream of a "running" workflow; that is planned, though. That's why you'd need to use the other ways to get updates on a workflow run status.
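One of those "other ways" is to poll the run's execution result until it reaches a final status. The sketch below is only an illustration under stated assumptions: pollRun is a hypothetical helper, the result is assumed to expose a string status, and the set of terminal status names is a guess that should be checked against the Mastra version in use. The fetch function is passed in, so the helper itself does not depend on any Mastra call.

```typescript
// Minimal result shape; only `status` is needed here.
type RunResult = { status: string };

// Assumed terminal statuses; adjust to whatever the workflow engine actually reports.
const TERMINAL_STATUSES = new Set(["success", "failed", "canceled"]);

async function pollRun<T extends RunResult>(
  fetchResult: () => Promise<T>,
  onUpdate: (result: T) => void,
  options: { intervalMs?: number; maxPolls?: number; sleep?: (ms: number) => Promise<void> } = {},
): Promise<T> {
  const intervalMs = options.intervalMs ?? 2000;
  const maxPolls = options.maxPolls ?? 300;
  const sleep =
    options.sleep ?? ((ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms)));

  for (let poll = 1; poll <= maxPolls; poll++) {
    const result = await fetchResult();
    onUpdate(result); // let the UI render the latest known state
    if (TERMINAL_STATUSES.has(result.status)) return result;
    await sleep(intervalMs);
  }
  throw new Error(`Run did not finish within ${maxPolls} polls`);
}
```

With the client SDK call mentioned above, usage might look like pollRun(() => workflow.runExecutionResult(runId), render), where render is whatever function updates the UI.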
@Romain Sounds good. I believe the underlying issue still remains, though. When I call await workflow.runExecutionResult(runId); in the middle of the parallel steps, the result shows only 1 step running. That can't be right, because 3 steps are running in parallel.
I am also confused by the difference between:
const store = mastra.getStorage();
const workflowRun = await store.getWorkflowRunById({
  runId,
  workflowName: "testSimpleWorkflow",
});
and
await workflow.runExecutionResult(runId);
But the problem appears with both methods for me.
I believe these 2 methods do the same thing; it's just that one is called from a workflow object and the other from the storage object, but the underlying data that gets retrieved should be the same.
Could you share a minimal repro example for the issue you're having with the parallel steps?
@Romain here is test example workflow:
export const testSimpleWorkflow = createWorkflow({
  id: "testSimpleWorkflow",
  description: "Test workflow mirroring refreshUrlWorkflow structure",
  inputSchema: TestInputSchema,
  outputSchema: TestOutputSchema,
})
  .then(getUrlOutlineWorkflowTest)
  .parallel([
    keywordAnalysisWorkflowTest,
    serpResearchWorkflowTest,
    refreshURLRedditResearchWorkflowTest,
  ])
  .then(comprehensiveSynthesisStepTest)
  .commit();
When I run this workflow and poll await workflow.runExecutionResult(runId);, I get this:
Most notably, in polls 5-7 it says serpResearchWorkflow is running but Reddit and keyword are not, despite them running in parallel. Then in poll 8, it says that keyword suddenly finished and serp and Reddit are no longer running. Then in poll 10, serp and keyword are successful, but Reddit isn't running. The expected behavior would be that polls 5-7 show all 3 steps running, and then they finish and turn to completed; all 3 states should show up in the snapshot as either running or completed.


This is the same behavior that keeps my frontend from correctly picking up the current state of the workflow: steps that are running do not appear as running in the snapshot.
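For a repro, one way to make the gap explicit is to check each polled result for the parallel step IDs that are not reported at all. This is a hedged sketch: missingParallelSteps is a hypothetical helper, it assumes the polled result exposes a record of step states keyed by step ID, and the poll result below is invented to mirror the behavior described above rather than copied from a real run.

```typescript
type StepState = { status: string };

// Return the expected parallel step IDs that a polled result does not report at all.
function missingParallelSteps(
  steps: Record<string, StepState | undefined>,
  expectedStepIds: string[],
): string[] {
  return expectedStepIds.filter((id) => steps[id] === undefined);
}

const parallelStepIds = [
  "keywordAnalysisWorkflowTest",
  "serpResearchWorkflowTest",
  "refreshURLRedditResearchWorkflowTest",
];

// Hypothetical poll result in which only one parallel branch is reported.
const pollResult: Record<string, StepState> = {
  getUrlOutlineWorkflowTest: { status: "success" },
  serpResearchWorkflowTest: { status: "running" },
};

// Lists keywordAnalysisWorkflowTest and refreshURLRedditResearchWorkflowTest as missing.
console.log(missingParallelSteps(pollResult, parallelStepIds));
```

With the expected behavior, this check would return an empty array for every poll taken after the parallel block starts.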
Let me know if any more information would be helpful @Romain
I did the same thing with a sequential workflow and it works as I would expect, so it seems to happen only in the parallel scenario.
Yeah, that's likely a bug. Need to create a proper GH issue for that.
Ok I will. Sorry for bothering you
No worries, I can create one.
Great, thanks!
created https://github.com/mastra-ai/mastra/issues/9044
If you have a working repro example you could share on that GH issue, that'd be awesome (otherwise I'll try to add one tomorrow).
Just added a comment. Does that work?
Looks good, thanks!