Tutorial May 10, 2026

AI API Rate Limits and Error Handling Guide 2026

Build resilient AI applications with retry strategies, circuit breakers, and production patterns for OpenAI, Anthropic, Google, and DeepSeek APIs.

Why Rate Limits and Error Handling Matter

Every AI API has rate limits, and every production system will hit them. The question is not whether you will encounter 429 Too Many Requests errors—it is whether your application will gracefully recover or crash in front of your users.

In 2026, as AI workloads scale from prototype to production, rate limit management has become a core engineering discipline. A single unhandled 429 error can cascade through your system, causing timeouts, failed jobs, and degraded user experiences. This guide covers the published rate limits for each major provider, retry strategies, production patterns for high throughput, and Python code examples you can adapt.

Rate Limits by Provider: The Numbers

Rate limits vary dramatically between providers and tiers. Here are the actual numbers as of May 2026 for standard paid accounts.

OpenAI (GPT-5.5, GPT-4.1)

OpenAI uses a token-based rate limiting system with separate limits for requests per minute (RPM) and tokens per minute (TPM). Tier 1 through Tier 5 accounts have progressively higher limits.

| Tier | RPM | TPM | Batch Queue Limit |
|---|---|---|---|
| Tier 1 ($0–$50) | 500 | 200K | 5M |
| Tier 2 ($50–$200) | 5,000 | 2M | 50M |
| Tier 3 ($200–$1K) | 10,000 | 10M | 200M |
| Tier 4 ($1K–$10K) | 30,000 | 50M | 1B |
| Tier 5 ($10K+) | 100,000 | 200M | 5B |

OpenAI returns rate limit headers in every response: x-ratelimit-limit-requests, x-ratelimit-limit-tokens, x-ratelimit-remaining-requests, and x-ratelimit-remaining-tokens. Always read these headers to anticipate throttling.
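As a minimal sketch of reading these headers, the helper below takes a plain dictionary of response headers and reports whether either quota is nearly exhausted. The function names and the 10% threshold are illustrative assumptions, not part of any SDK, and how you obtain the headers depends on your HTTP client.

```python
from typing import Mapping, Optional


def remaining_fraction(headers: Mapping[str, str], kind: str = "requests") -> Optional[float]:
    """Return remaining/limit for 'requests' or 'tokens', or None if unavailable."""
    limit = headers.get(f"x-ratelimit-limit-{kind}")
    remaining = headers.get(f"x-ratelimit-remaining-{kind}")
    if limit is None or remaining is None:
        return None
    try:
        limit_n, remaining_n = int(limit), int(remaining)
    except ValueError:
        return None  # Header present but not a plain integer
    if limit_n <= 0:
        return None
    return remaining_n / limit_n


def should_slow_down(headers: Mapping[str, str], threshold: float = 0.1) -> bool:
    """True when less than `threshold` of either quota is left (assumed policy)."""
    fractions = [remaining_fraction(headers, kind) for kind in ("requests", "tokens")]
    return any(f is not None and f < threshold for f in fractions)


# Example headers: plenty of requests left, but only 4% of the token quota
example_headers = {
    "x-ratelimit-limit-requests": "10000",
    "x-ratelimit-remaining-requests": "9200",
    "x-ratelimit-limit-tokens": "10000000",
    "x-ratelimit-remaining-tokens": "400000",
}
print(should_slow_down(example_headers))
```

Checking both quotas matters because a workload with long prompts can run out of tokens per minute while its request count still looks healthy.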

Anthropic (Claude Opus 4.7, Sonnet 4, Haiku 4)

Anthropic uses a combined token-based system with separate limits per model. Claude Opus 4.7 has the strictest limits due to its computational cost.

| Model | RPM | TPM | Max Concurrent |
|---|---|---|---|
| Claude Opus 4.7 | 1,000 | 400K | 50 |
| Claude Sonnet 4 | 4,000 | 2M | 200 |
| Claude Haiku 4 | 10,000 | 5M | 500 |

Anthropic provides a retry-after header on 429 responses and reports remaining quota in anthropic-ratelimit-* response headers such as anthropic-ratelimit-tokens-remaining. The concurrent request limit is a frequent gotcha for teams used to OpenAI's burst-friendly approach.

Google (Gemini 2.5 Pro, Flash)

Google's rate limiting is the most generous for free-tier users but scales differently at production levels. Gemini uses a project-level quota system managed through the Google Cloud Console.

| Plan | RPM (Pro) | RPM (Flash) | TPM (Pro) | TPM (Flash) |
|---|---|---|---|---|
| Free | 15 | 30 | 1M | 1M |
| Pay-as-you-go | 2,000 | 10,000 | 20M | 100M |
| Enterprise | 10,000 | 50,000 | 200M | 500M |

Google's API returns quota metrics in the response headers, and you can monitor usage in real-time through the Cloud Console quotas page. Quota increases typically require a support ticket and 1-2 business days for approval.

DeepSeek (V4, R2)

DeepSeek offers the most cost-effective API in 2026, but its rate limits are more conservative, especially for new accounts. The limits scale based on your account balance and usage history.

| Level | RPM | TPM | Daily Token Cap |
|---|---|---|---|
| Starter ($0–$10) | 100 | 100K | 50M |
| Basic ($10–$100) | 500 | 500K | 500M |
| Standard ($100–$1K) | 2,000 | 2M | 5B |
| Enterprise ($1K+) | 10,000 | 20M | Unlimited |

DeepSeek returns standard rate limit headers and provides a dashboard for real-time quota monitoring. The daily token cap is the main constraint to watch—it can silently cap your throughput even when RPM and TPM look fine.
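One way to keep a daily cap from catching you by surprise is to track spend on the client. The sketch below is a hypothetical helper, not a DeepSeek API: the class name is made up, and it assumes the cap resets at midnight UTC, which you should confirm against the provider's documentation.

```python
import time
from typing import Callable


class DailyTokenBudget:
    """Client-side tracker for a per-day token cap (illustrative sketch)."""

    def __init__(self, daily_cap: int, clock: Callable[[], float] = time.time):
        self.daily_cap = daily_cap
        self.clock = clock
        self.day = self._today()
        self.used = 0

    def _today(self) -> int:
        # Day number since the Unix epoch; assumes the cap resets at 00:00 UTC
        return int(self.clock() // 86_400)

    def _roll_over(self) -> None:
        today = self._today()
        if today != self.day:
            self.day = today
            self.used = 0

    def try_spend(self, tokens: int) -> bool:
        """Reserve tokens if the cap allows it; return False otherwise."""
        self._roll_over()
        if self.used + tokens > self.daily_cap:
            return False
        self.used += tokens
        return True

    def remaining(self) -> int:
        self._roll_over()
        return self.daily_cap - self.used


# Starter level from the table above: 50M tokens per day
budget = DailyTokenBudget(daily_cap=50_000_000)
if budget.try_spend(120_000):
    print(f"OK to send, {budget.remaining():,} tokens left today")
```

Calling try_spend with an estimated token count before each request lets low-priority work stop early and leave headroom for user-facing traffic.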

Common Error Types You Must Handle

Not all API errors are created equal. Understanding which errors are retryable and which are not is the foundation of resilient error handling.

429 Too Many Requests

The most common rate limit error. It means you have exceeded your RPM, TPM, or concurrent request limit. This error is always retryable after a delay. The retry-after header (when present) tells you exactly how long to wait. If no header is present, use exponential backoff starting at 1 second.

Critical: Never retry a 429 immediately. Each immediate retry wastes one of your remaining quota slots and makes the problem worse. Always wait.
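The Retry-After header can carry either a number of seconds or an HTTP date, so it helps to parse it defensively. This is a minimal sketch using only the standard library; parse_retry_after is an illustrative name, and the optional now parameter exists only to make the function testable.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: Optional[str], now: Optional[datetime] = None) -> Optional[float]:
    """Convert a Retry-After header value to seconds, or None if unusable."""
    if not value:
        return None
    value = value.strip()
    # Form 1: delay in seconds, e.g. "12"
    try:
        return max(0.0, float(value))
    except ValueError:
        pass
    # Form 2: HTTP date, e.g. "Wed, 21 Oct 2026 07:28:00 GMT"
    try:
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return None
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())


print(parse_retry_after("12"))
print(parse_retry_after("not-a-date"))
```

When the function returns None, fall back to exponential backoff as described above.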

500 Internal Server Error

The provider's infrastructure failed. This is retryable—the failure is transient. However, if you see sustained 500 errors (more than 3 in a row), the provider may be experiencing an outage. Check their status page.

503 Service Unavailable

The provider is temporarily overloaded or under maintenance. Retryable, but with longer backoff intervals. A 503 usually means you should wait 30-60 seconds before retrying.

400 Context Length Exceeded

Your input plus the requested output exceeds the model's context window. This error is not retryable—repeating the same request will always fail. You must reduce the input length, switch to a model with a larger context window, or chunk your input.

401 Unauthorized / 403 Forbidden

Authentication or permission errors. Not retryable. Check your API key, account status, and model access permissions.

| Error Code | Retryable? | Recommended Action |
|---|---|---|
| 429 | Yes (after delay) | Exponential backoff, respect retry-after header |
| 500 | Yes | Retry with backoff, check provider status |
| 503 | Yes (long delay) | Backoff with 30s+ initial delay |
| 400 (context) | No | Reduce input, switch model, or chunk |
| 401/403 | No | Check API key and permissions |
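The table can be encoded as a small lookup so that retry decisions live in one place. This is a sketch under stated assumptions: the Action enum and classify function are illustrative names, and treating any status code not listed in the table as non-retryable is a conservative choice of this example rather than something the providers specify.

```python
from enum import Enum
from typing import Optional


class Action(Enum):
    RETRY = "retry"            # Back off from about 1 second
    RETRY_LONG = "retry_long"  # Back off from about 30 seconds
    FAIL = "fail"              # Do not retry


def classify(status_code: Optional[int]) -> Action:
    """Map an HTTP status code to the action recommended in the table."""
    if status_code in (429, 500):
        return Action.RETRY
    if status_code == 503:
        return Action.RETRY_LONG
    if status_code in (400, 401, 403):
        return Action.FAIL
    # Assumption: codes the table does not cover are treated as fatal
    return Action.FAIL


for code in (429, 500, 503, 400, 401):
    print(code, classify(code).value)
```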

Retry Strategies: Exponential Backoff with Jitter

The industry standard for retrying rate-limited API calls is exponential backoff with jitter. Here is why each component matters and how to implement it.

Why Exponential Backoff?

Fixed-interval retries (waiting 1 second between every attempt) are wasteful when the provider needs 30 seconds to recover. Exponential backoff doubles the wait time after each failure: 1s, 2s, 4s, 8s, 16s. In general, the delay before retry n (counting from 0) is min(base × 2^n, cap), so the wait reaches the provider's recovery time within a few attempts without flooding it in the meantime.

Why Jitter?

Without jitter, all clients that hit the rate limit at the same time will also retry at the same time—causing a "thundering herd" problem. Adding random jitter (typically ±25% of the backoff interval) spreads retries across time, dramatically reducing the chance of repeated collisions.
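To make the arithmetic concrete, the sketch below computes the capped exponential schedule and then applies ±25% jitter. The function name backoff_delay and its rng parameter are illustrative; rng is injectable only so the behavior can be checked deterministically.

```python
import random
from typing import Callable


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 60.0,
    jitter: float = 0.25,
    rng: Callable[[], float] = random.random,
) -> float:
    """Delay before retry number `attempt` (0-based), with symmetric jitter."""
    delay = min(base * (2 ** attempt), cap)
    # rng() is in [0, 1), so the offset lies in [-jitter * delay, +jitter * delay)
    offset = delay * jitter * (2 * rng() - 1)
    return max(0.1, delay + offset)


# Schedule without jitter: doubles each time until it hits the 60 s cap
schedule = [min(1.0 * (2 ** n), 60.0) for n in range(7)]
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0]

# With jitter, each client lands somewhere within 25% of the nominal delay
print([round(backoff_delay(n), 2) for n in range(7)])
```

Two clients that fail at the same instant will therefore pick different retry times, which is what breaks up the thundering herd.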

Complete Python Implementation

import asyncio
import random
import time
from typing import Any, Callable
from openai import AsyncOpenAI

# Async client: responses from the synchronous OpenAI client cannot be awaited
client = AsyncOpenAI()

async def call_with_retry(
    func: Callable,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter_factor: float = 0.25,
) -> Any:
    """Call an API function with exponential backoff and jitter.
    
    Args:
        func: Async callable that makes the API request
        max_retries: Maximum number of retry attempts
        base_delay: Initial delay in seconds
        max_delay: Maximum delay cap in seconds
        jitter_factor: Random jitter as fraction of delay (0.25 = +/-25%)
    """
    for attempt in range(max_retries + 1):
        try:
            return await func()
        except Exception as e:
            error_code = getattr(e, 'status_code', None)
            
            # Non-retryable errors (including 400 context length exceeded):
            # fail immediately, since repeating the request cannot succeed
            if error_code in (400, 401, 403):
                raise
            
            # No more retries left
            if attempt == max_retries:
                raise
            
            # Calculate backoff delay
            if error_code == 429:
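                # Respect retry-after header if present. The SDK exposes
                # response headers on e.response, not on the exception itself.
                response = getattr(e, 'response', None)
                retry_after = getattr(response, 'headers', {}).get('retry-after')
                if retry_after:
                    delay = float(retry_after)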
                else:
                    delay = min(base_delay * (2 ** attempt), max_delay)
            elif error_code == 503:
                # Service unavailable: use longer initial delay
                delay = min(30.0 * (2 ** attempt), max_delay)
            else:
                delay = min(base_delay * (2 ** attempt), max_delay)
            
            # Add jitter
            jitter = delay * jitter_factor * (2 * random.random() - 1)
            delay = max(0.1, delay + jitter)
            
            print(f"Attempt {attempt + 1} failed ({error_code}). "
                  f"Retrying in {delay:.1f}s...")
            await asyncio.sleep(delay)

# Usage example
async def generate_completion(prompt: str):
    async def _call():
        return await client.chat.completions.create(
            model="gpt-5.5",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=1024
        )
    return await call_with_retry(_call)

Circuit Breaker Pattern

Exponential backoff handles individual request failures. But what happens when the entire provider is down? Without a circuit breaker, your application will keep sending requests that are guaranteed to fail, wasting resources and adding latency.

A circuit breaker tracks consecutive failures and "opens" after a threshold, immediately rejecting requests without calling the API. After a cooldown period, it enters a "half-open" state that lets a small number of test requests through (three by default in the code below). If they succeed, the circuit closes and normal operation resumes; if one of them fails, the circuit opens again.

import time
from enum import Enum
from typing import Optional

class CircuitState(Enum):
    CLOSED = "closed"       # Normal operation
    OPEN = "open"           # Failing, reject immediately
    HALF_OPEN = "half_open" # Testing if provider recovered

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        half_open_max_calls: int = 3,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time: Optional[float] = None
        self.half_open_calls = 0
    
    def can_execute(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True
        
        if self.state == CircuitState.OPEN:
            # Check if recovery timeout has elapsed
            if time.time() - self.last_failure_time >= self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
                return True
            return False
        
        if self.state == CircuitState.HALF_OPEN:
            return self.half_open_calls < self.half_open_max_calls
        
        return False
    
    def record_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.half_open_calls += 1
            if self.half_open_calls >= self.half_open_max_calls:
                # Provider has recovered
                self.state = CircuitState.CLOSED
                self.failure_count = 0
        else:
            # A success ends the current run of consecutive failures
            self.failure_count = 0
    
    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.state == CircuitState.HALF_OPEN:
            # Provider is still down
            self.state = CircuitState.OPEN
        elif self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# Usage with retry
circuit = CircuitBreaker(failure_threshold=5, recovery_timeout=60)

async def resilient_call(func, **kwargs):
    if not circuit.can_execute():
        raise Exception("Circuit breaker is OPEN - provider unavailable")
    try:
        result = await call_with_retry(func, **kwargs)
        circuit.record_success()
        return result
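    except Exception as e:
        # Client errors (400/401/403) say nothing about provider health,
        # so they should not push the circuit toward OPEN
        if getattr(e, 'status_code', None) not in (400, 401, 403):
            circuit.record_failure()
        raise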

Token Bucket vs Fixed Window Rate Limiting

Understanding how providers implement rate limiting internally helps you optimize your request patterns.

Fixed Window

The simplest approach: count requests in a fixed time window (e.g., 60 seconds). When the count hits the limit, all subsequent requests are rejected until the window resets. The problem? You can burst through your entire quota in the first 5 seconds and then wait 55 seconds doing nothing. Then at the window boundary, you get another full burst.

DeepSeek uses a variant of fixed window rate limiting for its daily token caps.
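The boundary burst is easy to reproduce with a small counter. This is an illustrative sketch of the general fixed-window technique, not any provider's implementation; the class name and the injectable clock are assumptions made so the example can run without waiting.

```python
import time
from typing import Callable, Optional


class FixedWindowLimiter:
    """Allow at most `limit` requests per fixed window of `window_seconds`."""

    def __init__(
        self,
        limit: int,
        window_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.limit = limit
        self.window_seconds = window_seconds
        self.clock = clock
        self.window_index: Optional[int] = None
        self.count = 0

    def allow(self) -> bool:
        index = int(self.clock() // self.window_seconds)
        if index != self.window_index:
            # A new window has started: the counter resets to zero
            self.window_index = index
            self.count = 0
        if self.count < self.limit:
            self.count += 1
            return True
        return False


# Simulated clock: a burst at t=59s, then another at t=60s
now = [59.0]
limiter = FixedWindowLimiter(limit=5, clock=lambda: now[0])
late_burst = sum(limiter.allow() for _ in range(6))   # 5 allowed, 1 rejected
now[0] = 60.0
early_burst = sum(limiter.allow() for _ in range(6))  # 5 more allowed
print(late_burst, early_burst)  # 5 5
```

Ten requests pass within about one second even though the nominal limit is five per minute, which is the weakness the sliding window addresses.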

Sliding Window

Instead of a fixed boundary, the window slides continuously. Your quota is calculated over the last N seconds from the current moment. This smooths out the burst problem but is computationally more expensive for the provider.
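A sliding window can be sketched by keeping the timestamp of each accepted request and discarding those older than the window. This is the simple "log" variant, shown here only to illustrate the idea; the class name and the injectable clock are assumptions, and the per-request timestamp storage is exactly the extra cost mentioned above.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Allow at most `limit` requests in any trailing `window_seconds`."""

    def __init__(
        self,
        limit: int,
        window_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.limit = limit
        self.window_seconds = window_seconds
        self.clock = clock
        self.timestamps: Deque[float] = deque()

    def allow(self) -> bool:
        now = self.clock()
        # Drop requests that have aged out of the trailing window
        while self.timestamps and now - self.timestamps[0] >= self.window_seconds:
            self.timestamps.popleft()
        if len(self.timestamps) < self.limit:
            self.timestamps.append(now)
            return True
        return False


# Same scenario as a fixed window would face: bursts at t=59s and t=60s
now = [59.0]
limiter = SlidingWindowLimiter(limit=5, clock=lambda: now[0])
first = sum(limiter.allow() for _ in range(6))   # 5 allowed
now[0] = 60.0
second = sum(limiter.allow() for _ in range(6))  # 0: the t=59s burst still counts
now[0] = 119.0
third = sum(limiter.allow() for _ in range(6))   # 5: the old burst has aged out
print(first, second, third)  # 5 0 5
```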

Token Bucket

The gold standard for API rate limiting. You have a bucket of tokens that refills at a constant rate. Each request consumes one or more tokens. If the bucket is empty, the request is rejected. The key insight: the bucket has a maximum capacity (the burst size), so you can send a burst of requests up to the bucket size, but sustained throughput is capped at the refill rate.

OpenAI uses token bucket rate limiting for both RPM and TPM. This is why you see "bursty" behavior—you can send many requests quickly, but sustained throughput is lower.

class TokenBucket:
    """Token bucket rate limiter for client-side throttling."""
    
    def __init__(self, rate: float, capacity: int):
        """
        Args:
            rate: Tokens added per second
            capacity: Maximum tokens in the bucket
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_refill = time.time()
    
    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        self.tokens = min(
            self.capacity,
            self.tokens + elapsed * self.rate
        )
        self.last_refill = now
    
    def can_proceed(self, tokens: int = 1) -> bool:
        self._refill()
        return self.tokens >= tokens
    
    def consume(self, tokens: int = 1) -> bool:
        self._refill()
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False
    
    def wait_time(self, tokens: int = 1) -> float:
        """How long until enough tokens are available."""
        self._refill()
        if self.tokens >= tokens:
            return 0.0
        deficit = tokens - self.tokens
        return deficit / self.rate

# Client-side rate limiter for OpenAI Tier 3 (10K RPM = ~167/sec)
limiter = TokenBucket(rate=167, capacity=500)  # Allow short bursts

async def rate_limited_call(func, **kwargs):
    while not limiter.consume():
        wait = limiter.wait_time()
        await asyncio.sleep(wait)
    return await call_with_retry(func, **kwargs)

Production Patterns for High-Throughput Applications

When you are processing thousands of AI requests per minute, basic retry logic is not enough. You need architectural patterns that handle rate limits at the system level.

Pattern 1: Request Queuing with Priority

Instead of sending API requests directly, push them to a queue (Redis, SQS, RabbitMQ). A worker process pulls from the queue at a controlled rate that stays within your rate limits. Priority queues ensure important requests are processed first.

import heapq
import time
from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedRequest:
    priority: int  # Lower = higher priority
    enqueue_time: float = field(compare=True)
    request_id: str = field(compare=False)
    payload: Any = field(compare=False)

class AIRequestQueue:
    def __init__(self, rate_limiter: TokenBucket):
        self.queue: list[PrioritizedRequest] = []
        self.rate_limiter = rate_limiter
    
    def enqueue(
        self,
        request_id: str,
        payload: Any,
        priority: int = 5,  # 1 = critical, 5 = normal, 9 = batch
    ):
        req = PrioritizedRequest(
            priority=priority,
            enqueue_time=time.time(),
            request_id=request_id,
            payload=payload,
        )
        heapq.heappush(self.queue, req)
    
    async def process_next(self) -> Any:
        if not self.queue:
            return None
        
        # Wait for rate limiter
        while not self.rate_limiter.consume():
            await asyncio.sleep(self.rate_limiter.wait_time())
        
        req = heapq.heappop(self.queue)
        return req

# Priority levels:
# 1 = User-facing real-time (chat responses)
# 3 = User-facing async (email drafts)
# 5 = Background processing (data enrichment)
# 7 = Batch jobs (evaluation runs)
# 9 = Internal analytics
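The worker side of this pattern can be sketched with asyncio.PriorityQueue from the standard library. Everything here is illustrative: fake_api stands in for a real provider call, the fixed interval stands in for a proper rate limiter, and the sequence number in each tuple is a tie-breaker so that equal priorities are served in arrival order.

```python
import asyncio
from typing import Awaitable, Callable, List


async def worker(
    queue: asyncio.PriorityQueue,
    handler: Callable[[str], Awaitable[str]],
    interval: float,
    results: List[str],
) -> None:
    """Pull the highest-priority item, handle it, then pause to pace requests."""
    while True:
        _priority, _seq, payload = await queue.get()
        try:
            results.append(await handler(payload))
        finally:
            queue.task_done()
        await asyncio.sleep(interval)


async def demo() -> List[str]:
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    results: List[str] = []

    async def fake_api(payload: str) -> str:
        # Hypothetical stand-in for a real API call
        return payload.upper()

    jobs = [(9, "analytics"), (1, "chat"), (5, "enrichment")]
    for seq, (priority, payload) in enumerate(jobs):
        queue.put_nowait((priority, seq, payload))

    task = asyncio.create_task(worker(queue, fake_api, 0.01, results))
    await queue.join()  # Wait until every queued item has been handled
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return results


print(asyncio.run(demo()))  # ['CHAT', 'ENRICHMENT', 'ANALYTICS']
```

The items were enqueued with the lowest-priority job first, yet the chat request is served first because the queue orders by priority number.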

Pattern 2: Fallback Models

When your primary model is rate-limited or unavailable, automatically fall back to a secondary model. This requires a fallback chain configuration.

import os

import anthropic
from openai import AsyncOpenAI

# Async clients, so each call can be awaited without blocking the event loop
openai_client = AsyncOpenAI()
anthropic_client = anthropic.AsyncAnthropic()
# DeepSeek exposes an OpenAI-compatible API. base_url is a client setting,
# not a per-request argument. DEEPSEEK_API_KEY is an assumed variable name.
deepseek_client = AsyncOpenAI(
    api_key=os.environ["DEEPSEEK_API_KEY"],
    base_url="https://api.deepseek.com",
)

FALLBACK_CHAIN = [
    {"provider": "openai", "model": "gpt-5.5"},
    {"provider": "anthropic", "model": "claude-sonnet-4-20250514"},
    {"provider": "deepseek", "model": "deepseek-chat"},
    {"provider": "openai", "model": "gpt-4.1-mini"},
]

async def call_with_fallback(prompt: str, max_tokens: int = 1024):
    """Try each model in the fallback chain until one succeeds."""
    errors = []
    
    for config in FALLBACK_CHAIN:
        try:
            if config["provider"] == "openai":
                response = await openai_client.chat.completions.create(
                    model=config["model"],
                    messages=[{"role": "user", "content": prompt}],
                    max_tokens=max_tokens,
                )
                return response.choices[0].message.content
            elif config["provider"] == "anthropic":
                response = await anthropic_client.messages.create(
                    model=config["model"],
                    max_tokens=max_tokens,
                    messages=[{"role": "user", "content": prompt}],
                )
                return response.content[0].text
            elif config["provider"] == "deepseek":
                response = await deepseek_client.chat.completions.create(
                    model=config["model"],
                    messages=[{"role": "user", "content": prompt}],
                    max_tokens=max_tokens,
                )
                return response.choices[0].message.content
        except Exception as e:
            # Record the failure and move on to the next model in the chain
            errors.append(f"{config['provider']}/{config['model']}: {e}")
            continue
    
    raise Exception(f"All fallbacks failed: {errors}")

Pattern 3: Multi-Account Sharding

For extreme throughput, distribute requests across multiple API keys or accounts. Each key has its own rate limit quota, effectively multiplying your total capacity.

import itertools

from openai import AsyncOpenAI

class MultiKeyRouter:
    """Distribute requests across multiple API keys."""
    
    def __init__(self, api_keys: list[str]):
        # AsyncOpenAI so that call() can await each request
        self.clients = [
            {"client": AsyncOpenAI(api_key=key), "limiter": TokenBucket(rate=167, capacity=500)}
            for key in api_keys
        ]
        self.key_cycle = itertools.cycle(range(len(self.clients)))
    
    async def call(self, prompt: str, max_tokens: int = 1024):
        """Route to the first available client."""
        # Try each client in round-robin order
        for _ in range(len(self.clients)):
            idx = next(self.key_cycle)
            client_info = self.clients[idx]
            
            # Reserve a token before sending, so concurrent callers cannot all
            # pass the check and overshoot the per-key limit
            if client_info["limiter"].consume():
                try:
                    response = await client_info["client"].chat.completions.create(
                        model="gpt-5.5",
                        messages=[{"role": "user", "content": prompt}],
                        max_tokens=max_tokens,
                    )
                    return response.choices[0].message.content
                except Exception as e:
                    if getattr(e, 'status_code', None) == 429:
                        continue  # Try next key
                    raise
        
        # All keys rate limited: raise so the caller can queue the request or back off
        raise Exception("All API keys rate limited. Consider adding more keys or reducing request volume.")

Rate Limit Monitoring and Alerting

You cannot manage what you do not measure. Production systems need real-time visibility into rate limit consumption and proactive alerting before failures occur.

Key Metrics to Track

  • Rate limit utilization percentage: What fraction of your RPM/TPM quota are you using? Warn at 70% and treat 90% as critical.
  • 429 error rate: What percentage of requests are being throttled? Should be below 1% in a healthy system.
  • Retry success rate: Of the requests that get a 429, what percentage succeed on retry? If this drops, your backoff delays are probably too short.
  • Effective throughput: How many requests per minute actually complete successfully? Compare this to your theoretical limit.
  • P99 latency with retries: End-to-end latency including retry delays. Users should not wait more than 10 seconds.

Monitoring Implementation

import time
from collections import defaultdict
from dataclasses import dataclass, field

@dataclass
class RateLimitMetrics:
    total_requests: int = 0
    rate_limited_429: int = 0
    server_errors_5xx: int = 0
    context_errors: int = 0
    other_errors: int = 0
    successful: int = 0
    retry_successes: int = 0
    delayed_retries: int = 0
    total_retry_delay_seconds: float = 0.0
    started_at: float = field(default_factory=time.time)
    
    def record_request(self, status: str, retry_delay: float = 0):
        self.total_requests += 1
        if status == "success":
            self.successful += 1
        elif status == "429":
            self.rate_limited_429 += 1
        elif status in ("500", "503"):
            self.server_errors_5xx += 1
        elif status == "context_exceeded":
            self.context_errors += 1
        elif status == "retry_success":
            self.retry_successes += 1
            self.successful += 1
        else:
            # Auth errors, unknown codes, and anything else not listed above
            self.other_errors += 1
        if retry_delay > 0:
            self.delayed_retries += 1
            self.total_retry_delay_seconds += retry_delay
    
    def summary(self) -> dict:
        elapsed = max(time.time() - self.started_at, 1e-9)
        return {
            "total_requests": self.total_requests,
            "success_rate": self.successful / max(self.total_requests, 1),
            "rate_limit_rate": self.rate_limited_429 / max(self.total_requests, 1),
            # Averaged over every retry that actually slept, not only 429s
            "avg_retry_delay": self.total_retry_delay_seconds / max(self.delayed_retries, 1),
            # Successful requests per second since this collector was created
            "effective_rps": self.successful / elapsed,
        }

# Global metrics collector
metrics = defaultdict(RateLimitMetrics)

def get_metrics(provider: str) -> RateLimitMetrics:
    return metrics[provider]

Alert Thresholds

Set up alerts at these thresholds to catch problems before users notice:

| Metric | Warning | Critical |
|---|---|---|
| Rate limit utilization | > 70% | > 90% |
| 429 error rate | > 2% | > 10% |
| P99 latency | > 5s | > 15s |
| 5xx error rate | > 1% | > 5% |
| Circuit breaker opens | Any | > 3 in 1 hour |
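These thresholds can be kept in one table in code so that alerting logic stays consistent with the documentation. The sketch below is illustrative: the metric keys and the alert_level function are made-up names, and rates are expressed as fractions (0.02 means 2%).

```python
from typing import Dict, Tuple

# (warning, critical) thresholds taken from the table above
THRESHOLDS: Dict[str, Tuple[float, float]] = {
    "rate_limit_utilization": (0.70, 0.90),
    "error_rate_429": (0.02, 0.10),
    "p99_latency_seconds": (5.0, 15.0),
    "error_rate_5xx": (0.01, 0.05),
    "circuit_opens_per_hour": (0, 3),  # Any open warns; more than 3 is critical
}


def alert_level(metric: str, value: float) -> str:
    """Return 'ok', 'warning', or 'critical' for a metric reading."""
    warning, critical = THRESHOLDS[metric]
    if value > critical:
        return "critical"
    if value > warning:
        return "warning"
    return "ok"


print(alert_level("rate_limit_utilization", 0.95))
print(alert_level("error_rate_429", 0.05))
print(alert_level("p99_latency_seconds", 3.0))
```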

Best Practices for High-Throughput AI Applications

These practices come from operating AI systems at scale in 2026. Follow them and your application will be resilient against rate limits, outages, and traffic spikes.

1. Always Client-Side Throttle

Never rely solely on the provider's 429 responses to control your request rate. Implement client-side rate limiting using a token bucket that matches your quota. This prevents unnecessary 429 errors, reduces latency, and avoids getting your account flagged for excessive retries.

2. Batch Wherever Possible

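Both OpenAI and Anthropic offer batch APIs at a 50% discount. Any workload that does not need real-time responses should use batch processing. On OpenAI, batch jobs draw on a separate quota (the Batch Queue Limit column in the table above), so moving offline work to batch takes that traffic off your real-time RPM and cuts costs at the same time.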
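As a rough sketch of what preparing a batch looks like, the helper below builds one JSON line per request in the layout OpenAI's Batch API documents for chat completions (custom_id, method, url, body). Treat the field layout as an assumption to verify against the current provider documentation; the function name and the request-N id scheme are illustrative.

```python
import json
from typing import List


def build_batch_lines(
    prompts: List[str],
    model: str = "gpt-5.5",
    max_tokens: int = 1024,
) -> List[str]:
    """Return one JSONL line per prompt, ready to be written to a batch input file."""
    lines = []
    for i, prompt in enumerate(prompts):
        request = {
            "custom_id": f"request-{i}",  # Used to match results back to inputs
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
                "max_tokens": max_tokens,
            },
        }
        lines.append(json.dumps(request))
    return lines


batch_lines = build_batch_lines(["Summarize document A", "Summarize document B"])
print("\n".join(batch_lines))
```

The resulting lines would be written to a .jsonl file and uploaded through the provider's batch endpoint, which is outside the scope of this sketch.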

3. Use Prompt Caching Consistently

Prompt caching reduces cost and, on some providers, rate limit consumption. How cached tokens count toward your TPM limit varies by provider, so check the documentation for each one. Where cached input tokens are excluded from the limit, a request whose prompt is 90% cached consumes roughly 90% less input TPM.

4. Implement Graceful Degradation

When rate limits are hit, degrade gracefully rather than failing. Show a "generating response..." message with a progress indicator. Queue the request and deliver the result when capacity is available. Never show a raw error to the user.

5. Pre-Size Your Context

Count tokens before sending requests. If you detect that a request will exceed the context window, chunk the input or switch to a larger-context model proactively—before getting a 400 error. This saves a round-trip and avoids wasting rate limit quota on guaranteed failures.

import tiktoken

def safe_request(
    prompt: str,
    system: str,
    model: str = "gpt-5.5",
    max_output_tokens: int = 1024,
):
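    """Check context length before sending; return the model to use."""
    try:
        encoder = tiktoken.encoding_for_model(model)
    except KeyError:
        # tiktoken only knows OpenAI model names. For anything else, fall
        # back to a generic encoding and treat the count as an estimate.
        encoder = tiktoken.get_encoding("cl100k_base")
    
    input_tokens = len(encoder.encode(system + prompt))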
    model_limits = {
        "gpt-5.5": 256_000,
        "claude-opus-4-7": 1_000_000,
        "deepseek-chat": 1_000_000,
        "gemini-2.5-pro": 1_000_000,
    }
    
    limit = model_limits.get(model, 128_000)
    if input_tokens + max_output_tokens > limit:
        # Switch automatically to a larger-context model
        if model == "gpt-5.5":
            print(f"Context too long ({input_tokens} tokens). "
                  f"Switching to deepseek-chat for 1M context.")
            return "deepseek-chat"
        raise ValueError(
            f"Input ({input_tokens} tokens) + output ({max_output_tokens}) "
            f"exceeds {model} limit ({limit} tokens). Chunk your input."
        )
    
    return model

6. Separate Rate Limits Per Model

Different models have different rate limits. Track and throttle each model independently. A burst of Claude Opus requests should not block your Claude Haiku traffic—they have separate quotas.
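One way to keep models from blocking each other is to give each its own limiter. The sketch below uses one asyncio.Semaphore per model; PerModelLimiter is an illustrative name, and the dictionary keys are free-form labels rather than API model identifiers.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, TypeVar

T = TypeVar("T")


class PerModelLimiter:
    """Independent concurrency cap for each model label."""

    def __init__(self, limits: Dict[str, int]):
        self._limits = dict(limits)
        self._semaphores: Dict[str, asyncio.Semaphore] = {}

    def _semaphore(self, model: str) -> asyncio.Semaphore:
        # Created lazily, inside the running event loop
        if model not in self._semaphores:
            self._semaphores[model] = asyncio.Semaphore(self._limits[model])
        return self._semaphores[model]

    async def run(self, model: str, func: Callable[[], Awaitable[T]]) -> T:
        async with self._semaphore(model):
            return await func()


async def demo() -> List[str]:
    limiter = PerModelLimiter({"opus": 1, "haiku": 2})
    order: List[str] = []

    async def slow_opus() -> None:
        await asyncio.sleep(0.05)  # Simulates a long-running request
        order.append("opus")

    async def fast_haiku() -> None:
        order.append("haiku")

    # Two opus calls contend for one slot; haiku is unaffected by the backlog
    await asyncio.gather(
        limiter.run("opus", slow_opus),
        limiter.run("opus", slow_opus),
        limiter.run("haiku", fast_haiku),
    )
    return order


print(asyncio.run(demo()))  # ['haiku', 'opus', 'opus']
```

The haiku call finishes first even though it was submitted last, because it never waits on the saturated opus semaphore.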

7. Use Semaphores for Concurrency Control

In async Python, use asyncio.Semaphore to cap concurrent API calls at or below the provider's concurrent request limit. This prevents accidental overloading.

import anthropic

# Async client, so that messages.create returns an awaitable
claude_client = anthropic.AsyncAnthropic()

# Concurrency control matching Anthropic's limits
anthropic_semaphore = asyncio.Semaphore(50)  # Claude Opus: 50 concurrent

async def call_claude_opus(prompt: str):
    async with anthropic_semaphore:
        return await call_with_retry(
            lambda: claude_client.messages.create(
                model="claude-opus-4-7",
                max_tokens=4096,
                messages=[{"role": "user", "content": prompt}],
            )
        )

8. Log Everything, Alert Smartly

Log every API call with its status, latency, and rate limit headers. But do not alert on every 429—that is normal. Alert on trends: rising 429 rates, increasing P99 latency, or circuit breaker opens. Use a time window (5 minutes) and alert on aggregates, not individual events.
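Alerting on aggregates means computing a rate over a trailing window rather than reacting to single events. The following is a minimal sketch of a rolling 429 rate over five minutes; RollingRate is an illustrative name, and the clock is injectable only so the example can be exercised without waiting.

```python
import time
from collections import deque
from typing import Callable, Deque, Tuple


class RollingRate:
    """Fraction of flagged events (e.g. 429s) over a trailing time window."""

    def __init__(
        self,
        window_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.window_seconds = window_seconds
        self.clock = clock
        self.events: Deque[Tuple[float, bool]] = deque()

    def _evict(self) -> None:
        cutoff = self.clock() - self.window_seconds
        while self.events and self.events[0][0] < cutoff:
            self.events.popleft()

    def record(self, flagged: bool) -> None:
        self.events.append((self.clock(), flagged))
        self._evict()

    def rate(self) -> float:
        self._evict()
        if not self.events:
            return 0.0
        return sum(1 for _, flagged in self.events if flagged) / len(self.events)


window = RollingRate(window_seconds=300.0)
for status in [200] * 48 + [429] * 2:
    window.record(status == 429)
print(f"429 rate over the last 5 minutes: {window.rate():.1%}")
```

Comparing this rate against a threshold once per minute produces far fewer and more meaningful alerts than paging on each individual 429.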

Putting It All Together: A Production-Ready AI Client

Here is an async AI client that combines the main patterns in this guide: client-side rate limiting, retries with backoff and jitter, a circuit breaker, and metrics collection. Fallback models (Pattern 2) are not built in, but can be layered on top by wrapping call in a fallback chain.

import asyncio
import random
import time
from typing import Any, Optional

class ProductionAIClient:
    def __init__(
        self,
        rate: float = 167,
        burst: int = 500,
        max_retries: int = 5,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
    ):
        self.limiter = TokenBucket(rate=rate, capacity=burst)
        self.circuit = CircuitBreaker(
            failure_threshold=failure_threshold,
            recovery_timeout=recovery_timeout,
        )
        self.max_retries = max_retries
        self.metrics = RateLimitMetrics()
    
    async def call(self, func, priority: int = 5) -> Any:
        """Production-grade API call with all safeguards."""
        # 1. Check circuit breaker
        if not self.circuit.can_execute():
            raise Exception("Circuit breaker is OPEN")
        
        # 2. Wait for rate limiter
        while not self.limiter.consume():
            await asyncio.sleep(self.limiter.wait_time())
        
        # 3. Call with retry
        for attempt in range(self.max_retries + 1):
            try:
                result = await func()
                self.circuit.record_success()
                # Distinguish first-try successes from successes after a retry
                self.metrics.record_request("retry_success" if attempt else "success")
                return result
            except Exception as e:
                code = getattr(e, 'status_code', None)
                
                if code in (400, 401, 403):
                    self.metrics.record_request("auth_error")
                    raise
                
                if attempt == self.max_retries:
                    self.circuit.record_failure()
                    # str(None) is the truthy string "None", so test for None explicitly
                    self.metrics.record_request(str(code) if code is not None else "unknown")
                    raise
                
                delay = min(1.0 * (2 ** attempt), 60.0)
                jitter = delay * 0.25 * (2 * random.random() - 1)
                delay = max(0.1, delay + jitter)
                
                self.metrics.record_request(str(code), retry_delay=delay)
                await asyncio.sleep(delay)

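# Initialize for different providers. The names differ from the SDK clients
# defined earlier so that they do not shadow them.
openai_guard = ProductionAIClient(rate=167, burst=500)     # Tier 3: 10,000 RPM
anthropic_guard = ProductionAIClient(rate=67, burst=200)   # Sonnet: 4,000 RPM
deepseek_guard = ProductionAIClient(rate=33, burst=100)    # Standard: 2,000 RPM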

Conclusion

Rate limits and error handling are not edge cases—they are the difference between an AI prototype and a production system. Every AI API will rate limit you. Every provider will have outages. The systems that survive are the ones designed for failure from the start.

Start with exponential backoff and jitter on every API call. Add client-side rate limiting with token buckets. Layer on circuit breakers for provider outages. Implement fallback models for redundancy. And always, always track your metrics.

The patterns in this guide are widely used in production systems. Adapt the code to your needs, but do not skip the fundamentals. Your users will never notice good error handling, but they will absolutely notice the lack of it.

Last updated: 2026-05-10. Rate limits reflect current provider documentation as of May 2026. Always check the provider's official documentation for the most current limits.
