Streaming Events
The AG-Kit Python SDK uses an event-driven architecture to enable real-time communication between agents and frontend applications through Server-Sent Events (SSE).
Overview
The streaming event system provides:
- Real-time Streaming: Server-Sent Events (SSE) for live updates
- Structured Events: Type-safe event models for different interactions
- Tool Execution: Streaming tool calls with arguments and results
- Error Handling: Comprehensive error event support
- Interrupt Support: Human-in-the-loop workflow events
Event Types
All events are defined as Pydantic models in `ag_kit_py.server.send_message.models`.
TextEvent
Streams text content from the agent.
from pydantic import BaseModel
from typing import Literal
class TextEvent(BaseModel):
    """Text streaming event model.

    Carries one piece of text content streamed from the agent.
    """

    # Discriminator; always the literal "text".
    type: Literal["text"] = "text"
    # Text content carried by this event.
    content: str
Copy
{
"type": "text",
"content": "Hello! How can I help you today?"
}
ToolCallStartEvent
Indicates the start of a tool function call.
# `Field` is needed for the camelCase aliases used by the tool-call events.
from pydantic import BaseModel, Field


class ToolCallStartEvent(BaseModel):
    """Tool call start event model.

    Announces that a tool function call has begun.
    """

    # Discriminator; always the literal "tool-call-start".
    type: Literal["tool-call-start"] = "tool-call-start"
    # Identifier of the tool call; aliased to "toolCallId".
    tool_call_id: str = Field(..., alias="toolCallId")
    # Name of the tool being called; aliased to "toolCallName".
    tool_call_name: str = Field(..., alias="toolCallName")
Copy
{
"type": "tool-call-start",
"toolCallId": "call_abc123",
"toolCallName": "web_search"
}
ToolCallArgsEvent
Streams tool function arguments incrementally.
class ToolCallArgsEvent(BaseModel):
    """Tool call arguments streaming event model.

    Carries one incremental fragment of a tool call's arguments.
    """

    # Discriminator; always the literal "tool-call-args".
    type: Literal["tool-call-args"] = "tool-call-args"
    # Identifier of the tool call this fragment belongs to; aliased to "toolCallId".
    tool_call_id: str = Field(..., alias="toolCallId")
    # Next fragment of the argument string.
    delta: str
Copy
{
"type": "tool-call-args",
"toolCallId": "call_abc123",
"delta": "{\"query\": \"Python"
}
ToolCallEndEvent
Indicates the completion of a tool function call.
class ToolCallEndEvent(BaseModel):
    """Tool call completion event model.

    Marks the end of a tool function call.
    """

    # Discriminator; always the literal "tool-call-end".
    type: Literal["tool-call-end"] = "tool-call-end"
    # Identifier of the tool call that finished; aliased to "toolCallId".
    tool_call_id: str = Field(..., alias="toolCallId")
Copy
{
"type": "tool-call-end",
"toolCallId": "call_abc123"
}
ToolCallResultEvent
Contains the result of a tool function execution.
class ToolCallResultEvent(BaseModel):
    """Tool result streaming event model.

    Carries the result of a tool function execution.
    """

    # Discriminator; always the literal "tool-result".
    type: Literal["tool-result"] = "tool-result"
    # Identifier of the tool call that produced the result; aliased to "toolCallId".
    tool_call_id: str = Field(..., alias="toolCallId")
    # Tool output as a string.
    result: str
Copy
{
"type": "tool-result",
"toolCallId": "call_abc123",
"result": "{\"results\": [\"Python Tutorial 1\", \"Python Guide 2\"]}"
}
InterruptEvent
Signals an interruption requiring user input (human-in-the-loop).
from typing import Any, Optional
class InterruptEvent(BaseModel):
    """Interrupt event model.

    Signals an interruption that requires user input before continuing.
    """

    # Discriminator; always the literal "interrupt".
    type: Literal["interrupt"] = "interrupt"
    # Identifier of this interrupt.
    id: str
    # Optional explanation; defaults to None, so clients must not assume it is set.
    reason: Optional[str] = None
    # Free-form data attached to the interrupt; required, but of any type.
    payload: Any
