🔴 Advanced

Production Agentic Systems:
Architecture, Reliability & Observability

🤖 AI Agents · ⏱ 20 min read · 🗓 May 2026

Building a demo agent is easy. Building an agent system that handles millions of requests reliably, fails gracefully, costs predictably, and can be debugged when things go wrong — that's an engineering challenge. This guide covers what separates production systems from demos.

The 5 Pillars of Production Agent Systems

Pillar 1: Reliability & Error Handling

Agents fail in unexpected ways — tools return errors, the LLM loops indefinitely, external APIs go down. Production systems need robust error handling at every level.

import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential
from typing import Optional
import logging

# Module-level logger used by ProductionAgent below; records are namespaced by __name__.
logger = logging.getLogger(__name__)

class ProductionAgent:
    """Tool-using agent loop with LLM retries, per-tool timeouts and an iteration cap.

    NOTE(review): ``run`` reads Anthropic-Messages-style responses
    (``stop_reason`` plus content blocks exposing ``type``/``name``/``input``/
    ``id``/``text``) even though the call goes through ``llm.ainvoke``;
    confirm the configured client returns that shape.
    """

    def __init__(self, llm, tools, max_iterations=20, tool_timeout=30.0):
        """
        Args:
            llm: Client exposing an async ``ainvoke(messages)`` method.
            tools: Mapping of tool name -> callable whose call returns an
                awaitable (it is passed to ``asyncio.wait_for``).
            max_iterations: Maximum number of LLM round-trips per ``run``.
            tool_timeout: Seconds allowed for a single tool call.
        """
        self.llm = llm
        self.tools = tools
        self.max_iterations = max_iterations
        self.tool_timeout = tool_timeout

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=30),
        reraise=True,
    )
    async def call_llm_with_retry(self, messages: list):
        """Call the LLM with up to 3 attempts and exponential backoff (2s..30s).

        Returns the raw response object from ``llm.ainvoke`` (not a ``str``).
        Each failure is logged with its traceback and re-raised so tenacity can
        retry; after the last attempt the original exception propagates
        (``reraise=True``).

        NOTE(review): every exception type is retried, including ones that can
        never succeed (e.g. invalid request / auth errors). Consider a
        ``retry=retry_if_exception_type(...)`` filter.
        """
        try:
            return await self.llm.ainvoke(messages)
        except Exception as e:
            logger.exception("LLM call failed: %s", e)
            raise

    async def execute_tool_safely(self, tool_name: str, args: dict) -> str:
        """Run one tool with a timeout, converting every failure into an error string.

        Errors are returned (not raised) so they can be fed back to the model
        as a tool result instead of aborting the whole run.

        Args:
            tool_name: Key into ``self.tools``.
            args: Keyword arguments for the tool.

        Returns:
            ``str(result)`` on success, otherwise a message starting with "Error:".
        """
        if tool_name not in self.tools:
            return f"Error: Unknown tool '{tool_name}'"

        try:
            result = await asyncio.wait_for(
                self.tools[tool_name](**args),
                timeout=self.tool_timeout,
            )
        except asyncio.TimeoutError:
            logger.warning("Tool %s timed out", tool_name)
            return f"Error: Tool '{tool_name}' timed out after {self.tool_timeout:g}s"
        except Exception as e:
            logger.exception("Tool %s failed", tool_name)
            # Truncate so a huge exception message cannot flood the context window.
            return f"Error: Tool '{tool_name}' failed: {str(e)[:200]}"
        return str(result)

    async def run(self, goal: str, run_id: str) -> dict:
        """Drive the LLM/tool loop until the model finishes or the iteration cap is hit.

        Each iteration sends the whole conversation to the LLM.
        - If the model ends its turn, its final text is returned.
        - Otherwise, the assistant turn is appended to the history and every
          ``tool_use`` block is executed.
        - The tool outputs go back to the model as ``tool_result`` blocks in the
          next user turn.

        Args:
            goal: Initial user request.
            run_id: Identifier used to prefix log lines.

        Returns:
            dict with these keys:
            - ``status``: "success"; "incomplete" when the model stopped for
              another reason without requesting a tool; or
              "max_iterations_reached".
            - ``result``: the final text, or None.
            - ``iterations``: number of iterations used.
            - ``tool_calls``: the tool name, args and result length for each call.
            "incomplete" results also carry the model's ``stop_reason``.
        """
        messages = [{"role": "user", "content": goal}]
        tool_calls_made = []

        for iteration in range(self.max_iterations):
            logger.info("[%s] Iteration %d/%d", run_id, iteration + 1, self.max_iterations)

            response = await self.call_llm_with_retry(messages)

            if response.stop_reason == "end_turn":
                return {
                    "status": "success",
                    "result": response.content[-1].text,
                    "iterations": iteration + 1,
                    "tool_calls": tool_calls_made,
                }

            # The assistant turn (with its tool_use blocks) must be in the history;
            # otherwise the next call resends the same messages and the model never
            # sees its own tool requests or their results.
            messages.append({"role": "assistant", "content": response.content})

            tool_results = []
            for block in response.content:
                if block.type != "tool_use":
                    continue
                result = await self.execute_tool_safely(block.name, block.input)
                tool_calls_made.append({
                    "tool": block.name,
                    "args": block.input,
                    "result_length": len(result),
                })
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": result,
                })

            if not tool_results:
                # Stopped for another reason (e.g. max_tokens) without requesting
                # a tool: there is nothing to send back, so looping would only
                # burn iterations.
                return {
                    "status": "incomplete",
                    "result": None,
                    "iterations": iteration + 1,
                    "tool_calls": tool_calls_made,
                    "stop_reason": response.stop_reason,
                }

            messages.append({"role": "user", "content": tool_results})

        return {
            "status": "max_iterations_reached",
            "result": None,
            "iterations": self.max_iterations,
            "tool_calls": tool_calls_made,
        }

Pillar 2: Observability & Tracing

When an agent produces a wrong answer or takes 47 LLM calls instead of 5, you need full observability to debug it. Use Langfuse (open-source) or LangSmith.

from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
from functools import wraps
import time

import os

# Credentials are read from the environment so no keys live in source control.
# A missing key raises KeyError at startup instead of failing on the first trace.
langfuse = Langfuse(
    public_key=os.environ["LANGFUSE_PUBLIC_KEY"],
    secret_key=os.environ["LANGFUSE_SECRET_KEY"],
    host=os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com"),
)

@observe()  # Langfuse wraps this coroutine in a trace automatically
async def run_agent_with_tracing(goal: str, user_id: str):
    """Run a ProductionAgent inside the current Langfuse trace and score the outcome.

    - The trace is named "customer-support-agent", attributed to ``user_id``,
      and tagged with the goal's length.
    - The trace id doubles as the agent's ``run_id``, so log lines can be
      matched to the trace.
    - A "task_completed" score of 1 (status "success") or 0 is attached
      before the agent's result dict is returned.

    Relies on module-level ``llm`` and ``tools`` objects not shown here.
    """
    trace_metadata = {"goal_length": len(goal)}
    langfuse_context.update_current_trace(
        name="customer-support-agent",
        user_id=user_id,
        metadata=trace_metadata,
    )

    agent = ProductionAgent(llm, tools)
    outcome = await agent.run(goal, run_id=langfuse_context.get_current_trace_id())

    completed = outcome["status"] == "success"
    langfuse_context.score_current_trace(name="task_completed", value=int(completed))

    return outcome

# Track costs
# USD per 1K tokens as (input, output).
# NOTE(review): list prices change; re-check against the provider's pricing page.
MODEL_PRICING_PER_1K = {
    "claude-opus-4-6": (0.005, 0.025),
    "claude-sonnet-4-6": (0.003, 0.015),
    "claude-haiku-4-5-20251001": (0.001, 0.005),
}


def estimate_token_cost(model: str, input_tokens: int, output_tokens: int, pricing=None) -> float:
    """Return the estimated USD cost of one call.

    Args:
        model: Model id looked up in ``pricing``.
        input_tokens: Prompt tokens billed.
        output_tokens: Completion tokens billed.
        pricing: Optional {model: (input_per_1k, output_per_1k)} table;
            defaults to ``MODEL_PRICING_PER_1K``.

    Returns:
        Cost in USD. For a model missing from the table, a warning is logged
        and 0.0 is returned, so the gap is visible instead of silent.
    """
    table = MODEL_PRICING_PER_1K if pricing is None else pricing
    rates = table.get(model)
    if rates is None:
        logging.getLogger(__name__).warning("No pricing configured for model %r; cost recorded as 0", model)
        return 0.0
    in_rate, out_rate = rates
    return input_tokens / 1000 * in_rate + output_tokens / 1000 * out_rate


def track_token_usage(model: str, input_tokens: int, output_tokens: int) -> float:
    """Log the cost of one run as a Langfuse event for budget monitoring.

    Returns:
        The estimated cost in USD (also sent as ``cost_usd`` in the event metadata).
    """
    total = estimate_token_cost(model, input_tokens, output_tokens)
    langfuse.event(
        name="token_cost",
        metadata={
            "cost_usd": total,
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
        },
    )
    return total

