Workflows overview

Trust: ★★★☆☆ (0.90) · 0 validations · developer_reference

Published: 2026-05-10 · Source: crawler_authoritative

Situation

A guide to workflows in the Mastra framework: defining and executing multi-step workflows with structured steps, input/output schemas, and a built-in execution engine.

Insight

Mastra workflows let developers define complex sequences of tasks as clear, structured steps rather than relying on the reasoning of a single agent. The core APIs are createStep(), which defines building blocks with an inputSchema and outputSchema written in Standard JSON Schema (Zod, Valibot, ArkType), and createWorkflow(), which composes steps with .then() and .commit(). Workflows support suspension, resumption, and streaming results. Workflow state shares values across steps via stateSchema, so results can accumulate without being passed through every step. A workflow can be used as a step inside a larger workflow, and cloneWorkflow() creates a copy that is tracked independently under a new ID. The result type is a discriminated union on status: success, failed, suspended, tripwire, or paused. Each status carries its own properties: success has result, failed has error, tripwire has tripwire, and suspended has suspendPayload and a suspended array; paused has none. Control flow composition methods are documented separately. Workflows run on the built-in execution engine by default or can be deployed to workflow runners like Inngest.

Action

To create a workflow step, call createStep() with an id, an inputSchema and outputSchema (both Standard JSON Schema), and an execute function that receives inputData. To create a workflow, call createWorkflow() with an inputSchema and outputSchema, add steps with .then(step), and finish with .commit(). Register it in the Mastra instance with new Mastra({ workflows: { workflowName } }), then retrieve it with mastra.getWorkflow('registrationKey') for full type inference. Create a run with testWorkflow.createRun(), then call .start({ inputData }) to wait for the final result or .stream({ inputData }) to track progress. Both yield a result whose status property must be checked before accessing status-specific fields. To restart active runs, call restartAllActiveWorkflowRuns() on the workflow for all runs, or restart() on a specific run instance. Inside a step’s execute function, read request-specific values with requestContext.get().

Result

Workflows execute the defined steps in sequence and return a discriminated union result with status, input, steps, and optional state. A success result includes the workflow output. Streaming yields fullStream chunks during execution plus the final result. Active runs can be restarted from the last active step once the server connection is restored.


Original content

Workflows overview

Workflows let you define complex sequences of tasks using clear, structured steps rather than relying on the reasoning of a single agent. They give you full control over how tasks are broken down, how data moves between them, and what gets executed when. Workflows run using the built-in execution engine by default, or can be deployed to workflow runners like Inngest for managed infrastructure.

When to use workflows

Use workflows for tasks that are clearly defined upfront and involve multiple steps with a specific execution order. They give you fine-grained control over how data flows and transforms between steps, and which primitives are called at each stage.

Tip: Watch an introduction to workflows and how they compare to agents on YouTube (7 minutes).

Core principles

Mastra workflows operate using these principles:

  • Defining steps with createStep, specifying input/output schemas and business logic.
  • Composing steps with createWorkflow to define the execution flow.
  • Running workflows to execute the entire sequence, with built-in support for suspension, resumption, and streaming results.
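
To make these principles concrete, here is a dependency-free TypeScript sketch of sequential composition. It is a conceptual model only, not Mastra's engine: `Step`, `runInSequence`, `formatStep`, and `emphasizeStep` are hypothetical names, the steps are synchronous for brevity, and schema validation is left out. It illustrates the one rule that matters when chaining steps in order: each step's output becomes the next step's input.

```typescript
// Conceptual model only: these types and helpers are illustrative, not Mastra APIs.
type Step<I, O> = {
  id: string
  execute: (inputData: I) => O
}

// Two hypothetical steps mirroring the formatted/emphasized example in this guide.
const formatStep: Step<{ message: string }, { formatted: string }> = {
  id: 'step-1',
  execute: ({ message }) => ({ formatted: message.toUpperCase() }),
}

const emphasizeStep: Step<{ formatted: string }, { emphasized: string }> = {
  id: 'step-2',
  execute: ({ formatted }) => ({ emphasized: `${formatted}!!!` }),
}

// Runs two steps in order; the compiler rejects pairs whose types do not line up.
function runInSequence<A, B, C>(first: Step<A, B>, second: Step<B, C>, input: A): C {
  const intermediate = first.execute(input)
  return second.execute(intermediate)
}

const output = runInSequence(formatStep, emphasizeStep, { message: 'Hello world' })
```

Swapping the two arguments to `runInSequence` would fail to type-check, which is the same kind of mismatch that input and output schemas are meant to catch in a real workflow.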

Creating a workflow step

Steps are the building blocks of workflows. Create a step using createStep() with inputSchema and outputSchema to define the data it accepts and returns. You can define both schemas using Standard JSON Schema (Zod, Valibot, ArkType, etc.).

The execute function defines what the step does. Use it to call functions in your codebase, external APIs, agents, or tools.

Zod:

import { createStep } from '@mastra/core/workflows'
import { z } from 'zod'
 
const step1 = createStep({
  id: 'step-1',
  inputSchema: z.object({
    message: z.string(),
  }),
  outputSchema: z.object({
    formatted: z.string(),
  }),
  execute: async ({ inputData }) => {
    const { message } = inputData
 
    return {
      formatted: message.toUpperCase(),
    }
  },
})

Valibot:

import { createStep } from '@mastra/core/workflows'
import * as v from 'valibot'
import { toStandardJsonSchema } from '@valibot/to-json-schema'
 
const step1 = createStep({
  id: 'step-1',
  inputSchema: toStandardJsonSchema(
    v.object({
      message: v.string(),
    }),
  ),
  outputSchema: toStandardJsonSchema(
    v.object({
      formatted: v.string(),
    }),
  ),
  execute: async ({ inputData }) => {
    const { message } = inputData
 
    return {
      formatted: message.toUpperCase(),
    }
  },
})

ArkType:

import { createStep } from '@mastra/core/workflows'
import { type } from 'arktype'
 
const step1 = createStep({
  id: 'step-1',
  inputSchema: type({
    message: 'string',
  }),
  outputSchema: type({
    formatted: 'string',
  }),
  execute: async ({ inputData }) => {
    const { message } = inputData
 
    return {
      formatted: message.toUpperCase(),
    }
  },
})

Info: Visit Step for a full list of configuration options.

Using agents and tools

Workflow steps can also call registered agents or import and execute tools directly. Visit the Using Tools page for more information.

Creating a workflow

Create a workflow using createWorkflow() with inputSchema and outputSchema to define the data it accepts and returns. You can define both schemas using Standard JSON Schema (Zod, Valibot, ArkType, etc.). Add steps using .then() and complete the workflow with .commit().

Zod:

import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
 
const step1 = createStep({...});
 
export const testWorkflow = createWorkflow({
  id: "test-workflow",
  inputSchema: z.object({
    message: z.string()
  }),
  outputSchema: z.object({
    output: z.string()
  })
})
  .then(step1)
  .commit();

Valibot:

import { createWorkflow, createStep } from "@mastra/core/workflows";
import * as v from "valibot";
import { toStandardJsonSchema } from "@valibot/to-json-schema";
 
const step1 = createStep({...});
 
export const testWorkflow = createWorkflow({
  id: "test-workflow",
  inputSchema: toStandardJsonSchema(v.object({
    message: v.string()
  })),
  outputSchema: toStandardJsonSchema(v.object({
    output: v.string()
  }))
})
  .then(step1)
  .commit();

ArkType:

import { createWorkflow, createStep } from "@mastra/core/workflows";
import { type } from "arktype";
 
const step1 = createStep({...});
 
export const testWorkflow = createWorkflow({
  id: "test-workflow",
  inputSchema: type({
    message: "string"
  }),
  outputSchema: type({
    output: "string"
  })
})
  .then(step1)
  .commit();

Info: Visit Workflow Class for a full list of configuration options.

Understanding control flow

Workflows can be composed using a number of different methods. The method you choose determines how each step’s schema should be structured. Visit the Control Flow page for more information.

Studio

Open Studio and select a workflow from the Workflows tab.

  • Graph view: The center panel visualizes the workflow’s steps and execution flow.
  • Input form: The right sidebar generates a form from the workflow’s inputSchema. Fill it in and start the run.
  • Live status: During execution, the graph updates each step’s status in real time. The sidebar shows the workflow’s input, output, state, and logs.
  • Time travel: After a run completes, replay individual steps to inspect or retry them.

Workflow state

Workflow state lets you share values across steps without passing them through every step’s inputSchema and outputSchema. Use state for tracking progress, accumulating results, or sharing configuration across the entire workflow.

const step1 = createStep({
  id: 'step-1',
  inputSchema: z.object({ message: z.string() }),
  outputSchema: z.object({ formatted: z.string() }),
  stateSchema: z.object({ counter: z.number() }),
  execute: async ({ inputData, state, setState }) => {
    // Read from state
    console.log(state.counter)
 
    // Update state for subsequent steps
    setState({ ...state, counter: state.counter + 1 })
 
    return { formatted: inputData.message.toUpperCase() }
  },
})

Info: Visit Workflow State for complete documentation on state schemas, initial state, persistence across suspend/resume, and nested workflows.
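
The read-then-replace pattern used with state and setState can be sketched without Mastra. The snippet below is a simplified model under stated assumptions: `StateStep`, `increment`, and `runWithState` are hypothetical names, and real workflow state also covers initial state, persistence, and nested workflows, which this sketch ignores.

```typescript
// Illustrative model of shared state; not Mastra's implementation.
type CounterState = { counter: number }

type StateStep = (state: CounterState, setState: (next: CounterState) => void) => void

// Each step reads the current state and replaces it with an updated copy.
const increment: StateStep = (state, setState) => {
  setState({ ...state, counter: state.counter + 1 })
}

// Runs the steps in order, handing each one the state left by the previous step.
function runWithState(steps: StateStep[], initial: CounterState): CounterState {
  let current = initial
  for (const step of steps) {
    step(current, (next) => {
      current = next
    })
  }
  return current
}

const finalState = runWithState([increment, increment, increment], { counter: 0 })
```

Because every update spreads the previous state into a new object, the initial object passed in is never mutated.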

Workflows as steps

Use a workflow as a step to reuse its logic within a larger composition. Input and output follow the same schema rules described in Core principles.

const step1 = createStep({...});
const step2 = createStep({...});
 
const childWorkflow = createWorkflow({
  id: "child-workflow",
  inputSchema: z.object({
    message: z.string()
  }),
  outputSchema: z.object({
    emphasized: z.string()
  })
})
  .then(step1)
  .then(step2)
  .commit();
 
export const testWorkflow = createWorkflow({
  id: "test-workflow",
  inputSchema: z.object({
    message: z.string()
  }),
  outputSchema: z.object({
    emphasized: z.string()
  })
})
  .then(childWorkflow)
  .commit();

Cloning a workflow

Clone a workflow using cloneWorkflow() when you want to reuse its logic but track it separately under a new ID. Each clone runs independently and shows up as a distinct workflow in logs and observability tools.

import { cloneWorkflow } from "@mastra/core/workflows";
 
const step1 = createStep({...});
 
const parentWorkflow = createWorkflow({...})
const clonedWorkflow = cloneWorkflow(parentWorkflow, { id: "cloned-workflow" });
 
export const testWorkflow = createWorkflow({...})
  .then(step1)
  .then(clonedWorkflow)
  .commit();

Registering a workflow

Register your workflow in the Mastra instance to make it available throughout your application. Once registered, it can be called from agents or tools and has access to shared resources such as logging and observability features:

import { Mastra } from '@mastra/core/mastra'
import { testWorkflow } from './workflows/test-workflow'
 
export const mastra = new Mastra({
  workflows: { testWorkflow },
})

Referencing a workflow

You can run workflows from agents, tools, the Mastra Client, or the command line. Get a reference by calling .getWorkflow() on your mastra or mastraClient instance, depending on your setup:

const testWorkflow = mastra.getWorkflow('testWorkflow')

Info: mastra.getWorkflow() is preferred over a direct import for two reasons:

  1. It provides access to the Mastra instance configuration (logger, telemetry, storage, registered agents, and vector stores)
  2. It provides full TypeScript type inference for workflow input and output schemas

Note: Use getWorkflow() with the workflow’s registration key (the key used when adding it to Mastra). While getWorkflowById() is available for retrieving workflows by their id property, it doesn’t provide the same level of type inference.

Running workflows

Workflows can be run in two modes: start waits for all steps to complete before returning, and stream emits events during execution. Choose the approach that fits your use case: start when you only need the final result, and stream when you want to monitor progress or trigger actions as steps complete.

.start():

Create a workflow run instance using createRun(), then call .start() with inputData matching the workflow’s inputSchema. The workflow executes all steps and returns the final result.

const run = await testWorkflow.createRun()
 
const result = await run.start({
  inputData: {
    message: 'Hello world',
  },
})
 
if (result.status === 'success') {
  console.log(result.result)
}

.stream():

Create a workflow run instance using .createRun(), then call .stream() with inputData matching the workflow’s inputSchema. Iterate over fullStream to track progress, then await result to get the final workflow result.

const run = await testWorkflow.createRun()
 
const stream = run.stream({
  inputData: {
    message: 'Hello world',
  },
})
 
for await (const chunk of stream.fullStream) {
  console.log(chunk)
}
 
// Get the final result (same type as run.start())
const result = await stream.result
 
if (result.status === 'success') {
  console.log(result.result)
}
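
The consume-then-await order shown for .stream() can be tried out against a stand-in object. Everything in this sketch is an assumption made for illustration: `FakeStream`, `createFakeStream`, and `consume` are hypothetical, and the chunk type names are invented rather than taken from Mastra. Only the shape of the pattern (an async iterable called fullStream plus a result promise) comes from the example above.

```typescript
// Stand-in types: the chunk and result shapes here are assumptions for illustration.
type Chunk = { type: string }
type FinalResult = { status: 'success'; result: { formatted: string } }

type FakeStream = {
  fullStream: AsyncIterable<Chunk>
  result: Promise<FinalResult>
}

// Builds a fake stream that emits two made-up chunks and resolves a final result.
function createFakeStream(): FakeStream {
  async function* chunks(): AsyncGenerator<Chunk> {
    yield { type: 'example-start' }
    yield { type: 'example-finish' }
  }
  return {
    fullStream: chunks(),
    result: Promise.resolve<FinalResult>({
      status: 'success',
      result: { formatted: 'HELLO WORLD' },
    }),
  }
}

// Drains the progress chunks first, then awaits the final result.
async function consume(stream: FakeStream): Promise<{ seen: string[]; final: FinalResult }> {
  const seen: string[] = []
  for await (const chunk of stream.fullStream) {
    seen.push(chunk.type)
  }
  const final = await stream.result
  return { seen, final }
}
```

Collecting chunk types into an array, as `consume` does, is one simple way to assert on progress events in a test.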

Workflow result type

Both run.start() and stream.result return a discriminated union based on the status property, which can be success, failed, suspended, tripwire, or paused. You can always safely access result.status, result.input, result.steps, and optionally result.state regardless of the status.

Additionally, depending on the status, different properties are available:

Status     | Unique properties          | Description
success    | result                     | The workflow’s output data
failed     | error                      | The error that caused the failure
tripwire   | tripwire                   | Contains reason, retry?, metadata?, processorId?
suspended  | suspendPayload, suspended  | Suspension data and array of suspended step paths
paused     | (none)                     | Only common properties available

To access status-specific properties, check the status first:

const result = await run.start({ inputData: { message: 'Hello world' } })
 
if (result.status === 'success') {
  console.log(result.result) // Only available when status is "success"
} else if (result.status === 'failed') {
  console.log(result.error.message)
} else if (result.status === 'suspended') {
  console.log(result.suspendPayload)
}
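
The same narrowing can be extended to cover every status. The sketch below uses a simplified local model of the union: `WorkflowResult` and `describeResult` are hypothetical names, the field types are assumptions, and the common properties (input, steps, state) are omitted. It is not the type exported by Mastra.

```typescript
// Simplified local model of the result union described above; field types are assumptions.
type WorkflowResult =
  | { status: 'success'; result: unknown }
  | { status: 'failed'; error: Error }
  | { status: 'tripwire'; tripwire: { reason: string } }
  | { status: 'suspended'; suspendPayload: unknown; suspended: string[][] }
  | { status: 'paused' }

// Returns a one-line description, touching only the fields valid for each status.
function describeResult(result: WorkflowResult): string {
  switch (result.status) {
    case 'success':
      return `success: ${JSON.stringify(result.result)}`
    case 'failed':
      return `failed: ${result.error.message}`
    case 'tripwire':
      return `tripwire: ${result.tripwire.reason}`
    case 'suspended':
      return `suspended at ${result.suspended.length} step path(s)`
    case 'paused':
      return 'paused'
    default: {
      // Compile-time exhaustiveness check: stops type-checking if a status is unhandled.
      const unhandled: never = result
      return unhandled
    }
  }
}
```

The `never` assignment in the default branch is a design choice: if a new status is added to the union later, the compiler flags this function instead of letting the case fall through silently.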

Workflow output

Here’s an example of a successful workflow result, showing the input, steps, and result properties:

{
  "status": "success",
  "steps": {
    "step-1": {
      "status": "success",
      "payload": {
        "message": "Hello world"
      },
      "output": {
        "formatted": "HELLO WORLD"
      }
    },
    "step-2": {
      "status": "success",
      "payload": {
        "formatted": "HELLO WORLD"
      },
      "output": {
        "emphasized": "HELLO WORLD!!!"
      }
    }
  },
  "input": {
    "message": "Hello world"
  },
  "result": {
    "emphasized": "HELLO WORLD!!!"
  }
}
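
A result shaped like the JSON above can be summarized per step with a small helper. The types here are modeled on that example only and should be read as assumptions, not the full result type; `StepRecord`, `SuccessResult`, and `summarizeSteps` are hypothetical names.

```typescript
// Shape modeled on the JSON example above; an assumption, not the full result type.
type StepRecord = { status: string; payload: unknown; output?: unknown }

type SuccessResult = {
  status: 'success'
  steps: Record<string, StepRecord>
  input: unknown
  result: unknown
}

// Returns one "id: status" line per step, in the order the steps were recorded.
function summarizeSteps(result: SuccessResult): string[] {
  return Object.keys(result.steps).map((id) => `${id}: ${result.steps[id].status}`)
}

const example: SuccessResult = {
  status: 'success',
  steps: {
    'step-1': {
      status: 'success',
      payload: { message: 'Hello world' },
      output: { formatted: 'HELLO WORLD' },
    },
    'step-2': {
      status: 'success',
      payload: { formatted: 'HELLO WORLD' },
      output: { emphasized: 'HELLO WORLD!!!' },
    },
  },
  input: { message: 'Hello world' },
  result: { emphasized: 'HELLO WORLD!!!' },
}

const summary = summarizeSteps(example)
```

Note how the payload of step-2 equals the output of step-1, which is what sequential composition with .then() produces.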

Restarting active workflow runs

When a workflow run loses its connection to the server, you can restart it from the last active step. This is useful for long-running workflows that may still be in progress when the connection drops.

Restarting all active workflow runs of a workflow with restartAllActiveWorkflowRuns()

Use restartAllActiveWorkflowRuns() to restart every active run of a workflow in one call, without looping through the runs and restarting each one manually.

workflow.restartAllActiveWorkflowRuns()

Restarting an active workflow run with restart()

Use restart() to restart a single active workflow run. Execution resumes from the last active step.

const run = await workflow.createRun()
 
const result = await run.start({ inputData: { value: 'initial data' } })
 
const restartedResult = await run.restart()

Identifying active workflow runs

An active workflow run has a status of running or waiting. Use listActiveWorkflowRuns() to retrieve a workflow’s active runs and confirm which ones are still in progress.

const activeRuns = await workflow.listActiveWorkflowRuns()
if (activeRuns.runs.length > 0) {
  console.log(activeRuns.runs)
}

Note: When running the local mastra server, all active workflow runs will be restarted automatically when the server starts.
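
The "running or waiting" rule can be expressed as a small predicate. This is a sketch under assumptions: `RunRecord`, `isActive`, and the sample data are hypothetical, and the objects actually returned by listActiveWorkflowRuns() may have a different shape.

```typescript
// Hypothetical run record; the real objects returned by Mastra may differ.
type RunRecord = { runId: string; status: string }

// The two statuses this guide describes as active.
const ACTIVE_STATUSES = ['running', 'waiting']

// True when the run's status is one of the active statuses.
function isActive(run: RunRecord): boolean {
  return ACTIVE_STATUSES.indexOf(run.status) !== -1
}

// Made-up sample data for illustration.
const runs: RunRecord[] = [
  { runId: 'a', status: 'running' },
  { runId: 'b', status: 'success' },
  { runId: 'c', status: 'waiting' },
]

const activeIds = runs.filter(isActive).map((run) => run.runId)
```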

Using RequestContext

Use RequestContext to access request-specific values. This lets you conditionally adjust behavior based on the context of the request.

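export type UserTier = {
  'user-tier': 'enterprise' | 'pro'
}
 
const step1 = createStep({
  // id and schemas added for completeness; their shapes are illustrative
  id: 'step-1',
  inputSchema: z.object({}),
  outputSchema: z.object({ maxResults: z.number() }),
  execute: async ({ requestContext }) => {
    // Read the tier set for this request
    const userTier = requestContext.get('user-tier') as UserTier['user-tier']
 
    // Enterprise users get a higher result limit
    const maxResults = userTier === 'enterprise' ? 1000 : 50
 
    return { maxResults }
  },
})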

Info: Visit Request Context for more information.

Tip: For type-safe request context schema validation, see Schema Validation.
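
As an alternative to the `as` cast in the step above, the tier can be checked at runtime with a type guard. In this sketch a plain Map stands in for RequestContext purely for illustration; `isUserTier` and `maxResultsFor` are hypothetical helpers, and falling back to the lower limit for a missing or unknown tier is an assumption, not documented behavior.

```typescript
// Stand-in for RequestContext: a plain Map is used here purely for illustration.
type UserTier = 'enterprise' | 'pro'

// Runtime check that narrows an unknown value to a known tier.
function isUserTier(value: unknown): value is UserTier {
  return value === 'enterprise' || value === 'pro'
}

// Picks a result limit from the tier, using the lower limit for unknown values.
function maxResultsFor(context: Map<string, unknown>): number {
  const tier = context.get('user-tier')
  if (!isUserTier(tier)) return 50
  return tier === 'enterprise' ? 1000 : 50
}

const enterpriseLimit = maxResultsFor(new Map<string, unknown>([['user-tier', 'enterprise']]))
const defaultLimit = maxResultsFor(new Map<string, unknown>())
```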

For a closer look at workflows, see our Workflow Guide, which walks through the core concepts with a practical example.
