#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Human-in-the-Loop Agent Implementation.
A LangGraph implementation of an interactive AI agent that requires human
approval for task execution steps. This agent demonstrates how to implement
interrupts, user confirmation workflows, and step-by-step approval mechanisms
using LangGraph's interrupt and approval systems.
Key Features:
- Interactive workflow with human approval checkpoints
- Step-by-step task planning and confirmation
- Interrupt handling for user input collection
- Creative task execution with user oversight
- Streaming responses with approval requests
- Error handling and workflow recovery
Architecture:
The agent uses a multi-node workflow (start_node -> chat_node -> process_steps_node)
that pauses execution at critical points to request user approval before
proceeding with task execution.
Usage:
This agent is ideal for scenarios where human oversight is required,
such as complex task planning, system operations, or any workflow that
benefits from user confirmation and customization.
"""
import os
from typing import Annotated, Any, Dict, List, Optional
from langchain_core.messages import SystemMessage, ToolMessage
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph.graph.state import CompiledStateGraph
from langgraph.types import Command, interrupt
class Step(BaseModel):
"""Task Step Model.
Represents a single step in a task execution plan that can be
enabled or disabled by the user during the approval process.
:param description: The text of the step in imperative form (e.g., "Open file", "Send email")
:type description: str
:param status: The status of the step, typically 'enabled' or 'disabled'
:type status: str
Example:
Creating a task step::
step = Step(
description="Delete temporary files",
status="enabled"
)
"""
description: str = Field(description="The text of the step in imperative form")
status: str = Field(description="The status of the step, always 'enabled'")
@tool
def plan_execution_steps(
    steps: Annotated[List[Step], "An array of 10 step objects, each containing a description and status"],
) -> None:
"""Generate task execution steps for user approval.
This tool creates a structured list of steps required to complete a task.
Each step is presented to the user for approval/modification before execution.
The steps should be concise, actionable, and in imperative form.
:param steps: Array of step objects with description and status
:type steps: List[Step]
Example Steps:
- "Open browser"
- "Navigate to website"
- "Fill form fields"
- "Submit request"
- "Verify response"
Note:
This is a tool function that will be called by the AI agent
when task planning is required. The actual step generation
is handled by the language model.
"""
class AgentState(MessagesState):
"""Human-in-the-Loop Agent State Model.
Extends LangGraph's MessagesState to include task steps and tools
for interactive workflow management with user approval.
:param steps: List of task steps with descriptions and status
:type steps: List[Dict[str, str]]
:param tools: List of available tools for the agent
:type tools: List[Any]
Inherited Fields:
- messages: Conversation history from MessagesState
Example:
Creating agent state::
state = AgentState(
messages=[HumanMessage(content="Plan a file cleanup task")],
steps=[
{"description": "Scan directory", "status": "enabled"},
{"description": "Delete temp files", "status": "disabled"}
],
tools=[]
)
"""
steps: List[Dict[str, str]]
tools: List[Any]
# Create LangChain ChatOpenAI model from environment variables
chat_model = ChatOpenAI(
model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_BASE_URL"),
temperature=float(os.getenv("OPENAI_TEMPERATURE", "0.7")),
)
async def start_node(state: Dict[str, Any], config: RunnableConfig) -> Command[Any]:
"""Workflow entry point and state initialization.
This is the entry point for the human-in-the-loop workflow. It initializes
the agent state and routes to the main chat processing node.
:param state: Current workflow state
:type state: Dict[str, Any]
:param config: LangGraph runnable configuration
:type config: RunnableConfig
:return: Command to route to chat_node with initialized state
:rtype: Command
Note:
This node ensures the steps list is properly initialized
before proceeding to the main conversation processing.
"""
# Initialize steps list if not exists
if "steps" not in state:
state["steps"] = []
# Return command to route to chat_node
return Command(
goto="chat_node",
update={
"messages": state["messages"],
"steps": state["steps"],
},
)
async def chat_node(state: AgentState, config: Optional[RunnableConfig] = None) -> Command:
"""Main chat processing node with task planning capabilities.
This node processes user messages and generates responses. When the user
requests task execution, it calls the plan_execution_steps tool to create
a structured task plan that will be presented to the user for approval.
Processing Flow:
1. Set up system prompt for task planning behavior
2. Bind tools including plan_execution_steps
3. Generate AI response with tool calling capability
4. Handle plan_execution_steps tool calls specially
5. Route to process_steps_node for user approval or END for regular chat
:param state: Current agent state with messages, steps, and tools
:type state: AgentState
:param config: Optional runnable configuration
:type config: Optional[RunnableConfig]
:return: Command to route to next node with updated state
:rtype: Command
Note:
This node uses "predict_state" metadata to enable streaming
for the plan_execution_steps tool, allowing real-time step updates.
"""
system_prompt = """
You are a helpful assistant that can perform any task.
You MUST call the `plan_execution_steps` function when the user asks you to perform a task.
    Always make sure the steps you provide are based on the user's query.
"""
    # Ensure a config exists for the model invocation
    if config is None:
        config = RunnableConfig(recursion_limit=25)
    # Use "predict_state" metadata to set up streaming for the plan_execution_steps tool
if "metadata" not in config:
config["metadata"] = {}
config["metadata"]["predict_state"] = [
{"state_key": "steps", "tool": "plan_execution_steps", "tool_argument": "steps"}
]
# Bind the tools to the model
tools = state.get("tools", [])
model_with_tools = chat_model.bind_tools(
[*tools, plan_execution_steps],
# Disable parallel tool calls to avoid race conditions
parallel_tool_calls=False,
)
# Run the model and generate a response
response = await model_with_tools.ainvoke(
[
SystemMessage(content=system_prompt),
*state["messages"],
],
config,
)
# Update messages with the response
messages = state["messages"] + [response]
# Handle tool calls
if hasattr(response, "tool_calls") and response.tool_calls and len(response.tool_calls) > 0:
# Handle dicts or object (backward compatibility)
tool_call = response.tool_calls[0] if isinstance(response.tool_calls[0], dict) else vars(response.tool_calls[0])
if tool_call["name"] == "plan_execution_steps":
# Get the steps from the tool call
steps_raw = tool_call["args"]["steps"]
# Set initial status to "enabled" for all steps
steps_data = []
# Handle different potential formats of steps data
if isinstance(steps_raw, list):
for step in steps_raw:
if isinstance(step, dict) and "description" in step:
steps_data.append({"description": step["description"], "status": "enabled"})
elif isinstance(step, str):
steps_data.append({"description": step, "status": "enabled"})
elif hasattr(step, "description"):
steps_data.append({"description": step.description, "status": "enabled"})
# If no steps were processed correctly, return to END with the updated messages
if not steps_data:
return Command(
goto=END,
update={
"messages": messages,
"steps": state["steps"],
},
)
# Update steps in state and emit to frontend
state["steps"] = steps_data
            # Add a tool response to satisfy OpenAI's requirement that every
            # tool call is answered by a tool message
            tool_message = ToolMessage(content="Task steps generated.", tool_call_id=tool_call["id"])
messages = messages + [tool_message]
# Move to the process_steps_node which will handle the interrupt and final response
return Command(
goto="process_steps_node",
update={
"messages": messages,
"steps": state["steps"],
},
)
# If no tool calls or not plan_execution_steps, return to END with the updated messages
return Command(
goto=END,
update={
"messages": messages,
"steps": state["steps"],
},
)
async def process_steps_node(state: Dict[str, Any], config: RunnableConfig) -> Command:
"""Handle user interrupt for step customization and generate final response.
This node manages the human-in-the-loop interaction by pausing execution
to collect user input on task steps, then generating a creative response
based on the user's step approvals/modifications.
Processing Flow:
1. Check for existing user_response from previous interrupt
2. If no response, trigger interrupt to collect user input
3. Generate creative task execution description based on user choices
4. Handle disabled steps with creative workarounds
5. Return final response and clean up state
:param state: Current workflow state including steps and user responses
:type state: Dict[str, Any]
:param config: LangGraph runnable configuration
:type config: RunnableConfig
:return: Command to END with final response and updated state
:rtype: Command
Note:
This node uses LangGraph's interrupt() function to pause execution
and wait for user input. The user_response is temporarily stored
in state to handle node restarts after interrupts.
"""
# Check if we already have a user_response in the state
# This happens when the node restarts after an interrupt
if "user_response" in state and state["user_response"]:
user_response = state["user_response"]
else:
# Use LangGraph interrupt to get user input on steps
# This will pause execution and wait for user input in the frontend
user_response = interrupt({"steps": state["steps"]})
# Store the user response in state for when the node restarts
state["user_response"] = user_response
# Generate the creative completion response
final_prompt = """
Provide a textual description of how you are performing the task.
If the user has disabled a step, you are not allowed to perform that step.
However, you should find a creative workaround to perform the task, and if an essential step is disabled,
you can even use some humor in the description of how you are performing the task.
Don't just repeat a list of steps, come up with a creative but short description (3 sentences max)
of how you are performing the task.
"""
    # Use the provider's chat model for final response generation;
    # interrupt() may resume with a non-string payload (e.g. the edited steps
    # list), so coerce it to text before handing it to the model
    final_response = await chat_model.ainvoke(
        [SystemMessage(content=final_prompt), {"role": "user", "content": str(user_response)}],
        config,
    )
# Add the final response to messages
messages = state["messages"] + [final_response]
# Clear the user_response from state to prepare for future interactions
if "user_response" in state:
state.pop("user_response")
# Return to END with the updated messages
return Command(
goto=END,
update={
"messages": messages,
"steps": state["steps"],
},
)
def build_chat_workflow() -> CompiledStateGraph:
"""Build and configure the human-in-the-loop LangGraph workflow.
This function creates a complete LangGraph workflow for handling
interactive conversations with human approval checkpoints. The workflow
supports task planning, user approval, and creative task execution.
Workflow Architecture:
- start_node: Initialize state and route to chat processing
- chat_node: Process messages and handle task planning
- process_steps_node: Manage user interrupts and final responses
Workflow Structure:
START -> start_node -> chat_node -> [process_steps_node | END]
:return: Compiled LangGraph workflow with memory persistence
:rtype: CompiledStateGraph
Example:
Creating and using the workflow::
from ag_kit_py_langgraph import LangGraphAgent
workflow = build_chat_workflow()
agent = LangGraphAgent(
name="HumanInTheLoopBot",
description="Interactive agent with user approval",
graph=workflow
)
# Process a task planning request
result = await workflow.ainvoke(
{
"messages": [HumanMessage(content="Plan a file cleanup task")],
"steps": [],
"tools": []
},
config={"configurable": {"thread_id": "session_1"}}
)
Note:
The workflow includes a MemorySaver checkpointer for conversation
persistence and proper interrupt handling across sessions.
"""
# Define the graph
workflow = StateGraph(AgentState)
memory = MemorySaver()
# Add nodes
workflow.add_node("start_node", start_node) # type: ignore[type-var]
workflow.add_node("chat_node", chat_node)
workflow.add_node("process_steps_node", process_steps_node)
# Add edges
    workflow.add_edge(START, "start_node")
workflow.add_edge("start_node", "chat_node")
workflow.add_edge("process_steps_node", END)
return workflow.compile(checkpointer=memory)
Human in the Loop - LangGraph (Python)
What This Demo Shows
This demo showcases AG-Kit’s human-in-the-loop capabilities using LangGraph with Python:
- Python LangGraph: Uses `StateGraph` with a custom `State` class
- Task Planning: Agent breaks down complex tasks into 10 executable steps
- Workflow Interruption: LangGraph interrupts for human input using `interrupt()`
- Interactive Approval: Users can enable/disable individual steps before execution
- Context Preservation: Maintains conversation context during human interaction
- Resume Functionality: Workflow continues with human-approved steps
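A minimal, self-contained sketch of that interrupt/resume mechanic (node and state names here are illustrative, not taken from the agent code above, and it assumes a recent langgraph release where pending interrupts are reported under the `__interrupt__` key):

```python
from typing import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph
from langgraph.types import Command, interrupt


class ReviewState(TypedDict):
    steps: list


def review_node(state: ReviewState) -> ReviewState:
    # Pauses the graph and surfaces the steps; on resume, interrupt()
    # returns whatever value was passed via Command(resume=...).
    approved = interrupt({"steps": state["steps"]})
    return {"steps": approved}


graph = (
    StateGraph(ReviewState)
    .add_node("review", review_node)
    .add_edge(START, "review")
    .add_edge("review", END)
    .compile(checkpointer=MemorySaver())
)

config = {"configurable": {"thread_id": "demo"}}
# First call stops at the interrupt and reports the pending payload.
paused = graph.invoke({"steps": [{"description": "Open file", "status": "enabled"}]}, config)
print(paused["__interrupt__"][0].value)
# Resuming feeds the human decision back as interrupt()'s return value.
done = graph.invoke(Command(resume=[{"description": "Open file", "status": "disabled"}]), config)
print(done["steps"])
```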
How to Interact
Try these suggestions to trigger the human-in-the-loop workflow:
- “Make me a sandwich” (triggers a 10-step sandwich-making plan)
- “Send me to Mars” (triggers complex space-mission planning)
- “Plan a birthday party” (generates party-planning steps)
Technical Implementation
Backend (Python LangGraph):
- Custom `State` class with `messages: Annotated[list, add_messages]` (sketched below)
- `plan_execution_steps` tool generates 10-step task breakdowns
- `interrupt()` pauses the workflow for human input
- `resume()` continues with approved steps
- State management with conversation history
- Error handling with graceful fallbacks

Frontend:
- Same `useChat` hook and `AgKitChat` component
- `interrupt.renderWithResume` handles the interruption UI
- `Steps` component provides the step-selection interface
- Checkbox state management for step enable/disable
- Confirmation flow to resume agent execution
- Consistent user experience across languages
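The custom state shape listed above, written out explicitly. Note the agent in this file gets `messages` for free by subclassing `MessagesState`, so treat this as an equivalent sketch rather than the code used above:

```python
from typing import Annotated, Any, Dict, List, TypedDict

from langgraph.graph.message import add_messages


class State(TypedDict):
    # add_messages is a reducer: updates append/merge new messages
    # instead of overwriting the whole list.
    messages: Annotated[list, add_messages]
    steps: List[Dict[str, str]]
    tools: List[Any]
```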
Technical Flow
- User Request: User asks for a complex task
- Task Planning: Agent calls the `plan_execution_steps` tool
- Workflow Interruption: LangGraph pauses the run with `interrupt()`
- Human Review: Frontend shows the interactive step-selection UI
- Approval: User enables/disables steps and confirms
- Resume: Workflow continues with the approved steps
- Execution: Agent proceeds with the human-approved plan
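How that flow maps onto the code in this file, as a hedged end-to-end sketch. It assumes the module above is importable as `hitl_agent` (a hypothetical name) and a langgraph version that reports interrupts under `__interrupt__`:

```python
import asyncio

from langchain_core.messages import HumanMessage
from langgraph.types import Command

from hitl_agent import build_chat_workflow  # hypothetical module name


async def main() -> None:
    graph = build_chat_workflow()
    config = {"configurable": {"thread_id": "flow-demo"}}

    # Steps 1-3: the task is planned, then the graph pauses at interrupt().
    paused = await graph.ainvoke(
        {"messages": [HumanMessage(content="Make me a sandwich")], "steps": [], "tools": []},
        config,
    )
    proposed = paused["__interrupt__"][0].value["steps"]

    # Steps 4-5: a real frontend renders checkboxes; here we disable one step.
    proposed[0]["status"] = "disabled"

    # Steps 6-7: resume with the edited plan; process_steps_node wraps up.
    result = await graph.ainvoke(Command(resume=proposed), config)
    print(result["messages"][-1].content)


asyncio.run(main())
```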
Key Features
- Streaming API: Uses `astream` for real-time response generation
- Error Recovery: Graceful error handling with informative messages
- Memory Management: Conversation history persistence
- Multi-language Support: Python’s powerful async capabilities
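A short sketch of the streaming pattern the first bullet refers to, again assuming the hypothetical `hitl_agent` module; `stream_mode="values"` is a standard LangGraph option rather than something specific to this demo:

```python
import asyncio

from langchain_core.messages import HumanMessage

from hitl_agent import build_chat_workflow  # hypothetical module name


async def stream_demo() -> None:
    graph = build_chat_workflow()
    config = {"configurable": {"thread_id": "stream-demo"}}
    inputs = {"messages": [HumanMessage(content="Plan a birthday party")], "steps": [], "tools": []}

    # "values" yields the full state after each node; "messages" would
    # instead yield LLM tokens as they are generated.
    async for state in graph.astream(inputs, config, stream_mode="values"):
        if state.get("messages"):
            state["messages"][-1].pretty_print()


asyncio.run(stream_demo())
```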