Copy
{
"type": "interrupt",
"id": "interrupt_123",
"reason": "User approval required for next steps",
"payload": {
"steps": [
{"description": "Delete file", "status": "pending"},
{"description": "Send email", "status": "pending"}
]
}
}
ErrorEvent
Indicates an error occurred during processing.
class ErrorEvent(BaseModel):
    """Error event model.

    Reports that an error occurred during processing.
    """

    # Discriminator; always the literal "error".
    type: Literal["error"] = "error"
    # Error message.
    error: str
Copy
{
"type": "error",
"error": "Agent execution failed: Invalid tool parameters"
}
Event Flow
Simple Text Response
Copy
1. TextEvent (content: "Hello")
2. TextEvent (content: "! How")
3. TextEvent (content: " can I")
4. TextEvent (content: " help?")
5. [DONE]
Tool Call Flow
Copy
1. ToolCallStartEvent (toolCallId: "call_1", toolCallName: "search")
2. ToolCallArgsEvent (toolCallId: "call_1", delta: "{\"query\":")
3. ToolCallArgsEvent (toolCallId: "call_1", delta: " \"Python\"}")
4. ToolCallEndEvent (toolCallId: "call_1")
5. ToolCallResultEvent (toolCallId: "call_1", result: "{...}")
6. TextEvent (content: "Based on the search...")
7. [DONE]
Error Flow
Copy
1. TextEvent (content: "Processing...")
2. ErrorEvent (error: "Tool execution failed")
3. [DONE]
Interrupt Flow
Copy
1. TextEvent (content: "I need to delete a file...")
2. InterruptEvent (id: "int_1", reason: "Approval required", payload: {...})
3. [DONE]
Server-Sent Events Format
All events are sent using the SSE format:
data: {"type":"text","content":"Hello"}\n\n
data: {"type":"text","content":" World"}\n\n
data: [DONE]\n\n
SSE Headers
Copy
Content-Type: text/event-stream
Cache-Control: no-cache, no-transform
Connection: keep-alive
Event Handler
The `handler` function processes agent execution and yields events:
Copy
from typing import AsyncGenerator
from ag_kit_py.server.send_message.models import SendMessageEvent
async def handler(
    input_data: dict,
    agent: Any,
) -> AsyncGenerator[SendMessageEvent, None]:
    """Handle HTTP requests and process agent execution with streaming.

    Only the signature and contract are shown here; the body is omitted.

    Args:
        input_data: Dictionary containing request data.
        agent: The agent instance to execute.

    Yields:
        SendMessageEvent: Formatted event objects for client consumption.

    Raises:
        RuntimeError: When agent execution or message processing fails.
    """
Event Conversion
The handler converts AG-UI events to client events:
from ag_ui.core.events import EventType
async for event in agent.run(run_data):
match event.type:
case EventType.TEXT_MESSAGE_CONTENT:
data = {"content": event.raw_event["data"]["chunk"]["content"]}
yield TextEvent.model_validate(data)
case EventType.TOOL_CALL_START:
data = {
"tool_call_id": event.tool_call_id,
"tool_call_name": event.tool_call_name
}
yield ToolCallStartEvent.model_validate(data)
case EventType.TOOL_CALL_ARGS:
data = {"tool_call_id": event.tool_call_id, "delta": event.delta}
yield ToolCallArgsEvent.model_validate(data)
case EventType.TOOL_CALL_END:
data = {"tool_call_id": event.tool_call_id}
yield ToolCallEndEvent.model_validate(data)
case EventType.TOOL_CALL_RESULT:
data = {
"tool_call_id": event.tool_call_id,
"result": json.loads(event.result)
}
yield ToolCallResultEvent.model_validate(data)
Client Integration
JavaScript/TypeScript
Copy
// NOTE(review): EventSource always issues a GET request and cannot send a
// request body, while the Python examples POST a JSON body to this endpoint.
// Confirm the server also accepts GET here, or stream with fetch() instead.
const eventSource = new EventSource('/send-message');

