AI Observability & Monitoring Guide 2026
Production LLM debugging and analytics. Tracing, cost tracking, latency monitoring, prompt versioning, and debugging patterns for AI applications.
Deploying an LLM application to production is just the beginning. Without observability, you're flying blind — you won't know when costs spike, latency degrades, or outputs drift from expected quality. Traditional application monitoring (CPU, memory, error rates) isn't enough for AI systems. You need to trace individual LLM calls, track token usage, monitor prompt effectiveness, and detect model drift. This guide covers the complete observability stack for production AI applications in 2026.
Why AI Observability is Different
Traditional monitoring tracks infrastructure health. AI observability tracks model behavior:
| Traditional Monitoring | AI Observability |
|---|---|
| CPU / memory usage | Token usage per request |
| Request latency | TTFT (time to first token) + TPOT (time per output token) breakdown |
| Error rate (HTTP 5xx) | Output quality scores |
| Log aggregation | Prompt / response tracing |
| Static thresholds | Drift detection |
| Binary pass/fail | Graduated quality metrics |
Key Metrics to Track
1. Cost Metrics
from collections import defaultdict
from datetime import datetime

# Track per-request and aggregate costs
class CostTracker:
    def __init__(self):
        self.pricing = {
            "gpt-5.5": {"input": 5.0, "output": 30.0},  # USD per 1M tokens
            "gpt-5.4": {"input": 2.50, "output": 15.0},
            "gpt-5.4-mini": {"input": 0.75, "output": 4.50},
        }
        self.usage_log = []

    def log_request(self, model, input_tokens, output_tokens, user_id=None):
        """Log a request and return its cost in USD (None if the model has no price entry)."""
        if model not in self.pricing:
            # Unpriced models are skipped; add them to self.pricing to track their spend
            return None
        cost = (
            input_tokens * self.pricing[model]["input"] +
            output_tokens * self.pricing[model]["output"]
        ) / 1_000_000
        entry = {
            "timestamp": datetime.now().isoformat(),
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost_usd": cost,
            "user_id": user_id
        }
        self.usage_log.append(entry)
        return cost

    def get_daily_cost(self, date=None):
        """Get total cost for a specific date (defaults to today)."""
        date = date or datetime.now().date()
        return sum(
            e["cost_usd"] for e in self.usage_log
            if datetime.fromisoformat(e["timestamp"]).date() == date
        )

    def get_cost_by_model(self):
        """Break down costs by model."""
        by_model = defaultdict(float)
        for entry in self.usage_log:
            by_model[entry["model"]] += entry["cost_usd"]
        return dict(by_model)
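To make the per-million-token arithmetic concrete, here is a minimal worked sketch. It reuses the illustrative gpt-5.4 prices from the tracker above; the function name `request_cost_usd` and the token counts are hypothetical. For 1,200 input tokens and 350 output tokens, the cost is (1,200 × 2.50 + 350 × 15.0) / 1,000,000 = 0.00825 USD.

```python
from decimal import Decimal

# Illustrative prices in USD per 1M tokens, copied from the tracker above.
PRICING = {"gpt-5.4": {"input": Decimal("2.50"), "output": Decimal("15.0")}}

def request_cost_usd(model: str, input_tokens: int, output_tokens: int) -> Decimal:
    """Cost of one request; Decimal avoids float rounding when many small costs are summed."""
    price = PRICING[model]
    total = input_tokens * price["input"] + output_tokens * price["output"]
    return total / Decimal(1_000_000)

cost = request_cost_usd("gpt-5.4", input_tokens=1_200, output_tokens=350)
print(cost)  # 0.00825
```

Whether `Decimal` is worth the extra ceremony depends on volume; for a rough daily total, floats as used in `CostTracker` are usually adequate.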
2. Latency Metrics
import time
from datetime import datetime

class LatencyTracker:
    def __init__(self):
        self.measurements = []

    def measure(self, func, *args, **kwargs):
        """Measure execution time of an LLM call."""
        start = time.perf_counter()  # monotonic clock, unaffected by system clock changes
        result = func(*args, **kwargs)
        end = time.perf_counter()
        # Use a `ttft` attribute if the result exposes one; most responses do not, so this is often None
        ttft = getattr(result, 'ttft', None)  # Time to first token
        total_time = end - start
        self.measurements.append({
            "timestamp": datetime.now().isoformat(),
            "total_seconds": total_time,
            "ttft_seconds": ttft,
            "model": kwargs.get('model', 'unknown')
        })
        return result

    def get_percentiles(self):
        """Get latency percentiles (in seconds) over all recorded calls."""
        times = sorted(m["total_seconds"] for m in self.measurements)
        if not times:
            return {}
        return {
            "p50": times[int(len(times) * 0.5)],
            "p95": times[int(len(times) * 0.95)],
            "p99": times[int(len(times) * 0.99)],
        }
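The tracker above only sees total time unless the response object happens to carry a `ttft` attribute. With a streaming response you can measure TTFT and TPOT directly by timing the token iterator. The sketch below assumes a generic iterable of tokens; `fake_stream` is a hypothetical stand-in for a real streaming client, and `measure_stream` is an illustrative name.

```python
import time
from typing import Iterable, Iterator

def fake_stream(n_tokens: int, delay_s: float = 0.01) -> Iterator[str]:
    """Hypothetical stand-in for a streaming LLM response."""
    for i in range(n_tokens):
        time.sleep(delay_s)
        yield f"tok{i}"

def measure_stream(stream: Iterable[str]) -> dict:
    """Consume a token stream and report TTFT, TPOT, and total time in seconds."""
    start = time.perf_counter()
    first_token_at = None
    n_tokens = 0
    for _ in stream:
        if first_token_at is None:
            first_token_at = time.perf_counter()
        n_tokens += 1
    end = time.perf_counter()
    ttft = None if first_token_at is None else first_token_at - start
    # TPOT averages the gaps between tokens after the first one.
    tpot = (end - first_token_at) / (n_tokens - 1) if n_tokens > 1 else None
    return {
        "ttft_seconds": ttft,
        "tpot_seconds": tpot,
        "total_seconds": end - start,
        "tokens": n_tokens,
    }

stats = measure_stream(fake_stream(5))
print(stats)
```

Splitting latency this way tells you whether a slowdown comes from queueing and prompt processing (TTFT) or from generation speed (TPOT).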
3. Quality Metrics
import json
import math

class QualityTracker:
    def __init__(self, client):
        self.client = client
        self.scores = []

    def evaluate_response(self, prompt, response, expected=None):
        """Evaluate response quality using multiple methods."""
        metrics = {}
        # 1. Length check (responses too short/long may indicate problems)
        metrics["length"] = len(response)
        metrics["length_ok"] = 50 < len(response) < 4000
        # 2. Format compliance (if structured output expected)
        if expected and isinstance(expected, dict):
            try:
                parsed = json.loads(response)
                metrics["format_match"] = all(k in parsed for k in expected.keys())
            except json.JSONDecodeError:
                metrics["format_match"] = False
        # 3. Relevance score (using embeddings)
        prompt_emb = self._embed(prompt)
        response_emb = self._embed(response)
        metrics["relevance"] = self._cosine_similarity(prompt_emb, response_emb)
        # 4. Self-evaluation (ask a model to score the response)
        if len(response) > 100:
            eval_prompt = f"Rate this response quality 1-10. Reply with a single integer: {response[:500]}"
            eval_response = self.client.chat.completions.create(
                model="gpt-5.4-mini",
                messages=[{"role": "user", "content": eval_prompt}],
                max_tokens=10
            )
            try:
                metrics["self_score"] = int(eval_response.choices[0].message.content.strip())
            except (TypeError, ValueError, AttributeError):
                # The model returned no content or something other than a bare integer
                metrics["self_score"] = None
        self.scores.append(metrics)
        return metrics

    def _embed(self, text):
        response = self.client.embeddings.create(
            model="text-embedding-3-small", input=text[:8000]
        )
        return response.data[0].embedding

    def _cosine_similarity(self, a, b):
        """Cosine similarity between two equal-length vectors (0.0 if either is all zeros)."""
        dot = sum(x * y for x, y in zip(a, b))
        norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
        return dot / norm if norm else 0.0
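Self-evaluation scores are fragile to parse: a model asked for "1-10" may answer "8", "Score: 7/10", or a full sentence, and a bare `int(...)` rejects everything except the first form. A more tolerant parser is sketched below. The helper name `parse_score` is hypothetical, and taking the first in-range integer is a simplifying assumption that can misread unusual replies.

```python
import re
from typing import Optional

def parse_score(text: str, low: int = 1, high: int = 10) -> Optional[int]:
    """Return the first integer in [low, high] found in text, else None."""
    for match in re.finditer(r"\d+", text):
        value = int(match.group())
        if low <= value <= high:
            return value
    return None

print(parse_score("8"))            # 8
print(parse_score("Score: 7/10"))  # 7
print(parse_score("Looks great"))  # None
```

Returning `None` rather than a default keeps unparseable replies out of your averages, which matters when the score feeds drift detection or alerting.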
LLM Tracing and Debugging
When something goes wrong, you need to see the full chain of LLM calls, tool executions, and context:
Manual Tracing Implementation
import uuid
from contextvars import ContextVar
from datetime import datetime

# Holds the active trace ID for the current thread or async task
current_trace = ContextVar('current_trace', default=None)

class LLMTracer:
    def __init__(self):
        self.traces = {}

    def start_trace(self, name, metadata=None):
        """Start a new trace and make it the current one."""
        trace_id = str(uuid.uuid4())
        trace = {
            "id": trace_id,
            "name": name,
            "start_time": datetime.now().isoformat(),
            "spans": [],
            "metadata": metadata or {}
        }
        self.traces[trace_id] = trace
        current_trace.set(trace_id)
        return trace_id

    def add_span(self, name, span_type="llm", inputs=None, outputs=None,
                 latency_ms=None, tokens=None, error=None):
        """Add a span to the current trace (no-op if no trace is active)."""
        trace_id = current_trace.get()
        if not trace_id or trace_id not in self.traces:
            return None
        span = {
            "id": str(uuid.uuid4()),
            "name": name,
            "type": span_type,
            # Recorded when the span is logged, i.e. after the call has finished
            "start_time": datetime.now().isoformat(),
            "inputs": self._truncate(inputs),
            "outputs": self._truncate(outputs),
            "latency_ms": latency_ms,
            "tokens": tokens,
            "error": error
        }
        self.traces[trace_id]["spans"].append(span)
        return span["id"]

    def end_trace(self, trace_id, status="success"):
        """End a trace."""
        if trace_id in self.traces:
            self.traces[trace_id]["end_time"] = datetime.now().isoformat()
            self.traces[trace_id]["status"] = status

    def _truncate(self, obj, max_length=1000):
        """Truncate for storage; None stays None instead of becoming the string 'None'."""
        if obj is None:
            return None
        text = str(obj)
        return text[:max_length] + "..." if len(text) > max_length else text
# Usage
# Assumes `client` is an OpenAI-style client created elsewhere in your application
import time

tracer = LLMTracer()

def generate_with_tracing(prompt, model="gpt-5.4"):
    trace_id = tracer.start_trace("chat_generation", {"user_id": "123"})
    start = time.time()
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        latency = (time.time() - start) * 1000  # milliseconds
        tracer.add_span(
            name="llm_call",
            span_type="llm",
            inputs={"prompt": prompt, "model": model},
            outputs={"response": response.choices[0].message.content},
            latency_ms=latency,
            tokens={
                "input": response.usage.prompt_tokens,
                "output": response.usage.completion_tokens
            }
        )
        tracer.end_trace(trace_id, "success")
        return response
    except Exception as e:
        # Record the failure on the trace, then re-raise so callers still see it
        tracer.add_span(name="llm_call", span_type="llm", error=str(e))
        tracer.end_trace(trace_id, "error")
        raise
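The try/except timing pattern above gets repetitive once a flow has several steps. One way to factor it out is a context manager that times a block and records the span whether or not it raises. This is a minimal standalone sketch, not part of the `LLMTracer` class; `timed_span` and the plain `spans` list are hypothetical names used for illustration.

```python
import time
from contextlib import contextmanager

@contextmanager
def timed_span(spans: list, name: str, span_type: str = "llm"):
    """Time a block and append a span dict to `spans`, recording the error if one is raised."""
    span = {"name": name, "type": span_type, "error": None}
    start = time.perf_counter()
    try:
        yield span  # the caller may attach inputs, outputs, or token counts
    except Exception as exc:
        span["error"] = str(exc)
        raise
    finally:
        span["latency_ms"] = (time.perf_counter() - start) * 1000
        spans.append(span)

spans = []
with timed_span(spans, "retrieval", span_type="tool") as span:
    span["outputs"] = ["doc-1", "doc-2"]  # placeholder for real retrieval work

try:
    with timed_span(spans, "llm_call"):
        raise TimeoutError("upstream timed out")  # simulated failure
except TimeoutError:
    pass

for s in spans:
    print(s["name"], s["error"])
```

Because the span is appended in `finally`, failed calls show up in the trace with their latency, which is often exactly the data you need when debugging timeouts.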
Observability Platforms
Several platforms specialize in AI observability:
| Platform | Best For | Pricing | Open Source |
|---|---|---|---|
| LangSmith | LangChain apps | Pay per trace | No |
| Langfuse | General LLM tracing | Self-host or cloud | Yes |
| Weights & Biases | Experiment tracking | Pay per user | No |
| Phoenix (Arize) | Evaluation + tracing | Free tier | Yes |
| OpenTelemetry | Vendor-neutral tracing | Free | Yes |
Langfuse Integration
from langfuse import Langfuse
# v2-style SDK import; newer SDK versions expose `observe` from the top-level `langfuse` package
from langfuse.decorators import observe

langfuse = Langfuse(
    public_key="pk-lf-...",
    secret_key="sk-lf-...",
    host="https://cloud.langfuse.com"
)

# Automatic tracing (`client` is the OpenAI-style client used in the earlier examples)
@observe()
def generate_response(prompt):
    return client.chat.completions.create(
        model="gpt-5.4",
        messages=[{"role": "user", "content": prompt}]
    )

# Manual tracing (v2-style low-level API; `prompt` is the user's input)
trace = langfuse.trace(name="user-chat", user_id="user_123")
span = trace.span(name="retrieval")
# ... run retrieval ...
span.end()
span = trace.span(name="generation")
response = generate_response(prompt)
span.generation(
    model="gpt-5.4",
    input=prompt,
    output=response.choices[0].message.content,
    usage={
        "input": response.usage.prompt_tokens,
        "output": response.usage.completion_tokens
    }
)
span.end()
trace.score(name="quality", value=0.9)
Prompt Versioning
Prompts change frequently. Track versions to understand what changed and when:
import hashlib
from datetime import datetime

class PromptRegistry:
    """Version-controlled prompt management."""

    def __init__(self):
        self.prompts = {}  # name -> list of versions

    def register(self, name, template, metadata=None):
        """Register a new prompt version; the version ID is a short hash of the template text."""
        version = hashlib.sha256(template.encode()).hexdigest()[:8]
        if name not in self.prompts:
            self.prompts[name] = []
        entry = {
            "version": version,
            "template": template,
            "created_at": datetime.now().isoformat(),
            "metadata": metadata or {}
        }
        self.prompts[name].append(entry)
        return version

    def get(self, name, version=None):
        """Get a prompt entry by name and optional version."""
        if name not in self.prompts:
            raise KeyError(f"Prompt '{name}' not found")
        versions = self.prompts[name]
        if version is None:
            return versions[-1]  # Latest
        for v in versions:
            if v["version"] == version:
                return v
        raise KeyError(f"Version '{version}' not found for prompt '{name}'")

    def list_versions(self, name):
        """List all versions of a prompt."""
        return [(v["version"], v["created_at"]) for v in self.prompts.get(name, [])]

# Usage
registry = PromptRegistry()
# Register initial version
v1 = registry.register("summarize",
    "Summarize the following text in 3 sentences: {text}")
# Register improved version
v2 = registry.register("summarize",
    "Summarize the following text in 3 sentences. Focus on key facts: {text}",
    metadata={"improvement": "Added focus instruction"})
# Use specific version (returns the full entry; the text is in prompt["template"])
prompt = registry.get("summarize", version=v1)
# Compare versions
print(registry.list_versions("summarize"))
# e.g. [("1a2b3c4d", "2026-05-15T10:00:00"), ("5e6f7a8b", "2026-05-15T11:00:00")]  (illustrative values)
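Knowing that a version changed is more useful when you can see what changed. A small sketch using the standard library's `difflib` to diff two prompt templates follows; the helper name `diff_prompts` is hypothetical, and the two templates are the ones registered above.

```python
import difflib

def diff_prompts(old: str, new: str) -> list:
    """Return a unified diff between two prompt templates as a list of lines."""
    return list(difflib.unified_diff(
        old.splitlines(), new.splitlines(),
        fromfile="old", tofile="new", lineterm=""
    ))

old = "Summarize the following text in 3 sentences: {text}"
new = "Summarize the following text in 3 sentences. Focus on key facts: {text}"
for line in diff_prompts(old, new):
    print(line)
```

For single-line prompts the diff is coarse (the whole line is marked as changed); multi-line templates benefit more, since only the edited lines appear.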
Drift Detection
Model outputs can drift over time — due to model updates, prompt changes, or data shifts:
import numpy as np

class DriftDetector:
    def __init__(self, window_size=100):
        self.window_size = window_size
        self.baseline_scores = []
        self.current_scores = []

    def add_baseline(self, score):
        """Add scores from known-good period."""
        self.baseline_scores.append(score)
        if len(self.baseline_scores) > self.window_size:
            self.baseline_scores.pop(0)

    def add_current(self, score):
        """Add current production scores."""
        self.current_scores.append(score)
        if len(self.current_scores) > self.window_size:
            self.current_scores.pop(0)

    def detect_drift(self, threshold=2.0):
        """Detect drift as the shift in mean score, measured in baseline standard deviations."""
        if len(self.baseline_scores) < 30 or len(self.current_scores) < 30:
            return {"drift_detected": False, "reason": "Insufficient data"}
        baseline_mean = np.mean(self.baseline_scores)
        baseline_std = np.std(self.baseline_scores)
        current_mean = np.mean(self.current_scores)
        if baseline_std == 0:
            return {"drift_detected": False, "reason": "No variance in baseline"}
        z_score = abs(current_mean - baseline_mean) / baseline_std
        return {
            "drift_detected": z_score > threshold,
            "z_score": z_score,
            "baseline_mean": baseline_mean,
            "current_mean": current_mean,
            "threshold": threshold
        }

# Usage
# `test_scores`, `production_scores`, and `alert` are placeholders for your own data and notifier
detector = DriftDetector()
# Collect baseline during testing
for score in test_scores:
    detector.add_baseline(score)
# Monitor production
for score in production_scores:
    detector.add_current(score)
    result = detector.detect_drift()
    if result["drift_detected"]:
        alert(f"Drift detected! Z-score: {result['z_score']:.2f}")
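The detector above divides the shift in means by the baseline standard deviation, so its sensitivity does not grow with the number of samples in the window. If you want a statistic that accounts for sample size, one alternative (a different technique from the one above, named plainly here) is a two-sample z statistic that divides by the standard error of both windows. A minimal sketch using only the standard library follows; `mean_shift_z` and the score lists are hypothetical.

```python
import math
import statistics

def mean_shift_z(baseline: list, current: list) -> float:
    """Z statistic for the difference in means, using the standard error of both samples."""
    se = math.sqrt(
        statistics.variance(baseline) / len(baseline)
        + statistics.variance(current) / len(current)
    )
    diff = statistics.mean(current) - statistics.mean(baseline)
    if se == 0:
        # No variance in either window: any difference at all is an unbounded shift
        return 0.0 if diff == 0 else math.copysign(math.inf, diff)
    return diff / se

baseline = [0.80, 0.82, 0.78, 0.81, 0.79] * 10  # illustrative known-good scores
current = [0.70, 0.72, 0.68, 0.71, 0.69] * 10   # illustrative production scores
z = mean_shift_z(baseline, current)
print(f"z = {z:.1f}")
```

A large negative value indicates that production scores have dropped relative to the baseline. With this statistic, small but consistent shifts become detectable as the windows grow, so you may want a higher threshold than the 2.0 used above.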
Alerting Patterns
class AIObservabilityAlerts:
    def __init__(self):
        self.rules = []

    def add_rule(self, name, condition, severity="warning"):
        self.rules.append({"name": name, "condition": condition, "severity": severity})

    def check(self, metrics):
        alerts = []
        for rule in self.rules:
            if rule["condition"](metrics):
                alerts.append({
                    "rule": rule["name"],
                    "severity": rule["severity"],
                    "metrics": metrics
                })
        return alerts

# Setup alerts
alerts = AIObservabilityAlerts()
alerts.add_rule(
    "high_latency",
    lambda m: m.get("p99_latency", 0) > 5000,  # milliseconds (5 seconds)
    severity="critical"
)
alerts.add_rule(
    "cost_spike",
    lambda m: m.get("hourly_cost", 0) > m.get("expected_hourly_cost", 10) * 2,
    severity="warning"
)
alerts.add_rule(
    "error_rate",
    lambda m: m.get("error_rate", 0) > 0.05,  # 5%
    severity="critical"
)
alerts.add_rule(
    "quality_degradation",
    lambda m: m.get("avg_quality_score", 1) < 0.7,
    severity="warning"
)

# Check metrics
metrics = {
    "p99_latency": 6200,
    "hourly_cost": 25.0,
    "expected_hourly_cost": 10.0,
    "error_rate": 0.02,
    "avg_quality_score": 0.85
}
for alert in alerts.check(metrics):
    print(f"[{alert['severity'].upper()}] {alert['rule']}")
# [CRITICAL] high_latency
# [WARNING] cost_spike
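Rules like these fire on every check while the condition holds, which quickly produces duplicate notifications. A simple mitigation is a per-rule cooldown. The sketch below is one possible approach; the class name `AlertCooldown` and the 15-minute window are assumptions, not part of the alerting class above.

```python
class AlertCooldown:
    """Suppress repeats of the same rule within a cooldown window (in seconds)."""

    def __init__(self, cooldown_s: float = 900.0):
        self.cooldown_s = cooldown_s
        self._last_fired = {}  # rule name -> timestamp of the last delivered alert

    def should_fire(self, rule_name: str, now: float) -> bool:
        last = self._last_fired.get(rule_name)
        if last is not None and now - last < self.cooldown_s:
            return False  # still inside the cooldown window
        self._last_fired[rule_name] = now
        return True

cooldown = AlertCooldown(cooldown_s=900)
# Simulated check times in seconds; in production pass time.time() instead
decisions = [cooldown.should_fire("high_latency", now=t) for t in (0, 60, 899, 900, 1000)]
print(decisions)  # [True, False, False, True, False]
```

Passing `now` explicitly keeps the class easy to test; a production wrapper would call it once per alert returned by `check()` before notifying anyone.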
Building Dashboards
Key visualizations for AI operations:
- Cost over time — Daily/hourly spend by model and user
- Latency distribution — Histogram with p50/p95/p99 markers
- Token usage — Input vs output tokens over time
- Quality trends — Average quality score over time
- Error breakdown — By error type (timeout, rate limit, bad output)
- Model comparison — Cost vs quality for different models
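Most of these charts reduce to bucketing logged requests by time and a dimension such as model or user. As a rough sketch, the function below aggregates entries shaped like the `usage_log` records from the cost tracker into hourly buckets per model; the function name and sample data are hypothetical.

```python
from collections import defaultdict
from datetime import datetime

def hourly_cost_by_model(usage_log: list) -> dict:
    """Sum cost_usd into (hour, model) buckets, ready to plot as a time series."""
    buckets = defaultdict(float)
    for entry in usage_log:
        hour = datetime.fromisoformat(entry["timestamp"]).strftime("%Y-%m-%dT%H:00")
        buckets[(hour, entry["model"])] += entry["cost_usd"]
    return dict(buckets)

# Illustrative log entries
log = [
    {"timestamp": "2026-05-15T10:05:00", "model": "gpt-5.4", "cost_usd": 0.25},
    {"timestamp": "2026-05-15T10:40:00", "model": "gpt-5.4", "cost_usd": 0.5},
    {"timestamp": "2026-05-15T11:10:00", "model": "gpt-5.4-mini", "cost_usd": 0.125},
]
series = hourly_cost_by_model(log)
for (hour, model), cost in sorted(series.items()):
    print(hour, model, cost)
```

The same shape works for token usage or error counts: swap the summed field and, if needed, the second key.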
Common Pitfalls
- Not logging prompts — You can't debug what you can't see. Log full prompts, not just summaries
- Ignoring token costs in development — Test with the cheapest model that gives acceptable quality
- No quality metrics — Tracking cost and latency without quality is optimizing the wrong thing
- Alert fatigue — Too many alerts leads to ignored alerts. Use graduated severity
- Not versioning prompts — When quality changes, you need to know which prompt version caused it
- Storing PII in traces — Sanitize traces before sending to third-party observability platforms
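For the last pitfall, a redaction pass before traces leave your infrastructure can be sketched as below. The regular expressions are deliberately naive assumptions: they will miss some PII and over-redact some harmless text (long digit runs such as date strings can match the phone pattern), so treat this as a starting point rather than a complete sanitizer.

```python
import re

# Rough patterns for illustration only; real PII detection needs broader coverage.
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
PHONE_RE = re.compile(r"\+?\d[\d\s().-]{7,}\d")

def redact(text: str) -> str:
    """Replace email addresses and phone-like digit runs with placeholders."""
    text = EMAIL_RE.sub("[EMAIL]", text)
    return PHONE_RE.sub("[PHONE]", text)

sample = "Contact jane.doe@example.com or +1 (555) 123-4567 today"
print(redact(sample))  # Contact [EMAIL] or [PHONE] today
```

Applying `redact` inside the tracing layer, for example where inputs and outputs are truncated, means every span is sanitized without each call site having to remember.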
Conclusion
AI observability is not optional in production. Start with cost and latency tracking — these are the easiest to implement and the first things that will surprise you. Add tracing when you need to debug complex multi-step flows. Implement quality metrics before you optimize for cost — there's no point in saving money if your outputs become unusable.
The minimum viable observability stack: log every LLM call with model, tokens, cost, and latency; track daily spend; set alerts for cost spikes and error rates; version your prompts. Everything else is optimization.
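The "log every LLM call" part of that minimum stack can be as simple as one JSON line per request. The sketch below assumes a file-like sink and a hypothetical helper name `log_llm_call`; the field names follow the ones used throughout this guide, and the sample values are illustrative.

```python
import io
import json
from datetime import datetime, timezone

def log_llm_call(sink, *, model, input_tokens, output_tokens,
                 cost_usd, latency_ms, prompt_version=None):
    """Append one LLM call as a JSON line to any file-like sink and return the record."""
    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "model": model,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "cost_usd": cost_usd,
        "latency_ms": latency_ms,
        "prompt_version": prompt_version,
    }
    sink.write(json.dumps(record) + "\n")
    return record

sink = io.StringIO()  # stand-in for a log file or stdout
log_llm_call(sink, model="gpt-5.4", input_tokens=1200, output_tokens=350,
             cost_usd=0.00825, latency_ms=840.0, prompt_version="1a2b3c4d")
print(sink.getvalue())
```

JSON lines are easy to ship to most log pipelines and to load later for daily spend totals, cost-spike alerts, and per-version comparisons.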