Back
advanced
Optimization & Deployment

Monitoring & Observability for LLM Applications

Implement comprehensive monitoring and observability using LangSmith, Weights & Biases, and custom metrics

25 min read · monitoring · observability · langsmith · wandb

Monitoring & Observability for LLM Applications

Learn to implement comprehensive monitoring and observability for production LLM applications to track performance, costs, and quality.

What You'll Learn: Observability is critical for production LLM applications. We'll explore tools and techniques for tracing, metrics, logging, and debugging using industry-standard platforms.

Fundamentals of LLM Observability

Key Metrics to Track

python
from dataclasses import dataclass
from typing import Dict, List, Optional
from datetime import datetime
import time
from enum import Enum

class MetricType(Enum):
    """Categories of metrics tracked for an LLM application.

    Each member's value is the lowercase string form of its name.
    """

    LATENCY = "latency"
    COST = "cost"
    QUALITY = "quality"
    USAGE = "usage"
    ERROR = "error"

@dataclass
class LLMMetrics:
    """Metrics captured for one LLM request.

    The nine latency, token and cost fields are required; quality score,
    user feedback, model name and timestamp are optional. Nothing is
    validated or derived: ``total_tokens`` and ``cost_per_token`` are stored
    exactly as supplied by the caller.
    """

    # Performance metrics
    total_latency: float  # Total time including API calls
    model_latency: float  # Time spent in model inference
    preprocessing_latency: float  # Time in preprocessing
    postprocessing_latency: float  # Time in postprocessing

    # Token metrics
    input_tokens: int
    output_tokens: int
    total_tokens: int

    # Cost metrics
    cost: float
    cost_per_token: float

    # Quality metrics
    quality_score: Optional[float] = None
    user_feedback: Optional[int] = None  # -1, 0, 1

    # Context
    model: str = ""
    # Optional so that the ``None`` default is type-correct; __post_init__
    # replaces ``None`` with the creation time.
    timestamp: Optional[datetime] = None

    def __post_init__(self):
        """Stamp the record with the current local time if none was given."""
        if self.timestamp is None:
            self.timestamp = datetime.now()

