Skip to main content

其核心在于,LangGraph 将智能体工作流建模为图。您使用三个关键组件来定义智能体的行为:
  1. State:表示应用程序当前快照的共享数据结构。它可以是任何数据类型,但通常使用共享状态模式定义。
  2. Nodes:编码智能体逻辑的函数。它们接收当前状态作为输入,执行某些计算或副作用,并返回更新后的状态。
  3. Edges:根据当前状态确定下一个要执行的 Node 的函数。它们可以是条件分支或固定转换。
通过组合 NodesEdges,您可以创建复杂的、循环的工作流,使状态随时间演变。然而,真正的力量来自于 LangGraph 如何管理该状态。 强调一下:NodesEdges 仅仅是函数——它们可以包含 LLM 或只是普通的代码。 简而言之:节点做工作,边告诉下一步做什么 LangGraph 的底层图算法使用 消息传递 来定义通用程序。当一个 Node 完成其操作时,它沿着一条或多条边向其他节点发送消息。这些接收节点随后执行它们的函数,将结果消息传递给下一组节点,过程继续。受 Google 的 Pregel 系统启发,程序以离散的“超级步骤”进行。 一个超级步骤可以被视为对图节点的一次单轮迭代。并行运行的节点属于同一个超级步骤,而顺序运行的节点属于不同的超级步骤。在图执行开始时,所有节点都处于 inactive 状态。当节点在其任何传入边(或“通道”)上收到新消息(状态)时,该节点变为 active。活动节点随后运行其函数并响应更新。在每个超级步骤结束时,没有传入消息的节点通过将自己标记为 inactive 来投票 halt。当所有节点都 inactive 且没有消息在传输中时,图执行终止。

StateGraph

StateGraph 类是要使用的主要图类。这是由用户定义的 State 对象参数化的。

编译您的图

要构建您的图,首先定义 state,然后添加 nodesedges,最后编译它。究竟什么是编译您的图以及为什么需要它? 编译是一个相当简单的步骤。它对图的结构提供一些基本检查(例如没有孤立的节点等)。这也是您可以指定运行时参数的地方,如 checkpointers 和 breakpoints。您只需调用 .compile 方法即可编译您的图:
const graph = new StateGraph(StateAnnotation)
  .addNode("nodeA", nodeA)
  .addEdge(START, "nodeA")
  .addEdge("nodeA", END)
  .compile();
必须在使用前编译您的图。

状态

定义图时做的第一件事就是定义图的 StateState 包括 图的模式 以及 reducer 函数,后者指定如何将更新应用到状态。State 的模式将是图中所有 NodesEdges 的输入模式。您使用 StateSchema 类定义状态,该类接受用于各个字段的任何 标准模式(如 Zod),以及特殊值类型如 ReducedValueMessagesValue。所有 Nodes 都会向 State 发出更新,然后使用指定的 reducer 函数应用这些更新。

模式

指定图模式的主要方法是使用 StateSchema 类。模式中的每个字段可以是:
  • 标准模式:用于简单字段(变为“最后一个值”通道,更新时覆盖)
  • ReducedValue:用于需要自定义 reducer 函数的字段(当节点并行运行时)
  • MessagesValue:用于聊天消息列表(预构建了消息感知的 reducer)
  • UntrackedValue:用于不应被检查点的临时状态
import {
  StateSchema,
  ReducedValue,
  MessagesValue,
  UntrackedValue
} from "@langchain/langgraph";
import { z } from "zod/v4";

const AgentState = new StateSchema({
  // 预构建的消息值,带有内置 reducer
  messages: MessagesValue,

  // 简单字段直接使用 Zod 模式
  currentStep: z.string(),

  // 带默认值的字段
  retryCount: z.number().default(0),

  // 用于累积值的自定义 reducer
  allSteps: new ReducedValue(
    z.array(z.string()).default(() => []),
    {
      inputSchema: z.string(),
      reducer: (current, newStep) => [...current, newStep],
    }
  ),

  // 不保存到检查点的临时状态
  tempCache: new UntrackedValue(z.record(z.string(), z.unknown())),
});

