Production Agentic Systems

Agent Orchestration Patterns

Master the architectural patterns for orchestrating complex multi-agent systems in production

30 min read · Orchestration · Patterns · Architecture · Multi-Agent

Building production multi-agent systems requires more than wiring agents together. The orchestration pattern you choose fundamentally determines your system's reliability, scalability, and maintainability. This lesson covers the six essential orchestration patterns and how to implement them.

Orchestration Pattern: A reusable architectural blueprint that defines how agents communicate, who controls execution flow, and how state is managed across a multi-agent system.

The Six Key Patterns

┌──────────────────────────────────────────────────────────────┐
│                  Orchestration Patterns                       │
│                                                              │
│  1. Sequential Pipeline    A ──▶ B ──▶ C                    │
│                                                              │
│  2. Router/Dispatcher      R ──┬▶ A                         │
│                                ├▶ B                         │
│                                └▶ C                         │
│                                                              │
│  3. Supervisor             S ──┬▶ W1                        │
│                                ├▶ W2     (S delegates)      │
│                                └▶ W3                        │
│                                                              │
│  4. Collaborative          A ◀──▶ B ◀──▶ C  (discuss)      │
│                                                              │
│  5. Hierarchical           Manager                          │
│                            ├─ Team Lead A ─┬─ Worker 1      │
│                            └─ Team Lead B  └─ Worker 2      │
│                                                              │
│  6. Map-Reduce             ┌▶ A ─┐                          │
│                        Fan─┼▶ B ─┼─▶ Aggregate              │
│                        out └▶ C ─┘                          │
└──────────────────────────────────────────────────────────────┘

Pattern 1: Sequential Pipeline

The simplest pattern. Agents execute in a fixed order, with each agent's output feeding the next.

Input ──▶ [Research Agent] ──▶ [Analysis Agent] ──▶ [Writing Agent] ──▶ Output

When to use: Linear workflows where each step depends on the previous step's complete output.

python
from dataclasses import dataclass, field

@dataclass
class PipelineState:
    """State object passed through the pipeline."""
    input: str
    results: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)

class PipelineAgent:
    """Base agent for sequential pipelines."""
    def __init__(self, name: str, system_prompt: str, model: str = "gpt-4o"):
        self.name = name
        self.system_prompt = system_prompt
        self.model = model

    def run(self, state: PipelineState) -> PipelineState:
        from openai import OpenAI
        client = OpenAI()

        # Build context from previous steps
        context = "\n\n".join(
            f"[{k}]: {v}" for k, v in state.results.items()
        )
        user_message = f"Task: {state.input}\n\nPrevious steps:\n{context}" if context else state.input

        response = client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": user_message},
            ],
        )

        state.results[self.name] = response.choices[0].message.content
        return state


class SequentialPipeline:
    """Execute agents in a fixed sequence."""
    def __init__(self, agents: list[PipelineAgent]):
        self.agents = agents

    def run(self, input_text: str) -> PipelineState:
        state = PipelineState(input=input_text)

        for agent in self.agents:
            print(f"  Running: {agent.name}")
            state = agent.run(state)

        return state


# Usage
pipeline = SequentialPipeline([
    PipelineAgent("researcher", "You are a researcher. Gather key facts about the topic."),
    PipelineAgent("analyst", "You are an analyst. Identify patterns and insights from the research."),
    PipelineAgent("writer", "You are a writer. Create a clear, polished summary from the analysis."),
])

result = pipeline.run("Impact of AI agents on software development in 2025")
print(result.results["writer"])

Pros: Simple, predictable, easy to debug. Cons: Rigid, no parallelism, one failure blocks everything.
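The "one failure blocks everything" weakness can be softened by retrying each step before giving up. This is an illustrative sketch, not part of the pipeline above; `run_with_retry` and its parameters are hypothetical names.

```python
import time

def run_with_retry(agent, state, max_attempts: int = 3, backoff: float = 1.0):
    """Run one pipeline step with retries so a transient failure
    (rate limit, timeout) does not abort the whole pipeline."""
    for attempt in range(1, max_attempts + 1):
        try:
            return agent.run(state)
        except Exception:
            if attempt == max_attempts:
                raise  # retry budget exhausted: surface the error
            # Exponential backoff between attempts: 1s, 2s, 4s, ...
            time.sleep(backoff * 2 ** (attempt - 1))
```

Inside `SequentialPipeline.run`, `state = agent.run(state)` would become `state = run_with_retry(agent, state)`.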

Pattern 2: Router/Dispatcher

A routing agent analyzes the input and dispatches to the most appropriate specialized agent.

                 ┌──▶ [SQL Agent]      ──┐
Input ──▶ [Router] ──▶ [Search Agent]    ──▶ Output
                 └──▶ [Math Agent]     ──┘

When to use: Diverse input types that require different specialized handling.

python
from openai import OpenAI
import json

class RouterDispatcher:
    """Route requests to specialized agents based on intent."""

    def __init__(self, agents: dict[str, PipelineAgent]):
        self.agents = agents
        self.client = OpenAI()

    def classify_intent(self, query: str) -> str:
        """Use an LLM to classify which agent should handle the query."""
        agent_descriptions = "\n".join(
            f"- {name}: {agent.system_prompt[:100]}"
            for name, agent in self.agents.items()
        )

        response = self.client.chat.completions.create(
            model="gpt-4o-mini",  # Use a fast, cheap model for routing
            messages=[
                {
                    "role": "system",
                    "content": (
                        f"Classify the user's request into one of these categories. "
                        f"Reply with ONLY the category name.\n\n{agent_descriptions}"
                    ),
                },
                {"role": "user", "content": query},
            ],
            temperature=0,
        )

        intent = response.choices[0].message.content.strip().lower()

        # Fallback to default if classification fails
        if intent not in self.agents:
            return list(self.agents.keys())[0]
        return intent

    def run(self, query: str) -> str:
        intent = self.classify_intent(query)
        print(f"  Routed to: {intent}")

        agent = self.agents[intent]
        state = PipelineState(input=query)
        state = agent.run(state)
        return state.results[agent.name]


# Usage
router = RouterDispatcher({
    "sql": PipelineAgent("sql", "You are a SQL expert. Write queries to answer data questions."),
    "search": PipelineAgent("search", "You are a search agent. Find and summarize information."),
    "math": PipelineAgent("math", "You are a math expert. Solve calculations step by step."),
    "code": PipelineAgent("code", "You are a programmer. Write clean Python code."),
})

answer = router.run("What is the time complexity of merge sort?")

Cost Optimization: Use a small, fast model (like gpt-4o-mini) for the routing decision and reserve larger models for the specialized agents. The router only needs to classify intent, not solve the full problem.

Pros: Efficient specialization, easy to add new agents. Cons: Routing errors cascade, single point of failure.

Pattern 3: Supervisor

A supervisor agent dynamically delegates tasks to worker agents and synthesizes results. Unlike the router, the supervisor can issue multiple tasks, review results, and re-delegate.

                    ┌──▶ [Worker 1] ──┐
[Supervisor] ──────┤                  ├──▶ [Supervisor reviews & synthesizes]
                    └──▶ [Worker 2] ──┘
                         ▲       │
                         └───────┘  (re-delegate if needed)

When to use: Complex tasks requiring dynamic decomposition and quality control.

python
import json
from openai import OpenAI

class SupervisorAgent:
    """Supervisor that delegates to and coordinates worker agents."""

    def __init__(self, workers: dict[str, PipelineAgent], model: str = "gpt-4o"):
        self.workers = workers
        self.model = model
        self.client = OpenAI()

    def run(self, task: str, max_iterations: int = 3) -> str:
        worker_descriptions = "\n".join(
            f"- {name}: {w.system_prompt[:80]}" for name, w in self.workers.items()
        )

        messages = [
            {
                "role": "system",
                "content": (
                    f"You are a supervisor managing these workers:\n{worker_descriptions}\n\n"
                    f"Break down tasks and delegate to workers. "
                    f"Respond with JSON: {{\"delegate\": \"worker_name\", \"subtask\": \"description\"}} "
                    f"or {{\"final_answer\": \"your synthesized answer\"}} when done."
                ),
            },
            {"role": "user", "content": task},
        ]

        all_results = {}

        for i in range(max_iterations):
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                response_format={"type": "json_object"},
            )

            decision = json.loads(response.choices[0].message.content)

            # Check if supervisor is done
            if "final_answer" in decision:
                return decision["final_answer"]

            # Delegate to worker
            worker_name = decision.get("delegate", "")
            subtask = decision.get("subtask", "")

            if worker_name in self.workers:
                print(f"  Delegating to {worker_name}: {subtask[:60]}...")
                worker = self.workers[worker_name]
                state = PipelineState(input=subtask)
                state = worker.run(state)
                result = state.results[worker.name]
                all_results[worker_name] = result

                messages.append({"role": "assistant", "content": response.choices[0].message.content})
                messages.append({"role": "user", "content": f"Result from {worker_name}:\n{result}"})

        # Force final synthesis if max iterations reached
        messages.append({"role": "user", "content": "Synthesize all results into a final answer now."})
        response = self.client.chat.completions.create(model=self.model, messages=messages)
        return response.choices[0].message.content


# Usage
supervisor = SupervisorAgent({
    "researcher": PipelineAgent("researcher", "Research facts and data about topics."),
    "analyst": PipelineAgent("analyst", "Analyze data and identify patterns."),
    "writer": PipelineAgent("writer", "Write clear, polished content."),
})

result = supervisor.run("Create a competitive analysis of the top 3 cloud providers")

Pros: Flexible, adaptive, quality control via review loops. Cons: Higher token cost, supervisor is a bottleneck.

Pattern 4: Collaborative

Agents discuss and iterate on a shared problem, building on each other's contributions. No single agent controls the flow.

[Agent A] ──▶ shared state ◀── [Agent B]
     ▲                              │
     └──────── [Agent C] ◀─────────┘
               (iterate until convergence)

When to use: Creative tasks, brainstorming, quality refinement through multiple perspectives.

python
class CollaborativeTeam:
    """Agents collaborate through shared discussion."""

    def __init__(self, agents: list[PipelineAgent], max_rounds: int = 3):
        self.agents = agents
        self.max_rounds = max_rounds
        self.client = OpenAI()

    def run(self, task: str) -> str:
        conversation = [f"Task: {task}"]

        for round_num in range(self.max_rounds):
            print(f"  Round {round_num + 1}/{self.max_rounds}")

            for agent in self.agents:
                context = "\n\n".join(conversation[-6:])  # Keep recent context

                response = self.client.chat.completions.create(
                    model=agent.model,
                    messages=[
                        {"role": "system", "content": agent.system_prompt},
                        {
                            "role": "user",
                            "content": (
                                f"Discussion so far:\n{context}\n\n"
                                f"Contribute your perspective. Build on others' ideas, "
                                f"identify issues, or propose improvements."
                            ),
                        },
                    ],
                )

                reply = response.choices[0].message.content
                conversation.append(f"[{agent.name}]: {reply}")

        # Return the final state of discussion
        return conversation[-1]


# Usage
team = CollaborativeTeam([
    PipelineAgent("designer", "You are a system designer. Focus on architecture and scalability."),
    PipelineAgent("security_expert", "You are a security expert. Identify vulnerabilities and mitigations."),
    PipelineAgent("integrator", "You synthesize ideas into a coherent plan. Resolve conflicts."),
], max_rounds=2)

result = team.run("Design an authentication system for a multi-tenant SaaS platform")

Pros: Diverse perspectives, iterative refinement. Cons: Unpredictable convergence, high token usage, potential for circular discussions.
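To guard against circular discussions, a convergence heuristic can end the rounds early when consecutive contributions barely differ. A sketch under the assumption that word-overlap is a good-enough repetition signal; the function names are illustrative.

```python
def jaccard(a: str, b: str) -> float:
    """Word-set overlap between two contributions (0.0 to 1.0)."""
    wa, wb = set(a.lower().split()), set(b.lower().split())
    return len(wa & wb) / len(wa | wb) if wa | wb else 1.0

def converged(conversation: list[str], threshold: float = 0.8) -> bool:
    """True once the last two contributions are near-duplicates,
    i.e. the discussion has stopped adding new ideas."""
    if len(conversation) < 2:
        return False
    return jaccard(conversation[-1], conversation[-2]) >= threshold
```

In `CollaborativeTeam.run`, checking `converged(conversation)` after each agent's turn and breaking out of the round loop would cap the token cost of stalled discussions.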

Pattern 5: Hierarchical

Nested teams with managers at each level. This mirrors organizational structures for complex projects.

                  ┌────────────────┐
                  │   Director     │
                  └───────┬────────┘
              ┌───────────┴────────────┐
        ┌─────┴─────┐           ┌──────┴─────┐
        │ Team Lead │           │ Team Lead  │
        │  Backend  │           │  Frontend  │
        └─────┬─────┘           └──────┬─────┘
         ┌────┴────┐             ┌─────┴────┐
      [API Dev] [DB Dev]     [UI Dev] [UX Dev]

When to use: Large-scale projects requiring domain decomposition and parallel team execution.

python
class HierarchicalOrchestrator:
    """Nested teams with managers at each level."""

    def __init__(self, structure: dict):
        """
        structure = {
            "manager": PipelineAgent(...),
            "teams": {
                "backend": {
                    "lead": PipelineAgent(...),
                    "workers": [PipelineAgent(...), ...]
                },
                ...
            }
        }
        """
        self.structure = structure
        self.client = OpenAI()

    def run(self, task: str) -> dict:
        manager = self.structure["manager"]
        teams = self.structure["teams"]

        # Manager decomposes task into team assignments
        team_names = ", ".join(teams.keys())
        decomposition = self.client.chat.completions.create(
            model=manager.model,
            messages=[
                {
                    "role": "system",
                    "content": (
                        f"You manage these teams: {team_names}. "
                        f"Decompose the task into team assignments. "
                        f'Reply with JSON: {{"assignments": {{"team_name": "subtask"}}}}'
                    ),
                },
                {"role": "user", "content": task},
            ],
            response_format={"type": "json_object"},
        )

        assignments = json.loads(decomposition.choices[0].message.content)["assignments"]
        team_results = {}

        # Each team executes its assignment
        for team_name, subtask in assignments.items():
            if team_name in teams:
                team = teams[team_name]
                print(f"  Team '{team_name}' working on: {subtask[:60]}...")

                # Team lead coordinates workers
                lead_pipeline = SequentialPipeline(
                    [team["lead"]] + team["workers"]
                )
                result = lead_pipeline.run(subtask)
                team_results[team_name] = result.results

        # Manager synthesizes team results
        synthesis_prompt = "Team results:\n" + json.dumps(
            {k: list(v.values())[-1] for k, v in team_results.items()},
            indent=2
        )

        final = self.client.chat.completions.create(
            model=manager.model,
            messages=[
                {"role": "system", "content": manager.system_prompt},
                {"role": "user", "content": f"Original task: {task}\n\n{synthesis_prompt}\n\nSynthesize into a final deliverable."},
            ],
        )

        return {"team_results": team_results, "final": final.choices[0].message.content}

Pros: Scales to large problems, parallel team execution, clear responsibility. Cons: Complex setup, coordination overhead, slow for simple tasks.

Pattern 6: Map-Reduce

Parallelize work across agents, then aggregate results. Ideal for tasks that can be decomposed into independent subtasks.

         ┌──▶ [Agent] ── result 1 ──┐
Input ───┼──▶ [Agent] ── result 2 ──┼──▶ [Aggregator] ──▶ Output
         └──▶ [Agent] ── result 3 ──┘

When to use: Analyzing multiple documents, processing parallel queries, any embarrassingly parallel task.

python
import asyncio
from openai import AsyncOpenAI

class MapReduceOrchestrator:
    """Parallel map step followed by a reduce/aggregation step."""

    def __init__(self, mapper_prompt: str, reducer_prompt: str, model: str = "gpt-4o"):
        self.mapper_prompt = mapper_prompt
        self.reducer_prompt = reducer_prompt
        self.model = model
        self.async_client = AsyncOpenAI()

    async def map_single(self, item: str) -> str:
        """Process a single item (map step)."""
        response = await self.async_client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.mapper_prompt},
                {"role": "user", "content": item},
            ],
        )
        return response.choices[0].message.content

    async def run(self, items: list[str]) -> str:
        # Map: process all items in parallel
        print(f"  Mapping {len(items)} items in parallel...")
        map_results = await asyncio.gather(
            *[self.map_single(item) for item in items]
        )

        # Reduce: aggregate all results
        print(f"  Reducing {len(map_results)} results...")
        combined = "\n\n".join(
            f"[Item {i+1}]:\n{result}"
            for i, result in enumerate(map_results)
        )

        response = await self.async_client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.reducer_prompt},
                {"role": "user", "content": combined},
            ],
        )

        return response.choices[0].message.content


# Usage
orchestrator = MapReduceOrchestrator(
    mapper_prompt="Analyze this document and extract the 3 most important findings.",
    reducer_prompt="Synthesize all findings into a unified executive summary.",
)

documents = [
    "Q1 earnings report: Revenue grew 15%...",
    "Customer satisfaction survey: NPS increased to 72...",
    "Engineering report: System uptime at 99.97%...",
    "Market analysis: Competitor X launched new product...",
]

result = asyncio.run(orchestrator.run(documents))

Async for Parallelism: The map-reduce pattern benefits enormously from async execution. Using asyncio.gather processes all map tasks concurrently, so wall-clock time for the map step is roughly the latency of the slowest single call rather than the sum of all calls.

Pros: Fast (parallel execution), scalable, simple to reason about. Cons: Items must be independent, aggregation can lose nuance.

Implementing Patterns with LangGraph

LangGraph provides a framework for building stateful, graph-based agent workflows. It is particularly powerful for patterns that need explicit state management and conditional routing.

python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, Literal
from langchain_openai import ChatOpenAI
import operator

class AgentState(TypedDict):
    """State shared across all nodes in the graph."""
    messages: Annotated[list, operator.add]
    task: str
    research: str
    analysis: str
    final_output: str
    next_step: str

def research_node(state: AgentState) -> dict:
    """Research node: gathers information."""
    llm = ChatOpenAI(model="gpt-4o")
    response = llm.invoke(
        f"Research this topic thoroughly:\n{state['task']}"
    )
    return {"research": response.content, "messages": [response]}

def analysis_node(state: AgentState) -> dict:
    """Analysis node: analyzes research findings."""
    llm = ChatOpenAI(model="gpt-4o")
    response = llm.invoke(
        f"Analyze these findings:\n{state['research']}"
    )
    return {"analysis": response.content, "messages": [response]}

def quality_check(state: AgentState) -> Literal["revise", "publish"]:
    """Conditional edge: check if analysis meets quality bar."""
    llm = ChatOpenAI(model="gpt-4o-mini")
    check = llm.invoke(
        f"Does this analysis adequately address the task?\n"
        f"Task: {state['task']}\n"
        f"Analysis: {state['analysis']}\n"
        f"Reply with exactly 'pass' or 'fail'."
    )
    return "publish" if "pass" in check.content.lower() else "revise"

def publish_node(state: AgentState) -> dict:
    """Publish node: formats the final output."""
    llm = ChatOpenAI(model="gpt-4o")
    response = llm.invoke(
        f"Create a polished final output from:\n{state['analysis']}"
    )
    return {"final_output": response.content}

# Build the graph
workflow = StateGraph(AgentState)

# Add nodes
workflow.add_node("research", research_node)
workflow.add_node("analysis", analysis_node)
workflow.add_node("publish", publish_node)

# Add edges
workflow.set_entry_point("research")
workflow.add_edge("research", "analysis")
workflow.add_conditional_edges("analysis", quality_check, {
    "revise": "research",   # Loop back if quality is low
    "publish": "publish",   # Move forward if quality passes
})
workflow.add_edge("publish", END)

# Compile and run
app = workflow.compile()
result = app.invoke({
    "task": "Analyze the impact of LLMs on software testing",
    "messages": [],
    "research": "",
    "analysis": "",
    "final_output": "",
    "next_step": "",
})

print(result["final_output"])

LangGraph Execution Flow:

  ┌──────────┐     ┌──────────┐     ┌─────────────┐
  │ Research │────▶│ Analysis │────▶│ Quality     │
  │  Node    │     │  Node    │     │ Check       │
  └──────────┘     └──────────┘     └──────┬──────┘
       ▲                                    │
       │ "revise"                  "publish" │
       └────────────────────────────────────▼
                                     ┌──────────┐
                                     │ Publish  │──▶ END
                                     │  Node    │
                                     └──────────┘

Infinite Loops: When using conditional edges that loop back (like the quality check above), always include a maximum iteration guard. Without it, a strict quality check can loop indefinitely and burn through your API budget.
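One way to implement that guard is a revision counter stored in the graph state and consulted by the conditional edge (LangGraph's invoke config also accepts a recursion_limit that hard-caps total steps). Below is a framework-free sketch of the counter approach; the names are illustrative.

```python
def quality_gate(state: dict, max_revisions: int = 3) -> str:
    """Conditional-edge logic with a revision budget: once the budget
    is spent, route to 'publish' no matter what the quality check says."""
    if state.get("revisions", 0) >= max_revisions:
        return "publish"  # budget exhausted: ship what we have
    return "revise" if state.get("quality") == "fail" else "publish"

def revise(state: dict) -> dict:
    """Each trip around the loop increments the counter."""
    return {**state, "revisions": state.get("revisions", 0) + 1}

# A quality check that never passes still terminates after 3 revisions.
state = {"quality": "fail", "revisions": 0}
while quality_gate(state) == "revise":
    state = revise(state)
print(state["revisions"])  # → 3
```

In the LangGraph version above, `revisions` would be an extra field on `AgentState` and `quality_gate` would replace `quality_check` as the conditional edge.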

Choosing the Right Pattern

Scenario                                     Recommended Pattern
Linear data processing pipeline              Sequential Pipeline
Customer support with specialized teams      Router/Dispatcher
Research with dynamic task decomposition     Supervisor
Document review with multiple reviewers      Collaborative
Large project with distinct workstreams      Hierarchical
Analyzing a batch of documents               Map-Reduce
Workflow with retry loops and conditionals   LangGraph + any pattern

Key Takeaways

  1. Sequential pipelines are the simplest and should be your default unless you need more flexibility
  2. Router/dispatcher is the most cost-effective pattern for handling diverse input types
  3. Supervisor provides dynamic task management at the cost of token-heavy coordination
  4. Collaborative works best for creative tasks where diverse perspectives add value
  5. Hierarchical scales to large problems but adds organizational complexity
  6. Map-reduce maximizes throughput for parallelizable tasks
  7. LangGraph adds state management and conditional routing to any of these patterns
