Learn how to integrate tools with AG-Kit agents to create powerful AI workflows that can interact with the external world through a comprehensive toolkit.

Basic Agent Integration

Simple Tool Integration

The most basic form of agent integration involves creating a custom tool and providing it to an agent. This allows the agent to perform specific operations beyond its base language capabilities.
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool as langchain_tool
from pydantic import BaseModel
import os

# Define the tool's input schema
class CalculatorInput(BaseModel):
    operation: str
    a: float
    b: float

# Create a simple tool
@langchain_tool(args_schema=CalculatorInput)
def calculator(operation: str, a: float, b: float) -> dict:
    """Perform basic mathematical calculations"""
    if operation == 'divide' and b == 0:
        raise ValueError('Division by zero')

    # Use lambdas so only the requested operation is evaluated
    # (an eager dict would raise ZeroDivisionError whenever b == 0)
    operations = {
        'add': lambda: a + b,
        'subtract': lambda: a - b,
        'multiply': lambda: a * b,
        'divide': lambda: a / b
    }
    if operation not in operations:
        raise ValueError(f'Unknown operation: {operation}')
    return {'result': operations[operation](), 'operation': operation, 'operands': [a, b]}

# Create model with tools
model = ChatOpenAI(model='gpt-4', api_key=os.getenv('OPENAI_API_KEY'))
model_with_tools = model.bind_tools([calculator])

# Create LangGraph workflow (StateGraph requires a state schema;
# MessagesState provides the standard `messages` channel)
workflow = StateGraph(MessagesState)
workflow.add_node('agent', lambda state: {
    'messages': [model_with_tools.invoke(state['messages'])]
})
workflow.add_edge(START, 'agent')
workflow.add_edge('agent', END)

graph = workflow.compile()
Key components of agent integration:
  • Model Provider: Configure the LLM provider (OpenAI, Anthropic, etc.)
  • Tool Definition: Create tools with clear schemas and error handling
  • Agent Configuration: Set up the agent with instructions, tools, and model settings
  • Execution: Run the agent with natural language inputs
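
The workflow above returns the model's tool-call requests without executing them; in a full loop, something must dispatch each call to the matching tool and feed the result back. A minimal, framework-free sketch of that dispatch step (the `execute_tool_calls` helper and the dict-shaped calls are illustrative assumptions, mirroring the shape of LangChain's `tool_calls`):

```python
def calculator(operation: str, a: float, b: float) -> dict:
    # Same calculator logic as above, as a plain function for this sketch
    ops = {'add': lambda: a + b, 'subtract': lambda: a - b,
           'multiply': lambda: a * b, 'divide': lambda: a / b}
    if operation == 'divide' and b == 0:
        raise ValueError('Division by zero')
    return {'result': ops[operation](), 'operation': operation, 'operands': [a, b]}

def execute_tool_calls(tool_calls: list, registry: dict) -> list:
    # Dispatch each requested call to the tool registered under its name
    results = []
    for call in tool_calls:
        fn = registry.get(call['name'])
        if fn is None:
            results.append({'error': f"unknown tool: {call['name']}"})
        else:
            results.append(fn(**call['args']))
    return results

registry = {'calculator': calculator}
results = execute_tool_calls(
    [{'name': 'calculator', 'args': {'operation': 'add', 'a': 2.0, 'b': 3.0}}],
    registry,
)
# results[0]['result'] == 5.0
```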

Multiple Tool Integration

For more complex workflows, you can integrate multiple tools and toolkits with a single agent. This creates powerful assistants capable of handling diverse tasks across different domains.
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_openai import ChatOpenAI
from agkit.tools import (
    FilesystemToolkit,
    BuiltInCodeExecutor,
    create_bash_tool,
    LocalFileOperator
)
from agkit.tools.adapters import AGKitTool
import os

# Set up file system context
fs_context = {
    'working_directory': os.getcwd(),
    'fs_operator': LocalFileOperator()
}

# Create filesystem toolkit
filesystem_toolkit = FilesystemToolkit(
    name='filesystem-toolkit',
    context=fs_context
)

# Convert AG-Kit tools to LangChain tools
agkit_tools = [
    *filesystem_toolkit.get_tools(),
    BuiltInCodeExecutor(),
    create_bash_tool(fs_context)
]
tools = [AGKitTool(tool=t).as_langchain_tool() for t in agkit_tools]
# The calculator from the previous example is already a LangChain tool,
# so it needs no adapter
tools.append(calculator)

# Create model with tools
model = ChatOpenAI(model='gpt-4')
model_with_tools = model.bind_tools(tools)

# Create LangGraph workflow (StateGraph requires a state schema)
workflow = StateGraph(MessagesState)
workflow.add_node('agent', lambda state: {
    'messages': [model_with_tools.invoke(state['messages'])]
})
workflow.add_edge(START, 'agent')
workflow.add_edge('agent', END)

graph = workflow.compile()
This approach allows agents to:
  • Read and write files using the filesystem toolkit
  • Execute code in secure sandboxed environments
  • Run command line operations for system interactions
  • Perform calculations with custom tools

Advanced Integration Patterns

Intelligent Tool Selection

Advanced agents can be configured with multiple specialized tools and intelligent selection strategies. The agent will automatically choose the most appropriate tool based on the task context and user requirements.
from langchain_core.tools import StructuredTool
from pydantic import BaseModel
from typing import Optional

# Input schemas for the specialized tools
class CSVProcessorInput(BaseModel):
    file_path: str
    operation: str

class HTTPClientInput(BaseModel):
    url: str
    method: str = 'GET'
    headers: Optional[dict] = None

# Create specialized tools for different data formats
data_processing_tools = [
    StructuredTool.from_function(
        func=lambda file_path, operation: {
            'success': True,
            'data': {'processed': True, 'operation': operation}
        },
        name='csv_processor',
        description='Process CSV data files and perform data analysis',
        args_schema=CSVProcessorInput
    ),
    StructuredTool.from_function(
        func=lambda url, method='GET', headers=None: {
            'success': True,
            'data': {'status': 200, 'response': 'API response'}
        },
        name='http_client',
        description='Make HTTP requests to external APIs',
        args_schema=HTTPClientInput
    )
]

# Create agent with multiple specialized tools
smart_agent = # Agent initialization
    # ...
    tools=[*data_processing_tools, *filesystem_toolkit.get_tools()],
    # ...
Benefits of intelligent tool selection:
  • Automatic tool routing based on task requirements
  • Contextual decision making for optimal tool usage
  • Extensible architecture for adding new specialized tools
  • Clear reasoning about tool selection choices
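
In practice the model itself performs this routing from the tool names and descriptions it is shown. A toy, framework-free illustration of description-driven routing (the keyword-overlap heuristic is purely illustrative, not how AG-Kit or the LLM actually scores tools):

```python
def route(task: str, tools: dict) -> str:
    # Pick the tool whose description shares the most words with the task;
    # a real agent delegates this choice to the LLM via tool schemas
    task_words = set(task.lower().split())
    def overlap(name: str) -> int:
        return len(task_words & set(tools[name].lower().split()))
    return max(tools, key=overlap)

tools = {
    'csv_processor': 'Process CSV data files and perform data analysis',
    'http_client': 'Make HTTP requests to external APIs',
}
chosen = route('make a http request to the weather api', tools)
# chosen == 'http_client'
```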

Error Handling and Resilience

Graceful Error Recovery

Building resilient agents that can handle tool failures gracefully is crucial for production applications. AG-Kit provides several mechanisms for error handling and recovery.
# Create resilient agent with error handling
resilient_agent = # Agent initialization
    # ...
    tools=[
        *filesystem_toolkit.get_tools(),
        BuiltInCodeExecutor()
    ],
    # control_flow={'error_retry_limit': 2, 'max_steps': 10}

# Handle errors in usage
try:
    response = await resilient_agent.run('Process this complex request')
    print('Success:', response)
except Exception as error:
    print(f'Agent error: {error}')
    # Implement fallback strategies or user notification
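
The commented-out `error_retry_limit` above caps how many times a failed step is re-attempted. A minimal sketch of that retry behavior (the `run_with_retries` helper is illustrative, not an AG-Kit API):

```python
import asyncio

async def run_with_retries(operation, retry_limit: int = 2):
    # Re-invoke the operation until it succeeds or the retry budget is spent
    last_error = None
    for attempt in range(retry_limit + 1):
        try:
            return await operation()
        except Exception as error:
            last_error = error
    raise last_error

attempts = {'count': 0}

async def flaky_step():
    # Fails twice, then succeeds, to exercise the retry path
    attempts['count'] += 1
    if attempts['count'] < 3:
        raise RuntimeError('transient failure')
    return 'ok'

result = asyncio.run(run_with_retries(flaky_step, retry_limit=2))
# result == 'ok' after two failed attempts and one success
```

Note that `operation` is a coroutine *factory*: each attempt calls it again, since a single coroutine object cannot be awaited twice.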

Input Validation and Security

Implementing proper input validation and security measures in tools is essential for safe agent operations, especially when dealing with file systems or external APIs.
from langchain_core.tools import StructuredTool
from pydantic import BaseModel
import os

class ValidatedFileProcessorInput(BaseModel):
    file_path: str
    operation: str

async def validated_file_processor(file_path: str, operation: str):
    try:
        # Pre-execution validation: resolve the path and keep it inside the
        # working directory to prevent path traversal
        base_dir = os.path.realpath(os.getcwd())
        resolved = os.path.realpath(file_path)
        if os.path.commonpath([base_dir, resolved]) != base_dir:
            return {
                'success': False,
                'error': 'Path escapes the working directory',
                'error_type': 'validation'
            }

        if not os.path.exists(resolved):
            return {
                'success': False,
                'error': 'File does not exist',
                'error_type': 'validation'
            }

        stats = os.stat(resolved)
        if stats.st_size > 10 * 1024 * 1024:  # 10MB limit
            return {
                'success': False,
                'error': 'File too large (max 10MB)',
                'error_type': 'validation'
            }

        # Execute operation; read the file once and close it promptly
        if operation == 'read':
            with open(resolved) as f:
                result = {'content': f.read()}
        elif operation == 'analyze':
            with open(resolved) as f:
                content = f.read()
            result = {
                'lines': len(content.splitlines()),
                'characters': len(content),
                'words': len(content.split())
            }
        elif operation == 'transform':
            result = {'message': 'Transform operation completed'}
        else:
            return {
                'success': False,
                'error': f'Unknown operation: {operation}',
                'error_type': 'validation'
            }

        # Post-execution validation
        if not result or not isinstance(result, dict):
            return {
                'success': False,
                'error': 'Invalid processing result',
                'error_type': 'execution'
            }

        return {'success': True, 'data': result}

    except Exception as error:
        return {
            'success': False,
            'error': str(error),
            'error_type': 'execution'
        }

validated_tool = StructuredTool.from_function(
    coroutine=validated_file_processor,
    name='validated_file_processor',
    description='Process files with comprehensive validation and security checks',
    args_schema=ValidatedFileProcessorInput
)

# Use validated tool with agent
secure_agent = # Agent initialization
    # ...
    tools=[validated_tool],
    # ...
Key validation practices:
  • Input sanitization to prevent path traversal attacks
  • File size limits to prevent resource exhaustion
  • Pre- and post-execution validation for data integrity
  • Structured error responses with appropriate error types
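
The path-traversal check in particular is easy to get wrong with naive string comparison. A standalone sketch of a containment check that survives `..` segments and symlinks (the `is_within` helper is illustrative):

```python
import os

def is_within(base_dir: str, candidate: str) -> bool:
    # True when candidate, after resolving '..' and symlinks, stays under base_dir
    base = os.path.realpath(base_dir)
    target = os.path.realpath(os.path.join(base, candidate))
    return os.path.commonpath([base, target]) == base

safe = is_within('/srv/data', 'reports/2024.csv')   # stays under the base
unsafe = is_within('/srv/data', '../etc/passwd')    # escapes the base
```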

Streaming and Real-time Integration

Streaming Tool Responses

For long-running operations or real-time data processing, AG-Kit supports streaming responses that provide progress updates and intermediate results to users.
import asyncio
from datetime import datetime
from typing import AsyncIterator

from langchain_core.tools import StructuredTool
from pydantic import BaseModel

class StreamingProcessorInput(BaseModel):
    dataset: str
    batch_size: int = 100

# Create streaming tool for long-running operations
async def streaming_processor(dataset: str, batch_size: int = 100):
    async def stream_progress() -> AsyncIterator[dict]:
        for progress in [25, 50, 75, 100]:
            await asyncio.sleep(1)
            yield {
                'progress': progress,
                'message': f'Processing batch {progress // 25}',
                'timestamp': datetime.now().isoformat()
            }

    return {
        'success': True,
        'stream': stream_progress(),
        'data': {
            'message': 'Streaming processing started',
            'batch_size': batch_size,
            'estimated_duration': '4 seconds'
        }
    }

streaming_tool = StructuredTool.from_function(
    coroutine=streaming_processor,
    name='streaming_processor',
    description='Process large datasets with real-time progress updates',
    args_schema=StreamingProcessorInput
)

# Create streaming agent
streaming_agent = # Agent initialization
    # ...
    tools=[streaming_tool],
    # ...

# Usage with streaming enabled
response = await streaming_agent.run(
    'Process the large dataset with real-time progress updates',
    # state, options
)

# Handle streaming response
if hasattr(response, 'stream'):
    async for chunk in response.stream:
        print(f'Streaming chunk: {chunk}')
        # Update UI with progress information
Streaming capabilities enable:
  • Real-time progress tracking for long-running operations
  • Intermediate result delivery for better user experience
  • Responsive UI updates during processing
  • Early error detection and handling
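
The consuming side of such a stream is an ordinary `async for` loop over the async generator. A self-contained sketch (delays removed so it runs instantly; the batch numbers are illustrative):

```python
import asyncio

async def stream_progress(batches: int = 4):
    # Yield one progress update per batch
    for i in range(1, batches + 1):
        yield {'progress': int(i * 100 / batches), 'message': f'Processing batch {i}'}

async def consume():
    updates = []
    async for chunk in stream_progress():
        updates.append(chunk)
        # A real UI would render each chunk as it arrives
    return updates

updates = asyncio.run(consume())
# updates[-1] == {'progress': 100, 'message': 'Processing batch 4'}
```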

Performance Optimization

Parallel Tool Execution

AG-Kit agents can automatically execute independent tool calls in parallel, significantly improving performance for operations that don’t depend on each other.
# Create agent with parallel execution capabilities
parallel_agent = # Agent initialization
    # ...
    tools=[
        calculator_tool,
        *filesystem_toolkit.get_tools()
    ],
    # ...

# The agent will automatically execute independent tool calls in parallel
response = await parallel_agent.run(
    'Calculate 15 * 23, read the config.json file, and check if package.json exists'
)
Performance benefits:
  • Reduced execution time for independent operations
  • Better resource utilization through concurrent processing
  • Improved user experience with faster response times
  • Automatic optimization without manual coordination
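
The speedup comes from scheduling independent calls concurrently rather than one after another, e.g. with `asyncio.gather`. A framework-free sketch (the simulated tools and delays are illustrative, not AG-Kit internals):

```python
import asyncio
import time

async def calculate(a: int, b: int) -> int:
    await asyncio.sleep(0.1)  # simulate an independent tool call
    return a * b

async def read_config() -> dict:
    await asyncio.sleep(0.1)  # another independent call
    return {'name': 'demo'}

async def main():
    start = time.perf_counter()
    # Both calls run concurrently: total wall time is ~0.1s, not ~0.2s
    product, config = await asyncio.gather(calculate(15, 23), read_config())
    return product, config, time.perf_counter() - start

product, config, elapsed = asyncio.run(main())
# product == 345
```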

Next Steps