基础Agent集成
简单工具集成
最基本的Agent集成形式是创建一个自定义工具并将其提供给Agent。这使得Agent能够执行超出其基础语言能力的特定操作。复制
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool as langchain_tool
from pydantic import BaseModel
import os
# 定义工具模式
class CalculatorInput(BaseModel):
operation: str
a: float
b: float
# 创建一个简单工具
@langchain_tool
def calculator(operation: str, a: float, b: float) -> dict:
"""Perform basic mathematical calculations"""
if operation == 'divide' and b == 0:
raise ValueError('Division by zero')
operations = {'add': a + b, 'subtract': a - b, 'multiply': a * b, 'divide': a / b}
return {'result': operations[operation], 'operation': operation, 'operands': [a, b]}
# 创建带工具的模型
model = ChatOpenAI(model='gpt-4', api_key=os.getenv('OPENAI_API_KEY'))
model_with_tools = model.bind_tools([calculator])
# 创建LangGraph工作流
workflow = StateGraph()
workflow.add_node('agent', lambda state: {
'messages': [model_with_tools.invoke(state['messages'])]
})
workflow.add_edge(START, 'agent')
workflow.add_edge('agent', END)
graph = workflow.compile()
- 模型提供者:配置LLM提供者(OpenAI、Anthropic等)
- 工具定义:创建具有清晰模式和错误处理的工具
- Agent配置:设置Agent的指令、工具和模型参数
- 执行:使用自然语言输入运行Agent
多工具集成
对于更复杂的工作流,您可以将多个工具和工具包与单个Agent集成。这能创建出能够处理不同领域多样化任务的强大助手。复制
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from agkit.tools import (
FilesystemToolkit,
BuiltInCodeExecutor,
create_bash_tool,
LocalFileOperator
)
from agkit.tools.adapters import AGKitTool
import os
# 设置文件系统上下文
fs_context = {
'working_directory': os.getcwd(),
'fs_operator': LocalFileOperator()
}
# 创建文件系统工具包
filesystem_toolkit = FilesystemToolkit(
name='filesystem-toolkit',
context=fs_context
)
# 将AG-Kit工具转换为LangChain工具
agkit_tools = [
*filesystem_toolkit.get_tools(),
BuiltInCodeExecutor(),
create_bash_tool(fs_context),
calculator_tool
]
tools = [AGKitTool(tool=t).as_langchain_tool() for t in agkit_tools]
# 创建带工具的模型
model = ChatOpenAI(model='gpt-4')
model_with_tools = model.bind_tools(tools)
# 创建LangGraph工作流
workflow = StateGraph()
workflow.add_node('agent', lambda state: {
'messages': [model_with_tools.invoke(state['messages'])]
})
workflow.add_edge(START, 'agent')
workflow.add_edge('agent', END)
graph = workflow.compile()
- 读写文件 使用文件系统工具包
- 执行代码 在安全的沙盒环境中
- 运行命令行操作 实现系统交互
- 执行计算 使用自定义工具
高级集成模式
智能工具选择
高级Agent可以配置多种专用工具和智能选择策略。Agent会根据任务上下文和用户需求自动选择最合适的工具。复制
# 为不同数据格式创建专用工具
data_processing_tools = [
tool(
lambda file_path, operation: {
'success': True,
'data': {'processed': True, 'operation': operation}
},
name='csv_processor',
description='Process CSV data files and perform data analysis',
schema=CSVProcessorInput
),
tool(
lambda url, method='GET', headers=None: {
'success': True,
'data': {'status': 200, 'response': 'API response'}
},
name='http_client',
description='Make HTTP requests to external APIs',
schema=HTTPClientInput
)
]
# 创建包含多种专用工具的Agent
smart_agent = # Agent初始化
# ...
tools=[*data_processing_tools, *filesystem_toolkit.get_tools()],
# ...
- 自动工具路由 基于任务需求
- 上下文决策 优化工具使用
- 可扩展架构 方便添加新专用工具
- 清晰推理 工具选择逻辑透明
错误处理与容错机制
优雅的错误恢复
构建能够优雅处理工具故障的稳健Agent对生产应用至关重要。AG-Kit提供了多种错误处理和恢复机制。复制
# 创建具备错误处理能力的稳健Agent
resilient_agent = # Agent初始化
# ...
tools=[
*filesystem_toolkit.get_tools(),
BuiltInCodeExecutor()
],
# control_flow={'error_retry_limit': 2, 'max_steps': 10}
# 使用时的错误处理
try:
response = await resilient_agent.run('Process this complex request')
print('Success:', response)
except Exception as error:
print(f'Agent error: {error}')
# 实施回退策略或用户通知
输入验证与安全
在工具中实施适当的输入验证和安全措施对Agent的安全操作至关重要,特别是在处理文件系统或外部API时。复制
async def validated_file_processor(file_path: str, operation: str):
try:
# 执行前验证
if not os.path.exists(file_path):
return {
'success': False,
'error': 'File does not exist',
'error_type': 'validation'
}
stats = os.stat(file_path)
if stats.st_size > 10 * 1024 * 1024: # 10MB限制
return {
'success': False,
'error': 'File too large (max 10MB)',
'error_type': 'validation'
}
# 带错误处理的操作执行
operations = {
'read': lambda: {'content': open(file_path).read()},
'analyze': lambda: {
'lines': len(open(file_path).readlines()),
'characters': len(open(file_path).read()),
'words': len(open(file_path).read().split())
},
'transform': lambda: {'message': 'Transform operation completed'}
}
result = operations[operation]()
# 执行后验证
if not result or not isinstance(result, dict):
return {
'success': False,
'error': 'Invalid processing result',
'error_type': 'execution'
}
return {'success': True, 'data': result}
except Exception as error:
return {
'success': False,
'error': str(error),
'error_type': 'execution'
}
validated_tool = tool(
validated_file_processor,
name='validated_file_processor',
description='Process files with comprehensive validation and security checks',
schema=ValidatedFileProcessorInput
)
# 在Agent中使用已验证工具
secure_agent = # Agent初始化
# ...
tools=[validated_tool],
# ...
- 输入净化 防止路径遍历攻击
- 文件大小限制 避免资源耗尽
- 执行前后验证 确保数据完整性
- 结构化错误响应 包含适当的错误类型
流式处理与实时集成
流式工具响应
对于长时间运行的操作或实时数据处理,AG-Kit支持流式响应,可向用户提供进度更新和中间结果。复制
import asyncio
from typing import AsyncIterator
# 为长时间运行操作创建流式工具
async def streaming_processor(dataset: str, batch_size: int = 100):
async def stream_progress():
for progress in [25, 50, 75, 100]:
await asyncio.sleep(1)
yield {
'progress': progress,
'message': f'Processing batch {progress // 25}',
'timestamp': datetime.now().isoformat()
}
return {
'success': True,
'stream': stream_progress(),
'data': {
'message': 'Streaming processing started',
'batch_size': batch_size,
'estimated_duration': '4 seconds'
}
}
streaming_tool = tool(
streaming_processor,
name='streaming_processor',
description='Process large datasets with real-time progress updates',
schema=StreamingProcessorInput
)
# 创建流式Agent
streaming_agent = # Agent初始化
# ...
tools=[streaming_tool],
# ...
# 启用流式的使用方式
response = await streaming_agent.run(
'Process the large dataset with real-time progress updates',
# state, options
)
# 处理流式响应
if hasattr(response, 'stream'):
async for chunk in response.stream:
print(f'Streaming chunk: {chunk}')
# 使用进度信息更新UI
- 实时进度追踪 针对长时间运行的操作
- 中间结果交付 提供更好的用户体验
- 响应式UI更新 在处理过程中
- 早期错误检测 和处理
性能优化
并行工具执行
AG-Kit Agent可以自动并行执行独立的工具调用,显著提升不相互依赖操作的性能。复制
# 创建支持并行执行的Agent
parallel_agent = # Agent初始化
# ...
tools=[
calculator_tool,
*filesystem_toolkit.get_tools()
],
# ...
# Agent会自动并行执行独立的工具调用
response = await parallel_agent.run(
'Calculate 15 * 23, read the config.json file, and check if package.json exists'
)
- 减少执行时间 针对独立操作
- 更好的资源利用率 通过并发处理
- 改进用户体验 响应速度更快
- 自动优化 无需手动协调