Monitoring & Observability for LLM Applications
Learn to implement comprehensive monitoring and observability for production LLM applications to track performance, costs, and quality.
What You'll Learn: Observability is critical for production LLM applications. We'll explore tools and techniques for tracing, metrics, logging, and debugging using industry-standard platforms.
Fundamentals of LLM Observability
Key Metrics to Track
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum


class MetricType(Enum):
    LATENCY = "latency"
    COST = "cost"
    QUALITY = "quality"
    USAGE = "usage"
    ERROR = "error"


@dataclass
class LLMMetrics:
    """Core metrics for LLM monitoring"""
    # Performance metrics
    total_latency: float           # Total time including API calls
    model_latency: float           # Time spent in model inference
    preprocessing_latency: float   # Time in preprocessing
    postprocessing_latency: float  # Time in postprocessing
    # Token metrics
    input_tokens: int
    output_tokens: int
    total_tokens: int
    # Cost metrics
    cost: float
    cost_per_token: float
    # Quality metrics
    quality_score: Optional[float] = None
    user_feedback: Optional[int] = None  # -1, 0, 1
    # Context
    model: str = ""
    timestamp: datetime = field(default_factory=datetime.now)


class MetricsCollector:
    """Collect and aggregate LLM metrics"""

    def __init__(self):
        self.metrics: List[LLMMetrics] = []
        self.request_count = 0
        self.error_count = 0

    def record_request(self, metrics: LLMMetrics):
        """Record metrics for a request"""
        self.metrics.append(metrics)
        self.request_count += 1

    def record_error(self, error_type: str, error_message: str):
        """Record an error"""
        self.error_count += 1

    def get_summary(self, window_minutes: int = 60) -> Dict:
        """Get summary statistics for a time window"""
        cutoff = datetime.now().timestamp() - (window_minutes * 60)
        recent_metrics = [
            m for m in self.metrics
            if m.timestamp.timestamp() > cutoff
        ]
        if not recent_metrics:
            return {}
        # Ignore requests without a quality score (0.0 is a valid score)
        quality_scores = [
            m.quality_score for m in recent_metrics
            if m.quality_score is not None
        ]
        return {
            "request_count": len(recent_metrics),
            "error_count": self.error_count,
            "error_rate": self.error_count / max(len(recent_metrics), 1),
            # Latency stats
            "avg_latency": sum(m.total_latency for m in recent_metrics) / len(recent_metrics),
            "p50_latency": self._percentile([m.total_latency for m in recent_metrics], 50),
            "p95_latency": self._percentile([m.total_latency for m in recent_metrics], 95),
            "p99_latency": self._percentile([m.total_latency for m in recent_metrics], 99),
            # Token stats
            "total_tokens": sum(m.total_tokens for m in recent_metrics),
            "avg_input_tokens": sum(m.input_tokens for m in recent_metrics) / len(recent_metrics),
            "avg_output_tokens": sum(m.output_tokens for m in recent_metrics) / len(recent_metrics),
            # Cost stats
            "total_cost": sum(m.cost for m in recent_metrics),
            "avg_cost_per_request": sum(m.cost for m in recent_metrics) / len(recent_metrics),
            # Quality stats
            "avg_quality": sum(quality_scores) / max(len(quality_scores), 1),
            # Time window
            "window_minutes": window_minutes
        }

    def _percentile(self, values: List[float], percentile: int) -> float:
        """Nearest-rank percentile"""
        sorted_values = sorted(values)
        index = int(len(sorted_values) * percentile / 100)
        return sorted_values[min(index, len(sorted_values) - 1)]


# Example usage
collector = MetricsCollector()

# Simulate some requests
for i in range(100):
    metrics = LLMMetrics(
        total_latency=0.5 + (i % 10) * 0.1,
        model_latency=0.3,
        preprocessing_latency=0.1,
        postprocessing_latency=0.1,
        input_tokens=500,
        output_tokens=200,
        total_tokens=700,
        cost=0.002,
        cost_per_token=0.000003,
        quality_score=0.8 + (i % 20) * 0.01,
        model="gpt-3.5-turbo"
    )
    collector.record_request(metrics)

# Get summary
summary = collector.get_summary(window_minutes=60)
print("Metrics Summary (60 min window):")
print(f"Requests: {summary['request_count']}")
print(f"Avg Latency: {summary['avg_latency']:.3f}s")
print(f"P95 Latency: {summary['p95_latency']:.3f}s")
print(f"Total Cost: ${summary['total_cost']:.4f}")
print(f"Avg Quality: {summary['avg_quality']:.2f}")
LangSmith Integration
LangSmith: LangChain's observability platform provides automatic tracing, debugging, and testing for LLM applications with minimal code changes.
LangSmith Setup and Tracing
import os
import time
from typing import Dict, List, Optional

from langsmith import Client
from langsmith.run_helpers import traceable
# In langchain>=0.1 these classes live in the langchain-openai and langchain-core packages
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Set up LangSmith
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "production-llm-app"


class LangSmithMonitor:
    """Monitor an LLM application with LangSmith"""

    def __init__(self, project_name: str = "default"):
        self.client = Client()
        self.project_name = project_name

    @traceable(run_type="chain", name="process_user_query")
    def process_query(self, query: str, context: str = "") -> str:
        """Process a query with automatic tracing"""
        # Create prompt
        prompt = ChatPromptTemplate.from_messages([
            ("system", "You are a helpful assistant. Context: {context}"),
            ("user", "{query}")
        ])
        # Create chain
        model = ChatOpenAI(model="gpt-3.5-turbo")
        chain = prompt | model | StrOutputParser()
        # Execute (automatically traced)
        response = chain.invoke({
            "context": context,
            "query": query
        })
        return response

    @traceable(run_type="tool", name="retrieve_context")
    def retrieve_context(self, query: str) -> str:
        """Retrieve context (traced as a tool)"""
        # Simulate context retrieval
        time.sleep(0.1)
        return f"Retrieved context for: {query}"

    @traceable(run_type="chain", name="rag_pipeline")
    def rag_pipeline(self, query: str) -> Dict:
        """Complete RAG pipeline with nested tracing"""
        # Step 1: Retrieve context (traced)
        context = self.retrieve_context(query)
        # Step 2: Process query (traced)
        response = self.process_query(query, context)
        return {
            "query": query,
            "context": context,
            "response": response
        }

    def get_run_statistics(self, project_name: Optional[str] = None) -> Dict:
        """Get statistics for runs"""
        project = project_name or self.project_name
        runs = self.client.list_runs(
            project_name=project,
            is_root=True  # only top-level (parent) runs
        )
        run_list = list(runs)
        if not run_list:
            return {}
        # Calculate statistics
        latencies = [
            (run.end_time - run.start_time).total_seconds()
            for run in run_list
            if run.end_time and run.start_time
        ]
        errors = [run for run in run_list if run.error]
        return {
            "total_runs": len(run_list),
            "error_count": len(errors),
            "error_rate": len(errors) / len(run_list),
            "avg_latency": sum(latencies) / len(latencies) if latencies else 0,
            # Token usage, if the run recorded it in its outputs
            "total_tokens": sum(
                run.outputs.get("token_usage", {}).get("total_tokens", 0)
                for run in run_list
                if run.outputs
            )
        }

    def create_dataset(self, name: str, examples: List[Dict]):
        """Create an evaluation dataset"""
        dataset = self.client.create_dataset(
            dataset_name=name,
            description="Test dataset for evaluation"
        )
        for example in examples:
            self.client.create_example(
                dataset_id=dataset.id,
                inputs=example["inputs"],
                outputs=example["outputs"]
            )
        return dataset

    def run_evaluation(self, dataset_name: str):
        """Run an evaluation on a dataset"""
        from langsmith.evaluation import evaluate

        def accuracy_evaluator(run, example):
            """Custom evaluator"""
            predicted = run.outputs.get("response", "")
            expected = example.outputs.get("response", "")
            # Simple exact match for demonstration
            return {"score": 1.0 if predicted == expected else 0.0}

        results = evaluate(
            lambda inputs: self.process_query(**inputs),
            data=dataset_name,
            evaluators=[accuracy_evaluator],
            experiment_prefix="experiment"
        )
        return results


# Example usage
monitor = LangSmithMonitor(project_name="my-llm-app")

# Process queries (automatically traced)
result = monitor.rag_pipeline("What is machine learning?")
print(f"Response: {result['response']}")

# Get statistics
stats = monitor.get_run_statistics()
print("\nRun Statistics:")
print(f"Total runs: {stats.get('total_runs', 0)}")
print(f"Error rate: {stats.get('error_rate', 0):.2%}")
print(f"Avg latency: {stats.get('avg_latency', 0):.3f}s")
Advanced LangSmith Features
import random
from typing import Dict, Optional

from langsmith import Client


class AdvancedLangSmithMonitoring:
    """Advanced LangSmith monitoring features"""

    def __init__(self):
        self.client = Client()

    def add_custom_metadata(self, run_id: str, metadata: Dict):
        """Add custom metadata to a run"""
        self.client.update_run(
            run_id=run_id,
            extra=metadata
        )

    def add_feedback(
        self,
        run_id: str,
        key: str,
        score: float,
        comment: Optional[str] = None
    ):
        """Add feedback to a run"""
        self.client.create_feedback(
            run_id=run_id,
            key=key,
            score=score,
            comment=comment
        )

    def get_feedback_statistics(self, project_name: str) -> Dict:
        """Get feedback statistics"""
        # list_feedback filters by run id, so collect the project's run ids first
        run_ids = [
            run.id
            for run in self.client.list_runs(project_name=project_name, is_root=True)
        ]
        feedbacks = list(self.client.list_feedback(run_ids=run_ids))
        if not feedbacks:
            return {}
        # Aggregate by key, skipping feedback without a numeric score
        by_key = {}
        for feedback in feedbacks:
            if feedback.score is None:
                continue
            by_key.setdefault(feedback.key, []).append(feedback.score)
        return {
            key: {
                "count": len(scores),
                "avg_score": sum(scores) / len(scores),
                "min_score": min(scores),
                "max_score": max(scores)
            }
            for key, scores in by_key.items()
        }

    def create_annotation_queue(
        self,
        name: str,
        project_name: str,
        sampling_rate: float = 0.1
    ):
        """Sample runs into a queue for human review"""
        # Get runs to annotate
        runs = self.client.list_runs(
            project_name=project_name,
            is_root=True
        )
        # Sample runs
        sampled_runs = [
            run for run in runs
            if random.random() < sampling_rate
        ]
        return {
            "queue_name": name,
            "runs_to_annotate": len(sampled_runs),
            "run_ids": [run.id for run in sampled_runs[:100]]  # Limit to 100
        }


# Example
advanced = AdvancedLangSmithMonitoring()

# Add feedback
# advanced.add_feedback(
#     run_id="some-run-id",
#     key="helpfulness",
#     score=0.9,
#     comment="Very helpful response"
# )

# Get feedback stats
# stats = advanced.get_feedback_statistics("my-llm-app")
# print("Feedback Statistics:", stats)
Weights & Biases Integration
Weights & Biases (W&B): Track experiments, visualize metrics, and compare model performance across runs with comprehensive dashboards.
W&B Setup for LLMs
import wandb
from typing import Dict, List, Optional

# LLMMetrics is the dataclass defined in the first listing


class WandBLLMTracker:
    """Track LLM metrics with Weights & Biases"""

    def __init__(
        self,
        project: str,
        entity: Optional[str] = None,
        config: Optional[Dict] = None
    ):
        """Initialize W&B tracking"""
        self.run = wandb.init(
            project=project,
            entity=entity,
            config=config or {}
        )
        # Define custom metrics
        wandb.define_metric("latency", summary="mean")
        wandb.define_metric("cost", summary="sum")
        wandb.define_metric("quality_score", summary="mean")
        wandb.define_metric("tokens", summary="sum")

    def log_inference(
        self,
        prompt: str,
        response: str,
        metrics: LLMMetrics,
        metadata: Optional[Dict] = None
    ):
        """Log a single inference"""
        log_data = {
            # Performance
            "latency": metrics.total_latency,
            "model_latency": metrics.model_latency,
            # Tokens
            "input_tokens": metrics.input_tokens,
            "output_tokens": metrics.output_tokens,
            "total_tokens": metrics.total_tokens,
            # Cost
            "cost": metrics.cost,
            "cost_per_token": metrics.cost_per_token,
            # Quality
            "quality_score": metrics.quality_score,
            # Model
            "model": metrics.model
        }
        # Add metadata
        if metadata:
            log_data.update(metadata)
        wandb.log(log_data)
        # Log text samples periodically
        if wandb.run.step % 100 == 0:
            wandb.log({
                "examples": wandb.Table(
                    columns=["prompt", "response", "quality"],
                    data=[[
                        prompt[:100],
                        response[:100],
                        metrics.quality_score
                    ]]
                )
            })

    def log_batch_metrics(self, batch_metrics: List[LLMMetrics]):
        """Log aggregated batch metrics"""
        avg_latency = sum(m.total_latency for m in batch_metrics) / len(batch_metrics)
        total_cost = sum(m.cost for m in batch_metrics)
        total_tokens = sum(m.total_tokens for m in batch_metrics)
        wandb.log({
            "batch_size": len(batch_metrics),
            "batch_avg_latency": avg_latency,
            "batch_total_cost": total_cost,
            "batch_total_tokens": total_tokens
        })

    def log_evaluation_results(
        self,
        eval_name: str,
        results: Dict[str, float]
    ):
        """Log evaluation results"""
        wandb.log({
            f"eval/{eval_name}/{metric}": value
            for metric, value in results.items()
        })
        # Create summary table
        wandb.log({
            f"eval/{eval_name}/summary": wandb.Table(
                columns=["Metric", "Value"],
                data=[[metric, value] for metric, value in results.items()]
            )
        })

    def log_model_comparison(
        self,
        model_results: Dict[str, Dict[str, float]]
    ):
        """Log a comparison across models"""
        # Create comparison table
        columns = ["Model"] + list(next(iter(model_results.values())).keys())
        data = [
            [model] + list(metrics.values())
            for model, metrics in model_results.items()
        ]
        wandb.log({
            "model_comparison": wandb.Table(
                columns=columns,
                data=data
            )
        })

    def create_alert(
        self,
        metric: str,
        threshold: float,
        comparison: str = ">"
    ):
        """Create an alert for a metric threshold"""
        # Threshold-based alert rules are configured in the W&B UI;
        # wandb.alert() can send a one-off notification from code.
        # This placeholder just records the intent.
        print(f"Alert configured: {metric} {comparison} {threshold}")

    def finish(self):
        """Finish the W&B run"""
        wandb.finish()


# Example usage
tracker = WandBLLMTracker(
    project="llm-production",
    config={
        "model": "gpt-3.5-turbo",
        "temperature": 0.7,
        "max_tokens": 100
    }
)

# Log inferences
for i in range(50):
    metrics = LLMMetrics(
        total_latency=0.5,
        model_latency=0.3,
        preprocessing_latency=0.1,
        postprocessing_latency=0.1,
        input_tokens=500,
        output_tokens=200,
        total_tokens=700,
        cost=0.002,
        cost_per_token=0.000003,
        quality_score=0.85,
        model="gpt-3.5-turbo"
    )
    tracker.log_inference(
        prompt=f"Test prompt {i}",
        response=f"Test response {i}",
        metrics=metrics
    )

# Log evaluation
tracker.log_evaluation_results(
    "test_set_v1",
    {
        "accuracy": 0.92,
        "f1_score": 0.89,
        "avg_latency": 0.45
    }
)

# Log model comparison
tracker.log_model_comparison({
    "gpt-3.5-turbo": {"accuracy": 0.85, "latency": 0.3, "cost": 0.002},
    "gpt-4-turbo": {"accuracy": 0.92, "latency": 0.5, "cost": 0.01},
    "claude-3-sonnet": {"accuracy": 0.90, "latency": 0.4, "cost": 0.006}
})

tracker.finish()
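W&B can also version prompt templates and other run inputs as artifacts, so a metrics regression can be tied back to the exact prompt that produced it. A small sketch; the project, artifact name, and file contents are illustrative:

import wandb

run = wandb.init(project="llm-production", job_type="prompt-update")
# An artifact is a versioned bundle of files; W&B tracks lineage automatically.
artifact = wandb.Artifact(name="support-prompt", type="prompt")
with artifact.new_file("prompt.txt") as f:
    f.write("You are a helpful assistant. Context: {context}\n\nUser: {query}")
run.log_artifact(artifact)
run.finish()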
Custom Monitoring Dashboard
Production Monitoring: Build custom dashboards to track application-specific metrics and set up alerts for anomalies and degradation.
import threading
import time
from datetime import datetime
from typing import Dict

from flask import Flask, Response
from prometheus_client import (
    CONTENT_TYPE_LATEST,
    Counter,
    Gauge,
    Histogram,
    generate_latest,
)


class PrometheusMetrics:
    """Prometheus metrics for LLM monitoring"""

    def __init__(self):
        # Counters
        self.request_count = Counter(
            'llm_requests_total',
            'Total LLM requests',
            ['model', 'status']
        )
        self.token_count = Counter(
            'llm_tokens_total',
            'Total tokens processed',
            ['model', 'type']  # type: input, output
        )
        # Histograms
        self.latency = Histogram(
            'llm_latency_seconds',
            'Request latency',
            ['model', 'operation']
        )
        self.cost = Histogram(
            'llm_cost_dollars',
            'Request cost',
            ['model']
        )
        # Gauges
        self.active_requests = Gauge(
            'llm_active_requests',
            'Currently active requests',
            ['model']
        )
        self.cache_hit_rate = Gauge(
            'llm_cache_hit_rate',
            'Cache hit rate'
        )

    def record_request(
        self,
        model: str,
        status: str,
        latency: float,
        cost: float,
        input_tokens: int,
        output_tokens: int
    ):
        """Record request metrics"""
        # Count
        self.request_count.labels(model=model, status=status).inc()
        # Tokens
        self.token_count.labels(model=model, type='input').inc(input_tokens)
        self.token_count.labels(model=model, type='output').inc(output_tokens)
        # Latency
        self.latency.labels(model=model, operation='total').observe(latency)
        # Cost
        self.cost.labels(model=model).observe(cost)


class MonitoringDashboard:
    """Custom monitoring dashboard"""

    def __init__(self):
        self.metrics = PrometheusMetrics()
        self.app = Flask(__name__)
        self._setup_routes()

    def _setup_routes(self):
        """Set up Flask routes"""
        @self.app.route('/metrics')
        def metrics():
            """Prometheus metrics endpoint"""
            return Response(
                generate_latest(),
                mimetype=CONTENT_TYPE_LATEST
            )

        @self.app.route('/health')
        def health():
            """Health check endpoint"""
            return {"status": "healthy"}

    def start_server(self, port: int = 9090):
        """Start the metrics server in a background thread"""
        def run():
            self.app.run(host='0.0.0.0', port=port)

        thread = threading.Thread(target=run, daemon=True)
        thread.start()
        print(f"Metrics server started on port {port}")
        print(f"Metrics available at http://localhost:{port}/metrics")


class AlertingSystem:
    """Alert on metric thresholds"""

    def __init__(self):
        self.alerts = []
        self.alert_history = []

    def add_alert(
        self,
        name: str,
        metric: str,
        threshold: float,
        comparison: str = '>',
        severity: str = 'warning'
    ):
        """Add an alert rule"""
        self.alerts.append({
            'name': name,
            'metric': metric,
            'threshold': threshold,
            'comparison': comparison,
            'severity': severity,
            'triggered': False
        })

    def check_alerts(self, current_metrics: Dict[str, float]):
        """Check alert conditions"""
        triggered_alerts = []
        for alert in self.alerts:
            metric_value = current_metrics.get(alert['metric'])
            if metric_value is None:
                continue
            # Check condition
            triggered = False
            if alert['comparison'] == '>':
                triggered = metric_value > alert['threshold']
            elif alert['comparison'] == '<':
                triggered = metric_value < alert['threshold']
            elif alert['comparison'] == '>=':
                triggered = metric_value >= alert['threshold']
            elif alert['comparison'] == '<=':
                triggered = metric_value <= alert['threshold']
            if triggered and not alert['triggered']:
                # Newly triggered alert
                alert_event = {
                    'name': alert['name'],
                    'severity': alert['severity'],
                    'metric': alert['metric'],
                    'value': metric_value,
                    'threshold': alert['threshold'],
                    'timestamp': datetime.now()
                }
                triggered_alerts.append(alert_event)
                self.alert_history.append(alert_event)
                # Send alert (implement your notification logic)
                self._send_alert(alert_event)
            alert['triggered'] = triggered
        return triggered_alerts

    def _send_alert(self, alert: Dict):
        """Send an alert notification"""
        print(f"\n🚨 ALERT: {alert['name']}")
        print(f"   Severity: {alert['severity']}")
        print(f"   {alert['metric']} = {alert['value']:.4f} (threshold: {alert['threshold']})")
        print(f"   Time: {alert['timestamp']}")


# Example usage
dashboard = MonitoringDashboard()
dashboard.start_server(port=9090)

alerting = AlertingSystem()

# Add alerts
alerting.add_alert(
    name="High Latency",
    metric="avg_latency",
    threshold=1.0,
    comparison='>',
    severity='warning'
)
alerting.add_alert(
    name="High Error Rate",
    metric="error_rate",
    threshold=0.05,
    comparison='>',
    severity='critical'
)

# Simulate monitoring
for i in range(10):
    # Record metrics
    dashboard.metrics.record_request(
        model="gpt-3.5-turbo",
        status="success",
        latency=0.5 + i * 0.1,
        cost=0.002,
        input_tokens=500,
        output_tokens=200
    )
    # Check alerts
    current_metrics = {
        "avg_latency": 0.5 + i * 0.1,
        "error_rate": 0.01 if i < 8 else 0.08
    }
    alerting.check_alerts(current_metrics)
    time.sleep(1)

print("\nAlert History:")
for alert in alerting.alert_history:
    print(f"  {alert['timestamp']}: {alert['name']} ({alert['severity']})")
Production Monitoring Best Practices
# Uses LLMMetrics, MetricsCollector, LangSmithMonitor, PrometheusMetrics,
# and AlertingSystem from the previous listings.

class ProductionMonitoringSystem:
    """Complete production monitoring system"""

    def __init__(self):
        self.metrics_collector = MetricsCollector()
        self.langsmith = LangSmithMonitor()  # requires LANGCHAIN_API_KEY
        self.wandb_tracker = None  # Initialize when needed
        self.prometheus = PrometheusMetrics()
        self.alerting = AlertingSystem()

    def initialize(self, config: Dict):
        """Initialize all monitoring components"""
        # Set up alerts
        self.alerting.add_alert("High Latency", "p95_latency", 2.0, '>')
        self.alerting.add_alert("High Cost", "hourly_cost", 10.0, '>')
        self.alerting.add_alert("Low Quality", "avg_quality", 0.7, '<')
        self.alerting.add_alert("High Error Rate", "error_rate", 0.05, '>')
        print("Monitoring system initialized")

    def track_request(
        self,
        prompt: str,
        response: str,
        metrics: LLMMetrics,
        metadata: Optional[Dict] = None
    ):
        """Track a single request across all systems"""
        # Collect metrics
        self.metrics_collector.record_request(metrics)
        # Prometheus
        self.prometheus.record_request(
            model=metrics.model,
            status="success",
            latency=metrics.total_latency,
            cost=metrics.cost,
            input_tokens=metrics.input_tokens,
            output_tokens=metrics.output_tokens
        )
        # Check alerts
        summary = self.metrics_collector.get_summary(window_minutes=60)
        if summary:
            self.alerting.check_alerts({
                "p95_latency": summary.get("p95_latency", 0),
                "hourly_cost": summary.get("total_cost", 0),
                "avg_quality": summary.get("avg_quality", 1.0),
                "error_rate": summary.get("error_rate", 0)
            })

    def get_health_status(self) -> Dict:
        """Get overall system health"""
        summary = self.metrics_collector.get_summary(window_minutes=5)
        if not summary:
            return {"status": "unknown"}
        # Determine health
        is_healthy = (
            summary.get("error_rate", 1.0) < 0.05 and
            summary.get("p95_latency", 100) < 2.0 and
            summary.get("avg_quality", 0) > 0.7
        )
        return {
            "status": "healthy" if is_healthy else "degraded",
            "metrics": summary,
            "timestamp": datetime.now().isoformat()
        }


# Example usage
monitoring = ProductionMonitoringSystem()
monitoring.initialize({})

# Track requests
for i in range(100):
    metrics = LLMMetrics(
        total_latency=0.5,
        model_latency=0.3,
        preprocessing_latency=0.1,
        postprocessing_latency=0.1,
        input_tokens=500,
        output_tokens=200,
        total_tokens=700,
        cost=0.002,
        cost_per_token=0.000003,
        quality_score=0.85,
        model="gpt-3.5-turbo"
    )
    monitoring.track_request(
        prompt=f"Test {i}",
        response=f"Response {i}",
        metrics=metrics
    )

# Check health
health = monitoring.get_health_status()
print(f"\nSystem Health: {health['status']}")
Summary
In this lesson, you learned:
- Key metrics: Latency, cost, quality, and usage metrics for LLM applications
- LangSmith: Automatic tracing, debugging, and evaluation
- Weights & Biases: Experiment tracking and visualization
- Custom monitoring: Prometheus metrics and alerting systems
- Production practices: Comprehensive monitoring and health checks
Effective monitoring and observability are essential for maintaining reliable, high-quality LLM applications in production.