// 类型提取
type State = typeof AgentState.State;   // 完整状态类型
type Update = typeof AgentState.Update; // 部分更新类型

// 在图中使用
const graph = new StateGraph(AgentState)
  .addNode("myNode", ...)
  .compile();
默认情况下,图将具有相同的输入和输出模式。如果您想更改此内容,也可以直接指定显式的输入和输出模式。当您有很多键,其中一些明确用于输入,另一些用于输出时,这很有用。

多个模式

通常,所有图节点都与单个模式通信。这意味着它们将读取和写入相同的状态通道。但是,有些情况下我们需要对此有更多的控制:
  • 内部节点可以传递不需要在图的输入/输出中的信息。
  • 我们也可能希望为图使用不同的输入/输出模式。输出可能仅包含单个相关输出键。
可以在图内部定义私有状态通道供内部节点通信。我们可以简单地定义一个私有模式 PrivateState 还可以为图定义显式的输入和输出模式。在这些情况下,我们定义一个包含与图操作相关的 所有 键的“内部”模式。但是,我们也定义 inputoutput 模式,它们是“内部”模式的子集,以约束图的输入和输出。有关更多详细信息,请参阅 定义输入和输出模式 让我们看一个例子:
import { StateSchema, GraphNode } from "@langchain/langgraph";
import * as z from "zod";

const InputState = new StateSchema({
  userInput: z.string(),
});

const OutputState = new StateSchema({
  graphOutput: z.string(),
});

const OverallState = new StateSchema({
  foo: z.string(),
  userInput: z.string(),
  graphOutput: z.string(),
});

const PrivateState = new StateSchema({
  bar: z.string(),
});

const graph = new StateGraph({
  state: OverallState,
  input: InputState,
  output: OutputState,
})
  .addNode("node1", (state) => {
    // 写入 OverallState
    return { foo: state.userInput + " name" };
  })
  .addNode("node2", (state) => {
    // 从 OverallState 读取,写入 PrivateState
    return { bar: state.foo + " is" };
  })
  .addNode(
    "node3",
    (state) => {
      // 从 PrivateState 读取,写入 OutputState
      return { graphOutput: state.bar + " Lance" };
    },
    { input: PrivateState }
  )
  .addEdge(START, "node1")
  .addEdge("node1", "node2")
  .addEdge("node2", "node3")
  .addEdge("node3", END)
  .compile();

await graph.invoke({ userInput: "My" });
// { graphOutput: 'My name is Lance' }
这里有两点微妙且重要需要注意:
  1. 我们将 state 作为输入模式传递给 node1。但是,我们写入 foo,这是 OverallState 中的一个通道。我们如何写入不在输入模式中的状态通道?这是因为节点 可以写入图状态中的任何状态通道。图状态是在初始化时定义的状态通道的并集,其中包括 OverallState 以及过滤器 InputStateOutputState
  2. 我们用 StateGraph({ state: OverallState, input: InputState, output: OutputState }) 初始化图。我们如何在 node2 中写入 PrivateState?如果未在 StateGraph 初始化中传递,图如何获得对该模式的访问权限?我们可以这样做是因为 节点也可以声明额外的状态通道,只要存在状态模式定义。在这种情况下,定义了 PrivateState 模式,因此我们可以将 bar 作为图中的新状态通道并写入它。

归约器

归约器对于理解如何将从节点发出的更新应用到 State 至关重要。State 中的每个键都有自己独立的归约器函数。如果没有明确指定归约器函数,则假定对该键的所有更新都应覆盖它。有几种不同类型的归约器,从默认类型的归约器开始:

默认归约器

这两个示例展示了如何使用默认归约器:
Example A
import { StateSchema } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  foo: z.number(),
  bar: z.array(z.string()),
});
在此示例中,未为任何键指定归约器函数。假设图的输入是: { foo: 1, bar: ["hi"] }。然后假设第一个 Node 返回 { foo: 2 }。这被视为对状态的更新。注意 Node 不需要返回整个 State 模式 - 只需要一个更新。应用此更新后,State 将变为 { foo: 2, bar: ["hi"] }。如果第二个节点返回 { bar: ["bye"] },则 State 将变为 { foo: 2, bar: ["bye"] }
Example B
import { StateSchema, ReducedValue } from "@langchain/langgraph";
import { z } from "zod/v4";

