MCP 集成
AG-Kit Python 的 MCP 集成提供了 AG-Kit 工具系统与模型上下文协议(MCP)之间的无缝双向转换。通过该功能,您可以:- 连接标准 MCP 服务器,并将其工具作为 AG-Kit BaseTool 实例使用
- 将 AG-Kit 工具暴露为 MCP 服务器,供外部客户端调用
- 支持多种传输协议,包括 stdio、HTTP、SSE、内存通信等
概述
MCP 集成包含以下核心组件:- MCPClientTool:封装外部 MCP 工具以适配 AG-Kit 环境
- MCPToolkit:管理 MCP 工具的高级工具包
- AGKitMCPServer:将 AG-Kit 工具暴露为标准 MCP 服务器
- MCPClientManager:管理多个 MCP 服务器的连接
快速入门
在 AG-Kit 中使用外部 MCP 工具
复制
from ag_kit_py.tools.mcp import MCPClientManager, MCPClientTool
# 创建客户端管理器
client_manager = MCPClientManager()
# 连接外部 MCP 服务器
await client_manager.add_server('math-server', {
'name': 'math-client',
'version': '1.0.0',
'transport': {
'type': 'stdio',
'command': 'python',
'args': ['-m', 'math_mcp_server']
}
})
# 从 MCP 工具创建 AG-Kit 工具
client_tools = client_manager.create_client_tools('math-server')
# 在 AG-Kit 应用中使用工具
for tool in client_tools:
result = await tool.invoke({'a': 5, 'b': 3})
print(result.data)
将 AG-Kit 工具暴露为 MCP 服务器
复制
from ag_kit_py.tools.mcp import AGKitMCPServer
from my_tools import MyCustomTool
# 创建 AG-Kit 工具
calculator_tool = MyCustomTool()
# 创建 MCP 服务器
server = AGKitMCPServer({
'name': 'ag-kit-mcp-server',
'version': '1.0.0',
'description': '通过 MCP 暴露的 AG-Kit 工具'
})
# 注册工具
server.register_tool(calculator_tool)
# 使用 stdio 传输启动服务器
await server.run({'type': 'stdio'})
传输协议
Stdio 传输
最常用的 MCP 服务器传输方式,使用标准输入/输出。服务器配置
复制
# 使用 stdio 启动服务器
await server.run({'type': 'stdio'})
客户端配置
复制
await client_manager.add_server('my-server', {
'name': 'my-client',
'version': '1.0.0',
'transport': {
'type': 'stdio',
'command': 'python',
'args': ['-m', 'my_mcp_server'],
'timeout': 10000
}
})
HTTP 传输 (StreamableHTTP)
适用于基于 Web 的 MCP 服务器和客户端。服务器配置
复制
from fastapi import FastAPI
from ag_kit_py.tools.mcp import AGKitMCPServer
app = FastAPI()
server = AGKitMCPServer({
'name': 'http-server',
'version': '1.0.0'
})
async def http_setup(server_instance, create_transport):
@app.post('/mcp')
async def handle_mcp(request: Request):
transport = await create_transport({
'enable_json_response': True,
'session_id_generator': lambda: str(uuid.uuid4())
})
body = await request.json()
return await transport.handle_request(request, body)
await server.run({
'type': 'streamableHttp',
'streamable_http_setup': http_setup
})
# 启动 FastAPI 服务器
import uvicorn
uvicorn.run(app, host='0.0.0.0', port=3000)
客户端配置
复制
await client_manager.add_server('http-server', {
'name': 'http-client',
'version': '1.0.0',
'transport': {
'type': 'streamableHttp',
'url': 'http://localhost:3000/mcp',
'timeout': 15000
}
})
SSE 传输
服务器发送事件传输,用于实时通信。服务器配置
复制
from fastapi import FastAPI
from ag_kit_py.tools.mcp import AGKitMCPServer
app = FastAPI()
transport = None
async def sse_setup(server_instance, create_transport_func):
global transport
@app.get('/mcp/sse')
async def handle_sse(request: Request):
nonlocal transport
transport = await create_transport_func('/mcp/sse', request, {
'enable_dns_rebinding_protection': False
})
@app.post('/mcp/sse')
async def handle_post(request: Request):
if transport:
body = await request.json()
return await transport.handle_post_message(request, body)
else:
return {'error': '无活跃的 SSE 连接'}
await server.run({
'type': 'sse',
'sse_setup': sse_setup
})
客户端配置
复制
await client_manager.add_server('sse-server', {
'name': 'sse-client',
'version': '1.0.0',
'transport': {
'type': 'sse',
'url': 'http://localhost:3000/mcp/sse'
}
})
内存传输
用于测试和同进程通信。服务器配置
复制
memory_id = 'my-memory-server'
await server.run({
'type': 'memory',
'memory_id': memory_id
})
客户端配置
复制
await client_manager.add_server('memory-server', {
'name': 'memory-client',
'version': '1.0.0',
'transport': {
'type': 'memory',
'memory_id': 'my-memory-server'
}
})
核心示例
示例 1:创建并运行 MCP 服务器
将 AG-Kit 工具暴露为标准 MCP 服务器:复制
from ag_kit_py.tools.mcp import AGKitMCPServer
from my_tools import MyExistingTool
# 创建 MCP 服务器实例
server = AGKitMCPServer({
'name': 'my-agkit-server',
'version': '1.0.0',
'description': '通过 MCP 暴露的 AG-Kit 工具'
})
# 注册现有 AG-Kit 工具
my_tool = MyExistingTool()
server.register_tool(my_tool)
# 使用共享配置注册多个工具
server.register_tools([tool1, tool2, tool3], {
'name_prefix': 'agkit_' # 所有工具名称将添加 'agkit_' 前缀
})
# 使用不同传输协议启动服务器
# 选项 1:Stdio(最常用)
await server.run({'type': 'stdio'})
# 选项 2:HTTP
await server.run({
'type
# 清理
await client_manager.disconnect_all()
await server.stop()
server_task.cancel()
try:
await server_task
except asyncio.CancelledError:
pass
示例5:高级连接管理
处理连接选项和错误场景:复制
from ag_kit_py.tools.mcp import MCPClientManager
client_manager = MCPClientManager()
# 添加带连接选项的服务器
await client_manager.add_server('reliable-server', {
'name': 'reliable-client',
'version': '1.0.0',
'transport': {
'type': 'stdio',
'command': 'external-server',
'timeout': 10000
}
}, {
'auto_reconnect': True,
'reconnect_delay': 5000,
'max_reconnect_attempts': 3,
'heartbeat_interval': 30000
})
# 监控连接状态
import time
while True:
is_connected = client_manager.is_server_connected('reliable-server')
print(f'服务器连接状态: {is_connected}')
await asyncio.sleep(5)
示例6:事件处理与监控
通过事件监听器监控MCP操作:复制
from ag_kit_py.tools.mcp import AGKitMCPServer, MCPClientManager
# 服务器事件监控
server = AGKitMCPServer({
'name': 'monitored-server',
'version': '1.0.0'
})
def server_event_handler(event):
event_type = event.type
if event_type == 'connected':
print(f"客户端已连接: {event.data.get('transport')}")
elif event_type == 'tool_called':
print(f"工具 {event.data.get('tool_name')} 被调用")
elif event_type == 'tool_result':
print(f"工具返回结果: {event.data.get('result')}")
elif event_type == 'error':
print(f"服务器错误: {event.data.get('error')}")
server.add_event_listener(server_event_handler)
# 客户端事件监控
client_manager = MCPClientManager()
def client_event_handler(event):
event_type = event.type
if event_type == 'connected':
print(f"已连接至服务器: {event.data.get('server_id')}")
elif event_type == 'tool_discovered':
print(f"发现工具: {event.data.get('tool_name')}")
elif event_type == 'disconnected':
print(f"连接断开: {event.data.get('reason', '未知原因')}")
elif event_type == 'error':
print(f"客户端错误: {event.data.get('error')}")
client_manager.add_event_listener(client_event_handler)
高级配置
工具配置
自定义AG-Kit工具通过MCP的暴露方式:复制
# 注册带自定义配置的工具
server.register_tool(calculator_tool, {
'name_prefix': 'math_', # 工具名前缀
'description': '自定义计算器' # 覆盖描述
})
# 批量注册工具并共享配置
server.register_tools([tool1, tool2, tool3], {
'name_prefix': 'utils_'
})
客户端工具配置
自定义MCP客户端工具:复制
from ag_kit_py.tools.mcp import MCPClientTool
from ag_kit_py.tools import ToolResult
def transform_input(input_data):
return {**input_data, 'timestamp': time.time()}
def transform_output(output_data):
return {'result': output_data, 'processed': True}
def error_handler(error):
return ToolResult(
success=False,
error=f'自定义错误: {str(error)}'
)
client_tool = MCPClientTool(
mcp_client=mcp_client,
mcp_tool_metadata=tool_metadata,
config={
'name': 'custom_tool_name', # 自定义AG-Kit名称
'timeout': 30000, # 调用超时时间
'retries': 3 # 重试次数
},
adapter_config={
'include_metadata': True, # 包含执行元数据
'transform_input': transform_input,
'transform_output': transform_output,
'error_handler': error_handler
}
)
连接管理
高级连接选项:复制
await client_manager.add_server('reliable-server', config, {
'auto_reconnect': True, # 失败时自动重连
'reconnect_delay': 5000, # 重连间隔时间
'max_reconnect_attempts': 5, # 最大重连尝试次数
'heartbeat_interval': 30000 # 心跳间隔
})
事件处理
监控MCP操作:复制
# 服务器事件
def handle_server_event(event):
if event.type == 'connected':
print(f"客户端已连接")
elif event.type == 'tool_called':
print(f"工具被调用: {event.data['tool_name']}")
elif event.type == 'error':
print(f"错误: {event.data['error']}")
server.add_event_listener(handle_server_event)
# 客户端事件
def handle_client_event(event):
if event.type == 'connected':
print(f"已连接至服务器")
elif event.type == 'tool_discovered':
print(f"发现工具: {event.data['tool_name']}")
elif event.type == 'disconnected':
print(f"连接断开")
client_manager.add_event_listener(handle_client_event)
模式转换
AG-Kit自动在Pydantic模型和MCP JSON模式间转换:Pydantic转MCP模式
复制
from pydantic import BaseModel, Field
from ag_kit_py.tools.mcp.utils import pydantic_to_json_schema
class UserInput(BaseModel):
name: str = Field(description='用户名')
age: int = Field(ge=0, le=150)
email: str | None = None
tags: list[str]
preferences: dict[str, Any]
mcp_schema = pydantic_to_json_schema(UserInput)
# 生成MCP兼容的JSON模式
MCP转Pydantic模式
复制
# MCPClientTool自动处理MCP模式
mcp_tool_metadata = {
'name': 'user_tool',
'inputSchema': {
'type': 'object',
'properties': {
'name': {'type': 'string'},
'age': {'type': 'number'}
},
'required': ['name', 'age']
}
}
client_tool = MCPClientTool(client, mcp_tool_metadata)
# 工具现在具有AG-Kit兼容的模式
错误处理
服务器错误处理
复制
from ag_kit_py.tools import BaseTool, ToolResult
class SafeTool(BaseTool):
async def _invoke(self, input_data):
try:
# 工具逻辑
return ToolResult(success=True, data=result)
except Exception as error:
return ToolResult(
success=False,
error=f'工具执行失败: {str(error)}'
)
server = AGKitMCPServer({
'name': 'error-handling-server',
'version': '1.0.0',
'error_handling': 'return_error', # 或 'throw'
})
客户端错误处理
复制
def custom_error_handler(error):
if 'timeout' in str(error):
return ToolResult(
success=False,
error='操作超时,请重试'
)
return ToolResult(
success=False,
error=f'MCP工具错误: {str(error)}'
)
client_tool = MCPClientTool(
mcp_client=client,
mcp_tool_metadata=metadata,
config={'retries':
**关键要求:**
1. **使用适当的传输方式**:CLI工具使用stdio,Web服务使用HTTP,测试使用内存传输
2. **优雅处理错误**:实现正确的错误处理和重试逻辑
3. **监控连接状态**:使用事件监听器跟踪连接状态
4. **清理资源**:完成后始终调用清理方法
5. **全面测试**:使用内存传输进行完整测试
6. **模式验证**:确保Pydantic与MCP JSON Schema之间的模式兼容
7. **连接管理**:生产部署使用连接选项确保可靠性
8. **正确使用异步/等待**:后台服务任务使用`asyncio.create_task()`并妥善处理`CancelledError`
## 故障排除
### 常见问题
1. **连接超时**:增加传输配置中的超时值
2. **模式转换错误**:确保Pydantic模型使用支持的类型
3. **工具未找到**:检查工具注册和命名
4. **传输错误**:验证传输配置和服务可用性
5. **内存泄漏**:始终清理连接和事件监听器
6. **CancelledError**:在异步上下文中正确处理任务取消
### 调试模式
启用日志进行调试:
```python
server = AGKitMCPServer({
'name': 'debug-server',
'version': '1.0.0',
'log_level': 'debug' # 启用调试日志
})
示例仓库
完整工作示例请访问:- mcp_basic.py - 基础MCP集成示例
- mcp_transports.py - 传输协议示例