class MetricsCollector:
    """Collect per-request LLM metrics in memory and summarise a time window.

    Requests are appended to ``self.metrics`` and errors to ``self.errors``;
    neither list is ever pruned by this class. ``request_count`` and
    ``error_count`` are all-time counters.
    """

    def __init__(self):
        self.metrics: List[LLMMetrics] = []
        # One dict per error: {"type", "message", "timestamp"}. The timestamp
        # lets get_summary() count only errors inside the requested window.
        self.errors: List[Dict] = []
        self.request_count = 0
        self.error_count = 0

    def record_request(self, metrics: LLMMetrics):
        """Store the metrics of one request and bump the all-time counter."""
        self.metrics.append(metrics)
        self.request_count += 1

    def record_error(self, error_type: str, error_message: str):
        """Store one error (type, message, time) and bump the all-time counter."""
        self.errors.append({
            "type": error_type,
            "message": error_message,
            "timestamp": datetime.now(),
        })
        self.error_count += 1

    def get_summary(self, window_minutes: int = 60) -> Dict:
        """Summarise the requests recorded in the last ``window_minutes``.

        Returns:
            An empty dict when no request falls inside the window; otherwise
            a dict with request/error counts, latency average and
            percentiles, token and cost totals/averages, the average quality
            score and the window length.

            ``error_count`` and ``error_rate`` cover only errors recorded
            inside the same window; the rate is errors divided by recorded
            requests. ``avg_quality`` averages every request whose
            ``quality_score`` is not ``None`` (a score of 0.0 counts) and is
            0.0 when no request carries a score.
        """
        cutoff = datetime.now().timestamp() - (window_minutes * 60)
        recent = [m for m in self.metrics if m.timestamp.timestamp() > cutoff]

        if not recent:
            return {}

        request_total = len(recent)
        recent_errors = sum(
            1 for err in self.errors
            if err["timestamp"].timestamp() > cutoff
        )

        latencies = [m.total_latency for m in recent]
        total_cost = sum(m.cost for m in recent)
        # ``is not None`` rather than truthiness: 0.0 is a valid score.
        quality_scores = [
            m.quality_score for m in recent if m.quality_score is not None
        ]

        return {
            "request_count": request_total,
            "error_count": recent_errors,
            "error_rate": recent_errors / request_total,

            # Latency stats
            "avg_latency": sum(latencies) / request_total,
            "p50_latency": self._percentile(latencies, 50),
            "p95_latency": self._percentile(latencies, 95),
            "p99_latency": self._percentile(latencies, 99),

            # Token stats
            "total_tokens": sum(m.total_tokens for m in recent),
            "avg_input_tokens": sum(m.input_tokens for m in recent) / request_total,
            "avg_output_tokens": sum(m.output_tokens for m in recent) / request_total,

            # Cost stats
            "total_cost": total_cost,
            "avg_cost_per_request": total_cost / request_total,

            # Quality stats
            "avg_quality": (
                sum(quality_scores) / len(quality_scores)
                if quality_scores else 0.0
            ),

            # Time window
            "window_minutes": window_minutes
        }

    def _percentile(self, values: List[float], percentile: int) -> float:
        """Return the nearest-rank percentile of ``values``.

        The result is the smallest element such that at least ``percentile``
        percent of the values are less than or equal to it. Returns 0.0 for
        an empty list.
        """
        if not values:
            return 0.0

        ordered = sorted(values)
        # Ceiling of len * percentile / 100 via floor division on the negation.
        rank = -(-len(ordered) * percentile // 100)
        index = min(max(int(rank) - 1, 0), len(ordered) - 1)
        return ordered[index]

# Example usage: feed the collector synthetic traffic, then report on it
collector = MetricsCollector()

# Simulate 100 requests; latency cycles every 10 requests and the quality
# score cycles every 20, everything else is constant.
for i in range(100):
    latency_step = i % 10
    quality_step = i % 20
    metrics = LLMMetrics(
        total_latency=0.5 + latency_step * 0.1,
        model_latency=0.3,
        preprocessing_latency=0.1,
        postprocessing_latency=0.1,
        input_tokens=500,
        output_tokens=200,
        total_tokens=700,
        cost=0.002,
        cost_per_token=0.000003,
        quality_score=0.8 + quality_step * 0.01,
        model="gpt-3.5-turbo",
    )
    collector.record_request(metrics)

# Summarise the last hour and print the headline figures
summary = collector.get_summary(window_minutes=60)
print("Metrics Summary (60 min window):")
print(f"Requests: {summary['request_count']}")
print(f"Avg Latency: {summary['avg_latency']:.3f}s")
print(f"P95 Latency: {summary['p95_latency']:.3f}s")
print(f"Total Cost: ${summary['total_cost']:.4f}")
print(f"Avg Quality: {summary['avg_quality']:.2f}")

LangSmith Integration

LangSmith: LangSmith is LangChain's observability platform. It provides automatic tracing, debugging, and testing for LLM applications with minimal code changes.

LangSmith Setup and Tracing

python
from langsmith import Client
from langsmith.run_helpers import traceable
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser
import os

# Set up LangSmith
# The tracing flag and project name are fixed for this application.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "production-llm-app"
# Provide the real key through the environment (or a secrets manager).
# setdefault keeps a key that is already exported instead of overwriting it
# with the placeholder below.
os.environ.setdefault("LANGCHAIN_API_KEY", "your-api-key")

class LangSmithMonitor:
    """Trace, inspect and evaluate an LLM pipeline through LangSmith.

    NOTE(review): tracing is configured by the ``LANGCHAIN_*`` environment
    variables, while ``project_name`` is only used here when querying runs.
    Presumably both should name the same project — confirm.
    """

    def __init__(self, project_name: str = "default"):
        self.client = Client()
        self.project_name = project_name

    @traceable(run_type="chain", name="process_user_query")
    def process_query(self, query: str, context: str = "") -> str:
        """Answer ``query`` with a prompt | model | parser chain.

        Args:
            query: The user message.
            context: Text interpolated into the system prompt.

        Returns:
            The model reply as a string.
        """

        # Create prompt
        prompt = ChatPromptTemplate.from_messages([
            ("system", "You are a helpful assistant. Context: {context}"),
            ("user", "{query}")
        ])

        # Create chain
        model = ChatOpenAI(model="gpt-3.5-turbo")
        chain = prompt | model | StrOutputParser()

        # Execute (automatically traced)
        response = chain.invoke({
            "context": context,
            "query": query
        })

        return response

    @traceable(run_type="tool", name="retrieve_context")
    def retrieve_context(self, query: str) -> str:
        """Return placeholder context for ``query`` (traced as a tool)."""
        # Simulate context retrieval
        time.sleep(0.1)
        return f"Retrieved context for: {query}"

    @traceable(run_type="chain", name="rag_pipeline")
    def rag_pipeline(self, query: str) -> Dict:
        """Retrieve context, answer the query, and return all three pieces.

        Returns:
            A dict with the keys ``query``, ``context`` and ``response``.
        """

        # Step 1: Retrieve context (traced)
        context = self.retrieve_context(query)

        # Step 2: Process query (traced)
        response = self.process_query(query, context)

        return {
            "query": query,
            "context": context,
            "response": response
        }

    def get_run_statistics(self, project_name: Optional[str] = None) -> Dict:
        """Aggregate run count, errors, latency and tokens for a project.

        Args:
            project_name: Project to query; defaults to ``self.project_name``.

        Returns:
            An empty dict when the project has no runs, otherwise a dict with
            ``total_runs``, ``error_count``, ``error_rate``, ``avg_latency``
            (seconds, over runs that have both start and end times) and
            ``total_tokens``.
        """

        project = project_name or self.project_name

        # NOTE(review): newer langsmith releases appear to offer
        # ``is_root=True`` for selecting parent runs — confirm against the
        # installed version before switching.
        runs = self.client.list_runs(
            project_name=project,
            execution_order=1  # Only parent runs
        )

        run_list = list(runs)

        if not run_list:
            return {}

        # Calculate statistics
        latencies = [
            (run.end_time - run.start_time).total_seconds()
            for run in run_list
            if run.end_time and run.start_time
        ]

        errors = [run for run in run_list if run.error]

        return {
            "total_runs": len(run_list),
            "error_count": len(errors),
            "error_rate": len(errors) / len(run_list),
            "avg_latency": sum(latencies) / len(latencies) if latencies else 0,
            # ``or {}`` guards against a ``token_usage`` key stored as None.
            "total_tokens": sum(
                (run.outputs.get("token_usage") or {}).get("total_tokens", 0)
                for run in run_list
                if run.outputs
            )
        }

    def create_dataset(self, name: str, examples: List[Dict]):
        """Create a dataset and add one example per item in ``examples``.

        Args:
            name: Dataset name.
            examples: Dicts that each provide ``"inputs"`` and ``"outputs"``.

        Returns:
            The dataset object returned by the client.
        """

        dataset = self.client.create_dataset(
            dataset_name=name,
            description="Test dataset for evaluation"
        )

        for example in examples:
            self.client.create_example(
                dataset_id=dataset.id,
                inputs=example["inputs"],
                outputs=example["outputs"]
            )

        return dataset

    def run_evaluation(self, dataset_name: str):
        """Evaluate ``process_query`` on a dataset with exact-match scoring.

        Each example's inputs are passed to ``process_query`` as keyword
        arguments, and the reply is compared with the example's expected
        ``"response"`` output.

        Returns:
            Whatever ``langsmith.evaluation.evaluate`` returns.
        """

        from langsmith.evaluation import evaluate

        def target(inputs: Dict) -> Dict:
            """Run the query and wrap the reply under the ``response`` key.

            The evaluator reads ``run.outputs["response"]``; a bare string
            is not stored under that key, so it is returned in a dict here.
            """
            return {"response": self.process_query(**inputs)}

        def accuracy_evaluator(run, example):
            """Score 1.0 when the predicted and expected responses are equal."""
            # ``or {}``: outputs can be absent, e.g. when the run failed.
            predicted = (run.outputs or {}).get("response", "")
            expected = (example.outputs or {}).get("response", "")

            # Simple exact match for demonstration
            return {"score": 1.0 if predicted == expected else 0.0}

        results = evaluate(
            target,
            data=dataset_name,
            evaluators=[accuracy_evaluator],
            experiment_prefix="experiment"
        )

        return results

# Example usage: run one traced RAG query, then report project statistics
monitor = LangSmithMonitor(project_name="my-llm-app")

# Process queries (automatically traced)
result = monitor.rag_pipeline("What is machine learning?")
print(f"Response: {result['response']}")

# Fetch aggregate statistics; a missing key falls back to zero
stats = monitor.get_run_statistics()
print(f"\nRun Statistics:")
print(
    f"Total runs: {stats.get('total_runs', 0)}"
)
print(
    f"Error rate: {stats.get('error_rate', 0):.2%}"
)
print(
    f"Avg latency: {stats.get('avg_latency', 0):.3f}s"
)

Advanced LangSmith Features

python
class AdvancedLangSmithMonitoring:
    """Helpers for run metadata, feedback and review sampling in LangSmith."""

    def __init__(self):
        self.client = Client()

    def add_custom_metadata(self, run_id: str, metadata: Dict):
        """Send ``metadata`` to the client as the run's ``extra`` payload.

        NOTE(review): LangSmith appears to keep run metadata under
        ``extra["metadata"]``; confirm whether ``metadata`` should be nested
        under that key.
        """

        self.client.update_run(
            run_id=run_id,
            extra=metadata
        )

    def add_feedback(
        self,
        run_id: str,
        key: str,
        score: float,
        comment: Optional[str] = None
    ):
        """Attach a feedback entry (key, score, optional comment) to a run."""

        self.client.create_feedback(
            run_id=run_id,
            key=key,
            score=score,
            comment=comment
        )

    def get_feedback_statistics(self, project_name: str) -> Dict:
        """Aggregate feedback scores per feedback key.

        Feedback entries whose ``score`` is ``None`` are skipped, so a key
        that has no scored entry does not appear in the result.

        Returns:
            An empty dict when no feedback is returned, otherwise a mapping
            of key to ``count``, ``avg_score``, ``min_score`` and
            ``max_score``.
        """

        feedbacks = list(self.client.list_feedback(
            project_name=project_name
        ))

        if not feedbacks:
            return {}

        # Aggregate by key, ignoring entries that carry no numeric score
        by_key = {}
        for feedback in feedbacks:
            if feedback.score is None:
                continue
            by_key.setdefault(feedback.key, []).append(feedback.score)

        return {
            key: {
                "count": len(scores),
                "avg_score": sum(scores) / len(scores),
                "min_score": min(scores),
                "max_score": max(scores)
            }
            for key, scores in by_key.items()
        }

    def create_annotation_queue(
        self,
        name: str,
        project_name: str,
        sampling_rate: float = 0.1
    ):
        """Randomly sample parent runs of a project for human review.

        No queue is created on the LangSmith side; the sample is only
        described in the returned dict.

        Args:
            name: Label echoed back as ``queue_name``.
            project_name: Project whose runs are sampled.
            sampling_rate: Probability of keeping each run.

        Returns:
            A dict with ``queue_name``, ``runs_to_annotate`` (the full sample
            size) and ``run_ids`` (at most the first 100 sampled IDs).
        """

        # Get runs to annotate
        runs = self.client.list_runs(
            project_name=project_name,
            execution_order=1
        )

        # Sample runs
        import random
        sampled_runs = [
            run for run in runs
            if random.random() < sampling_rate
        ]

        return {
            "queue_name": name,
            "runs_to_annotate": len(sampled_runs),
            "run_ids": [run.id for run in sampled_runs[:100]]  # Limit to 100
        }

# Example: build the helper (its constructor creates a LangSmith Client)
advanced = AdvancedLangSmithMonitoring()

# Add feedback (left commented out; "some-run-id" is presumably a
# placeholder to be replaced with the ID of an existing run)
# advanced.add_feedback(
#     run_id="some-run-id",
#     key="helpfulness",
#     score=0.9,
#     comment="Very helpful response"
# )

# Get feedback stats for a project (also left commented out)
# stats = advanced.get_feedback_statistics("my-llm-app")
# print("Feedback Statistics:", stats)

Weights & Biases Integration

Weights & Biases (W&B): Track experiments, visualize metrics, and compare model performance across runs with comprehensive dashboards.

W&B Setup for LLMs

python
import wandb
from typing import Any

class WandBLLMTracker:
    """Track LLM metrics with Weights & Biases.

    One W&B run is started per tracker instance and kept in ``self.run``;
    every method logs to that run.
    """

    # A prompt/response sample table is logged once per this many inferences.
    SAMPLE_EVERY = 100

    def __init__(
        self,
        project: str,
        entity: Optional[str] = None,
        config: Optional[Dict] = None
    ):
        """Start a W&B run and declare how logged metrics are summarised.

        Args:
            project: W&B project name.
            entity: W&B entity, passed through to ``wandb.init``.
            config: Run configuration; an empty dict is used when omitted.
        """

        self.run = wandb.init(
            project=project,
            entity=entity,
            config=config or {}
        )

        # Running totals kept here because define_metric has no "sum"
        # aggregation; they are logged as cumulative_* series.
        self._inference_count = 0
        self._cumulative_cost = 0.0
        self._cumulative_tokens = 0

        # Define custom metrics
        self.run.define_metric("latency", summary="mean")
        self.run.define_metric("cost", summary="mean")
        self.run.define_metric("quality_score", summary="mean")
        self.run.define_metric("total_tokens", summary="mean")
        # The cumulative series never decrease, so "max" is the grand total.
        self.run.define_metric("cumulative_cost", summary="max")
        self.run.define_metric("cumulative_tokens", summary="max")

    def log_inference(
        self,
        prompt: str,
        response: str,
        metrics: LLMMetrics,
        metadata: Optional[Dict] = None
    ):
        """Log the metrics of a single inference.

        Args:
            prompt: Prompt text; only the first 100 characters are sampled.
            response: Response text; only the first 100 characters are sampled.
            metrics: Metrics of the request.
            metadata: Extra key/value pairs merged into the logged record
                (they override built-in keys of the same name).
        """

        self._inference_count += 1
        self._cumulative_cost += metrics.cost
        self._cumulative_tokens += metrics.total_tokens

        log_data = {
            # Performance
            "latency": metrics.total_latency,
            "model_latency": metrics.model_latency,

            # Tokens
            "input_tokens": metrics.input_tokens,
            "output_tokens": metrics.output_tokens,
            "total_tokens": metrics.total_tokens,
            "cumulative_tokens": self._cumulative_tokens,

            # Cost
            "cost": metrics.cost,
            "cost_per_token": metrics.cost_per_token,
            "cumulative_cost": self._cumulative_cost,

            # Model
            "model": metrics.model
        }

        # Quality: omitted when the request has no score
        if metrics.quality_score is not None:
            log_data["quality_score"] = metrics.quality_score

        # Add metadata
        if metadata:
            log_data.update(metadata)

        # Log text samples periodically. The table goes into the same log
        # call, so sampling depends only on the number of inferences.
        if self._inference_count % self.SAMPLE_EVERY == 0:
            log_data["examples"] = wandb.Table(
                columns=["prompt", "response", "quality"],
                data=[[
                    prompt[:100],
                    response[:100],
                    metrics.quality_score
                ]]
            )

        self.run.log(log_data)

    def log_batch_metrics(self, batch_metrics: List[LLMMetrics]):
        """Log size, average latency, total cost and total tokens of a batch.

        An empty batch is ignored: nothing is logged.
        """

        if not batch_metrics:
            return

        batch_size = len(batch_metrics)

        self.run.log({
            "batch_size": batch_size,
            "batch_avg_latency": sum(m.total_latency for m in batch_metrics) / batch_size,
            "batch_total_cost": sum(m.cost for m in batch_metrics),
            "batch_total_tokens": sum(m.total_tokens for m in batch_metrics)
        })

    def log_evaluation_results(
        self,
        eval_name: str,
        results: Dict[str, float]
    ):
        """Log evaluation results as scalars plus a summary table.

        Each metric is logged under ``eval/<eval_name>/<metric>`` and the
        table under ``eval/<eval_name>/summary``, in a single log call.
        """

        payload = {
            f"eval/{eval_name}/{metric}": value
            for metric, value in results.items()
        }

        # Create summary table
        payload[f"eval/{eval_name}/summary"] = wandb.Table(
            columns=["Metric", "Value"],
            data=[[metric, value] for metric, value in results.items()]
        )

        self.run.log(payload)

    def log_model_comparison(
        self,
        model_results: Dict[str, Dict[str, float]]
    ):
        """Log a table with one row per model and one column per metric.

        Columns are the union of all models' metric names in first-seen
        order; a model that lacks a metric gets ``None`` in that cell.
        An empty mapping is ignored: nothing is logged.
        """

        if not model_results:
            return

        # Collect metric names across all models, keeping first-seen order
        metric_names = []
        for metrics in model_results.values():
            for metric_name in metrics:
                if metric_name not in metric_names:
                    metric_names.append(metric_name)

        data = [
            [model] + [metrics.get(metric_name) for metric_name in metric_names]
            for model, metrics in model_results.items()
        ]

        self.run.log({
            "model_comparison": wandb.Table(
                columns=["Model"] + metric_names,
                data=data
            )
        })

    def create_alert(
        self,
        metric: str,
        threshold: float,
        comparison: str = ">"
    ):
        """Print the alert rule; no alert is registered with W&B."""

        # W&B alerts are typically configured through UI
        # This is a placeholder for demonstration
        print(f"Alert configured: {metric} {comparison} {threshold}")

    def finish(self):
        """Finish the W&B run started by this tracker."""
        self.run.finish()

# Example usage: one tracker, 50 identical inferences, then evaluation
# results and a cross-model comparison
tracker = WandBLLMTracker(
    project="llm-production",
    config={"model": "gpt-3.5-turbo", "temperature": 0.7, "max_tokens": 100},
)

# Log inferences
for i in range(50):
    metrics = LLMMetrics(
        total_latency=0.5,
        model_latency=0.3,
        preprocessing_latency=0.1,
        postprocessing_latency=0.1,
        input_tokens=500,
        output_tokens=200,
        total_tokens=700,
        cost=0.002,
        cost_per_token=0.000003,
        quality_score=0.85,
        model="gpt-3.5-turbo",
    )
    tracker.log_inference(
        prompt=f"Test prompt {i}",
        response=f"Test response {i}",
        metrics=metrics,
    )

# Log evaluation
tracker.log_evaluation_results(
    "test_set_v1",
    {"accuracy": 0.92, "f1_score": 0.89, "avg_latency": 0.45},
)

# Log model comparison
tracker.log_model_comparison(
    {
        "gpt-3.5-turbo": {"accuracy": 0.85, "latency": 0.3, "cost": 0.002},
        "gpt-4-turbo": {"accuracy": 0.92, "latency": 0.5, "cost": 0.01},
        "claude-3-sonnet": {"accuracy": 0.90, "latency": 0.4, "cost": 0.006},
    }
)

tracker.finish()

Custom Monitoring Dashboard

Production Monitoring: Build custom dashboards to track application-specific metrics and set up alerts for anomalies and degradation.

python
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from flask import Flask, Response
import threading
import time

class PrometheusMetrics:
    """Prometheus metrics for LLM monitoring.

    Collectors are created once per registry and shared by every instance
    that uses that registry. Registering the same metric names twice on one
    registry raises an error in prometheus_client, so constructing a second
    instance would otherwise fail.
    """

    # registry -> {attribute name: collector}; filled on first use
    _collectors_by_registry = {}

    def __init__(self, registry=None):
        """Attach to the collectors of ``registry``, creating them if needed.

        Args:
            registry: A prometheus_client ``CollectorRegistry``. Defaults to
                the global ``REGISTRY``, which is what a bare
                ``generate_latest()`` call exports.
        """
        from prometheus_client import REGISTRY

        if registry is None:
            registry = REGISTRY

        collectors = PrometheusMetrics._collectors_by_registry.get(registry)
        if collectors is None:
            collectors = self._build_collectors(registry)
            PrometheusMetrics._collectors_by_registry[registry] = collectors

        # Counters
        self.request_count = collectors['request_count']
        self.token_count = collectors['token_count']

        # Histograms
        self.latency = collectors['latency']
        self.cost = collectors['cost']

        # Gauges
        self.active_requests = collectors['active_requests']
        self.cache_hit_rate = collectors['cache_hit_rate']

    @staticmethod
    def _build_collectors(registry):
        """Create and register every collector on ``registry``."""
        return {
            'request_count': Counter(
                'llm_requests_total',
                'Total LLM requests',
                ['model', 'status'],
                registry=registry
            ),
            'token_count': Counter(
                'llm_tokens_total',
                'Total tokens processed',
                ['model', 'type'],  # type: input, output
                registry=registry
            ),
            'latency': Histogram(
                'llm_latency_seconds',
                'Request latency',
                ['model', 'operation'],
                registry=registry
            ),
            # NOTE(review): this histogram uses the library's default
            # buckets; per-request costs such as the 0.002 in the example
            # presumably all land in the lowest bucket. Consider passing
            # explicit ``buckets``.
            'cost': Histogram(
                'llm_cost_dollars',
                'Request cost',
                ['model'],
                registry=registry
            ),
            'active_requests': Gauge(
                'llm_active_requests',
                'Currently active requests',
                ['model'],
                registry=registry
            ),
            'cache_hit_rate': Gauge(
                'llm_cache_hit_rate',
                'Cache hit rate',
                registry=registry
            ),
        }

    def record_request(
        self,
        model: str,
        status: str,
        latency: float,
        cost: float,
        input_tokens: int,
        output_tokens: int
    ):
        """Record one request: count, tokens, latency and cost.

        Args:
            model: Value of the ``model`` label on every series.
            status: Value of the ``status`` label on the request counter.
            latency: Observed under ``operation='total'``.
            cost: Observed on the cost histogram.
            input_tokens: Added to the ``type='input'`` token counter.
            output_tokens: Added to the ``type='output'`` token counter.
        """

        # Count
        self.request_count.labels(model=model, status=status).inc()

        # Tokens
        self.token_count.labels(model=model, type='input').inc(input_tokens)
        self.token_count.labels(model=model, type='output').inc(output_tokens)

        # Latency
        self.latency.labels(model=model, operation='total').observe(latency)

        # Cost
        self.cost.labels(model=model).observe(cost)

class MonitoringDashboard:
    """Flask app that exposes Prometheus metrics and a health check."""

    def __init__(self):
        self.metrics = PrometheusMetrics()
        self.app = Flask(__name__)
        self._setup_routes()

    def _setup_routes(self):
        """Register the ``/metrics`` and ``/health`` routes on ``self.app``."""
        from prometheus_client import CONTENT_TYPE_LATEST

        @self.app.route('/metrics')
        def metrics():
            """Prometheus metrics endpoint"""
            # Send the exposition-format content type rather than a bare
            # text/plain.
            return Response(
                generate_latest(),
                content_type=CONTENT_TYPE_LATEST
            )

        @self.app.route('/health')
        def health():
            """Health check endpoint"""
            return {"status": "healthy"}

    def start_server(self, port: int = 9090, host: str = '0.0.0.0'):
        """Serve the Flask app from a daemon thread and return immediately.

        Args:
            port: TCP port to listen on.
            host: Interface to bind. The default listens on all interfaces;
                pass ``'127.0.0.1'`` to keep the endpoint local to the
                machine.
        """

        def run():
            self.app.run(host=host, port=port)

        thread = threading.Thread(target=run, daemon=True)
        thread.start()

        print(f"Metrics server started on port {port}")
        print(f"Metrics available at http://localhost:{port}/metrics")

class AlertingSystem:
    """Evaluate threshold rules against metric values and raise alerts.

    A rule fires once when its condition becomes true and is re-armed when
    the condition is evaluated as false again.
    """

    # Supported comparison symbols and the test each one performs
    _COMPARATORS = {
        '>': lambda value, threshold: value > threshold,
        '<': lambda value, threshold: value < threshold,
        '>=': lambda value, threshold: value >= threshold,
        '<=': lambda value, threshold: value <= threshold,
    }

    def __init__(self):
        self.alerts = []
        self.alert_history = []

    def add_alert(
        self,
        name: str,
        metric: str,
        threshold: float,
        comparison: str = '>',
        severity: str = 'warning'
    ):
        """Add an alert rule.

        Args:
            name: Human-readable rule name.
            metric: Key looked up in the metrics passed to ``check_alerts``.
            threshold: Value the metric is compared against.
            comparison: One of ``'>'``, ``'<'``, ``'>='``, ``'<='``.
            severity: Free-form label copied into alert events.

        Raises:
            ValueError: If ``comparison`` is not supported. A rule with an
                unknown comparison could never fire, so it is rejected.
        """

        if comparison not in self._COMPARATORS:
            raise ValueError(
                f"Unsupported comparison {comparison!r}; "
                f"expected one of {sorted(self._COMPARATORS)}"
            )

        self.alerts.append({
            'name': name,
            'metric': metric,
            'threshold': threshold,
            'comparison': comparison,
            'severity': severity,
            'triggered': False
        })

    def check_alerts(self, current_metrics: Dict[str, float]):
        """Evaluate every rule against ``current_metrics``.

        Rules whose metric is missing (or ``None``) are skipped and keep
        their previous state.

        Returns:
            The list of alert events that fired on this call, i.e. rules
            whose condition is true now and was not true at their previous
            evaluation. Each event is also appended to ``alert_history`` and
            passed to ``_send_alert``.
        """

        triggered_alerts = []

        for alert in self.alerts:
            metric_value = current_metrics.get(alert['metric'])

            if metric_value is None:
                continue

            # Check condition; a rule with an unknown comparison (only
            # possible if ``self.alerts`` was edited directly) never fires.
            compare = self._COMPARATORS.get(alert['comparison'])
            triggered = (
                compare(metric_value, alert['threshold'])
                if compare is not None else False
            )

            if triggered and not alert['triggered']:
                # New alert
                alert_event = {
                    'name': alert['name'],
                    'severity': alert['severity'],
                    'metric': alert['metric'],
                    'value': metric_value,
                    'threshold': alert['threshold'],
                    'timestamp': datetime.now()
                }

                triggered_alerts.append(alert_event)
                self.alert_history.append(alert_event)

                # Send alert (implement your notification logic)
                self._send_alert(alert_event)

            alert['triggered'] = triggered

        return triggered_alerts

    def _send_alert(self, alert: Dict):
        """Print the alert event; replace with real notification logic."""
        print(f"\n🚨 ALERT: {alert['name']}")
        print(f"   Severity: {alert['severity']}")
        print(f"   {alert['metric']} = {alert['value']:.4f} (threshold: {alert['threshold']})")
        print(f"   Time: {alert['timestamp']}")

# Example usage: start the metrics endpoint, register two rules, then feed
# ten simulated readings through the dashboard and the alerting system
dashboard = MonitoringDashboard()
dashboard.start_server(port=9090)

alerting = AlertingSystem()

# Add alerts
alerting.add_alert(
    name="High Latency", metric="avg_latency", threshold=1.0,
    comparison='>', severity='warning',
)
alerting.add_alert(
    name="High Error Rate", metric="error_rate", threshold=0.05,
    comparison='>', severity='critical',
)

# Simulate monitoring: latency rises each step, the error rate jumps for
# the last two steps
for i in range(10):
    # Record metrics
    dashboard.metrics.record_request(
        model="gpt-3.5-turbo",
        status="success",
        latency=0.5 + i * 0.1,
        cost=0.002,
        input_tokens=500,
        output_tokens=200,
    )

    # Check alerts
    current_metrics = {
        "avg_latency": 0.5 + i * 0.1,
        "error_rate": 0.08 if i >= 8 else 0.01,
    }
    triggered = alerting.check_alerts(current_metrics)

    time.sleep(1)

print("\nAlert History:")
for alert in alerting.alert_history:
    print(f"  {alert['timestamp']}: {alert['name']} ({alert['severity']})")

Production Monitoring Best Practices

python
class ProductionMonitoringSystem:
    """Combine the in-memory collector, Prometheus metrics and alerting.

    The same thresholds drive both the alert rules and the health check.
    ``wandb_tracker`` is ``None`` until the caller assigns one; when set,
    every tracked request is also logged to it.
    """

    # Default thresholds, keyed by the metric name used in alert checks
    DEFAULT_THRESHOLDS = {
        "p95_latency": 2.0,
        "hourly_cost": 10.0,
        "avg_quality": 0.7,
        "error_rate": 0.05,
    }

    # (alert name, metric, comparison) for every built-in rule
    _ALERT_RULES = (
        ("High Latency", "p95_latency", '>'),
        ("High Cost", "hourly_cost", '>'),
        ("Low Quality", "avg_quality", '<'),
        ("High Error Rate", "error_rate", '>'),
    )

    def __init__(self):
        self.metrics_collector = MetricsCollector()
        self.langsmith = LangSmithMonitor()
        self.wandb_tracker = None  # Initialize when needed
        self.prometheus = PrometheusMetrics()
        self.alerting = AlertingSystem()
        self.thresholds = dict(self.DEFAULT_THRESHOLDS)

    def initialize(self, config: Dict):
        """Register the built-in alert rules.

        Args:
            config: Optional settings. ``config["thresholds"]`` may map any
                key of ``DEFAULT_THRESHOLDS`` to an override; an empty dict
                keeps the defaults.

        Calling this again updates the thresholds of the existing rules
        instead of adding duplicates.
        """

        self.thresholds.update((config or {}).get("thresholds", {}))

        # Set up alerts
        existing = {alert['name']: alert for alert in self.alerting.alerts}
        for name, metric, comparison in self._ALERT_RULES:
            if name in existing:
                existing[name]['threshold'] = self.thresholds[metric]
            else:
                self.alerting.add_alert(
                    name, metric, self.thresholds[metric], comparison
                )

        print("Monitoring system initialized")

    def track_request(
        self,
        prompt: str,
        response: str,
        metrics: LLMMetrics,
        metadata: Optional[Dict] = None
    ):
        """Track a single request across all systems.

        The request is stored in the collector, recorded in Prometheus with
        status ``"success"``, forwarded to the W&B tracker when one is set,
        and the alert rules are then checked against the 60-minute summary.
        """

        # Collect metrics
        self.metrics_collector.record_request(metrics)

        # Prometheus
        self.prometheus.record_request(
            model=metrics.model,
            status="success",
            latency=metrics.total_latency,
            cost=metrics.cost,
            input_tokens=metrics.input_tokens,
            output_tokens=metrics.output_tokens
        )

        # Weights & Biases (only when a tracker has been attached)
        if self.wandb_tracker is not None:
            self.wandb_tracker.log_inference(
                prompt=prompt,
                response=response,
                metrics=metrics,
                metadata=metadata
            )

        # Check alerts
        summary = self.metrics_collector.get_summary(window_minutes=60)
        if summary:
            self.alerting.check_alerts({
                "p95_latency": summary.get("p95_latency", 0),
                "hourly_cost": summary.get("total_cost", 0),
                "avg_quality": summary.get("avg_quality", 1.0),
                "error_rate": summary.get("error_rate", 0)
            })

    def get_health_status(self) -> Dict:
        """Report health based on the last five minutes of requests.

        Returns:
            ``{"status": "unknown"}`` when the collector has no summary for
            the window. Otherwise a dict with ``status`` (``"healthy"`` when
            error rate, p95 latency and average quality are all within the
            thresholds, else ``"degraded"``), the ``metrics`` summary and an
            ISO ``timestamp``.
        """

        summary = self.metrics_collector.get_summary(window_minutes=5)

        if not summary:
            return {"status": "unknown"}

        # Determine health; a missing value counts as unhealthy
        is_healthy = (
            summary.get("error_rate", 1.0) < self.thresholds["error_rate"] and
            summary.get("p95_latency", 100) < self.thresholds["p95_latency"] and
            summary.get("avg_quality", 0) > self.thresholds["avg_quality"]
        )

        return {
            "status": "healthy" if is_healthy else "degraded",
            "metrics": summary,
            "timestamp": datetime.now().isoformat()
        }

# Example usage: initialise the combined system, track 100 identical
# requests, then print the health verdict
monitoring = ProductionMonitoringSystem()
monitoring.initialize({})

# Track requests
for i in range(100):
    metrics = LLMMetrics(
        total_latency=0.5,
        model_latency=0.3,
        preprocessing_latency=0.1,
        postprocessing_latency=0.1,
        input_tokens=500,
        output_tokens=200,
        total_tokens=700,
        cost=0.002,
        cost_per_token=0.000003,
        quality_score=0.85,
        model="gpt-3.5-turbo",
    )
    monitoring.track_request(
        prompt=f"Test {i}", response=f"Response {i}", metrics=metrics
    )

# Check health
health = monitoring.get_health_status()
print(f"\nSystem Health: {health['status']}")

Quiz

Test your understanding of monitoring and observability:

Summary

In this lesson, you learned:

  • Key metrics: Latency, cost, quality, and usage metrics for LLM applications
  • LangSmith: Automatic tracing, debugging, and evaluation
  • Weights & Biases: Experiment tracking and visualization
  • Custom monitoring: Prometheus metrics and alerting systems
  • Production practices: Comprehensive monitoring and health checks

Effective monitoring and observability are essential for maintaining reliable, high-quality LLM applications in production.