Skip to main content

LangGraph Agent

The LangGraphAgent class provides a LangGraph-based agent implementation that extends BaseAgent. It includes critical stability patches to fix known issues in the ag-ui-langgraph library, particularly around message regeneration and streaming event handling.

Installation

pip install ag_kit_py langgraph langchain-core
Optional dependencies:
# For OpenAI models
pip install langchain-openai

# For Anthropic models
pip install langchain-anthropic

# For community tools and integrations
pip install langchain-community

Quick Start

Basic Chat Agent

from ag_kit_py.agents import LangGraphAgent
from ag_kit_py.providers import create_provider
from langgraph.graph import StateGraph, MessagesState, START, END

# Create provider and model
provider = create_provider("openai")
model = provider.get_langchain_model()

# Define chat node
def chat_node(state: MessagesState):
    """Invoke the model on the conversation and return its reply as a state update."""
    history = state["messages"]
    reply = model.invoke(history)
    return {"messages": [reply]}

# Build workflow: START -> chat -> END
workflow = StateGraph(MessagesState)
workflow.add_node("chat", chat_node)
workflow.add_edge(START, "chat")
# add_edge takes (source, target): the chat node must lead to END. The reverse
# order would declare an edge *out of* END and leave "chat" without an exit.
workflow.add_edge("chat", END)

# Compile graph
graph = workflow.compile()

# Create agent (stability patches applied automatically)
agent = LangGraphAgent(
    name="ChatAgent",
    description="A conversational assistant",
    graph=graph
)

With Server

from ag_kit_py.server import AGKitAPIApp

def create_agent():
    return {"agent": agent}

AGKitAPIApp().run(create_agent, port=8000)

LangGraphAgent Class

Class Definition

from ag_kit_py.agents import LangGraphAgent
from langgraph.graph.state import CompiledStateGraph

class LangGraphAgent(BaseAgent):
    """LangGraph Agent implementation extending BaseAgent.
    
    This class wraps the ag-ui-langgraph LangGraphAgent with stability patches
    and provides a consistent interface following the AG-Kit agent pattern.
    """
    
    def __init__(
        self, 
        name: str, 
        description: str, 
        graph: CompiledStateGraph
    ):
        """Initialize the LangGraph agent with stability patches.
        
        Args:
            name: Human-readable name for the agent
            description: Detailed description of the agent's purpose
            graph: Compiled LangGraph state graph defining the agent's workflow
        """

Parameters

  • name (str): Human-readable name for the agent
  • description (str): Detailed description of the agent’s purpose and capabilities
  • graph (CompiledStateGraph): Compiled LangGraph state graph defining the agent’s workflow

Methods

run()

Execute the LangGraph agent with the given input.
async def run(
    self, 
    run_input: RunAgentInput
) -> AsyncGenerator[BaseEvent, None]:
    """Execute the LangGraph agent with the given input.
    
    Args:
        run_input: Input data for the agent execution
        
    Yields:
        BaseEvent: Events representing the agent's execution progress
    """
Example:
from ag_ui.core import RunAgentInput

run_input = RunAgentInput(
    # AG-UI message models require an `id`; a message dict without one fails
    # validation when RunAgentInput is constructed.
    messages=[{"id": "msg-1", "role": "user", "content": "Hello"}],
    run_id="run-123",
    thread_id="thread-456",
    state={},
    context=[],
    tools=[],
    forwarded_props={}
)

async for event in agent.run(run_input):
    print(f"Event: {event.type}")

destroy()

Clean up resources used by the agent.
def destroy(self) -> None:
    """Clean up resources used by the agent."""

Stability Patches

The AG-Kit Python SDK includes critical stability patches that fix known issues in the ag-ui-langgraph library. These patches are applied automatically the first time a LangGraphAgent instance is created.

Fixed Issues

  1. Message Regeneration Logic: Prevents “Message ID not found in history” errors
  2. Checkpoint Error Handling: Improves error handling for missing checkpoints
  3. Streaming Event Stability: Enhances streaming event processing reliability

Patch Details

The patches modify the following behaviors:
# 1. Prevents automatic regeneration that causes errors
# Only regenerates when explicitly requested, not on automatic detection

# 2. Enhanced error handling for missing checkpoints
# Gracefully degrades instead of crashing when checkpoints are missing

# 3. Improved streaming event processing
# Better handling of edge cases in streaming responses

Automatic Application

Patches are applied automatically on first agent creation:
from ag_kit_py.agents import LangGraphAgent

# First call automatically applies all patches
agent = LangGraphAgent("test", "Test agent", graph)

# Subsequent agents use the patched methods
agent2 = LangGraphAgent("test2", "Another agent", graph2)

Usage Examples

Agent with Tools

from langchain_core.tools import tool
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode
from ag_kit_py.providers import create_provider

