跳转到主要内容
学习如何将工具与AG-Kit Agent集成,通过全面的工具包创建能与外部世界交互的强大AI工作流。

基础Agent集成

简单工具集成

最基本的Agent集成形式是创建一个自定义工具并将其提供给Agent。这使得Agent能够执行超出其基础语言能力的特定操作。
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool as langchain_tool
from pydantic import BaseModel
import os

# 定义工具模式
class CalculatorInput(BaseModel):
    operation: str
    a: float
    b: float

# 创建一个简单工具
@langchain_tool
def calculator(operation: str, a: float, b: float) -> dict:
    """Perform basic mathematical calculations"""
    if operation == 'divide' and b == 0:
        raise ValueError('Division by zero')
    
    operations = {'add': a + b, 'subtract': a - b, 'multiply': a * b, 'divide': a / b}
    return {'result': operations[operation], 'operation': operation, 'operands': [a, b]}

# 创建带工具的模型
model = ChatOpenAI(model='gpt-4', api_key=os.getenv('OPENAI_API_KEY'))
model_with_tools = model.bind_tools([calculator])

# 创建LangGraph工作流
workflow = StateGraph()
workflow.add_node('agent', lambda state: {
    'messages': [model_with_tools.invoke(state['messages'])]
})
workflow.add_edge(START, 'agent')
workflow.add_edge('agent', END)

graph = workflow.compile()
Agent集成的关键组件:
  • 模型提供者:配置LLM提供者(OpenAI、Anthropic等)
  • 工具定义:创建具有清晰模式和错误处理的工具
  • Agent配置:设置Agent的指令、工具和模型参数
  • 执行:使用自然语言输入运行Agent

多工具集成

对于更复杂的工作流,您可以将多个工具和工具包与单个Agent集成。这能创建出能够处理不同领域多样化任务的强大助手。
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from agkit.tools import (
    FilesystemToolkit,
    BuiltInCodeExecutor,
    create_bash_tool,
    LocalFileOperator
)
from agkit.tools.adapters import AGKitTool
import os

# 设置文件系统上下文
fs_context = {
    'working_directory': os.getcwd(),
    'fs_operator': LocalFileOperator()
}

# 创建文件系统工具包
filesystem_toolkit = FilesystemToolkit(
    name='filesystem-toolkit',
    context=fs_context
)

# 将AG-Kit工具转换为LangChain工具
agkit_tools = [
    *filesystem_toolkit.get_tools(),
    BuiltInCodeExecutor(),
    create_bash_tool(fs_context),
    calculator_tool
]
tools = [AGKitTool(tool=t).as_langchain_tool() for t in agkit_tools]

# 创建带工具的模型
model = ChatOpenAI(model='gpt-4')
model_with_tools = model.bind_tools(tools)

# 创建LangGraph工作流
workflow = StateGraph()
workflow.add_node('agent', lambda state: {
    'messages': [model_with_tools.invoke(state['messages'])]
})
workflow.add_edge(START, 'agent')
workflow.add_edge('agent', END)

graph = workflow.compile()
这种方法使得Agent能够:
  • 读写文件 使用文件系统工具包
  • 执行代码 在安全的沙盒环境中
  • 运行命令行操作 实现系统交互
  • 执行计算 使用自定义工具

高级集成模式

智能工具选择

高级Agent可以配置多种专用工具和智能选择策略。Agent会根据任务上下文和用户需求自动选择最合适的工具。
# 为不同数据格式创建专用工具
data_processing_tools = [
    tool(
        lambda file_path, operation: {
            'success': True, 
            'data': {'processed': True, 'operation': operation}
        },
        name='csv_processor',
        description='Process CSV data files and perform data analysis',
        schema=CSVProcessorInput
    ),
    tool(
        lambda url, method='GET', headers=None: {
            'success': True, 
            'data': {'status': 200, 'response': 'API response'}
        },
        name='http_client',
        description='Make HTTP requests to external APIs',
        schema=HTTPClientInput
    )
]

# 创建包含多种专用工具的Agent
smart_agent = # Agent初始化
    # ...
    tools=[*data_processing_tools, *filesystem_toolkit.get_tools()],
    # ...
智能工具选择的优势:
  • 自动工具路由 基于任务需求
  • 上下文决策 优化工具使用
  • 可扩展架构 方便添加新专用工具
  • 清晰推理 工具选择逻辑透明

错误处理与容错机制

优雅的错误恢复

