Tutorial May 11, 2026

AI Streaming Responses Implementation Guide 2026

Implement real-time streaming for AI APIs with SSE. OpenAI, Anthropic, and Google streaming patterns, error recovery, UX design, and production deployment.

Waiting 30 seconds for an AI response feels like an eternity. Streaming changes that: instead of waiting for the complete response, tokens appear one by one as the model generates them. This isn't just a nice UX improvement — it's become the default expectation for any production AI application. Every major chatbot, coding assistant, and AI tool you use streams responses in real time.

This guide covers everything you need to implement streaming responses across all major LLM providers, from the underlying protocols to production-ready patterns.

Why Streaming Matters

The difference in perceived performance is dramatic. Consider a typical response of 500 tokens:

| Metric | Non-Streaming | Streaming |
|---|---|---|
| Time to first token | 15-30 seconds | 0.3-1 second |
| Perceived responsiveness | Poor | Excellent |
| User abandonment risk | High | Low |
| Total time | Same | Same |

Streaming doesn't make the model faster — the total time is the same. But it transforms the user experience from "did it crash?" to "it's thinking and responding." This is the difference between an app that feels broken and one that feels intelligent.

Server-Sent Events (SSE) Protocol

All major LLM providers use Server-Sent Events (SSE) for streaming. SSE is a simple HTTP-based protocol where the server sends a stream of events to the client. Each event has a data field containing a JSON payload.

A raw SSE stream, shown here with OpenAI Chat Completions payloads, looks like this:

data: {"id":"chatcmpl-abc123","choices":[{"delta":{"content":"Hello"},"index":0}]}

data: {"id":"chatcmpl-abc123","choices":[{"delta":{"content":" world"},"index":0}]}

data: [DONE]

Key SSE characteristics:

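  • One-way communication: the server pushes to the client; the client cannot send data back on the same connection
  • Auto-reconnect: the browser's EventSource API reconnects automatically if the connection drops; fetch-based clients (needed for POST requests) must implement their own retry
  • Text-based: each event is one or more field lines (data:, and optionally event:, id:, retry:) terminated by a blank line
  • Termination: SSE itself has no end-of-stream marker; data: [DONE] is a convention used by OpenAI's Chat Completions API, and other APIs signal completion with their own final event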

SSE is simpler than WebSockets and purpose-built for server-push scenarios. Because it is plain HTTP, it passes through most proxies, load balancers, and firewalls, although intermediaries that buffer responses need tuning (see Production Considerations).
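To make the wire format concrete, here is a minimal sketch of a parser for the event framing described above. It assumes the whole stream is already available as one string with "\n" line endings and JSON payloads, and it treats [DONE] as an application-level sentinel, as in the sample. The name parse_sse is illustrative, not part of any SDK.

```python
import json

def parse_sse(raw: str):
    """Yield the decoded JSON payload of each SSE event in `raw`.

    Assumes "\n" line endings and JSON payloads; stops at the "[DONE]" sentinel.
    """
    for block in raw.split("\n\n"):           # a blank line terminates an event
        data_lines = []
        for line in block.split("\n"):
            if line.startswith(":"):          # comment / keep-alive line
                continue
            field, _, value = line.partition(":")
            if field == "data":
                data_lines.append(value.removeprefix(" "))
        if not data_lines:
            continue
        data = "\n".join(data_lines)          # several data: lines join with "\n"
        if data == "[DONE]":
            return
        yield json.loads(data)


sample = (
    'data: {"choices":[{"delta":{"content":"Hello"},"index":0}]}\n\n'
    'data: {"choices":[{"delta":{"content":" world"},"index":0}]}\n\n'
    "data: [DONE]\n\n"
)
text = "".join(e["choices"][0]["delta"]["content"] for e in parse_sse(sample))
print(text)  # Hello world
```

A real client receives the stream in arbitrary chunks, so it also needs the line buffering shown in the frontend section.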

OpenAI Streaming

Python SDK

from openai import OpenAI

client = OpenAI()

# Using the responses API with streaming
stream = client.responses.create(
    model="gpt-5",
    input="Explain quantum computing in simple terms",
    stream=True
)

for event in stream:
    if event.type == "response.output_text.delta":
        print(event.delta, end="", flush=True)
    elif event.type == "response.completed":
        print("\n[Done]")

Node.js SDK

import OpenAI from 'openai';

const client = new OpenAI();

const stream = await client.responses.create({
    model: 'gpt-5',
    input: 'Explain quantum computing in simple terms',
    stream: true
});

for await (const event of stream) {
    if (event.type === 'response.output_text.delta') {
        process.stdout.write(event.delta);
    }
}

Streaming with Tool Calls

Tool calls in streaming mode arrive in fragments: the function name arrives once, when the output item is added, and the JSON arguments follow as string deltas. Accumulate them before executing:

import json

# `tools` (the tool definitions) and `execute_tool` are assumed to be defined elsewhere
stream = client.responses.create(
    model="gpt-5",
    tools=tools,
    input="What's the weather in Tokyo?",
    stream=True
)

tool_calls = {}  # item id -> accumulated call

for event in stream:
    if event.type == "response.output_item.added" and event.item.type == "function_call":
        # The function name arrives once, when the call item is created
        tool_calls[event.item.id] = {"name": event.item.name, "arguments": ""}
    elif event.type == "response.function_call_arguments.delta":
        # Argument JSON arrives as string fragments keyed by item_id
        tool_calls[event.item_id]["arguments"] += event.delta
    elif event.type == "response.completed":
        # Now execute all completed tool calls
        for call in tool_calls.values():
            args = json.loads(call["arguments"])
            result = execute_tool(call["name"], args)
            print(f"Tool {call['name']} returned: {result}")
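The accumulate-then-parse step can be exercised without a network call. The sketch below replays hand-written fragments as plain dicts rather than real SDK objects; the keys "id", "name", and "delta" are illustrative assumptions. It shows that the argument string only becomes valid JSON once every fragment has arrived.

```python
import json

def accumulate_tool_calls(events):
    """Merge streamed argument fragments into complete tool calls.

    `events` is an iterable of dicts with illustrative keys: every fragment
    carries a call "id" and a "delta" string; the first also has a "name".
    """
    calls = {}
    for ev in events:
        call = calls.setdefault(ev["id"], {"name": None, "arguments": ""})
        if "name" in ev:
            call["name"] = ev["name"]
        call["arguments"] += ev.get("delta", "")
    # Parse only after the stream has finished: partial JSON is not valid JSON.
    return {
        cid: {"name": c["name"], "arguments": json.loads(c["arguments"])}
        for cid, c in calls.items()
    }


fragments = [
    {"id": "call_1", "name": "get_weather", "delta": '{"ci'},
    {"id": "call_1", "delta": 'ty": "To'},
    {"id": "call_1", "delta": 'kyo"}'},
]
result = accumulate_tool_calls(fragments)
print(result)  # {'call_1': {'name': 'get_weather', 'arguments': {'city': 'Tokyo'}}}
```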

Anthropic Streaming

Anthropic's streaming works similarly but uses a different event structure:

import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain RAG"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

Low-Level Event Handling

For fine-grained control, handle individual events:

with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    for event in stream:
        if event.type == "message_start":
            print(f"Message started: {event.message.id}")
        elif event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
        elif event.type == "message_delta":
            # Usage info and stop reason
            print(f"\nStop reason: {event.delta.stop_reason}")
            print(f"Tokens: {event.usage.output_tokens}")

Google Gemini Streaming

import google.generativeai as genai

model = genai.GenerativeModel('gemini-2.5-pro')

response = model.generate_content(
    "Explain machine learning",
    stream=True
)

for chunk in response:
    print(chunk.text, end="", flush=True)

Streaming API Comparison

| Feature | OpenAI | Anthropic | Google |
|---|---|---|---|
| Protocol | SSE | SSE | SSE |
| Granularity | Token-level | Token-level | Chunk-level |
| Streaming tool calls | Yes | Yes | Yes |
| Usage stats in stream | Yes | Yes | Yes |
| Thinking tokens stream | Yes (o-series) | Yes (extended thinking) | Yes |
| Cancel mid-stream | Yes (disconnect) | Yes (disconnect) | Yes (disconnect) |
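The providers share a transport but not an event schema, so a thin normalizing layer can keep the rest of a pipeline provider-agnostic. The sketch below uses plain dicts as stand-ins for SDK event objects, with field names taken from the examples earlier in this guide. extract_text is a hypothetical helper, not part of any SDK.

```python
def extract_text(provider: str, event: dict) -> str:
    """Return the text delta carried by `event`, or "" if it has none."""
    if provider == "openai":        # Responses API event shape
        if event.get("type") == "response.output_text.delta":
            return event.get("delta", "")
    elif provider == "anthropic":   # Messages API event shape
        if event.get("type") == "content_block_delta":
            delta = event.get("delta", {})
            if delta.get("type") == "text_delta":
                return delta.get("text", "")
    elif provider == "google":      # chunk exposing a `text` field
        return event.get("text", "")
    return ""


events = [
    ("openai", {"type": "response.output_text.delta", "delta": "Hel"}),
    ("anthropic", {"type": "content_block_delta",
                   "delta": {"type": "text_delta", "text": "lo"}}),
    ("anthropic", {"type": "message_start"}),   # no text: contributes ""
    ("google", {"text": "!"}),
]
combined = "".join(extract_text(p, e) for p, e in events)
print(combined)  # Hello!
```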

Frontend Implementation

Basic Browser SSE Client

async function streamChat(message) {
    const response = await fetch('/api/chat', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ message })
    });

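    // fetch() only rejects on network failure, so check the HTTP status explicitly
    if (!response.ok) {
        throw new Error(`Request failed with status ${response.status}`);
    }

    const reader = response.body.getReader();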
    const decoder = new TextDecoder();
    let buffer = '';

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop(); // Keep incomplete line

        for (const line of lines) {
            if (line.startsWith('data: ')) {
                const data = line.slice(6);
                if (data === '[DONE]') return;
                
                try {
                    const parsed = JSON.parse(data);
                    const content = parsed.choices?.[0]?.delta?.content || '';
                    appendToChat(content);
                } catch (e) {
                    // Skip malformed events
                }
            }
        }
    }
}
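The same chunk-boundary handling applies to non-browser clients. The Python sketch below mirrors the two techniques used above: an incremental decoder (the counterpart of TextDecoder with stream: true) and keeping the trailing incomplete line for the next chunk. LineBuffer is an illustrative name.

```python
import codecs

class LineBuffer:
    """Reassemble complete lines from arbitrarily split byte chunks."""

    def __init__(self):
        # The incremental decoder holds back a split multi-byte character
        # until the remaining bytes arrive.
        self._decoder = codecs.getincrementaldecoder("utf-8")()
        self._pending = ""

    def feed(self, chunk: bytes) -> list[str]:
        self._pending += self._decoder.decode(chunk)
        # Everything before the last "\n" is complete; the rest stays buffered.
        *lines, self._pending = self._pending.split("\n")
        return lines


buf = LineBuffer()
payload = 'data: {"text": "héllo"}\n'.encode("utf-8")
cut = payload.index("é".encode("utf-8")) + 1   # cut inside the two-byte "é"
first = buf.feed(payload[:cut])
second = buf.feed(payload[cut:])
print(first)   # []
print(second)  # ['data: {"text": "héllo"}']
```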

React Component with Streaming

import { useState, useRef } from 'react';

function ChatComponent() {
    const [messages, setMessages] = useState([]);
    const [isStreaming, setIsStreaming] = useState(false);
    const abortRef = useRef(null);

    const sendMessage = async (content) => {
        setMessages(prev => [...prev, { role: 'user', content }]);
        setMessages(prev => [...prev, { role: 'assistant', content: '' }]);
        setIsStreaming(true);

        abortRef.current = new AbortController();

        try {
            const response = await fetch('/api/chat', {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify({ messages: [...messages, { role: 'user', content }] }),
                signal: abortRef.current.signal
            });

            const reader = response.body.getReader();
            const decoder = new TextDecoder();
            let buffer = '';
            let assistantContent = '';

            while (true) {
                const { done, value } = await reader.read();
                if (done) break;

                buffer += decoder.decode(value, { stream: true });
                const lines = buffer.split('\n');
                buffer = lines.pop();

                for (const line of lines) {
                    if (line.startsWith('data: ') && line.slice(6) !== '[DONE]') {
                        const parsed = JSON.parse(line.slice(6));
                        const delta = parsed.choices?.[0]?.delta?.content || '';
                        assistantContent += delta;
                        
                        setMessages(prev => {
                            const updated = [...prev];
                            updated[updated.length - 1] = { 
                                role: 'assistant', 
                                content: assistantContent 
                            };
                            return updated;
                        });
                    }
                }
            }
        } catch (e) {
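            if (e.name === 'AbortError') {
                console.log('Stream cancelled by user');
            } else {
                // Surface network and parse errors instead of silently dropping them
                console.error('Stream failed:', e);
            }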
        } finally {
            setIsStreaming(false);
        }
    };

    const stopStreaming = () => {
        abortRef.current?.abort();
    };

    return (
        <div>
            {/* Render `messages` with your chat UI here */}
            {isStreaming && <button onClick={stopStreaming}>Stop</button>}
        </div>
    );
}

Backend Proxy Pattern

Never expose API keys to the frontend. Always proxy through your backend:

# FastAPI example
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
import json

app = FastAPI()
client = OpenAI()

@app.post("/api/chat")
async def chat(request: dict):
    messages = request.get("messages", [])
    
    def generate():
        stream = client.chat.completions.create(
            model="gpt-5",
            messages=messages,
            stream=True
        )
        for chunk in stream:
            # Some chunks (for example a final usage chunk) carry no choices
            if chunk.choices and chunk.choices[0].delta.content:
                data = json.dumps({
                    "choices": [{
                        "delta": {"content": chunk.choices[0].delta.content}
                    }]
                })
                yield f"data: {data}\n\n"
        yield "data: [DONE]\n\n"
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"  # Disable nginx buffering
        }
    )
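The f-string framing used in the proxy can be factored into a small helper. The sketch below is one possible shape for a format_sse function, assuming the payload is either a string or a JSON-serializable object; the name and signature are illustrative. It also handles the one framing rule that is easy to miss: a payload containing newlines must be split across several data: lines.

```python
import json

def format_sse(payload, event=None) -> str:
    """Serialize `payload` as a single SSE event string."""
    data = payload if isinstance(payload, str) else json.dumps(payload)
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # Each line of the payload gets its own data: field.
    lines.extend(f"data: {part}" for part in data.split("\n"))
    return "\n".join(lines) + "\n\n"        # blank line terminates the event


print(repr(format_sse({"delta": "Hi"})))    # 'data: {"delta": "Hi"}\n\n'
print(repr(format_sse("line 1\nline 2")))   # 'data: line 1\ndata: line 2\n\n'
```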

Error Recovery in Streaming

Streaming introduces error scenarios that non-streaming APIs don't have. Here's how to handle them:

1. Mid-Stream Disconnects

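The connection drops while streaming. Your client should detect this and retry with exponential backoff. Each retry resends the request from scratch, so clear any partial output before retrying: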

async function streamWithRetry(message, maxRetries = 2) {
    for (let attempt = 0; attempt <= maxRetries; attempt++) {
        try {
            return await streamChat(message);
        } catch (e) {
            if (attempt === maxRetries) throw e;
            
            // Exponential backoff
            const delay = Math.pow(2, attempt) * 1000;
            await new Promise(r => setTimeout(r, delay));
        }
    }
}
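The same retry loop can be written for a Python client. This sketch adds random jitter to the backoff, which is a change from the JavaScript version above, so that many clients do not retry in lockstep. retry_with_backoff and its parameters are illustrative names, and only ConnectionError is treated as retryable here.

```python
import asyncio
import random

async def retry_with_backoff(make_attempt, max_retries=2, base_delay=1.0,
                             sleep=asyncio.sleep):
    """Await `make_attempt()` until it succeeds or retries are exhausted."""
    for attempt in range(max_retries + 1):
        try:
            return await make_attempt()
        except ConnectionError:
            if attempt == max_retries:
                raise
            # Full jitter: wait a random time up to base_delay * 2^attempt
            await sleep(random.uniform(0, base_delay * 2 ** attempt))


async def demo():
    calls = {"n": 0}

    async def flaky():
        calls["n"] += 1
        if calls["n"] < 3:
            raise ConnectionError("stream dropped")
        return "ok"

    async def no_sleep(_delay):   # skip real waiting in the demo
        pass

    return await retry_with_backoff(flaky, sleep=no_sleep), calls["n"]

outcome = asyncio.run(demo())
print(outcome)  # ('ok', 3)
```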

2. Partial Response Handling

Always save the partial response so users don't lose content if the stream breaks:

// Auto-save partial responses
let assistantContent = '';
let lastSavedLength = 0;

function appendToChat(content) {
    assistantContent += content;
    
    // Save every 50 characters
    if (assistantContent.length - lastSavedLength > 50) {
        localStorage.setItem('draft_response', assistantContent);
        lastSavedLength = assistantContent.length;
    }
}

3. Timeout Detection

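Abort the request if no token arrives within a fixed window, resetting the timer on every token:

class StreamTimeout {
    constructor(ms) {
        this.ms = ms;
        this.timer = null;
        this.onTimeout = null;
    }

    start(onTimeout) {
        this.onTimeout = onTimeout;
        this.reset();
    }

    // Restart the countdown; call this on every received token
    reset() {
        clearTimeout(this.timer);
        this.timer = setTimeout(this.onTimeout, this.ms);
    }

    clear() {
        clearTimeout(this.timer);
    }
}

// Usage: timeout if no token received for 30 seconds
const abortController = new AbortController(); // pass abortController.signal to fetch()
const timeout = new StreamTimeout(30000);
timeout.start(() => {
    console.error('Stream timeout - no data received for 30s');
    abortController.abort();
});

// In the read loop: call timeout.reset() on each received token,
// and timeout.clear() once the stream completes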
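On a Python backend, the same idle timeout can be applied per item rather than with a timer object. The sketch below wraps any async iterator and raises a timeout error if the gap between two items exceeds the limit. with_idle_timeout is an illustrative name, and the stalled stream is simulated.

```python
import asyncio

async def with_idle_timeout(stream, seconds):
    """Yield items from `stream`; raise a timeout after `seconds` of silence."""
    iterator = stream.__aiter__()
    while True:
        try:
            # The limit applies to each item separately, not to the whole stream.
            item = await asyncio.wait_for(iterator.__anext__(), timeout=seconds)
        except StopAsyncIteration:
            return
        yield item


async def slow_stream():
    yield "Hello"
    await asyncio.sleep(0.5)   # simulated stall
    yield " world"

async def demo():
    received = []
    try:
        async for token in with_idle_timeout(slow_stream(), seconds=0.05):
            received.append(token)
    except asyncio.TimeoutError:
        received.append("<timeout>")
    return received

received = asyncio.run(demo())
print(received)  # ['Hello', '<timeout>']
```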

UX Patterns for Streaming

Typing Indicator

Show a typing indicator before the first token arrives:

<div class="typing-indicator">
    <span></span><span></span><span></span>
</div>

<style>
.typing-indicator span {
    display: inline-block;
    width: 8px;
    height: 8px;
    border-radius: 50%;
    background: #94a3b8;
    animation: bounce 1.4s infinite;
}
.typing-indicator span:nth-child(2) { animation-delay: 0.2s; }
.typing-indicator span:nth-child(3) { animation-delay: 0.4s; }
@keyframes bounce {
    0%, 60%, 100% { transform: translateY(0); }
    30% { transform: translateY(-10px); }
}
</style>

Markdown Rendering During Streaming

One challenge: rendering markdown as it streams. Code blocks arrive character by character, so ``` appears before the block is complete:

import { marked } from 'marked';

function StreamingRenderer() {
    let fullContent = '';
    
    function append(delta) {
        fullContent += delta;
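        // Re-render the full content each time. An unclosed code fence is
        // rendered as an open code block until its closing fence arrives.
        // marked does not sanitize its output, so sanitize the HTML
        // (for example with DOMPurify) before assigning it to innerHTML.
        const html = marked.parse(fullContent);
        document.getElementById('output').innerHTML = html;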
    }
    
    return { append };
}

For better performance with long responses, debounce the markdown rendering:

let fullContent = '';
let renderTimeout = null;
function append(delta) {
    fullContent += delta;
    
    // Show raw text immediately
    document.getElementById('raw-output').textContent += delta;
    
    // Debounce markdown rendering
    clearTimeout(renderTimeout);
    renderTimeout = setTimeout(() => {
        document.getElementById('formatted-output').innerHTML = marked.parse(fullContent);
    }, 100);
}
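When markdown is rendered server-side, or when the renderer in use does not tolerate an unterminated block, one option is to close an open code fence temporarily before each render. The sketch below is a simplified illustration: it only recognizes backtick fences and ignores tilde fences and fences nested inside longer ones. close_open_fence is a hypothetical helper.

```python
FENCE = "`" * 3   # built this way to avoid a literal fence inside this example

def close_open_fence(markdown: str) -> str:
    """Append a closing fence if `markdown` ends inside a fenced code block."""
    inside_block = False
    for line in markdown.split("\n"):
        if line.lstrip().startswith(FENCE):
            inside_block = not inside_block   # each fence line toggles the state
    if inside_block:
        return markdown.rstrip("\n") + "\n" + FENCE
    return markdown


partial = "Here is code:\n" + FENCE + "python\nprint('hi')"
patched = close_open_fence(partial)
print(patched.endswith("\n" + FENCE))  # True
```

The patched string is used only for rendering; the accumulated raw content stays untouched so later deltas append cleanly.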

Copy Button for Completed Responses

Show a copy button only after streaming completes:

function onStreamComplete() {
    const copyBtn = document.createElement('button');
    copyBtn.textContent = 'Copy';
    copyBtn.onclick = () => {
        navigator.clipboard.writeText(fullContent);
        copyBtn.textContent = 'Copied!';
        setTimeout(() => copyBtn.textContent = 'Copy', 2000);
    };
    document.querySelector('.message:last-child').appendChild(copyBtn);
}

Production Considerations

Proxy and Load Balancer Configuration

SSE connections are long-lived. Your infrastructure must support this:

  • Nginx: Set proxy_buffering off and proxy_read_timeout 300s
  • Cloudflare: Confirm that response buffering is disabled for the streaming route; X-Accel-Buffering: no is an nginx header and other proxies may ignore it
  • AWS ALB: Increase idle timeout (default 60s may not be enough)
  • Kubernetes: Set appropriate proxy-read-timeout in ingress annotations

Connection Management

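# FastAPI with connection lifecycle
import json
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()  # async client, so streaming does not block the event loop

def format_sse(chunk):
    # Forward only the text delta, in the same shape the frontend parses
    content = chunk.choices[0].delta.content if chunk.choices else None
    payload = {"choices": [{"delta": {"content": content or ""}}]}
    return f"data: {json.dumps(payload)}\n\n"

@app.post("/api/chat")
async def chat(request: Request):
    body = await request.json()
    messages = body.get("messages", [])

    async def generate():
        try:
            stream = await client.chat.completions.create(
                model="gpt-5",
                messages=messages,
                stream=True
            )
            async for chunk in stream:
                # Check if client disconnected
                if await request.is_disconnected():
                    print("Client disconnected, stopping stream")
                    break
                yield format_sse(chunk)
        finally:
            print("Stream ended")
            # Cleanup resources

    return StreamingResponse(generate(), media_type="text/event-stream")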

Token Counting and Cost Tracking

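With the Responses API, usage information arrives on the final response.completed event. With Chat Completions, request it by passing stream_options={"include_usage": True}. Always capture this for cost tracking: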

total_tokens = 0

for event in stream:
    if event.type == "response.completed":
        total_tokens = event.response.usage.total_tokens
        input_tokens = event.response.usage.input_tokens
        output_tokens = event.response.usage.output_tokens
        
        # Log to your cost tracking system
        # (log_usage and calculate_cost are your own helpers)
        log_usage(
            model="gpt-5",
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost=calculate_cost("gpt-5", input_tokens, output_tokens)
        )
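A calculate_cost helper along these lines could look like the sketch below. It takes the price table as an explicit argument, which differs from the three-argument call above, and the prices shown are placeholders for illustration only, not any provider's real rates. Decimal is used so that small per-request amounts do not pick up floating-point error when summed.

```python
from decimal import Decimal

def calculate_cost(model, input_tokens, output_tokens, prices):
    """Return the cost of one request.

    `prices` maps a model name to (input, output) prices per million tokens.
    """
    input_price, output_price = prices[model]
    total = input_tokens * input_price + output_tokens * output_price
    return total / Decimal(1_000_000)


# Placeholder model name and prices, for illustration only.
PRICES = {"example-model": (Decimal("1.00"), Decimal("4.00"))}

cost = calculate_cost("example-model", input_tokens=1_200, output_tokens=500,
                      prices=PRICES)
print(cost)
```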

Concurrent Stream Limits

Each streaming connection holds resources on your server. Implement limits:

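from asyncio import Semaphore
from fastapi import HTTPException, Request
from fastapi.responses import StreamingResponse

# Limit concurrent streams per user
stream_semaphores = {}

async def chat(request: Request):
    user_id = get_user_id(request)  # your own auth helper
    if user_id not in stream_semaphores:
        stream_semaphores[user_id] = Semaphore(3)  # Max 3 concurrent streams
    semaphore = stream_semaphores[user_id]

    if semaphore.locked():
        # Reject with 429 rather than returning an error body with status 200
        raise HTTPException(status_code=429, detail="Too many concurrent streams")

    async def generate():
        # Hold the slot inside the generator so it is released when the
        # stream ends, not when the handler returns
        async with semaphore:
            async for chunk in stream_from_api(request):  # placeholder: your streaming call
                yield chunk

    return StreamingResponse(generate(), media_type="text/event-stream")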
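The key detail in limiting streams is that the slot must stay occupied for the lifetime of the stream, not just for the duration of the request handler. The framework-free sketch below shows that with a simple counter. StreamLimiter and its methods are illustrative names.

```python
import asyncio
from collections import defaultdict

class StreamLimiter:
    """Cap the number of simultaneously open streams per user."""

    def __init__(self, max_streams=3):
        self.max_streams = max_streams
        self.active = defaultdict(int)

    def try_acquire(self, user_id) -> bool:
        if self.active[user_id] >= self.max_streams:
            return False
        self.active[user_id] += 1
        return True

    def release(self, user_id):
        self.active[user_id] -= 1
        if self.active[user_id] == 0:
            del self.active[user_id]   # keep the dict from growing without bound

    async def guard(self, user_id, stream):
        """Yield from `stream`, releasing the slot when it ends or is closed."""
        try:
            async for chunk in stream:
                yield chunk
        finally:
            self.release(user_id)


async def demo():
    limiter = StreamLimiter(max_streams=1)

    async def tokens():
        yield "a"
        yield "b"

    first = limiter.try_acquire("u1")
    second = limiter.try_acquire("u1")           # over the limit while open
    out = [c async for c in limiter.guard("u1", tokens())]
    third = limiter.try_acquire("u1")            # slot is free again
    return first, second, out, third

limits = asyncio.run(demo())
print(limits)  # (True, False, ['a', 'b'], True)
```

One caveat: the slot is released only once the guarded generator has started running, so a production version would also need to release it if the response is never sent.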

Advanced Patterns

Multi-Model Streaming with Fallback

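Stream from a primary model and fall back to a secondary if it fails. If the primary fails mid-stream, the fallback restarts the answer from the beginning, so the fallback event tells the client to discard any partial output: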

async def stream_with_fallback(messages, primary="gpt-5", fallback="claude-sonnet-4-20250514"):
    try:
        async for chunk in stream_openai(primary, messages):
            yield chunk
    except Exception as e:
        print(f"Primary model failed: {e}, falling back")
        yield f"data: {json.dumps({'fallback': True})}\n\n"
        async for chunk in stream_anthropic(fallback, messages):
            yield chunk
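The fallback behaviour can be checked with fake streams before wiring in real providers. In the sketch below, the two stream factories are passed in as arguments and simulated locally; this is a variation on the function above rather than the same signature.

```python
import asyncio
import json

async def stream_with_fallback(primary, fallback):
    """Yield SSE strings from `primary()`; on error, signal it and use `fallback()`."""
    try:
        async for chunk in primary():
            yield chunk
    except Exception:
        # Tell the client to discard partial output before the answer restarts.
        yield f"data: {json.dumps({'fallback': True})}\n\n"
        async for chunk in fallback():
            yield chunk


async def failing_primary():
    yield "data: partial\n\n"
    raise ConnectionError("primary dropped mid-stream")

async def healthy_fallback():
    yield "data: full answer\n\n"

async def demo():
    return [c async for c in stream_with_fallback(failing_primary, healthy_fallback)]

chunks = asyncio.run(demo())
print(chunks)
```

The client sees the partial chunk, then the fallback marker, then the complete answer from the second stream.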

Streaming with Caching

Cache complete responses for identical queries, but still stream them to the client:

import asyncio
import hashlib
import json

import redis

r = redis.Redis()

async def cached_stream(messages, model):
    cache_key = hashlib.sha256(
        f"{model}:{json.dumps(messages)}".encode()
    ).hexdigest()
    
    # Check cache first
    cached = r.get(f"stream:{cache_key}")
    if cached:
        # Simulate streaming from cache
        full_response = json.loads(cached)
        for i in range(0, len(full_response), 10):
            yield format_sse(full_response[i:i+10])
            await asyncio.sleep(0.01)  # Realistic feel
        yield "data: [DONE]\n\n"
        return
    
    # Stream from API and cache
    full_content = ""
    async for chunk in stream_from_api(model, messages):
        full_content += extract_content(chunk)
        yield chunk
    
    r.setex(f"stream:{cache_key}", 3600, json.dumps(full_content))
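One detail worth tightening is the cache key. json.dumps preserves dictionary insertion order, so two logically identical message lists can hash differently. A sketch of a canonical key, using a hypothetical cache_key helper:

```python
import hashlib
import json

def cache_key(model: str, messages: list) -> str:
    """Build a deterministic cache key for a (model, messages) pair."""
    canonical = json.dumps(
        {"model": model, "messages": messages},
        sort_keys=True,                 # key order no longer affects the hash
        separators=(",", ":"),          # no whitespace variation
    )
    return "stream:" + hashlib.sha256(canonical.encode("utf-8")).hexdigest()


a = cache_key("m", [{"role": "user", "content": "hi"}])
b = cache_key("m", [{"content": "hi", "role": "user"}])   # same message, other key order
print(a == b)  # True
```

Anything else that changes the output, such as temperature or the system prompt, would need to be part of the key as well.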

Common Pitfalls

| Pitfall | Solution |
|---|---|
| Proxy buffering breaking streaming | Set X-Accel-Buffering: no and disable proxy buffering |
| Not handling partial JSON in SSE events | Buffer incomplete lines and parse on newline |
| Memory leak from abandoned connections | Check is_disconnected() and set timeouts |
| Markdown flickering during streaming | Debounce rendering or use incremental parsers |
| Exposing API keys in frontend | Always proxy through backend |
| Not tracking token usage from streams | Capture usage from the final stream event |
| CORS issues with SSE | Configure CORS headers on your proxy |

Conclusion

Streaming is no longer optional for AI applications; it is the baseline expectation. The good news is that all major LLM providers stream over SSE, so your core transport infrastructure works across them, even though each provider's event schema needs its own thin parsing layer. The real engineering work is in the details: proper error recovery, UX polish, proxy configuration, and cost tracking.

Start with the basic patterns in this guide, then layer on the advanced patterns as your application scales. The most important thing is to never expose API keys to the frontend and always handle mid-stream failures gracefully. Your users will thank you for it.