# Define tools
@tool
def search_web(query: str) -> str:
    """Search the web for information."""
    # Echo the query back inside a canned result string.
    results = f"Search results for: {query}"
    return results

@tool
def calculate(expression: str) -> str:
    """Calculate a mathematical expression."""
    # SECURITY: eval() executes arbitrary Python. `expression` is supplied by
    # the model (and indirectly by the user), so treat it as untrusted and
    # replace eval() with a restricted arithmetic parser before using this tool
    # outside a demo.
    try:
        # Formatting stays inside the try: converting the result to text can
        # itself raise (e.g. very large integers).
        result = eval(expression)
        return f"Result: {result}"
    except Exception:
        # Catch Exception rather than a bare `except:` so KeyboardInterrupt and
        # SystemExit still propagate instead of being reported as a math error.
        return "Calculation error"

tools = [search_web, calculate]

# Create model with tools
provider = create_provider("openai")
model = provider.get_langchain_model().bind_tools(tools)

# Define agent node
def agent_node(state: MessagesState):
    response = model.invoke(state["messages"])
    return {"messages": [response]}

# Define tool node
tool_node = ToolNode(tools)

# Route function
def should_continue(state: MessagesState):
    """Route to the tool node when the latest message carries tool calls, else end."""
    latest = state["messages"][-1]
    # A falsy tool_calls value means no tools were requested, so the graph ends.
    return "tools" if latest.tool_calls else END

# Build workflow
workflow = StateGraph(MessagesState)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)

workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", should_continue)
workflow.add_edge("tools", "agent")

# Create agent
agent = LangGraphAgent(
    name="ToolAgent",
    description="Agent with web search and calculator tools",
    graph=workflow.compile()
)

Multi-Step Workflow

from langgraph.graph import StateGraph, MessagesState, START, END
from typing import Literal
from langchain_core.messages import HumanMessage

class WorkflowState(MessagesState):
    current_step: str = "analyze"
    analysis_result: str = ""
    plan: str = ""

def analyze_node(state: WorkflowState):
    # Analysis logic
    analysis = "Analysis complete"
    return {
        "current_step": "plan",
        "analysis_result": analysis,
        "messages": [HumanMessage(content=f"Analysis: {analysis}")]
    }

def plan_node(state: WorkflowState):
    # Planning logic
    plan = f"Plan based on {state['analysis_result']}"
    return {
        "current_step": "execute",
        "plan": plan,
        "messages": [HumanMessage(content=f"Plan: {plan}")]
    }

def execute_node(state: WorkflowState):
    # Execution logic
    result = f"Executed plan: {state['plan']}"
    return {
        "current_step": "done",
        "messages": [HumanMessage(content=f"Result: {result}")]
    }

def route_step(state: WorkflowState) -> Literal["plan", "execute", "__end__"]:
    """Pick the next node to run from ``current_step``.

    Each node already stores the name of the step that should run *next* in
    ``current_step`` (analyze -> "plan", plan -> "execute", execute -> "done"),
    so the router returns that name as-is. Advancing it a second time would
    skip the plan node and run execute before a plan exists.

    Args:
        state: Current workflow state; only ``current_step`` is read.

    Returns:
        "plan" or "execute" while work remains, otherwise ``END`` (whose value
        is "__end__") so the graph terminates. The plain string "end" is not a
        registered node and cannot be used as a destination.
    """
    step = state.get("current_step", "analyze")
    if step in ("analyze", "plan"):
        # "analyze" only appears as the fallback for a missing value; planning
        # is the step that follows analysis.
        return "plan"
    if step == "execute":
        return "execute"
    return END

# Build workflow
workflow = StateGraph(WorkflowState)
workflow.add_node("analyze", analyze_node)
workflow.add_node("plan", plan_node)
workflow.add_node("execute", execute_node)

workflow.add_edge(START, "analyze")
workflow.add_conditional_edges("analyze", route_step)
workflow.add_conditional_edges("plan", route_step)
workflow.add_conditional_edges("execute", route_step)

agent = LangGraphAgent(
    name="WorkflowAgent",
    description="Multi-step workflow agent",
    graph=workflow.compile()
)

Checkpoints and Persistence

import sqlite3

from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver  # pip install langgraph-checkpoint-sqlite

# Memory checkpoint
memory_saver = MemorySaver()
workflow_with_memory = workflow.compile(checkpointer=memory_saver)

# SQLite checkpoint
# SqliteSaver.from_conn_string() is a context manager in current
# langgraph-checkpoint-sqlite releases, so its return value cannot be handed to
# compile() directly. Build the saver from a connection that stays open for the
# agent's lifetime; check_same_thread=False lets other threads use it.
# NOTE(review): agent.run() is async. If checkpoints are accessed through the
# async API, AsyncSqliteSaver (langgraph.checkpoint.sqlite.aio) is likely the
# right choice here; confirm against how LangGraphAgent drives the graph.
sqlite_conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
sqlite_saver = SqliteSaver(sqlite_conn)
workflow_with_sqlite = workflow.compile(checkpointer=sqlite_saver)