eventSource.onmessage = (event) => {
  // The server marks the end of the stream with a literal [DONE] frame.
  if (event.data === '[DONE]') {
    eventSource.close();
    return;
  }

  let data;
  try {
    data = JSON.parse(event.data);
  } catch (err) {
    // A malformed frame should not throw out of the message handler.
    console.error('Malformed event payload:', event.data, err);
    return;
  }

  switch (data.type) {
    case 'text':
      console.log('Text:', data.content);
      break;
    case 'tool-call-start':
      console.log('Tool call started:', data.toolCallName);
      break;
    case 'tool-result':
      console.log('Tool result:', data.result);
      break;
    case 'error':
      console.error('Error:', data.error);
      break;
    case 'interrupt':
      console.log('Interrupt:', data.reason);
      handleInterrupt(data);
      break;
  }
};

eventSource.onerror = (error) => {
  console.error('SSE error:', error);
  // Close explicitly so the browser does not keep reconnecting on its own.
  eventSource.close();
};
Python Client
Copy
import asyncio
import json

import httpx


async def stream_messages():
    """POST a message to the server and print streamed events as they arrive."""
    async with httpx.AsyncClient() as client:
        async with client.stream(
            'POST',
            'http://localhost:8000/send-message',
            json={
                "messages": [{"role": "user", "content": "Hello"}],
                "conversationId": "conv_123",
            },
        ) as response:
            async for line in response.aiter_lines():
                # Only `data:` lines carry event payloads.
                if not line.startswith('data: '):
                    continue
                data = line[6:]  # Remove 'data: ' prefix
                if data == '[DONE]':
                    break
                event = json.loads(data)
                if event['type'] == 'text':
                    print(event['content'], end='', flush=True)
                elif event['type'] == 'tool-call-start':
                    print(f"\n[Tool: {event['toolCallName']}]")
                elif event['type'] == 'error':
                    print(f"\nError: {event['error']}")


# `async with` is only valid inside a coroutine, so run the client through
# the event loop.
if __name__ == "__main__":
    asyncio.run(stream_messages())
React Hook
Copy
import { useEffect, useState } from 'react';
/**
 * Stream agent events from `url` and expose them as React state.
 *
 * The request is sent as a JSON POST body. EventSource cannot be used for
 * this because it only issues GET requests without a body, so the SSE stream
 * is read through fetch() instead.
 *
 * @param url     Endpoint that answers with a Server-Sent Events stream.
 * @param request JSON-serializable request payload sent as the POST body.
 * @returns Accumulated text chunks, tool calls, last error and streaming flag.
 */
