AI Batch Processing & Async API Guide 2026
Process millions of LLM requests at half the cost. OpenAI Batch API, Anthropic Message Batches, Google Gemini Batch, and production patterns for async AI workflows.
Not every LLM request needs an instant response. Classifying a dataset of 100,000 support tickets, embedding a document corpus, or running evaluation benchmarks — these tasks can wait hours. Batch APIs let you queue these requests and process them asynchronously, at 50% lower cost with much higher rate limits. If your application has any non-interactive AI workload, batch processing is the single easiest way to cut your API bill in half.
When to Use Batch Processing
Batch APIs are ideal when you don't need an immediate response:
| Use Case | Latency Tolerance | Volume | Savings vs Real-time |
|---|---|---|---|
| Dataset classification | Hours | 10K-1M items | 50% |
| Embedding generation | Hours | 100K-10M docs | 50% |
| Evaluation benchmarks | Hours | 1K-100K prompts | 50% |
| Content generation (drafts) | Hours | 1K-100K items | 50% |
| Translation | Hours | 10K-1M segments | 50% |
| Data extraction | Hours | 10K-1M records | 50% |
Rule of thumb: if the result doesn't need to appear on screen while the user waits, use batch. The 50% cost savings are too significant to ignore at scale.
OpenAI Batch API
OpenAI's Batch API processes requests asynchronously within 24 hours (often much faster) at 50% lower cost with much higher rate limits.
Step 1: Prepare Your Batch File
Create a JSONL file where each line is an individual API request:
// batch_input.jsonl
{"custom_id": "request-1", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "gpt-5", "messages": [{"role": "user", "content": "Classify: 'The battery died after 2 months'"}], "temperature": 0}}
{"custom_id": "request-2", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "gpt-5", "messages": [{"role": "user", "content": "Classify: 'Love the new design!'"}], "temperature": 0}}
{"custom_id": "request-3", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "gpt-5", "messages": [{"role": "user", "content": "Classify: 'How do I reset my password?'"}], "temperature": 0}}
Key rules:
- Each request needs a unique custom_id for result matching
- All requests in one file must use the same model and the same endpoint, which must match the endpoint given when the batch is created
- Maximum 50,000 requests per batch file
- Maximum file size: 200MB
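These rules can be checked locally before uploading. The following is a minimal sketch, assuming requests are held as Python dicts in the format shown above. `validate_batch`, `make_request`, and the limit constants are illustrative names, not part of the OpenAI SDK, and the 200MB limit is read as 200 MiB.

```python
import json

MAX_REQUESTS = 50_000          # per-batch request limit stated above
MAX_BYTES = 200 * 1024 * 1024  # 200MB limit stated above, read here as MiB (assumption)


def validate_batch(requests):
    """Return a list of rule violations for a list of batch request dicts."""
    problems = []
    if len(requests) > MAX_REQUESTS:
        problems.append(f"too many requests: {len(requests)}")

    ids = [r.get("custom_id") for r in requests]
    if None in ids or len(set(ids)) != len(ids):
        problems.append("missing or duplicate custom_id")

    models = {str(r.get("body", {}).get("model")) for r in requests}
    if len(models) > 1:
        problems.append(f"multiple models: {sorted(models)}")

    # Size of the JSONL file: one JSON document plus a newline per request
    size = sum(len(json.dumps(r).encode("utf-8")) + 1 for r in requests)
    if size > MAX_BYTES:
        problems.append(f"file too large: {size} bytes")
    return problems


def make_request(custom_id, text, model="gpt-5"):
    """Build one chat completion request in the batch input format."""
    return {
        "custom_id": custom_id,
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {"model": model, "messages": [{"role": "user", "content": text}]},
    }


good = [make_request("request-1", "Classify: 'Late delivery'"),
        make_request("request-2", "Classify: 'Great support'")]
print(validate_batch(good))              # []
print(validate_batch(good + [good[0]]))  # ['missing or duplicate custom_id']
```

Returning a list of problems rather than raising on the first one lets a pipeline log every violation in a single pass before deciding whether to split or fix the file.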
Step 2: Upload and Create the Batch
from openai import OpenAI

client = OpenAI()

# Upload the batch input file (the with block closes the file handle afterwards)
with open("batch_input.jsonl", "rb") as f:
    batch_input_file = client.files.create(file=f, purpose="batch")

# Create the batch job
batch = client.batches.create(
    input_file_id=batch_input_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
    metadata={
        "description": "Customer feedback classification"
    }
)

print(f"Batch ID: {batch.id}")
print(f"Status: {batch.status}")  # "validating" → "in_progress" → "finalizing" → "completed"
Step 3: Monitor and Retrieve Results
import json
import time

# Statuses after which the batch will not change any further
TERMINAL_STATUSES = {"completed", "failed", "expired", "cancelled"}

# Poll until the batch reaches a terminal status
while True:
    batch = client.batches.retrieve(batch.id)
    counts = batch.request_counts
    print(f"Status: {batch.status}, Completed: {counts.completed}/{counts.total}")
    if batch.status in TERMINAL_STATUSES:
        break
    time.sleep(60)  # Check every minute

if batch.status != "completed":
    raise RuntimeError(f"Batch ended with status {batch.status}: {batch.errors}")

# Download results
raw = client.files.content(batch.output_file_id).content

# Parse results, keyed by custom_id
results = {}
for line in raw.decode("utf-8").splitlines():
    if not line.strip():
        continue
    result = json.loads(line)
    body = result["response"]["body"]
    results[result["custom_id"]] = body["choices"][0]["message"]["content"]

# Match results to original requests
print(results["request-1"])  # e.g. "Product quality issue"
Supported Endpoints
OpenAI Batch API supports these endpoints in 2026:
- /v1/responses: Responses API
- /v1/chat/completions: Chat Completions
- /v1/embeddings: Embeddings
- /v1/completions: Legacy Completions
- /v1/moderations: Content Moderation
- /v1/images/generations: Image Generation
- /v1/images/edits: Image Editing
- /v1/videos: Video Generation
Cost Comparison
| Model | Real-time (input / output per 1M tokens) | Batch (input / output per 1M tokens) | Savings |
|---|---|---|---|
| GPT-5.5 | $5.00 / $30.00 | $2.50 / $15.00 | 50% |
| GPT-5.4 | $2.50 / $15.00 | $1.25 / $7.50 | 50% |
| GPT-5.4 mini | $0.75 / $4.50 | $0.375 / $2.25 | 50% |
| text-embedding-3-large | $0.13 | $0.065 | 50% |
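To see what the discount means for a concrete job, the arithmetic can be sketched as follows. The prices are the GPT-5.4 mini row from the table. The workload (100,000 tickets at 400 input and 20 output tokens each) is an assumption chosen for illustration, and `estimate_cost` is a hypothetical helper.

```python
def estimate_cost(n_requests, input_tokens, output_tokens,
                  input_price_per_m, output_price_per_m):
    """Total cost in dollars for n_requests with the given per-request token counts."""
    total_in = n_requests * input_tokens
    total_out = n_requests * output_tokens
    return (total_in * input_price_per_m + total_out * output_price_per_m) / 1_000_000


# 100,000 tickets, assumed 400 input and 20 output tokens each
realtime = estimate_cost(100_000, 400, 20, 0.75, 4.50)
batch = estimate_cost(100_000, 400, 20, 0.375, 2.25)
print(f"real-time: ${realtime:.2f}, batch: ${batch:.2f}, saved: ${realtime - batch:.2f}")
# real-time: $39.00, batch: $19.50, saved: $19.50
```

Because both input and output prices are halved, the saving is 50% regardless of the input-to-output ratio. The absolute amount scales with volume and with the price of the model.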
Anthropic Message Batches
Anthropic's Message Batches API follows a similar pattern, except that requests are passed inline in the create call rather than uploaded as a JSONL file:
import time

import anthropic

client = anthropic.Anthropic()

# Create a message batch
batch = client.messages.batches.create(
    requests=[
        {
            "custom_id": "req-1",
            "params": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 1024,
                "messages": [
                    {"role": "user", "content": "Summarize: Machine learning is..."}
                ]
            }
        },
        {
            "custom_id": "req-2",
            "params": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 1024,
                "messages": [
                    {"role": "user", "content": "Summarize: Cloud computing..."}
                ]
            }
        }
    ]
)
print(f"Batch ID: {batch.id}")

# Poll until processing has ended; results are only available after that
while True:
    batch = client.messages.batches.retrieve(batch.id)
    print(f"Status: {batch.processing_status}")
    if batch.processing_status == "ended":
        break
    time.sleep(60)

# Retrieve individual results
for result in client.messages.batches.results(batch.id):
    print(f"ID: {result.custom_id}")
    if result.result.type == "succeeded":
        print(result.result.message.content[0].text)
    elif result.result.type == "errored":
        print(f"Error: {result.result.error}")
    else:
        # "canceled" or "expired": the request was never processed
        print(f"Not processed: {result.result.type}")
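Printing each result is fine for a demo, but a pipeline usually wants successes and failures separated so the failures can be resubmitted. The following is a minimal sketch, assuming result objects shaped like the ones iterated above (`custom_id`, `result.type`, `result.message.content[0].text`). `split_results` is a hypothetical helper, and the `SimpleNamespace` objects stand in for real SDK results so the example runs offline.

```python
from types import SimpleNamespace


def split_results(results):
    """Separate batch results into succeeded text by custom_id and a list of failures."""
    succeeded, failed = {}, []
    for entry in results:
        if entry.result.type == "succeeded":
            succeeded[entry.custom_id] = entry.result.message.content[0].text
        else:
            # "errored", "canceled", and "expired" all need a resubmission
            failed.append((entry.custom_id, entry.result.type))
    return succeeded, failed


def fake_result(custom_id, kind, text=None):
    """Stand-in for an SDK result object, used only for this demonstration."""
    message = SimpleNamespace(content=[SimpleNamespace(text=text)]) if text else None
    return SimpleNamespace(custom_id=custom_id,
                           result=SimpleNamespace(type=kind, message=message))


ok, bad = split_results([fake_result("req-1", "succeeded", "A summary."),
                         fake_result("req-2", "expired")])
print(ok)   # {'req-1': 'A summary.'}
print(bad)  # [('req-2', 'expired')]
```

With the real client, the same function would be called as `split_results(client.messages.batches.results(batch.id))`.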
Google Gemini Batch API
Google offers the same submit-and-wait pattern through Vertex AI, reading requests from Cloud Storage and writing results back to it:
import time

import vertexai
from vertexai.batch_prediction import BatchPredictionJob

vertexai.init(project="your-project", location="us-central1")

# Submit a batch prediction job
job = BatchPredictionJob.submit(
    source_model="gemini-2.5-pro",
    input_dataset="gs://your-bucket/input.jsonl",
    output_uri_prefix="gs://your-bucket/output/",
)

# Monitor until the job reaches a terminal state
while not job.has_ended:
    time.sleep(60)
    job.refresh()
print(f"Job state: {job.state.name}")

# Results are written as JSONL under the output GCS path
print(f"Output location: {job.output_location}")
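The input file referenced above also has to be built. The following is a sketch under the assumption that Vertex AI expects one JSON object per line, with the Gemini request wrapped under a `request` key. Check the current Vertex AI documentation before relying on this shape. `to_gemini_batch_line` and `write_gemini_batch_file` are hypothetical helper names.

```python
import json


def to_gemini_batch_line(prompt):
    """Serialize one prompt as a JSONL line in the assumed Vertex AI batch input shape."""
    request = {"contents": [{"role": "user", "parts": [{"text": prompt}]}]}
    return json.dumps({"request": request})


def write_gemini_batch_file(prompts, path):
    """Write one request line per prompt and return the path that was written."""
    with open(path, "w", encoding="utf-8") as f:
        for prompt in prompts:
            f.write(to_gemini_batch_line(prompt) + "\n")
    return path


print(to_gemini_batch_line("Summarize: Cloud computing..."))
```

The resulting file would then be uploaded to the Cloud Storage location passed as the input dataset.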
Batch API Comparison
| Feature | OpenAI | Anthropic | Google |
|---|---|---|---|
| Cost discount | 50% | 50% | 50% |
| Turnaround time | <24h (often <1h) | <24h | Varies by volume |
| Max requests/batch | 50,000 | 100,000 | Varies |
| Rate limits | Much higher than real-time | Higher | Higher |
| Image/video support | Yes | No | Yes (Vertex) |
| Embedding support | Yes | N/A | Yes |
| Moderation support | Yes | No | No |
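Because each provider caps the number of requests per batch, a large workload has to be split before submission. The following is a minimal sketch. `split_into_batches` is a hypothetical helper, and the cap is passed in as a parameter rather than hard-coded per provider.

```python
def split_into_batches(items, max_requests):
    """Yield consecutive slices of items, each no longer than max_requests."""
    if max_requests <= 0:
        raise ValueError("max_requests must be positive")
    for start in range(0, len(items), max_requests):
        yield items[start:start + max_requests]


# 120,000 items under a 50,000-request cap
chunks = list(split_into_batches(list(range(120_000)), 50_000))
print([len(c) for c in chunks])  # [50000, 50000, 20000]
```

Each chunk then becomes its own batch job. Keeping `custom_id` values unique across chunks makes it easy to merge results afterwards.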
Production Patterns
1. Scheduled Daily Batch Pipeline
# Daily batch processing for content classification
import json
import time

import schedule
from openai import OpenAI


class BatchProcessor:
    def __init__(self, client, model="gpt-5.4-mini"):
        self.client = client
        self.model = model

    def prepare_batch(self, items):
        """Convert items to JSONL batch format."""
        requests = []
        for i, item in enumerate(items):
            requests.append({
                "custom_id": f"item-{i}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": self.model,
                    "messages": [
                        {"role": "system", "content": "Classify into: bug, feature, question."},
                        {"role": "user", "content": item["text"]}
                    ],
                    "temperature": 0
                }
            })
        return requests

    def submit_batch(self, items):
        """Submit a batch job and return batch ID."""
        requests = self.prepare_batch(items)

        # Write to temp file
        with open("temp_batch.jsonl", "w") as f:
            for req in requests:
                f.write(json.dumps(req) + "\n")

        # Upload and create batch
        with open("temp_batch.jsonl", "rb") as f:
            batch_file = self.client.files.create(file=f, purpose="batch")
        batch = self.client.batches.create(
            input_file_id=batch_file.id,
            endpoint="/v1/chat/completions",
            completion_window="24h"
        )
        return batch.id

    def get_results(self, batch_id):
        """Retrieve completed batch results, or None if the batch is not completed."""
        batch = self.client.batches.retrieve(batch_id)
        if batch.status != "completed":
            return None
        content = self.client.files.content(batch.output_file_id).content
        results = {}
        for line in content.decode("utf-8").splitlines():
            if not line.strip():
                continue
            result = json.loads(line)
            results[result["custom_id"]] = result["response"]["body"]["choices"][0]["message"]["content"]
        return results


def load_items():
    """Placeholder: return a list of dicts with a "text" key from your own data source."""
    raise NotImplementedError("replace with your data source")


processor = BatchProcessor(OpenAI())


def run_daily_batch():
    """Submit the day's items as one batch job."""
    batch_id = processor.submit_batch(load_items())
    print(f"Submitted batch {batch_id}")


# Schedule daily at 2 AM
schedule.every().day.at("02:00").do(run_daily_batch)

# The schedule library only runs jobs when run_pending() is called
while True:
    schedule.run_pending()
    time.sleep(60)
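A scheduled job submits a batch and exits long before the results exist, so the batch IDs have to be stored somewhere until a later run collects them. The following is a minimal sketch that keeps them in a local JSON file. `PendingBatches` and the file name are hypothetical, and `get_results` is assumed to return None while a batch is unfinished, as in the class above.

```python
import json
from pathlib import Path


class PendingBatches:
    """Persist submitted batch IDs in a JSON file until their results are collected."""

    def __init__(self, path):
        self.path = Path(path)

    def _load(self):
        if not self.path.exists():
            return []
        return json.loads(self.path.read_text())

    def add(self, batch_id):
        """Record a newly submitted batch ID (duplicates are ignored)."""
        ids = self._load()
        if batch_id not in ids:
            ids.append(batch_id)
        self.path.write_text(json.dumps(ids))

    def collect(self, get_results):
        """Call get_results(batch_id) for each pending ID; keep IDs that return None."""
        finished, still_pending = {}, []
        for batch_id in self._load():
            results = get_results(batch_id)
            if results is None:
                still_pending.append(batch_id)
            else:
                finished[batch_id] = results
        self.path.write_text(json.dumps(still_pending))
        return finished
```

A second scheduled job could call `store.collect(processor.get_results)` every hour. Note that a batch that failed or expired would stay pending forever under this sketch, so a production version should also check the batch status and drop or resubmit such IDs.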
2. Hybrid: Real-time with Batch Fallback
Use real-time calls when you need speed, and fall back to batch when you hit rate limits:
class HybridProcessor:
    def __init__(self, client, model, rate_limiter):
        self.client = client
        self.model = model
        self.rate_limiter = rate_limiter

    async def real_time_call(self, item):
        """Send one item through the real-time API (implementation not shown)."""
        raise NotImplementedError

    def submit_batch(self, items):
        """Submit items as a batch and return the batch ID (implementation not shown)."""
        raise NotImplementedError

    async def process(self, items):
        """Process items with real-time first, batch for overflow."""
        real_time_results = {}
        batch_items = []
        for item in items:
            if self.rate_limiter.allow():
                # Process in real-time
                real_time_results[item["id"]] = await self.real_time_call(item)
            else:
                # Queue for batch
                batch_items.append(item)

        # Submit batch for remaining items
        batch_id = None
        if batch_items:
            batch_id = self.submit_batch(batch_items)

        # Batch results become available later via get_results(batch_id)
        return real_time_results, batch_id
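The `rate_limiter` above only needs an `allow()` method that says whether another real-time call fits in the current budget. One common way to provide that is a token bucket, sketched here as an illustration. `TokenBucket` and its parameters are assumptions, not something the original pattern prescribes.

```python
import time


class TokenBucket:
    """Allow up to `capacity` calls in a burst, refilled at `rate_per_sec` tokens per second."""

    def __init__(self, rate_per_sec, capacity, clock=time.monotonic):
        self.rate = rate_per_sec
        self.capacity = capacity
        self.clock = clock          # injectable so the bucket can be tested without sleeping
        self.tokens = float(capacity)
        self.updated = clock()

    def allow(self):
        """Consume one token and return True, or return False if the bucket is empty."""
        now = self.clock()
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


# Demonstration with a fake clock instead of real time
fake_now = [0.0]
bucket = TokenBucket(rate_per_sec=1, capacity=2, clock=lambda: fake_now[0])
print([bucket.allow() for _ in range(3)])  # [True, True, False]
fake_now[0] = 1.0                          # one second later, one token is back
print(bucket.allow())                      # True
```

Setting the refill rate slightly below the provider's real-time limit leaves headroom for other traffic. Everything rejected by `allow()` goes to the batch queue.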
3. Batch with Error Handling and Retry
import json
import time


def process_batch_safely(client, input_file_path, max_retries=3):
    """Submit batch with error handling and retry logic."""
    for attempt in range(max_retries):
        try:
            # Upload input file
            with open(input_file_path, "rb") as f:
                batch_file = client.files.create(file=f, purpose="batch")

            # Create batch
            batch = client.batches.create(
                input_file_id=batch_file.id,
                endpoint="/v1/chat/completions",
                completion_window="24h"
            )

            # Wait for a terminal status
            while True:
                batch = client.batches.retrieve(batch.id)
                if batch.status == "completed":
                    # Check for individual failures
                    if batch.request_counts.failed > 0:
                        error_content = client.files.content(batch.error_file_id).content
                        errors = [json.loads(line)
                                  for line in error_content.decode("utf-8").splitlines()
                                  if line.strip()]
                        print(f"{len(errors)} requests failed in batch")
                    return batch
                elif batch.status in ("failed", "expired", "cancelled"):
                    # Terminal without results: resubmit on the next attempt
                    print(f"Batch {batch.status} on attempt {attempt + 1}")
                    break
                time.sleep(60)
        except Exception as e:
            print(f"Attempt {attempt + 1} error: {e}")
            time.sleep(10 * 2 ** attempt)  # Exponential backoff: 10s, 20s, 40s

    raise RuntimeError(f"Batch failed after {max_retries} attempts")
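When a batch completes with some failed requests, resubmitting the whole file wastes money. It is usually enough to resend only the requests that failed. The following is a minimal sketch, assuming each line of the error file carries the `custom_id` of the failed request, as the result lines do. `failed_ids` and `build_retry_file` are hypothetical helpers operating on JSONL text.

```python
import json


def failed_ids(error_jsonl):
    """Collect custom_id values from the lines of a batch error file."""
    ids = set()
    for line in error_jsonl.splitlines():
        if line.strip():
            ids.add(json.loads(line)["custom_id"])
    return ids


def build_retry_file(input_jsonl, error_jsonl):
    """Return JSONL containing only the original requests whose custom_id failed."""
    retry = failed_ids(error_jsonl)
    kept = [line for line in input_jsonl.splitlines()
            if line.strip() and json.loads(line)["custom_id"] in retry]
    return "\n".join(kept) + ("\n" if kept else "")


original = '{"custom_id": "a", "body": {}}\n{"custom_id": "b", "body": {}}\n'
errors = '{"custom_id": "b", "error": {"code": "example_error"}}\n'
print(build_retry_file(original, errors), end="")  # {"custom_id": "b", "body": {}}
```

The returned text can be written to a new file and submitted as a smaller follow-up batch.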
Common Mistakes
- Using real-time for batch workloads: If you're processing 10K+ items and don't need results instantly, you're overpaying by 2x
- Not matching custom_ids: Batch results may come back in a different order. Always use custom_id to match inputs to outputs
- Mixing models in one batch: Each batch file can only contain requests for one model
- Not handling partial failures: A batch can complete with some requests failing. Check the failed count in request_counts
- Exceeding file size limits: 200MB max per batch file. Split large workloads across multiple batches
- Polling too aggressively: Check status every 30-60 seconds, not every second. Batch jobs take at least minutes
- Not cleaning up files: Uploaded batch files and results persist. Delete them after processing to avoid storage charges
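The cleanup step in the last item is short enough to automate. The following is a sketch, assuming the OpenAI Python client's `files.delete` call and the `input_file_id`, `output_file_id`, and `error_file_id` fields on the batch object. `cleanup_batch_files` is a hypothetical helper.

```python
def cleanup_batch_files(client, batch):
    """Delete the input, output, and error files attached to a finished batch."""
    deleted = []
    for file_id in (batch.input_file_id, batch.output_file_id, batch.error_file_id):
        if file_id:  # output and error IDs are None when no such file was produced
            client.files.delete(file_id)
            deleted.append(file_id)
    return deleted
```

Call it only after the results have been downloaded and stored, for example `cleanup_batch_files(client, client.batches.retrieve(batch_id))`.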
Conclusion
Batch processing is the easiest 50% cost reduction available for any AI application with non-interactive workloads. The pattern is simple: prepare a JSONL file, submit, wait, and retrieve results. OpenAI, Anthropic, and Google all offer batch APIs with the same 50% discount. The main tradeoff is latency — you wait hours instead of seconds — but for classification, embedding, evaluation, and data extraction tasks, that's almost never a problem.
If your application processes more than 10,000 LLM requests per day in a non-interactive context, switching to batch halves that spend with minimal code changes. With larger models and longer prompts, the saving can reach thousands of dollars per month.