#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Agentic Chat Agent Implementation.

This module provides a complete implementation of a conversational AI agent
using LangGraph workflows and OpenAI-compatible language models. It demonstrates
how to build streaming chat agents with proper state management, message
handling, and error recovery for production use.

Key Features:
    - Real-time streaming response generation
    - Conversation history management with proper role mapping
    - Integration with OpenAI-compatible API endpoints
    - Memory persistence using LangGraph checkpointers
    - Tool binding and function calling support
    - Robust error handling and graceful degradation
    - Environment-based configuration management

Architecture:
    The agent uses a simple linear workflow (START -> ai_response -> END)
    with state management for conversation continuity. It supports various
    message types (Human, System, AI) and provides streaming output for a
    better user experience.

Usage:
    This agent can be deployed as a web service using the AG-Kit server
    framework or used programmatically for building conversational applications.
"""
import json
import os
from typing import Any, List, Optional

from langchain_core.messages import (
    AIMessage,
    AIMessageChunk,
    SystemMessage,
    convert_to_openai_messages,
)
from langchain_core.messages.ai import add_ai_message_chunks
from langchain_core.runnables import RunnableConfig
from langchain_core.utils.function_calling import convert_to_openai_function
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph.graph.state import CompiledStateGraph
from langgraph.types import StreamWriter
class State(MessagesState):
    """Chat Agent State Model.

    This state class manages the conversation state for the chat agent,
    extending LangGraph's MessagesState to include tool support for
    function calling capabilities.

    :param tools: List of available tools for the agent to use during conversation
    :type tools: List[Any]

    Inherited Fields:
        - messages: Conversation history from MessagesState

    Note:
        The tools field contains tool definitions that will be bound to
        the language model for function calling. Each tool should have
        a name, description, and parameters schema.

    Example:
        Creating state with tools::

            state = State(
                messages=[HumanMessage(content="Hello!")],
                tools=[
                    {
                        "name": "search_web",
                        "description": "Search the internet",
                        "parameters": '{"type": "object", "properties": {...}}'
                    }
                ]
            )
    """

    tools: List[Any]
def chat_node(state: State, config: Optional[RunnableConfig] = None, writer: Optional[StreamWriter] = None) -> dict:
    """Generate AI response using OpenAI-compatible model with streaming support.

    This node function processes the conversation state and generates streaming
    responses using an OpenAI-compatible language model. It handles tool binding,
    message conversion, real-time streaming, and comprehensive error recovery.

    Processing Flow:
        1. Load model configuration from environment variables
        2. Initialize ChatOpenAI with custom settings
        3. Process and bind available tools for function calling
        4. Create system message and convert message history
        5. Stream response chunks to the client in real time
        6. Merge chunks into a complete AIMessage for the state

    :param state: Current conversation state containing messages and tools
    :type state: State
    :param config: Optional runnable configuration for recursion limits and other settings
    :type config: Optional[RunnableConfig]
    :param writer: Stream writer injected by LangGraph for emitting chunks to the client
    :type writer: Optional[StreamWriter]
    :return: Dictionary containing the generated AI message for state update
    :rtype: dict

    Environment Variables:
        - OPENAI_MODEL: Model name (default: "gpt-4o-mini")
        - OPENAI_BASE_URL: Custom API endpoint (optional)
        - OPENAI_TEMPERATURE: Response randomness (default: "0.7")
        - OPENAI_API_KEY: API authentication key (required)

    :raises Exception: Handled gracefully with an error message in the response

    Note:
        This is an internal node function used within the LangGraph workflow.
        LangGraph injects the StreamWriter argument for real-time streaming to
        clients, and the node maintains conversation context through proper
        message handling.
    """
    if writer is None:
        # Fallback no-op writer when a stream writer is not injected
        def writer(x):
            pass

    try:
        # Create LangChain ChatOpenAI model from environment variables
        chat_model = ChatOpenAI(
            model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url=os.getenv("OPENAI_BASE_URL"),
            temperature=float(os.getenv("OPENAI_TEMPERATURE", "0.7")),
        )

        # Set default config if none provided
        if config is None:
            config = RunnableConfig(recursion_limit=25)

        # Process and bind tools if available
        tools = state.get("tools", [])
        if tools:
            # Each tool's "parameters" field arrives as a JSON string; decode it
            # before converting to the OpenAI function-calling format
            tool_list = [{**tool, "parameters": json.loads(tool["parameters"])} for tool in tools]
            tools_list = [convert_to_openai_function(item) for item in tool_list]
            chat_model_with_tools = chat_model.bind_tools(tools_list)
        else:
            chat_model_with_tools = chat_model

        # Create system message to set assistant behavior
        system_message = SystemMessage(content="You are a helpful assistant.")
        messages = [system_message, *convert_to_openai_messages(state["messages"])]

        chunks = []
        for chunk in chat_model_with_tools.stream(messages, config):
            writer({"messages": [chunk]})  # Stream chunk to client
            chunks.append(chunk)  # Collect for final message

        # Merge all chunks into a complete AIMessage
        if chunks:
            # Coerce any plain chunks so they can be merged uniformly
            ai_chunks = [
                chunk if isinstance(chunk, AIMessageChunk) else AIMessageChunk(content=str(chunk))
                for chunk in chunks
            ]
            merged_message = add_ai_message_chunks(*ai_chunks)
            return {"messages": [merged_message]}
        else:
            return {"messages": []}
    except Exception as e:
        # Handle errors gracefully with informative messages
        return {"messages": [AIMessage(content=f"Error: {str(e)}")]}
def build_chat_workflow() -> CompiledStateGraph:
    """Build and configure the complete LangGraph chat workflow.

    This function creates a production-ready LangGraph workflow for handling
    chat conversations with streaming support. It sets up the state graph,
    memory management, and response generation logic using an OpenAI-compatible
    streaming API with comprehensive error handling.

    Workflow Architecture:
        - State Management: Conversation history and tool availability
        - Message Processing: Type conversion and role mapping
        - Streaming Generation: Real-time response streaming to clients
        - Error Recovery: Graceful handling of API failures
        - Memory Persistence: Conversation continuity across sessions

    Workflow Structure:
        START -> ai_response -> END

    :return: Compiled LangGraph workflow ready for production deployment
    :rtype: CompiledStateGraph

    Example:
        Creating and deploying the chat workflow::

            from ag_kit_py_langgraph import LangGraphAgent

            workflow = build_chat_workflow()
            agent = LangGraphAgent(
                name="ChatBot",
                description="A helpful conversational assistant",
                graph=workflow
            )

            # Process a conversation with memory
            result = await workflow.ainvoke(
                {
                    "messages": [HumanMessage(content="Hello!")],
                    "tools": []
                },
                config={"configurable": {"thread_id": "conversation_1"}}
            )

    Note:
        The compiled workflow includes a MemorySaver checkpointer for
        conversation persistence. Each conversation should use a unique
        thread_id for proper state isolation.
    """
    # Create state graph for the chat workflow
    graph = StateGraph(State)
    memory = MemorySaver()  # In-memory checkpointer for conversation persistence

    # Add the AI response generation node
    graph.add_node("ai_response", chat_node)

    # Define simple linear workflow: START -> ai_response -> END
    # (the START edge also makes ai_response the entry point, so a separate
    # set_entry_point call is unnecessary)
    graph.add_edge(START, "ai_response")
    graph.add_edge("ai_response", END)

    # Compile workflow with memory checkpointer for conversation history
    return graph.compile(checkpointer=memory)
Agentic Chat - LangGraph (Python)
What This Demo Shows
This demo showcases AG-Kit’s agentic chat using LangGraph with Python and ZhipuAI:

- Python LangGraph: Uses `StateGraph` with a custom `State` class
- ZhipuAI Integration: Integrates with the ZhipuAI GLM-4 model for Chinese language support
- Streaming Responses: Real-time streaming response generation
- Message Management: Proper message type conversion and role mapping
- Memory Persistence: Uses `MemorySaver` for conversation history (a usage sketch follows this list)
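For orientation, here is a minimal sketch of driving the compiled workflow and consuming the chunks that `chat_node` emits through the injected `StreamWriter`. The module name `chat_agent` and the thread id are illustrative, not part of the demo:

```python
from langchain_core.messages import HumanMessage

from chat_agent import build_chat_workflow  # hypothetical module name for the file above

workflow = build_chat_workflow()

# stream_mode="custom" surfaces whatever chat_node passes to writer(),
# i.e. {"messages": [chunk]} payloads, as they are produced
for event in workflow.stream(
    {"messages": [HumanMessage(content="Hello!")], "tools": []},
    config={"configurable": {"thread_id": "demo_thread"}},
    stream_mode="custom",
):
    print(event)
```

Reusing the same `thread_id` on a later call continues the conversation, since the `MemorySaver` checkpointer keys state by thread.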
How to Interact
Try these suggestions or ask your own questions:

- “Alert user about typhoon” (triggers alert tool)
- “Change background color to blue/red/green/yellow/purple/orange/pink/brown/gray/black/white” (triggers color change)
- “Get my current location” (triggers geolocation tool)
- “Hello, how are you today?”
- “Can you help me write a short story about a robot?”
Technical Implementation
Backend (Python LangGraph):

- Custom `State` class with `messages: Annotated[list, add_messages]` (see the sketch after this list)
- `ChatZhipuAI` model with streaming enabled
- Message type conversion (HumanMessage, SystemMessage, AIMessage)
- Role mapping for API communication (user, system, assistant)
- Error handling with graceful fallbacks

Frontend:

- Same `useChat` hook and `AgKitChat` component
- Three built-in tools: `alert`, `change-background-color`, and `get-current-location`
- Supports 12 color options: blue, red, green, yellow, purple, orange, pink, brown, gray, black, white, transparent
- Geolocation tool with browser API integration and permission handling
- URL parameter switching between implementations
- Consistent user experience across languages
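As a point of comparison with the `MessagesState` subclass used in the code above, here is a minimal sketch of the `add_messages`-annotated state together with a `ChatZhipuAI` streaming call. The model name `glm-4` and environment-based authentication via `ZHIPUAI_API_KEY` are assumptions; check the `langchain_community` docs for your version:

```python
from typing import Annotated

from langchain_community.chat_models import ChatZhipuAI
from langgraph.graph.message import add_messages
from typing_extensions import TypedDict


class State(TypedDict):
    # add_messages is the reducer that appends/merges new messages
    # into the history instead of overwriting it
    messages: Annotated[list, add_messages]


model = ChatZhipuAI(model="glm-4")  # expects ZHIPUAI_API_KEY in the environment

# stream() yields AIMessageChunk objects as the model generates them
for chunk in model.stream("你好！请用一句话介绍你自己。"):  # "Hello! Introduce yourself in one sentence."
    print(chunk.content, end="", flush=True)
```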
Key Features
- Streaming API: Uses `astream` for real-time response generation (see the async sketch below)
- Error Recovery: Graceful error handling with informative messages
- Memory Management: Conversation history persistence
- Multi-language Support: ZhipuAI provides excellent Chinese language support
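A minimal async sketch of `astream`, the asynchronous counterpart of the `stream()` call used in `chat_node` above (the `ChatOpenAI` model here stands in for whichever chat model the backend configures):

```python
import asyncio

from langchain_openai import ChatOpenAI


async def main() -> None:
    model = ChatOpenAI(model="gpt-4o-mini")  # reads OPENAI_API_KEY from the environment
    # astream() yields AIMessageChunk objects without blocking the event loop
    async for chunk in model.astream("Write a one-line greeting."):
        print(chunk.content, end="", flush=True)


asyncio.run(main())
```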