Streaming Events

The AG-Kit Python SDK uses an event-driven architecture to enable real-time communication between agents and frontend applications through Server-Sent Events (SSE).

Overview

The streaming event system provides:
  • Real-time Streaming: Server-Sent Events (SSE) for live updates
  • Structured Events: Type-safe event models for different interactions
  • Tool Execution: Streaming tool calls with arguments and results
  • Error Handling: Comprehensive error event support
  • Interrupt Support: Human-in-the-loop workflow events

Event Types

All events are defined as Pydantic models in ag_kit_py.server.send_message.models.

TextEvent

Streams text content from the agent.
from pydantic import BaseModel
from typing import Literal

class TextEvent(BaseModel):
    """Text streaming event model."""
    
    type: Literal["text"] = "text"
    content: str
Example:
{
  "type": "text",
  "content": "Hello! How can I help you today?"
}

ToolCallStartEvent

Indicates the start of a tool function call.
from pydantic import Field

class ToolCallStartEvent(BaseModel):
    """Tool call start event model."""
    
    type: Literal["tool-call-start"] = "tool-call-start"
    tool_call_id: str = Field(..., alias="toolCallId")
    tool_call_name: str = Field(..., alias="toolCallName")
Example:
{
  "type": "tool-call-start",
  "toolCallId": "call_abc123",
  "toolCallName": "web_search"
}

ToolCallArgsEvent

Streams tool function arguments incrementally.
class ToolCallArgsEvent(BaseModel):
    """Tool call arguments streaming event model."""
    
    type: Literal["tool-call-args"] = "tool-call-args"
    tool_call_id: str = Field(..., alias="toolCallId")
    delta: str
Example:
{
  "type": "tool-call-args",
  "toolCallId": "call_abc123",
  "delta": "{\"query\": \"Python"
}

ToolCallEndEvent

Indicates the completion of a tool function call.
class ToolCallEndEvent(BaseModel):
    """Tool call completion event model."""
    
    type: Literal["tool-call-end"] = "tool-call-end"
    tool_call_id: str = Field(..., alias="toolCallId")
Example:
{
  "type": "tool-call-end",
  "toolCallId": "call_abc123"
}

ToolCallResultEvent

Contains the result of a tool function execution.
class ToolCallResultEvent(BaseModel):
    """Tool result streaming event model."""
    
    type: Literal["tool-result"] = "tool-result"
    tool_call_id: str = Field(..., alias="toolCallId")
    result: str
Example:
{
  "type": "tool-result",
  "toolCallId": "call_abc123",
  "result": "{\"results\": [\"Python Tutorial 1\", \"Python Guide 2\"]}"
}

InterruptEvent

Signals an interruption requiring user input (human-in-the-loop).
from typing import Any, Optional

class InterruptEvent(BaseModel):
    """Interrupt event model."""
    
    type: Literal["interrupt"] = "interrupt"
    id: str
    reason: Optional[str] = None
    payload: Any
Example:
{
  "type": "interrupt",
  "id": "interrupt_123",
  "reason": "User approval required for next steps",
  "payload": {
    "steps": [
      {"description": "Delete file", "status": "pending"},
      {"description": "Send email", "status": "pending"}
    ]
  }
}

ErrorEvent

Indicates an error occurred during processing.
class ErrorEvent(BaseModel):
    """Error event model."""
    
    type: Literal["error"] = "error"
    error: str
Example:
{
  "type": "error",
  "error": "Agent execution failed: Invalid tool parameters"
}
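
A client that does not import the SDK models can still sanity-check incoming events against the field lists above. The following is a minimal sketch using only the standard library; `REQUIRED_FIELDS` and `check_event` are illustrative names rather than AG-Kit APIs, and the required keys are read off the JSON examples in this section.

```python
import json

# Required wire-format keys per event type, taken from the examples above.
# REQUIRED_FIELDS and check_event are hypothetical helpers, not part of AG-Kit.
REQUIRED_FIELDS = {
    "text": {"content"},
    "tool-call-start": {"toolCallId", "toolCallName"},
    "tool-call-args": {"toolCallId", "delta"},
    "tool-call-end": {"toolCallId"},
    "tool-result": {"toolCallId", "result"},
    "interrupt": {"id", "payload"},  # "reason" is optional
    "error": {"error"},
}


def check_event(raw: str) -> dict:
    """Parse one JSON event and verify that its required keys are present."""
    event = json.loads(raw)
    if not isinstance(event, dict):
        raise ValueError("event must be a JSON object")
    event_type = event.get("type")
    if event_type not in REQUIRED_FIELDS:
        raise ValueError(f"unknown event type: {event_type!r}")
    missing = REQUIRED_FIELDS[event_type] - set(event)
    if missing:
        raise ValueError(f"{event_type} event is missing {sorted(missing)}")
    return event
```

Rejecting unknown types up front keeps the later dispatch code simple; a more lenient client might log and skip them instead.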

Event Flow

Simple Text Response

1. TextEvent (content: "Hello")
2. TextEvent (content: "! How")
3. TextEvent (content: " can I")
4. TextEvent (content: " help?")
5. [DONE]

Tool Call Flow

1. ToolCallStartEvent (toolCallId: "call_1", toolCallName: "search")
2. ToolCallArgsEvent (toolCallId: "call_1", delta: "{\"query\":")
3. ToolCallArgsEvent (toolCallId: "call_1", delta: " \"Python\"}")
4. ToolCallEndEvent (toolCallId: "call_1")
5. ToolCallResultEvent (toolCallId: "call_1", result: "{...}")
6. TextEvent (content: "Based on the search...")
7. [DONE]
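
Because arguments arrive as string fragments, a client usually has to join the `delta` values before it can parse them. The sketch below assumes that the concatenated deltas of one call form a single JSON document, as the flow above suggests; `collect_tool_calls` is a hypothetical helper name.

```python
import json


def collect_tool_calls(events):
    """Rebuild each tool call's arguments from its streamed delta fragments."""
    pending = {}   # toolCallId -> {"name": ..., "chunks": [...]}
    finished = {}  # toolCallId -> {"name": ..., "args": parsed JSON}
    for event in events:
        kind = event["type"]
        if kind == "tool-call-start":
            pending[event["toolCallId"]] = {
                "name": event["toolCallName"],
                "chunks": [],
            }
        elif kind == "tool-call-args":
            pending[event["toolCallId"]]["chunks"].append(event["delta"])
        elif kind == "tool-call-end":
            call = pending.pop(event["toolCallId"])
            # A single delta is usually not valid JSON; only the joined string is.
            # Assumption: a call that streamed no deltas has empty arguments.
            joined = "".join(call["chunks"]) or "{}"
            finished[event["toolCallId"]] = {
                "name": call["name"],
                "args": json.loads(joined),
            }
    return finished


events = [
    {"type": "tool-call-start", "toolCallId": "call_1", "toolCallName": "search"},
    {"type": "tool-call-args", "toolCallId": "call_1", "delta": "{\"query\":"},
    {"type": "tool-call-args", "toolCallId": "call_1", "delta": " \"Python\"}"},
    {"type": "tool-call-end", "toolCallId": "call_1"},
]
print(collect_tool_calls(events))
```

Keying the state by `toolCallId` means the helper still works if events from several tool calls are interleaved.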

Error Flow

1. TextEvent (content: "Processing...")
2. ErrorEvent (error: "Tool execution failed")
3. [DONE]

Interrupt Flow

1. TextEvent (content: "I need to delete a file...")
2. InterruptEvent (id: "int_1", reason: "Approval required", payload: {...})
3. [DONE]
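
The four flows differ only in how the stream ends, so a client can reduce any of them to a final status. This sketch assumes, as in the flows above, that an error or interrupt is the last event before `[DONE]`; `summarize_stream` is an illustrative name and not part of the SDK.

```python
def summarize_stream(events):
    """Fold a finished event sequence into (status, text, detail)."""
    text_parts = []
    for event in events:
        kind = event["type"]
        if kind == "text":
            text_parts.append(event["content"])
        elif kind == "error":
            # detail is the error message
            return "error", "".join(text_parts), event["error"]
        elif kind == "interrupt":
            # detail is the whole interrupt event, so the caller can read
            # its id and payload when asking the user
            return "interrupted", "".join(text_parts), event
        # tool-call events do not change the outcome and are ignored here
    return "completed", "".join(text_parts), None
```

A caller would typically render the text in all three cases and then branch on the status to show an error or an approval prompt.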

Server-Sent Events Format

All events are sent using the SSE format:
data: {"type":"text","content":"Hello"}\n\n
data: {"type":"text","content":" World"}\n\n
data: [DONE]\n\n
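
The framing above is simple enough to reproduce with the standard library. The following sketch encodes an event as one `data:` frame and decodes a fully buffered stream back into events; it assumes every payload fits on a single `data:` line, which matches the examples here but is narrower than the full SSE specification. `encode_sse`, `decode_sse`, and `DONE_FRAME` are hypothetical names.

```python
import json

DONE_FRAME = "data: [DONE]\n\n"


def encode_sse(event: dict) -> str:
    """Serialize one event as a single SSE data frame."""
    # Without indent, json.dumps escapes newlines inside strings, so the
    # payload always stays on one line.
    return "data: " + json.dumps(event, separators=(",", ":")) + "\n\n"


def decode_sse(stream: str):
    """Yield decoded events from buffered SSE text until the [DONE] sentinel."""
    for frame in stream.split("\n\n"):
        if not frame.startswith("data: "):
            continue  # skip the empty trailing piece and any non-data frames
        payload = frame[len("data: "):]
        if payload == "[DONE]":
            return
        yield json.loads(payload)


wire = (
    encode_sse({"type": "text", "content": "Hello"})
    + encode_sse({"type": "text", "content": " World"})
    + DONE_FRAME
)
print(list(decode_sse(wire)))
```

A production client would read the response incrementally instead of buffering it, but the frame boundaries and the `[DONE]` check stay the same.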

SSE Headers

Content-Type: text/event-stream
Cache-Control: no-cache, no-transform
Connection: keep-alive

Event Handler

The handler function processes agent execution and yields events:
from typing import Any, AsyncGenerator
from ag_kit_py.server.send_message.models import SendMessageEvent

async def handler(
    input_data: dict, 
    agent: Any
) -> AsyncGenerator[SendMessageEvent, None]:
    """Handle HTTP requests and process agent execution with streaming.
    
    Args:
        input_data: Dictionary containing request data
        agent: The agent instance to execute
        
    Yields:
        SendMessageEvent: Formatted event objects for client consumption
        
    Raises:
        RuntimeError: When agent execution or message processing fails
    """

Event Conversion

The handler converts AG-UI events to client events:
import json

from ag_ui.core.events import EventType

async for event in agent.run(run_data):
    match event.type:
        case EventType.TEXT_MESSAGE_CONTENT:
            data = {"content": event.raw_event["data"]["chunk"]["content"]}
            yield TextEvent.model_validate(data)
            
        case EventType.TOOL_CALL_START:
            data = {
                "tool_call_id": event.tool_call_id,
                "tool_call_name": event.tool_call_name
            }
            yield ToolCallStartEvent.model_validate(data)
            
        case EventType.TOOL_CALL_ARGS:
            data = {"tool_call_id": event.tool_call_id, "delta": event.delta}
            yield ToolCallArgsEvent.model_validate(data)
            
        case EventType.TOOL_CALL_END:
            data = {"tool_call_id": event.tool_call_id}
            yield ToolCallEndEvent.model_validate(data)
            
        case EventType.TOOL_CALL_RESULT:
            data = {
                "tool_call_id": event.tool_call_id,
                "result": json.loads(event.result)
            }
            yield ToolCallResultEvent.model_validate(data)

Client Integration

JavaScript/TypeScript

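// EventSource always issues a GET request and cannot send a request body,
// so this example assumes the endpoint also accepts GET.
const eventSource = new EventSource('/send-message');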

eventSource.onmessage = (event) => {
  if (event.data === '[DONE]') {
    eventSource.close();
    return;
  }
  
  const data = JSON.parse(event.data);
  
  switch (data.type) {
    case 'text':
      console.log('Text:', data.content);
      break;
      
    case 'tool-call-start':
      console.log('Tool call started:', data.toolCallName);
      break;
      
    case 'tool-result':
      console.log('Tool result:', data.result);
      break;
      
    case 'error':
      console.error('Error:', data.error);
      break;
      
    case 'interrupt':
      console.log('Interrupt:', data.reason);
      handleInterrupt(data);
      break;
  }
};

eventSource.onerror = (error) => {
  console.error('SSE error:', error);
  eventSource.close();
};

Python Client

import asyncio
import json

import httpx


async def main():
    async with httpx.AsyncClient() as client:
        async with client.stream(
            'POST',
            'http://localhost:8000/send-message',
            json={
                "messages": [{"role": "user", "content": "Hello"}],
                "conversationId": "conv_123"
            }
        ) as response:
            async for line in response.aiter_lines():
                if not line.startswith('data: '):
                    continue  # Skip blank separator lines

                data = line[6:]  # Remove 'data: ' prefix

                if data == '[DONE]':
                    break

                event = json.loads(data)

                if event['type'] == 'text':
                    print(event['content'], end='', flush=True)
                elif event['type'] == 'tool-call-start':
                    print(f"\n[Tool: {event['toolCallName']}]")
                elif event['type'] == 'error':
                    print(f"\nError: {event['error']}")


# "async with" is only valid inside a coroutine, so run it via asyncio
asyncio.run(main())

React Hook

import { useEffect, useState } from 'react';

function useAgentStream(url: string, request: any) {
  const [messages, setMessages] = useState<string[]>([]);
  const [toolCalls, setToolCalls] = useState<any[]>([]);
  const [error, setError] = useState<string | null>(null);
  const [isStreaming, setIsStreaming] = useState(false);

  useEffect(() => {
    const eventSource = new EventSource(url);
    setIsStreaming(true);

    eventSource.onmessage = (event) => {
      if (event.data === '[DONE]') {
        eventSource.close();
        setIsStreaming(false);
        return;
      }

      const data = JSON.parse(event.data);

      switch (data.type) {
        case 'text':
          setMessages(prev => [...prev, data.content]);
          break;
        case 'tool-call-start':
          setToolCalls(prev => [...prev, {
            id: data.toolCallId,
            name: data.toolCallName,
            status: 'running'
          }]);
          break;
        case 'tool-result':
          setToolCalls(prev => prev.map(call =>
            call.id === data.toolCallId
              ? { ...call, result: data.result, status: 'completed' }
              : call
          ));
          break;
        case 'error':
          setError(data.error);
          break;
      }
    };

    eventSource.onerror = () => {
      eventSource.close();
      setIsStreaming(false);
    };

    return () => eventSource.close();
  }, [url]);

  return { messages, toolCalls, error, isStreaming };
}

Message Models

Request Messages

SystemMessage

class SystemMessage(BaseModel):
    """System message model."""
    
    role: Literal["system"] = "system"
    content: str

UserMessage

class UserMessage(BaseModel):
    """User message model."""
    
    role: Literal["user"] = "user"
    content: str

AssistantMessage

class AssistantMessage(BaseModel):
    """AI assistant message model."""
    
    id: str
    role: Literal["assistant"] = "assistant"
    content: Optional[str] = None
    tool_calls: Optional[List[ToolCall]] = Field(None, alias="toolCalls")

ToolMessage

class ToolMessage(BaseModel):
    """Tool result message model."""
    
    role: Literal["tool"] = "tool"
    content: str
    tool_call_id: str = Field(..., alias="toolCallId")

Tool Models

Tool

class Tool(BaseModel):
    """Tool definition model."""
    
    name: str
    description: str
    parameters: Any

ToolCall

class ToolCall(BaseModel):
    """Tool function call model."""
    
    id: str
    type: Literal["function"] = "function"
    function: ToolFunction

ToolFunction

class ToolFunction(BaseModel):
    """Tool function definition model."""
    
    name: str
    arguments: str

Complete Example

Server

from ag_kit_py.server import AGKitAPIApp
from ag_kit_py.agents import LangGraphAgent

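def create_agent():
    # my_agent is a placeholder for an agent instance built elsewhere
    # (for example, a LangGraphAgent).
    return {"agent": my_agent}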

AGKitAPIApp().run(create_agent, port=8000)

Client

import httpx
import json

async def stream_agent_response():
    async with httpx.AsyncClient() as client:
        async with client.stream(
            'POST',
            'http://localhost:8000/send-message',
            json={
                "messages": [
                    {"role": "system", "content": "You are helpful."},
                    {"role": "user", "content": "Search for Python tutorials"}
                ],
                "tools": [
                    {
                        "name": "web_search",
                        "description": "Search the web",
                        "parameters": {
                            "type": "object",
                            "properties": {
                                "query": {"type": "string"}
                            },
                            "required": ["query"]
                        }
                    }
                ],
                "conversationId": "conv_123"
            }
        ) as response:
            current_text = ""
            current_tool_calls = {}
            
            async for line in response.aiter_lines():
                if not line.startswith('data: '):
                    continue
                
                data = line[6:]
                if data == '[DONE]':
                    break
                
                event = json.loads(data)
                
                if event['type'] == 'text':
                    current_text += event['content']
                    print(event['content'], end='', flush=True)
                    
                elif event['type'] == 'tool-call-start':
                    print(f"\n[Calling {event['toolCallName']}...]")
                    current_tool_calls[event['toolCallId']] = {
                        'name': event['toolCallName'],
                        'args': ''
                    }
                    
                elif event['type'] == 'tool-call-args':
                    current_tool_calls[event['toolCallId']]['args'] += event['delta']
                    
                elif event['type'] == 'tool-call-end':
                    tool_call = current_tool_calls[event['toolCallId']]
                    print(f"[Arguments: {tool_call['args']}]")
                    
                elif event['type'] == 'tool-result':
                    print(f"[Result: {event['result']}]")
                    
                elif event['type'] == 'error':
                    print(f"\nError: {event['error']}")
                    
                elif event['type'] == 'interrupt':
                    print(f"\nInterrupt: {event['reason']}")
                    print(f"Payload: {event['payload']}")
            
            print("\n\nFinal text:", current_text)

# Run
import asyncio
asyncio.run(stream_agent_response())

Best Practices

1. Handle All Event Types

def handle_event(event):
    """Handle all possible event types."""
    # display_text, show_tool_indicator, and the other helpers called here
    # are application-defined UI callbacks.
    event_type = event.get('type')
    
    if event_type == 'text':
        display_text(event['content'])
    elif event_type == 'tool-call-start':
        show_tool_indicator(event['toolCallName'])
    elif event_type in ('tool-call-args', 'tool-call-end'):
        pass  # Argument deltas and end markers; see "Track Tool Calls"
    elif event_type == 'tool-result':
        display_tool_result(event['result'])
    elif event_type == 'error':
        show_error(event['error'])
    elif event_type == 'interrupt':
        handle_interrupt(event)
    else:
        print(f"Unknown event type: {event_type}")

2. Implement Reconnection Logic

function connectWithRetry(url: string, maxRetries: number = 3) {
  let retries = 0;
  
  function connect() {
    const eventSource = new EventSource(url);
    
    eventSource.onerror = () => {
      eventSource.close();
      
      if (retries < maxRetries) {
        retries++;
        setTimeout(connect, 1000 * retries);
      }
    };
    
    return eventSource;
  }
  
  return connect();
}
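
The same linear backoff can be written for a Python client. This is a sketch under stated assumptions: `stream_with_retry` and `open_stream` are hypothetical names, and `open_stream` stands for any coroutine function that opens and consumes the stream. The built-in `ConnectionError` is the default exception to retry on; httpx raises its own exception types, so an httpx-based caller would pass something like `retry_on=(httpx.TransportError,)`.

```python
import asyncio


async def stream_with_retry(open_stream, max_retries=3, base_delay=1.0,
                            retry_on=(ConnectionError,)):
    """Call open_stream() until it succeeds, backing off linearly between tries."""
    attempt = 0
    while True:
        try:
            return await open_stream()
        except retry_on:
            attempt += 1
            if attempt > max_retries:
                raise  # give up after max_retries retries
            # Wait base_delay * 1, * 2, * 3, ... seconds, like the
            # setTimeout(connect, 1000 * retries) call above.
            await asyncio.sleep(base_delay * attempt)
```

As with the TypeScript version, the first attempt is not counted as a retry, so `max_retries=3` allows up to four attempts in total.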

3. Buffer Text Events

class TextBuffer:
    def __init__(self):
        self.buffer = []
    
    def add(self, content: str):
        self.buffer.append(content)
    
    def get_full_text(self) -> str:
        return ''.join(self.buffer)
    
    def clear(self):
        self.buffer.clear()

# Usage
buffer = TextBuffer()

async for event in stream_events():
    if event['type'] == 'text':
        buffer.add(event['content'])
        print(event['content'], end='', flush=True)

print("\n\nComplete response:", buffer.get_full_text())

4. Track Tool Calls

class ToolCallTracker:
    def __init__(self):
        self.calls = {}
    
    def start(self, call_id: str, name: str):
        self.calls[call_id] = {
            'name': name,
            'args': '',
            'result': None,
            'status': 'running'
        }
    
    def add_args(self, call_id: str, delta: str):
        if call_id in self.calls:
            self.calls[call_id]['args'] += delta
    
    def complete(self, call_id: str, result: str):
        if call_id in self.calls:
            self.calls[call_id]['result'] = result
            self.calls[call_id]['status'] = 'completed'
    
    def get_all(self):
        return self.calls

# Usage
tracker = ToolCallTracker()

async for event in stream_events():
    if event['type'] == 'tool-call-start':
        tracker.start(event['toolCallId'], event['toolCallName'])
    elif event['type'] == 'tool-call-args':
        tracker.add_args(event['toolCallId'], event['delta'])
    elif event['type'] == 'tool-result':
        tracker.complete(event['toolCallId'], event['result'])

print("All tool calls:", tracker.get_all())

5. Handle Interrupts

async def handle_interrupt_event(event):
    """Handle interrupt events for human-in-the-loop."""
    interrupt_id = event['id']
    reason = event.get('reason')  # Optional in InterruptEvent; may be absent or None
    payload = event['payload']
    
    print(f"Interrupt: {reason or 'no reason given'}")
    print(f"Payload: {payload}")
    
    # Get user approval (input() blocks the event loop; fine for a simple CLI)
    approval = input("Approve? (y/n): ")
    
    if approval.strip().lower() == 'y':
        # Resume with approval; resume_conversation is an application-defined helper
        await resume_conversation(interrupt_id, {"approved": True})
    else:
        # Cancel
        print("Operation cancelled")
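
In an application that keeps other coroutines running while it waits for the user, the blocking prompt can be moved to a worker thread. The sketch below uses `asyncio.to_thread` (Python 3.9+); `ask_approval` and its `reader` parameter are illustrative names, with `reader` injectable so the prompt can be replaced by a UI callback or a test stub.

```python
import asyncio


async def ask_approval(prompt: str, reader=input) -> bool:
    """Ask for a yes/no answer without blocking the event loop."""
    # The blocking reader runs in a worker thread, so other tasks on the
    # event loop keep making progress while the user decides.
    answer = await asyncio.to_thread(reader, prompt)
    return answer.strip().lower() in {"y", "yes"}
```

A handler could then `await ask_approval("Approve? (y/n): ")` and branch on the boolean result.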