跳转到主要内容

ag_kit_py.agents

Agents模块提供了使用AG-Kit构建AI Agent的核心抽象与实现。它定义了基础Agent接口,并提供了如LangGraph等框架特定的实现。

安装

pip install ag_kit_py

可用Agent

Agent类型状态描述
LangGraphLangGraphAgent✅ 可用基于LangGraph的Agent,包含稳定性补丁
基础BaseAgent✅ 可用自定义Agent的抽象基类

快速开始

创建LangGraph Agent

from ag_kit_py.agents import LangGraphAgent
from ag_kit_py.providers import create_provider
from langgraph.graph import StateGraph, MessagesState

# 创建provider
provider = create_provider("openai")
model = provider.get_langchain_model()

# 定义聊天节点
def chat_node(state: MessagesState):
    response = model.invoke(state["messages"])
    return {"messages": [response]}

# 构建工作流
workflow = StateGraph(MessagesState)
workflow.add_node("chat", chat_node)
workflow.set_entry_point("chat")
workflow.set_finish_point("chat")

# 编译并创建Agent
graph = workflow.compile()
agent = LangGraphAgent(
    name="ChatAgent",
    description="对话型Agent",
    graph=graph
)

与服务器配合使用

from ag_kit_py.server import AGKitAPIApp

def create_agent():
    return {"agent": agent}

AGKitAPIApp().run(create_agent, port=8000)

BaseAgent

所有Agent必须继承的抽象基类。

类定义

from abc import ABC, abstractmethod
from typing import Any, AsyncGenerator
from ag_ui.core import BaseEvent, RunAgentInput

class BaseAgent(ABC):
    """所有AG-Kit Agent的抽象基类"""
    
    def __init__(self, name: str, description: str, agent: Any):
        """初始化基础Agent
        
        参数:
            name: Agent的可读名称
            description: Agent功能的详细描述
            agent: 底层Agent实现
        """
        self._name = name
        self._description = description
        self._agent = agent

属性

name

获取Agent名称。
@property
def name(self) -> str:
    """获取Agent名称"""
    return self._name

description

获取Agent描述。
@property
def description(self) -> str:
    """获取Agent描述"""
    return self._description

agent

获取底层Agent实现。
@property
def agent(self) -> Any:
    """获取底层Agent实例"""
    return self._agent

抽象方法

run()

使用给定输入执行Agent。
@abstractmethod
async def run(
    self, 
    run_input: RunAgentInput
) -> AsyncGenerator[BaseEvent, None]:
    """使用给定输入执行Agent
    
    参数:
        run_input: Agent执行的输入数据
        
    生成:
        BaseEvent: 表示执行进度的事件
    """

具体方法

destroy()

清理Agent使用的资源。
def destroy(self) -> None:
    """清理Agent使用的资源"""
    pass

RunAgentInput

Agent执行的输入数据结构。
from ag_ui.core import RunAgentInput
from typing import Any, Dict, List

class RunAgentInput:
    """Agent执行的输入数据"""
    
    messages: List[Dict[str, Any]]      # 对话消息
    run_id: str                         # 唯一运行标识符
    thread_id: str                      # 线程/对话标识符
    state: Dict[str, Any]               # Agent状态
    context: List[Any]                  # 附加上下文
    tools: List[Any]                    # 可用工具
    forwarded_props: Dict[str, Any]     # 转发属性

BaseEvent

Agent执行事件的基础类。
from ag_ui.core import BaseEvent, EventType

class BaseEvent:
    """Agent执行的基础事件"""
    
    type: EventType                     # 事件类型
    thread_id: str                      # 线程标识符
    run_id: str                         # 运行标识符
    raw_event: Any                      # 原始事件数据

事件类型

from ag_ui.core import EventType