const State = new StateSchema({
  foo: z.number(),
  bar: new ReducedValue(
    z.array(z.string()).default(() => []),
    { reducer: (x, y) => x.concat(y) }
  ),
});
在此示例中,我们使用 ReducedValue 指定了第二个键 (bar) 的归约器函数。注意第一个键保持不变。假设图的输入是 { foo: 1, bar: ["hi"] }。然后假设第一个 Node 返回 { foo: 2 }。这被视为对状态的更新。注意 Node 不需要返回整个 State 模式 - 只需要一个更新。应用此更新后,State 将变为 { foo: 2, bar: ["hi"] }。如果第二个节点返回 { bar: ["bye"] },则 State 将变为 { foo: 2, bar: ["hi", "bye"] }。注意这里 bar 键是通过将两个数组连接起来更新的。

未跟踪的值

UntrackedValue 用于在图执行期间存在但应永远不被检查点的状态字段。当图从检查点恢复时,未跟踪的值将重置为其初始状态(或不可用)。 这适用于:
  • 无法序列化的数据库连接
  • 应在恢复时重建的临时缓存
  • 您不想持久化的大型对象
  • 每次都需要新鲜传递的仅运行时配置
import { StateSchema, UntrackedValue, MessagesValue } from "@langchain/langgraph";
import { z } from "zod/v4";

const State = new StateSchema({
  messages: MessagesValue,

  // 未跟踪:如果多个节点在同一步骤中写入会抛出错误(guard: true 是默认值)
  dbConnection: new UntrackedValue<DatabaseConnection>(),

  // 未跟踪且 guard: false 允许多次写入,保留最后一个值
  tempCache: new UntrackedValue(
    z.record(z.string(), z.unknown()),
    { guard: false }
  ),

  // 无模式的未跟踪值(用于最大灵活性)
  runtimeConfig: new UntrackedValue(),
});
行为:
  • 执行期间:值像正常状态一样存储和访问
  • 检查点:未跟踪的值排除在检查点数据之外
  • 恢复:未跟踪的值从头开始(空或带有默认值)
  • 使用 guard: true(默认):如果多个节点在同一步骤中写入则抛出错误
  • 使用 guard: false:允许多次写入,最后一个值获胜
不要对需要在中断或时间旅行之间持久化的数据使用 UntrackedValue。对于持久化数据,请使用常规状态字段或 ReducedValue

类型工具

LangGraph 提供多种类型工具,以便在定义节点和条件边时获得更好的 TypeScript 类型安全性。

GraphNode

使用 GraphNode 对图构建器外部定义的节点函数进行类型处理:
import { GraphNode, StateSchema, Command } from "@langchain/langgraph";
import { z } from "zod/v4";

const State = new StateSchema({
  count: z.number().default(0),
  result: z.string(),
});

// 基本节点 - 接收状态,返回部分更新
const incrementNode: GraphNode<typeof State> = (state) => {
  return { count: state.count + 1 };
};

// 异步节点
const fetchNode: GraphNode<typeof State> = async (state, config) => {
  const response = await fetch(`/api/data/${state.count}`);
  return { result: await response.text() };
};

// 带有 Command 路由的节点 - 指定有效目标
const routerNode: GraphNode<typeof State, "process" | "done"> = (state) => {
  if (state.count >= 10) {
    return new Command({ goto: "done" });
  }
  return new Command({
    update: { count: state.count + 1 },
    goto: "process"
  });
};

State.Node 简写

每个 StateSchema 实例都有一个 Node 属性,提供用于类型处理节点的简写:
const State = new StateSchema({
  messages: MessagesValue,
  step: z.string(),
});

// 以下是等效的:
const myNode1: GraphNode<typeof State> = (state) => ({ step: "done" });
const myNode2: typeof State.Node = (state) => ({ step: "done" });

ConditionalEdgeRouter

在条件边中使用 ConditionalEdgeRouter 进行路由函数(无状态更新,仅路由):
import { ConditionalEdgeRouter, END } from "@langchain/langgraph";

const State = new StateSchema({
  shouldContinue: z.boolean(),
  step: z.string(),
});

// 路由器返回节点名称或 END
const router: ConditionalEdgeRouter<typeof State, "process" | "summarize"> = (state) => {
  if (!state.shouldContinue) {
    return END;
  }
  return state.step === "initial" ? "process" : "summarize";
};

// 在图中使用
graph.addConditionalEdges("check", router);

StateSchema.StateStateSchema.Update

从模式中提取状态和更新类型以在自定义类型定义中使用:
import { StateSchema } from "@langchain/langgraph";

const MyStateSchema = new StateSchema({
  messages: MessagesValue,
  count: z.number().default(0),
});

// 提取完整状态类型
type MyState = typeof MyStateSchema.State;
// { messages: BaseMessage[], count: number }

// 提取更新类型(部分,带有归约器输入类型)
type MyUpdate = typeof MyStateSchema.Update;
// { messages?: Messages, count?: number }

在图状态中处理消息

为什么要使用消息?

大多数现代 LLM 提供商都具有聊天模型接口,接受消息列表作为输入。特别是 LangChain 的 聊天模型接口 接受消息对象列表作为输入。这些消息有多种形式,如 HumanMessage(用户输入)或 AIMessage(LLM 响应)。 有关消息对象的更多信息,请参阅 消息概念指南

在图中使用消息

在许多情况下,将先前的对话历史作为消息列表存储在图状态中很有帮助。为此,您可以使用预建的 MessagesValue,它提供消息感知的归约器,自动处理消息 ID、更新和删除。 MessagesValue 归约器对于告诉图如何在每次状态更新时更新状态中的 Message 对象列表至关重要。如果您不指定归约器,每次状态更新都将用最近提供的值覆盖消息列表。MessagesValue 正确处理此问题:对于全新的消息,它附加到现有列表,对于现有消息(按 ID 匹配),它在原地更新它们。
MessagesValue 实际上是 ReducedValue 的一种特殊情况,预配置了内部 messagesStateReducer 来处理消息列表和更新。这为 LangGraph 图中的聊天消息历史提供了方便的消息感知状态管理。

序列化

除了跟踪消息 ID 外,MessagesValue 还会尝试在接收到 messages 通道上的状态更新时将消息反序列化为 LangChain Message 对象。这允许以下格式发送图输入/状态更新:
// 这是支持的
{
  messages: [new HumanMessage("message")];
}

// 这也支持
{
  messages: [{ role: "human", content: "message" }];
}
由于使用 MessagesValue 时状态更新始终反序列化为 LangChain Messages,您应该使用点符号访问消息属性,如 state.messages.at(-1).content。下面是使用 MessagesValue 的图的示例:
import { StateGraph, StateSchema, MessagesValue } from "@langchain/langgraph";

const State = new StateSchema({
  messages: MessagesValue,
});

const graph = new StateGraph(State)
  ...
messages 字段定义为 MessagesValue,它是带有内置归约器的 BaseMessage 对象列表。通常,需要跟踪的状态不仅仅是消息,所以我们看到人们扩展此状态并添加更多字段,例如:
import { StateSchema, MessagesValue } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  messages: MessagesValue,
  documents: z.array(z.string()),
});

节点

在 LangGraph 中,节点通常是函数(同步或异步),接受以下参数:
  1. state—图的 状态
  2. config—包含配置信息(如 thread_id)和追踪信息(如 tags)的 RunnableConfig 对象
您可以使用 addNode 方法将节点添加到图中。为了更好的类型安全性,请使用 GraphNode 类型工具或 State.Node 对节点函数进行类型处理:
import { StateGraph, StateSchema, GraphNode } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  input: z.string(),
  results: z.string(),
});

// 选项 1:使用 GraphNode 类型工具
const myNode: GraphNode<typeof State> = (state, config) => {
  console.log("In node: ", config?.configurable?.user_id);
  return { results: `Hello, ${state.input}!` };
};

// 选项 2:使用 State.Node 简写
const otherNode: typeof State.Node = (state) => {
  return state;
};

const builder = new StateGraph(State)
  .addNode("myNode", myNode)
  .addNode("otherNode", otherNode)
  ...
