Tutorial May 13, 2026

AI Batch Processing & Async API Guide 2026

Process millions of LLM requests at half the cost. OpenAI Batch API, Anthropic Message Batches, Google Gemini Batch, and production patterns for async AI workflows.

Not every LLM request needs an instant response. Classifying a dataset of 100,000 support tickets, embedding a document corpus, or running evaluation benchmarks — these tasks can wait hours. Batch APIs let you queue these requests and process them asynchronously, at 50% lower cost with much higher rate limits. If your application has any non-interactive AI workload, batch processing is the single easiest way to cut your API bill in half.

When to Use Batch Processing

Batch APIs are ideal when you don't need an immediate response:

Use Case                      Latency Tolerance   Volume             Savings vs Real-time
Dataset classification        Hours               10K-1M items       50%
Embedding generation          Hours               100K-10M docs      50%
Evaluation benchmarks         Hours               1K-100K prompts    50%
Content generation (drafts)   Hours               1K-100K items      50%
Translation                   Hours               10K-1M segments    50%
Data extraction               Hours               10K-1M records     50%

Rule of thumb: if the result doesn't need to appear on screen while the user waits, use batch. The 50% cost savings are too significant to ignore at scale.

OpenAI Batch API

OpenAI's Batch API processes requests asynchronously within 24 hours (often much faster) at 50% lower cost with much higher rate limits.

Step 1: Prepare Your Batch File

Create a JSONL file where each line is an individual API request:

// batch_input.jsonl (file name only; JSONL has no comment syntax, so leave this line out of the file)
{"custom_id": "request-1", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "gpt-5", "messages": [{"role": "user", "content": "Classify: 'The battery died after 2 months'"}], "temperature": 0}}
{"custom_id": "request-2", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "gpt-5", "messages": [{"role": "user", "content": "Classify: 'Love the new design!'"}], "temperature": 0}}
{"custom_id": "request-3", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "gpt-5", "messages": [{"role": "user", "content": "Classify: 'How do I reset my password?'"}], "temperature": 0}}

Key rules:

  • Each request needs a unique custom_id for result matching
  • All requests in one file must use the same model and the same endpoint (the one you pass when creating the batch), so embedding requests go in a separate batch from chat requests
  • Maximum 50,000 requests per batch file
  • Maximum file size: 200MB
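
The rules above can be checked before anything is uploaded. The following is a minimal sketch, assuming the requests are already built as dictionaries in the format shown earlier. The helper name `split_into_batches` and its defaults are illustrative (the limits are taken from the list above) and are not part of the OpenAI SDK.

```python
import json

MAX_REQUESTS = 50_000            # per-file request limit from the rules above
MAX_BYTES = 200 * 1000 * 1000    # 200MB, read conservatively as decimal megabytes


def split_into_batches(requests, max_requests=MAX_REQUESTS, max_bytes=MAX_BYTES):
    """Group request dicts into JSONL chunks that respect the batch limits.

    Hypothetical helper: rejects duplicate custom_ids, keeps a single model
    per chunk, and returns a list of JSONL strings, one per batch file.
    """
    seen_ids = set()
    chunks, lines, size, model = [], [], 0, None

    for req in requests:
        custom_id = req["custom_id"]
        if custom_id in seen_ids:
            raise ValueError(f"duplicate custom_id: {custom_id}")
        seen_ids.add(custom_id)

        line = json.dumps(req) + "\n"
        line_bytes = len(line.encode("utf-8"))
        req_model = req["body"]["model"]

        # Start a new chunk when a limit would be exceeded or the model changes.
        full = len(lines) >= max_requests or size + line_bytes > max_bytes
        if lines and (full or req_model != model):
            chunks.append("".join(lines))
            lines, size = [], 0

        lines.append(line)
        size += line_bytes
        model = req_model

    if lines:
        chunks.append("".join(lines))
    return chunks
```

Each returned string can be written to its own `.jsonl` file and submitted as a separate batch.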

Step 2: Upload and Create the Batch

from openai import OpenAI

client = OpenAI()

# Upload the batch input file
batch_input_file = client.files.create(
    file=open("batch_input.jsonl", "rb"),
    purpose="batch"
)

# Create the batch job
batch = client.batches.create(
    input_file_id=batch_input_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
    metadata={
        "description": "Customer feedback classification"
    }
)

print(f"Batch ID: {batch.id}")
print(f"Status: {batch.status}")  # "validating" → "in_progress" → "finalizing" → "completed"

Step 3: Monitor and Retrieve Results

import json
import time

# Poll until the batch reaches a terminal state
while True:
    batch = client.batches.retrieve(batch.id)
    print(f"Status: {batch.status}, "
          f"Completed: {batch.request_counts.completed}/{batch.request_counts.total}")

    # "expired" and "cancelled" are terminal too; without them the loop never ends
    if batch.status in ("completed", "failed", "expired", "cancelled"):
        break

    time.sleep(60)  # Check every minute

if batch.status != "completed":
    raise RuntimeError(f"Batch ended with status {batch.status}: {batch.errors}")

# Download results
raw_output = client.files.content(batch.output_file_id).content

# Parse results, keyed by custom_id
results = {}
for line in raw_output.decode("utf-8").splitlines():
    if not line.strip():
        continue
    result = json.loads(line)
    response = result.get("response")
    if not response or response.get("status_code") != 200:
        continue  # Failed request; details are in result["error"]
    results[result["custom_id"]] = response["body"]["choices"][0]["message"]["content"]

# Match results to original requests
print(results["request-1"])  # e.g. "Product quality issue"

Supported Endpoints

The OpenAI Batch API supports these endpoints in 2026:

  • /v1/responses — Responses API
  • /v1/chat/completions — Chat Completions
  • /v1/embeddings — Embeddings
  • /v1/completions — Legacy Completions
  • /v1/moderations — Content Moderation
  • /v1/images/generations — Image Generation
  • /v1/images/edits — Image Editing
  • /v1/videos — Video Generation

Cost Comparison

Model                    Real-time (input / output per 1M tokens)   Batch (input / output per 1M tokens)   Savings
GPT-5.5                  $5.00 / $30.00                             $2.50 / $15.00                         50%
GPT-5.4                  $2.50 / $15.00                             $1.25 / $7.50                          50%
GPT-5.4 mini             $0.75 / $4.50                              $0.375 / $2.25                         50%
text-embedding-3-large   $0.13 (input only)                         $0.065 (input only)                    50%
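
To make the table concrete, here is a small worked estimate. It is a sketch that assumes the discount applies equally to input and output tokens, as the table shows. The function name and the workload figures (100,000 tickets at roughly 500 input and 50 output tokens each) are illustrative assumptions, not measurements.

```python
def estimate_cost(num_requests, input_tokens, output_tokens,
                  input_price, output_price, batch_discount=0.5):
    """Return (real_time_cost, batch_cost) in dollars.

    Prices are per 1M tokens; token counts are per-request averages.
    """
    total_in = num_requests * input_tokens / 1_000_000
    total_out = num_requests * output_tokens / 1_000_000
    real_time = total_in * input_price + total_out * output_price
    return real_time, real_time * (1 - batch_discount)


# Illustrative workload priced at the GPT-5.4 mini real-time rates listed above.
real_time, batch = estimate_cost(100_000, 500, 50, 0.75, 4.50)
print(f"real-time: ${real_time:.2f}, batch: ${batch:.2f}")
# prints: real-time: $60.00, batch: $30.00
```

The absolute saving scales linearly with volume, so the same workload run daily saves about $900 over a 30-day month under these assumptions.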

Anthropic Message Batches

Anthropic's Message Batches API follows a similar pattern:

import anthropic

client = anthropic.Anthropic()

# Create a message batch
batch = client.messages.batches.create(
    requests=[
        {
            "custom_id": "req-1",
            "params": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 1024,
                "messages": [
                    {"role": "user", "content": "Summarize: Machine learning is..."}
                ]
            }
        },
        {
            "custom_id": "req-2",
            "params": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 1024,
                "messages": [
                    {"role": "user", "content": "Summarize: Cloud computing..."}
                ]
            }
        }
    ]
)

print(f"Batch ID: {batch.id}")

# Check status (results are only available once it reaches "ended")
batch = client.messages.batches.retrieve(batch.id)
print(f"Status: {batch.processing_status}")

# Once processing_status is "ended", retrieve individual results
for result in client.messages.batches.results(batch.id):
    print(f"ID: {result.custom_id}")
    if result.result.type == "succeeded":
        print(result.result.message.content[0].text)
    elif result.result.type == "errored":
        print(f"Error: {result.result.error}")

Google Gemini Batch API

Google exposes Gemini batch processing through Vertex AI batch prediction jobs, which read requests from Cloud Storage and write results back to it:

import time

import vertexai
from vertexai.batch_prediction import BatchPredictionJob

vertexai.init(project="your-project", location="us-central1")

# Submit a batch prediction job
job = BatchPredictionJob.submit(
    source_model="gemini-2.5-pro",
    input_dataset="gs://your-bucket/input.jsonl",
    output_uri_prefix="gs://your-bucket/output/",
)

# Monitor until the job reaches a terminal state
while not job.has_ended:
    time.sleep(30)
    job.refresh()
print(f"Job state: {job.state.name}")

# Results are written to the output GCS path
print(f"Output location: {job.output_location}")
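
The input file referenced above is a JSONL file in Cloud Storage. The sketch below shows one plausible way to generate its lines, assuming each line wraps a standard Gemini request body under a "request" key. Treat that schema as an assumption and confirm it against the current Vertex AI documentation. The helper name `to_gemini_batch_line` is hypothetical.

```python
import json


def to_gemini_batch_line(prompt):
    """Build one JSONL line for a Gemini batch input file (assumed schema)."""
    request = {
        "request": {
            "contents": [
                {"role": "user", "parts": [{"text": prompt}]}
            ]
        }
    }
    return json.dumps(request)


prompts = ["Summarize: Machine learning is...", "Summarize: Cloud computing..."]
jsonl = "\n".join(to_gemini_batch_line(p) for p in prompts) + "\n"
print(jsonl)
```

The resulting text would then be saved as `input.jsonl` and uploaded to the bucket before submitting the job.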

Batch API Comparison

Feature               OpenAI                       Anthropic   Google
Cost discount         50%                          50%         50%
Turnaround time       <24h (often <1h)             <24h        Varies by volume
Max requests/batch    50,000                       100,000     Varies
Rate limits           Much higher than real-time   Higher      Higher
Image/video support   Yes                          No          Yes (Vertex)
Embedding support     Yes                          N/A         Yes
Moderation support    Yes                          No          No

Production Patterns

1. Scheduled Daily Batch Pipeline

# Daily batch processing for content classification
import json
import time

import schedule  # third-party package: pip install schedule
from openai import OpenAI

class BatchProcessor:
    def __init__(self, client, model="gpt-5.4-mini"):
        self.client = client
        self.model = model
    
    def prepare_batch(self, items):
        """Convert items to JSONL batch format."""
        requests = []
        for i, item in enumerate(items):
            requests.append({
                "custom_id": f"item-{i}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": self.model,
                    "messages": [
                        {"role": "system", "content": "Classify into: bug, feature, question."},
                        {"role": "user", "content": item["text"]}
                    ],
                    "temperature": 0
                }
            })
        return requests
    
    def submit_batch(self, items):
        """Submit a batch job and return batch ID."""
        requests = self.prepare_batch(items)
        
        # Write to temp file
        with open("temp_batch.jsonl", "w") as f:
            for req in requests:
                f.write(json.dumps(req) + "\n")
        
        # Upload and create batch (the with-block closes the file handle)
        with open("temp_batch.jsonl", "rb") as f:
            batch_file = self.client.files.create(file=f, purpose="batch")
        
        batch = self.client.batches.create(
            input_file_id=batch_file.id,
            endpoint="/v1/chat/completions",
            completion_window="24h"
        )
        
        return batch.id
    
    def get_results(self, batch_id):
        """Retrieve completed batch results."""
        batch = self.client.batches.retrieve(batch_id)
        
        if batch.status != "completed":
            return None
        
        content = self.client.files.content(batch.output_file_id).content
        results = {}
        for line in content.decode('utf-8').split('\n'):
            if not line.strip():
                continue
            result = json.loads(line)
            results[result['custom_id']] = result['response']['body']['choices'][0]['message']['content']
        
        return results

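def load_pending_items():
    """Hypothetical placeholder: return today's items as [{"text": ...}, ...]."""
    return []

def run_daily_batch():
    items = load_pending_items()
    if items:
        batch_id = processor.submit_batch(items)
        print(f"Submitted batch {batch_id}")  # Persist this ID to fetch results later

# Schedule daily at 2 AM
processor = BatchProcessor(OpenAI())
schedule.every().day.at("02:00").do(run_daily_batch)

# schedule only runs jobs when run_pending() is called
while True:
    schedule.run_pending()
    time.sleep(60)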

2. Hybrid: Real-time with Batch Fallback

Use real-time calls when you need speed, and fall back to batch when you hit rate limits:

class HybridProcessor:
    def __init__(self, client, model, rate_limiter):
        self.client = client
        self.model = model
        self.rate_limiter = rate_limiter
        self.batch_queue = []
    
    async def process(self, items):
        """Process items with real-time first, batch for overflow.

        Assumes real_time_call() and submit_batch() are implemented elsewhere
        on this class (submit_batch as in BatchProcessor above).
        """
        real_time_results = {}
        batch_items = []

        for item in items:
            if self.rate_limiter.allow():
                # Process in real-time
                response = await self.real_time_call(item)
                real_time_results[item["id"]] = response
            else:
                # Queue for batch
                batch_items.append(item)

        # Submit batch for remaining items
        batch_id = None
        if batch_items:
            batch_id = self.submit_batch(batch_items)

        # Return the batch ID so the caller can fetch overflow results later
        return real_time_results, batch_id
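
The processor above expects a `rate_limiter` object with an `allow()` method but does not show one. A token bucket is one common way to implement it. The sketch below is an illustration rather than the author's implementation: the class name, parameters, and injectable clock are all assumptions.

```python
import time


class TokenBucket:
    """Allow up to `rate` calls per second, with bursts up to `capacity`."""

    def __init__(self, rate, capacity, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock              # injectable so tests can control time
        self.tokens = float(capacity)   # start with a full bucket
        self.last = clock()

    def allow(self):
        """Return True and consume a token if one is available."""
        now = self.clock()
        # Refill in proportion to elapsed time, never above capacity.
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

An instance such as `TokenBucket(rate=50, capacity=100)` could be passed as the `rate_limiter` argument. Requests that arrive while the bucket is empty are the ones routed to the batch queue.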

3. Batch with Error Handling and Retry

import json
import time

def process_batch_safely(client, input_file_path, max_retries=3):
    """Submit batch with error handling and retry logic."""
    
    for attempt in range(max_retries):
        try:
            # Upload input file (the with-block closes the file handle)
            with open(input_file_path, "rb") as f:
                batch_file = client.files.create(file=f, purpose="batch")
            
            # Create batch
            batch = client.batches.create(
                input_file_id=batch_file.id,
                endpoint="/v1/chat/completions",
                completion_window="24h"
            )
            
            # Wait for completion
            while True:
                batch = client.batches.retrieve(batch.id)
                
                if batch.status == "completed":
                    # Check for individual failures (the error file is JSONL)
                    if batch.request_counts.failed > 0:
                        error_content = client.files.content(batch.error_file_id).content
                        errors = [json.loads(line)
                                  for line in error_content.decode("utf-8").splitlines()
                                  if line.strip()]
                        print(f"{len(errors)} requests failed in batch")
                    
                    return batch
                
                # "cancelled" is terminal too; without it the loop would never exit
                elif batch.status in ("failed", "expired", "cancelled"):
                    print(f"Batch {batch.status} on attempt {attempt + 1}")
                    break
                
                time.sleep(60)
            
        except Exception as e:
            print(f"Attempt {attempt + 1} error: {e}")
            time.sleep(10 * 2 ** attempt)  # Exponential backoff: 10s, 20s, 40s
    
    raise RuntimeError(f"Batch failed after {max_retries} attempts")
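
Resubmitting the whole input file after a partial failure pays again for requests that already succeeded. A cheaper option is to rebuild a smaller input that contains only the requests without a successful result. The sketch below assumes output lines carry `custom_id` and a `response` object with a `status_code`, as in the results parsed earlier. `requests_to_retry` is a hypothetical helper name.

```python
import json


def requests_to_retry(input_jsonl, output_jsonl):
    """Return the input lines whose custom_id has no successful result.

    A request counts as successful when its output line has a response with
    status_code 200. Anything else (an error entry, or no output line at
    all) is kept for the retry file.
    """
    succeeded = set()
    for line in output_jsonl.splitlines():
        if not line.strip():
            continue
        result = json.loads(line)
        response = result.get("response") or {}
        if response.get("status_code") == 200:
            succeeded.add(result["custom_id"])

    retry = []
    for line in input_jsonl.splitlines():
        if line.strip() and json.loads(line)["custom_id"] not in succeeded:
            retry.append(line)
    return "\n".join(retry) + ("\n" if retry else "")
```

The returned text can be written to a new JSONL file and submitted as a fresh batch. An empty string means every request succeeded.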

Common Mistakes

  1. Using real-time for batch workloads: If you're processing 10K+ items and don't need results instantly, you're overpaying by 2x
  2. Not matching custom_ids: Batch results may come back in a different order than the input. Always use custom_id to match inputs to outputs
  3. Mixing models in one batch: Each OpenAI batch file can only contain requests for one model and one endpoint
  4. Not handling partial failures: A batch can complete with some requests failing. Check the failed count in request_counts and read the error file
  5. Exceeding file size limits: 200MB and 50,000 requests max per batch file. Split large workloads across multiple batches
  6. Polling too aggressively: Check status every 30-60 seconds, not every second. Batch jobs take at least minutes
  7. Not cleaning up files: Uploaded batch files and results persist. Delete them after processing so they don't count against storage limits or incur storage you no longer need
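
For mistake 6, one way to keep polling polite is a capped exponential backoff. The sketch below is generic: `poll_until_done` is a hypothetical helper that takes any zero-argument function returning a status string, and the default intervals are illustrative. The terminal statuses listed are the OpenAI batch states used earlier in this guide.

```python
import time

TERMINAL = {"completed", "failed", "expired", "cancelled"}


def poll_until_done(get_status, initial=30, factor=2, cap=600, sleep=time.sleep):
    """Call get_status() until it returns a terminal status.

    Waits `initial` seconds after the first non-terminal check, then
    multiplies the wait by `factor` each time, never exceeding `cap` seconds.
    """
    delay = initial
    while True:
        status = get_status()
        if status in TERMINAL:
            return status
        sleep(delay)
        delay = min(delay * factor, cap)
```

With the OpenAI client from the earlier steps, usage would look like `poll_until_done(lambda: client.batches.retrieve(batch.id).status)`.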

Conclusion

Batch processing is the easiest 50% cost reduction available for any AI application with non-interactive workloads. The pattern is simple: prepare a JSONL file, submit, wait, and retrieve results. OpenAI, Anthropic, and Google all offer batch APIs with the same 50% discount. The main tradeoff is latency — you wait hours instead of seconds — but for classification, embedding, evaluation, and data extraction tasks, that's almost never a problem.

If your application processes more than 10,000 LLM requests per day in a non-interactive context, switching to batch can cut the cost of those requests roughly in half with minimal code changes. The absolute saving depends on your model and token volume.