from ag_kit_py.agents import BaseAgent
from ag_ui.core import RunAgentInput, BaseEvent, EventType
from typing import AsyncGenerator
class CustomAgent(BaseAgent):
    """Agent wrapping an implementation built by ``create_custom_agent``.

    ``run`` streams lifecycle events around a single ``execute`` call on the
    wrapped implementation, and ``destroy`` asks that implementation to
    clean up.
    """

    def __init__(self, name: str, description: str, custom_config: dict):
        """Build the wrapped implementation and hand it to ``BaseAgent``.

        Args:
            name: Forwarded to ``BaseAgent.__init__``.
            description: Forwarded to ``BaseAgent.__init__``.
            custom_config: Forwarded to ``create_custom_agent``.
        """
        # NOTE(review): create_custom_agent is neither defined nor imported in
        # the visible part of this file -- confirm it is in scope.
        agent_impl = create_custom_agent(custom_config)
        super().__init__(name, description, agent_impl)

    async def run(
        self,
        run_input: RunAgentInput
    ) -> AsyncGenerator[BaseEvent, None]:
        """Execute the wrapped agent and stream the resulting events.

        Args:
            run_input: Input for this run; its ``thread_id`` and ``run_id``
                are copied onto every emitted event.

        Yields:
            ``RUN_STARTED`` first. If ``execute`` succeeds, a
            ``TEXT_MESSAGE_CONTENT`` event carrying the result followed by
            ``RUN_FINISHED``; if it raises ``Exception``, a single
            ``RUN_ERROR`` event instead.
        """
        thread_id = run_input.thread_id
        run_id = run_input.run_id

        # NOTE(review): confirm that BaseEvent accepts the thread_id/run_id
        # keyword fields used for the events below.
        yield BaseEvent(
            type=EventType.RUN_STARTED,
            thread_id=thread_id,
            run_id=run_id
        )

        # Only the execute call is guarded: RUN_ERROR should describe an agent
        # failure, not an exception raised at one of the success-path yields,
        # and it must never be emitted after RUN_FINISHED.
        try:
            # presumably self._agent is the implementation passed to
            # BaseAgent.__init__ -- verify against BaseAgent.
            result = await self._agent.execute(run_input)
        except Exception as exc:
            yield BaseEvent(
                type=EventType.RUN_ERROR,
                thread_id=thread_id,
                run_id=run_id,
                # str(exc) is empty for exceptions raised without a message;
                # fall back to the class name so the event is never blank.
                raw_event={"error": str(exc) or type(exc).__name__}
            )
        else:
            yield BaseEvent(
                type=EventType.TEXT_MESSAGE_CONTENT,
                thread_id=thread_id,
                run_id=run_id,
                raw_event={"data": {"chunk": {"content": result}}}
            )
            yield BaseEvent(
                type=EventType.RUN_FINISHED,
                thread_id=thread_id,
                run_id=run_id
            )

    def destroy(self) -> None:
        """Clean up resources by calling ``cleanup`` on the wrapped agent."""
        self._agent.cleanup()