在幕后,函数被转换为 RunnableLambda,为您的函数添加批量和异步支持,以及 原生追踪和调试 如果您在不指定名称的情况下将节点添加到图中,它将获得等同于函数名的默认名称。
builder.addNode(myNode);
// 然后您可以通过将其引用为 `"myNode"` 来创建到此节点/从此节点的边

START 节点

START 节点是一个特殊节点,代表向图发送用户输入的节点。引用此节点的主要目的是确定哪些节点应该首先被调用。
import { START } from "@langchain/langgraph";

graph.addEdge(START, "nodeA");

END 节点

END 节点是一个特殊节点,代表终止节点。当您想要表示哪些边在完成之后没有操作时,会引用此节点。
import { END } from "@langchain/langgraph";

graph.addEdge("nodeA", END);

节点缓存

LangGraph 支持基于节点输入的任务/节点缓存。要使用缓存:
  • 在编译图(或指定入口点)时指定缓存
  • 为节点指定缓存策略。每个缓存策略支持:
    • keyFunc,用于基于节点输入生成缓存键。
    • ttl,缓存的生存时间(秒)。如果未指定,缓存将永不过期。
import { StateGraph, StateSchema, GraphNode, START } from "@langchain/langgraph";
import { InMemoryCache } from "@langchain/langgraph-checkpoint";
import { z } from "zod/v4";

const State = new StateSchema({
  x: z.number(),
  result: z.number(),
});

const expensiveNode: GraphNode<typeof State> = async (state) => {
  // 模拟昂贵操作
  await new Promise((resolve) => setTimeout(resolve, 3000));
  return { result: state.x * 2 };
};

const graph = new StateGraph(State)
  .addNode("expensive_node", expensiveNode, { cachePolicy: { ttl: 3 } })
  .addEdge(START, "expensive_node")
  .compile({ cache: new InMemoryCache() });

await graph.invoke({ x: 5 }, { streamMode: "updates" });
// [{"expensive_node": {"result": 10}}]
await graph.invoke({ x: 5 }, { streamMode: "updates" });
// [{"expensive_node": {"result": 10}, "__metadata__": {"cached": true}}]

边定义了逻辑如何路由以及图如何决定停止。这是您的智能体工作方式以及不同节点相互通信的重要部分。有几种关键的边类型:
  • 普通边:直接从一节点到下一节点。
  • 条件边:调用函数以确定接下来要前往哪个节点。
  • 入口点:用户输入到达时首先调用的节点。
  • 条件入口点:调用函数以确定用户输入到达时首先调用哪些节点。
一个节点可以有多个传出边。如果一个节点有多个传出边,所有这些目标节点将在下一个超级步骤中作为一部分并行执行。

普通边

如果您总是想从节点 A 到节点 B,可以直接使用 addEdge 方法。
graph.addEdge("nodeA", "nodeB");

条件边

如果您想可选地路由到一个或多个边(或可选地终止),可以使用 addConditionalEdges 方法。此方法接受一个节点名称和一个在该节点执行后要调用的“路由函数”:
graph.addConditionalEdges("nodeA", routingFunction);
类似于节点,routingFunction 接受图的当前 state 并返回值。 默认情况下,routingFunction 的返回值用作要将状态发送到下一个的节点名称(或节点列表)。所有这些节点将在下一个超级步骤中作为一部分并行运行。 您可以选择提供一个对象,将 routingFunction 的输出映射到下一个节点的名称。
graph.addConditionalEdges("nodeA", routingFunction, {
  true: "nodeB",
  false: "nodeC",
});
如果您想在单个函数中结合状态更新和路由,请使用 Command 代替条件边。

入口点

入口点是图启动时运行的第一个节点。您可以使用来自虚拟 START 节点到要执行的第一节点的 addEdge 方法来指定进入图的位置。
import { START } from "@langchain/langgraph";

graph.addEdge(START, "nodeA");

条件入口点

条件入口点允许您根据自定义逻辑从不同的节点开始。您可以使用来自虚拟 START 节点的 addConditionalEdges 来完成此操作。
import { START } from "@langchain/langgraph";

