Guide May 15, 2026

AI Observability & Monitoring Guide 2026

Production LLM debugging and analytics. Tracing, cost tracking, latency monitoring, prompt versioning, and debugging patterns for AI applications.

Deploying an LLM application to production is just the beginning. Without observability, you're flying blind — you won't know when costs spike, latency degrades, or outputs drift from expected quality. Traditional application monitoring (CPU, memory, error rates) isn't enough for AI systems. You need to trace individual LLM calls, track token usage, monitor prompt effectiveness, and detect model drift. This guide covers the complete observability stack for production AI applications in 2026.

Why AI Observability is Different

Traditional monitoring tracks infrastructure health. AI observability tracks model behavior:

Traditional MonitoringAI Observability
CPU / memory usageToken usage per request
Request latencyTTFT + TPOT breakdown
Error rate (HTTP 5xx)Output quality scores
Log aggregationPrompt / response tracing
Static thresholdsDrift detection
Binary pass/failGraduated quality metrics

Key Metrics to Track

1. Cost Metrics

# Track per-request and aggregate costs
class CostTracker:
    def __init__(self):
        self.pricing = {
            "gpt-5.5": {"input": 5.0, "output": 30.0},      # per 1M tokens
            "gpt-5.4": {"input": 2.50, "output": 15.0},
            "gpt-5.4-mini": {"input": 0.75, "output": 4.50},
        }
        self.usage_log = []
    
    def log_request(self, model, input_tokens, output_tokens, user_id=None):
        """Log a request and calculate cost."""
        if model not in self.pricing:
            return
        
        cost = (
            input_tokens * self.pricing[model]["input"] +
            output_tokens * self.pricing[model]["output"]
        ) / 1_000_000
        
        entry = {
            "timestamp": datetime.now().isoformat(),
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost_usd": cost,
            "user_id": user_id
        }
        self.usage_log.append(entry)
        return cost
    
    def get_daily_cost(self, date=None):
        """Get total cost for a specific date."""
        date = date or datetime.now().date()
        return sum(
            e["cost_usd"] for e in self.usage_log
            if datetime.fromisoformat(e["timestamp"]).date() == date
        )
    
    def get_cost_by_model(self):
        """Break down costs by model."""
        from collections import defaultdict
        by_model = defaultdict(float)
        for entry in self.usage_log:
            by_model[entry["model"]] += entry["cost_usd"]
        return dict(by_model)

2. Latency Metrics

import time

class LatencyTracker:
    def __init__(self):
        self.measurements = []
    
    def measure(self, func, *args, **kwargs):
        """Measure execution time of an LLM call."""
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        
        # Extract timing from response if available
        ttft = getattr(result, 'ttft', None)  # Time to first token
        total_time = end - start
        
        self.measurements.append({
            "timestamp": datetime.now().isoformat(),
            "total_seconds": total_time,
            "ttft_seconds": ttft,
            "model": kwargs.get('model', 'unknown')
        })
        
        return result
    
    def get_percentiles(self):
        """Get latency percentiles."""
        times = [m["total_seconds"] for m in self.measurements]
        if not times:
            return {}
        
        times.sort()
        return {
            "p50": times[int(len(times) * 0.5)],
            "p95": times[int(len(times) * 0.95)],
            "p99": times[int(len(times) * 0.99)],
        }

3. Quality Metrics

class QualityTracker:
    def __init__(self, client):
        self.client = client
        self.scores = []
    
    def evaluate_response(self, prompt, response, expected=None):
        """Evaluate response quality using multiple methods."""
        metrics = {}
        
        # 1. Length check (responses too short/long may indicate problems)
        metrics["length"] = len(response)
        metrics["length_ok"] = 50 < len(response) < 4000
        
        # 2. Format compliance (if structured output expected)
        if expected and isinstance(expected, dict):
            try:
                parsed = json.loads(response)
                metrics["format_match"] = all(k in parsed for k in expected.keys())
            except json.JSONDecodeError:
                metrics["format_match"] = False
        
        # 3. Relevance score (using embeddings)
        prompt_emb = self._embed(prompt)
        response_emb = self._embed(response)
        metrics["relevance"] = self._cosine_similarity(prompt_emb, response_emb)
        
        # 4. Self-evaluation (ask model to score itself)
        if len(response) > 100:
            eval_prompt = f"Rate this response quality 1-10: {response[:500]}"
            eval_response = self.client.chat.completions.create(
                model="gpt-5.4-mini",
                messages=[{"role": "user", "content": eval_prompt}],
                max_tokens=10
            )
            try:
                metrics["self_score"] = int(eval_response.choices[0].message.content.strip())
            except ValueError:
                metrics["self_score"] = None
        
        self.scores.append(metrics)
        return metrics
    
    def _embed(self, text):
        response = self.client.embeddings.create(
            model="text-embedding-3-small", input=text[:8000]
        )
        return response.data[0].embedding

LLM Tracing and Debugging

When something goes wrong, you need to see the full chain of LLM calls, tool executions, and context:

Manual Tracing Implementation

