MCP Integration

The AG-Kit Python MCP integration provides seamless bidirectional conversion between AG-Kit’s tool system and the Model Context Protocol (MCP). This allows you to:
  • Connect to standard MCP servers and use their tools as AG-Kit BaseTool instances
  • Expose AG-Kit tools as MCP servers for external clients to consume
  • Support multiple transport protocols, including stdio, HTTP, SSE, and in-memory

Overview

The MCP integration consists of several key components:
  • MCPClientTool: Wraps external MCP tools to work within AG-Kit
  • MCPToolkit: High-level toolkit for managing MCP tools
  • AGKitMCPServer: Exposes AG-Kit tools as a standard MCP server
  • MCPClientManager: Manages connections to multiple MCP servers

Quick Start

Using External MCP Tools in AG-Kit

from ag_kit_py.tools.mcp import MCPClientManager, MCPClientTool

# Create a client manager
client_manager = MCPClientManager()

# Connect to an external MCP server
await client_manager.add_server('math-server', {
    'name': 'math-client',
    'version': '1.0.0',
    'transport': {
        'type': 'stdio',
        'command': 'python',
        'args': ['-m', 'math_mcp_server']
    }
})

# Create AG-Kit tools from MCP tools
client_tools = client_manager.create_client_tools('math-server')

# Use the tools in your AG-Kit application
for tool in client_tools:
    result = await tool.invoke({'a': 5, 'b': 3})
    print(result.data)

Exposing AG-Kit Tools as MCP Server

from ag_kit_py.tools.mcp import AGKitMCPServer
from my_tools import MyCustomTool

# Create your AG-Kit tools
calculator_tool = MyCustomTool()

# Create MCP server
server = AGKitMCPServer({
    'name': 'ag-kit-mcp-server',
    'version': '1.0.0',
    'description': 'AG-Kit tools exposed via MCP'
})

# Register tools
server.register_tool(calculator_tool)

# Start server with stdio transport
await server.run({'type': 'stdio'})

Transport Protocols

Stdio Transport

The most common transport for MCP servers, using standard input/output.

Server Configuration

# Start server with stdio
await server.run({'type': 'stdio'})

Client Configuration

await client_manager.add_server('my-server', {
    'name': 'my-client',
    'version': '1.0.0',
    'transport': {
        'type': 'stdio',
        'command': 'python',
        'args': ['-m', 'my_mcp_server'],
        'timeout': 10000
    }
})
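
For orientation, MCP's stdio transport exchanges JSON-RPC 2.0 messages as newline-delimited JSON over the child process's stdin and stdout. The sketch below shows that framing with the standard library only. The helpers `encode_message` and `decode_messages` are hypothetical names, not part of AG-Kit, and the `add` tool is purely illustrative.

```python
import json

def encode_message(message: dict) -> bytes:
    """Serialize one JSON-RPC message as a single newline-terminated line."""
    # json.dumps escapes newlines inside strings, so the payload stays on one line.
    return (json.dumps(message, separators=(",", ":")) + "\n").encode("utf-8")

def decode_messages(stream: bytes) -> list:
    """Split a byte stream on newlines and parse each non-empty line."""
    return [json.loads(line) for line in stream.decode("utf-8").split("\n") if line.strip()]

# A tools/call request in the shape defined by the MCP specification.
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {"name": "add", "arguments": {"a": 5, "b": 3}},
}

# Two messages written back to back, as they would appear on the pipe.
wire = encode_message(request) + encode_message(
    {"jsonrpc": "2.0", "id": 2, "method": "tools/list"}
)
decoded = decode_messages(wire)
print([message["method"] for message in decoded])
```

Because each message occupies exactly one line, anything else a server prints to stdout would corrupt the stream, which is why MCP servers conventionally send diagnostics to stderr.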

HTTP Transport (StreamableHTTP)

For web-based MCP servers and clients.

Server Configuration

import uuid

from fastapi import FastAPI, Request
from ag_kit_py.tools.mcp import AGKitMCPServer

app = FastAPI()

server = AGKitMCPServer({
    'name': 'http-server',
    'version': '1.0.0'
})

async def http_setup(server_instance, create_transport):
    @app.post('/mcp')
    async def handle_mcp(request: Request):
        transport = await create_transport({
            'enable_json_response': True,
            'session_id_generator': lambda: str(uuid.uuid4())
        })
        
        body = await request.json()
        return await transport.handle_request(request, body)

await server.run({
    'type': 'streamableHttp',
    'streamable_http_setup': http_setup
})

# Start FastAPI server
import uvicorn
uvicorn.run(app, host='0.0.0.0', port=3000)

Client Configuration

await client_manager.add_server('http-server', {
    'name': 'http-client',
    'version': '1.0.0',
    'transport': {
        'type': 'streamableHttp',
        'url': 'http://localhost:3000/mcp',
        'timeout': 15000
    }
})

SSE Transport

Server-Sent Events transport for real-time communication.

Server Configuration

from fastapi import FastAPI, Request
from ag_kit_py.tools.mcp import AGKitMCPServer

app = FastAPI()
transport = None  # Module-level handle to the active SSE transport

async def sse_setup(server_instance, create_transport_func):
    @app.get('/mcp/sse')
    async def handle_sse(request: Request):
        # Rebind the module-level transport when a client opens the stream
        global transport
        transport = await create_transport_func('/mcp/sse', request, {
            'enable_dns_rebinding_protection': False
        })

    @app.post('/mcp/sse')
    async def handle_post(request: Request):
        if transport:
            body = await request.json()
            return await transport.handle_post_message(request, body)
        else:
            return {'error': 'No active SSE connection'}

await server.run({
    'type': 'sse',
    'sse_setup': sse_setup
})

Client Configuration

await client_manager.add_server('sse-server', {
    'name': 'sse-client',
    'version': '1.0.0',
    'transport': {
        'type': 'sse',
        'url': 'http://localhost:3000/mcp/sse'
    }
})

In-Memory Transport

For testing and same-process communication.

Server Configuration

memory_id = 'my-memory-server'

await server.run({
    'type': 'memory',
    'memory_id': memory_id
})

Client Configuration

await client_manager.add_server('memory-server', {
    'name': 'memory-client',
    'version': '1.0.0',
    'transport': {
        'type': 'memory',
        'memory_id': 'my-memory-server'
    }
})

Core Examples

Example 1: Creating and Running an MCP Server

Expose AG-Kit tools as a standard MCP server:
from ag_kit_py.tools.mcp import AGKitMCPServer
from my_tools import MyExistingTool

# Create MCP server instance
server = AGKitMCPServer({
    'name': 'my-agkit-server',
    'version': '1.0.0',
    'description': 'AG-Kit tools exposed via MCP'
})

# Register existing AG-Kit tools
my_tool = MyExistingTool()
server.register_tool(my_tool)

# Register multiple tools with shared configuration
server.register_tools([tool1, tool2, tool3], {
    'name_prefix': 'agkit_'  # All tools will be prefixed with 'agkit_'
})

# Start server with different transports
# Option 1: Stdio (most common)
await server.run({'type': 'stdio'})

# Option 2: HTTP
await server.run({
    'type': 'streamableHttp',
    'streamable_http_setup': async_http_setup_function
})

# Option 3: Memory (for testing)
await server.run({'type': 'memory', 'memory_id': 'test-server'})

Example 2: Connecting to External MCP Servers

Use external MCP servers as AG-Kit tools:
from ag_kit_py.tools.mcp import MCPClientManager

# Create client manager
client_manager = MCPClientManager()

# Connect to different types of MCP servers
# Stdio server
await client_manager.add_server('filesystem-server', {
    'name': 'fs-client',
    'version': '1.0.0',
    'transport': {
        'type': 'stdio',
        'command': 'npx',
        'args': ['@modelcontextprotocol/server-filesystem', './workspace']
    }
})

# HTTP server
await client_manager.add_server('api-server', {
    'name': 'api-client',
    'version': '1.0.0',
    'transport': {
        'type': 'streamableHttp',
        'url': 'https://api.example.com/mcp'
    }
})

# Create AG-Kit tools from MCP tools
all_client_tools = client_manager.create_client_tools()

# Create specific tool with custom name
specific_tool = client_manager.create_client_tool(
    'filesystem-server',
    'read_file',
    'custom_file_reader'
)

# Use the tools in AG-Kit
result = await specific_tool.invoke({'path': 'README.md'})
print(result.data)

Example 3: Using MCPToolkit for Simplified Management

High-level toolkit approach for managing multiple MCP connections:
from ag_kit_py.tools.mcp import MCPToolkit, create_mcp_toolkit

# Method 1: Create toolkit and add servers dynamically
toolkit = MCPToolkit('my-toolkit')

await toolkit.add_server('filesystem', {
    'name': 'filesystem',
    'version': '1.0.0',
    'transport': {'type': 'stdio', 'command': 'fs-server'}
})

await toolkit.add_server('database', {
    'name': 'db-client',
    'version': '1.0.0',
    'transport': {'type': 'streamableHttp', 'url': 'http://db-server/mcp'}
})

# Method 2: Create toolkit with pre-configured servers
toolkit2 = await create_mcp_toolkit([
    {
        'id': 'filesystem',
        'config': {
            'name': 'fs-client',
            'version': '1.0.0',
            'transport': {'type': 'stdio', 'command': 'fs-server'}
        }
    },
    {
        'id': 'database',
        'config': {
            'name': 'db-client',
            'version': '1.0.0',
            'transport': {'type': 'streamableHttp', 'url': 'http://db-server/mcp'}
        }
    }
], 'multi-server-toolkit')

# Access tools by server
fs_tools = toolkit.get_server_tools('filesystem')
db_tools = toolkit.get_server_tools('database')

# Access all tools
all_tools = toolkit.get_tools()

# Refresh connections
await toolkit.refresh()

# Cleanup when done
await toolkit.destroy()

Example 4: Memory Transport for Testing

Use memory transport for testing MCP integrations:
import asyncio

from ag_kit_py.tools.mcp import AGKitMCPServer, MCPClientManager
from my_tools import MyTestTool  # Placeholder module, as in the earlier examples

# Test setup
memory_id = 'test-memory-transport'
server_info = {
    'name': 'test-server',
    'version': '1.0.0'
}

# Create server
server = AGKitMCPServer(server_info)
server.register_tool(MyTestTool())

# Start server in background
server_task = asyncio.create_task(server.run({'type': 'memory', 'memory_id': memory_id}))
await asyncio.sleep(0.1)  # Give server time to start

# Create client
client_manager = MCPClientManager()
await client_manager.add_server(server_info['name'], {
    'name': 'test-client',
    'version': '1.0.0',
    'transport': {'type': 'memory', 'memory_id': memory_id}
})

# Test tool execution
tools = client_manager.create_client_tools()
test_tool = tools[0]
result = await test_tool.invoke({'test': 'data'})

# Cleanup
await client_manager.disconnect_all()
await server.stop()
server_task.cancel()
try:
    await server_task
except asyncio.CancelledError:
    pass

Example 5: Advanced Connection Management

Handle connection options and error scenarios:
from ag_kit_py.tools.mcp import MCPClientManager

client_manager = MCPClientManager()

# Add server with connection options
await client_manager.add_server('reliable-server', {
    'name': 'reliable-client',
    'version': '1.0.0',
    'transport': {
        'type': 'stdio',
        'command': 'external-server',
        'timeout': 10000
    }
}, {
    'auto_reconnect': True,
    'reconnect_delay': 5000,
    'max_reconnect_attempts': 3,
    'heartbeat_interval': 30000
})

# Monitor connection status (polls every 5 seconds until cancelled)
import asyncio
while True:
    is_connected = client_manager.is_server_connected('reliable-server')
    print(f'Server connected: {is_connected}')
    await asyncio.sleep(5)

Example 6: Event Handling and Monitoring

Monitor MCP operations with event listeners:
from ag_kit_py.tools.mcp import AGKitMCPServer, MCPClientManager

# Server event monitoring
server = AGKitMCPServer({
    'name': 'monitored-server',
    'version': '1.0.0'
})

def server_event_handler(event):
    event_type = event.type
    if event_type == 'connected':
        print(f"Client connected: {event.data.get('transport')}")
    elif event_type == 'tool_called':
        print(f"Tool {event.data.get('tool_name')} called")
    elif event_type == 'tool_result':
        print(f"Tool returned: {event.data.get('result')}")
    elif event_type == 'error':
        print(f"Server error: {event.data.get('error')}")

server.add_event_listener(server_event_handler)

# Client event monitoring
client_manager = MCPClientManager()

def client_event_handler(event):
    event_type = event.type
    if event_type == 'connected':
        print(f"Connected to server: {event.data.get('server_id')}")
    elif event_type == 'tool_discovered':
        print(f"Discovered tool: {event.data.get('tool_name')}")
    elif event_type == 'disconnected':
        print(f"Disconnected: {event.data.get('reason', 'Unknown')}")
    elif event_type == 'error':
        print(f"Client error: {event.data.get('error')}")

client_manager.add_event_listener(client_event_handler)

Advanced Configuration

Tool Configuration

Customize how AG-Kit tools are exposed via MCP:
# Register tool with custom configuration
server.register_tool(calculator_tool, {
    'name_prefix': 'math_',             # Prefix tool name
    'description': 'Custom calculator'  # Override description
})

# Register multiple tools with shared config
server.register_tools([tool1, tool2, tool3], {
    'name_prefix': 'utils_'
})

Client Tool Configuration

Customize MCP client tools:
import time

from ag_kit_py.tools.mcp import MCPClientTool
from ag_kit_py.tools import ToolResult

def transform_input(input_data):
    return {**input_data, 'timestamp': time.time()}

def transform_output(output_data):
    return {'result': output_data, 'processed': True}

def error_handler(error):
    return ToolResult(
        success=False,
        error=f'Custom error: {str(error)}'
    )

client_tool = MCPClientTool(
    mcp_client=mcp_client,
    mcp_tool_metadata=tool_metadata,
    config={
        'name': 'custom_tool_name',  # Custom AG-Kit name
        'timeout': 30000,            # Call timeout
        'retries': 3                 # Retry attempts
    },
    adapter_config={
        'include_metadata': True,      # Include execution metadata
        'transform_input': transform_input,
        'transform_output': transform_output,
        'error_handler': error_handler
    }
)

Connection Management

Advanced connection options:
await client_manager.add_server('reliable-server', config, {
    'auto_reconnect': True,           # Auto-reconnect on failure
    'reconnect_delay': 5000,          # Delay between reconnect attempts
    'max_reconnect_attempts': 5,      # Maximum reconnect attempts
    'heartbeat_interval': 30000       # Heartbeat interval
})
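
How these options interact is not spelled out here. One plausible reading is a bounded retry loop with a fixed delay between attempts, sketched below. The `connect_with_retry` helper and the `flaky_connect` stub are hypothetical, and treating `reconnect_delay` as milliseconds is an assumption that the configuration above does not confirm.

```python
import asyncio

async def connect_with_retry(connect, *, reconnect_delay_ms: int, max_reconnect_attempts: int):
    """Call connect() until it succeeds or the attempt budget is exhausted."""
    last_error = None
    for attempt in range(1, max_reconnect_attempts + 1):
        try:
            return await connect()
        except ConnectionError as error:
            last_error = error
            if attempt < max_reconnect_attempts:
                # Fixed delay between attempts (assumed to be milliseconds)
                await asyncio.sleep(reconnect_delay_ms / 1000)
    raise ConnectionError(f"gave up after {max_reconnect_attempts} attempts") from last_error

attempts = 0

async def flaky_connect() -> str:
    """Stub connection that fails twice, then succeeds."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError("server not ready")
    return "connected"

status = asyncio.run(
    connect_with_retry(flaky_connect, reconnect_delay_ms=10, max_reconnect_attempts=5)
)
print(status, attempts)
```

A production setup might prefer exponential backoff with jitter over a fixed delay, so that many clients do not reconnect in lockstep.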

Event Handling

Monitor MCP operations:
# Server events
def handle_server_event(event):
    if event.type == 'connected':
        print(f"Client connected")
    elif event.type == 'tool_called':
        print(f"Tool called: {event.data['tool_name']}")
    elif event.type == 'error':
        print(f"Error: {event.data['error']}")

server.add_event_listener(handle_server_event)

# Client events
def handle_client_event(event):
    if event.type == 'connected':
        print(f"Connected to server")
    elif event.type == 'tool_discovered':
        print(f"Tool discovered: {event.data['tool_name']}")
    elif event.type == 'disconnected':
        print(f"Disconnected")

client_manager.add_event_listener(handle_client_event)
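
To exercise listeners like these without a live server, it can help to feed them hand-built events. The sketch below assumes only that events expose `type` and `data` attributes, as the handlers above do. `FakeEvent` and `EventRecorder` are hypothetical test helpers, not AG-Kit classes.

```python
from collections import Counter
from dataclasses import dataclass, field

@dataclass
class FakeEvent:
    """Minimal stand-in for an event object with type and data attributes."""
    type: str
    data: dict = field(default_factory=dict)

class EventRecorder:
    """Callable listener that tallies event types and keeps the last error."""

    def __init__(self) -> None:
        self.counts = Counter()
        self.last_error = None

    def __call__(self, event) -> None:
        self.counts[event.type] += 1
        if event.type == "error":
            # .get avoids a KeyError when an event carries no error payload
            self.last_error = event.data.get("error")

recorder = EventRecorder()
for event in (
    FakeEvent("connected"),
    FakeEvent("tool_called", {"tool_name": "add"}),
    FakeEvent("tool_called", {"tool_name": "add"}),
    FakeEvent("error", {"error": "timeout"}),
):
    recorder(event)

print(dict(recorder.counts), recorder.last_error)
```

If listeners are plain callables, as the examples above suggest, an instance could be passed directly, for example `server.add_event_listener(recorder)`.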

Schema Conversion

AG-Kit automatically converts between Pydantic models and MCP JSON schemas:

Pydantic to MCP Schema

from typing import Any

from pydantic import BaseModel, Field
from ag_kit_py.tools.mcp.utils import pydantic_to_json_schema

class UserInput(BaseModel):
    name: str = Field(description='User name')
    age: int = Field(ge=0, le=150)
    email: str | None = None
    tags: list[str]
    preferences: dict[str, Any]

mcp_schema = pydantic_to_json_schema(UserInput)
# Results in MCP-compatible JSON schema

MCP to Pydantic Schema

# MCPClientTool automatically handles MCP schemas
mcp_tool_metadata = {
    'name': 'user_tool',
    'inputSchema': {
        'type': 'object',
        'properties': {
            'name': {'type': 'string'},
            'age': {'type': 'number'}
        },
        'required': ['name', 'age']
    }
}

client_tool = MCPClientTool(client, mcp_tool_metadata)
# Tool now has schema for AG-Kit compatibility
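
To make the mapping concrete, the sketch below checks an argument dictionary against the `required` and `type` keywords of an `inputSchema` like the one above. `check_arguments` is a deliberately small, hypothetical helper that covers only flat object schemas. It does not reflect how MCPClientTool validates input, and a full JSON Schema validator handles far more keywords.

```python
# JSON Schema primitive types mapped to Python types.
_JSON_TYPES = {
    "string": str,
    "number": (int, float),
    "integer": int,
    "boolean": bool,
    "object": dict,
    "array": list,
}

def check_arguments(schema: dict, arguments: dict) -> list:
    """Return a list of problems; an empty list means the input passes."""
    problems = []
    for name in schema.get("required", []):
        if name not in arguments:
            problems.append(f"missing required field: {name}")
    for name, value in arguments.items():
        expected = schema.get("properties", {}).get(name, {}).get("type")
        if not isinstance(expected, str) or expected not in _JSON_TYPES:
            continue  # Undeclared field or unsupported type keyword: skipped in this sketch
        if expected in ("number", "integer") and isinstance(value, bool):
            # bool is a subclass of int in Python, so it is rejected explicitly
            problems.append(f"{name}: expected {expected}, got boolean")
        elif not isinstance(value, _JSON_TYPES[expected]):
            problems.append(f"{name}: expected {expected}, got {type(value).__name__}")
    return problems

input_schema = {
    "type": "object",
    "properties": {"name": {"type": "string"}, "age": {"type": "number"}},
    "required": ["name", "age"],
}

print(check_arguments(input_schema, {"name": "Ada", "age": 36}))
print(check_arguments(input_schema, {"name": "Ada"}))
print(check_arguments(input_schema, {"name": "Ada", "age": "36"}))
```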

Error Handling

Server Error Handling

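from ag_kit_py.tools import BaseTool, ToolResult
from ag_kit_py.tools.mcp import AGKitMCPServer

class SafeTool(BaseTool):
    async def _invoke(self, input_data):
        try:
            # Tool logic here (this placeholder echoes the input)
            result = input_data
            return ToolResult(success=True, data=result)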
        except Exception as error:
            return ToolResult(
                success=False,
                error=f'Tool execution failed: {str(error)}'
            )

server = AGKitMCPServer({
    'name': 'error-handling-server',
    'version': '1.0.0',
    'error_handling': 'return_error',  # or 'throw'
})

Client Error Handling

from ag_kit_py.tools import ToolResult
from ag_kit_py.tools.mcp import MCPClientTool

def custom_error_handler(error):
    if 'timeout' in str(error):
        return ToolResult(
            success=False,
            error='Operation timed out, please try again'
        )
    
    return ToolResult(
        success=False,
        error=f'MCP tool error: {str(error)}'
    )

client_tool = MCPClientTool(
    mcp_client=client,
    mcp_tool_metadata=metadata,
    config={'retries': 3, 'timeout': 10000},
    adapter_config={'error_handler': custom_error_handler}
)
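
The `retries` and `timeout` settings suggest a wrapper along the following lines. This is a sketch of the general pattern using `asyncio.wait_for`, not AG-Kit's internals. The names `call_with_retries` and `slow_then_fast` are hypothetical, and the millisecond unit for the timeout is an assumption.

```python
import asyncio

async def call_with_retries(call, *, retries: int, timeout_ms: int) -> dict:
    """Run call() with a per-attempt timeout; return a result dict instead of raising."""
    error = "no attempts made"
    for _ in range(retries + 1):  # One initial attempt plus `retries` retries
        try:
            data = await asyncio.wait_for(call(), timeout=timeout_ms / 1000)
            return {"success": True, "data": data}
        except asyncio.TimeoutError:
            error = "Operation timed out, please try again"
        except Exception as exc:  # Broad on purpose: any failure becomes a result
            error = f"MCP tool error: {exc}"
    return {"success": False, "error": error}

calls = 0

async def slow_then_fast() -> int:
    """Stub tool call: the first attempt hangs, later attempts answer at once."""
    global calls
    calls += 1
    if calls == 1:
        await asyncio.sleep(1)
    return 8

result = asyncio.run(call_with_retries(slow_then_fast, retries=3, timeout_ms=50))
print(result, calls)
```

Retrying is only safe for idempotent tools. A tool with side effects may have completed its work even though the caller saw a timeout.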

API Reference

For complete API documentation including all interfaces, types, and detailed method signatures, see the MCP API Reference.

AGKitMCPServer - Exposes AG-Kit tools as a standard MCP server
  • register_tool(tool: BaseTool, config: dict = None) -> dict - Register a single tool
  • register_tools(tools: list[BaseTool], config: dict = None) -> list[dict] - Register multiple tools
  • unregister_tool(name: str) -> bool - Remove a tool from the server
  • run(transport_config: dict) -> None - Start the server with specified transport
  • stop() -> None - Stop the server and cleanup resources
  • call_tool(name: str, args: dict) -> dict - Execute a tool directly
  • list_tools() -> list[dict] - Get list of all registered tools
  • is_server_running() -> bool - Check if server is running
  • get_stats() -> dict - Get server statistics
MCPClientManager - Manages connections to external MCP servers
  • add_server(server_id: str, config: dict, options: dict = None) -> None - Connect to an MCP server
  • disconnect_server(server_id: str) -> None - Disconnect from a specific server
  • disconnect_all() -> None - Disconnect from all servers
  • create_client_tools(server_id: str = None) -> list[MCPClientTool] - Create AG-Kit tool wrappers
  • create_client_tool(server_id: str, tool_name: str, agkit_tool_name: str = None) -> MCPClientTool - Create specific tool wrapper
  • call_tool(server_id: str, tool_name: str, args: Any) -> Any - Call an MCP tool directly
  • is_server_connected(server_id: str) -> bool - Check connection status
  • get_stats() -> dict - Get client manager statistics
MCPClientTool - Wraps external MCP tools for AG-Kit compatibility
  • invoke(input_data: Any, context: dict = None) -> ToolResult - Execute the MCP tool
  • get_mcp_metadata() -> dict - Get original MCP tool metadata
  • is_connected() -> bool - Check if underlying client is connected
  • update_config(new_config: dict) -> None - Update tool configuration
MCPToolkit - High-level toolkit for managing MCP tools
  • add_server(server_id: str, config: dict) -> None - Add an MCP server
  • remove_server(server_id: str) -> None - Remove a server
  • get_connected_servers() -> list[str] - Get list of connected servers
  • get_server_tools(server_id: str) -> list[MCPClientTool] - Get tools from specific server
  • refresh() -> None - Refresh all connections
  • destroy() -> None - Cleanup all resources
  • get_client_manager() -> MCPClientManager - Get underlying client manager
See the complete API documentation for detailed interface definitions, type information, utility functions, and advanced configuration options.

Best Practices

  1. Use appropriate transports: stdio for CLI tools, HTTP for web services, memory for testing
  2. Handle errors gracefully: Implement proper error handling and retry logic
  3. Monitor connections: Use event listeners to track connection status
  4. Clean up resources: Always call the cleanup methods (disconnect_all(), stop(), or destroy()) when done
  5. Test thoroughly: Use memory transport for comprehensive testing
  6. Schema validation: Ensure your schemas are compatible between Pydantic and MCP JSON Schema
  7. Connection management: Use connection options for reliable production deployments
  8. Async/await properly: Use asyncio.create_task() for background server tasks and handle CancelledError properly
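
Point 8 can be sketched with the standard library alone. Here `serve_forever` is a hypothetical stand-in for a long-running call such as `server.run(...)`. Cancelling the task and then awaiting it mirrors the cleanup in Example 4.

```python
import asyncio

log = []

async def serve_forever() -> None:
    """Stand-in for a long-running server coroutine."""
    log.append("started")
    try:
        await asyncio.Event().wait()  # Blocks until the task is cancelled
    finally:
        log.append("cleaned up")  # Runs on cancellation as well as normal exit

async def main() -> None:
    task = asyncio.create_task(serve_forever())
    await asyncio.sleep(0)  # Yield once so the background task begins running
    task.cancel()
    try:
        await task  # Wait for the task to finish unwinding
    except asyncio.CancelledError:
        log.append("cancelled")

asyncio.run(main())
print(log)
```

Awaiting the task after `cancel()` matters: cancellation is only requested at that point, and the `finally` block runs when the task is next scheduled.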

Troubleshooting

Common Issues

  1. Connection timeouts: Increase timeout values in transport configuration
  2. Schema conversion errors: Ensure Pydantic models use supported types
  3. Tool not found: Check tool registration and naming
  4. Transport errors: Verify transport configuration and server availability
  5. Memory leaks: Always clean up connections and event listeners
  6. CancelledError: Handle task cancellation properly in async contexts

Debug Mode

Enable logging for debugging:
server = AGKitMCPServer({
    'name': 'debug-server',
    'version': '1.0.0',
    'log_level': 'debug'  # Enable debug logging
})

Examples Repository

Find complete working examples at: