LangGraph extends LangChain with a graph-based execution model that gives you stateful, multi-step agents with persistent memory, conditional branching, cycles, and human-in-the-loop checkpointing. It's the production-grade choice for complex agentic workflows.
Why LangGraph? The Key Advantages
- Stateful: State persists across steps and can be inspected/modified at any point
- Cycles: Unlike DAG-only frameworks, LangGraph supports loops (agent thinks, acts, thinks again)
- Interrupts: Pause execution at any node for human approval, then resume
- Persistence: Save and restore state across sessions using checkpointers
- Streaming: Stream state updates, LLM tokens, and tool calls in real time
Core Concepts: Nodes, Edges & State
from typing import TypedDict, Annotated, List
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.tools import tool
import operator
# 1. Define the State โ shared across all nodes
class AgentState(TypedDict):
messages: Annotated[List[BaseMessage], operator.add] # append-only list
iteration_count: int
final_answer: str | None
# 2. Define Tools
@tool
def search_web(query: str) -> str:
"""Search the web for information."""
return f"Web results for '{query}': [simulated results]"
@tool
def run_code(code: str) -> str:
"""Execute Python code and return the output."""
try:
import io, contextlib
output = io.StringIO()
with contextlib.redirect_stdout(output):
exec(code, {})
return output.getvalue() or "Code executed successfully (no output)"
except Exception as e:
return f"Error: {e}"
tools = [search_web, run_code]
llm = ChatAnthropic(model="claude-opus-4-6").bind_tools(tools)
# 3. Define Nodes (functions that transform state)
def agent_node(state: AgentState) -> dict:
"""Main reasoning node"""
response = llm.invoke(state["messages"])
return {
"messages": [response],
"iteration_count": state.get("iteration_count", 0) + 1
}
def check_completion(state: AgentState) -> str:
"""Router: decide whether to use tools or finish"""
last_message = state["messages"][-1]
# Check if we've used a tool
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
return "tools"
# Check iteration limit
if state.get("iteration_count", 0) >= 10:
return "force_end"
return "end"
# 4. Build the Graph
workflow = StateGraph(AgentState)
# Add nodes
workflow.add_node("agent", agent_node)
workflow.add_node("tools", ToolNode(tools)) # Pre-built tool executor
# Add edges
workflow.add_edge(START, "agent")
workflow.add_conditional_edges(
"agent",
check_completion,
{
"tools": "tools", # Use tool โ go to tool node
"end": END, # Done โ finish
"force_end": END # Max iterations โ finish
}
)
workflow.add_edge("tools", "agent") # After tool use โ back to agent
# Compile
app = workflow.compile()
# Run
result = app.invoke({
"messages": [HumanMessage(content="Search for the latest Claude model and write code to call it")],
"iteration_count": 0,
"final_answer": None
})
print(result["messages"][-1].content)
Checkpointing & Persistence
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
# In-memory checkpointer (development)
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
# PostgreSQL checkpointer (production โ survives restarts)
async def setup_production_app():
async with AsyncPostgresSaver.from_conn_string(
"postgresql://user:password@localhost:5432/langgraph"
) as checkpointer:
await checkpointer.setup() # Create tables
app = workflow.compile(checkpointer=checkpointer)
return app
# Thread IDs enable separate conversation contexts
config_user1 = {"configurable": {"thread_id": "user-alice-session-1"}}
config_user2 = {"configurable": {"thread_id": "user-bob-session-1"}}
# Each thread maintains its own state independently
result_alice = await app.ainvoke(
{"messages": [HumanMessage(content="What is 2+2?")]},
config=config_user1
)
# Resume a thread โ it remembers all previous messages
result_alice_followup = await app.ainvoke(
{"messages": [HumanMessage(content="Double the result")]},
config=config_user1 # Same thread = same conversation history
)
Human-in-the-Loop: Interrupts & Approval Gates
from langgraph.graph import StateGraph, END
class WorkflowState(TypedDict):
messages: Annotated[List[BaseMessage], operator.add]
pending_action: dict | None
approved: bool
def plan_action(state: WorkflowState) -> dict:
"""Agent plans what action to take"""
action = {"type": "send_email", "to": "team@company.com", "body": "..."}
return {"pending_action": action}
def execute_action(state: WorkflowState) -> dict:
"""Execute the approved action"""
if not state.get("approved"):
return {"messages": [AIMessage(content="Action was not approved.")]}
action = state["pending_action"]
# Actually execute the action here
return {"messages": [AIMessage(content=f"Executed: {action['type']}")]}
workflow = StateGraph(WorkflowState)
workflow.add_node("plan", plan_action)
workflow.add_node("execute", execute_action)
workflow.add_edge(START, "plan")
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", END)
# Compile with interrupt BEFORE execute node
app = workflow.compile(
checkpointer=MemorySaver(),
interrupt_before=["execute"] # Pause here for human review
)
config = {"configurable": {"thread_id": "approval-flow-1"}}
# Step 1: Run until interruption
state = app.invoke(
{"messages": [HumanMessage(content="Send a status update email")]},
config=config
)
# Step 2: Inspect what the agent wants to do
current_state = app.get_state(config)
pending = current_state.values.get("pending_action")
print(f"Agent wants to: {pending}") # Show to human
# Step 3: Human decides
human_approved = True # (get from UI/Slack/etc.)
# Step 4: Update state and resume
app.update_state(config, {"approved": human_approved})
final_state = app.invoke(None, config=config) # Resume from checkpoint
Streaming for Real-Time UX
async def stream_agent_response(user_message: str):
"""Stream all state updates, LLM tokens, and tool calls."""
config = {"configurable": {"thread_id": "stream-example"}}
async for event in app.astream_events(
{"messages": [HumanMessage(content=user_message)]},
config=config,
version="v2"
):
kind = event["event"]
if kind == "on_chat_model_stream":
# Stream LLM tokens in real time
token = event["data"]["chunk"].content
if token:
print(token, end="", flush=True)
elif kind == "on_tool_start":
# Tool is being called
tool_name = event["name"]
print(f"\n[Using tool: {tool_name}...]")
elif kind == "on_tool_end":
# Tool returned result
result = event["data"]["output"]
print(f"[Tool result: {str(result)[:100]}]")
# Run it
import asyncio
asyncio.run(stream_agent_response("Search for LangGraph news and summarize"))
Subgraphs for Complex Workflows
"""Subgraphs allow you to nest graph logic โ great for reusable agent components"""
# Define a research sub-graph
research_workflow = StateGraph(AgentState)
research_workflow.add_node("search", search_node)
research_workflow.add_node("summarize", summarize_node)
research_workflow.add_edge(START, "search")
research_workflow.add_edge("search", "summarize")
research_workflow.add_edge("summarize", END)
research_app = research_workflow.compile()
# Use it as a node in the main graph
def research_node(state):
"""Invoke the research subgraph"""
result = research_app.invoke(state)
return result
main_workflow = StateGraph(AgentState)
main_workflow.add_node("research", research_node) # Subgraph as node
main_workflow.add_node("write", write_node)
main_workflow.add_edge(START, "research")
main_workflow.add_edge("research", "write")
main_workflow.add_edge("write", END)
When to reach for LangGraph: Use it when you need (1) cycles in your workflow, (2) persistent state across sessions, (3) human approval gates, (4) complex conditional branching, or (5) you're building a product (not a prototype). For simple linear chains, plain LCEL is sufficient.
Key Takeaways
- LangGraph models workflows as stateful graphs with typed state, nodes, and edges
Annotated[list, operator.add]creates append-only state โ the right pattern for message history- Checkpointers persist state to memory/DB โ enables multi-session conversations
- Thread IDs isolate separate user sessions in the same graph
interrupt_beforepauses execution for human review โ essential for production agentsastream_eventsenables real-time streaming of tokens, tool calls, and state updates- Subgraphs allow you to compose complex workflows from reusable components