#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Agentic Chat Agent Implementation.
This module provides a complete implementation of a conversational AI agent
using LangGraph workflows and OpenAI-compatible language models. It demonstrates
how to build streaming chat agents with proper state management, message
handling, and error recovery for production use.
Key Features:
- Real-time streaming response generation
- Conversation history management with proper role mapping
- Integration with OpenAI-compatible API endpoints
- Memory persistence using LangGraph checkpointers
- Tool binding and function calling support
- Robust error handling and graceful degradation
- Environment-based configuration management
Architecture:
The agent uses a simple linear workflow (START -> ai_response -> END)
with state management for conversation continuity. It supports various
message types (Human, System, AI) and provides streaming output for
better user experience.
Usage:
This agent can be deployed as a web service using the AG-Kit server
framework or used programmatically for building conversational applications.
"""
import json
import os
from typing import Any, List, Optional
from langchain_core.messages import AIMessage, AIMessageChunk, SystemMessage, convert_to_openai_messages
from langchain_core.messages.ai import add_ai_message_chunks
from langchain_core.runnables import RunnableConfig
from langchain_core.utils.function_calling import convert_to_openai_function
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph.graph.state import CompiledStateGraph
from langgraph.types import StreamWriter
class State(MessagesState):
"""Chat Agent State Model.
This state class manages the conversation state for the chat agent,
extending LangGraph's MessagesState to include tool support for
function calling capabilities.
:param tools: List of available tools for the agent to use during conversation
:type tools: List[Any]
Inherited Fields:
- messages: Conversation history from MessagesState
Note:
The tools field contains tool definitions that will be bound to
the language model for function calling. Each tool should have
a name, description, and parameters schema.
Example:
Creating state with tools::
state = State(
messages=[HumanMessage(content="Hello!")],
tools=[
{
"name": "search_web",
"description": "Search the internet",
"parameters": '{"type": "object", "properties": {...}}'
}
]
)
"""
tools: List[Any]
def chat_node(state: State, config: Optional[RunnableConfig] = None, writer: StreamWriter = None) -> dict:
"""Generate AI response using OpenAI-compatible model with streaming support.
This node function processes the conversation state and generates streaming
responses using an OpenAI-compatible language model. It handles tool binding,
message conversion, real-time streaming, and comprehensive error recovery.
Processing Flow:
1. Load model configuration from environment variables
2. Initialize ChatOpenAI with custom settings
3. Process and bind available tools for function calling
4. Create system message and convert message history
5. Stream response chunks to client in real-time
6. Merge chunks into complete AIMessage for state
:param state: Current conversation state containing messages and tools
:type state: State
    :param config: Optional runnable configuration for recursion limits and other settings
    :type config: Optional[RunnableConfig]
    :param writer: Stream writer injected by LangGraph for emitting chunks in real time
    :type writer: StreamWriter
    :return: Dictionary containing the generated AI message for state update
    :rtype: dict
Environment Variables:
- OPENAI_MODEL: Model name (default: "gpt-4o-mini")
- OPENAI_BASE_URL: Custom API endpoint (optional)
- OPENAI_TEMPERATURE: Response randomness (default: "0.7")
- OPENAI_API_KEY: API authentication key (required)
:raises Exception: Handled gracefully with error message in response
    Note:
        This is an internal node function used within the LangGraph workflow.
        It streams chunks to clients through the injected ``writer`` callable
        and maintains conversation context through proper message handling.
"""
    # LangGraph injects `writer` when streaming; fall back to a no-op otherwise
    if writer is None:
        def writer(x):
            pass
try:
# Create LangChain ChatOpenAI model from environment variables
chat_model = ChatOpenAI(
model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_BASE_URL"),
temperature=float(os.getenv("OPENAI_TEMPERATURE", "0.7")),
)
# Set default config if none provided
if config is None:
config = RunnableConfig(recursion_limit=25)
# Process and bind tools if available
tools = state.get("tools", [])
if tools:
            # Decode each tool's JSON parameters string, then convert to OpenAI function format
            parsed_tools = [{**tool, "parameters": json.loads(tool["parameters"])} for tool in tools]
            openai_tools = [convert_to_openai_function(item) for item in parsed_tools]
            chat_model_with_tools = chat_model.bind_tools(openai_tools)
else:
chat_model_with_tools = chat_model
# Create system message to set assistant behavior
system_message = SystemMessage(content="You are a helpful assistant.")
        messages = convert_to_openai_messages([system_message, *state["messages"]])
chunks = []
for chunk in chat_model_with_tools.stream(messages, config):
writer({"messages": [chunk]}) # Stream chunk to client
chunks.append(chunk) # Collect for final message
# Merge all chunks into complete AIMessage
        if chunks:
            # Normalize every chunk to AIMessageChunk so add_ai_message_chunks can merge them
            ai_chunks = [
                chunk if isinstance(chunk, AIMessageChunk) else AIMessageChunk(content=str(chunk))
                for chunk in chunks
            ]
merged_message = add_ai_message_chunks(*ai_chunks)
return {"messages": [merged_message]}
else:
return {"messages": []}
except Exception as e:
# Handle errors gracefully with informative messages
return {"messages": [AIMessage(content=f"Error: {str(e)}")]}
def build_chat_workflow() -> CompiledStateGraph:
"""Build and configure the complete LangGraph chat workflow.
This function creates a production-ready LangGraph workflow for handling
chat conversations with streaming support. It sets up the state graph,
memory management, and response generation logic using OpenAI-compatible
streaming API with comprehensive error handling.
Workflow Architecture:
- State Management: Conversation history and tool availability
- Message Processing: Type conversion and role mapping
- Streaming Generation: Real-time response streaming to clients
- Error Recovery: Graceful handling of API failures
- Memory Persistence: Conversation continuity across sessions
Workflow Structure:
START -> ai_response -> END
:return: Compiled LangGraph workflow ready for production deployment
:rtype: CompiledStateGraph
Example:
Creating and deploying the chat workflow::
from ag_kit_py_langgraph import LangGraphAgent
workflow = build_chat_workflow()
agent = LangGraphAgent(
name="ChatBot",
description="A helpful conversational assistant",
graph=workflow
)
# Process a conversation with memory
result = await workflow.ainvoke(
{
"messages": [HumanMessage(content="Hello!")],
"tools": []
},
config={"configurable": {"thread_id": "conversation_1"}}
)
Note:
The compiled workflow includes a MemorySaver checkpointer for
conversation persistence. Each conversation should use a unique
thread_id for proper state isolation.
"""
# Create state graph for the chat workflow
graph = StateGraph(State)
memory = MemorySaver() # In-memory checkpointer for conversation persistence
# Add the AI response generation node
graph.add_node("ai_response", chat_node)
# Define simple linear workflow: START -> ai_response -> END
graph.add_edge(START, "ai_response")
graph.add_edge("ai_response", END)
# Compile workflow with memory checkpointer for conversation history
return graph.compile(checkpointer=memory)
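

if __name__ == "__main__":
    # Minimal local smoke test (an illustrative addition, not part of the demo's
    # public API). Assumes OPENAI_API_KEY, and optionally OPENAI_BASE_URL and
    # OPENAI_MODEL, are set in the environment.
    from langchain_core.messages import HumanMessage

    workflow = build_chat_workflow()
    config = {"configurable": {"thread_id": "local_demo"}}
    # stream_mode="custom" yields exactly the payloads chat_node emits through
    # its injected stream writer: {"messages": [AIMessageChunk, ...]}
    for payload in workflow.stream(
        {"messages": [HumanMessage(content="Hello!")], "tools": []},
        config,
        stream_mode="custom",
    ):
        for chunk in payload["messages"]:
            print(chunk.content, end="", flush=True)
    print()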
Conversational Agent - LangGraph (Python)
What This Demo Shows
This demo showcases an AG-Kit conversational agent built with LangGraph in Python and ZhipuAI:
- Python LangGraph: a StateGraph with a custom State class
- ZhipuAI Integration: the ZhipuAI GLM-4 model for Chinese-language support
- Streaming Responses: real-time streaming response generation
- Message Management: proper message type conversion and role mapping
- Memory Persistence: conversation history saved with MemorySaver
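
Since the agent reads its model settings from environment variables (OPENAI_MODEL, OPENAI_BASE_URL, OPENAI_TEMPERATURE, OPENAI_API_KEY in the code tab), a ZhipuAI setup might look like the sketch below; the model name and endpoint are illustrative and should be verified against ZhipuAI's current OpenAI-compatible API documentation:

```python
import os

# Illustrative ZhipuAI configuration; verify the model name and base URL
# against ZhipuAI's current documentation before use.
os.environ["OPENAI_MODEL"] = "glm-4"
os.environ["OPENAI_BASE_URL"] = "https://open.bigmodel.cn/api/paas/v4"
os.environ["OPENAI_TEMPERATURE"] = "0.7"
os.environ["OPENAI_API_KEY"] = "your-zhipuai-api-key"
```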
How to Interact
Try these suggestions, or ask your own questions:
- "Alert the user about a typhoon" (triggers the alert tool)
- "Change the background color to blue/red/green/yellow/purple/orange/pink/brown/gray/black/white" (triggers the color change)
- "Get my current location" (triggers the geolocation tool)
- "Hello, how are you today?"
- "Can you help me write a short story about robots?"
Technical Implementation
Backend (Python LangGraph):
- A custom State class with messages: Annotated[list, add_messages] (sketched below)
- A ChatZhipuAI model with streaming enabled
- Message type conversion (HumanMessage, SystemMessage, AIMessage)
- Role mapping for API communication (user, system, assistant)
- Error handling with graceful fallback
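
A minimal sketch of that custom State, assuming the messages: Annotated[list, add_messages] shape described above; the tools field mirrors the State class in the code tab:

```python
from typing import Annotated, Any, List

from langgraph.graph.message import add_messages
from typing_extensions import TypedDict


class State(TypedDict):
    # add_messages appends new messages to the history instead of replacing it
    messages: Annotated[list, add_messages]
    # Tool definitions that chat_node binds to the model for function calling
    tools: List[Any]
```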
Frontend:
- The same useChat hook and AgKitChat component
- Three built-in tools: alert, change-background-color, and get-current-location (an example tool definition follows this list)
- 12 supported color options: blue, red, green, yellow, purple, orange, pink, brown, gray, black, white, transparent
- A geolocation tool with browser API integration and permission handling
- A URL parameter to switch between implementations
- A consistent user experience across languages
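
For reference, a tool definition in the shape the agent's State expects (see the State docstring in the code tab); the parameters schema shown for change-background-color is an illustrative guess:

```python
tool = {
    "name": "change-background-color",
    "description": "Change the page background color",
    # `parameters` is a JSON string; chat_node decodes it with json.loads
    # before converting the tool for model binding
    "parameters": '{"type": "object", "properties": {"color": {"type": "string"}}, "required": ["color"]}',
}
```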
Key Features
- Streaming API: real-time response generation with astream (see the sketch below)
- Error Recovery: graceful error handling with informative messages
- Memory Management: persistent conversation history
- Multilingual Support: excellent Chinese-language support via ZhipuAI
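
A minimal sketch of consuming the workflow asynchronously with astream; the thread_id and prompt are illustrative, and build_chat_workflow is assumed importable from the code tab's module:

```python
import asyncio

from langchain_core.messages import HumanMessage


async def main() -> None:
    workflow = build_chat_workflow()
    config = {"configurable": {"thread_id": "demo"}}
    # astream yields each node's state updates as they are produced
    async for update in workflow.astream(
        {"messages": [HumanMessage(content="你好！")], "tools": []},
        config,
    ):
        print(update)


asyncio.run(main())
```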