How to access stream from workflow as agent tool and resume workflow?

Hi, im using a workflow as part of an agent and having the agent execute a workflow. I have the following step and the workflow does suspend but the actual agent invoking it doesnt actually wait for the workflow to resume, it just continues. Also, I dont see the writer events ever making it through to the data stream so how do I pipe the writer through when invoking via an agent.
const showDataDictionaryStep = createStep({
id: 'show-data-dictionary',
inputSchema: z.object({
targetDocumentId: z.string(),
}),
outputSchema: z.object({
approved: z.boolean(),
}),
resumeSchema: z.object({
approved: z.boolean(),
}),
execute: async ({ inputData, resumeData, suspend, writer }) => {
// If we have resume data, user has approved
if (resumeData?.approved) {
return { approved: true };
}

// First time - load and show data dictionary
const csvPath = path.join(__dirname, 'data-dictionary.csv');
const csvContent = await fs.readFile(csvPath, 'utf-8');

// Stream data dictionary as artifact
await writer?.custom({
type: 'data-artifact',
data: {
id: 'data-dictionary',
title: 'Data Dictionary',
kind: 'sheet',
content: csvContent,
},
});

// Suspend and wait for user approval
return await suspend({
message:
'Before I begin, please review and approve (or edit then approve) the data dictionary to shred the document.',
targetDocumentId: inputData.targetDocumentId,
});
},
});
const showDataDictionaryStep = createStep({
id: 'show-data-dictionary',
inputSchema: z.object({
targetDocumentId: z.string(),
}),
outputSchema: z.object({
approved: z.boolean(),
}),
resumeSchema: z.object({
approved: z.boolean(),
}),
execute: async ({ inputData, resumeData, suspend, writer }) => {
// If we have resume data, user has approved
if (resumeData?.approved) {
return { approved: true };
}

// First time - load and show data dictionary
const csvPath = path.join(__dirname, 'data-dictionary.csv');
const csvContent = await fs.readFile(csvPath, 'utf-8');

// Stream data dictionary as artifact
await writer?.custom({
type: 'data-artifact',
data: {
id: 'data-dictionary',
title: 'Data Dictionary',
kind: 'sheet',
content: csvContent,
},
});

// Suspend and wait for user approval
return await suspend({
message:
'Before I begin, please review and approve (or edit then approve) the data dictionary to shred the document.',
targetDocumentId: inputData.targetDocumentId,
});
},
});
Route handler in backend:
const agent1 = mastra.getAgent('baseAgent');

const stream = createUIMessageStream({
execute: async ({ writer }) => {
const result1 = await agent1.stream(finalMessages, {
runtimeContext,
stopWhen: stepCountIs(100),
memory: {
resource: user.userID,
thread: chatId,
},
});
for await (const part of toAISdkFormat(result1, { from: 'agent' })) {
writer.write(part);
}
},
generateId: generateUUID,
onFinish: async ({ messages: finished }) => {
await saveMessages({
messages: finished
.filter((m) => m.role !== 'user')
.map((m) => ({
id: isUuidV4(m.id) ? m.id : generateUUID(),
role: m.role,
parts: m.parts as ChatMessagePart[],
createdAt: new Date(),
attachments: [],
chatId,
})),
});
},
onError: (error) => {
context.logger.error('UIMessageStream error', { error, chatId });
return 'Oops, an error occurred!';
},
});

const sseStream = stream.pipeThrough(new JsonToSseTransformStream());
const reader = sseStream.getReader();
const agent1 = mastra.getAgent('baseAgent');

const stream = createUIMessageStream({
execute: async ({ writer }) => {
const result1 = await agent1.stream(finalMessages, {
runtimeContext,
stopWhen: stepCountIs(100),
memory: {
resource: user.userID,
thread: chatId,
},
});
for await (const part of toAISdkFormat(result1, { from: 'agent' })) {
writer.write(part);
}
},
generateId: generateUUID,
onFinish: async ({ messages: finished }) => {
await saveMessages({
messages: finished
.filter((m) => m.role !== 'user')
.map((m) => ({
id: isUuidV4(m.id) ? m.id : generateUUID(),
role: m.role,
parts: m.parts as ChatMessagePart[],
createdAt: new Date(),
attachments: [],
chatId,
})),
});
},
onError: (error) => {
context.logger.error('UIMessageStream error', { error, chatId });
return 'Oops, an error occurred!';
},
});

const sseStream = stream.pipeThrough(new JsonToSseTransformStream());
const reader = sseStream.getReader();
part of agent definition:
model: openai('gpt-4.1'),
memory,
tools: {
listFilesTool,
readFileTool,
createTextDocumentTool,
updateTextDocumentTool,
applyAIPatchTool,
},
agents: {
codingAgent,
contentWritingAgent,
},
workflows: {
complianceMatrixWorkflow,
},
model: openai('gpt-4.1'),
memory,
tools: {
listFilesTool,
readFileTool,
createTextDocumentTool,
updateTextDocumentTool,
applyAIPatchTool,
},
agents: {
codingAgent,
contentWritingAgent,
},
workflows: {
complianceMatrixWorkflow,
},
3 Replies
Mastra Triager
GitHub
[DISCORD:1433125186961670185] How to access stream from workflow as...
This issue was created from Discord post: https://discord.com/channels/1309558646228779139/1433125186961670185 Hi, im using a workflow as part of an agent and having the agent execute a workflow. I...
_roamin_
_roamin_3w ago
Hi @sheerazz ! There isn't really a good way to do that right now, you basically need to add guidance/instructions in your system prompt to tell the llm what to do when it encounters a suspended workflow
sheerazz
sheerazzOP2w ago
Hi @Romain thanks for getting back to me on this. do networks support suspend / resume? we want to embed a workflow capabilities into a native chat feel where the chat can invoke a workflow whenever needed but users can continue using the chat after the workflow. or also the chat can invoke multiple workflows sequentially

Did you find this page helpful?