Building a demo agent is easy. Building an agent system that handles millions of requests reliably, fails gracefully, costs predictably, and can be debugged when things go wrong โ that's an engineering challenge. This guide covers what separates production systems from demos.
The 5 Pillars of Production Agent Systems
Pillar 1: Reliability & Error Handling
Agents fail in unexpected ways โ tools return errors, the LLM loops infinitely, external APIs go down. Production systems need robust error handling at every level.
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class ProductionAgent:
def __init__(self, llm, tools, max_iterations=20):
self.llm = llm
self.tools = tools
self.max_iterations = max_iterations
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=30),
reraise=True
)
async def call_llm_with_retry(self, messages: list) -> str:
"""LLM calls with exponential backoff retry"""
try:
response = await self.llm.ainvoke(messages)
return response
except Exception as e:
logger.error(f"LLM call failed: {e}")
raise
async def execute_tool_safely(self, tool_name: str, args: dict) -> str:
"""Execute tool with timeout and error handling"""
if tool_name not in self.tools:
return f"Error: Unknown tool '{tool_name}'"
try:
# Timeout after 30 seconds per tool call
result = await asyncio.wait_for(
self.tools[tool_name](**args),
timeout=30.0
)
return str(result)
except asyncio.TimeoutError:
logger.warning(f"Tool {tool_name} timed out")
return f"Error: Tool '{tool_name}' timed out after 30s"
except Exception as e:
logger.error(f"Tool {tool_name} failed: {e}")
return f"Error: Tool '{tool_name}' failed: {str(e)[:200]}"
async def run(self, goal: str, run_id: str) -> dict:
"""Main agent loop with iteration guard and structured output"""
messages = [{"role": "user", "content": goal}]
tool_calls_made = []
for iteration in range(self.max_iterations):
logger.info(f"[{run_id}] Iteration {iteration+1}/{self.max_iterations}")
response = await self.call_llm_with_retry(messages)
if response.stop_reason == "end_turn":
return {
"status": "success",
"result": response.content[-1].text,
"iterations": iteration + 1,
"tool_calls": tool_calls_made
}
# Handle tool calls
for block in response.content:
if block.type == "tool_use":
result = await self.execute_tool_safely(block.name, block.input)
tool_calls_made.append({
"tool": block.name,
"args": block.input,
"result_length": len(result)
})
return {
"status": "max_iterations_reached",
"result": None,
"iterations": self.max_iterations,
"tool_calls": tool_calls_made
}
Pillar 2: Observability & Tracing
When an agent produces a wrong answer or takes 47 LLM calls instead of 5, you need full observability to debug it. Use Langfuse (open-source) or LangSmith.
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
from functools import wraps
import time
langfuse = Langfuse(
public_key="pk-...",
secret_key="sk-...",
host="https://cloud.langfuse.com"
)
@observe() # Automatically traces this function
async def run_agent_with_tracing(goal: str, user_id: str):
# Add custom metadata to the trace
langfuse_context.update_current_trace(
name="customer-support-agent",
user_id=user_id,
metadata={"goal_length": len(goal)}
)
agent = ProductionAgent(llm, tools)
result = await agent.run(goal, run_id=langfuse_context.get_current_trace_id())
# Log evaluation score
langfuse_context.score_current_trace(
name="task_completed",
value=1 if result["status"] == "success" else 0
)
return result
# Track costs
def track_token_usage(model: str, input_tokens: int, output_tokens: int):
"""Log costs per run for budget monitoring"""
costs = {
"claude-opus-4-6": (0.015, 0.075), # per 1K input/output tokens
"claude-sonnet-4-6": (0.003, 0.015),
"claude-haiku-4-5-20251001": (0.00025, 0.00125)
}
in_cost, out_cost = costs.get(model, (0, 0))
total = (input_tokens / 1000 * in_cost) + (output_tokens / 1000 * out_cost)
langfuse.event(name="token_cost", metadata={"cost_usd": total, "model": model})
Pillar 3: Caching & Cost Optimization
import hashlib
import json
from redis import Redis
from functools import wraps
redis = Redis(host="localhost", port=6379, decode_responses=True)
def cache_tool_result(ttl_seconds: int = 3600):
"""Cache tool results in Redis to avoid redundant calls"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# Create cache key from function name + args
cache_key = f"tool:{func.__name__}:{hashlib.md5(
json.dumps({**kwargs}, sort_keys=True).encode()
).hexdigest()}"
# Try cache first
cached = redis.get(cache_key)
if cached:
return json.loads(cached)
# Execute and cache
result = await func(*args, **kwargs)
redis.setex(cache_key, ttl_seconds, json.dumps(result))
return result
return wrapper
return decorator
@cache_tool_result(ttl_seconds=1800) # Cache for 30 minutes
async def web_search(query: str) -> str:
"""Web search - results cached to avoid duplicate API calls"""
# ... actual search implementation
# Use Claude's prompt caching for repeated system prompts
def create_cached_system_message(long_system_prompt: str) -> list:
"""Mark system prompt as cacheable (saves 90% on repeated calls)"""
return [
{
"type": "text",
"text": long_system_prompt,
"cache_control": {"type": "ephemeral"} # Anthropic cache
}
]
Pillar 4: Human-in-the-Loop & Approval Gates
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.postgres import PostgresSaver
# Use PostgreSQL for persistent checkpoints (survives restarts)
checkpointer = PostgresSaver.from_conn_string("postgresql://...")
# Interrupt the graph before sensitive actions
workflow.add_node("approval_gate", approval_gate_node)
workflow.compile(
checkpointer=checkpointer,
interrupt_before=["execute_payment", "send_email", "delete_file"]
)
async def run_with_approval(task: str, thread_id: str):
config = {"configurable": {"thread_id": thread_id}}
# Run until interruption
result = await app.ainvoke({"task": task}, config=config)
# Check if waiting for approval
state = await app.aget_state(config)
if state.next: # Graph is interrupted
pending_action = state.values.get("pending_action")
print(f"โธ Approval needed: {pending_action}")
# In production: send Slack notification, save to DB, etc.
approval = await notify_human_and_wait(pending_action)
if approval:
# Resume the graph
await app.ainvoke(None, config=config)
else:
print("โ Action rejected by user")
Pillar 5: Deployment Architecture
# docker-compose.yml for agent system
version: '3.8'
services:
agent-api:
build: .
ports: ["8000:8000"]
environment:
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- REDIS_URL=redis://redis:6379
- DATABASE_URL=postgresql://postgres:password@db:5432/agents
depends_on: [redis, db]
deploy:
replicas: 3 # Horizontal scaling
# Background task queue for long-running agents
celery-worker:
build: .
command: celery -A app.tasks worker --concurrency=10
environment:
- CELERY_BROKER_URL=redis://redis:6379
redis:
image: redis:7-alpine
volumes: ["redis_data:/data"]
db:
image: postgres:16
environment:
POSTGRES_DB: agents
POSTGRES_PASSWORD: password
# FastAPI server with async agent execution
from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.post("/agent/run")
async def run_agent(request: AgentRequest, background_tasks: BackgroundTasks):
"""Non-blocking agent execution with callback URL"""
run_id = str(uuid.uuid4())
background_tasks.add_task(
execute_agent_and_callback,
goal=request.goal,
run_id=run_id,
callback_url=request.callback_url
)
return {"run_id": run_id, "status": "queued"}
@app.get("/agent/stream/{run_id}")
async def stream_agent_output(run_id: str):
"""Server-sent events for real-time agent output"""
async def generate():
async for token in get_agent_stream(run_id):
yield f"data: {json.dumps({'token': token})}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
Production Checklist
- โ Retry logic with exponential backoff on all LLM and tool calls
- โ Hard iteration cap (never infinite loops)
- โ Tool execution timeouts (30s per tool, 5min per agent run)
- โ Full distributed tracing (Langfuse/LangSmith)
- โ Cost tracking per run, per user, per day
- โ Redis caching for tool results and prompt caching for system prompts
- โ Human approval gates for irreversible actions
- โ Async execution with task queue (don't block HTTP threads)
- โ Horizontal scaling with stateless agent workers
- โ Input validation and output sanitization
- โ Rate limiting per user/organization
- โ Audit log of all agent actions
The production mindset shift: Your demo agent needs to work once. Your production agent needs to work 99.9% of the time, handle edge cases gracefully, stay within budget, and give you enough visibility to debug the 0.1% of failures โ all while handling concurrent users.