function useAgentStream(url: string, request: any) {
  const [messages, setMessages] = useState<string[]>([]);
  const [toolCalls, setToolCalls] = useState<any[]>([]);
  const [error, setError] = useState<string | null>(null);
  const [isStreaming, setIsStreaming] = useState(false);

  // Serialize once so the effect depends on the request's content, not on
  // the identity of an object the caller may recreate on every render.
  const body = JSON.stringify(request);

  useEffect(() => {
    const controller = new AbortController();
    setIsStreaming(true);

    const handleEvent = (data: any) => {
      switch (data.type) {
        case 'text':
          setMessages(prev => [...prev, data.content]);
          break;
        case 'tool-call-start':
          setToolCalls(prev => [...prev, {
            id: data.toolCallId,
            name: data.toolCallName,
            status: 'running'
          }]);
          break;
        case 'tool-result':
          setToolCalls(prev => prev.map(call =>
            call.id === data.toolCallId
              ? { ...call, result: data.result, status: 'completed' }
              : call
          ));
          break;
        case 'error':
          setError(data.error);
          break;
      }
    };

    (async () => {
      try {
        const response = await fetch(url, {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
            Accept: 'text/event-stream'
          },
          body,
          signal: controller.signal
        });
        if (!response.ok || !response.body) {
          throw new Error(`Request failed with status ${response.status}`);
        }

        const reader = response.body.getReader();
        const decoder = new TextDecoder();
        let pending = '';

        while (true) {
          const { done, value } = await reader.read();
          if (done) break;
          pending += decoder.decode(value, { stream: true });

          // SSE frames are separated by a blank line; keep the trailing
          // partial frame until more bytes arrive.
          const frames = pending.split('\n\n');
          pending = frames.pop() ?? '';

          for (const frame of frames) {
            for (const line of frame.split('\n')) {
              if (!line.startsWith('data: ')) continue;
              const payload = line.slice(6);
              if (payload === '[DONE]') {
                await reader.cancel();
                return;
              }
              handleEvent(JSON.parse(payload));
            }
          }
        }
      } catch (err) {
        // An abort is the normal unmount path, not an error.
        if (!controller.signal.aborted) {
          setError(err instanceof Error ? err.message : String(err));
        }
      } finally {
        if (!controller.signal.aborted) {
          setIsStreaming(false);
        }
      }
    })();

    return () => controller.abort();
  }, [url, body]);

  return { messages, toolCalls, error, isStreaming };
}
Message Models
Request Messages
SystemMessage
Copy
class SystemMessage(BaseModel):
    """System message model."""

    # Role discriminator; always the literal "system".
    role: Literal["system"] = "system"
    # Message text.
    content: str
UserMessage
Copy
class UserMessage(BaseModel):
    """User message model."""

    # Role discriminator; always the literal "user".
    role: Literal["user"] = "user"
    # Message text.
    content: str
AssistantMessage
Copy
class AssistantMessage(BaseModel):
    """AI assistant message model."""

    # Identifier of the message.
    id: str
    # Role discriminator; always the literal "assistant".
    role: Literal["assistant"] = "assistant"
    # Message text; defaults to None.
    content: Optional[str] = None
    # Tool calls attached to the message; aliased to "toolCalls", defaults to None.
    tool_calls: Optional[List[ToolCall]] = Field(None, alias="toolCalls")
ToolMessage
Copy
class ToolMessage(BaseModel):
    """Tool result message model."""

    # Role discriminator; always the literal "tool".
    role: Literal["tool"] = "tool"
    # Message text.
    content: str
    # Identifier of the related tool call; aliased to "toolCallId".
    tool_call_id: str = Field(..., alias="toolCallId")
Tool Models
Tool
Copy
class Tool(BaseModel):
    """Tool definition model."""

    # Name of the tool.
    name: str
    # Description of the tool.
    description: str
    # Parameter specification; accepts any value.
    parameters: Any
ToolCall
Copy
class ToolCall(BaseModel):
    """Tool function call model."""

    # Identifier of the tool call.
    id: str
    # Call kind; always the literal "function".
    type: Literal["function"] = "function"
    # The function being called, as a ToolFunction.
    function: ToolFunction
ToolFunction
Copy
class ToolFunction(BaseModel):
    """Tool function definition model."""

    # Name of the function.
    name: str
    # Function arguments as a string.
    arguments: str
Complete Example
Server
Copy
from ag_kit_py.server import AGKitAPIApp
from ag_kit_py.agents import LangGraphAgent
def create_agent():
    """Return the agent mapping handed to the server."""
    # NOTE(review): `my_agent` is not defined in this snippet; build the agent
    # instance (for example with the imported LangGraphAgent) before returning.
    return {"agent": my_agent}


# Guarded so that importing this module does not start the server.
if __name__ == "__main__":
    AGKitAPIApp().run(create_agent, port=8000)
