Using writer in workflow step with agent

Hello, I have a workflow step (see below) and I am trying to use the writer as described in the documentation. Lacking a specific example, I am 1) unclear on how this is supposed to work, and 2) not seeing anything from my agent in the stream chunks received by the browser client. Can someone tell me how this should work? My step:
export const advocateStep = createStep({
  id: "advocate",
  description: "Decide next action and draft external or ask user",
  stateSchema,
  inputSchema: messageSchema,
  outputSchema: advocateOutputSchema,
  execute: async ({
    writer,
    state,
    setState,
    inputData: { message, source },
  }) => {
    const { threadId, resourceId } = state;

    const prompt = `
      New message from ${source}: ${message}
    `;

    const stream = await consumerSupportAgent.stream(prompt, {
      structuredOutput: {
        schema: advocateOutputSchema,
      },
      maxSteps: 1,
      memory: {
        thread: threadId,
        resource: resourceId,
      },
    });

    await stream!.textStream.pipeTo(writer!);

    const response = await stream.getFullOutput();

    console.log("**", response);

    const { responseToUser, responseToThirdParty, status, awaiting } =
      response.object;

    setState({ ...state, status, awaiting });

    return { status, responseToUser, responseToThirdParty, awaiting };
  },
});
6 Replies
flippyhead (OP), 5d ago
The docs here (https://mastra.ai/en/docs/streaming/workflow-streaming#inspecting-workflow-stream-payloads) seem to show the written custom event on a result, but the code doesn't indicate where that result should come from.
Mastra Triager
GitHub
[DISCORD:1428490649078071407] Using writer in workflow step with ag...
This issue was created from Discord post: https://discord.com/channels/1309558646228779139/1428490649078071407 Hello, I have a workflow step (see below) and I am trying to use the writer given the ...
_roamin_, 4d ago
Hey @flippyhead! Could you try await stream!.fullStream.pipeTo(writer!); instead of piping textStream?
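To make the suggestion above concrete, here is a minimal, self-contained sketch of what piping a stream of chunk objects into a writer does. It deliberately does not call Mastra: fakeFullStream, collectingWriter, and the AgentChunk shape are hypothetical stand-ins built on the standard Web Streams API, so it only illustrates the pipeTo mechanics, not the library's actual chunk format.

```typescript
// Hypothetical chunk shape; real agent chunks may carry different fields.
type AgentChunk = { type: string; payload?: unknown };

// Hypothetical stand-in for an agent's fullStream: emits typed chunk objects.
function fakeFullStream(): ReadableStream<AgentChunk> {
  return new ReadableStream<AgentChunk>({
    start(controller) {
      controller.enqueue({ type: "text-delta", payload: { text: "Hel" } });
      controller.enqueue({ type: "text-delta", payload: { text: "lo" } });
      controller.enqueue({ type: "finish" });
      controller.close();
    },
  });
}

// Hypothetical stand-in for the step's writer: records every chunk it receives.
function collectingWriter(sink: AgentChunk[]): WritableStream<AgentChunk> {
  return new WritableStream<AgentChunk>({
    write(chunk) {
      sink.push(chunk);
    },
  });
}

// Pipe the whole source into the writer and return what the writer saw.
async function pipeAgentToWriter(): Promise<AgentChunk[]> {
  const received: AgentChunk[] = [];
  // pipeTo resolves once the source has been fully drained into the destination.
  await fakeFullStream().pipeTo(collectingWriter(received));
  return received;
}
```

In the standard Web Streams API, pipeTo locks the source while it runs and, by default, closes the destination when the source ends; passing { preventClose: true } as the second argument keeps the destination open. Whether that option matters for the workflow writer is not confirmed in this thread.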
gorpacrate, 3d ago
I'm facing the same issue. I'm trying to stream a workflow that has a step with an agent stream inside. I can see the agent's chunks appearing in stream = agent.stream(), and I'm doing await stream.fullStream.pipeTo(writer); but I don't see the agent's chunks in workflowRun.streamVNext().
@Romain Actually, that happens only when resuming the stream with workflowRun.resumeStreamVNext(). This one-liner seems to fix it; I now see agent chunks in the workflow stream: https://github.com/mastra-ai/mastra/pull/9003
However, toAISdkFormat(stream, { from: "workflow" }) still cuts out the "workflow-step-output" chunks. I'm not very familiar with ai-sdk and not yet sure how they should translate. If I pass the stream as-is, without transforming it with toAISdkFormat, I can see these chunks on the client too.
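For readers consuming the untransformed workflow stream, the following sketch shows one way to pick out the "workflow-step-output" chunks mentioned above. The chunk shape (a type field plus payload.output), the other chunk type names, and the fakeWorkflowStream helper are assumptions made for illustration; only the "workflow-step-output" name comes from this thread.

```typescript
// Hypothetical chunk shape; real workflow stream chunks may carry more fields.
type WorkflowChunk = { type: string; payload?: { output?: unknown } };

// Yield only the nested outputs that a step forwarded through its writer.
async function* stepOutputs(
  stream: AsyncIterable<WorkflowChunk>,
): AsyncGenerator<unknown> {
  for await (const chunk of stream) {
    if (chunk.type === "workflow-step-output") {
      yield chunk.payload?.output;
    }
  }
}

// Hypothetical stand-in for a workflow stream, used only to exercise the filter.
async function* fakeWorkflowStream(): AsyncGenerator<WorkflowChunk> {
  yield { type: "workflow-step-start" };
  yield {
    type: "workflow-step-output",
    payload: { output: { type: "text-delta", payload: { text: "Hi" } } },
  };
  yield { type: "workflow-step-result" };
}

// Collect every forwarded step output into an array.
async function collectStepOutputs(): Promise<unknown[]> {
  const outputs: unknown[] = [];
  for await (const output of stepOutputs(fakeWorkflowStream())) {
    outputs.push(output);
  }
  return outputs;
}
```

The filter accepts any AsyncIterable, so the same function could be pointed at a real stream if its chunks turn out to match the assumed shape.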
gorpacrate, 2d ago
Update: this fixes toAISdkFormat for workflow streams. It probably needs more thought, because it looks like it won't work with nested workflows.
flippyhead (OP), 17h ago
Sorry, I am confused. I think this means that it does not work the way the documentation says it should. Is that right? @gorpacrate @Romain Is toAISdkFormat something that is in the PR, or something additional we must do on the client once the PR is merged?