graph.addConditionalEdges(START, routingFunction);
您可以选择提供一个对象,将 routingFunction 的输出映射到下一个节点的名称。
graph.addConditionalEdges(START, routingFunction, {
  true: "nodeB",
  false: "nodeC",
});

Send

默认情况下,NodesEdges 是预先定义的,并在相同的共享状态上运行。但是,可能存在确切边在事前未知的情况,和/或您可能希望同时存在不同版本的 State。一个常见的例子是 map-reduce 设计模式。在这种设计模式中,第一个节点可能会生成对象列表,您可能希望对所有这些对象应用其他节点。对象的数量可能在事前未知(意味着边的数量可能未知),并且下游 Node 的输入 State 应该不同(每个生成的对象一个)。 为了支持这种设计模式,LangGraph 支持从条件边返回 Send 对象。Send 接受两个参数:第一个是节点名称,第二个是要传递给该节点的状态。
import { Send } from "@langchain/langgraph";

graph.addConditionalEdges("nodeA", (state) => {
  return state.subjects.map(
    (subject) => new Send("generateJoke", { subject })
  );
});

Command

Command 是控制图执行的多功能原语。它接受四个参数:
  • update:应用状态更新(类似于从节点返回更新)。
  • goto:导航到特定节点(类似于 条件边)。
  • graph:从 子图 导航时定位父图。
  • resume:提供值以在 中断 后恢复执行。
Command 在三种上下文中使用:

从节点返回

updategoto

从节点函数返回 Command 以在单步中更新状态并路由到下一个节点:
import { Command } from "@langchain/langgraph";

graph.addNode("myNode", (state) => {
  return new Command({
    update: { foo: "bar" },
    goto: "myOtherNode",
  });
});
使用 Command,您还可以实现动态控制流行为(与 条件边 相同):
import { Command } from "@langchain/langgraph";

graph.addNode("myNode", (state) => {
  if (state.foo === "bar") {
    return new Command({
      update: { foo: "baz" },
      goto: "myOtherNode",
    });
  }
});
当您既需要更新状态需要路由到不同节点时使用 Command。如果您只需要路由而不更新状态,请使用 条件边 在节点函数中使用 Command 时,您必须在添加节点时添加 ends 参数以指定它可以路由到的节点:
builder.addNode("myNode", myNode, {
  ends: ["myOtherNode", END],
});
Command 仅添加动态边—使用 add_edge / addEdge 定义的静态边仍然执行。例如,如果 node_a 返回 Command(goto="my_other_node"),并且您也有 graph.add_edge("node_a", "node_b"),则 node_bmy_other_node 都将运行。
查看此 操作指南 以获取如何使用 Command 的端到端示例。

graph

如果您正在使用 子图,可以通过在 Command 中指定 graph: Command.PARENT 从子图中的节点导航到父图中的不同节点:
import { Command } from "@langchain/langgraph";

graph.addNode("myNode", (state) => {
  return new Command({
    update: { foo: "bar" },
    goto: "otherSubgraph", // `otherSubgraph` 是父图中的节点
    graph: Command.PARENT,
  });
});
graph 设置为 Command.PARENT 将导航到最近的父图。当您将更新从子图节点发送到父图节点,针对父图和子图 状态模式 共享的键时,您必须在父图状态中为您正在更新的键定义 归约器
这在实现 多智能体交接 时特别有用。有关详细信息,请查看 导航到父图中的节点

输入到 invoke/stream

new Command({ resume: ... }) 是唯一 intended 作为 invoke()/stream() 输入的 Command 模式。不要使用 new Command({ update: ... }) 作为输入来继续多轮对话——因为传递任何 Command 作为输入会从最新的检查点恢复(即最后运行的步骤,而不是 __start__),如果图已经完成,它将看起来卡住。要在现有线程上继续对话,请传递普通输入对象:
// 错误 - 图从最新检查点恢复
// (最后运行的步骤),看起来卡住
await graph.invoke(new Command({ update: { messages: [{ role: "user", content: "follow up" }] } }), config);

// 正确 - 普通对象从 __start__ 重启
await graph.invoke({ messages: [{ role: "user", content: "follow up" }] }, config);