Pillar 3: Caching & Cost Optimization

import hashlib
import json
from redis import Redis
from functools import wraps

import os

# Connection URL comes from REDIS_URL (the deployment sets redis://redis:6379),
# with a localhost default for development. decode_responses=True makes get() return str.
redis = Redis.from_url(
    os.environ.get("REDIS_URL", "redis://localhost:6379"),
    decode_responses=True,
)

def tool_cache_key(func_name: str, args: tuple, kwargs: dict) -> str:
    """Build the Redis key for one call of a cached tool.

    Both positional and keyword arguments are hashed, so ``f("a")`` and
    ``f("b")`` get different keys. Keyword order does not matter
    (``sort_keys=True``). ``f("a")`` and ``f(query="a")`` hash differently,
    which only costs a cache miss. Arguments must be JSON-serializable;
    otherwise ``json.dumps`` raises ``TypeError``.
    """
    payload = json.dumps({"args": list(args), "kwargs": kwargs}, sort_keys=True)
    digest = hashlib.sha256(payload.encode()).hexdigest()
    return f"tool:{func_name}:{digest}"


def cache_tool_result(ttl_seconds: int = 3600):
    """Decorator: cache an async tool's JSON-serializable result in Redis.

    Results are stored under ``tool_cache_key(...)`` for ``ttl_seconds``.
    Caching is best-effort:
    - Redis errors are logged and the tool simply runs uncached.
    - Results that cannot be JSON-encoded are returned without being cached.
    - Exceptions raised by the tool propagate and are never cached.

    NOTE(review): the client used here is the synchronous redis-py ``Redis``,
    so each get/setex blocks the event loop; ``redis.asyncio`` avoids that.
    """
    import logging

    from redis.exceptions import RedisError  # module name `redis` is shadowed by the client

    log = logging.getLogger(__name__)

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            cache_key = tool_cache_key(func.__name__, args, kwargs)

            # Try cache first
            try:
                cached = redis.get(cache_key)
            except RedisError:
                log.warning("Cache read failed for %s; running uncached", cache_key, exc_info=True)
                cached = None
            if cached is not None:
                return json.loads(cached)

            # Execute and cache
            result = await func(*args, **kwargs)
            try:
                redis.setex(cache_key, ttl_seconds, json.dumps(result))
            except (RedisError, TypeError, ValueError):
                log.warning("Could not cache result for %s", cache_key, exc_info=True)
            return result
        return wrapper
    return decorator

@cache_tool_result(ttl_seconds=1800)  # Cache for 30 minutes
async def web_search(query: str) -> str:
    """Web search whose results are cached in Redis for 30 minutes.

    Placeholder: raises NotImplementedError until a real search backend is
    wired in. Raising, rather than falling through and returning None, keeps
    the cache decorator from storing a bogus ``null`` result for the full TTL.
    """
    # ... actual search implementation
    raise NotImplementedError("web_search: plug in a search API client here")

# Use Claude's prompt caching for repeated system prompts
def create_cached_system_message(long_system_prompt: str) -> list:
    """Wrap a system prompt in a single text block flagged for Anthropic prompt caching.

    The ``cache_control: {"type": "ephemeral"}`` marker asks the API to cache
    the prompt prefix. Later requests that reuse it pay a discounted rate for
    the cached input tokens, while the first (cache-writing) request costs
    somewhat more. The saving applies to the cached input tokens, not to the
    whole call.
    NOTE(review): prompts shorter than the model's minimum cacheable length
    are not cached; check the prompt-caching docs for the current limits.
    """
    text_block = {"type": "text", "text": long_system_prompt}
    text_block["cache_control"] = {"type": "ephemeral"}  # Anthropic cache
    return [text_block]

Pillar 4: Human-in-the-Loop & Approval Gates

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.postgres import PostgresSaver

# Use PostgreSQL for persistent checkpoints (survives restarts)
# NOTE(review): recent langgraph-checkpoint-postgres releases expose
# `from_conn_string` as a context manager, need `.setup()` once to create the
# tables, and the sync PostgresSaver does not serve the async API
# (`ainvoke`/`aget_state`) used below; AsyncPostgresSaver is the async
# counterpart. Confirm against the installed version.
checkpointer = PostgresSaver.from_conn_string("postgresql://...")

# Interrupt the graph before sensitive actions
workflow.add_node("approval_gate", approval_gate_node)
# compile() returns a new runnable graph rather than mutating `workflow`;
# keep it, because run_with_approval() drives the graph through `app`.
app = workflow.compile(
    checkpointer=checkpointer,
    interrupt_before=["execute_payment", "send_email", "delete_file"],
)

