Error Handling in Mastra Workflows
Trust: ★★★☆☆ (0.90) · 0 validations · developer_reference
Published: 2026-05-10 · Source: crawler_authoritative
Situation
Documentation for developers configuring error handling, retries, and lifecycle callbacks in Mastra workflow executions.
Insight
Mastra workflows provide three error handling mechanisms: result status checks after execution, retry policies for transient failures, and lifecycle callbacks for centralized error management.
The result object contains status (one of 'success', 'failed', 'suspended', or 'tripwire'), result (the workflow output on success), error (error details on failure), and steps (individual step results, each with its own status). To locate a failure, iterate result.steps and check each stepResult.status.
Two lifecycle callbacks are available. onFinish is called when a workflow completes with any status and receives status, result, error, steps, tripwire info, runId, workflowId, resourceId, getInitData(), the mastra instance, requestContext, logger, and state. onError fires only on 'failed' or 'tripwire' status and receives the same properties except result. Both callbacks can be set together on the same workflow options object. Errors thrown inside callbacks are caught and logged and do not affect the workflow result.
Retries can be configured at the workflow level with retryConfig (attempts and delay, applied to all steps) and at the step level with the retries property, which overrides the workflow-level config for that step.
Conditional branching via .branch() accepts an array of [condition, step] tuples. Inside a step's execute function, getStepResult() inspects previous step results, bail() exits the step early with a successful result, and throw new Error() exits with a failure. The stream() method allows monitoring workflow progress, including stats, by iterating stream.stream.
Action
- To handle workflow results: create a run with await workflow.createRun(), call run.start({ inputData }), then switch on result.status to handle the 'success', 'failed', 'suspended', and 'tripwire' cases.
- To use lifecycle callbacks: pass onFinish and/or onError functions in the options object given to createWorkflow(). onFinish receives status, result, error, steps, tripwire, runId, workflowId, resourceId, getInitData(), mastra, requestContext, logger, and state. onError receives the same properties except result.
- For workflow-level retries: set retryConfig: { attempts: 5, delay: 2000 } in createWorkflow().
- For step-level retries: add retries: 3 to the step definition object.
- For conditional branching: chain .branch([...]) with an array of [conditionFunction, step] pairs after .then().
- To check previous results: destructure getStepResult from the execute function params and call getStepResult(step1).
- To exit early with success: call bail({ result: 'value' }) in the step's execute function.
- To exit with an error: throw new Error('message').
- To monitor via stream: call run.stream() and iterate stream.stream for chunks, including payload.output.stats.
Result
Workflows return a result object whose status is 'success' when the run completes, 'failed' when an error occurs, 'suspended' when the run is waiting to be resumed, or 'tripwire' when a tripwire stops the run. Failed steps can be identified by iterating result.steps. onFinish fires on any completion status; onError fires only on 'failed' or 'tripwire'. Retries re-execute failed steps according to the configured attempts and delay. Conditional branching routes execution to different steps based on condition evaluation. bail() returns the provided payload as the step output and ends workflow execution early. Errors thrown in callbacks are caught and logged without affecting the workflow result.
Applicability conditions
Requires the @mastra/core package (the workflow APIs are imported from @mastra/core/workflows). Workflow-level retryConfig and step-level retries may be used together; step-level retries override the workflow-level config for that specific step.
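The override rule can be pictured as a simple fallback chain. The sketch below is not Mastra's implementation: the two interfaces and the effectiveRetries helper are hypothetical, and the default of 0 when neither level is configured is an assumption.

```typescript
// Hypothetical stand-ins for the two configuration levels named above.
interface WorkflowRetryConfig {
  attempts?: number
  delay?: number
}
interface StepRetryOptions {
  retries?: number
}

// Step-level `retries` wins when present; otherwise fall back to the
// workflow-level `attempts`. The final default of 0 is an assumption.
function effectiveRetries(
  workflow: WorkflowRetryConfig,
  step: StepRetryOptions,
): number {
  return step.retries ?? workflow.attempts ?? 0
}
```

With the values used elsewhere in this document, a step declaring retries: 3 inside a workflow with attempts: 5 would resolve to 3 under this model.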
Original content
Error handling
Mastra workflows support error handling through result status checks after execution, retry policies for transient failures, and lifecycle callbacks for centralized error logging or alerting.
Handling workflow results
When you run a workflow, the result object contains the status and any errors that occurred.
Checking the result status
import { mastra } from './mastra'
const workflow = mastra.getWorkflow('myWorkflow')
const run = await workflow.createRun()
const result = await run.start({ inputData: { value: 'test' } })
switch (result.status) {
  case 'success':
    console.log('Workflow completed:', result.result)
    break
  case 'failed':
    console.error('Workflow failed:', result.error)
    break
  case 'suspended':
    console.log('Workflow suspended, waiting for resume')
    break
}
Result object structure
The result object contains:
- status: The workflow status: 'success', 'failed', 'suspended', or 'tripwire'
- result: The workflow output (when status is 'success')
- error: Error details (when status is 'failed')
- steps: Individual step results with their status and output
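The fields listed above lend themselves to a single typed handler that covers all four statuses. This is a minimal sketch using local stand-in types rather than Mastra's exported ones; WorkflowResult, StepResult, failedStepIds, and describeResult are hypothetical names, and step status is typed loosely as a string because the source does not enumerate step statuses.

```typescript
// Local, simplified stand-ins for the result shape described above.
interface StepResult {
  status: string
  output?: unknown
  error?: unknown
}
interface WorkflowResult {
  status: 'success' | 'failed' | 'suspended' | 'tripwire'
  result?: unknown
  error?: unknown
  steps: Record<string, StepResult>
}

// Collects the ids of every step whose own status is 'failed'.
function failedStepIds(result: WorkflowResult): string[] {
  return Object.entries(result.steps)
    .filter(([, step]) => step.status === 'failed')
    .map(([stepId]) => stepId)
}

// Maps each workflow status to a short description. The `never` assignment
// makes the compiler flag any status added to the union but not handled here.
function describeResult(result: WorkflowResult): string {
  const status = result.status
  switch (status) {
    case 'success':
      return 'completed'
    case 'failed':
      return `failed in: ${failedStepIds(result).join(', ') || 'unknown step'}`
    case 'suspended':
      return 'suspended, waiting for resume'
    case 'tripwire':
      return 'stopped by a tripwire'
    default: {
      const unhandled: never = status
      return `unhandled status: ${String(unhandled)}`
    }
  }
}
```

Handling 'tripwire' explicitly avoids silently ignoring a status that a three-case switch would miss.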
Accessing step results
You can inspect individual step results to understand where a failure occurred:
const result = await run.start({ inputData: { value: 'test' } })
if (result.status === 'failed') {
  // Find which step failed
  for (const [stepId, stepResult] of Object.entries(result.steps)) {
    if (stepResult.status === 'failed') {
      console.error(`Step ${stepId} failed:`, stepResult.error)
    }
  }
}
Lifecycle callbacks
For scenarios where you need to handle workflow completion without awaiting the result—such as background jobs, fire-and-forget workflows, or centralized logging—you can use lifecycle callbacks.
onFinish
Called when a workflow completes with any status (success, failed, suspended, or tripwire):
import { createWorkflow } from '@mastra/core/workflows'
import { z } from 'zod'
const orderWorkflow = createWorkflow({
  id: 'order-processing',
  inputSchema: z.object({ orderId: z.string() }),
  outputSchema: z.object({ orderId: z.string(), status: z.string() }),
  options: {
    onFinish: async result => {
      if (result.status === 'success') {
        await db.updateOrderStatus(result.result.orderId, result.status)
      }
      await analytics.track('workflow_completed', {
        workflowId: 'order-processing',
        status: result.status,
      })
    },
  },
})
The onFinish callback receives:
- status: The workflow status
- result: The workflow output (on success)
- error: Error details (on failure)
- steps: Individual step results
- tripwire: Tripwire info (if status is 'tripwire')
- runId: The unique identifier for this workflow run
- workflowId: The workflow's identifier
- resourceId: Optional resource identifier (if provided when creating the run)
- getInitData<any>(): Function that returns the initial input data
- mastra: The Mastra instance (if workflow is registered with Mastra)
- requestContext: Request-scoped context data
- logger: The workflow's logger instance
- state: The workflow's current state object
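Because the callback argument carries its own identifiers, a handler can build its log or analytics record from that argument instead of hard-coding the workflow id. The sketch below assumes a simplified local model of the argument: FinishInfo, CompletionRecord, and toCompletionRecord are hypothetical names, only a subset of the listed fields is modelled, and getInitData is assumed to be synchronous.

```typescript
// Simplified local model of the onFinish argument; loose types are used
// because the real type definitions are not shown in the source.
interface FinishInfo {
  status: 'success' | 'failed' | 'suspended' | 'tripwire'
  result?: unknown
  error?: unknown
  runId: string
  workflowId: string
  resourceId?: string
  getInitData: () => unknown // assumed synchronous here
}

interface CompletionRecord {
  workflowId: string
  runId: string
  resourceId: string | null
  status: FinishInfo['status']
  input: unknown
  errorMessage: string | null
}

// Builds a flat record suitable for logging or analytics from the
// callback argument alone.
function toCompletionRecord(info: FinishInfo): CompletionRecord {
  return {
    workflowId: info.workflowId,
    runId: info.runId,
    resourceId: info.resourceId ?? null,
    status: info.status,
    input: info.getInitData(),
    errorMessage: info.error instanceof Error ? info.error.message : null,
  }
}
```

A function like this could be called from inside onFinish and its return value passed to whatever tracking client the application uses.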
onError
Called only when a workflow fails (status is 'failed' or 'tripwire'):
import { createWorkflow } from '@mastra/core/workflows'
import { z } from 'zod'
const paymentWorkflow = createWorkflow({
  id: 'payment-processing',
  inputSchema: z.object({ amount: z.number() }),
  outputSchema: z.object({ transactionId: z.string() }),
  options: {
    onError: async errorInfo => {
      await alertService.notify({
        channel: 'payments-alerts',
        message: `Payment workflow failed: ${errorInfo.error?.message}`,
      })
      await errorTracker.capture(errorInfo.error)
    },
  },
})
The onError callback receives:
- status: Either 'failed' or 'tripwire'
- error: Error details
- steps: Individual step results
- tripwire: Tripwire info (if status is 'tripwire')
- runId: The unique identifier for this workflow run
- workflowId: The workflow's identifier
- resourceId: Optional resource identifier (if provided when creating the run)
- getInitData<any>(): Function that returns the initial input data
- mastra: The Mastra instance (if workflow is registered with Mastra)
- requestContext: Request-scoped context data
- logger: The workflow's logger instance
- state: The workflow's current state object
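Since onError covers two distinct statuses, a handler may want to treat them differently. The following sketch shows one possible routing policy; it is an illustration only. ErrorInfo, Alert, errorMessage, and toAlert are hypothetical names, the page/notify split is an arbitrary choice, and the tripwire field is typed as unknown because its shape is not described in the source.

```typescript
// Simplified local model of the onError argument (subset of listed fields).
interface ErrorInfo {
  status: 'failed' | 'tripwire'
  error?: unknown
  tripwire?: unknown // shape not documented in the source
  runId: string
  workflowId: string
}

interface Alert {
  severity: 'page' | 'notify'
  message: string
}

// Extracts a readable message without assuming `error` is an Error instance.
function errorMessage(error: unknown): string {
  if (error instanceof Error) return error.message
  if (typeof error === 'string') return error
  return 'unknown error'
}

// Hypothetical policy: page on hard failures, notify on tripwires.
function toAlert(info: ErrorInfo): Alert {
  const prefix = `${info.workflowId} (run ${info.runId})`
  if (info.status === 'tripwire') {
    return { severity: 'notify', message: `${prefix} stopped by a tripwire` }
  }
  return {
    severity: 'page',
    message: `${prefix} failed: ${errorMessage(info.error)}`,
  }
}
```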
Using both callbacks
You can use both callbacks together:
import { createWorkflow } from '@mastra/core/workflows'
import { z } from 'zod'
const pipelineWorkflow = createWorkflow({
  id: 'data-pipeline',
  inputSchema: z.object({ source: z.string() }),
  outputSchema: z.object({ recordsProcessed: z.number() }),
  options: {
    onFinish: async result => {
      // Always log completion
      await logger.info('Pipeline completed', { status: result.status })
    },
    onError: async errorInfo => {
      // Alert on failures
      await pagerDuty.alert('Data pipeline failed', errorInfo.error)
    },
  },
})
Error handling in callbacks
Errors thrown inside callbacks are caught and logged—they won’t affect the workflow result or cause it to fail. This ensures that callback issues don’t break your workflows in production.
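One way to picture this isolation is a wrapper that awaits the callback inside a try/catch and only logs on failure. This is a conceptual sketch, not Mastra's internal code; Logger and invokeCallbackSafely are hypothetical names.

```typescript
// Minimal logger shape needed by the sketch (hypothetical).
type Logger = { error: (message: string, detail?: unknown) => void }

// Runs an optional callback and swallows anything it throws, so the
// caller's own result is never affected by a faulty callback.
async function invokeCallbackSafely<T>(
  name: string,
  callback: ((info: T) => void | Promise<void>) | undefined,
  info: T,
  logger: Logger,
): Promise<void> {
  if (!callback) return
  try {
    await callback(info)
  } catch (error) {
    logger.error(`${name} callback threw`, error)
  }
}
```

Awaiting the callback inside the try block matters: it is what lets the wrapper catch rejections from async callbacks as well as synchronous throws.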
options: {
  onFinish: async (result) => {
    // If this throws, it's logged but the workflow result is unchanged
    await externalService.notify(result);
  },
}
Retries
Mastra has a retry mechanism for workflows or steps that fail due to transient errors, for example when steps interact with external services or resources that may be temporarily unavailable.
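The general idea of retrying with a fixed delay can be sketched independently of Mastra. In the example below, RetryConfig, sleep, and runWithRetries are hypothetical names, and attempts is interpreted as the number of retries allowed after the first try; the source does not say whether Mastra counts the initial attempt, so treat that interpretation as an assumption.

```typescript
// Hypothetical shape mirroring the attempts/delay fields of retryConfig.
interface RetryConfig {
  attempts: number // assumed here: retries allowed after the first try
  delay: number // milliseconds to wait between tries
}

const sleep = (ms: number): Promise<void> =>
  new Promise(resolve => setTimeout(resolve, ms))

// Runs `task`, retrying on thrown errors until it succeeds or the retry
// budget is exhausted, then rethrows the last error.
async function runWithRetries<T>(
  task: () => Promise<T>,
  { attempts, delay }: RetryConfig,
): Promise<T> {
  let lastError: unknown
  for (let tryIndex = 0; tryIndex <= attempts; tryIndex++) {
    try {
      return await task()
    } catch (error) {
      lastError = error
      // Wait only if another try is still allowed.
      if (tryIndex < attempts) await sleep(delay)
    }
  }
  throw lastError
}
```

A fixed delay suits briefly unavailable services; a task that fails for a permanent reason will still fail after the retries are used up, only later.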
Workflow-level using retryConfig
You can configure retries at the workflow level, which applies to all steps in the workflow:
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({...});
export const testWorkflow = createWorkflow({
  retryConfig: {
    attempts: 5,
    delay: 2000
  }
})
  .then(step1)
  .commit();
Step-level using retries
You can configure retries for individual steps using the retries property. This overrides the workflow-level retry configuration for that specific step:
import { createWorkflow, createStep } from '@mastra/core/workflows'
import { z } from 'zod'
const step1 = createStep({
  execute: async () => {
    const response = await fetch('example-url')
    if (!response.ok) {
      throw new Error('Error')
    }
    return {
      value: '',
    }
  },
  retries: 3,
})
Conditional branching
You can create alternative workflow paths based on the success or failure of previous steps using conditional logic:
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({
  execute: async () => {
    try {
      const response = await fetch('example-url');
      if (!response.ok) {
        throw new Error('error');
      }
      return {
        status: "ok"
      };
    } catch (error) {
      return {
        status: "error"
      };
    }
  }
});
const step2 = createStep({...});
const fallback = createStep({...});
export const testWorkflow = createWorkflow({})
  .then(step1)
  .branch([
    [async ({ inputData: { status } }) => status === "ok", step2],
    [async ({ inputData: { status } }) => status === "error", fallback]
  ])
  .commit();
Check previous step results
Use getStepResult() to inspect a previous step’s results.
import { createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({...});
const step2 = createStep({
  execute: async ({ getStepResult }) => {
    const step1Result = getStepResult(step1);
    return {
      value: ""
    };
  }
});
Exiting early with bail()
Use bail() in a step to exit early with a successful result. This returns the provided payload as the step output and ends workflow execution.
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({
  id: 'step1',
  execute: async ({ bail }) => {
    return bail({ result: 'bailed' });
  },
  inputSchema: z.object({ value: z.string() }),
  outputSchema: z.object({ result: z.string() }),
});
export const testWorkflow = createWorkflow({...})
  .then(step1)
  .commit();
Exiting early with Error()
Use throw new Error() in a step to exit with an error.
import { createWorkflow, createStep } from "@mastra/core/workflows";
import { z } from "zod";
const step1 = createStep({
  id: 'step1',
  execute: async () => {
    throw new Error('error');
  },
  inputSchema: z.object({ value: z.string() }),
  outputSchema: z.object({ result: z.string() }),
});
export const testWorkflow = createWorkflow({...})
  .then(step1)
  .commit();
You can monitor workflows for errors using stream:
import { mastra } from '../src/mastra'
const workflow = mastra.getWorkflow('testWorkflow')
const run = await workflow.createRun()
const stream = await run.stream({
  inputData: {
    value: 'initial data',
  },
})
for await (const chunk of stream.stream) {
  console.log(chunk.payload.output.stats)
}
Links
- Platform: Dev Framework · Mastra
- Source: https://mastra.ai/docs/workflows/error-handling