构建能够优雅处理工具故障的稳健Agent对生产应用至关重要。AG-Kit提供了多种错误处理和恢复机制。
# 创建具备错误处理能力的稳健Agent
resilient_agent = # Agent初始化
    # ...
    tools=[
        *filesystem_toolkit.get_tools(),
        BuiltInCodeExecutor()
    ],
    # control_flow={'error_retry_limit': 2, 'max_steps': 10}

# 使用时的错误处理
try:
    response = await resilient_agent.run('Process this complex request')
    print('Success:', response)
except Exception as error:
    print(f'Agent error: {error}')
    # 实施回退策略或用户通知

输入验证与安全

在工具中实施适当的输入验证和安全措施对Agent的安全操作至关重要,特别是在处理文件系统或外部API时。
async def validated_file_processor(file_path: str, operation: str):
    try:
        # 执行前验证
        if not os.path.exists(file_path):
            return {
                'success': False,
                'error': 'File does not exist',
                'error_type': 'validation'
            }
        
        stats = os.stat(file_path)
        if stats.st_size > 10 * 1024 * 1024:  # 10MB限制
            return {
                'success': False,
                'error': 'File too large (max 10MB)',
                'error_type': 'validation'
            }
        
        # 带错误处理的操作执行
        operations = {
            'read': lambda: {'content': open(file_path).read()},
            'analyze': lambda: {
                'lines': len(open(file_path).readlines()),
                'characters': len(open(file_path).read()),
                'words': len(open(file_path).read().split())
            },
            'transform': lambda: {'message': 'Transform operation completed'}
        }
        
        result = operations[operation]()
        
        # 执行后验证
        if not result or not isinstance(result, dict):
            return {
                'success': False,
                'error': 'Invalid processing result',
                'error_type': 'execution'
            }
        
        return {'success': True, 'data': result}
        
    except Exception as error:
        return {
            'success': False,
            'error': str(error),
            'error_type': 'execution'
        }

validated_tool = tool(
    validated_file_processor,
    name='validated_file_processor',
    description='Process files with comprehensive validation and security checks',
    schema=ValidatedFileProcessorInput
)

# 在Agent中使用已验证工具
secure_agent = # Agent初始化
    # ...
    tools=[validated_tool],
    # ...
关键验证实践:
  • 输入净化 防止路径遍历攻击
  • 文件大小限制 避免资源耗尽
  • 执行前后验证 确保数据完整性
  • 结构化错误响应 包含适当的错误类型

流式处理与实时集成

流式工具响应

对于长时间运行的操作或实时数据处理,AG-Kit支持流式响应,可向用户提供进度更新和中间结果。
import asyncio
from typing import AsyncIterator

# 为长时间运行操作创建流式工具
async def streaming_processor(dataset: str, batch_size: int = 100):
    async def stream_progress():
        for progress in [25, 50, 75, 100]:
            await asyncio.sleep(1)
            yield {
                'progress': progress,
                'message': f'Processing batch {progress // 25}',
                'timestamp': datetime.now().isoformat()
            }
    
    return {
        'success': True,
        'stream': stream_progress(),
        'data': {
            'message': 'Streaming processing started',
            'batch_size': batch_size,
            'estimated_duration': '4 seconds'
        }
    }

streaming_tool = tool(
    streaming_processor,
    name='streaming_processor',
    description='Process large datasets with real-time progress updates',
    schema=StreamingProcessorInput
)

# 创建流式Agent
streaming_agent = # Agent初始化
    # ...
    tools=[streaming_tool],
    # ...

# 启用流式的使用方式
response = await streaming_agent.run(
    'Process the large dataset with real-time progress updates',
    # state, options
)

# 处理流式响应
if hasattr(response, 'stream'):
    async for chunk in response.stream:
        print(f'Streaming chunk: {chunk}')
        # 使用进度信息更新UI
流式处理能力可实现:
  • 实时进度追踪 针对长时间运行的操作
  • 中间结果交付 提供更好的用户体验
  • 响应式UI更新 在处理过程中
  • 早期错误检测 和处理

性能优化

并行工具执行

AG-Kit Agent可以自动并行执行独立的工具调用,显著提升不相互依赖操作的性能。
# 创建支持并行执行的Agent
parallel_agent = # Agent初始化
    # ...
    tools=[
        calculator_tool,
        *filesystem_toolkit.get_tools()
    ],
    # ...

# Agent会自动并行执行独立的工具调用
response = await parallel_agent.run(
    'Calculate 15 * 23, read the config.json file, and check if package.json exists'
)
性能优势:
  • 减少执行时间 针对独立操作
  • 更好的资源利用率 通过并发处理
  • 改进用户体验 响应速度更快
  • 自动优化 无需手动协调

后续步骤