# 可用事件类型
EventType.RUN_STARTED              # Agent执行开始
EventType.RUN_FINISHED             # Agent执行完成
EventType.RUN_ERROR                # Agent执行错误
EventType.TEXT_MESSAGE_START       # 文本消息开始
EventType.TEXT_MESSAGE_CONTENT     # 文本消息内容块
EventType.TEXT_MESSAGE_END         # 文本消息结束
EventType.TOOL_CALL_START          # 工具调用开始
EventType.TOOL_CALL_ARGS           # 工具调用参数(流式)
EventType.TOOL_CALL_END            # 工具调用结束
EventType.TOOL_CALL_RESULT         # 工具调用结果
EventType.CUSTOM                   # 自定义事件

创建自定义Agent

要创建自定义Agent,需继承BaseAgent并实现run方法:
from ag_kit_py.agents import BaseAgent
from ag_ui.core import RunAgentInput, BaseEvent, EventType
from typing import AsyncGenerator

class CustomAgent(BaseAgent):
    """自定义Agent实现"""
    
    def __init__(self, name: str, description: str, custom_config: dict):
        # 初始化自定义Agent
        agent_impl = create_custom_agent(custom_config)
        super().__init__(name, description, agent_impl)
    
    async def run(
        self, 
        run_input: RunAgentInput
    ) -> AsyncGenerator[BaseEvent, None]:
        """执行自定义Agent"""
        # 发出运行开始事件
        yield BaseEvent(
            type=EventType.RUN_STARTED,
            thread_id=run_input.thread_id,
            run_id=run_input.run_id
        )
        
        try:
            # 执行Agent逻辑
            result = await self._agent.execute(run_input)
            
            # 发出结果事件
            yield BaseEvent(
                type=EventType.TEXT_MESSAGE_CONTENT,
                thread_id=run_input.thread_id,
                run_id=run_input.run_id,
                raw_event={"data": {"chunk": {"content": result}}}
            )
            
            # 发出运行完成事件
            yield BaseEvent(
                type=EventType.RUN_FINISHED,
                thread_id=run_input.thread_id,
                run_id=run_input.run_id
            )
        except Exception as e:
            # 发出错误事件
            yield BaseEvent(
                type=EventType.RUN_ERROR,
                thread_id=run_input.thread_id,
                run_id=run_input.run_id,
                raw_event={"error": str(e)}
            )
    
    def destroy(self) -> None:
        """清理资源"""
        # 实现清理逻辑
        self._agent.cleanup()

最佳实践

1. 使用类型提示

from ag_kit_py.agents import BaseAgent
from ag_ui.core import RunAgentInput, BaseEvent
from typing import AsyncGenerator

class MyAgent(BaseAgent):
    async def run(
        self, 
        run_input: RunAgentInput
    ) -> AsyncGenerator[BaseEvent, None]:
        # 类型提示提升代码质量
        pass

2. 优雅处理错误

async def run(self, run_input: RunAgentInput) -> AsyncGenerator[BaseEvent, None]:
    try:
        # Agent逻辑
        yield event
    except Exception as e:
        # 始终发出错误事件
        yield BaseEvent(
            type=EventType.RUN_ERROR,
            thread_id=run_input.thread_id,
            run_id=run_input.run_id,
            raw_event={"error": str(e)}
        )

3. 清理资源

def destroy(self) -> None:
    """始终实现清理逻辑"""
    if hasattr(self._agent, 'close'):
        self._agent.close()
    if hasattr(self, '_db_connection'):
        self._db_connection.close()

4. 使用异步/等待

async def run(self, run_input: RunAgentInput) -> AsyncGenerator[BaseEvent, None]:
    # 使用异步操作提升性能
    result = await self._agent.async_execute(run_input)
    yield result

5. 发出适当事件

async def run(self, run_input: RunAgentInput) -> AsyncGenerator[BaseEvent, None]:
    # 始终发出RUN_STARTED事件
    yield BaseEvent(type=EventType.RUN_STARTED, ...)
    
    try:
        # 发出进度事件
        yield BaseEvent(type=EventType.TEXT_MESSAGE_CONTENT, ...)
        
        # 成功时始终发出RUN_FINISHED事件
        yield BaseEvent(type=EventType.RUN_FINISHED, ...)
    except Exception as e: