Project: Production Multi-Agent Pipeline
In this capstone project, you will build a complete production-ready multi-agent system. The system takes a user request, routes it to specialized agents (research, analysis, writing), orchestrates their collaboration with tool use, evaluates and refines results, and includes logging, tracing, error recovery, and containerized deployment.
This ties together everything from the module: agent design, tool use, orchestration patterns, evaluation, and production infrastructure.
Project Goal: Build a production multi-agent content pipeline that takes a topic, produces a research-backed technical report, and does so with full observability, error handling, evaluation, and deployment infrastructure. By the end, you'll have a system you can actually deploy.
System Architecture
┌──────────────────────────────────────────────────────────────────┐
│ FastAPI Gateway │
│ (Auth, Rate Limiting, Routing) │
└──────────────────────────┬───────────────────────────────────────┘
│
┌──────────────────────────▼───────────────────────────────────────┐
│ Pipeline Orchestrator │
│ (LangGraph State Machine) │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────────┐ │
│ │ Router │──►│ Research │──►│ Analysis │──►│ Writing │ │
│ │ Agent │ │ Agent │ │ Agent │ │ Agent │ │
│ └──────────┘ └────┬─────┘ └────┬─────┘ └──────┬───────┘ │
│ │ │ │ │
│ ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ │
│ │ Search │ │ Analyze │ │ Format │ │
│ │ Tool │ │ Tool │ │ Tool │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ Quality Gate (Evaluator) │ │
│ │ Score < threshold? → Revise loop │ │
│ └───────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────┘
│
┌──────────────────────────▼───────────────────────────────────────┐
│ Monitoring & Observability │
│ (Structured Logging, Traces, Prometheus Metrics) │
└──────────────────────────────────────────────────────────────────┘
Step 1: Project Setup
mkdir production-pipeline && cd production-pipeline
python -m venv venv
source venv/bin/activate
pip install langgraph langchain-openai langchain-core fastapi uvicorn \
    pydantic python-dotenv structlog prometheus-client httpx
pip freeze > requirements.txt  # the Dockerfile in Step 8 copies this file into the image
# .env
OPENAI_API_KEY=your_key_here
Step 2: Define the Pipeline State
LangGraph uses a typed state object that flows through the graph. Every agent reads from and writes to this shared state.
# state.py
from typing import TypedDict, Annotated, Optional
from langgraph.graph.message import add_messages
class PipelineState(TypedDict):
"""Shared state for the multi-agent pipeline."""
# Input
topic: str
requirements: Optional[str]
# Agent outputs
research: Optional[str]
analysis: Optional[str]
draft: Optional[str]
final_report: Optional[str]
# Quality tracking
quality_score: float
revision_count: int
max_revisions: int
# Metadata
messages: Annotated[list, add_messages]
errors: list[str]
trace: list[dict]
Why LangGraph? LangGraph models the pipeline as a state machine with typed state, conditional edges, and cycle support. This gives you explicit control over agent coordination, retry logic, and quality gates -- essential for production systems.
Step 3: Build the Specialized Agents
Each agent is a Python function that takes the pipeline state, does its work, and returns updated state.
# agents.py
import time
import json
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage
from state import PipelineState
llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
creative_llm = ChatOpenAI(model="gpt-4o", temperature=0.7)
def router_agent(state: PipelineState) -> PipelineState:
"""Route the request and validate the topic."""
start = time.time()
response = llm.invoke([
SystemMessage(content="""You are a request router. Analyze the topic and determine:
1. Is this a valid research topic? (yes/no)
2. What are the key subtopics to research?
3. What type of report should be produced? (technical, business, overview)
Respond in JSON: {"valid": bool, "subtopics": [...], "report_type": "..."}"""),
HumanMessage(content=f"Topic: {state['topic']}\nRequirements: {state.get('requirements', 'None')}")
])
    # Models sometimes wrap JSON in Markdown fences; strip them before parsing
    content = response.content.strip()
    if content.startswith("```"):
        content = content.split("\n", 1)[1].rsplit("```", 1)[0]
    result = json.loads(content)
state["trace"].append({
"agent": "router",
"duration_ms": (time.time() - start) * 1000,
"result": result,
})
if not result.get("valid", False):
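        # Error-recovery note: a conditional edge from the router could end the run
        # here; this version records the problem and lets the pipeline continue.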
state["errors"].append(f"Invalid topic: {state['topic']}")
return state
def research_agent(state: PipelineState) -> PipelineState:
"""Gather comprehensive information on the topic."""
start = time.time()
response = llm.invoke([
SystemMessage(content="""You are a senior research analyst. Produce a thorough research brief.
Include:
- Overview of the current state
- Key players, technologies, or concepts
- Recent developments (last 12 months)
- Data points and statistics where available
- Different perspectives and debates
- Practical implications
Be factual, specific, and cite sources where possible.
Target length: 800-1200 words."""),
HumanMessage(content=f"Research this topic thoroughly: {state['topic']}")
])
state["research"] = response.content
state["trace"].append({
"agent": "research",
"duration_ms": (time.time() - start) * 1000,
"output_length": len(response.content),
})
return state
def analysis_agent(state: PipelineState) -> PipelineState:
"""Analyze the research and extract key insights."""
start = time.time()
response = llm.invoke([
SystemMessage(content="""You are a strategic analyst. Given research data, produce:
1. Key Themes: 3-5 major themes with supporting evidence
2. Trends: Emerging patterns and where things are heading
3. Implications: What this means for practitioners
4. Gaps: What questions remain unanswered
5. Recommendations: Actionable takeaways
Be analytical, not just descriptive. Draw connections between data points."""),
HumanMessage(content=f"Analyze this research:\n\n{state['research']}")
])
state["analysis"] = response.content
state["trace"].append({
"agent": "analysis",
"duration_ms": (time.time() - start) * 1000,
"output_length": len(response.content),
})
return state
def writing_agent(state: PipelineState) -> PipelineState:
"""Write the final report based on research and analysis."""
start = time.time()
revision_context = ""
if state["revision_count"] > 0:
revision_context = f"""
This is revision #{state['revision_count']}. The previous draft scored {state['quality_score']:.1f}/10.
Focus on improving: clarity, depth, specific examples, and actionable insights.
Previous draft:
{state.get('draft', '')}
"""
response = creative_llm.invoke([
SystemMessage(content=f"""You are an expert technical writer. Produce a polished report.
Requirements:
- Compelling opening that establishes why this matters
- Clear structure with descriptive headings
- Specific examples and data integrated naturally
- Analysis that goes beyond surface-level description
- Conclusion with actionable takeaways
- Professional, authoritative tone
- 1500-2500 words
- Markdown format
{revision_context}"""),
HumanMessage(content=f"""Write a comprehensive report on: {state['topic']}
Research:
{state['research']}
Analysis:
{state['analysis']}""")
])
state["draft"] = response.content
state["trace"].append({
"agent": "writing",
"duration_ms": (time.time() - start) * 1000,
"output_length": len(response.content),
"revision": state["revision_count"],
})
return state
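Each agent can also be exercised on its own before the graph exists, which makes prompt debugging much faster. A minimal sketch (the topic is just an example; assumes OPENAI_API_KEY is set in .env):

# try_agent.py -- optional scratch file for poking at a single agent
from dotenv import load_dotenv
load_dotenv()  # must run before importing agents, which instantiates the LLM clients

from agents import research_agent

state = {"topic": "WebAssembly on the server", "trace": [], "errors": [], "revision_count": 0}
out = research_agent(state)
print(out["trace"][-1])        # timing and output length
print(out["research"][:400])   # first few hundred characters of the brief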
Step 4: Build the Quality Gate
The quality gate evaluates the draft and decides whether it is good enough to ship or needs revision.
# evaluator.py
import json
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage
from state import PipelineState
evaluator_llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
def quality_gate(state: PipelineState) -> PipelineState:
"""Evaluate the draft quality and decide: publish or revise."""
response = evaluator_llm.invoke([
SystemMessage(content="""You are a quality evaluator for technical reports.
Score the report on these dimensions (1-10 each):
1. Accuracy: Are claims well-supported?
2. Depth: Does it go beyond surface-level?
3. Clarity: Is it well-structured and easy to follow?
4. Actionability: Does it provide practical takeaways?
5. Engagement: Is it compelling to read?
Respond in JSON:
{
"scores": {"accuracy": int, "depth": int, "clarity": int, "actionability": int, "engagement": int},
"overall": float,
"strengths": ["..."],
"weaknesses": ["..."],
"publish_ready": bool
}"""),
HumanMessage(content=f"Evaluate this report:\n\n{state['draft']}")
])
    # Models sometimes wrap JSON in Markdown fences; strip them before parsing
    content = response.content.strip()
    if content.startswith("```"):
        content = content.split("\n", 1)[1].rsplit("```", 1)[0]
    result = json.loads(content)
state["quality_score"] = result["overall"]
state["trace"].append({
"agent": "evaluator",
"scores": result["scores"],
"overall": result["overall"],
"publish_ready": result["publish_ready"],
})
return state
def should_revise(state: PipelineState) -> str:
"""Conditional edge: decide whether to revise or finalize."""
if state["quality_score"] >= 7.0:
return "finalize"
if state["revision_count"] >= state["max_revisions"]:
return "finalize" # Max revisions reached, ship what we have
return "revise"
def finalize(state: PipelineState) -> PipelineState:
"""Finalize the report for delivery."""
state["final_report"] = state["draft"]
state["trace"].append({
"agent": "finalize",
"quality_score": state["quality_score"],
"revisions": state["revision_count"],
})
return state
def prepare_revision(state: PipelineState) -> PipelineState:
"""Increment revision counter before sending back to writing agent."""
state["revision_count"] += 1
return state
Quality gates with revision loops are a production pattern. Instead of accepting the first draft, the system evaluates it against quality criteria and routes it back to the writing agent if the score is below the threshold. Set a max revision count to prevent infinite loops.
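Because should_revise is pure logic, it is easy to pin down with unit tests before any LLM calls are involved. A quick sanity check, assuming pytest:

# test_evaluator.py
from evaluator import should_revise

def make_state(score, revisions, max_revisions=2):
    return {"quality_score": score, "revision_count": revisions, "max_revisions": max_revisions}

def test_good_draft_ships():
    assert should_revise(make_state(8.2, 0)) == "finalize"

def test_weak_draft_revises():
    assert should_revise(make_state(5.5, 0)) == "revise"

def test_revision_cap_prevents_infinite_loops():
    assert should_revise(make_state(5.5, 2)) == "finalize"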
Step 5: Build the LangGraph Pipeline
Wire the agents together into a state machine with conditional edges.
# pipeline.py
from langgraph.graph import StateGraph, END
from state import PipelineState
from agents import router_agent, research_agent, analysis_agent, writing_agent
from evaluator import quality_gate, should_revise, finalize, prepare_revision
def build_pipeline():
"""Build the multi-agent pipeline graph."""
graph = StateGraph(PipelineState)
# Add nodes
graph.add_node("router", router_agent)
graph.add_node("research", research_agent)
graph.add_node("analysis", analysis_agent)
graph.add_node("writing", writing_agent)
graph.add_node("quality_gate", quality_gate)
graph.add_node("prepare_revision", prepare_revision)
graph.add_node("finalize", finalize)
# Define edges
graph.set_entry_point("router")
graph.add_edge("router", "research")
graph.add_edge("research", "analysis")
graph.add_edge("analysis", "writing")
graph.add_edge("writing", "quality_gate")
# Conditional edge: revise or finalize
graph.add_conditional_edges(
"quality_gate",
should_revise,
{
"revise": "prepare_revision",
"finalize": "finalize",
}
)
graph.add_edge("prepare_revision", "writing")
graph.add_edge("finalize", END)
return graph.compile()
# The compiled graph:
#
# router → research → analysis → writing → quality_gate
# ▲ │
# │ ┌──────┴──────┐
# │ ▼ ▼
# prepare_revision finalize → END
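It is worth smoke-testing the compiled graph from a script before putting an API in front of it. A minimal sketch (the topic is just an example; assumes OPENAI_API_KEY is set in .env):

# run_local.py
from dotenv import load_dotenv
load_dotenv()  # must run before importing pipeline, which instantiates the LLM clients

from pipeline import build_pipeline

initial_state = {
    "topic": "Vector databases for retrieval-augmented generation",
    "requirements": "",
    "research": None, "analysis": None, "draft": None, "final_report": None,
    "quality_score": 0.0, "revision_count": 0, "max_revisions": 1,
    "messages": [], "errors": [], "trace": [],
}

result = build_pipeline().invoke(initial_state)
print(f"score={result['quality_score']}, revisions={result['revision_count']}")
print(result["final_report"][:500])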
Step 6: Add Structured Logging and Metrics
Production systems need observability. We add structured logging with structlog and expose Prometheus metrics with prometheus-client.
# monitoring.py
import structlog
from prometheus_client import Counter, Histogram, Gauge
# Configure structured logging
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.add_log_level,
structlog.processors.JSONRenderer(),
]
)
logger = structlog.get_logger()
# Prometheus metrics
pipeline_duration = Histogram(
"pipeline_duration_seconds",
"Time to complete the full pipeline",
buckets=[5, 10, 30, 60, 120, 300]
)
pipeline_quality = Histogram(
"pipeline_quality_score",
"Quality score of pipeline outputs",
buckets=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
)
pipeline_revisions = Counter(
"pipeline_revisions_total",
"Total number of revisions across all runs"
)
pipeline_errors = Counter(
"pipeline_errors_total",
"Total pipeline errors",
["error_type"]
)
active_pipelines = Gauge(
"active_pipelines",
"Number of currently running pipelines"
)
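With this configuration, every log line is a single JSON object that log aggregators (Loki, ELK, CloudWatch) can parse without custom rules. The pipeline_completed event emitted in Step 7 renders roughly like this (field order and timestamp are illustrative):

{"run_id": "a1b2c3d4", "quality_score": 7.8, "revisions": 1, "duration_seconds": 45.2, "timestamp": "2025-01-15T10:32:01Z", "level": "info", "event": "pipeline_completed"}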
Step 7: Build the FastAPI Server
Wrap the pipeline in a production API with authentication, health checks, and a Prometheus metrics endpoint; a rate-limiting sketch follows the server code.
# server.py
import time
import uuid
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
from starlette.responses import Response
from dotenv import load_dotenv
load_dotenv()  # load OPENAI_API_KEY from .env for local runs (Docker Compose injects it via env_file)
from pipeline import build_pipeline
from monitoring import (
logger, pipeline_duration, pipeline_quality,
pipeline_revisions, pipeline_errors, active_pipelines
)
app = FastAPI(title="Multi-Agent Pipeline API", version="1.0.0")
security = HTTPBearer()
pipeline = build_pipeline()
# Simple API key auth (use a real auth system in production)
VALID_API_KEYS = {"your-secret-api-key-here"}
def verify_auth(credentials: HTTPAuthorizationCredentials = Depends(security)):
if credentials.credentials not in VALID_API_KEYS:
raise HTTPException(status_code=401, detail="Invalid API key")
return credentials.credentials
class PipelineRequest(BaseModel):
topic: str
requirements: str = ""
max_revisions: int = 2
class PipelineResponse(BaseModel):
run_id: str
topic: str
report: str
quality_score: float
revisions: int
duration_seconds: float
trace_summary: list[dict]
@app.post("/pipeline/run", response_model=PipelineResponse)
async def run_pipeline(
request: PipelineRequest,
api_key: str = Depends(verify_auth)
):
"""Run the multi-agent pipeline on a topic."""
run_id = str(uuid.uuid4())[:8]
start_time = time.time()
logger.info("pipeline_started", run_id=run_id, topic=request.topic)
active_pipelines.inc()
try:
# Initialize state
initial_state = {
"topic": request.topic,
"requirements": request.requirements,
"research": None,
"analysis": None,
"draft": None,
"final_report": None,
"quality_score": 0.0,
"revision_count": 0,
"max_revisions": request.max_revisions,
"messages": [],
"errors": [],
"trace": [],
}
        # Run the pipeline; ainvoke keeps the event loop free (a blocking invoke would stall other requests)
        result = await pipeline.ainvoke(initial_state)
duration = time.time() - start_time
# Record metrics
pipeline_duration.observe(duration)
pipeline_quality.observe(result["quality_score"])
pipeline_revisions.inc(result["revision_count"])
logger.info(
"pipeline_completed",
run_id=run_id,
quality_score=result["quality_score"],
revisions=result["revision_count"],
duration_seconds=round(duration, 2),
)
return PipelineResponse(
run_id=run_id,
topic=request.topic,
report=result["final_report"] or result.get("draft", "No output generated"),
quality_score=result["quality_score"],
revisions=result["revision_count"],
duration_seconds=round(duration, 2),
trace_summary=result["trace"],
)
except Exception as e:
pipeline_errors.labels(error_type=type(e).__name__).inc()
logger.error("pipeline_failed", run_id=run_id, error=str(e))
raise HTTPException(status_code=500, detail=f"Pipeline failed: {str(e)}")
finally:
active_pipelines.dec()
@app.get("/health")
async def health_check():
"""Health check endpoint for load balancers."""
return {"status": "healthy", "version": "1.0.0"}
@app.get("/metrics")
async def metrics():
"""Prometheus metrics endpoint."""
return Response(
content=generate_latest(),
media_type=CONTENT_TYPE_LATEST
)
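The architecture diagram lists rate limiting at the gateway, which the server above leaves out. Here is a minimal per-process sketch using Starlette middleware (the window and limit are arbitrary; with multiple replicas you would enforce this in a shared store like Redis, or at an API gateway):

# ratelimit.py
import time
from fastapi import Request
from starlette.responses import JSONResponse

WINDOW_SECONDS = 60
MAX_REQUESTS = 10
_hits: dict[str, list[float]] = {}

async def rate_limit_middleware(request: Request, call_next):
    """Allow at most MAX_REQUESTS per client IP per WINDOW_SECONDS."""
    key = request.client.host if request.client else "unknown"
    now = time.time()
    recent = [t for t in _hits.get(key, []) if now - t < WINDOW_SECONDS]
    if len(recent) >= MAX_REQUESTS:
        return JSONResponse(status_code=429, content={"detail": "Rate limit exceeded"})
    recent.append(now)
    _hits[key] = recent
    return await call_next(request)

# In server.py, after creating the app:
#   from ratelimit import rate_limit_middleware
#   app.middleware("http")(rate_limit_middleware)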
Step 8: Containerize with Docker
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: "3.8"
services:
pipeline:
build: .
ports:
- "8000:8000"
env_file:
- .env
restart: unless-stopped
    healthcheck:
      # python:3.11-slim ships without curl, so probe the endpoint with the stdlib
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
interval: 30s
timeout: 10s
retries: 3
prometheus:
image: prom/prometheus
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
grafana:
image: grafana/grafana
ports:
- "3000:3000"
depends_on:
- prometheus
# prometheus.yml
global:
scrape_interval: 15s
scrape_configs:
- job_name: "pipeline"
static_configs:
- targets: ["pipeline:8000"]
metrics_path: "/metrics"
Step 9: Run and Test the System
# Run locally
uvicorn server:app --reload
# Or with Docker
docker compose up --build
Test the pipeline:
curl -X POST http://localhost:8000/pipeline/run \
-H "Authorization: Bearer your-secret-api-key-here" \
-H "Content-Type: application/json" \
-d '{
"topic": "The impact of AI agents on software development in 2025",
"requirements": "Focus on practical implications for engineering teams",
"max_revisions": 2
}'
A representative response (scores, durations, and lengths will vary from run to run):
{
"run_id": "a1b2c3d4",
"topic": "The impact of AI agents on software development in 2025",
"report": "# The Impact of AI Agents on Software Development\n\n...",
"quality_score": 7.8,
"revisions": 1,
"duration_seconds": 45.2,
"trace_summary": [
{"agent": "router", "duration_ms": 1200},
{"agent": "research", "duration_ms": 8500, "output_length": 2400},
{"agent": "analysis", "duration_ms": 5200, "output_length": 1800},
{"agent": "writing", "duration_ms": 12000, "output_length": 4200, "revision": 0},
{"agent": "evaluator", "overall": 6.5, "publish_ready": false},
{"agent": "writing", "duration_ms": 14000, "output_length": 4800, "revision": 1},
{"agent": "evaluator", "overall": 7.8, "publish_ready": true},
{"agent": "finalize", "quality_score": 7.8, "revisions": 1}
]
}
Step 10: Monitor with Grafana
After starting the Docker Compose stack, open Grafana at http://localhost:3000 (default login: admin/admin), add Prometheus as a data source (http://prometheus:9090 inside the Compose network), and build dashboards for the metrics below. A few starter queries follow the list.
- Pipeline duration -- histogram of how long runs take
- Quality scores -- distribution of output quality over time
- Revision counts -- how often drafts need revision
- Error rates -- pipeline failures by error type
- Active pipelines -- current concurrency
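A few starter PromQL queries for those panels (the exact dashboard layout is up to you):

# p95 pipeline latency over the last hour
histogram_quantile(0.95, rate(pipeline_duration_seconds_bucket[1h]))
# mean quality score over the last hour
rate(pipeline_quality_score_sum[1h]) / rate(pipeline_quality_score_count[1h])
# error rate by type over the last 5 minutes
rate(pipeline_errors_total[5m])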
These metrics tell you whether your agents are degrading (quality dropping), getting slower (duration increasing), or failing more often (error rate spiking).
Project File Structure
production-pipeline/
├── .env # API keys (gitignored)
├── state.py # Pipeline state definition
├── agents.py # Specialized agent functions
├── evaluator.py # Quality gate and revision logic
├── pipeline.py # LangGraph pipeline assembly
├── monitoring.py # Logging and Prometheus metrics
├── server.py # FastAPI server
├── Dockerfile # Container definition
├── docker-compose.yml # Full stack (app + Prometheus + Grafana)
├── prometheus.yml # Prometheus scrape config
└── requirements.txt # Python dependencies
Key Takeaways
What You Built:
- A multi-agent pipeline with Router, Research, Analysis, and Writing agents
- LangGraph state machine with typed state and conditional edges
- A quality gate with automatic revision loops (score-based routing)
- Structured logging with structlog for production debugging
- Prometheus metrics for pipeline duration, quality, revisions, and errors
- A FastAPI server with auth, health checks, and error handling
- Docker Compose deployment with Prometheus and Grafana monitoring
- A fully traceable system where every agent step is logged and measurable
This architecture is the foundation for production agent systems. The key insight is that production agents need more than just prompts and tools -- they need observability, quality gates, error recovery, and deployment infrastructure. Every component we built serves a specific production need.