import uuid
from contextvars import ContextVar

current_trace = ContextVar('current_trace', default=None)

class LLMTracer:
    def __init__(self):
        self.traces = {}
    
    def start_trace(self, name, metadata=None):
        """Start a new trace."""
        trace_id = str(uuid.uuid4())
        trace = {
            "id": trace_id,
            "name": name,
            "start_time": datetime.now().isoformat(),
            "spans": [],
            "metadata": metadata or {}
        }
        self.traces[trace_id] = trace
        current_trace.set(trace_id)
        return trace_id
    
    def add_span(self, name, span_type="llm", inputs=None, outputs=None, 
                 latency_ms=None, tokens=None, error=None):
        """Add a span to the current trace."""
        trace_id = current_trace.get()
        if not trace_id or trace_id not in self.traces:
            return
        
        span = {
            "id": str(uuid.uuid4()),
            "name": name,
            "type": span_type,
            "start_time": datetime.now().isoformat(),
            "inputs": self._truncate(inputs),
            "outputs": self._truncate(outputs),
            "latency_ms": latency_ms,
            "tokens": tokens,
            "error": error
        }
        self.traces[trace_id]["spans"].append(span)
        return span["id"]
    
    def end_trace(self, trace_id, status="success"):
        """End a trace."""
        if trace_id in self.traces:
            self.traces[trace_id]["end_time"] = datetime.now().isoformat()
            self.traces[trace_id]["status"] = status
    
    def _truncate(self, obj, max_length=1000):
        """Truncate for storage."""
        text = str(obj)
        return text[:max_length] + "..." if len(text) > max_length else text

# Usage
tracer = LLMTracer()

def generate_with_tracing(prompt, model="gpt-5.4"):
    trace_id = tracer.start_trace("chat_generation", {"user_id": "123"})
    
    start = time.time()
    try:
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        latency = (time.time() - start) * 1000
        
        tracer.add_span(
            name="llm_call",
            span_type="llm",
            inputs={"prompt": prompt, "model": model},
            outputs={"response": response.choices[0].message.content},
            latency_ms=latency,
            tokens={
                "input": response.usage.prompt_tokens,
                "output": response.usage.completion_tokens
            }
        )
        
        tracer.end_trace(trace_id, "success")
        return response
        
    except Exception as e:
        tracer.add_span(name="llm_call", span_type="llm", error=str(e))
        tracer.end_trace(trace_id, "error")
        raise

Observability Platforms

Several platforms specialize in AI observability:

PlatformBest ForPricingOpen Source
LangSmithLangChain appsPay per traceNo
LangfuseGeneral LLM tracingSelf-host or cloudYes
Weights & BiasesExperiment trackingPay per userNo
Phoenix (Arize)Evaluation + tracingFree tierYes
OpenTelemetryVendor-neutral tracingFreeYes

Langfuse Integration

from langfuse import Langfuse

langfuse = Langfuse(
    public_key="pk-lf-...",
    secret_key="sk-lf-...",
    host="https://cloud.langfuse.com"
)

# Automatic tracing
@langfuse.observe()
def generate_response(prompt):
    return client.chat.completions.create(
        model="gpt-5.4",
        messages=[{"role": "user", "content": prompt}]
    )

# Manual tracing
trace = langfuse.trace(name="user-chat", user_id="user_123")

span = trace.span(name="retrieval")
# ... run retrieval ...
span.end()

span = trace.span(name="generation")
response = generate_response(prompt)
span.generation(
    model="gpt-5.4",
    input=prompt,
    output=response.choices[0].message.content,
    usage={
        "input": response.usage.prompt_tokens,
        "output": response.usage.completion_tokens
    }
)
span.end()

trace.score(name="quality", value=0.9)

Prompt Versioning

Prompts change frequently. Track versions to understand what changed and when:

class PromptRegistry:
    """Version-controlled prompt management."""
    
    def __init__(self):
        self.prompts = {}  # name -> list of versions
    
    def register(self, name, template, metadata=None):
        """Register a new prompt version."""
        import hashlib
        version = hashlib.sha256(template.encode()).hexdigest()[:8]
        
        if name not in self.prompts:
            self.prompts[name] = []
        
        entry = {
            "version": version,
            "template": template,
            "created_at": datetime.now().isoformat(),
            "metadata": metadata or {}
        }
        self.prompts[name].append(entry)
        return version
    
    def get(self, name, version=None):
        """Get a prompt by name and optional version."""
        if name not in self.prompts:
            raise KeyError(f"Prompt '{name}' not found")
        
        versions = self.prompts[name]
        if version is None:
            return versions[-1]  # Latest
        
        for v in versions:
            if v["version"] == version:
                return v
        raise KeyError(f"Version '{version}' not found for prompt '{name}'")
    
    def list_versions(self, name):
        """List all versions of a prompt."""
        return [(v["version"], v["created_at"]) for v in self.prompts.get(name, [])]

# Usage
registry = PromptRegistry()

# Register initial version
v1 = registry.register("summarize", 
    "Summarize the following text in 3 sentences: {text}")

# Register improved version
v2 = registry.register("summarize",
    "Summarize the following text in 3 sentences. Focus on key facts: {text}",
    metadata={"improvement": "Added focus instruction"})

# Use specific version
prompt = registry.get("summarize", version=v1)

# Compare versions
print(registry.list_versions("summarize"))
# [("abc123de", "2026-05-15T10:00:00"), ("fgh456ij", "2026-05-15T11:00:00")]

Drift Detection

Model outputs can drift over time — due to model updates, prompt changes, or data shifts:

class DriftDetector:
    def __init__(self, window_size=100):
        self.window_size = window_size
        self.baseline_scores = []
        self.current_scores = []
    
    def add_baseline(self, score):
        """Add scores from known-good period."""
        self.baseline_scores.append(score)
        if len(self.baseline_scores) > self.window_size:
            self.baseline_scores.pop(0)
    
    def add_current(self, score):
        """Add current production scores."""
        self.current_scores.append(score)
        if len(self.current_scores) > self.window_size:
            self.current_scores.pop(0)
    
    def detect_drift(self, threshold=2.0):
        """Detect drift using z-score."""
        if len(self.baseline_scores) < 30 or len(self.current_scores) < 30:
            return {"drift_detected": False, "reason": "Insufficient data"}
        
        import numpy as np
        baseline_mean = np.mean(self.baseline_scores)
        baseline_std = np.std(self.baseline_scores)
        current_mean = np.mean(self.current_scores)
        
        if baseline_std == 0:
            return {"drift_detected": False, "reason": "No variance in baseline"}
        
        z_score = abs(current_mean - baseline_mean) / baseline_std
        
        return {
            "drift_detected": z_score > threshold,
            "z_score": z_score,
            "baseline_mean": baseline_mean,
            "current_mean": current_mean,
            "threshold": threshold
        }

# Usage
detector = DriftDetector()

# Collect baseline during testing
for score in test_scores:
    detector.add_baseline(score)

# Monitor production
for score in production_scores:
    detector.add_current(score)
    result = detector.detect_drift()
    if result["drift_detected"]:
        alert(f"Drift detected! Z-score: {result['z_score']:.2f}")

Alerting Patterns

class AIObservabilityAlerts:
    def __init__(self):
        self.rules = []
    
    def add_rule(self, name, condition, severity="warning"):
        self.rules.append({"name": name, "condition": condition, "severity": severity})
    
    def check(self, metrics):
        alerts = []
        for rule in self.rules:
            if rule["condition"](metrics):
                alerts.append({
                    "rule": rule["name"],
                    "severity": rule["severity"],
                    "metrics": metrics
                })
        return alerts

# Setup alerts
alerts = AIObservabilityAlerts()

alerts.add_rule(
    "high_latency",
    lambda m: m.get("p99_latency", 0) > 5000,  # 5 seconds
    severity="critical"
)

alerts.add_rule(
    "cost_spike",
    lambda m: m.get("hourly_cost", 0) > m.get("expected_hourly_cost", 10) * 2,
    severity="warning"
)

alerts.add_rule(
    "error_rate",
    lambda m: m.get("error_rate", 0) > 0.05,  # 5%
    severity="critical"
)

alerts.add_rule(
    "quality_degradation",
    lambda m: m.get("avg_quality_score", 1) < 0.7,
    severity="warning"
)

# Check metrics
metrics = {
    "p99_latency": 6200,
    "hourly_cost": 25.0,
    "expected_hourly_cost": 10.0,
    "error_rate": 0.02,
    "avg_quality_score": 0.85
}

for alert in alerts.check(metrics):
    print(f"[{alert['severity'].upper()}] {alert['rule']}")
# [CRITICAL] high_latency
# [WARNING] cost_spike

Building Dashboards

Key visualizations for AI operations:

  • Cost over time — Daily/hourly spend by model and user
  • Latency distribution — Histogram with p50/p95/p99 markers
  • Token usage — Input vs output tokens over time
  • Quality trends — Average quality score over time
  • Error breakdown — By error type (timeout, rate limit, bad output)
  • Model comparison — Cost vs quality for different models

Common Pitfalls

  1. Not logging prompts — You can't debug what you can't see. Log full prompts, not just summaries
  2. Ignoring token costs in development — Test with the cheapest model that gives acceptable quality
  3. No quality metrics — Tracking cost and latency without quality is optimizing the wrong thing
  4. Alert fatigue — Too many alerts leads to ignored alerts. Use graduated severity
  5. Not versioning prompts — When quality changes, you need to know which prompt version caused it
  6. Storing PII in traces — Sanitize traces before sending to third-party observability platforms

Conclusion

AI observability is not optional in production. Start with cost and latency tracking — these are the easiest to implement and the first things that will surprise you. Add tracing when you need to debug complex multi-step flows. Implement quality metrics before you optimize for cost — there's no point in saving money if your outputs become unusable.

The minimum viable observability stack: log every LLM call with model, tokens, cost, and latency; track daily spend; set alerts for cost spikes and error rates; version your prompts. Everything else is optimization.