resume

使用 new Command({ resume: ... }) 提供值并在 中断 后恢复图执行。传递给 resume 的值成为暂停节点内 interrupt() 调用的返回值:
import { Command, interrupt } from "@langchain/langgraph";

const humanReview = async (state: typeof StateAnnotation.State) => {
  // 暂停图并等待值
  const answer = interrupt("Do you approve?");
  return { messages: [{ role: "user", content: answer }] };
};

// 第一次调用 - 命中中断并暂停
const result = await graph.invoke({ messages: [...] }, config);

// 使用值恢复 - interrupt() 调用返回 "yes"
const resumed = await graph.invoke(new Command({ resume: "yes" }), config);
查看 中断概念指南 以获取中断模式的完整详细信息,包括多个中断和验证循环。

从工具返回

您可以从工具返回 Command 以更新图状态和控制流。使用 update 修改状态(例如,保存对话期间查找的客户信息),并使用 goto 在工具完成后路由到特定节点。
在工具内部使用时,goto 添加动态边—已调用工具的节点上已定义的任何静态边仍将执行。
有关详细信息,请参阅 在工具中使用

图迁移

即使使用检查点器跟踪状态,LangGraph 也能轻松处理图定义(节点、边和状态)的迁移。
  • 对于图末尾的线程(即未中断),您可以更改图的整个拓扑(即所有节点和边,移除、添加、重命名等)
  • 对于当前中断的线程,我们支持除重命名/移除节点之外的所有拓扑更改(因为该线程现在可能即将进入不再存在的节点)— 如果这是障碍,请联系我们,我们可以优先考虑解决方案。
  • 对于修改状态,我们完全支持添加和移除键的向后和向前兼容性
  • 重命名的状态键在现有线程中丢失其保存的状态
  • 以不兼容方式更改类型的状态键目前可能导致具有更改前状态的线程出现问题 — 如果这是障碍,请联系我们,我们可以优先考虑解决方案。

运行时上下文

创建图时,您可以为传递给节点的运行时上下文指定 contextSchema。这对于传递不属于图状态的信息到节点很有用。例如,您可能想要传递依赖项,如模型名称或数据库连接。
import { StateGraph, StateSchema } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  input: z.string(),
  output: z.string(),
});

const ContextSchema = z.object({
  llm: z.union([z.literal("openai"), z.literal("anthropic")]),
});

const graph = new StateGraph(State, ContextSchema);
然后您可以使用 context 属性将此配置传递给图。
const config = { context: { llm: "anthropic" } };

await graph.invoke(inputs, config);
然后您可以在节点或条件边中访问和使用此上下文:
import { Runtime, GraphNode } from "@langchain/langgraph";
import * as z from "zod";

const nodeA: GraphNode<typeof State> = (state, config) => {
  const llm = getLLM(runtime.context?.llm);
  // ...
  return {};
};
有关配置的完整分解,请参阅 添加运行时配置
graph.addNode("myNode", (state, config) => {
  const llmType = config.context?.llm || "openai";
  const llm = getLLM(llmType);
  return { results: `Hello, ${state.input}!` };
});

递归限制

递归限制设置单次执行期间图可以执行的 超级步骤 的最大数量。一旦达到限制,LangGraph 将抛出 GraphRecursionError。默认情况下此值设置为 25 步。递归限制可以在运行时设置在任何图上,并通过配置对象传递给 invoke/stream。重要的是,recursionLimit 是一个独立的 config 键,不应像所有其他用户定义的配置一样传递在 configurable 键内。参见下面的示例:
await graph.invoke(inputs, {
  recursionLimit: 5,
  context: { llm: "anthropic" },
});

访问和处理递归计数器

当前步骤计数器可在任何节点内的 config.metadata.langgraph_step 中访问,允许在达到递归限制之前主动处理递归。这使得您可以在图逻辑中实现优雅降级策略。

工作原理

步骤计数器存储在 config.metadata.langgraph_step 中。递归限制检查遵循逻辑:step > stop,其中 stop = step + recursionLimit + 1。当超过限制时,LangGraph 抛出 GraphRecursionError

