LanggraphAgent 将 AG-Kit 与 LangGraph 工作流集成。它流式传输执行事件、将 AG-UI 消息映射到 LangGraph 消息、通过 state.agKit.actions 向 LangGraph 暴露 AG-Kit 工具,并支持检查点功能(内存或 TDAI 支持)。
pnpm add @ag-kit/agents @ag-kit/adapter-langgraph @langchain/langgraph
可选依赖:
@langchain/openai – 如果您使用 OpenAI 模型(例如示例中的 ChatOpenAI)
所有导出都可从 @ag-kit/adapter-langgraph 获取:
LanggraphAgent - 用于 LangGraph 集成的主 Agent 类
AGKitStateAnnotation - LangGraph 工作流的状态注解
AGKitPropertiesAnnotation - 包含 AG-Kit actions 的属性注解
type AGKitState - AG-Kit 状态结构的 TypeScript 类型
TDAISaver - 使用 TDAI Memory 的检查点保存器实现
type TDAISaverConfig - TDAISaver 的配置类型
快速开始
import { LanggraphAgent, AGKitStateAnnotation } from "@ag-kit/adapter-langgraph";
import { StateGraph, Command, START, END, MemorySaver } from "@langchain/langgraph";
import type { AGKitState } from "@ag-kit/adapter-langgraph";
const workflow = new StateGraph(AGKitStateAnnotation)
.addNode("chat_node", async (state: AGKitState) => {
// 生成响应并追加到消息列表
const response = {
role: "assistant",
content: "Hello from LangGraph!"
};
return new Command({
goto: END,
update: { messages: [response] }
});
})
.addEdge(START, "chat_node")
.addEdge("chat_node", END);
const compiledWorkflow = workflow.compile({
checkpointer: new MemorySaver()
});
const agent = new LanggraphAgent({
name: "langgraph-agent",
description: "A LangGraph workflow agent",
compiledWorkflow,
});
Server 集成
将 Agent 传递给服务器。服务器会暴露一个 POST /send-message SSE 端点。
import { run } from "@ag-kit/server";
run({
createAgent: () => ({ agent: myAgent }),
port: 3000
});
在工作流中使用 AG-Kit 工具
AG-Kit 工具会自动通过 state.agKit.actions 暴露给您的 LangGraph 工作流。您可以在工作流节点中访问这些工具:
import { AGKitStateAnnotation, type AGKitState } from "@ag-kit/adapter-langgraph";
import { StateGraph, Command, START, END } from "@langchain/langgraph";
import { ChatOpenAI } from "@langchain/openai";
import { DynamicStructuredTool } from "@langchain/core/tools";
const workflow = new StateGraph(AGKitStateAnnotation)
.addNode("agent", async (state: AGKitState) => {
// 从 state 访问 AG-Kit 工具
// state.agKit.actions 包含传递给 Agent 的工具定义
const agKitActions = state.agKit.actions || [];
// 如有需要,将 AG-Kit actions 转换为 LangChain 工具
const tools = agKitActions.map(action =>
new DynamicStructuredTool({
name: action.name,
description: action.description,
schema: action.schema, // 应该是 Zod schema
func: async () => "" // 工具处理器应该被实现
})
);
const model = new ChatOpenAI({ modelName: "gpt-4" });
const response = await model.bindTools(tools).invoke(state.messages);
return new Command({
update: { messages: [response] }
});
})
.addEdge(START, "agent")
.addEdge("agent", END);
注意: state.agKit.actions 数组包含工具定义,这些定义会从传递给 agent.run() 的工具中自动填充。它们已经转换为与 LangGraph 工作流兼容的格式。
检查点
LanggraphAgent 支持检查点功能以持久化状态。您可以使用内存检查点或 TDAI 支持的检查点:
内存检查点:
import { MemorySaver } from "@langchain/langgraph";
const compiledWorkflow = workflow.compile({
checkpointer: new MemorySaver()
});
TDAI 支持的检查点:
import { TDAISaver } from "@ag-kit/adapter-langgraph";
const saver = new TDAISaver({
// TDAI Memory 配置
// 查看 TDAISaverConfig 了解可用选项
});
const compiledWorkflow = workflow.compile({
checkpointer: saver
});
API
class LanggraphAgent
LanggraphAgent 类将 AG-Kit 与 LangGraph 工作流集成,提供事件流、消息映射和工具暴露功能。
new LanggraphAgent(config: AgentConfig & {
compiledWorkflow: CompiledWorkflow;
})
要执行的已编译 LangGraph 工作流。通常通过 workflow.compile(…) 创建。必须使用 AGKitStateAnnotation 作为状态模式。
run()
run(input: RunAgentInput): Observable<BaseEvent>
执行 LangGraph 工作流并在执行期间发出来自 @ag-ui/client 的 BaseEvent 值。
参数:
input - RunAgentInput 对象,包含消息、运行 ID、线程 ID、工具和其他配置
返回值:
Observable<BaseEvent> - RxJS Observable,发出执行事件
示例:
import { RunAgentInput } from "@ag-ui/client";
const input: RunAgentInput = {
messages: [{ role: "user", content: "Hello" }],
runId: "run-123",
threadId: "thread-456",
tools: [/* 工具定义 */],
state: {},
context: [],
forwardedProps: {}
};
agent.run(input).subscribe(event => {
console.log("Event:", event.type);
});
事件类型
Agent 在执行期间发出以下事件类型:
RUN_STARTED - 执行开始时发出
RUN_FINISHED - 执行完成时发出(对于中断的运行,可能包含 outcome: 'interrupt' 和 interrupt 负载)
RUN_ERROR - 执行期间发生错误时发出
TEXT_MESSAGE_START - 文本消息开始时发出
TEXT_MESSAGE_CONTENT - 为每个消息内容块发出(支持流式传输)
TEXT_MESSAGE_END - 文本消息完成时发出
TOOL_CALL_START - 工具调用开始时发出
TOOL_CALL_ARGS - 为工具调用参数发出(支持流式增量)
TOOL_CALL_END - 工具调用完成时发出
TOOL_CALL_RESULT - 发出工具调用的结果
提示: 在大多数应用程序中,您无需直接处理这些事件。@ag-kit/server 会从 Agent 消费它们并通过 SSE 为您流式传输。
AGKitStateAnnotation
AGKitStateAnnotation 为 LangGraph 工作流提供状态模式。它包括:
messages - LangGraph 消息数组
agKit.actions - 暴露给工作流的 AG-Kit 工具数组
用法:
import { AGKitStateAnnotation, type AGKitState } from "@ag-kit/adapter-langgraph";
import { StateGraph } from "@langchain/langgraph";
const workflow = new StateGraph(AGKitStateAnnotation)
.addNode("node", async (state: AGKitState) => {
// 访问消息和 AG-Kit 工具
const messages = state.messages;
const tools = state.agKit.actions;
// ...
});
TDAISaver
TDAISaver 类使用 TDAI Memory 存储提供检查点持久化。
new TDAISaver(config: TDAISaverConfig)
用于存储检查点的集合名称。默认值:“checkpoints”
用于存储检查点写入的集合名称。默认值:“checkpoint_writes”
额外的 TDAI Memory 客户端配置选项。详情请参阅 TDAI Memory 文档。
示例:
import { TDAISaver } from "@ag-kit/adapter-langgraph";
const saver = new TDAISaver({
checkpointType: "my_checkpoints",
// TDAI Memory 配置
});
const compiledWorkflow = workflow.compile({
checkpointer: saver
});
最佳实践
- 使用 AGKitStateAnnotation:始终使用
AGKitStateAnnotation 作为您的状态模式以启用 AG-Kit 集成
- 从 State 访问工具:使用
state.agKit.actions 在工作流节点中访问 AG-Kit 工具
- 检查点:在生产应用中使用检查点进行状态持久化
- 错误处理:在应用程序中适当处理
RUN_ERROR 事件
- 流式传输:利用事件流提供实时用户反馈
另请参阅