# Create agent with persistence
persistent_agent = LangGraphAgent(
    name="PersistentAgent",
    description="Agent with persistent state",
    graph=workflow_with_sqlite
)

Human-in-the-Loop

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.types import interrupt  # interrupt lives in langgraph.types, not langgraph.graph
from langchain_core.messages import HumanMessage

class InterruptState(MessagesState):
    user_approval: bool = False

def approval_node(state: InterruptState):
    """Pause the run for human approval and record the decision.

    interrupt() suspends the graph and surfaces its argument to the client.
    When the run is resumed, this node executes again from the top and
    interrupt() returns the value the client resumed with.
    """
    # Assumes the client resumes with a truthy value to approve and a falsy
    # one to decline; adapt this to your front end's resume payload.
    decision = interrupt("User approval required to continue")

    # An approval already recorded in state is still honoured.
    approved = bool(decision) or state.get("user_approval", False)

    if approved:
        return {
            "user_approval": True,
            "messages": [HumanMessage(content="Approved, continuing")],
        }
    return {
        "user_approval": False,
        "messages": [HumanMessage(content="Waiting for approval")],
    }

workflow = StateGraph(InterruptState)
workflow.add_node("approval", approval_node)
workflow.add_edge(START, "approval")
workflow.add_edge("approval", END)

agent = LangGraphAgent(
    name="InterruptAgent",
    description="Agent with human-in-the-loop approval",
    # interrupt() needs a checkpointer so the paused run can be stored and resumed.
    graph=workflow.compile(checkpointer=MemorySaver())
)

Streaming Responses

from ag_ui.core import RunAgentInput, EventType

async def stream_agent_response(agent, user_message: str):
    """Stream agent responses in real-time.

    Args:
        agent: Agent whose ``run()`` yields events for the given input.
        user_message: Text of the user message that starts the run.

    Prints streamed text as it arrives and a marker line for each tool call
    start and tool result; returns nothing.
    """
    import uuid  # local import keeps this snippet self-contained

    run_input = RunAgentInput(
        # AG-UI message models require an `id`, so generate one per message.
        messages=[{"id": str(uuid.uuid4()), "role": "user", "content": user_message}],
        run_id="run-123",
        thread_id="thread-456",
        state={},
        context=[],
        tools=[],
        forwarded_props={}
    )

    async for event in agent.run(run_input):
        if event.type == EventType.TEXT_MESSAGE_CONTENT:
            # The streamed text is carried in the event's `delta` field; reading
            # it through raw_event depends on an optional, backend-specific payload.
            print(event.delta, end="", flush=True)
        elif event.type == EventType.TOOL_CALL_START:
            print(f"\n[Calling tool: {event.tool_call_name}]")
        elif event.type == EventType.TOOL_CALL_RESULT:
            print("[Tool result received]")

# Usage
# Top-level `await` only works where an event loop is already running (for
# example a notebook or an async REPL). In a plain script, wrap the call in
# asyncio.run(...) instead.
await stream_agent_response(agent, "What's the weather in Paris?")

Error Handling

Common Errors and Solutions

1. Message ID Not Found

# Fixed by stability patches
# The patches automatically handle this case without manual intervention

2. Missing Checkpoint

# Fixed by stability patches
# Gracefully degrades to normal flow when checkpoint is missing

3. Tool Execution Errors

from langchain_core.tools import tool

@tool
def safe_tool(input: str) -> str:
    """Tool with error handling."""
    try:
        # Tool logic
        return process(input)
    except Exception as e:
        return f"Tool execution error: {str(e)}"

Debugging

import logging

# Enable detailed logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ag_kit_py.agents")

# Patch application is logged automatically
# Look for: "🔧 MONKEY PATCH APPLIED: LangGraphAgent methods replaced successfully"

Performance Optimization

1. State Optimization

# Good: Keep state minimal
class OptimizedState(MessagesState):
    current_step: str = ""
    # Only store necessary fields

# Bad: Avoid large objects in state
class BadState(MessagesState):
    large_data: dict = {}  # ❌ Avoid

2. Node Optimization

async def optimized_node(state: MessagesState):
    # Use async operations
    result = await async_operation()
    
    # Return only changed fields
    return {"messages": [result]}  # ✅ Good
    # return state  # ❌ Avoid returning entire state

3. Tool Optimization

@tool
async def async_tool(query: str) -> str:
    """Use async tools for better performance."""
    result = await async_api_call(query)
    return result

Best Practices

1. State Design

  • Keep state structure simple
  • Store only necessary data
  • Use type annotations
from typing import TypedDict
from langgraph.graph import MessagesState

class MyState(MessagesState):
    """Well-designed state with type hints."""
    current_step: str
    result: str
    # Declared so nodes can report failures with {"error": ...}; a node may
    # only write keys that exist in the state schema.
    error: str

2. Node Implementation

  • Use async functions
  • Handle exceptions
  • Return explicit state updates
async def my_node(state: MyState):
    """Well-implemented node."""
    try:
        result = await process(state)
        return {"result": result}
    except Exception as e:
        return {"error": str(e)}

3. Tool Integration

  • Provide clear tool descriptions
  • Implement error handling
  • Use typed parameters
@tool
def well_documented_tool(query: str, limit: int = 10) -> str:
    """Search for information.
    
    Args:
        query: The search query
        limit: Maximum number of results (default: 10)
        
    Returns:
        Search results as formatted string
    """
    try:
        return search(query, limit)
    except Exception as e:
        return f"Search failed: {e}"

4. Error Handling

  • Catch exceptions in nodes
  • Provide meaningful error messages
  • Implement graceful degradation
def error_handling_node(state: MyState):
    """Node with proper error handling."""
    try:
        result = risky_operation(state)
        return {"result": result}
    except ValueError as e:
        # Handle specific errors
        return {"error": f"Invalid input: {e}"}
    except Exception as e:
        # Handle unexpected errors
        return {"error": f"Unexpected error: {e}"}

Integration with Other Components

Server Integration

from ag_kit_py.server import AGKitAPIApp

def create_agent():
    return {"agent": agent}

# Quick start
AGKitAPIApp().run(create_agent, port=8000)

# Advanced configuration
app = AGKitAPIApp()
fastapi_app = app.build(
    create_agent,
    base_path="/api",
    enable_cors=True
)

Provider Integration

from ag_kit_py.providers import create_provider

# Create provider
provider = create_provider("openai")

# Get LangChain model
model = provider.get_langchain_model(
    model="gpt-4o-mini",
    temperature=0.7
)

# Use in LangGraph workflow
def chat_node(state: MessagesState):
    response = model.invoke(state["messages"])
    return {"messages": [response]}

Complete Example

"""
Complete example of LangGraph agent with tools, checkpoints, and server.
"""

from ag_kit_py.agents import LangGraphAgent
from ag_kit_py.providers import create_provider
from ag_kit_py.server import AGKitAPIApp
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.sqlite import SqliteSaver
from langchain_core.tools import tool

# Define tools
@tool
def search(query: str) -> str:
    """Search for information."""
    # Echo the query back inside a canned result string.
    answer = f"Results for: {query}"
    return answer

@tool
def calculate(expression: str) -> str:
    """Calculate expression."""
    # SECURITY: eval() executes arbitrary Python. `expression` is supplied by
    # the model (and indirectly by the user), so treat it as untrusted and
    # replace eval() with a restricted arithmetic parser before using this tool
    # outside a demo.
    try:
        # str() stays inside the try: converting the result to text can itself
        # raise (e.g. very large integers).
        return str(eval(expression))
    except Exception:
        # Catch Exception rather than a bare `except:` so KeyboardInterrupt and
        # SystemExit still propagate instead of being reported as "Error".
        return "Error"

tools = [search, calculate]

# Create provider and model
provider = create_provider("openai")
model = provider.get_langchain_model().bind_tools(tools)

# Define nodes
def agent_node(state: MessagesState):
    response = model.invoke(state["messages"])
    return {"messages": [response]}

tool_node = ToolNode(tools)

# Route function
def should_continue(state: MessagesState):
    """Route to the tool node when the latest message carries tool calls, else end."""
    latest = state["messages"][-1]
    # A falsy tool_calls value means no tools were requested, so the graph ends.
    return "tools" if latest.tool_calls else END

# Build workflow
workflow = StateGraph(MessagesState)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)

workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", should_continue)
workflow.add_edge("tools", "agent")

# Add persistence
# SqliteSaver.from_conn_string() is a context manager in current
# langgraph-checkpoint-sqlite releases, so its return value cannot be passed to
# compile() directly. Build the saver from a connection that stays open while
# the server runs; check_same_thread=False lets other threads use it.
import sqlite3

checkpointer = SqliteSaver(sqlite3.connect("checkpoints.db", check_same_thread=False))
graph = workflow.compile(checkpointer=checkpointer)

# Create agent
agent = LangGraphAgent(
    name="CompleteAgent",
    description="Agent with tools and persistence",
    graph=graph
)

# Create agent creator
def create_agent():
    return {"agent": agent}

# Run server
if __name__ == "__main__":
    AGKitAPIApp().run(create_agent, port=8000)