Production Agentic Systems

Project: Production Multi-Agent Pipeline

Build a complete production-ready multi-agent system with monitoring, evaluation, and deployment

60 min read · Project · Production · Multi-Agent · Pipeline

In this capstone project, you will build a complete production-ready multi-agent system. The system takes a user request, routes it to specialized agents (research, analysis, writing), orchestrates their collaboration with tool use, evaluates and refines results, and includes logging, tracing, error recovery, and containerized deployment.

This ties together everything from the module: agent design, tool use, orchestration patterns, evaluation, and production infrastructure.

Project Goal: Build a production multi-agent content pipeline that takes a topic, produces a research-backed technical report, and does so with full observability, error handling, evaluation, and deployment infrastructure. By the end, you'll have a system you can actually deploy.

System Architecture

┌──────────────────────────────────────────────────────────────────┐
│                        FastAPI Gateway                            │
│                  (Auth, Rate Limiting, Routing)                    │
└──────────────────────────┬───────────────────────────────────────┘
                           │
┌──────────────────────────▼───────────────────────────────────────┐
│                     Pipeline Orchestrator                          │
│                (LangGraph State Machine)                           │
│                                                                    │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────────┐  │
│  │  Router   │──►│ Research │──►│ Analysis │──►│   Writing     │  │
│  │  Agent    │   │  Agent   │   │  Agent   │   │   Agent       │  │
│  └──────────┘   └────┬─────┘   └────┬─────┘   └──────┬───────┘  │
│                      │              │                  │           │
│                 ┌────▼────┐    ┌────▼────┐       ┌────▼────┐     │
│                 │ Search  │    │ Analyze │       │ Format  │     │
│                 │  Tool   │    │  Tool   │       │  Tool   │     │
│                 └─────────┘    └─────────┘       └─────────┘     │
│                                                                    │
│  ┌───────────────────────────────────────────────────────────┐    │
│  │                  Quality Gate (Evaluator)                   │    │
│  │            Score < threshold? → Revise loop                │    │
│  └───────────────────────────────────────────────────────────┘    │
└──────────────────────────────────────────────────────────────────┘
                           │
┌──────────────────────────▼───────────────────────────────────────┐
│               Monitoring & Observability                          │
│         (Structured Logging, Traces, Prometheus Metrics)          │
└──────────────────────────────────────────────────────────────────┘

Step 1: Project Setup

bash
mkdir production-pipeline && cd production-pipeline
python -m venv venv
source venv/bin/activate

pip install langgraph langchain-openai langchain-core fastapi uvicorn \
    pydantic python-dotenv structlog prometheus-client httpx

# Freeze dependencies so the Dockerfile in Step 8 can install from requirements.txt
pip freeze > requirements.txt
bash
# .env
OPENAI_API_KEY=your_key_here

Step 2: Define the Pipeline State

LangGraph uses a typed state object that flows through the graph. Every agent reads from and writes to this shared state.

python
# state.py
from typing import TypedDict, Annotated, Optional
from langgraph.graph.message import add_messages


class PipelineState(TypedDict):
    """Shared state for the multi-agent pipeline."""
    # Input
    topic: str
    requirements: Optional[str]

    # Agent outputs
    research: Optional[str]
    analysis: Optional[str]
    draft: Optional[str]
    final_report: Optional[str]

    # Quality tracking
    quality_score: float
    revision_count: int
    max_revisions: int

    # Metadata
    messages: Annotated[list, add_messages]
    errors: list[str]
    trace: list[dict]

Why LangGraph? LangGraph models the pipeline as a state machine with typed state, conditional edges, and cycle support. This gives you explicit control over agent coordination, retry logic, and quality gates -- essential for production systems.
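
Each node returns the (possibly updated) state, and LangGraph merges the returned keys into the shared state; keys annotated with a reducer like `add_messages` accumulate instead of being overwritten. A stdlib-only sketch of that overwrite-vs-append distinction, with plain dicts standing in for LangGraph's actual merge logic:

```python
# Plain-dict sketch of LangGraph-style state merging (illustration only,
# not LangGraph's real implementation).
def merge_state(state: dict, update: dict, appended_keys: set[str]) -> dict:
    merged = dict(state)
    for key, value in update.items():
        if key in appended_keys:
            # Reducer-annotated keys (like `messages`) accumulate across nodes
            merged[key] = merged.get(key, []) + value
        else:
            # Everything else is overwritten by the latest node to write it
            merged[key] = value
    return merged


state = {"topic": "AI agents", "messages": [], "draft": None}
state = merge_state(state, {"draft": "v1", "messages": ["research done"]}, {"messages"})
state = merge_state(state, {"draft": "v2", "messages": ["analysis done"]}, {"messages"})

print(state["draft"])     # latest write wins
print(state["messages"])  # full history is preserved
```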

Step 3: Build the Specialized Agents

Each agent is a Python function that takes the pipeline state, does its work, and returns updated state.

python
# agents.py
import time
import json
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage
from state import PipelineState


llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
creative_llm = ChatOpenAI(model="gpt-4o", temperature=0.7)


def router_agent(state: PipelineState) -> PipelineState:
    """Route the request and validate the topic."""
    start = time.time()

    response = llm.invoke([
        SystemMessage(content="""You are a request router. Analyze the topic and determine:
1. Is this a valid research topic? (yes/no)
2. What are the key subtopics to research?
3. What type of report should be produced? (technical, business, overview)

Respond in JSON: {"valid": bool, "subtopics": [...], "report_type": "..."}"""),
        HumanMessage(content=f"Topic: {state['topic']}\nRequirements: {state.get('requirements', 'None')}")
    ])

    # Assumes the model returns bare JSON; models sometimes add markdown fences,
    # so harden this parsing step for production
    result = json.loads(response.content)

    state["trace"].append({
        "agent": "router",
        "duration_ms": (time.time() - start) * 1000,
        "result": result,
    })

    if not result.get("valid", False):
        state["errors"].append(f"Invalid topic: {state['topic']}")

    return state


def research_agent(state: PipelineState) -> PipelineState:
    """Gather comprehensive information on the topic."""
    start = time.time()

    response = llm.invoke([
        SystemMessage(content="""You are a senior research analyst. Produce a thorough research brief.

Include:
- Overview of the current state
- Key players, technologies, or concepts
- Recent developments (last 12 months)
- Data points and statistics where available
- Different perspectives and debates
- Practical implications

Be factual, specific, and cite sources where possible.
Target length: 800-1200 words."""),
        HumanMessage(content=f"Research this topic thoroughly: {state['topic']}")
    ])

    state["research"] = response.content
    state["trace"].append({
        "agent": "research",
        "duration_ms": (time.time() - start) * 1000,
        "output_length": len(response.content),
    })

    return state


def analysis_agent(state: PipelineState) -> PipelineState:
    """Analyze the research and extract key insights."""
    start = time.time()

    response = llm.invoke([
        SystemMessage(content="""You are a strategic analyst. Given research data, produce:

1. Key Themes: 3-5 major themes with supporting evidence
2. Trends: Emerging patterns and where things are heading
3. Implications: What this means for practitioners
4. Gaps: What questions remain unanswered
5. Recommendations: Actionable takeaways

Be analytical, not just descriptive. Draw connections between data points."""),
        HumanMessage(content=f"Analyze this research:\n\n{state['research']}")
    ])

    state["analysis"] = response.content
    state["trace"].append({
        "agent": "analysis",
        "duration_ms": (time.time() - start) * 1000,
        "output_length": len(response.content),
    })

    return state


def writing_agent(state: PipelineState) -> PipelineState:
    """Write the final report based on research and analysis."""
    start = time.time()

    revision_context = ""
    if state["revision_count"] > 0:
        revision_context = f"""
This is revision #{state['revision_count']}. The previous draft scored {state['quality_score']:.1f}/10.
Focus on improving: clarity, depth, specific examples, and actionable insights.

Previous draft:
{state.get('draft', '')}
"""

    response = creative_llm.invoke([
        SystemMessage(content=f"""You are an expert technical writer. Produce a polished report.

Requirements:
- Compelling opening that establishes why this matters
- Clear structure with descriptive headings
- Specific examples and data integrated naturally
- Analysis that goes beyond surface-level description
- Conclusion with actionable takeaways
- Professional, authoritative tone
- 1500-2500 words
- Markdown format
{revision_context}"""),
        HumanMessage(content=f"""Write a comprehensive report on: {state['topic']}

Research:
{state['research']}

Analysis:
{state['analysis']}""")
    ])

    state["draft"] = response.content
    state["trace"].append({
        "agent": "writing",
        "duration_ms": (time.time() - start) * 1000,
        "output_length": len(response.content),
        "revision": state["revision_count"],
    })

    return state
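
Both `json.loads(response.content)` calls above assume the model returns bare JSON, but models sometimes wrap output in markdown fences or add commentary. A defensive parser you could drop in front of those calls (the helper name and exact behavior are my own, not part of the original code):

```python
import json
import re


def parse_json_response(text: str) -> dict:
    """Extract the first JSON object from model output, tolerating markdown fences."""
    # Strip a ```json ... ``` fence if present
    fenced = re.search(r"```(?:json)?\s*(.*?)```", text, re.DOTALL)
    if fenced:
        text = fenced.group(1)
    # Fall back to the first {...} span in the text (greedy; fine for single objects)
    match = re.search(r"\{.*\}", text, re.DOTALL)
    if match:
        text = match.group(0)
    return json.loads(text)


print(parse_json_response('```json\n{"valid": true}\n```'))
print(parse_json_response('Here is the evaluation: {"overall": 7.5}'))
```

For even more robustness, consider your provider's structured-output or JSON-mode features rather than parsing free text at all.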

Step 4: Build the Quality Gate

The quality gate evaluates the draft and decides whether it is good enough to ship or needs revision.

python
# evaluator.py
import json
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage
from state import PipelineState

evaluator_llm = ChatOpenAI(model="gpt-4o", temperature=0.1)


def quality_gate(state: PipelineState) -> PipelineState:
    """Evaluate the draft quality and decide: publish or revise."""
    response = evaluator_llm.invoke([
        SystemMessage(content="""You are a quality evaluator for technical reports.
Score the report on these dimensions (1-10 each):
1. Accuracy: Are claims well-supported?
2. Depth: Does it go beyond surface-level?
3. Clarity: Is it well-structured and easy to follow?
4. Actionability: Does it provide practical takeaways?
5. Engagement: Is it compelling to read?

Respond in JSON:
{
    "scores": {"accuracy": int, "depth": int, "clarity": int, "actionability": int, "engagement": int},
    "overall": float,
    "strengths": ["..."],
    "weaknesses": ["..."],
    "publish_ready": bool
}"""),
        HumanMessage(content=f"Evaluate this report:\n\n{state['draft']}")
    ])

    # Same caveat as the router: assumes bare JSON with no markdown fences
    result = json.loads(response.content)
    state["quality_score"] = result["overall"]

    state["trace"].append({
        "agent": "evaluator",
        "scores": result["scores"],
        "overall": result["overall"],
        "publish_ready": result["publish_ready"],
    })

    return state


def should_revise(state: PipelineState) -> str:
    """Conditional edge: decide whether to revise or finalize."""
    if state["quality_score"] >= 7.0:
        return "finalize"
    if state["revision_count"] >= state["max_revisions"]:
        return "finalize"  # Max revisions reached, ship what we have
    return "revise"


def finalize(state: PipelineState) -> PipelineState:
    """Finalize the report for delivery."""
    state["final_report"] = state["draft"]
    state["trace"].append({
        "agent": "finalize",
        "quality_score": state["quality_score"],
        "revisions": state["revision_count"],
    })
    return state


def prepare_revision(state: PipelineState) -> PipelineState:
    """Increment revision counter before sending back to writing agent."""
    state["revision_count"] += 1
    return state

Quality gates with revision loops are a production pattern. Instead of accepting the first draft, the system evaluates it against quality criteria and routes it back to the writing agent if the score is below the threshold. Set a max revision count to prevent infinite loops.
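
To see why the max-revision cap matters, here is a loop that drives the same decision rule with simulated scores (pure Python, no LLM calls; the score sequence is made up for illustration):

```python
def decide(score: float, revision_count: int, max_revisions: int) -> str:
    """Same decision rule as should_revise, extracted for standalone testing."""
    if score >= 7.0:
        return "finalize"
    if revision_count >= max_revisions:
        return "finalize"  # cap reached: ship the best draft we have
    return "revise"


# A run whose quality never clears the bar still terminates at the cap
scores = [5.0, 5.5, 6.0, 6.2, 6.4]
revisions = 0
for score in scores:
    if decide(score, revisions, max_revisions=2) == "finalize":
        break
    revisions += 1

print(revisions)  # the loop stops after 2 revisions even though no score reached 7.0
```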

Step 5: Build the LangGraph Pipeline

Wire the agents together into a state machine with conditional edges.

python
# pipeline.py
from langgraph.graph import StateGraph, END
from state import PipelineState
from agents import router_agent, research_agent, analysis_agent, writing_agent
from evaluator import quality_gate, should_revise, finalize, prepare_revision


def build_pipeline():
    """Build the multi-agent pipeline graph."""
    graph = StateGraph(PipelineState)

    # Add nodes
    graph.add_node("router", router_agent)
    graph.add_node("research", research_agent)
    graph.add_node("analysis", analysis_agent)
    graph.add_node("writing", writing_agent)
    graph.add_node("quality_gate", quality_gate)
    graph.add_node("prepare_revision", prepare_revision)
    graph.add_node("finalize", finalize)

    # Define edges
    graph.set_entry_point("router")
    graph.add_edge("router", "research")
    graph.add_edge("research", "analysis")
    graph.add_edge("analysis", "writing")
    graph.add_edge("writing", "quality_gate")

    # Conditional edge: revise or finalize
    graph.add_conditional_edges(
        "quality_gate",
        should_revise,
        {
            "revise": "prepare_revision",
            "finalize": "finalize",
        }
    )
    graph.add_edge("prepare_revision", "writing")
    graph.add_edge("finalize", END)

    return graph.compile()


# The compiled graph:
#
# router → research → analysis → writing → quality_gate
#                                    ▲            │
#                                    │     ┌──────┴──────┐
#                                    │     ▼             ▼
#                              prepare_revision      finalize → END

Step 6: Add Structured Logging and Metrics

Production systems need observability. We add structured logging with structlog and Prometheus metrics.

python
# monitoring.py
import structlog
from prometheus_client import Counter, Histogram, Gauge

# Configure structured logging
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.add_log_level,
        structlog.processors.JSONRenderer(),
    ]
)

logger = structlog.get_logger()

# Prometheus metrics
pipeline_duration = Histogram(
    "pipeline_duration_seconds",
    "Time to complete the full pipeline",
    buckets=[5, 10, 30, 60, 120, 300]
)

pipeline_quality = Histogram(
    "pipeline_quality_score",
    "Quality score of pipeline outputs",
    buckets=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
)

pipeline_revisions = Counter(
    "pipeline_revisions_total",
    "Total number of revisions across all runs"
)

pipeline_errors = Counter(
    "pipeline_errors_total",
    "Total pipeline errors",
    ["error_type"]
)

active_pipelines = Gauge(
    "active_pipelines",
    "Number of currently running pipelines"
)

Step 7: Build the FastAPI Server

Wrap the pipeline in a production API with authentication, rate limiting, and health checks.

python
# server.py
import time
import uuid
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
from starlette.responses import Response

from pipeline import build_pipeline
from monitoring import (
    logger, pipeline_duration, pipeline_quality,
    pipeline_revisions, pipeline_errors, active_pipelines
)

app = FastAPI(title="Multi-Agent Pipeline API", version="1.0.0")
security = HTTPBearer()
pipeline = build_pipeline()

# Simple API key auth (use a real auth system in production)
VALID_API_KEYS = {"your-secret-api-key-here"}


def verify_auth(credentials: HTTPAuthorizationCredentials = Depends(security)):
    if credentials.credentials not in VALID_API_KEYS:
        raise HTTPException(status_code=401, detail="Invalid API key")
    return credentials.credentials


class PipelineRequest(BaseModel):
    topic: str
    requirements: str = ""
    max_revisions: int = 2


class PipelineResponse(BaseModel):
    run_id: str
    topic: str
    report: str
    quality_score: float
    revisions: int
    duration_seconds: float
    trace_summary: list[dict]


@app.post("/pipeline/run", response_model=PipelineResponse)
def run_pipeline(  # sync endpoint: FastAPI runs it in a worker thread, so blocking pipeline.invoke() doesn't stall the event loop
    request: PipelineRequest,
    api_key: str = Depends(verify_auth)
):
    """Run the multi-agent pipeline on a topic."""
    run_id = str(uuid.uuid4())[:8]
    start_time = time.time()

    logger.info("pipeline_started", run_id=run_id, topic=request.topic)
    active_pipelines.inc()

    try:
        # Initialize state
        initial_state = {
            "topic": request.topic,
            "requirements": request.requirements,
            "research": None,
            "analysis": None,
            "draft": None,
            "final_report": None,
            "quality_score": 0.0,
            "revision_count": 0,
            "max_revisions": request.max_revisions,
            "messages": [],
            "errors": [],
            "trace": [],
        }

        # Run the pipeline
        result = pipeline.invoke(initial_state)

        duration = time.time() - start_time

        # Record metrics
        pipeline_duration.observe(duration)
        pipeline_quality.observe(result["quality_score"])
        pipeline_revisions.inc(result["revision_count"])

        logger.info(
            "pipeline_completed",
            run_id=run_id,
            quality_score=result["quality_score"],
            revisions=result["revision_count"],
            duration_seconds=round(duration, 2),
        )

        return PipelineResponse(
            run_id=run_id,
            topic=request.topic,
            report=result["final_report"] or result.get("draft", "No output generated"),
            quality_score=result["quality_score"],
            revisions=result["revision_count"],
            duration_seconds=round(duration, 2),
            trace_summary=result["trace"],
        )

    except Exception as e:
        pipeline_errors.labels(error_type=type(e).__name__).inc()
        logger.error("pipeline_failed", run_id=run_id, error=str(e))
        raise HTTPException(status_code=500, detail=f"Pipeline failed: {str(e)}")

    finally:
        active_pipelines.dec()


@app.get("/health")
async def health_check():
    """Health check endpoint for load balancers."""
    return {"status": "healthy", "version": "1.0.0"}


@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint."""
    return Response(
        content=generate_latest(),
        media_type=CONTENT_TYPE_LATEST
    )
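
The set-membership check in verify_auth is fine for a demo, but direct string comparison can leak timing information about key prefixes. A slightly hardened sketch using the stdlib's constant-time comparison (the key value is the placeholder from the original; load it from the environment in production):

```python
import hmac

API_KEY = "your-secret-api-key-here"  # placeholder; read from env/secrets manager in production


def is_valid_key(candidate: str) -> bool:
    """Constant-time comparison to avoid leaking key prefixes via timing."""
    return hmac.compare_digest(candidate.encode(), API_KEY.encode())


print(is_valid_key("your-secret-api-key-here"))
print(is_valid_key("wrong-key"))
```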

Step 8: Containerize with Docker

dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000"]
yaml
# docker-compose.yml
services:
  pipeline:
    build: .
    ports:
      - "8000:8000"
    env_file:
      - .env
    restart: unless-stopped
    healthcheck:
      # python:3.11-slim ships without curl, so probe the endpoint with Python instead
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
      interval: 30s
      timeout: 10s
      retries: 3

  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    depends_on:
      - prometheus
yaml
# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: "pipeline"
    static_configs:
      - targets: ["pipeline:8000"]
    metrics_path: "/metrics"

Step 9: Run and Test the System

bash
# Run locally
uvicorn server:app --reload

# Or with Docker
docker compose up --build

Test the pipeline:

bash
curl -X POST http://localhost:8000/pipeline/run \
  -H "Authorization: Bearer your-secret-api-key-here" \
  -H "Content-Type: application/json" \
  -d '{
    "topic": "The impact of AI agents on software development in 2025",
    "requirements": "Focus on practical implications for engineering teams",
    "max_revisions": 2
  }'

Expected response:

json
{
  "run_id": "a1b2c3d4",
  "topic": "The impact of AI agents on software development in 2025",
  "report": "# The Impact of AI Agents on Software Development\n\n...",
  "quality_score": 7.8,
  "revisions": 1,
  "duration_seconds": 45.2,
  "trace_summary": [
    {"agent": "router", "duration_ms": 1200},
    {"agent": "research", "duration_ms": 8500, "output_length": 2400},
    {"agent": "analysis", "duration_ms": 5200, "output_length": 1800},
    {"agent": "writing", "duration_ms": 12000, "output_length": 4200, "revision": 0},
    {"agent": "evaluator", "overall": 6.5, "publish_ready": false},
    {"agent": "writing", "duration_ms": 14000, "output_length": 4800, "revision": 1},
    {"agent": "evaluator", "overall": 7.8, "publish_ready": true},
    {"agent": "finalize", "quality_score": 7.8, "revisions": 1}
  ]
}
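
Because every agent appends a timing entry to trace, the trace_summary in the response doubles as a lightweight per-run profiler. A client-side sketch that finds where a run spent its time, using figures like the sample response above:

```python
# Sample trace entries (durations mirror the example response; evaluator and
# finalize entries carry no duration_ms, so they are skipped by the filter)
trace_summary = [
    {"agent": "router", "duration_ms": 1200},
    {"agent": "research", "duration_ms": 8500},
    {"agent": "analysis", "duration_ms": 5200},
    {"agent": "writing", "duration_ms": 12000, "revision": 0},
    {"agent": "writing", "duration_ms": 14000, "revision": 1},
]

# Sum durations per agent to see which stage dominates wall-clock time
totals: dict[str, float] = {}
for entry in trace_summary:
    if "duration_ms" in entry:
        totals[entry["agent"]] = totals.get(entry["agent"], 0) + entry["duration_ms"]

slowest = max(totals, key=totals.get)
print(slowest, totals[slowest])  # writing dominates: 26000 ms across two revisions
```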

Step 10: Monitor with Grafana

After starting the Docker Compose stack, open Grafana at http://localhost:3000 and create dashboards for:

  • Pipeline duration -- histogram of how long runs take
  • Quality scores -- distribution of output quality over time
  • Revision counts -- how often drafts need revision
  • Error rates -- pipeline failures by error type
  • Active pipelines -- current concurrency

These metrics tell you whether your agents are degrading (quality dropping), getting slower (duration increasing), or failing more often (error rate spiking).
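
Starting points for those panels, written as PromQL queries over the metrics defined in monitoring.py (treat these as sketches to adapt, not a finished dashboard):

```promql
# p95 pipeline duration over the last 5 minutes
histogram_quantile(0.95, rate(pipeline_duration_seconds_bucket[5m]))

# Average quality score over the last hour
rate(pipeline_quality_score_sum[1h]) / rate(pipeline_quality_score_count[1h])

# Revision rate (revisions per second, averaged over the last hour)
rate(pipeline_revisions_total[1h])

# Error rate, broken down by error type
sum by (error_type) (rate(pipeline_errors_total[5m]))

# Current concurrency
active_pipelines
```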

Project File Structure

production-pipeline/
├── .env                      # API keys (gitignored)
├── state.py                  # Pipeline state definition
├── agents.py                 # Specialized agent functions
├── evaluator.py              # Quality gate and revision logic
├── pipeline.py               # LangGraph pipeline assembly
├── monitoring.py             # Logging and Prometheus metrics
├── server.py                 # FastAPI server
├── Dockerfile                # Container definition
├── docker-compose.yml        # Full stack (app + Prometheus + Grafana)
├── prometheus.yml            # Prometheus scrape config
└── requirements.txt          # Python dependencies

Key Takeaways

What You Built:

  1. A multi-agent pipeline with Router, Research, Analysis, and Writing agents
  2. LangGraph state machine with typed state and conditional edges
  3. A quality gate with automatic revision loops (score-based routing)
  4. Structured logging with structlog for production debugging
  5. Prometheus metrics for pipeline duration, quality, revisions, and errors
  6. A FastAPI server with auth, health checks, and error handling
  7. Docker Compose deployment with Prometheus and Grafana monitoring
  8. A fully traceable system where every agent step is logged and measurable

This architecture is the foundation for production agent systems. The key insight is that production agents need more than just prompts and tools -- they need observability, quality gates, error recovery, and deployment infrastructure. Every component we built serves a specific production need.

