跳转到主要内容

概述

LanggraphAgent 将 AG-Kit 与 LangGraph 工作流集成。它流式传输执行事件、将 AG-UI 消息映射到 LangGraph 消息、通过 state.agKit.actions 向 LangGraph 暴露 AG-Kit 工具,并支持检查点功能(内存或 TDAI 支持)。

安装

pnpm add @ag-kit/agents @ag-kit/adapter-langgraph @langchain/langgraph
可选依赖:
  • @langchain/openai – 如果您使用 OpenAI 模型(例如示例中的 ChatOpenAI

导出

所有导出都可从 @ag-kit/adapter-langgraph 获取:
  • LanggraphAgent - 用于 LangGraph 集成的主 Agent 类
  • AGKitStateAnnotation - LangGraph 工作流的状态注解
  • AGKitPropertiesAnnotation - 包含 AG-Kit actions 的属性注解
  • type AGKitState - AG-Kit 状态结构的 TypeScript 类型
  • TDAISaver - 使用 TDAI Memory 的检查点保存器实现
  • type TDAISaverConfig - TDAISaver 的配置类型

快速开始

import { LanggraphAgent, AGKitStateAnnotation } from "@ag-kit/adapter-langgraph";
import { StateGraph, Command, START, END, MemorySaver } from "@langchain/langgraph";
import type { AGKitState } from "@ag-kit/adapter-langgraph";

const workflow = new StateGraph(AGKitStateAnnotation)
  .addNode("chat_node", async (state: AGKitState) => {
    // 生成响应并追加到消息列表
    const response = { 
      role: "assistant", 
      content: "Hello from LangGraph!" 
    };
    return new Command({ 
      goto: END, 
      update: { messages: [response] } 
    });
  })
  .addEdge(START, "chat_node")
  .addEdge("chat_node", END);

const compiledWorkflow = workflow.compile({ 
  checkpointer: new MemorySaver() 
});

const agent = new LanggraphAgent({
  name: "langgraph-agent",
  description: "A LangGraph workflow agent",
  compiledWorkflow,
});

Server 集成

将 Agent 传递给服务器。服务器会暴露一个 POST /send-message SSE 端点。
import { run } from "@ag-kit/server";

run({ 
  createAgent: () => ({ agent: myAgent }),
  port: 3000 
});

在工作流中使用 AG-Kit 工具

AG-Kit 工具会自动通过 state.agKit.actions 暴露给您的 LangGraph 工作流。您可以在工作流节点中访问这些工具:
import { AGKitStateAnnotation, type AGKitState } from "@ag-kit/adapter-langgraph";
import { StateGraph, Command, START, END } from "@langchain/langgraph";
import { ChatOpenAI } from "@langchain/openai";
import { DynamicStructuredTool } from "@langchain/core/tools";

const workflow = new StateGraph(AGKitStateAnnotation)
  .addNode("agent", async (state: AGKitState) => {
    // 从 state 访问 AG-Kit 工具
    // state.agKit.actions 包含传递给 Agent 的工具定义
    const agKitActions = state.agKit.actions || [];
    
    // 如有需要,将 AG-Kit actions 转换为 LangChain 工具
    const tools = agKitActions.map(action => 
      new DynamicStructuredTool({
        name: action.name,
        description: action.description,
        schema: action.schema, // 应该是 Zod schema
        func: async () => "" // 工具处理器应该被实现
      })
    );
    
    const model = new ChatOpenAI({ modelName: "gpt-4" });
    const response = await model.bindTools(tools).invoke(state.messages);
    
    return new Command({
      update: { messages: [response] }
    });
  })
  .addEdge(START, "agent")
  .addEdge("agent", END);
注意: state.agKit.actions 数组包含工具定义,这些定义会从传递给 agent.run() 的工具中自动填充。它们已经转换为与 LangGraph 工作流兼容的格式。

检查点

LanggraphAgent 支持检查点功能以持久化状态。您可以使用内存检查点或 TDAI 支持的检查点: 内存检查点:
import { MemorySaver } from "@langchain/langgraph";

const compiledWorkflow = workflow.compile({
  checkpointer: new MemorySaver()
});
TDAI 支持的检查点:
import { TDAISaver } from "@ag-kit/adapter-langgraph";

const saver = new TDAISaver({
  // TDAI Memory 配置
  // 查看 TDAISaverConfig 了解可用选项
});

const compiledWorkflow = workflow.compile({
  checkpointer: saver
});

API

class LanggraphAgent

LanggraphAgent 类将 AG-Kit 与 LangGraph 工作流集成,提供事件流、消息映射和工具暴露功能。
new LanggraphAgent(config: AgentConfig & {
  compiledWorkflow: CompiledWorkflow;
})
name
string
必填
Agent 的唯一标识符。
description
string
Agent 的人类可读描述。
compiledWorkflow
CompiledWorkflow
必填
要执行的已编译 LangGraph 工作流。通常通过 workflow.compile(…) 创建。必须使用 AGKitStateAnnotation 作为状态模式。

方法

run()
run(input: RunAgentInput): Observable<BaseEvent>
执行 LangGraph 工作流并在执行期间发出来自 @ag-ui/clientBaseEvent 值。 参数:
  • input - RunAgentInput 对象,包含消息、运行 ID、线程 ID、工具和其他配置
返回值:
  • Observable<BaseEvent> - RxJS Observable,发出执行事件
示例:
import { RunAgentInput } from "@ag-ui/client";

const input: RunAgentInput = {
  messages: [{ role: "user", content: "Hello" }],
  runId: "run-123",
  threadId: "thread-456",
  tools: [/* 工具定义 */],
  state: {},
  context: [],
  forwardedProps: {}
};

agent.run(input).subscribe(event => {
  console.log("Event:", event.type);
});

事件类型

Agent 在执行期间发出以下事件类型:
  • RUN_STARTED - 执行开始时发出
  • RUN_FINISHED - 执行完成时发出(对于中断的运行,可能包含 outcome: 'interrupt'interrupt 负载)
  • RUN_ERROR - 执行期间发生错误时发出
  • TEXT_MESSAGE_START - 文本消息开始时发出
  • TEXT_MESSAGE_CONTENT - 为每个消息内容块发出(支持流式传输)
  • TEXT_MESSAGE_END - 文本消息完成时发出
  • TOOL_CALL_START - 工具调用开始时发出
  • TOOL_CALL_ARGS - 为工具调用参数发出(支持流式增量)
  • TOOL_CALL_END - 工具调用完成时发出
  • TOOL_CALL_RESULT - 发出工具调用的结果
提示: 在大多数应用程序中,您无需直接处理这些事件。@ag-kit/server 会从 Agent 消费它们并通过 SSE 为您流式传输。

AGKitStateAnnotation

AGKitStateAnnotation 为 LangGraph 工作流提供状态模式。它包括:
  • messages - LangGraph 消息数组
  • agKit.actions - 暴露给工作流的 AG-Kit 工具数组
用法:
import { AGKitStateAnnotation, type AGKitState } from "@ag-kit/adapter-langgraph";
import { StateGraph } from "@langchain/langgraph";

const workflow = new StateGraph(AGKitStateAnnotation)
  .addNode("node", async (state: AGKitState) => {
    // 访问消息和 AG-Kit 工具
    const messages = state.messages;
    const tools = state.agKit.actions;
    // ...
  });

TDAISaver

TDAISaver 类使用 TDAI Memory 存储提供检查点持久化。
new TDAISaver(config: TDAISaverConfig)
checkpointType
string
用于存储检查点的集合名称。默认值:“checkpoints”
checkpointWritesType
string
用于存储检查点写入的集合名称。默认值:“checkpoint_writes”
...IMemoryClientOptions
object
额外的 TDAI Memory 客户端配置选项。详情请参阅 TDAI Memory 文档。
示例:
import { TDAISaver } from "@ag-kit/adapter-langgraph";

const saver = new TDAISaver({
  checkpointType: "my_checkpoints",
  // TDAI Memory 配置
});

const compiledWorkflow = workflow.compile({
  checkpointer: saver
});

最佳实践

  1. 使用 AGKitStateAnnotation:始终使用 AGKitStateAnnotation 作为您的状态模式以启用 AG-Kit 集成
  2. 从 State 访问工具:使用 state.agKit.actions 在工作流节点中访问 AG-Kit 工具
  3. 检查点:在生产应用中使用检查点进行状态持久化
  4. 错误处理:在应用程序中适当处理 RUN_ERROR 事件
  5. 流式传输:利用事件流提供实时用户反馈

另请参阅