Mastra
3mo ago
3 replies
sheerazz

How to access the stream from a workflow used as an agent tool, and how to resume the workflow?

Hi, I'm using a workflow as part of an agent, and the agent executes the workflow.

I have the following step. The workflow does suspend, but the agent invoking it doesn't wait for the workflow to resume; it just continues. Also, I never see the writer events reach the data stream, so how do I pipe the writer through when the workflow is invoked via an agent?



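// Imports this snippet relies on
import { createStep } from '@mastra/core/workflows';
import { z } from 'zod';
import path from 'node:path';
import fs from 'node:fs/promises';

const showDataDictionaryStep = createStep({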
    id: 'show-data-dictionary',
    inputSchema: z.object({
        targetDocumentId: z.string(),
    }),
    outputSchema: z.object({
        approved: z.boolean(),
    }),
    resumeSchema: z.object({
        approved: z.boolean(),
    }),
    execute: async ({ inputData, resumeData, suspend, writer }) => {
        // If we have resume data, user has approved
        if (resumeData?.approved) {
            return { approved: true };
        }

        // First time - load and show data dictionary
        const csvPath = path.join(__dirname, 'data-dictionary.csv');
        const csvContent = await fs.readFile(csvPath, 'utf-8');

        // Stream data dictionary as artifact
        await writer?.custom({
            type: 'data-artifact',
            data: {
                id: 'data-dictionary',
                title: 'Data Dictionary',
                kind: 'sheet',
                content: csvContent,
            },
        });

        // Suspend and wait for user approval
        return await suspend({
            message:
                'Before I begin, please review and approve (or edit then approve) the data dictionary to shred the document.',
            targetDocumentId: inputData.targetDocumentId,
        });
    },
});
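
To make the intended two-pass behaviour of this step concrete, here is a minimal, library-free sketch of the same control flow. It is only a model: `runStep`, `StepResult`, and `WriterEvent` are hypothetical names for illustration and are not Mastra APIs, and the real `suspend` and `writer` may behave differently.

```typescript
// Hypothetical model of the step above; none of these types come from Mastra.
type ResumeData = { approved: boolean };
type WriterEvent = { type: string; data: unknown };
type StepResult =
  | { status: 'suspended'; payload: { message: string; targetDocumentId: string } }
  | { status: 'success'; output: { approved: boolean } };

async function runStep(
  inputData: { targetDocumentId: string },
  resumeData: ResumeData | undefined,
  emit: (event: WriterEvent) => void,
): Promise<StepResult> {
  // Second pass: the caller resumed with an approval.
  if (resumeData?.approved) {
    return { status: 'success', output: { approved: true } };
  }

  // First pass (or a resume without approval): publish the artifact, then suspend.
  emit({ type: 'data-artifact', data: { id: 'data-dictionary' } });
  return {
    status: 'suspended',
    payload: {
      message: 'Please review and approve the data dictionary.',
      targetDocumentId: inputData.targetDocumentId,
    },
  };
}

// Drives the step twice: once fresh, once resumed with an approval.
async function demo() {
  const events: WriterEvent[] = [];
  const input = { targetDocumentId: 'doc-1' };
  const first = await runStep(input, undefined, (e) => events.push(e));
  const second = await runStep(input, { approved: true }, (e) => events.push(e));
  return { first, second, events };
}
```

One thing the model makes visible: with the `resumeData?.approved` guard, resuming with `approved: false` takes the first-pass path again, so the artifact is re-emitted and the step suspends a second time.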


Route handler in backend:

  const agent1 = mastra.getAgent('baseAgent');

  const stream = createUIMessageStream({
    execute: async ({ writer }) => {
      const result1 = await agent1.stream(finalMessages, {
        runtimeContext,
        stopWhen: stepCountIs(100),
        memory: {
          resource: user.userID,
          thread: chatId,
        },
      });
      for await (const part of toAISdkFormat(result1, { from: 'agent' })) {
        writer.write(part);
      }
    },
    generateId: generateUUID,
    onFinish: async ({ messages: finished }) => {
      await saveMessages({
        messages: finished
          .filter((m) => m.role !== 'user')
          .map((m) => ({
            id: isUuidV4(m.id) ? m.id : generateUUID(),
            role: m.role,
            parts: m.parts as ChatMessagePart[],
            createdAt: new Date(),
            attachments: [],
            chatId,
          })),
      });
    },
    onError: (error) => {
      context.logger.error('UIMessageStream error', { error, chatId });
      return 'Oops, an error occurred!';
    },
  });

  const sseStream = stream.pipeThrough(new JsonToSseTransformStream());
  const reader = sseStream.getReader();
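
To check whether the writer events reach this loop at all, one option is to count part types as they pass through. The sketch below is a hypothetical debugging helper, not part of Mastra or the AI SDK; `tapParts`, `fakeAgentStream`, and `collectCounts` are made-up names, and the fake stream only stands in for whatever `toAISdkFormat` yields.

```typescript
// Hypothetical helper: counts each part type while passing parts through unchanged.
async function* tapParts<T extends { type: string }>(
  parts: AsyncIterable<T>,
  seen: Map<string, number>,
): AsyncGenerator<T> {
  for await (const part of parts) {
    seen.set(part.type, (seen.get(part.type) ?? 0) + 1);
    yield part;
  }
}

// Stand-in for the real agent stream, used only for this demonstration.
async function* fakeAgentStream() {
  yield { type: 'text-delta', delta: 'Hi' };
  yield { type: 'data-artifact', data: { id: 'data-dictionary' } };
  yield { type: 'text-delta', delta: '!' };
}

// Drains the tapped stream and returns the per-type counts.
async function collectCounts(): Promise<Record<string, number>> {
  const seen = new Map<string, number>();
  for await (const _part of tapParts(fakeAgentStream(), seen)) {
    // In the route handler this is where writer.write(part) would go.
  }
  return Object.fromEntries(seen);
}
```

In the handler above, the loop would wrap the existing iterable, for example `tapParts(toAISdkFormat(result1, { from: 'agent' }), seen)`, and log `seen` when the stream ends. If no `data-artifact` entry shows up, the events are being dropped before they reach the UI message stream rather than on the client.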


Part of the agent definition:

  model: openai('gpt-4.1'),
  memory,
  tools: {
    listFilesTool,
    readFileTool,
    createTextDocumentTool,
    updateTextDocumentTool,
    applyAIPatchTool,
  },
  agents: {
    codingAgent,
    contentWritingAgent,
  },
  workflows: {
    complianceMatrixWorkflow,
  },