AI Streaming Responses Implementation Guide 2026
Implement real-time streaming for AI APIs with SSE. OpenAI, Anthropic, and Google streaming patterns, error recovery, UX design, and production deployment.
Waiting 30 seconds for an AI response feels like an eternity. Streaming changes that: instead of waiting for the complete response, tokens appear one by one as the model generates them. This isn't just a nice UX improvement — it's become the default expectation for any production AI application. Every major chatbot, coding assistant, and AI tool you use streams responses in real time.
This guide covers everything you need to implement streaming responses across all major LLM providers, from the underlying protocols to production-ready patterns.
Why Streaming Matters
The difference in perceived latency is dramatic. Consider a typical response of 500 tokens (the figures below are illustrative):
| Metric | Non-Streaming | Streaming |
|---|---|---|
| Time to first token | 15-30 seconds | 0.3-1 second |
| Perceived responsiveness | Poor | Excellent |
| User abandonment risk | High | Low |
| Total time | Same | Same |
Streaming doesn't make the model faster — the total time is the same. But it transforms the user experience from "did it crash?" to "it's thinking and responding." This is the difference between an app that feels broken and one that feels intelligent.
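The table's point can be made concrete with a little arithmetic. The sketch below assumes a hypothetical time to first token of 0.5 s and a generation rate of 25 tokens per second; neither figure comes from a provider benchmark, and `response_timings` is an illustrative name.

```python
def response_timings(num_tokens: int, ttft_s: float, tokens_per_s: float) -> dict:
    """Return when the user first sees output and when the response is complete.

    ttft_s and tokens_per_s are illustrative inputs, not measured values.
    """
    total_s = ttft_s + num_tokens / tokens_per_s
    return {
        "streaming_first_output_s": ttft_s,       # first token is shown as soon as it arrives
        "non_streaming_first_output_s": total_s,  # nothing is shown until the last token
        "total_s": total_s,                       # identical in both modes
    }


timings = response_timings(num_tokens=500, ttft_s=0.5, tokens_per_s=25.0)
print(timings)
```

Under these assumptions the total time is identical in both modes; only the moment of first visible output moves.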
Server-Sent Events (SSE) Protocol
All major LLM providers use Server-Sent Events (SSE) for streaming. SSE is a simple HTTP-based protocol where the server sends a stream of events to the client. Each event has a data field containing a JSON payload.
The raw SSE format looks like this:
data: {"id":"chatcmpl-abc123","choices":[{"delta":{"content":"Hello"},"index":0}]}
data: {"id":"chatcmpl-abc123","choices":[{"delta":{"content":" world"},"index":0}]}
data: [DONE]
Key SSE characteristics:
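- One-way communication: the server pushes to the client, and the client cannot send data back on the same connection
- Auto-reconnect: the browser's EventSource API reconnects automatically if the connection drops; clients built on fetch must implement their own retry
- Text-based: each event consists of field lines such as data: and is terminated by a blank line
- Termination: OpenAI's Chat Completions stream ends with data: [DONE]; other providers signal completion with their own final event (Anthropic sends message_stop)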
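As a rough illustration of the wire format, the sketch below parses a complete SSE payload. It follows the general SSE framing (fields written as `name: value` lines, events separated by a blank line). `parse_sse` and the sample payload are illustrative names, not part of any provider SDK.

```python
from typing import Iterator


def parse_sse(raw: str) -> Iterator[dict]:
    """Yield one dict per SSE event found in a complete text payload.

    Handles `event:` and `data:` fields, comment lines, and multi-line data.
    """
    event_type, data_lines = "message", []
    for line in raw.splitlines():
        if line == "":
            # A blank line dispatches the event collected so far
            if data_lines:
                yield {"event": event_type, "data": "\n".join(data_lines)}
            event_type, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        else:
            field, _, value = line.partition(":")
            value = value[1:] if value.startswith(" ") else value
            if field == "event":
                event_type = value
            elif field == "data":
                data_lines.append(value)


sample = (
    'data: {"delta": "Hello"}\n\n'
    ': keep-alive\n\n'
    'event: done\ndata: [DONE]\n\n'
)
events = list(parse_sse(sample))
print(events)
```

In practice the provider SDKs do this parsing for you; a hand-written parser is mainly useful when proxying raw streams.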
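SSE is simpler than WebSockets and purpose-built for server-push scenarios. Because it runs over plain HTTP, it passes through most proxies, load balancers, and firewalls, although response buffering and idle timeouts often need tuning (see Production Considerations).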
OpenAI Streaming
Python SDK
from openai import OpenAI
client = OpenAI()
# Using the responses API with streaming
stream = client.responses.create(
    model="gpt-5",
    input="Explain quantum computing in simple terms",
    stream=True
)
for event in stream:
    if event.type == "response.output_text.delta":
        print(event.delta, end="", flush=True)
    elif event.type == "response.completed":
        print("\n[Done]")
Node.js SDK
import OpenAI from 'openai';
const client = new OpenAI();
const stream = await client.responses.create({
  model: 'gpt-5',
  input: 'Explain quantum computing in simple terms',
  stream: true
});
for await (const event of stream) {
  if (event.type === 'response.output_text.delta') {
    process.stdout.write(event.delta);
  }
}
Streaming with Tool Calls
Tool calls in streaming mode arrive in fragments. You need to accumulate them before executing:
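import json
stream = client.responses.create(
    model="gpt-5",
    tools=tools,  # tool definitions, defined elsewhere
    input="What's the weather in Tokyo?",
    stream=True
)
tool_calls = {}  # Accumulate fragments, keyed by output item ID
# Field names follow the Responses API streaming events; verify them
# against the current API reference for your SDK version
for event in stream:
    if event.type == "response.output_item.added":
        # The function name arrives with the output item, not with the argument deltas
        if event.item.type == "function_call":
            tool_calls[event.item.id] = {"name": event.item.name, "arguments": ""}
    elif event.type == "response.function_call_arguments.delta":
        # Each delta names the item it belongs to and carries a fragment of the JSON arguments
        tool_calls[event.item_id]["arguments"] += event.delta
    elif event.type == "response.completed":
        # Now execute all completed tool calls
        for call in tool_calls.values():
            args = json.loads(call["arguments"])
            result = execute_tool(call["name"], args)  # execute_tool: your own dispatcher
            print(f"Tool {call['name']} returned: {result}")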
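To see why fragments must be accumulated before parsing, here is a minimal, SDK-free sketch. The `(call key, fragment)` tuples are a simulated stand-in for real stream events, and `accumulate` is a hypothetical helper name.

```python
import json

# Simulated stream: (call key, argument fragment) pairs as they might arrive.
# Real SDK events carry more fields; these tuples are a stand-in for illustration.
fragments = [
    ("call_1", '{"city": "To'),
    ("call_2", '{"city": '),
    ("call_1", 'kyo"}'),
    ("call_2", '"Osaka", "unit": "c"}'),
]


def accumulate(fragments):
    """Concatenate fragments per call and parse only once each call is whole."""
    buffers = {}
    for key, piece in fragments:
        buffers[key] = buffers.get(key, "") + piece
    return {key: json.loads(text) for key, text in buffers.items()}


calls = accumulate(fragments)
print(calls)
```

Parsing any single fragment on its own would raise a `json.JSONDecodeError`, which is why execution waits until the stream signals completion.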
Anthropic Streaming
Anthropic's streaming works similarly but uses a different event structure:
import anthropic
client = anthropic.Anthropic()
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain RAG"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
Low-Level Event Handling
For fine-grained control, handle individual events:
with client.messages.stream(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    for event in stream:
        if event.type == "message_start":
            print(f"Message started: {event.message.id}")
        elif event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
        elif event.type == "message_delta":
            # Usage info and stop reason
            print(f"\nStop reason: {event.delta.stop_reason}")
            print(f"Tokens: {event.usage.output_tokens}")
Google Gemini Streaming
import google.generativeai as genai
model = genai.GenerativeModel('gemini-2.5-pro')
response = model.generate_content(
    "Explain machine learning",
    stream=True
)
for chunk in response:
    print(chunk.text, end="", flush=True)
Streaming API Comparison
| Feature | OpenAI | Anthropic | Google Gemini |
|---|---|---|---|
| Protocol | SSE | SSE | SSE |
| Granularity | Token-level | Token-level | Chunk-level |
| Streaming tool calls | Yes | Yes | Yes |
| Usage stats in stream | Yes | Yes | Yes |
| Thinking tokens stream | Reasoning summaries (reasoning models) | Yes (extended thinking) | Yes |
| Cancel mid-stream | Yes (disconnect) | Yes (disconnect) | Yes (disconnect) |
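Because each provider wraps text in a different payload, a thin normalization layer can keep the rest of the pipeline provider-agnostic. The sketch below works on already-decoded JSON payloads. The shapes are assumptions based on each provider's documented streaming format (OpenAI Chat Completions chunks, Anthropic `content_block_delta` events, Gemini `candidates` chunks) and should be checked against current documentation. `extract_text` is a hypothetical helper.

```python
def extract_text(provider: str, payload: dict) -> str:
    """Return the text fragment carried by one decoded stream payload, or ''."""
    if provider == "openai":
        # Chat Completions chunk: choices[0].delta.content
        choices = payload.get("choices") or [{}]
        return choices[0].get("delta", {}).get("content") or ""
    if provider == "anthropic":
        # Only content_block_delta events with a text_delta carry visible text
        delta = payload.get("delta", {})
        if payload.get("type") == "content_block_delta" and delta.get("type") == "text_delta":
            return delta.get("text", "")
        return ""
    if provider == "gemini":
        # Each chunk may hold several parts; join the ones that contain text
        candidates = payload.get("candidates") or [{}]
        parts = candidates[0].get("content", {}).get("parts", [])
        return "".join(part.get("text", "") for part in parts)
    raise ValueError(f"unknown provider: {provider}")


print(extract_text("openai", {"choices": [{"delta": {"content": "Hello"}, "index": 0}]}))
```

Events that carry no text (usage updates, stop reasons) come back as an empty string, so callers can append the result unconditionally.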
Frontend Implementation
Basic Browser SSE Client
async function streamChat(message) {
  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message })
  });
  // Fail fast on HTTP errors so callers can decide whether to retry
  if (!response.ok) throw new Error(`Request failed: ${response.status}`);
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop(); // Keep incomplete line
    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = line.slice(6);
        if (data === '[DONE]') return;
        try {
          const parsed = JSON.parse(data);
          const content = parsed.choices?.[0]?.delta?.content || '';
          appendToChat(content);
        } catch (e) {
          // Skip malformed events
        }
      }
    }
  }
}
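The buffering step in the client above (append the chunk, split on newlines, keep the trailing partial line) is the part most often gotten wrong, so here it is in isolation. This is a Python sketch of the same idea for a server-side consumer; `SSELineBuffer` is a hypothetical name.

```python
from typing import List


class SSELineBuffer:
    """Collect network chunks and release only complete lines."""

    def __init__(self) -> None:
        self._buffer = ""

    def feed(self, chunk: str) -> List[str]:
        self._buffer += chunk
        # Everything before the last newline is complete; the remainder waits
        *complete, self._buffer = self._buffer.split("\n")
        return [line for line in complete if line]  # drop blank separator lines


buf = SSELineBuffer()
out = []
# Chunk boundaries deliberately fall in the middle of a JSON payload and a field
for chunk in ['data: {"a"', ': 1}\n\ndata: ', '[DONE]\n\n']:
    out.extend(buf.feed(chunk))
print(out)
```

Network chunk boundaries are arbitrary, so a JSON payload split across two reads is only parsed once its terminating newline has arrived.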
React Component with Streaming
import { useState, useRef } from 'react';
function ChatComponent() {
  const [messages, setMessages] = useState([]);
  const [isStreaming, setIsStreaming] = useState(false);
  const abortRef = useRef(null);
  const sendMessage = async (content) => {
    setMessages(prev => [...prev, { role: 'user', content }]);
    setMessages(prev => [...prev, { role: 'assistant', content: '' }]);
    setIsStreaming(true);
    abortRef.current = new AbortController();
    try {
      const response = await fetch('/api/chat', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ messages: [...messages, { role: 'user', content }] }),
        signal: abortRef.current.signal
      });
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';
      let assistantContent = '';
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop();
        for (const line of lines) {
          if (line.startsWith('data: ') && line.slice(6) !== '[DONE]') {
            let parsed;
            try {
              parsed = JSON.parse(line.slice(6));
            } catch (e) {
              continue; // Skip malformed events instead of aborting the stream
            }
            const delta = parsed.choices?.[0]?.delta?.content || '';
            assistantContent += delta;
            // Replace the last (assistant) message with the accumulated text
            setMessages(prev => {
              const updated = [...prev];
              updated[updated.length - 1] = {
                role: 'assistant',
                content: assistantContent
              };
              return updated;
            });
          }
        }
      }
    } catch (e) {
      if (e.name === 'AbortError') {
        console.log('Stream cancelled by user');
      } else {
        console.error('Stream failed', e);
      }
    } finally {
      setIsStreaming(false);
    }
  };
  const stopStreaming = () => {
    abortRef.current?.abort();
  };
  return (
    <div>
      {/* Your chat UI goes here */}
      {isStreaming && <button onClick={stopStreaming}>Stop</button>}
    </div>
  );
}
Backend Proxy Pattern
Never expose API keys to the frontend. Always proxy through your backend:
# FastAPI example
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
import json
app = FastAPI()
client = OpenAI()
@app.post("/api/chat")
async def chat(request: dict):
    messages = request.get("messages", [])
    def generate():
        stream = client.chat.completions.create(
            model="gpt-5",
            messages=messages,
            stream=True
        )
        for chunk in stream:
            # Some chunks carry no choices or no content, so guard both
            if chunk.choices and chunk.choices[0].delta.content:
                data = json.dumps({
                    "choices": [{
                        "delta": {"content": chunk.choices[0].delta.content}
                    }]
                })
                yield f"data: {data}\n\n"
        yield "data: [DONE]\n\n"
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"  # Disable nginx buffering
        }
    )
Error Recovery in Streaming
Streaming introduces error scenarios that non-streaming APIs don't have. Here's how to handle them:
1. Mid-Stream Disconnects
The connection drops while streaming. Your client should detect this and offer to retry:
async function streamWithRetry(message, maxRetries = 2) {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await streamChat(message);
    } catch (e) {
      if (attempt === maxRetries) throw e;
      // Exponential backoff
      const delay = Math.pow(2, attempt) * 1000;
      await new Promise(r => setTimeout(r, delay));
    }
  }
}
2. Partial Response Handling
Always save the partial response so users don't lose content if the stream breaks:
// Auto-save partial responses
let assistantContent = '';
let lastSavedLength = 0;
function appendToChat(content) {
  assistantContent += content;
  // Save roughly every 50 characters
  if (assistantContent.length - lastSavedLength > 50) {
    localStorage.setItem('draft_response', assistantContent);
    lastSavedLength = assistantContent.length;
  }
}
3. Timeout Detection
class StreamTimeout {
  constructor(ms) {
    this.ms = ms;
    this.timer = null;
  }
  start(onTimeout) {
    this.reset(onTimeout);
  }
  reset(onTimeout) {
    clearTimeout(this.timer);
    this.timer = setTimeout(onTimeout, this.ms);
  }
  clear() {
    clearTimeout(this.timer);
  }
}
// Usage: timeout if no token received for 30 seconds
// Pass abortController.signal to fetch so that abort() cancels the request
const abortController = new AbortController();
const timeout = new StreamTimeout(30000);
timeout.start(() => {
  console.error('Stream timeout - no data received for 30s');
  abortController.abort();
});
// Reset timeout on each received token
// timeout.reset(() => abortController.abort());
// Call timeout.clear() once the stream completes
UX Patterns for Streaming
Typing Indicator
Show a typing indicator before the first token arrives:
<div class="typing-indicator">
  <span></span><span></span><span></span>
</div>
<style>
  .typing-indicator span {
    display: inline-block;
    width: 8px;
    height: 8px;
    border-radius: 50%;
    background: #94a3b8;
    animation: bounce 1.4s infinite;
  }
  .typing-indicator span:nth-child(2) { animation-delay: 0.2s; }
  .typing-indicator span:nth-child(3) { animation-delay: 0.4s; }
  @keyframes bounce {
    0%, 60%, 100% { transform: translateY(0); }
    30% { transform: translateY(-10px); }
  }
</style>
Markdown Rendering During Streaming
One challenge: rendering markdown as it streams. Code blocks arrive character by character, so ``` appears before the block is complete:
import { marked } from 'marked';
function StreamingRenderer() {
  let fullContent = '';
  function append(delta) {
    fullContent += delta;
    // Re-render the full content each time
    // marked handles incomplete markdown gracefully
    const html = marked.parse(fullContent);
    // marked does not sanitize HTML; sanitize the result (for example with
    // DOMPurify) before assigning it to innerHTML if the content is untrusted
    document.getElementById('output').innerHTML = html;
  }
  return { append };
}
For better performance with long responses, debounce the markdown rendering:
let fullContent = '';
let renderTimeout = null;
function append(delta) {
  fullContent += delta;
  // Show raw text immediately
  document.getElementById('raw-output').textContent += delta;
  // Debounce markdown rendering
  clearTimeout(renderTimeout);
  renderTimeout = setTimeout(() => {
    document.getElementById('formatted-output').innerHTML = marked.parse(fullContent);
  }, 100);
}
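Another way to tame half-arrived code blocks is to close an unterminated fence temporarily before each render. The sketch below shows the idea in Python for a server that pre-renders markdown; `close_open_fence` is a hypothetical helper. It only tracks triple-backtick fences and ignores tilde fences and nested longer fences.

```python
FENCE = "`" * 3  # a triple-backtick code fence, built up to keep this listing readable


def close_open_fence(markdown_text: str) -> str:
    """Append a closing fence when the text ends inside an unterminated code block."""
    inside_fence = False
    for line in markdown_text.split("\n"):
        if line.lstrip().startswith(FENCE):
            inside_fence = not inside_fence  # every fence line toggles the state
    if inside_fence:
        separator = "" if markdown_text.endswith("\n") else "\n"
        return markdown_text + separator + FENCE
    return markdown_text


partial = "Here is code:\n" + FENCE + "python\nprint(1)"
print(close_open_fence(partial))
```

The function is applied only to the copy handed to the renderer; the accumulated response text itself stays unmodified.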
Copy Button for Completed Responses
Show a copy button only after streaming completes:
function onStreamComplete() {
  const copyBtn = document.createElement('button');
  copyBtn.textContent = 'Copy';
  copyBtn.onclick = () => {
    navigator.clipboard.writeText(fullContent);
    copyBtn.textContent = 'Copied!';
    setTimeout(() => copyBtn.textContent = 'Copy', 2000);
  };
  document.querySelector('.message:last-child').appendChild(copyBtn);
}
Production Considerations
Proxy and Load Balancer Configuration
SSE connections are long-lived. Your infrastructure must support this:
- Nginx: Set proxy_buffering off and proxy_read_timeout 300s
- Cloudflare: Disable response buffering, or use the X-Accel-Buffering: no header
- AWS ALB: Increase idle timeout (default 60s may not be enough)
- Kubernetes: Set appropriate proxy-read-timeout in ingress annotations
Connection Management
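# FastAPI with connection lifecycle
from fastapi import Request
from openai import AsyncOpenAI
# Async client, so waiting on chunks does not block the event loop
async_client = AsyncOpenAI()
@app.post("/api/chat")
async def chat(request: Request):
    body = await request.json()
    messages = body.get("messages", [])
    async def generate():
        try:
            stream = await async_client.chat.completions.create(
                model="gpt-5",
                messages=messages,
                stream=True
            )
            async for chunk in stream:
                # Check if client disconnected
                if await request.is_disconnected():
                    print("Client disconnected, stopping stream")
                    break
                # format_sse: your helper that serializes a chunk as an SSE event
                yield format_sse(chunk)
        finally:
            print("Stream ended")
            # Cleanup resources
    return StreamingResponse(generate(), media_type="text/event-stream")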
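The `format_sse` helper called above is not defined in this guide. One possible definition, assuming the same `choices[0].delta.content` envelope used by the backend proxy earlier, might look like the sketch below. It accepts either a plain string or a Chat Completions style chunk object; that dual behavior is an assumption made so the helper fits both call sites in this guide.

```python
import json


def format_sse(chunk) -> str:
    """Serialize one text fragment (or SDK chunk) as an SSE `data:` event."""
    if isinstance(chunk, str):
        text = chunk
    else:
        # Assumes a Chat Completions style chunk object; content may be None
        text = chunk.choices[0].delta.content if chunk.choices else None
    payload = {"choices": [{"delta": {"content": text or ""}}]}
    # json.dumps escapes newlines, so the payload always fits on one data line
    return f"data: {json.dumps(payload)}\n\n"


print(format_sse("Hi"), end="")
```

Encoding the text as JSON matters because a raw newline inside a `data:` line would otherwise split the event.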
Token Counting and Cost Tracking
Streaming responses include usage information in the final event. Always capture this for cost tracking:
total_tokens = 0
for event in stream:
    if event.type == "response.completed":
        total_tokens = event.response.usage.total_tokens
        input_tokens = event.response.usage.input_tokens
        output_tokens = event.response.usage.output_tokens
        # Log to your cost tracking system
        log_usage(
            model="gpt-5",
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost=calculate_cost("gpt-5", input_tokens, output_tokens)
        )
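`calculate_cost` is called above but not defined. A minimal sketch follows; the model name and per-million-token prices are hypothetical placeholders, not real provider rates.

```python
# Hypothetical prices in USD per million tokens. Replace with your provider's current rates.
PRICES_PER_MILLION = {
    "example-model": {"input": 1.00, "output": 4.00},
}


def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Return the estimated cost in USD for one request."""
    price = PRICES_PER_MILLION[model]
    return (input_tokens * price["input"] + output_tokens * price["output"]) / 1_000_000


cost = calculate_cost("example-model", input_tokens=1000, output_tokens=500)
print(f"${cost:.4f}")
```

Input and output tokens are priced separately because providers typically charge different rates for each.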
Concurrent Stream Limits
Each streaming connection holds resources on your server. Implement limits:
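from asyncio import Semaphore
from fastapi import HTTPException, Request
# Limit concurrent streams per user
stream_semaphores = {}
async def chat(request: Request):
    user_id = get_user_id(request)  # get_user_id: your own auth lookup
    if user_id not in stream_semaphores:
        stream_semaphores[user_id] = Semaphore(3)  # Max 3 concurrent streams
    if stream_semaphores[user_id].locked():
        # Reject with 429 instead of returning an error body with status 200
        raise HTTPException(status_code=429, detail="Too many concurrent streams")
    async with stream_semaphores[user_id]:
        # Process stream; hold the semaphore until streaming has finished,
        # not just until the response object is returned
        pass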
Advanced Patterns
Multi-Model Streaming with Fallback
Stream from a primary model and fall back to a secondary if it fails:
import json
# stream_openai and stream_anthropic are your own async generator wrappers
async def stream_with_fallback(messages, primary="gpt-5", fallback="claude-sonnet-4-20250514"):
    try:
        async for chunk in stream_openai(primary, messages):
            yield chunk
    except Exception as e:
        print(f"Primary model failed: {e}, falling back")
        # Tell the client a fallback is starting so it can discard partial output
        yield f"data: {json.dumps({'fallback': True})}\n\n"
        async for chunk in stream_anthropic(fallback, messages):
            yield chunk
Streaming with Caching
Cache complete responses for identical queries, but still stream them to the client:
import asyncio, hashlib, json
import redis
r = redis.Redis()
# format_sse, stream_from_api and extract_content are helpers you define elsewhere
async def cached_stream(messages, model):
    cache_key = hashlib.sha256(
        f"{model}:{json.dumps(messages)}".encode()
    ).hexdigest()
    # Check cache first
    cached = r.get(f"stream:{cache_key}")
    if cached:
        # Simulate streaming from cache
        full_response = json.loads(cached)
        for i in range(0, len(full_response), 10):
            yield format_sse(full_response[i:i+10])
            await asyncio.sleep(0.01)  # Realistic feel
        yield "data: [DONE]\n\n"
        return
    # Stream from API and cache
    full_content = ""
    async for chunk in stream_from_api(model, messages):
        full_content += extract_content(chunk)
        yield chunk
    # Only reached when the stream finished, so partial responses are never cached
    r.setex(f"stream:{cache_key}", 3600, json.dumps(full_content))
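One detail worth noting: hashing `json.dumps(messages)` directly makes the key depend on dictionary key order, so two logically identical requests can miss each other in the cache. A possible refinement, sketched here with a hypothetical `cache_key` helper, is to serialize canonically before hashing.

```python
import hashlib
import json


def cache_key(model: str, messages: list) -> str:
    """Build a deterministic key: same model and messages always hash the same."""
    canonical = json.dumps(
        {"model": model, "messages": messages},
        sort_keys=True,          # key order inside each message no longer matters
        separators=(",", ":"),   # fixed separators avoid whitespace differences
        ensure_ascii=False,
    )
    return "stream:" + hashlib.sha256(canonical.encode("utf-8")).hexdigest()


key_a = cache_key("m", [{"role": "user", "content": "hi"}])
key_b = cache_key("m", [{"content": "hi", "role": "user"}])
print(key_a == key_b)
```

Any other parameter that changes the output, such as temperature or a system prompt, would also need to be part of the hashed structure.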
Common Pitfalls
| Pitfall | Solution |
|---|---|
| Proxy buffering breaking streaming | Set X-Accel-Buffering: no and disable proxy buffering |
| Not handling partial JSON in SSE events | Buffer incomplete lines and parse on newline |
| Memory leak from abandoned connections | Check is_disconnected() and set timeouts |
| Markdown flickering during streaming | Debounce rendering or use incremental parsers |
| Exposing API keys in frontend | Always proxy through backend |
| Not tracking token usage from streams | Capture usage from the final stream event |
| CORS issues with SSE | Configure CORS headers on your proxy |
Conclusion
Streaming is no longer optional for AI applications; it is the baseline expectation. The good news is that all major LLM providers use SSE as the transport, so your core streaming infrastructure carries over between them even though each provider defines its own event payloads. The real engineering work is in the details: proper error recovery, UX polish, proxy configuration, and cost tracking.
Start with the basic patterns in this guide, then layer on the advanced patterns as your application scales. The most important thing is to never expose API keys to the frontend and always handle mid-stream failures gracefully. Your users will thank you for it.