Client
Copy
import httpx
import json
async def stream_agent_response():
    """Send a chat request and print every streamed event until [DONE].

    Text chunks are printed as they arrive and collected for a final summary.
    Tool-call arguments are accumulated per tool-call id and printed when the
    call ends.
    """
    request_body = {
        "messages": [
            {"role": "system", "content": "You are helpful."},
            {"role": "user", "content": "Search for Python tutorials"},
        ],
        "tools": [
            {
                "name": "web_search",
                "description": "Search the web",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {"type": "string"},
                    },
                    "required": ["query"],
                },
            }
        ],
        "conversationId": "conv_123",
    }

    text_parts = []
    current_tool_calls = {}

    async with httpx.AsyncClient() as client:
        async with client.stream(
            'POST',
            'http://localhost:8000/send-message',
            json=request_body,
        ) as response:
            async for line in response.aiter_lines():
                # Only `data:` lines carry event payloads.
                if not line.startswith('data: '):
                    continue
                data = line[6:]
                if data == '[DONE]':
                    break

                event = json.loads(data)
                event_type = event['type']

                if event_type == 'text':
                    text_parts.append(event['content'])
                    print(event['content'], end='', flush=True)
                elif event_type == 'tool-call-start':
                    print(f"\n[Calling {event['toolCallName']}...]")
                    current_tool_calls[event['toolCallId']] = {
                        'name': event['toolCallName'],
                        'args': '',
                    }
                elif event_type == 'tool-call-args':
                    # Ignore fragments for calls whose start event was never seen.
                    tool_call = current_tool_calls.get(event['toolCallId'])
                    if tool_call is not None:
                        tool_call['args'] += event['delta']
                elif event_type == 'tool-call-end':
                    tool_call = current_tool_calls.get(event['toolCallId'])
                    if tool_call is not None:
                        print(f"[Arguments: {tool_call['args']}]")
                elif event_type == 'tool-result':
                    print(f"[Result: {event['result']}]")
                elif event_type == 'error':
                    print(f"\nError: {event['error']}")
                elif event_type == 'interrupt':
                    # `reason` is optional on InterruptEvent, so it may be absent.
                    print(f"\nInterrupt: {event.get('reason')}")
                    print(f"Payload: {event['payload']}")

    print("\n\nFinal text:", ''.join(text_parts))
# Run
import asyncio

# Guarded so that importing this module does not start the request.
if __name__ == "__main__":
    asyncio.run(stream_agent_response())
Best Practices
1. Handle All Event Types
Copy
def handle_event(event):
    """Handle all possible event types.

    Dispatches on the event's ``type`` key. ``tool-call-args`` and
    ``tool-call-end`` are documented event types, so they are accepted
    without being reported as unknown.

    Args:
        event: Decoded event dictionary.
    """
    event_type = event.get('type')
    if event_type == 'text':
        display_text(event['content'])
    elif event_type == 'tool-call-start':
        show_tool_indicator(event['toolCallName'])
    elif event_type in ('tool-call-args', 'tool-call-end'):
        # Known event types with nothing to display in this example.
        pass
    elif event_type == 'tool-result':
        display_tool_result(event['result'])
    elif event_type == 'error':
        show_error(event['error'])
    elif event_type == 'interrupt':
        handle_interrupt(event)
    else:
        print(f"Unknown event type: {event_type}")
2. Implement Reconnection Logic
Copy
/**
 * Open an EventSource and reconnect with a growing delay after failures.
 *
 * @param url        SSE endpoint to connect to.
 * @param maxRetries Maximum number of consecutive failed attempts to retry.
 * @param onMessage  Optional message handler attached to every connection,
 *                   including reconnections.
 * @returns The first EventSource. Later reconnections create new instances
 *          that are not returned, so pass `onMessage` to keep receiving events.
 */
