AI agent multi-step workflows: building complex pipelines
How to design and build multi-step agent workflows: sequential chains, parallel execution, conditional branching, and human-in-the-loop checkpoints.
TL;DR: The first time I ran an agent with 3 sequential steps, it lost track before the final step. The state was gone, the error handling was missing, and the API bill kept growing. Multi-step workflows need state machines, not prompt engineering. Here are the 4 patterns that work.
Research from the ReAct paper (Yao et al., 2022) shows that interleaving reasoning steps with tool use outperforms reasoning-only and acting-only prompting on question-answering and interactive decision-making benchmarks. The LangGraph documentation describes a graph-based architecture for multi-step agent workflows.
A single LLM call is not an agent workflow. It’s a completion. The real power, and the real complexity, starts when you chain multiple steps together: fetch data, analyse it, make decisions, take actions, verify results.
I’ve built multi-step workflows for document processing, content generation, customer support triage, and code review agents. The patterns repeat across all of them. Here’s what I’ve learned about orchestrating complex agent pipelines.
Key takeaways:
- Four core patterns: sequential, parallel fan-out, conditional branching, and loop with HITL checkpoints
- State management is the hardest part: persist state between steps so workflows survive failures
- Error handling at each step: retry → fallback → flag for human review
- LangGraph works well for complex state machines; custom works better for cost-sensitive or tightly integrated workflows
- Multi-step workflows cost 3-10x more than single-step agents: budget accordingly
What are the four multi-step workflow patterns?
Every multi-step agent workflow is a combination of these four patterns. Master these, and you can orchestrate anything.
1. Sequential chain
The simplest pattern: step A feeds into step B, which feeds into step C. Each step depends on the previous one.
class SequentialWorkflow:
    def __init__(self, steps: list):
        self.steps = steps  # List of (name, handler) tuples

    async def run(self, initial_input: dict) -> dict:
        context = initial_input
        for step_name, handler in self.steps:
            print(f" → Running step: {step_name}")
            try:
                result = await handler(context)
                # Store each step's output under its own name for later steps
                context[step_name] = result
                context["last_step"] = step_name
            except Exception as e:
                return {
                    "success": False,
                    "error": f"Step '{step_name}' failed: {str(e)}",
                    "context": context
                }
        return {"success": True, "context": context}
When to use: Any workflow where each step builds on the previous one. Document processing (extract → classify → redact → store), content generation (research → outline → draft → review), data pipelines.
Watch out for: Long chains where an error in step 2 wastes the work of step 1. Always check whether earlier steps can be rolled back or compensated.
2. Parallel fan-out
One agent analyses the input, determines sub-tasks, and spawns multiple worker agents that run in parallel. The results are collected and merged.
import asyncio

class ParallelFanOutWorkflow:
    def __init__(self, planner, workers: dict, merger):
        self.planner = planner  # Determines sub-tasks
        self.workers = workers  # Maps sub-task type to a worker agent
        self.merger = merger    # Combines results

    async def run(self, task: str) -> dict:
        # Step 1: Plan: break task into sub-tasks
        sub_tasks = await self.planner.plan(task)
        print(f" → Generated {len(sub_tasks)} sub-tasks")

        # Step 2: Execute all sub-tasks in parallel
        async def execute_worker(sub_task):
            worker = self.workers[sub_task.type]
            return await worker.run(sub_task)

        # return_exceptions=True keeps one failure from cancelling the others
        results = await asyncio.gather(
            *[execute_worker(st) for st in sub_tasks],
            return_exceptions=True
        )

        # Step 3: Merge results
        successful = [r for r in results if not isinstance(r, Exception)]
        failed = [r for r in results if isinstance(r, Exception)]
        final = await self.merger.merge(successful)
        return {
            "success": len(failed) == 0,
            "result": final,
            "stats": {"total": len(sub_tasks), "succeeded": len(successful), "failed": len(failed)}
        }
When to use: Research agents that search multiple sources, code review agents that analyse multiple files, content agents that generate multiple variations. Any task that can be decomposed into independent sub-tasks.
Watch out for: Cost explosion. If each worker makes multiple LLM calls, a 10-worker fan-out can generate 30-50 LLM calls per workflow run. Set budget limits per worker.
Always set a timeout for parallel workers. One stuck worker should not block the entire workflow. I use asyncio.wait_for(worker.run(task), timeout=30) per worker.
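As a minimal sketch of the per-worker timeout described above: each worker call is wrapped in asyncio.wait_for, and a timeout becomes a structured result instead of an exception. The names run_with_timeout, fan_out, and demo_worker are hypothetical and exist only for this illustration.

```python
import asyncio


async def run_with_timeout(worker, sub_task, timeout_s: float):
    """Run one worker; return its result or a structured timeout marker."""
    try:
        return await asyncio.wait_for(worker(sub_task), timeout=timeout_s)
    except asyncio.TimeoutError:
        return {"sub_task": sub_task, "error": "timeout"}


async def fan_out(worker, sub_tasks, timeout_s: float = 30.0):
    # Each worker gets its own deadline, so a stuck one cannot hold up the rest.
    return await asyncio.gather(
        *(run_with_timeout(worker, st, timeout_s) for st in sub_tasks)
    )


async def demo_worker(sub_task):
    # Hypothetical worker: the "slow" sub-task simulates a stuck call.
    await asyncio.sleep(1.0 if sub_task == "slow" else 0.01)
    return {"sub_task": sub_task, "result": sub_task.upper()}


results = asyncio.run(fan_out(demo_worker, ["a", "slow", "b"], timeout_s=0.1))
```

Results come back in the same order as the sub-tasks, so the merger can tell which ones timed out and decide whether a partial result is acceptable.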
3. Conditional branching
The agent evaluates a condition and routes to different paths based on the result. If-else logic for agents.
class ConditionalBranchingWorkflow:
    def __init__(self, router, branches: dict):
        self.router = router      # Evaluates conditions
        self.branches = branches  # {"condition_name": handler}

    async def run(self, context: dict) -> dict:
        # Evaluate routing condition
        decision = await self.router.evaluate(context)
        print(f" → Routing decision: {decision}")

        # Execute the matching branch
        handler = self.branches.get(decision)
        if not handler:
            return {"success": False, "error": f"No handler for decision: {decision}"}
        result = await handler.run(context)
        return {"success": True, "decision": decision, "result": result}
When to use: Support ticket triage (route to billing, technical, or account team), content moderation (allow, flag, or reject), dynamic workflow routing where the next step depends on data quality or content type.
Real example from a document processing pipeline I built:
async def route_document(context):
    """Router: decides which branch to take based on document type and confidence."""
    doc_type = context.get("classification", {}).get("type")
    confidence = context.get("classification", {}).get("confidence", 0)
    if confidence < 0.6:
        return "manual_review"       # Low confidence: human needs to look
    elif doc_type == "invoice":
        return "invoice_processing"  # Standard invoice path
    elif doc_type == "contract":
        return "contract_review"     # Contract needs legal review
    else:
        return "general_processing"  # Everything else
4. Loop with human-in-the-loop
The agent runs autonomously until it reaches a checkpoint that requires human approval. It pauses, waits for input, then continues based on the human’s decision.
class HumanInTheLoopWorkflow:
    def __init__(self, agent, checkpoints: list):
        self.agent = agent
        self.checkpoints = checkpoints  # Steps that need human approval

    async def run(self, task: str, notify_human, wait_for_approval):
        context = {"task": task, "step": 0}
        while True:
            context["step"] += 1
            # Run the agent for one step
            result = await self.agent.step(context)

            # Check if this step needs human approval
            if result.get("checkpoint"):
                # Notify human and wait
                await notify_human({
                    "step": context["step"],
                    "summary": result.get("summary"),
                    "decision_needed": result.get("decision_point")
                })
                # Suspends this coroutine (not the event loop) until the human responds.
                # In production, persist the context and resume later instead.
                approval = await wait_for_approval()
                if approval.get("action") == "approve":
                    context["human_feedback"] = approval.get("notes", "")
                    continue
                elif approval.get("action") == "reject":
                    return {"success": False, "reason": "Rejected by human", "context": context}
                elif approval.get("action") == "modify":
                    context["modifications"] = approval.get("changes", {})
                    continue
            else:
                # No checkpoint: the agent is done
                context["result"] = result
                return {"success": True, "context": context}
When to use: Any workflow where mistakes have significant cost. Content publishing (review before publish), financial operations (approve before executing payments), code deployment (approve before merging), email campaigns (review before sending to 10K subscribers).
How I implement notifications: For production systems, I use Telegram bot notifications with inline buttons (Approve / Reject / Modify). For internal tools, a simple Slack message with threaded replies works. The key is making the human response asynchronous: the agent shouldn’t block waiting for a response; it should save state and resume when the human responds.
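The save-and-resume idea can be sketched roughly like this, assuming an in-memory dict stands in for a real database table. The names PAUSED, pause_for_approval, and resume_from_approval are hypothetical. The token would travel with the approval message (for example, as the payload of an inline button) and come back when the human responds.

```python
import json
import uuid

# Hypothetical in-memory store; a real system would use a database table.
PAUSED: dict[str, str] = {}


def pause_for_approval(context: dict) -> str:
    """Persist the workflow context and return a token for the approval message."""
    token = uuid.uuid4().hex
    PAUSED[token] = json.dumps(context)
    return token


def resume_from_approval(token: str, action: str, notes: str = "") -> dict:
    """Called by the button or webhook handler when the human responds."""
    context = json.loads(PAUSED.pop(token))
    if action == "reject":
        return {"success": False, "reason": "Rejected by human", "context": context}
    context["human_feedback"] = notes
    context["resume_at_step"] = context["step"] + 1  # Where the agent picks up again
    return {"success": True, "context": context}


token = pause_for_approval({"task": "publish article", "step": 3})
outcome = resume_from_approval(token, "approve", notes="Looks good")
```

Because nothing stays in memory between the pause and the response, the worker process can restart or scale down while the approval is pending.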
When should I use LangGraph vs custom workflows?
I’ve used both approaches extensively. Here’s my framework for deciding:
Use LangGraph when:
- Your workflow has complex state transitions (many possible states, conditional edges)
- You need built-in persistence (checkpointing, save/restore)
- Your team has existing LangChain experience
- The workflow has 5+ distinct stages
Build custom when:
- You need per-step cost tracking (LangGraph doesn’t have built-in budget management)
- Your workflow integrates with existing systems (queues, databases, monitoring)
- Error recovery requirements are specific (not just “retry 3 times”)
- You want to control which model each step uses
Here’s a custom state machine that I use for most production workflows:
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Callable

class WorkflowStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    AWAITING_HUMAN = "awaiting_human"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class WorkflowState:
    workflow_id: str
    status: WorkflowStatus
    current_step: str = ""
    step_history: list = field(default_factory=list)
    data: dict = field(default_factory=dict)
    errors: list = field(default_factory=list)
    total_cost: float = 0.0
    total_steps: int = 0

class WorkflowEngine:
    def __init__(self, persistence=None):
        self.persistence = persistence  # Optional DB/S3 persistence

    async def run(self, workflow_id: str, steps: dict, initial_data: dict):
        """steps: {"step_name": {"handler": callable, "next": str or callable}}"""
        state = WorkflowState(
            workflow_id=workflow_id,
            status=WorkflowStatus.RUNNING,
            data=initial_data
        )
        # Begin at "start" if it is defined, otherwise at the first step in the dict
        current = "start" if "start" in steps else next(iter(steps), None)
        while current and state.status == WorkflowStatus.RUNNING:
            step_def = steps.get(current)
            if not step_def:
                state.status = WorkflowStatus.FAILED
                state.errors.append({"step": current, "error": f"Unknown step: {current}"})
                break
            state.current_step = current
            await self._persist(state)
            try:
                # Handlers receive the shared data dict and write their outputs into it
                result = await step_def["handler"](state.data)
                state.step_history.append({
                    "step": current,
                    "result": result.get("summary", "completed"),
                    "cost": result.get("cost", 0),
                    "timestamp": datetime.now(timezone.utc).isoformat()
                })
                state.total_cost += result.get("cost", 0)
                state.total_steps += 1

                # Check for human-in-the-loop checkpoint
                if result.get("awaiting_human"):
                    state.status = WorkflowStatus.AWAITING_HUMAN
                    await self._persist(state)
                    # Workflow paused: will resume when human responds
                    return {"status": "awaiting_human", "state": state}

                # Determine next step
                next_step = step_def.get("next")
                if callable(next_step):
                    current = next_step(result)
                else:
                    current = next_step
            except Exception as e:
                state.errors.append({"step": current, "error": str(e)})
                # Retry while this step has failed no more than `retry` times
                retry = step_def.get("retry", 0)
                failures = len([err for err in state.errors if err.get("step") == current])
                if failures <= retry:
                    continue  # Retry the same step
                if step_def.get("fallback"):
                    current = step_def["fallback"]
                else:
                    state.status = WorkflowStatus.FAILED
                    break
        if state.status == WorkflowStatus.RUNNING:
            state.status = WorkflowStatus.COMPLETED
        await self._persist(state)
        return {"status": state.status.value, "state": state}

    async def _persist(self, state: WorkflowState):
        if self.persistence:
            await self.persistence.save(state)
How do you handle errors across multi-step workflows?
The hardest problem in multi-step workflows: what happens when step 2 fails after step 1 succeeded?
You have three options:
1. Rollback. Undo the effects of earlier steps. This works when each step has a clear undo action (e.g., if the email step fails, delete the draft created earlier). It’s hard when steps have side effects that can’t be undone.
2. Compensate. Execute a compensating action instead of a strict undo. If a later step fails after an API call has already created a resource, archive that resource instead of deleting it.
3. Flag for manual review. The safest option. Save the state, mark the workflow as needing human attention, and let a human decide what to do.
I use a combination: automatic retry for transient errors (3 retries with exponential backoff), compensation for known failure modes, and manual review for everything else.
import asyncio

class TemporaryError(Exception):
    """Transient failure worth retrying (timeouts, rate limits)."""

class PermanentError(Exception):
    """Failure that retrying will not fix (bad input, auth errors)."""

async def execute_with_recovery(step_name: str, handler, context: dict, retries=3):
    last_error = None
    for attempt in range(retries):
        try:
            return await handler(context)
        except TemporaryError as e:
            last_error = e
            if attempt < retries - 1:  # No point sleeping after the final attempt
                wait = 2 ** attempt  # Exponential backoff: 1s, 2s, 4s, ...
                print(f" → Step {step_name} temporary failure (attempt {attempt+1}/{retries}), retrying in {wait}s")
                await asyncio.sleep(wait)
        except PermanentError as e:
            print(f" → Step {step_name} permanent failure: {e}")
            raise
    # All attempts exhausted: fall back to manual review
    print(f" → Step {step_name} failed after {retries} attempts, executing fallback")
    return {
        "success": False,
        "error": str(last_error),
        "fallback": "Flagged for manual review",
        "context": context
    }
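The compensation option can be sketched along these lines, assuming each step is registered together with an optional compensating action. The run_with_compensation function and the three handlers are hypothetical names for illustration, not part of the engine above.

```python
import asyncio


async def run_with_compensation(steps, context: dict) -> dict:
    """steps: list of (name, action, compensate) tuples; compensate may be None."""
    completed = []
    for name, action, compensate in steps:
        try:
            context[name] = await action(context)
            completed.append((name, compensate))
        except Exception as exc:
            # Compensate finished steps in reverse order, newest first.
            compensated = []
            for done_name, undo in reversed(completed):
                if undo is not None:
                    await undo(context)
                    compensated.append(done_name)
            return {"success": False, "failed_step": name,
                    "error": str(exc), "compensated": compensated}
    return {"success": True, "context": context}


# Hypothetical handlers for illustration only.
async def create_draft(ctx):
    return "draft-1"


async def archive_draft(ctx):
    ctx["draft_archived"] = True


async def send_email(ctx):
    raise RuntimeError("SMTP unavailable")


ctx = {}
outcome = asyncio.run(run_with_compensation(
    [("create_draft", create_draft, archive_draft),
     ("send_email", send_email, None)],
    ctx,
))
```

Here the email step fails, so the draft created earlier is archived rather than deleted. Steps registered with no compensating action are simply skipped during the unwind.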
How do you manage state across multiple agent steps?
State is the backbone of any multi-step workflow. Every step reads from it and writes to it. Getting state management right is the difference between a workflow you can debug and one you can’t.
What to include in state:
@dataclass
class WorkflowState:
    # Identity
    workflow_id: str
    workflow_type: str
    # Progress
    status: str  # pending, running, awaiting_human, completed, failed
    current_step: str
    completed_steps: list
    # Data
    input: dict         # Original input
    intermediate: dict  # Step outputs, keyed by step name
    final_output: dict  # Final result
    # Costs
    total_cost: float
    step_costs: dict    # Per-step cost breakdown
    # Errors
    errors: list        # Structured error log
    retry_count: int
    # Control
    max_steps: int = 50
    max_cost: float = 10.0
Persistence: I save state to a database (SQLite for simple workflows, Postgres for production) after every step. This means if the server crashes mid-workflow, we can resume from the last checkpoint.
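# Save checkpoint after each step.
# Needs a unique index on workflow_id; asdict comes from dataclasses.
# "?" placeholders are SQLite style; Postgres drivers use $1 or %s instead.
await db.execute(
    """INSERT INTO workflow_checkpoints (workflow_id, step, state) VALUES (?, ?, ?)
    ON CONFLICT(workflow_id) DO UPDATE SET step = excluded.step, state = excluded.state""",
    [state.workflow_id, state.current_step, json.dumps(asdict(state))]
)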
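For a version that runs end to end, here is a minimal sketch using the standard-library sqlite3 module with an in-memory database. The Checkpoint dataclass is a trimmed-down stand-in for the full state, and open_store, save_checkpoint, and load_checkpoint are hypothetical helper names.

```python
import json
import sqlite3
from dataclasses import asdict, dataclass, field


@dataclass
class Checkpoint:
    workflow_id: str
    current_step: str
    intermediate: dict = field(default_factory=dict)


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    # PRIMARY KEY on workflow_id is what makes the upsert below possible.
    conn.execute(
        """CREATE TABLE IF NOT EXISTS workflow_checkpoints (
               workflow_id TEXT PRIMARY KEY,
               step TEXT NOT NULL,
               state TEXT NOT NULL)"""
    )
    return conn


def save_checkpoint(conn: sqlite3.Connection, cp: Checkpoint) -> None:
    with conn:  # Commits on success, rolls back on error
        conn.execute(
            """INSERT INTO workflow_checkpoints (workflow_id, step, state)
               VALUES (?, ?, ?)
               ON CONFLICT(workflow_id) DO UPDATE
               SET step = excluded.step, state = excluded.state""",
            (cp.workflow_id, cp.current_step, json.dumps(asdict(cp))),
        )


def load_checkpoint(conn: sqlite3.Connection, workflow_id: str):
    row = conn.execute(
        "SELECT state FROM workflow_checkpoints WHERE workflow_id = ?",
        (workflow_id,),
    ).fetchone()
    return Checkpoint(**json.loads(row[0])) if row else None


conn = open_store()
save_checkpoint(conn, Checkpoint("wf-1", "research"))
save_checkpoint(conn, Checkpoint("wf-1", "outline", {"research": "notes"}))
restored = load_checkpoint(conn, "wf-1")
```

After a crash, load_checkpoint returns the last saved step, and the engine can restart from restored.current_step instead of from the beginning.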
What are the cost implications of multi-step workflows?
Multi-step workflows are expensive. Here’s a real example from a content creation agent I built:
| Step | Model | Calls | Cost per call | Total |
|---|---|---|---|---|
| Research brief | gpt-4o | 1 | $0.03 | $0.03 |
| Search execution | gpt-4o | 3 (parallel) | $0.02 | $0.06 |
| Outline generation | gpt-4o | 1 | $0.04 | $0.04 |
| Draft section 1 | gpt-4o | 1 | $0.06 | $0.06 |
| Draft section 2 | gpt-4o | 1 | $0.05 | $0.05 |
| Draft section 3 | gpt-4o | 1 | $0.07 | $0.07 |
| Review and polish | gpt-4o | 1 | $0.03 | $0.03 |
| Total | | 9 | | $0.34 |
A single article costs $0.34 in API calls. Generate 100 articles: $34. That’s manageable.
But add retries (each retry reruns the step), add branching (some paths are longer than others), add human review loops (resume generates more calls), and the effective cost can be 3-5x the base estimate.
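To sanity-check the arithmetic, here is a rough sketch that recomputes the base cost from the table and then models retries alone. The retry model is my assumption, not a measurement: each call fails independently with a fixed probability and is retried until it succeeds. Branching and human review loops are not modelled, so this gives a lower bound on the overhead.

```python
# (step, calls, cost per call in USD), copied from the table above
STEPS = [
    ("Research brief", 1, 0.03),
    ("Search execution", 3, 0.02),
    ("Outline generation", 1, 0.04),
    ("Draft section 1", 1, 0.06),
    ("Draft section 2", 1, 0.05),
    ("Draft section 3", 1, 0.07),
    ("Review and polish", 1, 0.03),
]


def total_calls(steps) -> int:
    return sum(calls for _, calls, _ in steps)


def base_cost(steps) -> float:
    """Cost of one run in which every call succeeds on the first attempt."""
    return sum(calls * cost for _, calls, cost in steps)


def expected_cost_with_retries(steps, failure_rate: float) -> float:
    """Expected cost when each call is retried until it succeeds.

    With independent failures of probability p, the expected number of
    attempts per call is 1 / (1 - p) (mean of a geometric distribution).
    """
    if not 0 <= failure_rate < 1:
        raise ValueError("failure_rate must be in [0, 1)")
    return base_cost(steps) / (1 - failure_rate)


print(f"calls per run: {total_calls(STEPS)}")
print(f"base cost: ${base_cost(STEPS):.2f}")
print(f"with 20% call failures: ${expected_cost_with_retries(STEPS, 0.2):.3f}")
```

Retries alone account for a modest increase under this model, so the larger multipliers come from longer branches and repeated review loops, which re-run whole steps rather than single calls.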
I set cost limits per workflow:
class BudgetAwareWorkflow:
    def __init__(self, max_cost_per_run=2.0):
        self.max_cost = max_cost_per_run
        self.running_cost = 0.0

    async def step(self, handler, context):
        # Refuse to start another step once the budget is spent
        if self.running_cost >= self.max_cost:
            return {"error": "Budget exceeded", "cost": self.running_cost}
        result = await handler(context)
        self.running_cost += result.get("cost", 0)
        return result
Related: AI agent logging and monitoring: seeing inside your agent’s head: how to log, trace, and monitor multi-step workflows in production.
Related: CrewAI vs LangGraph: which AI agent framework should you use?: how the choice between CrewAI and LangGraph affects multi-step workflow design.
How do workflow patterns compose in production?
Here’s a real multi-step agent I built for content generation. It uses sequential, parallel, conditional, and human-in-the-loop patterns together:
# Content Generation Agent Workflow
# Pattern: Sequential + Parallel + Conditional + HITL
# Handlers, Database, and slugify are defined elsewhere in the project
async def content_workflow(topic: str, publish: bool = False):
    workflow = WorkflowEngine(persistence=Database())
    steps = {
        "research": {
            "handler": research_topic,
            "next": "outline"
        },
        "outline": {
            "handler": generate_outline,
            "next": "write_sections"
        },
        "write_sections": {
            # Parallel fan-out: write each section independently
            "handler": parallel_write_sections,
            "next": "review"
        },
        "review": {
            # Conditional: if quality check fails, loop back
            # (a missing score counts as 0, so it routes to rewrite)
            "handler": quality_check,
            "next": lambda r: "rewrite" if r.get("quality", 0) < 0.8 else "human_review"
        },
        "rewrite": {
            "handler": rewrite_section,
            "next": "review",  # Loop back for re-check
            "retry": 2         # Max 3 attempts (initial + 2 retries)
        },
        "human_review": {
            # HITL checkpoint
            "handler": request_human_approval,
            "next": lambda r: "publish" if r.get("approved") else "rejected"
        },
        "publish": {
            "handler": publish_article if publish else save_draft,
            "next": None  # End
        },
        "rejected": {
            "handler": notify_rejection,
            "next": None
        }
    }
    return await workflow.run(f"content-{slugify(topic)}", steps, {"topic": topic})
The key insight: all four patterns compose naturally. A sequential workflow can have parallel steps within it. A parallel fan-out can have conditional branches per worker. A HITL checkpoint can appear at any point.
Master the patterns individually, then compose them freely. That’s how you build production agent workflows that handle real complexity.
FAQ
What are the four main multi-step agent workflow patterns? Sequential chains (step A → B → C), parallel fan-out (one agent spawns many workers), conditional branching (if X go left, if Y go right), and loop with human-in-the-loop checkpoint (automated work with manual approval gates).
When should I use LangGraph vs building custom workflows? LangGraph is good for workflows with complex state transitions, conditional edges, and a need for built-in persistence. Build custom when you need full control over state, cost tracking, error recovery, or integration with existing systems.
How do I handle errors in multi-step agent workflows? Each step should have retry logic with exponential backoff, a fallback handler, and clear error propagation. When step 2 fails after step 1 succeeded, you need a compensation strategy: either roll back step 1 or flag for manual intervention.
What’s the cost of complex multi-step workflows? Each step adds LLM call costs. A 5-step agent workflow processing 100 inputs/day at $0.03 per step costs $15/day in API calls alone. Complex workflows with branching can double or triple this due to wasted steps and retries.
Related Posts
- Build a state machine for your AI agent in a weekend. The 6-state FSM that drives the turn lifecycle, the foundation for any workflow orchestration
- The policy gate every agent needs before production. How policy gates and human-in-the-loop checkpoints secure multi-step agent workflows
- AI agent error handling patterns. Retry strategies, fallback behaviors, and graceful degradation across workflow steps
- LangGraph tutorial for beginners. Build your first state graph workflow with conditional branching and human-in-the-loop
This article was published on Agentic Up (https://agenticup.dev): practical guides for developers and founders building with AI agents. Reach me at hello@agenticup.dev.