Agent Orchestration Patterns
Building production multi-agent systems requires more than wiring agents together. The orchestration pattern you choose fundamentally determines your system's reliability, scalability, and maintainability. This lesson covers the six essential orchestration patterns and how to implement them.
Orchestration Pattern: A reusable architectural blueprint that defines how agents communicate, who controls execution flow, and how state is managed across a multi-agent system.
The Six Key Patterns
```
┌──────────────────────────────────────────────────────────────┐
│                    Orchestration Patterns                    │
│                                                              │
│  1. Sequential Pipeline   A ──▶ B ──▶ C                      │
│                                                              │
│  2. Router/Dispatcher     R ──┬▶ A                           │
│                               ├▶ B                           │
│                               └▶ C                           │
│                                                              │
│  3. Supervisor            S ──┬▶ W1                          │
│                               ├▶ W2   (S delegates)          │
│                               └▶ W3                          │
│                                                              │
│  4. Collaborative         A ◀──▶ B ◀──▶ C   (discuss)        │
│                                                              │
│  5. Hierarchical          Manager                            │
│                           ├─ Team Lead A ─┬─ Worker 1        │
│                           └─ Team Lead B  └─ Worker 2        │
│                                                              │
│  6. Map-Reduce            ┌▶ A ─┐                            │
│                       Fan─┼▶ B ─┼─▶ Aggregate                │
│                       out └▶ C ─┘                            │
└──────────────────────────────────────────────────────────────┘
```
Pattern 1: Sequential Pipeline
The simplest pattern. Agents execute in a fixed order, with each agent's output feeding the next.
```
Input ──▶ [Research Agent] ──▶ [Analysis Agent] ──▶ [Writing Agent] ──▶ Output
```
When to use: Linear workflows where each step depends on the previous step's complete output.
```python
from dataclasses import dataclass, field


@dataclass
class PipelineState:
    """State object passed through the pipeline."""
    input: str
    results: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)


class PipelineAgent:
    """Base agent for sequential pipelines."""

    def __init__(self, name: str, system_prompt: str, model: str = "gpt-4o"):
        self.name = name
        self.system_prompt = system_prompt
        self.model = model

    def run(self, state: PipelineState) -> PipelineState:
        from openai import OpenAI
        client = OpenAI()

        # Build context from previous steps
        context = "\n\n".join(
            f"[{k}]: {v}" for k, v in state.results.items()
        )
        user_message = (
            f"Task: {state.input}\n\nPrevious steps:\n{context}"
            if context else state.input
        )

        response = client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": user_message},
            ],
        )
        state.results[self.name] = response.choices[0].message.content
        return state


class SequentialPipeline:
    """Execute agents in a fixed sequence."""

    def __init__(self, agents: list[PipelineAgent]):
        self.agents = agents

    def run(self, input_text: str) -> PipelineState:
        state = PipelineState(input=input_text)
        for agent in self.agents:
            print(f"Running: {agent.name}")
            state = agent.run(state)
        return state


# Usage
pipeline = SequentialPipeline([
    PipelineAgent("researcher", "You are a researcher. Gather key facts about the topic."),
    PipelineAgent("analyst", "You are an analyst. Identify patterns and insights from the research."),
    PipelineAgent("writer", "You are a writer. Create a clear, polished summary from the analysis."),
])
result = pipeline.run("Impact of AI agents on software development in 2025")
print(result.results["writer"])
```
Pros: Simple, predictable, easy to debug. Cons: Rigid, no parallelism, one failure blocks everything.
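The "one failure blocks everything" drawback can be softened with per-step retries. A minimal sketch, not part of the pipeline above — the `run_with_retry` helper and its backoff values are illustrative:

```python
import time


def run_with_retry(agent, state, max_retries: int = 2, base_delay: float = 1.0):
    """Run one pipeline step, retrying transient failures with exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return agent.run(state)
        except Exception:
            if attempt == max_retries:
                raise  # Out of retries: surface the failure to the caller
            time.sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...
```

Inside `SequentialPipeline.run`, the loop body `state = agent.run(state)` would become `state = run_with_retry(agent, state)`.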
Pattern 2: Router/Dispatcher
A routing agent analyzes the input and dispatches to the most appropriate specialized agent.
```
                     ┌──▶ [SQL Agent]    ──┐
Input ──▶ [Router] ──┼──▶ [Search Agent] ──┼──▶ Output
                     └──▶ [Math Agent]   ──┘
```
When to use: Diverse input types that require different specialized handling.
```python
import json

from openai import OpenAI


class RouterDispatcher:
    """Route requests to specialized agents based on intent."""

    def __init__(self, agents: dict[str, PipelineAgent]):
        self.agents = agents
        self.client = OpenAI()

    def classify_intent(self, query: str) -> str:
        """Use an LLM to classify which agent should handle the query."""
        agent_descriptions = "\n".join(
            f"- {name}: {agent.system_prompt[:100]}"
            for name, agent in self.agents.items()
        )
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",  # Use a fast, cheap model for routing
            messages=[
                {
                    "role": "system",
                    "content": (
                        f"Classify the user's request into one of these categories. "
                        f"Reply with ONLY the category name.\n\n{agent_descriptions}"
                    ),
                },
                {"role": "user", "content": query},
            ],
            temperature=0,
        )
        intent = response.choices[0].message.content.strip().lower()
        # Fallback to default if classification fails
        if intent not in self.agents:
            return list(self.agents.keys())[0]
        return intent

    def run(self, query: str) -> str:
        intent = self.classify_intent(query)
        print(f"Routed to: {intent}")
        agent = self.agents[intent]
        state = PipelineState(input=query)
        state = agent.run(state)
        return state.results[agent.name]


# Usage
router = RouterDispatcher({
    "sql": PipelineAgent("sql", "You are a SQL expert. Write queries to answer data questions."),
    "search": PipelineAgent("search", "You are a search agent. Find and summarize information."),
    "math": PipelineAgent("math", "You are a math expert. Solve calculations step by step."),
    "code": PipelineAgent("code", "You are a programmer. Write clean Python code."),
})
answer = router.run("What is the time complexity of merge sort?")
```
Cost Optimization: Use a small, fast model (like gpt-4o-mini) for the routing step. Classification is a simple task, and because the router runs on every request, its latency and cost dominate at scale.
Pros: Efficient specialization, easy to add new agents. Cons: Routing errors cascade, single point of failure.
Pattern 3: Supervisor
A supervisor agent dynamically delegates tasks to worker agents and synthesizes results. Unlike the router, the supervisor can issue multiple tasks, review results, and re-delegate.
```
                   ┌──▶ [Worker 1] ──┐
[Supervisor] ──────┤                 ├──▶ [Supervisor reviews & synthesizes]
                   └──▶ [Worker 2] ──┘
      ▲                                   │
      └───────────────────────────────────┘  (re-delegate if needed)
```
When to use: Complex tasks requiring dynamic decomposition and quality control.
```python
class SupervisorAgent:
    """Supervisor that delegates to and coordinates worker agents."""

    def __init__(self, workers: dict[str, PipelineAgent], model: str = "gpt-4o"):
        self.workers = workers
        self.model = model
        self.client = OpenAI()

    def run(self, task: str, max_iterations: int = 3) -> str:
        worker_descriptions = "\n".join(
            f"- {name}: {w.system_prompt[:80]}" for name, w in self.workers.items()
        )
        messages = [
            {
                "role": "system",
                "content": (
                    f"You are a supervisor managing these workers:\n{worker_descriptions}\n\n"
                    f"Break down tasks and delegate to workers. "
                    f'Respond with JSON: {{"delegate": "worker_name", "subtask": "description"}} '
                    f'or {{"final_answer": "your synthesized answer"}} when done.'
                ),
            },
            {"role": "user", "content": task},
        ]
        all_results = {}

        for i in range(max_iterations):
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                response_format={"type": "json_object"},
            )
            decision = json.loads(response.choices[0].message.content)

            # Check if supervisor is done
            if "final_answer" in decision:
                return decision["final_answer"]

            # Delegate to worker
            worker_name = decision.get("delegate", "")
            subtask = decision.get("subtask", "")
            if worker_name in self.workers:
                print(f"Delegating to {worker_name}: {subtask[:60]}...")
                worker = self.workers[worker_name]
                state = PipelineState(input=subtask)
                state = worker.run(state)
                result = state.results[worker.name]
                all_results[worker_name] = result
                messages.append({"role": "assistant", "content": response.choices[0].message.content})
                messages.append({"role": "user", "content": f"Result from {worker_name}:\n{result}"})

        # Force final synthesis if max iterations reached
        messages.append({"role": "user", "content": "Synthesize all results into a final answer now."})
        response = self.client.chat.completions.create(model=self.model, messages=messages)
        return response.choices[0].message.content


# Usage
supervisor = SupervisorAgent({
    "researcher": PipelineAgent("researcher", "Research facts and data about topics."),
    "analyst": PipelineAgent("analyst", "Analyze data and identify patterns."),
    "writer": PipelineAgent("writer", "Write clear, polished content."),
})
result = supervisor.run("Create a competitive analysis of the top 3 cloud providers")
```
Pros: Flexible, adaptive, quality control via review loops. Cons: Higher token cost, supervisor is a bottleneck.
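The supervisor loop above trusts the model's JSON. Even with `response_format` enforcing valid JSON, the model can still name an unknown worker or omit keys. A defensive sketch — the `parse_decision` helper is an assumption for illustration, not part of any library — that normalizes unusable decisions before the loop acts on them:

```python
import json


def parse_decision(raw: str, known_workers: set[str]) -> dict:
    """Validate a supervisor decision, normalizing malformed output.

    Returns {"final_answer": ...} or {"delegate": ..., "subtask": ...};
    {"final_answer": None} signals the caller to force a synthesis turn.
    """
    try:
        decision = json.loads(raw)
    except json.JSONDecodeError:
        return {"final_answer": None}
    if "final_answer" in decision:
        return {"final_answer": decision["final_answer"]}
    worker = decision.get("delegate")
    subtask = decision.get("subtask", "")
    if worker in known_workers and subtask:
        return {"delegate": worker, "subtask": subtask}
    return {"final_answer": None}
```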
Pattern 4: Collaborative
Agents discuss and iterate on a shared problem, building on each other's contributions. No single agent controls the flow.
```
[Agent A] ──▶ shared state ◀── [Agent B]
     ▲                              │
     └───────── [Agent C] ◀─────────┘
       (iterate until convergence)
```
When to use: Creative tasks, brainstorming, quality refinement through multiple perspectives.
```python
class CollaborativeTeam:
    """Agents collaborate through shared discussion."""

    def __init__(self, agents: list[PipelineAgent], max_rounds: int = 3):
        self.agents = agents
        self.max_rounds = max_rounds
        self.client = OpenAI()

    def run(self, task: str) -> str:
        conversation = [f"Task: {task}"]

        for round_num in range(self.max_rounds):
            print(f"Round {round_num + 1}/{self.max_rounds}")
            for agent in self.agents:
                context = "\n\n".join(conversation[-6:])  # Keep recent context
                response = self.client.chat.completions.create(
                    model=agent.model,
                    messages=[
                        {"role": "system", "content": agent.system_prompt},
                        {
                            "role": "user",
                            "content": (
                                f"Discussion so far:\n{context}\n\n"
                                f"Contribute your perspective. Build on others' ideas, "
                                f"identify issues, or propose improvements."
                            ),
                        },
                    ],
                )
                reply = response.choices[0].message.content
                conversation.append(f"[{agent.name}]: {reply}")

        # Return the last contribution (here, the integrator's final synthesis)
        return conversation[-1]


# Usage
team = CollaborativeTeam([
    PipelineAgent("designer", "You are a system designer. Focus on architecture and scalability."),
    PipelineAgent("security_expert", "You are a security expert. Identify vulnerabilities and mitigations."),
    PipelineAgent("integrator", "You synthesize ideas into a coherent plan. Resolve conflicts."),
], max_rounds=2)
result = team.run("Design an authentication system for a multi-tenant SaaS platform")
```
Pros: Diverse perspectives, iterative refinement. Cons: Unpredictable convergence, high token usage, potential for circular discussions.
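One way to tame unpredictable convergence is to stop rounds early once contributions stabilize. A sketch using the stdlib `difflib`; the 0.9 similarity threshold is an illustrative choice, not a standard value:

```python
from difflib import SequenceMatcher


def has_converged(previous: str, current: str, threshold: float = 0.9) -> bool:
    """Treat the discussion as converged when an agent's new contribution
    is nearly identical to its previous one."""
    if not previous:
        return False  # First contribution: nothing to compare against
    return SequenceMatcher(None, previous, current).ratio() >= threshold
```

In `CollaborativeTeam.run`, one could track each agent's last reply and break out of the round loop once `has_converged` holds for every agent.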
Pattern 5: Hierarchical
Nested teams with managers at each level. This mirrors organizational structures for complex projects.
```
                ┌────────────────┐
                │    Director    │
                └───────┬────────┘
            ┌───────────┴─────────────┐
      ┌─────┴─────┐            ┌──────┴─────┐
      │ Team Lead │            │  Team Lead │
      │  Backend  │            │  Frontend  │
      └─────┬─────┘            └──────┬─────┘
       ┌────┴────┐              ┌─────┴────┐
   [API Dev] [DB Dev]        [UI Dev] [UX Dev]
```
When to use: Large-scale projects requiring domain decomposition and parallel team execution.
```python
class HierarchicalOrchestrator:
    """Nested teams with managers at each level."""

    def __init__(self, structure: dict):
        """
        structure = {
            "manager": PipelineAgent(...),
            "teams": {
                "backend": {
                    "lead": PipelineAgent(...),
                    "workers": [PipelineAgent(...), ...]
                },
                ...
            }
        }
        """
        self.structure = structure
        self.client = OpenAI()

    def run(self, task: str) -> dict:
        manager = self.structure["manager"]
        teams = self.structure["teams"]

        # Manager decomposes task into team assignments
        team_names = ", ".join(teams.keys())
        decomposition = self.client.chat.completions.create(
            model=manager.model,
            messages=[
                {
                    "role": "system",
                    "content": (
                        f"You manage these teams: {team_names}. "
                        f"Decompose the task into team assignments. "
                        f'Reply with JSON: {{"assignments": {{"team_name": "subtask"}}}}'
                    ),
                },
                {"role": "user", "content": task},
            ],
            response_format={"type": "json_object"},
        )
        assignments = json.loads(decomposition.choices[0].message.content)["assignments"]
        team_results = {}

        # Each team executes its assignment
        for team_name, subtask in assignments.items():
            if team_name in teams:
                team = teams[team_name]
                print(f"Team '{team_name}' working on: {subtask[:60]}...")
                # Team lead coordinates workers
                lead_pipeline = SequentialPipeline(
                    [team["lead"]] + team["workers"]
                )
                result = lead_pipeline.run(subtask)
                team_results[team_name] = result.results

        # Manager synthesizes team results
        synthesis_prompt = "Team results:\n" + json.dumps(
            {k: list(v.values())[-1] for k, v in team_results.items()},
            indent=2
        )
        final = self.client.chat.completions.create(
            model=manager.model,
            messages=[
                {"role": "system", "content": manager.system_prompt},
                {"role": "user", "content": f"Original task: {task}\n\n{synthesis_prompt}\n\nSynthesize into a final deliverable."},
            ],
        )
        return {"team_results": team_results, "final": final.choices[0].message.content}
```
Pros: Scales to large problems, parallel team execution, clear responsibility. Cons: Complex setup, coordination overhead, slow for simple tasks.
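Note that `run` above executes teams one after another even though their assignments are independent. To realize the "parallel team execution" advantage literally, the team loop can fan out over a thread pool. A sketch, assuming each team's run function is safe to call concurrently (verify for any shared state you add):

```python
from concurrent.futures import ThreadPoolExecutor


def run_teams_parallel(assignments: dict[str, str], run_team) -> dict[str, str]:
    """Execute independent team assignments concurrently.

    run_team(team_name, subtask) -> result, and must be thread-safe.
    """
    with ThreadPoolExecutor(max_workers=len(assignments) or 1) as pool:
        futures = {
            name: pool.submit(run_team, name, subtask)
            for name, subtask in assignments.items()
        }
        # result() re-raises any exception from the worker thread
        return {name: fut.result() for name, fut in futures.items()}
```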
Pattern 6: Map-Reduce
Parallelize work across agents, then aggregate results. Ideal for tasks that can be decomposed into independent subtasks.
```
         ┌──▶ [Agent] ── result 1 ──┐
Input ───┼──▶ [Agent] ── result 2 ──┼──▶ [Aggregator] ──▶ Output
         └──▶ [Agent] ── result 3 ──┘
```
When to use: Analyzing multiple documents, processing parallel queries, any embarrassingly parallel task.
```python
import asyncio

from openai import AsyncOpenAI


class MapReduceOrchestrator:
    """Parallel map step followed by a reduce/aggregation step."""

    def __init__(self, mapper_prompt: str, reducer_prompt: str, model: str = "gpt-4o"):
        self.mapper_prompt = mapper_prompt
        self.reducer_prompt = reducer_prompt
        self.model = model
        self.async_client = AsyncOpenAI()

    async def map_single(self, item: str) -> str:
        """Process a single item (map step)."""
        response = await self.async_client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.mapper_prompt},
                {"role": "user", "content": item},
            ],
        )
        return response.choices[0].message.content

    async def run(self, items: list[str]) -> str:
        # Map: process all items in parallel
        print(f"Mapping {len(items)} items in parallel...")
        map_results = await asyncio.gather(
            *[self.map_single(item) for item in items]
        )

        # Reduce: aggregate all results
        print(f"Reducing {len(map_results)} results...")
        combined = "\n\n".join(
            f"[Item {i+1}]:\n{result}"
            for i, result in enumerate(map_results)
        )
        response = await self.async_client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.reducer_prompt},
                {"role": "user", "content": combined},
            ],
        )
        return response.choices[0].message.content


# Usage
orchestrator = MapReduceOrchestrator(
    mapper_prompt="Analyze this document and extract the 3 most important findings.",
    reducer_prompt="Synthesize all findings into a unified executive summary.",
)
documents = [
    "Q1 earnings report: Revenue grew 15%...",
    "Customer satisfaction survey: NPS increased to 72...",
    "Engineering report: System uptime at 99.97%...",
    "Market analysis: Competitor X launched new product...",
]
result = asyncio.run(orchestrator.run(documents))
```
Async for Parallelism: The map-reduce pattern benefits enormously from async execution. With asyncio.gather, all map calls run concurrently, so the total map latency is roughly that of the slowest single call rather than the sum of all calls.
Pros: Fast (parallel execution), scalable, simple to reason about. Cons: Items must be independent, aggregation can lose nuance.
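A caveat on the fan-out: `asyncio.gather` launches every request at once, which can trip provider rate limits on large batches. A sketch that caps in-flight calls with a semaphore — the limit of 5 is illustrative:

```python
import asyncio


async def map_bounded(items, map_fn, max_concurrent: int = 5):
    """Map items concurrently, keeping at most max_concurrent calls in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(item):
        async with semaphore:
            return await map_fn(item)

    # Results come back in the same order as items
    return await asyncio.gather(*(guarded(item) for item in items))
```

In `MapReduceOrchestrator.run`, the `asyncio.gather(...)` call would become `await map_bounded(items, self.map_single)`.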
Implementing Patterns with LangGraph
LangGraph provides a framework for building stateful, graph-based agent workflows. It is particularly powerful for patterns that need explicit state management and conditional routing.
```python
import operator
from typing import Annotated, Literal, TypedDict

from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END


class AgentState(TypedDict):
    """State shared across all nodes in the graph."""
    messages: Annotated[list, operator.add]
    task: str
    research: str
    analysis: str
    final_output: str
    next_step: str


def research_node(state: AgentState) -> dict:
    """Research node: gathers information."""
    llm = ChatOpenAI(model="gpt-4o")
    response = llm.invoke(
        f"Research this topic thoroughly:\n{state['task']}"
    )
    return {"research": response.content, "messages": [response]}


def analysis_node(state: AgentState) -> dict:
    """Analysis node: analyzes research findings."""
    llm = ChatOpenAI(model="gpt-4o")
    response = llm.invoke(
        f"Analyze these findings:\n{state['research']}"
    )
    return {"analysis": response.content, "messages": [response]}


def quality_check(state: AgentState) -> Literal["revise", "publish"]:
    """Conditional edge: check if analysis meets quality bar."""
    llm = ChatOpenAI(model="gpt-4o-mini")
    check = llm.invoke(
        f"Does this analysis adequately address the task?\n"
        f"Task: {state['task']}\n"
        f"Analysis: {state['analysis']}\n"
        f"Reply with exactly 'pass' or 'fail'."
    )
    return "publish" if "pass" in check.content.lower() else "revise"


def publish_node(state: AgentState) -> dict:
    """Publish node: formats the final output."""
    llm = ChatOpenAI(model="gpt-4o")
    response = llm.invoke(
        f"Create a polished final output from:\n{state['analysis']}"
    )
    return {"final_output": response.content}


# Build the graph
workflow = StateGraph(AgentState)

# Add nodes
workflow.add_node("research", research_node)
workflow.add_node("analysis", analysis_node)
workflow.add_node("publish", publish_node)

# Add edges
workflow.set_entry_point("research")
workflow.add_edge("research", "analysis")
workflow.add_conditional_edges("analysis", quality_check, {
    "revise": "research",   # Loop back if quality is low
    "publish": "publish",   # Move forward if quality passes
})
workflow.add_edge("publish", END)

# Compile and run
app = workflow.compile()
result = app.invoke({
    "task": "Analyze the impact of LLMs on software testing",
    "messages": [],
    "research": "",
    "analysis": "",
    "final_output": "",
    "next_step": "",
})
print(result["final_output"])
```
LangGraph Execution Flow:
```
┌──────────┐      ┌──────────┐      ┌─────────────┐
│ Research │─────▶│ Analysis │─────▶│   Quality   │
│   Node   │      │   Node   │      │    Check    │
└──────────┘      └──────────┘      └──────┬──────┘
      ▲                                    │
      │ "revise"                 "publish" │
      └────────────────────────────────────┤
                                           ▼
                                     ┌──────────┐
                                     │ Publish  │──▶ END
                                     │   Node   │
                                     └──────────┘
```
Infinite Loops: When using conditional edges that loop back (like the quality check above), always include a maximum iteration guard. Without it, a strict quality check can loop indefinitely and burn through your API budget.
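A sketch of such a guard, written as a plain function over the state dict so it can stand in for `quality_check`. The `revisions` counter and `MAX_REVISIONS` cap are assumptions: you would add the field to `AgentState` and increment it each time the research node re-runs. (LangGraph also enforces a configurable `recursion_limit` that raises an error rather than looping forever; check the current docs for details.)

```python
MAX_REVISIONS = 2  # Illustrative cap on revise loops


def guarded_quality_check(state: dict) -> str:
    """Quality gate with an iteration cap: after MAX_REVISIONS failed checks,
    publish the best available analysis instead of looping forever."""
    if state.get("revisions", 0) >= MAX_REVISIONS:
        return "publish"  # Give up on revising; ship what we have
    # Stand-in for the LLM pass/fail check in quality_check above
    passed = state.get("quality_passed", False)
    return "publish" if passed else "revise"
```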
Choosing the Right Pattern
| Scenario | Recommended Pattern |
|---|---|
| Linear data processing pipeline | Sequential Pipeline |
| Customer support with specialized teams | Router/Dispatcher |
| Research with dynamic task decomposition | Supervisor |
| Document review with multiple reviewers | Collaborative |
| Large project with distinct workstreams | Hierarchical |
| Analyzing a batch of documents | Map-Reduce |
| Workflow with retry loops and conditionals | LangGraph + any pattern |
Key Takeaways
- Sequential pipelines are the simplest and should be your default unless you need more flexibility
- Router/dispatcher is the most cost-effective pattern for handling diverse input types
- Supervisor provides dynamic task management at the cost of token-heavy coordination
- Collaborative works best for creative tasks where diverse perspectives add value
- Hierarchical scales to large problems but adds organizational complexity
- Map-reduce maximizes throughput for parallelizable tasks
- LangGraph adds state management and conditional routing to any of these patterns