Workflow streaming

Trust: ★★★☆☆ (0.90) · 0 validations · developer_reference

Published: 2026-05-10 · Source: crawler_authoritative

Tình huống

Mastra developer guide for configuring workflow streaming to send incremental results and partial progress to users or upstream agents during workflow execution.

Insight

Workflow streaming in Mastra enables workflows to send incremental results while they execute, rather than waiting until completion. Streams can be written to in two main ways: from within a workflow step using the writer argument (a writable stream), or by piping an agent’s stream output directly into a workflow step’s writer. Every workflow step’s execute function receives a writer argument that can emit custom events, data, or values into the active stream. Custom events are written using await writer?.write({ type: "custom-event", status: "pending" }) and should include a type and status. Stream chunks can be inspected to access any custom fields such as event types, intermediate values, or step-specific data. When a workflow is suspended (status === ‘suspended’), it can be resumed using the resumeStream method which returns a new ReadableStream. Agent streaming can be combined with workflow streams by piping an agent’s textStream to the workflow step’s writer using .pipeTo(writer!). Mastra automatically aggregates the agent’s usage into the workflow run when using this pattern. The execute function receives inputData, writer, and mastra arguments when needed to access agents.

Hành động

To enable workflow streaming: 1) Import createStep from @mastra/core/workflows. 2) In your step’s execute function, use the writer argument to emit events with await writer?.write({ type: "custom-event", status: "..." }). 3) Important: Always await the writer.write() call to avoid ‘WritableStream is locked’ errors. 4) Access stream output by calling workflow.createRun() then run.stream({ inputData: {...} }) and iterating with for await (const chunk of stream). 5) To resume an interrupted stream, check if result.status === 'suspended' and call run.resumeStream({ resumeData: {...} }). 6) To pipe agent output, get agent with mastra?.getAgent('agentName'), call agent.stream() to get a stream, then pipe with stream.textStream.pipeTo(writer!). The execute function receives an object with inputData, writer, and optionally mastra for accessing agents.

Kết quả

The workflow emits incremental stream chunks containing custom event data and intermediate results. Interrupted or suspended workflows can be resumed with a new stream. Agent text streams are automatically piped into the workflow writer and their usage is aggregated into the workflow run.

Điều kiện áp dụng

The writer argument requires async/await to prevent stream locking errors. Agent streaming via textStream.pipeTo() requires a Mastra agent instance accessible via mastra?.getAgent(). The resumeStream method only applies when workflow status is ‘suspended’.


Nội dung gốc (Original)

Workflow streaming

Workflow streaming in Mastra enables workflows to send incremental results while they execute, rather than waiting until completion. This allows you to surface partial progress, intermediate states, or progressive data directly to users or upstream agents and workflows.

Streams can be written to in two main ways:

  • From within a workflow step: every workflow step receives a writer argument, which is a writable stream you can use to push updates as execution progresses.
  • From an agent stream: you can also pipe an agent’s stream output directly into a workflow step’s writer, making it convenient to chain agent responses into workflow results without extra glue code.

By combining writable workflow streams with agent streaming, you gain fine-grained control over how intermediate results flow through your system and into the user experience.

Using the writer argument

The writer argument is passed to a workflow step’s execute function and can be used to emit custom events, data, or values into the active stream. This enables workflow steps to provide intermediate results or status updates while execution is still in progress.

Warning: You must await the call to writer.write(...) or else you will lock the stream and get a WritableStream is locked error.

import { createStep } from "@mastra/core/workflows";
 
export const testStep = createStep({
  execute: async ({ inputData, writer }) => {
    const { value } = inputData;
 
    await writer?.write({
      type: "custom-event",
      status: "pending"
    });
 
    const response = await fetch(...);
 
    await writer?.write({
      type: "custom-event",
      status: "success"
    });
 
    return {
      value: ""
    };
  },
});

Inspecting workflow stream payloads

Events written to the stream are included in the emitted chunks. These chunks can be inspected to access any custom fields, such as event types, intermediate values, or step-specific data.

const testWorkflow = mastra.getWorkflow('testWorkflow')
 
const run = await testWorkflow.createRun()
 
const stream = await run.stream({
  inputData: {
    value: 'initial data',
  },
})
 
for await (const chunk of stream) {
  console.log(chunk)
}
 
if (result!.status === 'suspended') {
  // if the workflow is suspended, we can resume it with the resumeStream method
  const resumedStream = await run.resumeStream({
    resumeData: { value: 'resume data' },
  })
 
  for await (const chunk of resumedStream) {
    console.log(chunk)
  }
}

Resuming an interrupted workflow stream

If a workflow stream is closed or interrupted for any reason, you can resume it with the resumeStream method. This will return a new ReadableStream that you can use to observe the workflow events.

const newStream = await run.resumeStream()
 
for await (const chunk of newStream) {
  console.log(chunk)
}

Workflow using an agent

Pipe an agent’s textStream to the workflow step’s writer. This streams partial output, and Mastra automatically aggregates the agent’s usage into the workflow run.

import { createStep } from '@mastra/core/workflows'
import { z } from 'zod'
 
export const testStep = createStep({
  execute: async ({ inputData, mastra, writer }) => {
    const { city } = inputData
 
    const testAgent = mastra?.getAgent('testAgent')
    const stream = await testAgent?.stream(`What is the weather in ${city}$?`)
 
    await stream!.textStream.pipeTo(writer!)
 
    return {
      value: await stream!.text,
    }
  },
})

Liên kết

Xem thêm: