ag_kit_py.agents
Agents模块提供了使用AG-Kit构建AI Agent的核心抽象与实现。它定义了基础Agent接口,并提供了如LangGraph等框架特定的实现。安装
复制
pip install ag_kit_py
可用Agent
| Agent类型 | 类 | 状态 | 描述 |
|---|---|---|---|
| LangGraph | LangGraphAgent | ✅ 可用 | 基于LangGraph的Agent,包含稳定性补丁 |
| 基础 | BaseAgent | ✅ 可用 | 自定义Agent的抽象基类 |
快速开始
创建LangGraph Agent
复制
from ag_kit_py.agents import LangGraphAgent
from ag_kit_py.providers import create_provider
from langgraph.graph import StateGraph, MessagesState
# 创建provider
provider = create_provider("openai")
model = provider.get_langchain_model()
# 定义聊天节点
def chat_node(state: MessagesState):
response = model.invoke(state["messages"])
return {"messages": [response]}
# 构建工作流
workflow = StateGraph(MessagesState)
workflow.add_node("chat", chat_node)
workflow.set_entry_point("chat")
workflow.set_finish_point("chat")
# 编译并创建Agent
graph = workflow.compile()
agent = LangGraphAgent(
name="ChatAgent",
description="对话型Agent",
graph=graph
)
与服务器配合使用
复制
from ag_kit_py.server import AGKitAPIApp
def create_agent():
return {"agent": agent}
AGKitAPIApp().run(create_agent, port=8000)
BaseAgent
所有Agent必须继承的抽象基类。类定义
复制
from abc import ABC, abstractmethod
from typing import Any, AsyncGenerator
from ag_ui.core import BaseEvent, RunAgentInput
class BaseAgent(ABC):
"""所有AG-Kit Agent的抽象基类"""
def __init__(self, name: str, description: str, agent: Any):
"""初始化基础Agent
参数:
name: Agent的可读名称
description: Agent功能的详细描述
agent: 底层Agent实现
"""
self._name = name
self._description = description
self._agent = agent
属性
name
获取Agent名称。复制
@property
def name(self) -> str:
"""获取Agent名称"""
return self._name
description
获取Agent描述。复制
@property
def description(self) -> str:
"""获取Agent描述"""
return self._description
agent
获取底层Agent实现。复制
@property
def agent(self) -> Any:
"""获取底层Agent实例"""
return self._agent
抽象方法
run()
使用给定输入执行Agent。复制
@abstractmethod
async def run(
self,
run_input: RunAgentInput
) -> AsyncGenerator[BaseEvent, None]:
"""使用给定输入执行Agent
参数:
run_input: Agent执行的输入数据
生成:
BaseEvent: 表示执行进度的事件
"""
具体方法
destroy()
清理Agent使用的资源。复制
def destroy(self) -> None:
"""清理Agent使用的资源"""
pass
RunAgentInput
Agent执行的输入数据结构。复制
from ag_ui.core import RunAgentInput
from typing import Any, Dict, List
class RunAgentInput:
"""Agent执行的输入数据"""
messages: List[Dict[str, Any]] # 对话消息
run_id: str # 唯一运行标识符
thread_id: str # 线程/对话标识符
state: Dict[str, Any] # Agent状态
context: List[Any] # 附加上下文
tools: List[Any] # 可用工具
forwarded_props: Dict[str, Any] # 转发属性
BaseEvent
Agent执行事件的基础类。复制
from ag_ui.core import BaseEvent, EventType
class BaseEvent:
"""Agent执行的基础事件"""
type: EventType # 事件类型
thread_id: str # 线程标识符
run_id: str # 运行标识符
raw_event: Any # 原始事件数据
事件类型
复制
from ag_ui.core import EventType
# 可用事件类型
EventType.RUN_STARTED # Agent执行开始
EventType.RUN_FINISHED # Agent执行完成
EventType.RUN_ERROR # Agent执行错误
EventType.TEXT_MESSAGE_START # 文本消息开始
EventType.TEXT_MESSAGE_CONTENT # 文本消息内容块
EventType.TEXT_MESSAGE_END # 文本消息结束
EventType.TOOL_CALL_START # 工具调用开始
EventType.TOOL_CALL_ARGS # 工具调用参数(流式)
EventType.TOOL_CALL_END # 工具调用结束
EventType.TOOL_CALL_RESULT # 工具调用结果
EventType.CUSTOM # 自定义事件
创建自定义Agent
要创建自定义Agent,需继承BaseAgent并实现run方法:
复制
from ag_kit_py.agents import BaseAgent
from ag_ui.core import RunAgentInput, BaseEvent, EventType
from typing import AsyncGenerator
class CustomAgent(BaseAgent):
"""自定义Agent实现"""
def __init__(self, name: str, description: str, custom_config: dict):
# 初始化自定义Agent
agent_impl = create_custom_agent(custom_config)
super().__init__(name, description, agent_impl)
async def run(
self,
run_input: RunAgentInput
) -> AsyncGenerator[BaseEvent, None]:
"""执行自定义Agent"""
# 发出运行开始事件
yield BaseEvent(
type=EventType.RUN_STARTED,
thread_id=run_input.thread_id,
run_id=run_input.run_id
)
try:
# 执行Agent逻辑
result = await self._agent.execute(run_input)
# 发出结果事件
yield BaseEvent(
type=EventType.TEXT_MESSAGE_CONTENT,
thread_id=run_input.thread_id,
run_id=run_input.run_id,
raw_event={"data": {"chunk": {"content": result}}}
)
# 发出运行完成事件
yield BaseEvent(
type=EventType.RUN_FINISHED,
thread_id=run_input.thread_id,
run_id=run_input.run_id
)
except Exception as e:
# 发出错误事件
yield BaseEvent(
type=EventType.RUN_ERROR,
thread_id=run_input.thread_id,
run_id=run_input.run_id,
raw_event={"error": str(e)}
)
def destroy(self) -> None:
"""清理资源"""
# 实现清理逻辑
self._agent.cleanup()
最佳实践
1. 使用类型提示
复制
from ag_kit_py.agents import BaseAgent
from ag_ui.core import RunAgentInput, BaseEvent
from typing import AsyncGenerator
class MyAgent(BaseAgent):
async def run(
self,
run_input: RunAgentInput
) -> AsyncGenerator[BaseEvent, None]:
# 类型提示提升代码质量
pass
2. 优雅处理错误
复制
async def run(self, run_input: RunAgentInput) -> AsyncGenerator[BaseEvent, None]:
try:
# Agent逻辑
yield event
except Exception as e:
# 始终发出错误事件
yield BaseEvent(
type=EventType.RUN_ERROR,
thread_id=run_input.thread_id,
run_id=run_input.run_id,
raw_event={"error": str(e)}
)
3. 清理资源
复制
def destroy(self) -> None:
"""始终实现清理逻辑"""
if hasattr(self._agent, 'close'):
self._agent.close()
if hasattr(self, '_db_connection'):
self._db_connection.close()
4. 使用异步/等待
复制
async def run(self, run_input: RunAgentInput) -> AsyncGenerator[BaseEvent, None]:
# 使用异步操作提升性能
result = await self._agent.async_execute(run_input)
yield result
5. 发出适当事件
复制
async def run(self, run_input: RunAgentInput) -> AsyncGenerator[BaseEvent, None]:
# 始终发出RUN_STARTED事件
yield BaseEvent(type=EventType.RUN_STARTED, ...)
try:
# 发出进度事件
yield BaseEvent(type=EventType.TEXT_MESSAGE_CONTENT, ...)
# 成功时始终发出RUN_FINISHED事件
yield BaseEvent(type=EventType.RUN_FINISHED, ...)
except Exception as e: