- Preview
- Code
- Docs
Copy
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Human-in-the-Loop Agent Implementation.
A LangGraph implementation of an interactive AI agent that requires human
approval for task execution steps. This agent demonstrates how to implement
interrupts, user confirmation workflows, and step-by-step approval mechanisms
using LangGraph's interrupt and approval systems.
Key Features:
- Interactive workflow with human approval checkpoints
- Step-by-step task planning and confirmation
- Interrupt handling for user input collection
- Creative task execution with user oversight
- Streaming responses with approval requests
- Error handling and workflow recovery
Architecture:
The agent uses a multi-node workflow (start_node -> chat_node -> process_steps_node)
that pauses execution at critical points to request user approval before
proceeding with task execution.
Usage:
This agent is ideal for scenarios where human oversight is required,
such as complex task planning, system operations, or any workflow that
benefits from user confirmation and customization.
"""
import os
from typing import Annotated, Any, Dict, List, Optional
from langchain_core.messages import SystemMessage
# LangGraph imports
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph.graph.state import CompiledStateGraph
from langgraph.types import Command, interrupt
class Step(BaseModel):
    """A single step in a task execution plan.

    Instances are produced by the model through the ``plan_execution_steps``
    tool and later shown to the user, who may enable or disable individual
    steps before the agent describes how it performs the task.

    :param description: The text of the step in imperative form (e.g., "Open file", "Send email")
    :type description: str
    :param status: The step status; the tool schema instructs the model to
        always emit ``'enabled'``, and the user may flip a step to
        ``'disabled'`` during the approval interrupt
    :type status: str

    Example:
        Creating a task step::

            step = Step(
                description="Delete temporary files",
                status="enabled"
            )
    """
    # NOTE: the Field descriptions below are part of the JSON schema sent to
    # the language model — do not edit them casually.
    description: str = Field(description="The text of the step in imperative form")
    status: str = Field(description="The status of the step, always 'enabled'")
# NOTE: this docstring doubles as the tool description the LLM sees via
# `bind_tools`, so its wording is behavioral — keep changes deliberate.
@tool
def plan_execution_steps(
    steps: Annotated[List[Step], "An array of 10 step objects, each containing text and status"],
) -> None:
    """Generate task execution steps for user approval.
    This tool creates a structured list of steps required to complete a task.
    Each step is presented to the user for approval/modification before execution.
    The steps should be concise, actionable, and in imperative form.
    :param steps: Array of step objects with description and status
    :type steps: List[Step]
    Example Steps:
    - "Open browser"
    - "Navigate to website"
    - "Fill form fields"
    - "Submit request"
    - "Verify response"
    Note:
    This is a tool function that will be called by the AI agent
    when task planning is required. The actual step generation
    is handled by the language model.
    """
    # Intentionally empty: the tool exists only so the model can emit a
    # structured `steps` argument. chat_node intercepts the tool call,
    # normalizes the steps itself, and appends its own ToolMessage — this
    # body is never relied upon for any result.
class AgentState(MessagesState):
    """Human-in-the-Loop Agent State Model.

    Extends LangGraph's MessagesState to include task steps and tools
    for interactive workflow management with user approval.

    :param steps: List of task steps with descriptions and status
    :type steps: List[Dict[str, str]]
    :param tools: List of available tools for the agent
    :type tools: List[Any]

    Inherited Fields:
        - messages: Conversation history from MessagesState

    Example:
        Creating agent state::

            state = AgentState(
                messages=[HumanMessage(content="Plan a file cleanup task")],
                steps=[
                    {"description": "Scan directory", "status": "enabled"},
                    {"description": "Delete temp files", "status": "disabled"}
                ],
                tools=[]
            )
    """
    # Plain-dict steps: {"description": str, "status": "enabled" | "disabled"}.
    steps: List[Dict[str, str]]
    # Extra tools bound to the model in chat_node alongside plan_execution_steps.
    tools: List[Any]
# Shared ChatOpenAI model used by both chat_node and process_steps_node,
# configured entirely from environment variables:
#   OPENAI_MODEL       - model name (default: "gpt-4o-mini")
#   OPENAI_API_KEY     - API key (None if unset)
#   OPENAI_BASE_URL    - optional custom endpoint (None if unset)
#   OPENAI_TEMPERATURE - sampling temperature (default: "0.7")
chat_model = ChatOpenAI(
    model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"),
    api_key=os.getenv("OPENAI_API_KEY"),
    base_url=os.getenv("OPENAI_BASE_URL"),
    temperature=float(os.getenv("OPENAI_TEMPERATURE", "0.7")),
)
async def start_node(state: Dict[str, Any], config: RunnableConfig) -> Command[Any]:
    """Workflow entry point: initialize state and hand off to chat_node.

    Guarantees that the ``steps`` list exists in the state so downstream
    nodes can rely on it, then routes unconditionally to ``chat_node``.

    :param state: Current workflow state
    :type state: Dict[str, Any]
    :param config: LangGraph runnable configuration
    :type config: RunnableConfig
    :return: Command routing to ``chat_node`` with the (initialized) state
    :rtype: Command
    """
    # Ensure a steps list is present; leave any existing value untouched.
    state.setdefault("steps", [])

    return Command(
        goto="chat_node",
        update={"messages": state["messages"], "steps": state["steps"]},
    )
async def chat_node(state: AgentState, config: Optional[RunnableConfig] = None) -> Command:
    """Main chat processing node with task planning capabilities.

    Processes the conversation with the LLM. If the model calls the
    ``plan_execution_steps`` tool, the tool call is intercepted here: the
    proposed steps are normalized, all marked ``"enabled"``, and execution
    routes to ``process_steps_node`` for user approval. Any other response
    ends the workflow turn.

    :param state: Current agent state with messages, steps, and tools
    :type state: AgentState
    :param config: Optional runnable configuration
    :type config: Optional[RunnableConfig]
    :return: Command routing to ``process_steps_node`` or ``END``
    :rtype: Command

    Note:
        The "predict_state" metadata entry is presumably consumed by the
        AG-Kit streaming layer to mirror the tool's ``steps`` argument into
        frontend state as it streams — TODO confirm against the frontend.
    """
    system_prompt = """
    You are a helpful assistant that can perform any task.
    You MUST call the `plan_execution_steps` function when the user asks you to perform a task.
    Always make sure you will provide tasks based on the user query
    """
    # Define the model
    # model = ChatOpenAI(model="gpt-4o-mini")
    # Ensure a config exists even when the node is called without one.
    if config is None:
        config = RunnableConfig(recursion_limit=25)
    # Use "predict_state" metadata to set up streaming for the
    # plan_execution_steps tool so step updates can be emitted in real time.
    if "metadata" not in config:
        config["metadata"] = {}
    config["metadata"]["predict_state"] = [
        {"state_key": "steps", "tool": "plan_execution_steps", "tool_argument": "steps"}
    ]
    # Bind any caller-supplied tools plus the planning tool to the model.
    tools = state.get("tools", [])
    model_with_tools = chat_model.bind_tools(
        [*tools, plan_execution_steps],
        # Disable parallel tool calls to avoid race conditions
        parallel_tool_calls=False,
    )
    # Run the model on the system prompt plus full conversation history.
    response = await model_with_tools.ainvoke(
        [
            SystemMessage(content=system_prompt),
            *state["messages"],
        ],
        config,
    )
    # Update messages with the response
    messages = state["messages"] + [response]
    # Handle tool calls
    if hasattr(response, "tool_calls") and response.tool_calls and len(response.tool_calls) > 0:
        # tool_calls entries may be dicts or objects (backward compatibility);
        # normalize to a dict either way. Only the first call is inspected
        # because parallel tool calls are disabled above.
        tool_call = response.tool_calls[0] if isinstance(response.tool_calls[0], dict) else vars(response.tool_calls[0])
        if tool_call["name"] == "plan_execution_steps":
            # Get the steps from the tool call
            steps_raw = tool_call["args"]["steps"]
            # Normalize every step into {"description", "status"} with an
            # initial status of "enabled" regardless of what the model sent.
            steps_data = []
            # Handle different potential formats of steps data
            if isinstance(steps_raw, list):
                for step in steps_raw:
                    if isinstance(step, dict) and "description" in step:
                        steps_data.append({"description": step["description"], "status": "enabled"})
                    elif isinstance(step, str):
                        steps_data.append({"description": step, "status": "enabled"})
                    elif hasattr(step, "description"):
                        steps_data.append({"description": step.description, "status": "enabled"})
            # If no steps could be parsed, abandon the planning flow and end
            # the turn with whatever the model said.
            if not steps_data:
                return Command(
                    goto=END,
                    update={
                        "messages": messages,
                        "steps": state["steps"],
                    },
                )
            # Update steps in state and emit to frontend
            state["steps"] = steps_data
            # Append a ToolMessage so the tool call is answered — OpenAI
            # rejects histories with unanswered tool calls.
            from langchain_core.messages import ToolMessage
            tool_message = ToolMessage(content="Task steps generated.", tool_call_id=tool_call["id"])
            messages = messages + [tool_message]
            # Move to the process_steps_node which will handle the interrupt and final response
            return Command(
                goto="process_steps_node",
                update={
                    "messages": messages,
                    "steps": state["steps"],
                },
            )
    # No tool call (or a different tool): plain chat turn, end here.
    return Command(
        goto=END,
        update={
            "messages": messages,
            "steps": state["steps"],
        },
    )
async def process_steps_node(state: Dict[str, Any], config: RunnableConfig) -> Command:
    """Handle user interrupt for step customization and generate final response.

    Pauses execution via LangGraph's ``interrupt()`` to collect the user's
    step approvals, then asks the model for a short creative description of
    how the task is performed given the enabled/disabled steps.

    :param state: Current workflow state including steps and user responses
    :type state: Dict[str, Any]
    :param config: LangGraph runnable configuration
    :type config: RunnableConfig
    :return: Command to END with final response and updated state
    :rtype: Command

    Note:
        ``interrupt()`` raises internally and the node re-runs from the top
        on resume, at which point interrupt() returns the resume payload.
        The ``user_response`` caching below assumes the in-place state
        mutation survives that restart — NOTE(review): LangGraph state
        updates normally only persist via Command/returned updates, so this
        branch may never fire in practice; confirm against LangGraph docs.
    """
    # Check if we already have a user_response in the state
    # This happens when the node restarts after an interrupt
    if "user_response" in state and state["user_response"]:
        user_response = state["user_response"]
    else:
        # Pause execution and surface the current steps to the frontend;
        # the value returned on resume is whatever the frontend sends back.
        user_response = interrupt({"steps": state["steps"]})
        # Store the user response in state for when the node restarts
        state["user_response"] = user_response
    # Generate the creative completion response
    final_prompt = """
    Provide a textual description of how you are performing the task.
    If the user has disabled a step, you are not allowed to perform that step.
    However, you should find a creative workaround to perform the task, and if an essential step is disabled,
    you can even use some humor in the description of how you are performing the task.
    Don't just repeat a list of steps, come up with a creative but short description (3 sentences max)
    of how you are performing the task.
    """
    # NOTE(review): user_response is passed as message content — this assumes
    # the frontend resumes with a string (or content the model API accepts);
    # verify against the caller.
    final_response = await chat_model.ainvoke(
        [SystemMessage(content=final_prompt), {"role": "user", "content": user_response}], config
    )
    # Add the final response to messages
    messages = state["messages"] + [final_response]
    # Clear the user_response from state to prepare for future interactions
    if "user_response" in state:
        state.pop("user_response")
    # Return to END with the updated messages
    return Command(
        goto=END,
        update={
            "messages": messages,
            "steps": state["steps"],
        },
    )
def build_chat_workflow() -> CompiledStateGraph:
    """Build and compile the human-in-the-loop LangGraph workflow.

    Assembles the three-node graph (``start_node`` -> ``chat_node`` ->
    ``process_steps_node``) with a ``MemorySaver`` checkpointer so that
    conversation state and interrupts persist across turns.

    Workflow Structure:
        START -> start_node -> chat_node -> [process_steps_node | END]

    ``chat_node`` routes dynamically via ``Command`` (to
    ``process_steps_node`` when a plan was generated, otherwise to END);
    the remaining edges are static.

    :return: Compiled LangGraph workflow with memory persistence
    :rtype: CompiledStateGraph

    Example:
        Creating and invoking the workflow::

            workflow = build_chat_workflow()
            result = await workflow.ainvoke(
                {
                    "messages": [HumanMessage(content="Plan a file cleanup task")],
                    "steps": [],
                    "tools": []
                },
                config={"configurable": {"thread_id": "session_1"}}
            )
    """
    graph = StateGraph(AgentState)

    # Register the workflow nodes.
    graph.add_node("start_node", start_node)  # type: ignore[type-var]
    graph.add_node("chat_node", chat_node)
    graph.add_node("process_steps_node", process_steps_node)

    # Static wiring; chat_node's routing to process_steps_node/END is
    # decided at runtime through the Command it returns.
    graph.set_entry_point("start_node")
    graph.add_edge(START, "start_node")
    graph.add_edge("start_node", "chat_node")
    graph.add_edge("process_steps_node", END)

    # Checkpointer gives the graph memory and proper interrupt handling.
    return graph.compile(checkpointer=MemorySaver())
Human in the Loop - LangGraph (Python)
本演示展示
本演示展示了 AG-Kit 使用 Python LangGraph 的 Human in the Loop 功能:
- Python LangGraph:使用带有自定义 State 类的 StateGraph
- 任务规划:Agent 将复杂任务分解为 10 个可执行步骤
- 工作流中断:LangGraph 使用 interrupt() 中断以获取人工输入
- 交互式审批:用户可以在执行前启用/禁用单个步骤
- 上下文保持:在人工交互期间保持对话上下文
- 恢复功能:工作流继续执行人工批准的步骤
如何交互
尝试这些建议以触发 Human in the Loop 工作流:
- “给我做个三明治”(触发 10 步三明治制作计划)
- “送我去火星”(触发复杂的太空任务规划)
- “规划一个生日派对”(生成派对规划步骤)
技术实现
后端 (Python LangGraph):
- 带有 messages: Annotated[list, add_messages] 的自定义 State 类
- plan_execution_steps 工具生成 10 步任务分解
- interrupt() 暂停工作流以获取人工输入
- resume() 继续执行批准的步骤
- 带有对话历史的状态管理
- 带有优雅回退的错误处理

前端:
- 相同的 useChat 钩子和 AgKitChat 组件
- interrupt.renderWithResume 处理中断 UI
- Steps 组件提供步骤选择界面
- 用于步骤启用/禁用的复选框状态管理
- 恢复 Agent 执行的确认流程
- 跨语言的一致用户体验
技术流程
- 用户请求:用户请求复杂任务
- 任务规划:Agent 调用 plan_execution_steps 工具
- 工作流中断:LangGraph 使用 interrupt() 中断
- 人工审查:前端显示交互式步骤选择 UI
- 审批:用户启用/禁用步骤并确认
- 恢复:工作流继续执行批准的步骤
- 执行:Agent 继续执行人工批准的计划
关键特性
- 流式 API:使用 astream 进行实时响应生成
- 错误恢复:带有信息性消息的优雅错误处理
- 记忆管理:对话历史持久化
- 多语言支持:Python 强大的异步能力