async def run_with_approval(task: str, thread_id: str):
    """Run the approval-gated graph for one thread, pausing for human sign-off.

    The graph runs until it finishes or pauses before one of its
    ``interrupt_before`` nodes. On each pause:
    - The pending action (``state.values["pending_action"]``) is passed to
      ``notify_human_and_wait``.
    - On approval the graph resumes from its checkpoint, which may complete
      or pause again at the next gated node.

    Args:
        task: Initial task placed in the graph state.
        thread_id: Checkpoint thread to run/resume.

    Returns:
        The output of the last ``ainvoke`` call once the graph is no longer
        paused, or None if a human rejects an action (the thread stays paused
        at that checkpoint).
    """
    config = {"configurable": {"thread_id": thread_id}}

    # Run until completion or the first interruption
    result = await app.ainvoke({"task": task}, config=config)

    # state.next is non-empty while the graph is paused before a node.
    state = await app.aget_state(config)
    while state.next:
        pending_action = state.values.get("pending_action")
        print(f"⏸ Approval needed: {pending_action}")

        # In production: send Slack notification, save to DB, etc.
        approval = await notify_human_and_wait(pending_action)
        if not approval:
            print("❌ Action rejected by user")
            return None

        # Resume the graph from its checkpoint (None = no new input)
        result = await app.ainvoke(None, config=config)
        state = await app.aget_state(config)

    return result

Pillar 5: Deployment Architecture

# docker-compose.yml for agent system
# (The top-level `version:` key is obsolete in the Compose Specification and is ignored, so it is omitted.)
services:
  agent-api:
    build: .
    # One host port per replica: a fixed "8000:8000" mapping fails with
    # "port is already allocated" once replicas > 1. Put a load balancer in front in production.
    ports: ["8000-8002:8000"]
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=postgresql://postgres:${POSTGRES_PASSWORD}@db:5432/agents
    depends_on: [redis, db]
    deploy:
      replicas: 3  # Horizontal scaling

  # Background task queue for long-running agents
  celery-worker:
    build: .
    command: celery -A app.tasks worker --concurrency=10
    # Workers run the same agent code, so they need the same credentials and backends.
    environment:
      - CELERY_BROKER_URL=redis://redis:6379
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=postgresql://postgres:${POSTGRES_PASSWORD}@db:5432/agents
    depends_on: [redis, db]

  redis:
    image: redis:7-alpine
    volumes: ["redis_data:/data"]

  db:
    image: postgres:16
    environment:
      POSTGRES_DB: agents
      # Supply via .env or a secrets manager; never hard-code it here.
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:?POSTGRES_PASSWORD must be set}
    volumes: ["pg_data:/var/lib/postgresql/data"]  # keep checkpoints across container restarts

# Named volumes must be declared here, otherwise Compose rejects `redis_data:/data`.
volumes:
  redis_data:
  pg_data:
# FastAPI server with async agent execution
import json
import uuid

from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/agent/run")
async def run_agent(request: AgentRequest, background_tasks: BackgroundTasks):
    """Queue an agent run and answer immediately with its id.

    The run is scheduled through FastAPI's BackgroundTasks, so it executes in
    this API process after the response is sent. ``execute_agent_and_callback``
    presumably reports the outcome to ``request.callback_url``; confirm in its
    implementation.
    NOTE(review): BackgroundTasks are lost if the process restarts; the Celery
    worker in the deployment config is the durable option for long runs.
    """
    new_run_id = str(uuid.uuid4())
    job_kwargs = {
        "goal": request.goal,
        "run_id": new_run_id,
        "callback_url": request.callback_url,
    }
    background_tasks.add_task(execute_agent_and_callback, **job_kwargs)
    return {"run_id": new_run_id, "status": "queued"}

@app.get("/agent/stream/{run_id}")
async def stream_agent_output(run_id: str):
    """Stream a run's tokens to the client as Server-Sent Events.

    Each token produced by ``get_agent_stream(run_id)`` becomes one SSE frame:
    ``data: {"token": ...}`` followed by a blank line.
    """
    async def sse_frames():
        async for piece in get_agent_stream(run_id):
            payload = json.dumps({"token": piece})
            yield "data: " + payload + "\n\n"

    return StreamingResponse(sse_frames(), media_type="text/event-stream")

Production Checklist

The production mindset shift: Your demo agent needs to work once. Your production agent needs to work 99.9% of the time, handle edge cases gracefully, stay within budget, and give you enough visibility to debug the 0.1% of failures — all while handling concurrent users.