访问当前步骤计数器

您可以在任何节点内访问当前步骤计数器以监控执行进度。
import { RunnableConfig } from "@langchain/core/runnables";
import { StateGraph } from "@langchain/langgraph";

const myNode: GraphNode<typeof State> = async (state, config) => {
  const currentStep = config.metadata?.langgraph_step;
  console.log(`Currently on step: ${currentStep}`);
  return state;
}
设计图时包含明确的终止条件,并将 GraphRecursionError 作为安全网捕获:
import {
  StateGraph,
  StateSchema,
  ReducedValue,
  GraphNode,
  ConditionalEdgeRouter,
  END,
  GraphRecursionError
} from "@langchain/langgraph";
import { z } from "zod/v4";

const State = new StateSchema({
  messages: new ReducedValue(
    z.array(z.string()).default(() => []),
    { reducer: (x, y) => x.concat(y) }
  ),
});

// 构建带有明确终止逻辑的图
const graph = new StateGraph(State)
  .addNode("reasoning", async (state) => {
    // 正常处理 - 设计图时包含明确的终止条件
    return {
      messages: ["thinking..."]
    };
  })
  .addConditionalEdges("reasoning", (state) => {
    // 在此处添加您的终止条件
    if (state.messages.length >= 5) {
      return END;
    }
    return "reasoning";
  });

const app = graph.compile();

// 捕获 GraphRecursionError 作为安全网
try {
  const result = await app.invoke(
    { messages: [] },
    { recursionLimit: 10 }
  );
} catch (error) {
  if (error instanceof GraphRecursionError) {
    console.log("Recursion limit reached, handling gracefully");
    // 处理错误 - 返回部分结果、通知用户等。
  }
}

主动与被动方法

处理递归限制主要有两种方法:主动(在图内监控)和被动(在外部捕获错误)。
import {
  StateGraph,
  StateSchema,
  ReducedValue,
  GraphNode,
  ConditionalEdgeRouter,
  END,
  GraphRecursionError
} from "@langchain/langgraph";
import { z } from "zod/v4";

const State = new StateSchema({
  messages: new ReducedValue(
    z.array(z.string()).default(() => []),
    { reducer: (x, y) => x.concat(y) }
  ),
});


// 构建带有明确终止逻辑的图
const builder = new StateGraph(State)
  .addNode("agent", async (state) => {
    return {
      messages: ["Processing..."]
    };
  })
  .addConditionalEdges("agent", (state) => {
    // 在图中设计终止条件
    if (state.messages.length >= 5) {
      return END;
    }
    return "agent";
  });

const graph = builder.compile();

// 被动方法 - 捕获 GraphRecursionError 作为安全网
try {
  const result = await graph.invoke(
    { messages: [] },
    { recursionLimit: 10 }
  );
} catch (error) {
  if (error instanceof GraphRecursionError) {
    // 在图执行失败后在外部处理
    console.log("Recursion limit exceeded, handling gracefully");
  }
}
被动方法在超过限制后捕获 GraphRecursionError。设计图时包含明确的终止条件以避免首先达到限制。
方法检测处理控制流
被动(捕获 GraphRecursionError超过限制之后在 try/catch 外的图外图执行终止
被动优势:
  • 简单实现
  • 无需修改图逻辑
  • 集中式错误处理

其他可用元数据

除了 langgraph_step,以下元数据也可在 config.metadata 中获取:
const inspectMetadata: GraphNode<typeof State> = async (state, config) => {
  const metadata = config.metadata;

  console.log(`Step: ${metadata?.langgraph_step}`);
  console.log(`Node: ${metadata?.langgraph_node}`);
  console.log(`Triggers: ${metadata?.langgraph_triggers}`);
  console.log(`Path: ${metadata?.langgraph_path}`);
  console.log(`Checkpoint NS: ${metadata?.langgraph_checkpoint_ns}`);

  return state;
}

可视化

能够可视化图通常很好,尤其是随着它们变得越来越复杂。LangGraph 自带几种内置方式来可视化图。有关更多信息,请参阅 可视化您的图

可观测性与追踪

要追踪、调试和评估您的智能体,请使用 LangSmith

了解更多