M
MastraAIβ€’2d ago
wiz2202

Workflow Snapshots

I am basically trying to reconnect to a running workflow. I use the watch and pass it the runID, but the issue is that when I reconnect, I am not aware of the current state of the workflow, so my ui can not refelect the state of the workflow until the next stream comes through. I tried to make a workaround where I get the snapshot and that can tell me where I am so I can initally show that state, but the snapshots do not handle parallel steps well from what I did. i did this test to find the problem: Here was the test workflow: export const testSimpleWorkflow = createWorkflow({ id: "testSimpleWorkflow", description: "Test workflow mirroring refreshUrlWorkflow structure", inputSchema: TestInputSchema, outputSchema: TestOutputSchema, }) .then(getUrlOutlineWorkflowTest) .parallel([ keywordAnalysisWorkflowTest, serpResearchWorkflowTest, refreshURLRedditResearchWorkflowTest, ]) .then(comprehensiveSynthesisStepTest) .commit(); async function fetchSnapshot(runId: string) { const store = mastra.getStorage(); if (!store) { throw new Error("Storage not configured"); } const workflowRun = await store.getWorkflowRunById({ runId, workflowName: "testSimpleWorkflow", }); if (!workflowRun || !workflowRun.snapshot) { throw new Error("Workflow run not found"); } return workflowRun.snapshot as { status: string; context?: Record<string, any>; }; } BELOW YOU CAN SEE THE SNAPSHOTS THAT WERE REUNTED USING THE ABOVE METHOD AT DIFFERENT POINTS IN WORKFLOW: Overarching issue is that I assumed that the output to the snapshots would contain all the currently running workflows. However, these are only returning one of the ones in parallel, not all of them. I did some more digging adn they are running in parallel, but the workflow snapshots do not show all 3 steps as active?
No description
No description
No description
18 Replies
Mastra Triager
Mastra Triagerβ€’2d ago
GitHub
[DISCORD:1429605648806711408] Workflow Snapshots Β· Issue #9027 Β· ...
This issue was created from Discord post: https://discord.com/channels/1309558646228779139/1429605648806711408 I am basically trying to reconnect to a running workflow. I use the watch and pass it ...
_roamin_
_roamin_β€’20h ago
Hi @wiz2202 ! You cannot currently resume a "running" workflow, but that is something we will allow eventually πŸ˜‰
wiz2202
wiz2202OPβ€’20h ago
Hi @Romain - thanks. So we can resume watching it by calling the .watch right? There’s just no way for us to get the state that the workflow is currently in?
_roamin_
_roamin_β€’19h ago
oh, well, actually, you can use await workflow.getWorkflowRunExecutionResult(runId) in that case not sure the .watch(...) works if it's already running πŸ€” (if you're using the client sdk, it's await workflow.runExecutionResult(runId);
wiz2202
wiz2202OPβ€’17h ago
Oh @Romain thats interesting. This is how I was using the watch right now: export async function GET(_: NextRequest, { params }: { params: Promise<{ workflowId: string; runId: string }> }) { const { workflowId, runId } = await params; const session = await requireAuth(); const userRun = await prisma.userWorkflowRun.findFirst({ where: { userId: session.user.id, runId, workflowName: workflowId }, }); if (!userRun) return new Response("Workflow run not found", { status: 404 }); const workflow = mastra.getWorkflow(workflowId); const run = await workflow.createRunAsync({ runId }); const encoder = new TextEncoder(); const stream = new ReadableStream({ start(controller) { let isClosed = false; const send = (data: object) => { if (isClosed) return; try { controller.enqueue(encoder.encode(data: ${JSON.stringify(data)}\n\n)); } catch (e) { if (e instanceof TypeError && e.message.includes("closed")) isClosed = true; else throw e; } }; run.watch((record) => { if (record.type === "watch" && record.payload?.currentStep) { const step = record.payload.currentStep; const stepName = step.id?.split(".").pop() || "unknown"; send({ type: "workflow-step-start", payload: { stepName } }); if (step.status === "success") send({ type: "workflow-step-result", payload: { stepName, status: "success", output: step.output } }); } send({ type: "transition", record }); }); const hb = setInterval(() => { if (!isClosed) controller.enqueue(encoder.encode(event: ping\ndata: keep-alive\n\n)); else clearInterval(hb); }, 20000); }, }); return new Response(stream, { headers: { "Content-Type": "text/event-stream", "Cache-Control": "no-cache", Connection: "keep-alive" } }); } @Romain Sorry, just to confirm. So right now, we should not expect to be able to let a user start a workflow, go to a different page and then come backa dn check in on the progress of the workflow and resume streaming the progress?
_roamin_
_roamin_β€’17h ago
Right now you can't really "resume" the stream of a "running" workflow, that is planned though. That's why you'd need to use the other ways to get updates on a workflow run status
wiz2202
wiz2202OPβ€’17h ago
@Romain Sounds good. I believe the underlying issue still remains though. When I call await workflow.runExecutionResult(runId); in the middle of parallel steps, the result only shows 1 step is running. This shouldn't be right though because 3 steps are running in parallel. I am also confused by the difference between: const store = mastra.getStorage(); const workflowRun = await store.getWorkflowRunById({ runId, workflowName: "testSimpleWorkflow", }); and await workflow.runExecutionResult(runId); but the problem appears in both methods for me
_roamin_
_roamin_β€’17h ago
I believe these 2 methods do the same thing, is just one is called from a workflow object vs the other is called from the storage object, but the underlying data that gets retrieved should be the same. Could you share a minimal repro example for the issue you're having with the parallel steps?
wiz2202
wiz2202OPβ€’16h ago
@Romain here is test example workflow: export const testSimpleWorkflow = createWorkflow({ id: "testSimpleWorkflow", description: "Test workflow mirroring refreshUrlWorkflow structure", inputSchema: TestInputSchema, outputSchema: TestOutputSchema, }) .then(getUrlOutlineWorkflowTest) .parallel([ keywordAnalysisWorkflowTest, serpResearchWorkflowTest, refreshURLRedditResearchWorkflowTest, ]) .then(comprehensiveSynthesisStepTest) .commit(); When I run this workflow and poll the await workflow.runExecutionResult(runId);- I get this:
wiz2202
wiz2202OPβ€’16h ago
Most notably, in poll 5-7 it says serpResearchWorkflow is running but Reddit and keyword are not depite them running in parallel. Then in poll 8, it says that keyword suddenly finishes and serp and reddit are no longer running. Then in 10 serp and keyword are succesful, but reddit isnt running. The expected behavior would be that poll 5-7 would have all 3 steps running and then they finish and turn to compelted but all 3 states should show up in the snapshot as either running or completed.
No description
No description
wiz2202
wiz2202OPβ€’16h ago
This is the same behaviro as to why my frontend can not correctly pick up the current state of the workflow because things that are running are not appearing as running in the snapshot Let me know if any more information would be helpful @Romain i did the same thing with sequential workflow and it works as I would expect so it seems to only be in the parallel scenario
_roamin_
_roamin_β€’16h ago
Yeah, that's likely a bug πŸ€” Need to create a proper GH issue for that.
wiz2202
wiz2202OPβ€’16h ago
Ok I will. Sorry for bothering you
_roamin_
_roamin_β€’16h ago
No worries, I can create one.
wiz2202
wiz2202OPβ€’16h ago
Great, thanks!
_roamin_
_roamin_β€’16h ago
created https://github.com/mastra-ai/mastra/issues/9044 If you have a working repro example you could share on that GH issue that'd be awesome (otherwise I'll try to add one tomorrow) πŸ˜‰
GitHub
[BUG] workflow.runExecutionResult(...) does not return the correc...
Describe the Bug When steps are ran using .parallel(...), querying the workflow run status does not return the proper status of the steps running in parallel. Steps To Reproduce need to create Expe...
wiz2202
wiz2202OPβ€’16h ago
Just added a comment. Does that work?
_roamin_
_roamin_β€’16h ago
Looks good, thanks!

Did you find this page helpful?