function connectWithRetry(
  url: string,
  maxRetries: number = 3,
  onMessage?: (event: MessageEvent) => void
) {
  let retries = 0;

  function connect() {
    const eventSource = new EventSource(url);

    // A successful connection restores the full retry budget, so maxRetries
    // bounds consecutive failures rather than failures over the whole session.
    eventSource.onopen = () => {
      retries = 0;
    };

    if (onMessage) {
      eventSource.onmessage = onMessage;
    }

    eventSource.onerror = () => {
      // Close first so the browser's automatic reconnection does not run
      // alongside this manual retry.
      eventSource.close();
      if (retries < maxRetries) {
        retries++;
        setTimeout(connect, 1000 * retries);
      }
    };

    return eventSource;
  }

  return connect();
}
3. Buffer Text Events
Copy
class TextBuffer:
    """Accumulate streamed text chunks and join them on demand."""

    def __init__(self) -> None:
        # Chunks are stored in arrival order and joined only when requested.
        self.buffer: list[str] = []

    def add(self, content: str) -> None:
        """Append one text chunk to the buffer."""
        self.buffer.append(content)

    def get_full_text(self) -> str:
        """Return all buffered chunks concatenated in arrival order."""
        return ''.join(self.buffer)

    def clear(self) -> None:
        """Discard every buffered chunk."""
        self.buffer.clear()
# Usage
async def print_buffered_response():
    """Print text chunks as they arrive, then the complete response."""
    # `async for` is only valid inside a coroutine, hence the wrapper.
    # `stream_events()` stands for the application's own event stream.
    buffer = TextBuffer()
    async for event in stream_events():
        if event['type'] == 'text':
            buffer.add(event['content'])
            print(event['content'], end='', flush=True)
    print("\n\nComplete response:", buffer.get_full_text())
4. Track Tool Calls
Copy
class ToolCallTracker:
    """Track name, streamed arguments, result and status per tool-call id."""

    def __init__(self) -> None:
        # Maps tool-call id to its record: name, args, result and status.
        self.calls: dict = {}

    def start(self, call_id: str, name: str) -> None:
        """Register a new running call, replacing any record with the same id."""
        self.calls[call_id] = {
            'name': name,
            'args': '',
            'result': None,
            'status': 'running',
        }

    def add_args(self, call_id: str, delta: str) -> None:
        """Append an argument fragment; unknown ids are ignored."""
        call = self.calls.get(call_id)
        if call is not None:
            call['args'] += delta

    def complete(self, call_id: str, result: str) -> None:
        """Store the result and mark the call completed; unknown ids are ignored."""
        call = self.calls.get(call_id)
        if call is not None:
            call['result'] = result
            call['status'] = 'completed'

    def get_all(self):
        """Return the internal mapping of all tracked calls (not a copy)."""
        return self.calls
# Usage
async def track_tool_calls():
    """Feed streamed tool events into a tracker and print the final state."""
    # `async for` is only valid inside a coroutine, hence the wrapper.
    # `stream_events()` stands for the application's own event stream.
    tracker = ToolCallTracker()
    async for event in stream_events():
        if event['type'] == 'tool-call-start':
            tracker.start(event['toolCallId'], event['toolCallName'])
        elif event['type'] == 'tool-call-args':
            tracker.add_args(event['toolCallId'], event['delta'])
        elif event['type'] == 'tool-result':
            tracker.complete(event['toolCallId'], event['result'])
    print("All tool calls:", tracker.get_all())
5. Handle Interrupts
Copy
async def handle_interrupt_event(event):
    """Handle interrupt events for human-in-the-loop.

    Shows the interrupt to the user, asks for approval and resumes the
    conversation when the answer is "y".

    Args:
        event: Decoded interrupt event dictionary.
    """
    import asyncio

    interrupt_id = event['id']
    # `reason` is optional on InterruptEvent, so it may be absent.
    reason = event.get('reason')
    payload = event['payload']

    print(f"Interrupt: {reason}")
    print(f"Payload: {payload}")

    # Get user approval. input() blocks, so it runs in a worker thread to
    # keep the event loop responsive while waiting.
    approval = await asyncio.to_thread(input, "Approve? (y/n): ")

    if approval.lower() == 'y':
        # Resume with approval
        await resume_conversation(interrupt_id, {"approved": True})
    else:
        # Cancel
        print("Operation cancelled")
Related Documentation
- Server Overview - Server system overview
- Server API - Server API reference
- Agents Overview - Agent system
- LangGraph Agent - LangGraph implementation