跳转到主要内容

MCP 集成

AG-Kit Python 的 MCP 集成提供了 AG-Kit 工具系统与模型上下文协议(MCP)之间的无缝双向转换。通过该功能,您可以:
  • 连接标准 MCP 服务器,并将其工具作为 AG-Kit BaseTool 实例使用
  • 将 AG-Kit 工具暴露为 MCP 服务器,供外部客户端调用
  • 支持多种传输协议,包括 stdio、HTTP、SSE、内存通信等

概述

MCP 集成包含以下核心组件:
  • MCPClientTool:封装外部 MCP 工具以适配 AG-Kit 环境
  • MCPToolkit:管理 MCP 工具的高级工具包
  • AGKitMCPServer:将 AG-Kit 工具暴露为标准 MCP 服务器
  • MCPClientManager:管理多个 MCP 服务器的连接

快速入门

在 AG-Kit 中使用外部 MCP 工具

from ag_kit_py.tools.mcp import MCPClientManager, MCPClientTool

# 创建客户端管理器
client_manager = MCPClientManager()

# 连接外部 MCP 服务器
await client_manager.add_server('math-server', {
    'name': 'math-client',
    'version': '1.0.0',
    'transport': {
        'type': 'stdio',
        'command': 'python',
        'args': ['-m', 'math_mcp_server']
    }
})

# 从 MCP 工具创建 AG-Kit 工具
client_tools = client_manager.create_client_tools('math-server')

# 在 AG-Kit 应用中使用工具
for tool in client_tools:
    result = await tool.invoke({'a': 5, 'b': 3})
    print(result.data)

将 AG-Kit 工具暴露为 MCP 服务器

from ag_kit_py.tools.mcp import AGKitMCPServer
from my_tools import MyCustomTool

# 创建 AG-Kit 工具
calculator_tool = MyCustomTool()

# 创建 MCP 服务器
server = AGKitMCPServer({
    'name': 'ag-kit-mcp-server',
    'version': '1.0.0',
    'description': '通过 MCP 暴露的 AG-Kit 工具'
})

# 注册工具
server.register_tool(calculator_tool)

# 使用 stdio 传输启动服务器
await server.run({'type': 'stdio'})

传输协议

Stdio 传输

最常用的 MCP 服务器传输方式,使用标准输入/输出。

服务器配置

# 使用 stdio 启动服务器
await server.run({'type': 'stdio'})

客户端配置

await client_manager.add_server('my-server', {
    'name': 'my-client',
    'version': '1.0.0',
    'transport': {
        'type': 'stdio',
        'command': 'python',
        'args': ['-m', 'my_mcp_server'],
        'timeout': 10000
    }
})

HTTP 传输 (StreamableHTTP)

适用于基于 Web 的 MCP 服务器和客户端。

服务器配置

from fastapi import FastAPI
from ag_kit_py.tools.mcp import AGKitMCPServer

app = FastAPI()

server = AGKitMCPServer({
    'name': 'http-server',
    'version': '1.0.0'
})

async def http_setup(server_instance, create_transport):
    @app.post('/mcp')
    async def handle_mcp(request: Request):
        transport = await create_transport({
            'enable_json_response': True,
            'session_id_generator': lambda: str(uuid.uuid4())
        })
        
        body = await request.json()
        return await transport.handle_request(request, body)

await server.run({
    'type': 'streamableHttp',
    'streamable_http_setup': http_setup
})

# 启动 FastAPI 服务器
import uvicorn
uvicorn.run(app, host='0.0.0.0', port=3000)

客户端配置

await client_manager.add_server('http-server', {
    'name': 'http-client',
    'version': '1.0.0',
    'transport': {
        'type': 'streamableHttp',
        'url': 'http://localhost:3000/mcp',
        'timeout': 15000
    }
})

SSE 传输

服务器发送事件传输,用于实时通信。

服务器配置

from fastapi import FastAPI
from ag_kit_py.tools.mcp import AGKitMCPServer

app = FastAPI()
transport = None

async def sse_setup(server_instance, create_transport_func):
    global transport
    
    @app.get('/mcp/sse')
    async def handle_sse(request: Request):
        nonlocal transport
        transport = await create_transport_func('/mcp/sse', request, {
            'enable_dns_rebinding_protection': False
        })
    
    @app.post('/mcp/sse')
    async def handle_post(request: Request):
        if transport:
            body = await request.json()
            return await transport.handle_post_message(request, body)
        else:
            return {'error': '无活跃的 SSE 连接'}

await server.run({
    'type': 'sse',
    'sse_setup': sse_setup
})

客户端配置

