M
Mastra•3w ago
arpitBhalla

Nested Workflow with custom writer

Workflow -> .branch(condition, nestedWorkflow) in nestedWorkflow -> createStep -> writer.custom toAISDK(workflow.run().stream(), {from:workflow}) the writer isn't propagating the custom data-* to root stream
30 Replies
Mastra Triager
Mastra Triager•2w ago
šŸ“ Created GitHub issue: https://github.com/mastra-ai/mastra/issues/10433 šŸ” If you're experiencing an error, please provide a minimal reproducible example whenever possible to help us resolve it quickly. šŸ™ Thank you for helping us improve Mastra!
Aman Zishan
Aman Zishan•2w ago
I think i face the same problem
arpitBhalla
arpitBhallaOP•2w ago
This worked for me for now.
new TransformStream<ChunkType, WorkflowDataPart | ChunkType | {data?: string; type?: 'start' | 'finish'}>(
{
transform(chunk: ChunkType, controller) {
console.log('\x1b[36m%s\x1b[0m', 'mastra', chunk.type);
mastraChunks.push(chunk);

if (chunk.type === 'workflow-step-output') {
let outputChunk = chunk;
while (outputChunk.type === 'workflow-step-output') {
outputChunk = outputChunk.payload.output;
}
if (outputChunk.type?.startsWith('text-') || outputChunk.type?.startsWith('data-')) {
console.log('TRANSFORMER', outputChunk.type);
controller.enqueue(outputChunk);
}
} else {
controller.enqueue(chunk);
}
},
}
)
new TransformStream<ChunkType, WorkflowDataPart | ChunkType | {data?: string; type?: 'start' | 'finish'}>(
{
transform(chunk: ChunkType, controller) {
console.log('\x1b[36m%s\x1b[0m', 'mastra', chunk.type);
mastraChunks.push(chunk);

if (chunk.type === 'workflow-step-output') {
let outputChunk = chunk;
while (outputChunk.type === 'workflow-step-output') {
outputChunk = outputChunk.payload.output;
}
if (outputChunk.type?.startsWith('text-') || outputChunk.type?.startsWith('data-')) {
console.log('TRANSFORMER', outputChunk.type);
controller.enqueue(outputChunk);
}
} else {
controller.enqueue(chunk);
}
},
}
)
Vulcano
Vulcano•2w ago
@Ward this is the same issue as what I was facing when I reported this issue, which you fixed What the exact problem is then i dont know, but this is still happening apparently
Aman Zishan
Aman Zishan•2w ago
@Vulcano Based on @arpitBhalla code i patched the ai-sdk transformer as i use useChat and not client side runs
diff --git a/node_modules/@mastra/ai-sdk/dist/index.js b/node_modules/@mastra/ai-sdk/dist/index.js
index a847fbd..a7146e1 100644
--- a/node_modules/@mastra/ai-sdk/dist/index.js
+++ b/node_modules/@mastra/ai-sdk/dist/index.js
@@ -517,6 +517,17 @@ function AgentStreamToAISDKTransformer({
let finishEventSent = false;
return new TransformStream({
transform(chunk, controller) {
+ // Unwrap nested workflow-step-output chunks to extract data- chunks
+ if (chunk.type === "workflow-step-output") {
+ let outputChunk = chunk;
+ while (outputChunk.type === "workflow-step-output" && outputChunk.payload?.output) {
+ outputChunk = outputChunk.payload.output;
+ }
+ if (outputChunk.type?.startsWith?.("data-")) {
+ controller.enqueue(outputChunk);
+ return;
+ }
+ }
if (chunk.type === "tripwire") {
tripwireOccurred = true;
}
diff --git a/node_modules/@mastra/ai-sdk/dist/index.js b/node_modules/@mastra/ai-sdk/dist/index.js
index a847fbd..a7146e1 100644
--- a/node_modules/@mastra/ai-sdk/dist/index.js
+++ b/node_modules/@mastra/ai-sdk/dist/index.js
@@ -517,6 +517,17 @@ function AgentStreamToAISDKTransformer({
let finishEventSent = false;
return new TransformStream({
transform(chunk, controller) {
+ // Unwrap nested workflow-step-output chunks to extract data- chunks
+ if (chunk.type === "workflow-step-output") {
+ let outputChunk = chunk;
+ while (outputChunk.type === "workflow-step-output" && outputChunk.payload?.output) {
+ outputChunk = outputChunk.payload.output;
+ }
+ if (outputChunk.type?.startsWith?.("data-")) {
+ controller.enqueue(outputChunk);
+ return;
+ }
+ }
if (chunk.type === "tripwire") {
tripwireOccurred = true;
}
This seems to emit events in resume for me
Vulcano
Vulcano•2w ago
Sick, ill give it a try when I have time to see if this fixes my issue as well. I also use the AI SDK v5 useChat
Aman Zishan
Aman Zishan•2w ago
GitHub
Fix workflow-step-output handling in ai-sdk transformers by grayson...
Description Added explicit handling for workflow-step-output chunk type in AI-SDK transformers. This ensures that chunks from nested workflow steps, including embedded data-* chunks or the original...
arpitBhalla
arpitBhallaOP•2w ago
I'm not sure if this will fix the nested workflow issue. Could you try this in createUIMessageStream
for await (const chunk of toAISdkStream(stream.fullStream.pipeThrough(transformer), {})) {
console.log('aiv5', chunk.type);
writer.write(chunk as UIMessageChunk);
}
for await (const chunk of toAISdkStream(stream.fullStream.pipeThrough(transformer), {})) {
console.log('aiv5', chunk.type);
writer.write(chunk as UIMessageChunk);
}
this will work without patching the @mastra/ai-sdk
Abhi Aiyer
Abhi Aiyer•2w ago
Thanks for the investigation! We'll get it fixed!
Vulcano
Vulcano•2w ago
@arpitBhalla this does fix one of my issues, but the stream.fullStream in my case sometimes simply doesnt even contain some writer.custom events under very specific circumstances, so I will await the release that fixes this and see if that issue also gets fixed
Ward
Ward•2w ago
@Dero we should check if writer.custom works in any primitive (tool, workflow, agent) cause it should be fixed in the latest version of mastra core and ai-sdk
Abhi Aiyer
Abhi Aiyer•2w ago
GitHub
fix/workflow step output data chunks by TheIsrael1 Ā· Pull Request ...
fix(ai-sdk): propagate custom data chunks from nested workflows in branches to root stream changeset Description Related Issue(s) Type of Change Bug fix (non-breaking change that fixes an issu...
Abhi Aiyer
Abhi Aiyer•2w ago
This will be released in our next 0.x release 1.0.0-beta release tomorrow
arpitBhalla
arpitBhallaOP•2w ago
Hi, I'm still unable to get it for nested workflows.
dero
dero•2w ago
Hi @arpitBhalla I have tested the original example you linked here
Workflow -> .branch(condition, nestedWorkflow)
in nestedWorkflow -> createStep -> writer.custom
Workflow -> .branch(condition, nestedWorkflow)
in nestedWorkflow -> createStep -> writer.custom
and can verify it works with the latest 0.x We are going to ship an example for it in the UI Dojo. A PR is already open https://github.com/mastra-ai/ui-dojo/pull/13 If this is not exactly what you are trying to achieve, kindly help with more of your implementation detail šŸ‘
Aman Zishan
Aman Zishan•2w ago
@dero is this fix released in v1 beta now?
Ward
Ward•2w ago
both beta and latest
Aman Zishan
Aman Zishan•2w ago
@arpitBhalla were you able to fix this? for me when a nested workflow is resumed no custom events are emitted
arpitBhalla
arpitBhallaOP•2w ago
I'm still using custom transformer
Ward
Ward•2w ago
@dero it seems to be broken with resume
Aman Zishan
Aman Zishan•2w ago
@Ward would be great to post the PR here once its up so i can temp patch until the release When i inspected to stream workflow-step-output was not there on resume
Vulcano
Vulcano•2w ago
ye my issue was also with resuming nested workflows and that is still an issue
Abhi Aiyer
Abhi Aiyer•2w ago
Okay let me reopen this
Ward
Ward•5d ago
Should be fixed by https://github.com/mastra-ai/mastra/pull/10720 going to test it out
GitHub
fix(workflows): pass writable stream to workflow resume by roaminro...
Description Add missing writableStream parameter when resuming workflows to ensure output is properly streamed during workflow execution. Related Issue(s) slack Type of Change Bug fix (non-breaki...
Aman Zishan
Aman Zishan•5d ago
@Ward can confirm it works for me
Ward
Ward•5d ago
šŸ™
Abhi Aiyer
Abhi Aiyer•5d ago
Yayyyyyy
Vulcano
Vulcano•5d ago
I will also try it out tomorrow!
Aman Zishan
Aman Zishan•4d ago
@Ward when can we expect this fix in beta release šŸ™Œ I think it's already released..
Ward
Ward•4d ago
it's released in beta on wednesday morning GMT+1 let me know if it hasn't been fixed yet

Did you find this page helpful?