await client_manager.add_server('sse-server', {
    'name': 'sse-client',
    'version': '1.0.0',
    'transport': {
        'type': 'sse',
        'url': 'http://localhost:3000/mcp/sse'
    }
})

内存传输

用于测试和同进程通信。

服务器配置

memory_id = 'my-memory-server'

await server.run({
    'type': 'memory',
    'memory_id': memory_id
})

客户端配置

await client_manager.add_server('memory-server', {
    'name': 'memory-client',
    'version': '1.0.0',
    'transport': {
        'type': 'memory',
        'memory_id': 'my-memory-server'
    }
})

核心示例

示例 1:创建并运行 MCP 服务器

将 AG-Kit 工具暴露为标准 MCP 服务器:
from ag_kit_py.tools.mcp import AGKitMCPServer
from my_tools import MyExistingTool

# 创建 MCP 服务器实例
server = AGKitMCPServer({
    'name': 'my-agkit-server',
    'version': '1.0.0',
    'description': '通过 MCP 暴露的 AG-Kit 工具'
})

# 注册现有 AG-Kit 工具
my_tool = MyExistingTool()
server.register_tool(my_tool)

# 使用共享配置注册多个工具
server.register_tools([tool1, tool2, tool3], {
    'name_prefix': 'agkit_'  # 所有工具名称将添加 'agkit_' 前缀
})

# 使用不同传输协议启动服务器
# 选项 1:Stdio(最常用)
await server.run({'type': 'stdio'})

# 选项 2:HTTP
await server.run({
    'type
# 清理
await client_manager.disconnect_all()
await server.stop()
server_task.cancel()
try:
    await server_task
except asyncio.CancelledError:
    pass

示例5:高级连接管理

处理连接选项和错误场景:
from ag_kit_py.tools.mcp import MCPClientManager

client_manager = MCPClientManager()

# 添加带连接选项的服务器
await client_manager.add_server('reliable-server', {
    'name': 'reliable-client',
    'version': '1.0.0',
    'transport': {
        'type': 'stdio',
        'command': 'external-server',
        'timeout': 10000
    }
}, {
    'auto_reconnect': True,
    'reconnect_delay': 5000,
    'max_reconnect_attempts': 3,
    'heartbeat_interval': 30000
})

# 监控连接状态
import time
while True:
    is_connected = client_manager.is_server_connected('reliable-server')
    print(f'服务器连接状态: {is_connected}')
    await asyncio.sleep(5)

示例6:事件处理与监控

通过事件监听器监控MCP操作:
from ag_kit_py.tools.mcp import AGKitMCPServer, MCPClientManager

# 服务器事件监控
server = AGKitMCPServer({
    'name': 'monitored-server',
    'version': '1.0.0'
})

def server_event_handler(event):
    event_type = event.type
    if event_type == 'connected':
        print(f"客户端已连接: {event.data.get('transport')}")
    elif event_type == 'tool_called':
        print(f"工具 {event.data.get('tool_name')} 被调用")
    elif event_type == 'tool_result':
        print(f"工具返回结果: {event.data.get('result')}")
    elif event_type == 'error':
        print(f"服务器错误: {event.data.get('error')}")

server.add_event_listener(server_event_handler)

# 客户端事件监控
client_manager = MCPClientManager()

def client_event_handler(event):
    event_type = event.type
    if event_type == 'connected':
        print(f"已连接至服务器: {event.data.get('server_id')}")
    elif event_type == 'tool_discovered':
        print(f"发现工具: {event.data.get('tool_name')}")
    elif event_type == 'disconnected':
        print(f"连接断开: {event.data.get('reason', '未知原因')}")
    elif event_type == 'error':
        print(f"客户端错误: {event.data.get('error')}")

client_manager.add_event_listener(client_event_handler)

高级配置

工具配置

自定义AG-Kit工具通过MCP的暴露方式:
# 注册带自定义配置的工具
server.register_tool(calculator_tool, {
    'name_prefix': 'math_',             # 工具名前缀
    'description': '自定义计算器'  # 覆盖描述
})

# 批量注册工具并共享配置
server.register_tools([tool1, tool2, tool3], {
    'name_prefix': 'utils_'
})

客户端工具配置

自定义MCP客户端工具:
from ag_kit_py.tools.mcp import MCPClientTool
from ag_kit_py.tools import ToolResult

def transform_input(input_data):
    return {**input_data, 'timestamp': time.time()}

def transform_output(output_data):
    return {'result': output_data, 'processed': True}

def error_handler(error):
    return ToolResult(
        success=False,
        error=f'自定义错误: {str(error)}'
    )

client_tool = MCPClientTool(
    mcp_client=mcp_client,
    mcp_tool_metadata=tool_metadata,
    config={
        'name': 'custom_tool_name',  # 自定义AG-Kit名称
        'timeout': 30000,            # 调用超时时间
        'retries': 3                 # 重试次数
    },
    adapter_config={
        'include_metadata': True,      # 包含执行元数据
        'transform_input': transform_input,
        'transform_output': transform_output,
        'error_handler': error_handler
    }
)

连接管理

高级连接选项:
await client_manager.add_server('reliable-server', config, {
    'auto_reconnect': True,           # 失败时自动重连
    'reconnect_delay': 5000,          # 重连间隔时间
    'max_reconnect_attempts': 5,      # 最大重连尝试次数
    'heartbeat_interval': 30000       # 心跳间隔
})

事件处理

监控MCP操作:
# 服务器事件
def handle_server_event(event):
    if event.type == 'connected':
        print(f"客户端已连接")
    elif event.type == 'tool_called':
        print(f"工具被调用: {event.data['tool_name']}")
    elif event.type == 'error':
        print(f"错误: {event.data['error']}")

server.add_event_listener(handle_server_event)

# 客户端事件
def handle_client_event(event):
    if event.type == 'connected':
        print(f"已连接至服务器")
    elif event.type == 'tool_discovered':
        print(f"发现工具: {event.data['tool_name']}")
    elif event.type == 'disconnected':
        print(f"连接断开")

client_manager.add_event_listener(handle_client_event)

模式转换

AG-Kit自动在Pydantic模型和MCP JSON模式间转换:

Pydantic转MCP模式

from pydantic import BaseModel, Field
from ag_kit_py.tools.mcp.utils import pydantic_to_json_schema

class UserInput(BaseModel):
    name: str = Field(description='用户名')
    age: int = Field(ge=0, le=150)
    email: str | None = None
    tags: list[str]
    preferences: dict[str, Any]

mcp_schema = pydantic_to_json_schema(UserInput)
# 生成MCP兼容的JSON模式

MCP转Pydantic模式

# MCPClientTool自动处理MCP模式
mcp_tool_metadata = {
    'name': 'user_tool',
    'inputSchema': {
        'type': 'object',
        'properties': {
            'name': {'type': 'string'},
            'age': {'type': 'number'}
        },
        'required': ['name', 'age']
    }
}

client_tool = MCPClientTool(client, mcp_tool_metadata)
# 工具现在具有AG-Kit兼容的模式

错误处理

服务器错误处理

from ag_kit_py.tools import BaseTool, ToolResult

class SafeTool(BaseTool):
    async def _invoke(self, input_data):
        try:
            # 工具逻辑
            return ToolResult(success=True, data=result)
        except Exception as error:
            return ToolResult(
                success=False,
                error=f'工具执行失败: {str(error)}'
            )

server = AGKitMCPServer({
    'name': 'error-handling-server',
    'version': '1.0.0',
    'error_handling': 'return_error',  # 或 'throw'
})

客户端错误处理

def custom_error_handler(error):
    if 'timeout' in str(error):
        return ToolResult(
            success=False,
            error='操作超时,请重试'
        )
    
    return ToolResult(
        success=False,
        error=f'MCP工具错误: {str(error)}'
    )

client_tool = MCPClientTool(
    mcp_client=client,
    mcp_tool_metadata=metadata,
    config={'retries':
**关键要求:**

1. **使用适当的传输方式**:CLI工具使用stdio,Web服务使用HTTP,测试使用内存传输
2. **优雅处理错误**:实现正确的错误处理和重试逻辑
3. **监控连接状态**:使用事件监听器跟踪连接状态
4. **清理资源**:完成后始终调用清理方法
5. **全面测试**:使用内存传输进行完整测试
6. **模式验证**:确保Pydantic与MCP JSON Schema之间的模式兼容
7. **连接管理**:生产部署使用连接选项确保可靠性
8. **正确使用异步/等待**:后台服务任务使用`asyncio.create_task()`并妥善处理`CancelledError`

## 故障排除

### 常见问题

1. **连接超时**:增加传输配置中的超时值
2. **模式转换错误**:确保Pydantic模型使用支持的类型
3. **工具未找到**:检查工具注册和命名
4. **传输错误**:验证传输配置和服务可用性
5. **内存泄漏**:始终清理连接和事件监听器
6. **CancelledError**:在异步上下文中正确处理任务取消

### 调试模式

启用日志进行调试:

```python
server = AGKitMCPServer({
    'name': 'debug-server',
    'version': '1.0.0',
    'log_level': 'debug'  # 启用调试日志
})

示例仓库

完整工作示例请访问:

相关文档