Skip to main content
本指南演示了 LangGraph 的 Graph API 的基础知识。它将介绍 状态,以及组合常见的图结构,如 序列分支循环。它还涵盖了 LangGraph 的控制功能,包括用于 map-reduce 工作流的 Send API 以及用于将状态更新与节点间的“跳转”相结合的 Command API

设置

安装 langgraph
npm install @langchain/langgraph
设置 LangSmith 以获得更好的调试体验注册 LangSmith 以快速发现问题并提高 LangGraph 项目的性能。LangSmith 允许您使用跟踪数据来调试、测试和监控使用 LangGraph 构建的 LLM 应用——有关如何入门的更多信息,请阅读 文档

定义和更新状态

在此我们展示如何在 LangGraph 中定义和更新 状态。我们将演示:
  1. 如何使用状态来定义图的 模式
  2. 如何使用 归约器 来控制如何处理状态更新。

定义状态

LangGraph 中的 状态 是使用 StateSchema 类定义的。这提供了一个统一的 API,接受 标准模式(如 Zod)作为各个字段,以及特殊值类型如 ReducedValueMessagesValueUntrackedValue 默认情况下,图将具有相同的输入和输出模式,状态决定了该模式。有关如何定义不同的输入和输出模式,请参阅 定义输入和输出模式 让我们考虑一个使用 消息 的简单示例。这代表了适用于许多 LLM 应用的通用状态表述。有关更多详细信息,请参阅我们的 概念页面
import { StateSchema, MessagesValue } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  messages: MessagesValue,
  extraField: z.number(),
});
此状态跟踪 消息 对象列表,以及一个额外的整数字段。

更新状态

让我们构建一个包含单个节点的示例图。我们的 节点 只是一个读取图的 state 并对其进行更新的 TypeScript 函数。此函数的第一个参数始终是 state:
import { AIMessage } from "@langchain/core/messages";
import { GraphNode } from "@langchain/langgraph";

const node: GraphNode<typeof State> = (state) => {
  const messages = state.messages;
  const newMessage = new AIMessage("Hello!");
  return { messages: [newMessage], extraField: 10 };
};
此节点只是将一条消息附加到我们的消息列表中(归约器处理连接),并填充一个额外字段。
节点应直接返回对状态的更新,而不是修改状态。
接下来让我们定义一个包含此节点的简单图。我们使用 StateGraph 来定义一个在此状态下操作的图。然后我们使用 addNode 来填充我们的图。
import { StateGraph } from "@langchain/langgraph";

const graph = new StateGraph(State)
  .addNode("node", node)
  .addEdge("__start__", "node")
  .compile();
LangGraph 提供了用于可视化您的图的内置实用程序。让我们检查我们的图。有关可视化的详细信息,请参阅 可视化您的图
import * as fs from "node:fs/promises";

const drawableGraph = await graph.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);
在这种情况下,我们的图只执行单个节点。让我们继续进行简单的调用:
import { HumanMessage } from "@langchain/core/messages";

const result = await graph.invoke({ messages: [new HumanMessage("Hi")], extraField: 0 });
console.log(result);
{ messages: [HumanMessage { content: 'Hi' }, AIMessage { content: 'Hello!' }], extraField: 10 }
请注意:
  • 我们通过更新状态的单个键来启动调用。
  • 我们在调用结果中接收整个状态。
为了方便起见,我们经常通过日志记录来检查 消息对象 的内容:
for (const message of result.messages) {
  console.log(`${message.getType()}: ${message.content}`);
}
human: Hi
ai: Hello!

使用归约器处理状态更新

状态中的每个键都可以有自己的独立 归约器 函数,它控制如何应用来自节点的更新。如果没有显式指定归约器函数,则假设对该键的所有更新都应覆盖它。 在之前的示例中,我们使用了已经内置归约器的 MessagesValue。对于自定义字段,您可以使用 ReducedValue 来定义如何应用更新。 在之前的示例中,我们的节点通过在消息列表中追加一条消息来更新状态中的 "messages" 键。MessagesValue 归约器会自动处理此操作:
import { StateSchema, MessagesValue, ReducedValue } from "@langchain/langgraph";
import * as z from "zod";

// MessagesValue 已经有内置归约器
const State = new StateSchema({
  messages: MessagesValue,
  extraField: z.number(),
});
我们的节点只需返回新消息即可(归约器处理连接):
import { GraphNode } from "@langchain/langgraph";

const node: GraphNode<typeof State> = (state) => {
  const newMessage = new AIMessage("Hello!");
  return { messages: [newMessage], extraField: 10 };
};
import { START } from "@langchain/langgraph";

const graph = new StateGraph(State)
  .addNode("node", node)
  .addEdge(START, "node")
  .compile();

const result = await graph.invoke({ messages: [new HumanMessage("Hi")] });

for (const message of result.messages) {
  console.log(`${message.getType()}: ${message.content}`);
}
human: Hi
ai: Hello!

MessagesValue

在实践中,更新消息列表还有其他注意事项:
  • 我们可能希望更新状态中的现有消息。
  • 我们可能希望接受 消息格式 的简写,例如 OpenAI 格式
LangGraph 包含内置的 MessagesValue,可处理这些注意事项:
import { StateSchema, StateGraph, MessagesValue, GraphNode, START } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  messages: MessagesValue,
  extraField: z.number(),
});

const node: GraphNode<typeof State> = (state) => {
  const newMessage = new AIMessage("Hello!");
  return { messages: [newMessage], extraField: 10 };
};

const graph = new StateGraph(State)
  .addNode("node", node)
  .addEdge(START, "node")
  .compile();
const inputMessage = { role: "user", content: "Hi" };

const result = await graph.invoke({ messages: [inputMessage] });

for (const message of result.messages) {
  console.log(`${message.getType()}: ${message.content}`);
}
human: Hi
ai: Hello!
这是涉及 聊天模型 的应用程序的通用状态表示。LangGraph 包含预构建的 MessagesValue 以供方便使用,因此我们可以拥有:
import { StateSchema, MessagesValue } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  messages: MessagesValue,
  extraField: z.number(),
});

定义输入和输出模式

默认情况下,StateGraph 使用单一模式运行,所有节点都预期使用该模式进行通信。但是,也可以为图定义不同的输入和输出模式。 当指定不同的模式时,内部仍将使用模式用于节点之间的通信。输入模式确保提供的输入符合预期的结构,而输出模式过滤内部数据,仅根据定义的模式返回相关信息。 下面,我们将看到如何定义不同的输入和输出模式。
import { StateGraph, StateSchema, GraphNode, START, END } from "@langchain/langgraph";
import * as z from "zod";

// 定义输入模式
const InputState = new StateSchema({
  question: z.string(),
});

// 定义输出模式
const OutputState = new StateSchema({
  answer: z.string(),
});

// 定义整体模式,结合输入和输出
const OverallState = new StateSchema({
  question: z.string(),
  answer: z.string(),
});

// 定义处理输入的节点
const answerNode: GraphNode<typeof OverallState> = (state) => {
  // 示例答案和一个额外键
  return { answer: "bye", question: state.question };
};

// 构建图,指定输入和输出模式
const graph = new StateGraph({
  input: InputState,
  output: OutputState,
  state: OverallState,
})
  .addNode("answerNode", answerNode)
  .addEdge(START, "answerNode")
  .addEdge("answerNode", END)
  .compile();

// 使用输入调用图并打印结果
console.log(await graph.invoke({ question: "hi" }));
{ answer: 'bye' }
请注意,调用的输出仅包含输出模式。

在节点之间传递私有状态

在某些情况下,您可能希望节点交换对中间逻辑至关重要但不需要成为图主模式一部分的信息。这种私有数据与图的整体输入/输出无关,应仅在特定节点之间共享。 下面,我们将创建一个由三个节点(node_1、node_2 和 node_3)组成的示例顺序图,其中私有数据在前两个步骤(node_1 和 node_2)之间传递,而第三个步骤(node_3)只能访问公共整体状态。
import { StateGraph, StateSchema, GraphNode, START, END } from "@langchain/langgraph";
import * as z from "zod";

// 图的整体状态(这是在节点间共享的公共状态)
const OverallState = new StateSchema({
  a: z.string(),
});

// node1 的输出包含不属于整体状态的私有数据
const Node1Output = new StateSchema({
  privateData: z.string(),
});

// node2 的输入仅请求 node1 之后可用的私有数据
const Node2Input = new StateSchema({
  privateData: z.string(),
});

// 私有数据仅在 node1 和 node2 之间共享
const node1: GraphNode<typeof OverallState> = (state) => {
  const output = { privateData: "set by node1" };
  console.log(`进入节点 'node1':\n\t输入:${JSON.stringify(state)}\n\t返回:${JSON.stringify(output)}`);
  return output;
};

const node2: GraphNode<typeof Node2Input> = (state) => {
  const output = { a: "set by node2" };
  console.log(`进入节点 'node2':\n\t输入:${JSON.stringify(state)}\n\t返回:${JSON.stringify(output)}`);
  return output;
};

// node3 只能访问整体状态(无法访问 node1 的私有数据)
const node3: GraphNode<typeof OverallState> = (state) => {
  const output = { a: "set by node3" };
  console.log(`进入节点 'node3':\n\t输入:${JSON.stringify(state)}\n\t返回:${JSON.stringify(output)}`);
  return output;
};

// 按顺序连接节点
// node2 接受来自 node1 的私有数据,而
// node3 看不到私有数据。
const graph = new StateGraph(OverallState)
  .addNode("node1", node1)
  .addNode("node2", node2, { input: Node2Input })
  .addNode("node3", node3)
  .addEdge(START, "node1")
  .addEdge("node1", "node2")
  .addEdge("node2", "node3")
  .addEdge("node3", END)
  .compile();

// 使用初始状态调用图
const response = await graph.invoke({ a: "set at start" });

console.log(`\n图调用输出:${JSON.stringify(response)}`);
进入节点 'node1':
	输入:{"a":"set at start"}。
	返回:{"privateData":"set by node1"}
进入节点 'node2':
	输入:{"privateData":"set by node1"}。
	返回:{"a":"set by node2"}
进入节点 'node3':
	输入:{"a":"set by node2"}。
	返回:{"a":"set by node3"}

图调用输出:{"a":"set by node3"}

替代状态定义

虽然 StateSchema 是定义状态的推荐方法,但 LangGraph 支持几种其他方法。本节涵盖所有可用选项。

通道 API

通道 API 提供对状态管理的低级控制。LangGraph 提供几种内置通道类型:
通道类型行为用例
LastValue存储最新值被覆盖的简单字段
BinaryOperatorAggregate使用归约器函数组合值累积值(计数器、列表)
Topic将所有值收集到序列中事件流、审计日志
EphemeralValue在超级步骤之间重置的值临时计算状态
使用对象简写: 当您传递具有 reducerdefault 的对象时,它会创建 BinaryOperatorAggregate 通道。传递 null 会创建 LastValue 通道:
import { BaseMessage } from "@langchain/core/messages";
import { StateGraph } from "@langchain/langgraph";

interface WorkflowState {
  messages: BaseMessage[];
  question: string;
  answer: string;
}

const workflow = new StateGraph<WorkflowState>({
  channels: {
    // BinaryOperatorAggregate:使用归约器组合值
    messages: {
      reducer: (current, update) => current.concat(update),
      default: () => [],
    },
    // LastValue:存储最新值(null = 无归约器)
    question: null,
    answer: null,
  },
});
直接使用通道类: 为了获得更大的控制权,您可以直接实例化通道类:
import { BaseMessage } from "@langchain/core/messages";
import { StateGraph, LastValue, BinaryOperatorAggregate, Topic } from "@langchain/langgraph";

interface WorkflowState {
  messages: BaseMessage[];
  question: string;
  events: string[];
}

const workflow = new StateGraph<WorkflowState>({
  channels: {
    messages: new BinaryOperatorAggregate<BaseMessage[]>(
      (current, update) => current.concat(update),
      () => []
    ),
    question: new LastValue<string>(),
    // Topic 收集执行期间推送的所有值
    events: new Topic<string>(),
  },
});

Annotation.Root

Annotation.Root 提供了一种声明式方式来定义带有归约器的状态。它与 StateSchema 类似,但使用不同的语法:
import { BaseMessage } from "@langchain/core/messages";
import { Annotation, StateGraph, messagesStateReducer } from "@langchain/langgraph";

const State = Annotation.Root({
  messages: Annotation<BaseMessage[]>({
    reducer: messagesStateReducer,
    default: () => [],
  }),
  question: Annotation<string>(),
  count: Annotation<number>({
    reducer: (current, update) => current + update,
    default: () => 0,
  }),
});

const graph = new StateGraph(State);

带 Zod v3 的对象

当使用 Zod v3 时,您可以使用普通 z.object() 模式定义状态。LangGraph 扩展了 Zod v3,提供了 .reducer().metadata() 方法的 .langgraph 插件:
import { z } from "zod/v3";
import { BaseMessage } from "@langchain/core/messages";
import { StateGraph, messagesStateReducer } from "@langchain/langgraph";

const State = z.object({
  // 使用 .langgraph.reducer() 附加归约器函数
  messages: z
    .array(z.custom<BaseMessage>())
    .default([])
    .langgraph.reducer(messagesStateReducer),
  // 简单字段直接工作(最后写入获胜)
  question: z.string().optional(),
  answer: z.string().optional(),
  // 用于累积值的自定义归约器
  count: z
    .number()
    .default(0)
    .langgraph.reducer((current, update) => current + update),
});

const graph = new StateGraph(State);

带 Zod v4 的对象

Zod v4 使用基于注册表的方法。使用 LangGraph 注册表将元数据附加到模式字段:
import * as z from "zod";
import { BaseMessage } from "@langchain/core/messages";
import { StateGraph, MessagesZodMeta, messagesStateReducer } from "@langchain/langgraph";
import { registry } from "@langchain/langgraph/zod";

const State = z.object({
  // 使用 .register() 配合 LangGraph 注册表和 MessagesZodMeta
  messages: z
    .array(z.custom<BaseMessage>())
    .default([])
    .register(registry, MessagesZodMeta),
  // 简单字段直接工作(最后写入获胜)
  question: z.string().optional(),
  answer: z.string().optional(),
  // 通过注册表元数据的自定义归约器
  count: z
    .number()
    .default(0)
    .register(registry, { reducer: (current: number, update: number) => current + update }),
});

const graph = new StateGraph(State);

比较表

方法归约器类型安全Zod 版本推荐
StateSchema✅ 内置✅ 完整v3 或 v4✅ 是
通道 API✅ 手动⚠️ 部分N/A高级用例
Annotation.Root✅ 内置✅ 完整N/A遗留
Zod v3 + .langgraph✅ 通过插件✅ 完整仅限 v3遗留
Zod v4 + 注册表✅ 通过注册表✅ 完整仅限 v4遗留

添加运行时配置

有时您希望在调用图时能够配置它。例如,您可能希望在运行时指定要使用什么 LLM 或系统提示,而 无需将这些参数污染到图状态中 要添加运行时配置:
  1. 指定配置的 schema
  2. 将配置添加到节点或条件边的函数签名中
  3. 将配置传递给图。
下面是一个简单示例:
import { StateGraph, StateSchema, GraphNode, END, START } from "@langchain/langgraph";
import * as z from "zod";

// 1. 指定配置 schema
const ContextSchema = z.object({
  myRuntimeValue: z.string(),
});

// 2. 定义一个在节点中访问配置的图
const State = new StateSchema({
  myStateValue: z.number(),
});

const node: GraphNode<typeof State> = (state, runtime) => {
  if (runtime?.context?.myRuntimeValue === "a") {
    return { myStateValue: 1 };
  } else if (runtime?.context?.myRuntimeValue === "b") {
    return { myStateValue: 2 };
  } else {
    throw new Error("未知值。");
  }
};

const graph = new StateGraph(State, ContextSchema)
  .addNode("node", node)
  .addEdge(START, "node")
  .addEdge("node", END)
  .compile();

// 3. 在运行时传入配置:
console.log(await graph.invoke({}, { context: { myRuntimeValue: "a" } }));
console.log(await graph.invoke({}, { context: { myRuntimeValue: "b" } }));
{ myStateValue: 1 }
{ myStateValue: 2 }
下面我们演示一个实际示例,在其中配置在运行时使用什么 LLM。我们将同时使用 OpenAI 和 Anthropic 模型。
import { ChatOpenAI } from "@langchain/openai";
import { ChatAnthropic } from "@langchain/anthropic";
import { StateGraph, StateSchema, MessagesValue, GraphNode, START, END } from "@langchain/langgraph";
import * as z from "zod";

const ConfigSchema = z.object({
  modelProvider: z.string().default("anthropic"),
});

const State = new StateSchema({
  messages: MessagesValue,
});

const MODELS = {
  anthropic: new ChatAnthropic({ model: "claude-haiku-4-5-20251001" }),
  openai: new ChatOpenAI({ model: "gpt-4.1-mini" }),
};

const callModel: GraphNode<typeof State> = async (state, config) => {
  const modelProvider = config?.configurable?.modelProvider || "anthropic";
  const model = MODELS[modelProvider as keyof typeof MODELS];
  const response = await model.invoke(state.messages);
  return { messages: [response] };
};

const graph = new StateGraph(State, ConfigSchema)
  .addNode("model", callModel)
  .addEdge(START, "model")
  .addEdge("model", END)
  .compile();

// 用法
const inputMessage = { role: "user", content: "hi" };
// 没有配置时,使用默认值(Anthropic)
const response1 = await graph.invoke({ messages: [inputMessage] });
// 或者,可以设置 OpenAI
const response2 = await graph.invoke(
  { messages: [inputMessage] },
  { configurable: { modelProvider: "openai" } },
);

console.log(response1.messages.at(-1)?.response_metadata?.model);
console.log(response2.messages.at(-1)?.response_metadata?.model);
claude-haiku-4-5-20251001
gpt-4.1-mini-2025-04-14
下面我们演示一个实际示例,在其中配置两个参数:在运行时使用的 LLM 和系统消息。
import { ChatOpenAI } from "@langchain/openai";
import { ChatAnthropic } from "@langchain/anthropic";
import { SystemMessage } from "@langchain/core/messages";
import { StateGraph, StateSchema, MessagesValue, GraphNode, START, END } from "@langchain/langgraph";
import * as z from "zod";

const ConfigSchema = z.object({
  modelProvider: z.string().default("anthropic"),
  systemMessage: z.string().optional(),
});

const State = new StateSchema({
  messages: MessagesValue,
});

const MODELS = {
  anthropic: new ChatAnthropic({ model: "claude-haiku-4-5-20251001" }),
  openai: new ChatOpenAI({ model: "gpt-4.1-mini" }),
};

const callModel: GraphNode<typeof State> = async (state, config) => {
  const modelProvider = config?.configurable?.modelProvider || "anthropic";
  const systemMessage = config?.configurable?.systemMessage;

  const model = MODELS[modelProvider as keyof typeof MODELS];
  let messages = state.messages;

  if (systemMessage) {
    messages = [new SystemMessage(systemMessage), ...messages];
  }

  const response = await model.invoke(messages);
  return { messages: [response] };
};

const graph = new StateGraph(State, ConfigSchema)
  .addNode("model", callModel)
  .addEdge(START, "model")
  .addEdge("model", END)
  .compile();

// 用法
const inputMessage = { role: "user", content: "hi" };
const response = await graph.invoke(
  { messages: [inputMessage] },
  {
    configurable: {
      modelProvider: "openai",
      systemMessage: "Respond in Italian."
    }
  }
);

for (const message of response.messages) {
  console.log(`${message.getType()}: ${message.content}`);
}
human: hi
ai: Ciao! Come posso aiutarti oggi?

添加重试策略

有许多用例可能需要您的节点具有自定义重试策略,例如如果您正在调用 API、查询数据库或调用 LLM 等。LangGraph 允许您将重试策略添加到节点。 要配置重试策略,请将 retryPolicy 参数传递给 addNoderetryPolicy 参数接受 RetryPolicy 对象。下面我们使用默认参数实例化 RetryPolicy 对象并将其与节点关联:
import { RetryPolicy } from "@langchain/langgraph";

const graph = new StateGraph(State)
  .addNode("nodeName", nodeFunction, { retryPolicy: {} })
  .compile();
默认情况下,重试策略会在除以下之外的任何异常上重试:
  • TypeError
  • SyntaxError
  • ReferenceError
考虑一个我们从 SQL 数据库读取的示例。下面我们将两个不同的重试策略传递给节点:
import Database from "better-sqlite3";
import { ChatAnthropic } from "@langchain/anthropic";
import { StateGraph, StateSchema, MessagesValue, GraphNode, START, END } from "@langchain/langgraph";
import { AIMessage } from "@langchain/core/messages";

const State = new StateSchema({
  messages: MessagesValue,
});

// 创建内存数据库
const db: typeof Database.prototype = new Database(":memory:");

const model = new ChatAnthropic({ model: "claude-3-5-sonnet-20240620" });

const callModel: GraphNode<typeof State> = async (state) => {
  const response = await model.invoke(state.messages);
  return { messages: [response] };
};

const queryDatabase: GraphNode<typeof State> = async (state) => {
  const queryResult: string = JSON.stringify(
    db.prepare("SELECT * FROM Artist LIMIT 10;").all(),
  );

  return { messages: [new AIMessage({ content: "queryResult" })] };
};

const workflow = new StateGraph(State)
  // 定义我们将循环的两个节点
  .addNode("call_model", callModel, { retryPolicy: { maxAttempts: 5 } })
  .addNode("query_database", queryDatabase, {
    retryPolicy: {
      retryOn: (e: any): boolean => {
        if (e instanceof Database.SqliteError) {
          // 在 "SQLITE_BUSY" 错误上重试
          return e.code === "SQLITE_BUSY";
        }
        return false; // 不在其他错误上重试
      },
    },
  })
  .addEdge(START, "call_model")
  .addEdge("call_model", "query_database")
  .addEdge("query_database", END);

const graph = workflow.compile();

创建步骤序列

先决条件 本指南假设您熟悉上面关于 状态 的部分。
在此我们演示如何构建简单的步骤序列。我们将展示:
  1. 如何构建顺序图
  2. 构建类似图的内置简写。
要添加节点序列,我们使用 .addNode.addEdge 方法:
import { START, StateGraph } from "@langchain/langgraph";

const builder = new StateGraph(State)
  .addNode("step1", step1)
  .addNode("step2", step2)
  .addNode("step3", step3)
  .addEdge(START, "step1")
  .addEdge("step1", "step2")
  .addEdge("step2", "step3");
LangGraph 使为您的应用程序添加底层持久层变得容易。 这允许在节点执行之间对状态进行检查点,因此您的 LangGraph 节点管理:它们还决定执行步骤如何 流式传输,以及如何使用 Studio 可视化和调试您的应用程序。让我们演示一个端到端示例。我们将创建三个步骤的序列:
  1. 在状态的键中填充值
  2. 更新相同的值
  3. 填充不同的值
首先让我们定义我们的 状态。这管理图的 模式,还可以指定如何应用更新。有关更多详细信息,请参阅 使用归约器处理状态更新在我们的例子中,我们将只跟踪两个值:
import { StateSchema, GraphNode } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  value1: z.string(),
  value2: z.number(),
});
我们的 节点 只是读取图的 state 并对其进行更新的 TypeScript 函数。此函数的第一个参数始终是 state:
const step1: GraphNode<typeof State> = (state) => {
  return { value1: "a" };
};

const step2: GraphNode<typeof State> = (state) => {
  const currentValue1 = state.value1;
  return { value1: `${currentValue1} b` };
};

const step3: GraphNode<typeof State> = (state) => {
  return { value2: 10 };
};
请注意,在向状态发出更新时,每个节点只需指定其希望更新的键的值。默认情况下,这将 覆盖 相应键的值。您还可以使用 归约器 来控制如何处理更新——例如,您可以将连续更新附加到键。有关更多详细信息,请参阅 使用归约器处理状态更新
最后,我们定义图。我们使用 StateGraph 来定义一个在此状态下操作的图。然后我们将使用 addNodeaddEdge 来填充我们的图并定义其控制流。
import { START, StateGraph } from "@langchain/langgraph";

const graph = new StateGraph(State)
  .addNode("step1", step1)
  .addNode("step2", step2)
  .addNode("step3", step3)
  .addEdge(START, "step1")
  .addEdge("step1", "step2")
  .addEdge("step2", "step3")
  .compile();
指定自定义名称 您可以使用 .addNode 为节点指定自定义名称:
const graph = new StateGraph(State)
.addNode("myNode", step1)
.compile();
请注意:
  • .addEdge 接受节点的名称,对于函数默认为 node.name
  • 我们必须指定图的入口点。为此,我们添加一条带有 START 节点 的边。
  • 当没有更多节点可执行时,图停止。
我们接下来 编译 我们的图。这提供了对图结构的几个基本检查(例如,识别孤立节点)。如果我们通过 检查点器 为我们的应用程序添加持久性,它也将在此处传递。LangGraph 提供了用于可视化您的图的内置实用程序。让我们检查我们的序列。有关可视化的详细信息,请参阅 可视化您的图
import * as fs from "node:fs/promises";

const drawableGraph = await graph.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);
让我们继续进行简单的调用:
const result = await graph.invoke({ value1: "c" });
console.log(result);
{ value1: 'a b', value2: 10 }
请注意:
  • 我们通过为单个状态键提供值来启动调用。我们必须始终至少提供一个键的值。
  • 我们传入的值被第一个节点覆盖。
  • 第二个节点更新了该值。
  • 第三个节点填充了不同的值。

创建分支

节点的并行执行对于加快整体图操作至关重要。LangGraph 原生支持节点的并行执行,可以显著提高基于图的工作流的性能。这种并行化是通过扇出和扇入机制实现的,利用标准边和 条件边。以下是一些示例,展示如何添加适合您的分支数据流。

并行运行图节点

在此示例中,我们从 Node A 扇出到 B 和 C,然后扇入到 D。使用我们的状态,我们指定归约器 add 操作。这将组合或累积特定键的值,而不是简单地覆盖现有值。对于列表,这意味着将新列表与现有列表连接。有关使用归约器更新状态的更多详细信息,请参阅上面的 状态归约器 部分。
import { StateGraph, StateSchema, ReducedValue, GraphNode, START, END } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  // 归约器使其变为追加模式
  aggregate: new ReducedValue(
    z.array(z.string()).default(() => []),
    { reducer: (x, y) => x.concat(y) }
  ),
});

const nodeA: GraphNode<typeof State> = (state) => {
  console.log(`向 ${state.aggregate} 添加 "A"`);
  return { aggregate: ["A"] };
};

const nodeB: GraphNode<typeof State> = (state) => {
  console.log(`向 ${state.aggregate} 添加 "B"`);
  return { aggregate: ["B"] };
};

const nodeC: GraphNode<typeof State> = (state) => {
  console.log(`向 ${state.aggregate} 添加 "C"`);
  return { aggregate: ["C"] };
};

const nodeD: GraphNode<typeof State> = (state) => {
  console.log(`向 ${state.aggregate} 添加 "D"`);
  return { aggregate: ["D"] };
};

const graph = new StateGraph(State)
  .addNode("a", nodeA)
  .addNode("b", nodeB)
  .addNode("c", nodeC)
  .addNode("d", nodeD)
  .addEdge(START, "a")
  .addEdge("a", "b")
  .addEdge("a", "c")
  .addEdge("b", "d")
  .addEdge("c", "d")
  .addEdge("d", END)
  .compile();
import * as fs from "node:fs/promises";

const drawableGraph = await graph.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);
使用归约器,您可以看到每个节点中添加的值都被累积了。
const result = await graph.invoke({
  aggregate: [],
});
console.log(result);
Adding "A" to []
Adding "B" to ['A']
Adding "C" to ['A']
Adding "D" to ['A', 'B', 'C']
{ aggregate: ['A', 'B', 'C', 'D'] }
在上述示例中,节点 "b""c" 在同一个 超级步骤 中并发执行。因为它们在同一步骤中,所以节点 "d""b""c" 完成后执行。重要的是,来自并行超级步骤的更新可能不会按一致的顺序排列。如果您需要来自并行超级步骤的一致、预定顺序的更新,您应该将输出写入状态中的单独字段,并附带用于排序的值。
LangGraph 在 超级步骤 内执行节点,这意味着虽然并行分支并行执行,但整个超级步骤是 事务性 的。如果这些分支中的任何一个抛出异常,没有任何 更新应用于状态(整个超级步骤出错)。重要的是,当使用 检查点器 时,超级步骤内成功节点的結果会被保存,并且在恢复时不会重复。如果您有容易出错的(也许想处理不稳定的 API 调用)节点,LangGraph 提供两种解决方法:
  1. 您可以在节点内编写常规 Python 代码来捕获和处理异常。
  2. 您可以设置 retry_policy 以指导图重试抛出某些类型异常的节点。只有失败的分支会被重试,因此您无需担心执行冗余工作。
结合这两者,您可以执行并行执行并完全控制异常处理。
设置最大并发数 您可以通过在调用图时在 配置 中设置 max_concurrency 来控制最大并发任务数。
const result = await graph.invoke({ value1: "c" }, {configurable: {max_concurrency: 10}});

条件分支

如果您的扇出应根据状态在运行时变化,您可以使用 addConditionalEdges 使用图状态选择一个或多个路径。见下方示例,其中节点 a 生成确定下一个节点的状态更新。
import { StateGraph, StateSchema, ReducedValue, GraphNode, ConditionalEdgeRouter, START, END } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  aggregate: new ReducedValue(
    z.array(z.string()).default(() => []),
    { reducer: (x, y) => x.concat(y) }
  ),
  // 向状态添加一个键。我们将设置此键以确定
  // 我们如何分支。
  which: z.string(),
});

const nodeA: GraphNode<typeof State> = (state) => {
  console.log(`向 ${state.aggregate} 添加 "A"`);
  return { aggregate: ["A"], which: "c" };
};

const nodeB: GraphNode<typeof State> = (state) => {
  console.log(`向 ${state.aggregate} 添加 "B"`);
  return { aggregate: ["B"] };
};

const nodeC: GraphNode<typeof State> = (state) => {
  console.log(`向 ${state.aggregate} 添加 "C"`);
  return { aggregate: ["C"] };
};

const conditionalEdge: ConditionalEdgeRouter<typeof State, "b" | "c"> = (state) => {
  // 在此填写任意逻辑,使用状态
  // 来确定下一个节点
  return state.which as "b" | "c";
};

const graph = new StateGraph(State)
  .addNode("a", nodeA)
  .addNode("b", nodeB)
  .addNode("c", nodeC)
  .addEdge(START, "a")
  .addEdge("b", END)
  .addEdge("c", END)
  .addConditionalEdges("a", conditionalEdge)
  .compile();
import * as fs from "node:fs/promises";

const drawableGraph = await graph.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);
const result = await graph.invoke({ aggregate: [] });
console.log(result);
Adding "A" to []
Adding "C" to ['A']
{ aggregate: ['A', 'C'], which: 'c' }
您的条件边可以路由到多个目标节点。例如:
const routeBcOrCd: ConditionalEdgeRouter<typeof State, "b" | "c" | "d"> = (state) => {
  if (state.which === "cd") {
    return ["c", "d"];
  }
  return ["b", "c"];
};

Map-Reduce 和 Send API

LangGraph 支持使用 Send API 进行 map-reduce 和其他高级分支模式。以下是如何使用它的示例:
import { StateGraph, StateSchema, ReducedValue, GraphNode, START, END, Send } from "@langchain/langgraph";
import * as z from "zod";

const OverallState = new StateSchema({
  topic: z.string(),
  subjects: z.array(z.string()),
  jokes: new ReducedValue(
    z.array(z.string()).default(() => []),
    { reducer: (x, y) => x.concat(y) }
  ),
  bestSelectedJoke: z.string(),
});

const generateTopics: GraphNode<typeof OverallState> = (state) => {
  return { subjects: ["lions", "elephants", "penguins"] };
};

const generateJoke: GraphNode<typeof OverallState> = (state) => {
  const jokeMap: Record<string, string> = {
    lions: "Why don't lions like fast food? Because they can't catch it!",
    elephants: "Why don't elephants use computers? They're afraid of the mouse!",
    penguins: "Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice."
  };
  return { jokes: [jokeMap[state.subject]] };
};

const continueToJokes: ConditionalEdgeRouter<typeof OverallState, "generateJoke"> = (state) => {
  return state.subjects.map((subject) => new Send("generateJoke", { subject }));
};

const bestJoke: GraphNode<typeof OverallState> = (state) => {
  return { bestSelectedJoke: "penguins" };
};

const graph = new StateGraph(OverallState)
  .addNode("generateTopics", generateTopics)
  .addNode("generateJoke", generateJoke)
  .addNode("bestJoke", bestJoke)
  .addEdge(START, "generateTopics")
  .addConditionalEdges("generateTopics", continueToJokes)
  .addEdge("generateJoke", "bestJoke")
  .addEdge("bestJoke", END)
  .compile();
import * as fs from "node:fs/promises";

const drawableGraph = await graph.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);
// 调用图:这里我们调用它以生成笑话列表
for await (const step of await graph.stream({ topic: "animals" })) {
  console.log(step);
}
{ generateTopics: { subjects: [ 'lions', 'elephants', 'penguins' ] } }
{ generateJoke: { jokes: [ "Why don't lions like fast food? Because they can't catch it!" ] } }
{ generateJoke: { jokes: [ "Why don't elephants use computers? They're afraid of the mouse!" ] } }
{ generateJoke: { jokes: [ "Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice." ] } }
{ bestJoke: { bestSelectedJoke: 'penguins' } }

创建和控制循环

在创建带有循环的图时,我们需要一种终止执行的机制。这通常是通过添加 条件边 来实现的,一旦达到某些终止条件,该边就会路由到 END 节点。 您也可以在调用或流式传输图时设置图递归限制。递归限制设置图在抛出错误之前允许执行的 超级步骤 数量。有关 递归限制概念 的更多信息。 让我们考虑一个简单的带有循环的图,以更好地了解这些机制如何工作。
若要返回状态的最后一个值而不是收到递归限制错误,请参阅 下一节
创建循环时,您可以包含指定终止条件的条件边:
const route: ConditionalEdgeRouter<typeof State, "b"> = (state) => {
  if (terminationCondition(state)) {
    return END;
  } else {
    return "b";
  }
};

const graph = new StateGraph(State)
  .addNode("a", nodeA)
  .addNode("b", nodeB)
  .addEdge(START, "a")
  .addConditionalEdges("a", route)
  .addEdge("b", "a")
  .compile();
要控制递归限制,请在配置中指定 "recursionLimit"。这将抛出 GraphRecursionError,您可以捕获并处理:
import { GraphRecursionError } from "@langchain/langgraph";

try {
  await graph.invoke(inputs, { recursionLimit: 3 });
} catch (error) {
  if (error instanceof GraphRecursionError) {
    console.log("Recursion Error");
  }
}
让我们定义一个带有简单循环的图。请注意,我们使用条件边来实现终止条件。
import { StateGraph, StateSchema, ReducedValue, GraphNode, ConditionalEdgeRouter, START, END } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  // 归约器使其变为追加模式
  aggregate: new ReducedValue(
    z.array(z.string()).default(() => []),
    { reducer: (x, y) => x.concat(y) }
  ),
});

const nodeA: GraphNode<typeof State> = (state) => {
  console.log(`节点 A 看到 ${state.aggregate}`);
  return { aggregate: ["A"] };
};

const nodeB: GraphNode<typeof State> = (state) => {
  console.log(`节点 B 看到 ${state.aggregate}`);
  return { aggregate: ["B"] };
};

// 定义边
const route: ConditionalEdgeRouter<typeof State, "b"> = (state) => {
  if (state.aggregate.length < 7) {
    return "b";
  } else {
    return END;
  }
};

const graph = new StateGraph(State)
  .addNode("a", nodeA)
  .addNode("b", nodeB)
  .addEdge(START, "a")
  .addConditionalEdges("a", route)
  .addEdge("b", "a")
  .compile();
import * as fs from "node:fs/promises";

const drawableGraph = await graph.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);
此架构类似于 ReAct agent,其中节点 "a" 是调用工具的模型,节点 "b" 代表工具。 在我们的 route 条件边中,我们指定在状态中的 "aggregate" 列表超过阈值长度后我们应该结束。 调用图时,我们看到我们在达到终止条件之前在节点 "a""b" 之间交替。
const result = await graph.invoke({ aggregate: [] });
console.log(result);
Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
Node B sees ['A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B']
Node B sees ['A', 'B', 'A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B', 'A', 'B']
{ aggregate: ['A', 'B', 'A', 'B', 'A', 'B', 'A'] }

施加递归限制

在某些应用中,我们无法保证将达到给定的终止条件。在这些情况下,我们可以设置图的 递归限制。这将在给定数量的 超级步骤 后抛出 GraphRecursionError。然后我们可以捕获并处理此异常:
import { GraphRecursionError } from "@langchain/langgraph";

try {
  await graph.invoke({ aggregate: [] }, { recursionLimit: 4 });
} catch (error) {
  if (error instanceof GraphRecursionError) {
    console.log("Recursion Error");
  }
}
Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
Node B sees ['A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B']
Recursion Error

使用 Command 结合控制流和状态更新

结合控制流(边)和状态更新(节点)可能很有用。例如,您可能希望 同时 执行状态更新 决定下一个节点去哪里。LangGraph 提供了一种方法,即从节点函数返回 Command 对象:
import { Command } from "@langchain/langgraph";

const myNode = (state: State): Command => {
  return new Command({
    // 状态更新
    update: { foo: "bar" },
    // 控制流
    goto: "myOtherNode"
  });
};
我们在下面展示了一个端到端示例。让我们创建一个包含 3 个节点:A、B 和 C 的简单图。我们将首先执行节点 A,然后根据节点 A 的输出决定下一步是去节点 B 还是节点 C。
import { StateGraph, StateSchema, GraphNode, Command, START } from "@langchain/langgraph";
import * as z from "zod";

// 定义图状态
const State = new StateSchema({
  foo: z.string(),
});

// 定义节点

const nodeA: GraphNode<typeof State, "nodeB" | "nodeC"> = (state) => {
  console.log("Called A");
  const value = Math.random() > 0.5 ? "b" : "c";
  // 这是条件边函数的替代
  const goto = value === "b" ? "nodeB" : "nodeC";

  // 注意 Command 允许您 BOTH 更新图状态 AND 路由到下一个节点
  return new Command({
    // 这是状态更新
    update: { foo: value },
    // 这是边的替代
    goto,
  });
};

const nodeB: GraphNode<typeof State> = (state) => {
  console.log("Called B");
  return { foo: state.foo + "b" };
};

const nodeC: GraphNode<typeof State> = (state) => {
  console.log("Called C");
  return { foo: state.foo + "c" };
};
我们现在可以使用上述节点创建 StateGraph。请注意,图没有 条件边 用于路由!这是因为控制流是在 nodeA 中使用 Command 定义的。
const graph = new StateGraph(State)
  .addNode("nodeA", nodeA, {
    ends: ["nodeB", "nodeC"],
  })
  .addNode("nodeB", nodeB)
  .addNode("nodeC", nodeC)
  .addEdge(START, "nodeA")
  .compile();
您可能注意到我们使用了 ends 来指定 nodeA 可以导航到的节点。这对于图渲染是必要的,并告诉 LangGraph nodeA 可以导航到 nodeBnodeC
import * as fs from "node:fs/promises";

const drawableGraph = await graph.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);
如果我们多次运行图,我们会看到它根据节点 A 中的随机选择采取不同的路径(A -> B 或 A -> C)。
const result = await graph.invoke({ foo: "" });
console.log(result);
Called A
Called C
{ foo: 'cc' }

导航到父图中的节点

如果您正在使用 子图,您可能希望从子图中的节点导航到不同的子图(即父图中的不同节点)。为此,您可以在 Command 中指定 graph=Command.PARENT
const myNode = (state: State): Command => {
  return new Command({
    update: { foo: "bar" },
    goto: "otherSubgraph",  // 其中 `otherSubgraph` 是父图中的节点
    graph: Command.PARENT
  });
};
让我们使用上面的示例演示此操作。我们将这样做,通过将上面的示例中的 nodeA 更改为我们将其作为子图添加到父图中的单节点图。
使用 Command.PARENT 的状态更新 当您从子图节点向父图节点发送更新时,对于父图和子图 状态模式 共享的键,您 必须 为父图状态中您要更新的键定义 归约器。见下方示例。
import { StateGraph, StateSchema, ReducedValue, GraphNode, Command, START } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  // 注意:我们在这里定义归约器
  foo: new ReducedValue(  
    z.string().default(""),
    { reducer: (x, y) => x + y }
  ),
});

const nodeA: GraphNode<typeof State, "nodeB" | "nodeC"> = (state) => {
  console.log("Called A");
  const value = Math.random() > 0.5 ? "nodeB" : "nodeC";

  // 注意 Command 允许您 BOTH 更新图状态 AND 路由到下一个节点
  return new Command({
    update: { foo: "a" },
    goto: value,
    // 这告诉 LangGraph 导航到父图中的 nodeB 或 nodeC
    // 注意:这将导航到相对于子图最近的父图
    graph: Command.PARENT,
  });
};

const subgraph = new StateGraph(State)
  .addNode("nodeA", nodeA, { ends: ["nodeB", "nodeC"] })
  .addEdge(START, "nodeA")
  .compile();

const nodeB: GraphNode<typeof State> = (state) => {
  console.log("Called B");
  // 注意:由于我们定义了归约器,我们不需要手动附加
  // 新字符到现有的 'foo' 值。相反,归约器将自动附加这些
  return { foo: "b" };
};

const nodeC: GraphNode<typeof State> = (state) => {
  console.log("Called C");
  return { foo: "c" };
};

const graph = new StateGraph(State)
  .addNode("subgraph", subgraph, { ends: ["nodeB", "nodeC"] })
  .addNode("nodeB", nodeB)
  .addNode("nodeC", nodeC)
  .addEdge(START, "subgraph")
  .compile();
const result = await graph.invoke({ foo: "" });
console.log(result);
Called A
Called C
{ foo: 'ac' }

在工具中使用

一个常见的用例是从工具内部更新图状态。例如,在客户支持应用中,您可能希望在对话开始时根据账户号或 ID 查找客户信息。要从工具更新图状态,您可以从工具返回 Command(update={"my_custom_key": "foo", "messages": [...]})
import { tool } from "@langchain/core/tools";
import { Command } from "@langchain/langgraph";
import * as z from "zod";

const lookupUserInfo = tool(
  async (input, config) => {
    const userId = config.configurable?.userId;
    const userInfo = getUserInfo(userId);
    return new Command({
      update: {
        // 更新状态键
        userInfo: userInfo,
        // 更新消息历史
        messages: [{
          role: "tool",
          content: "Successfully looked up user information",
          tool_call_id: config.toolCall.id
        }]
      }
    });
  },
  {
    name: "lookupUserInfo",
    description: "Use this to look up user information to better assist them with their questions.",
    schema: z.object({}),
  }
);
当您从工具返回 Command 时,您 必须Command.update 中包含 messages(或用于消息历史的任何状态键),并且 messages 中的消息列表 必须 包含 ToolMessage。这对于生成的消息历史有效是必要的(LLM 提供商要求带有工具调用的 AI 消息后跟工具结果消息)。
如果您正在使用通过 Command 更新状态的工具,我们建议使用预构建的 ToolNode,它自动处理工具返回 Command 对象并将它们传播到图状态。如果您正在编写调用工具的自定义节点,则需要手动传播工具返回的 Command 对象作为节点的更新。

可视化您的图

在此我们演示如何可视化您创建的图。 您可以可视化任何任意 Graph,包括 StateGraph 让我们创建一个简单的示例图来演示可视化。
import { StateGraph, StateSchema, MessagesValue, ReducedValue, GraphNode, ConditionalEdgeRouter, START, END } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  messages: MessagesValue,
  value: new ReducedValue(
    z.number().default(0),
    { reducer: (x, y) => x + y }
  ),
});

const node1: GraphNode<typeof State> = (state) => {
  return { value: state.value + 1 };
};

const node2: GraphNode<typeof State> = (state) => {
  return { value: state.value * 2 };
};

const router: ConditionalEdgeRouter<typeof State, "node2"> = (state) => {
  if (state.value < 10) {
    return "node2";
  }
  return END;
};

const app = new StateGraph(State)
  .addNode("node1", node1)
  .addNode("node2", node2)
  .addEdge(START, "node1")
  .addConditionalEdges("node1", router)
  .addEdge("node2", "node1")
  .compile();

Mermaid

我们还可以将图类转换为 Mermaid 语法。
const drawableGraph = await app.getGraphAsync();
console.log(drawableGraph.drawMermaid());
%%{init: {'flowchart': {'curve': 'linear'}}}%%
graph TD;
    tart__([<p>__start__</p>]):::first
    e1(node1)
    e2(node2)
    nd__([<p>__end__</p>]):::last
    tart__ --> node1;
    e1 -.-> node2;
    e1 -.-> __end__;
    e2 --> node1;
    ssDef default fill:#f2f0ff,line-height:1.2
    ssDef first fill-opacity:0
    ssDef last fill:#bfb6fc

PNG

如果更喜欢,我们可以将图渲染为 .png。这使用 Mermaid.ink API 生成图表。
import * as fs from "node:fs/promises";

const drawableGraph = await app.getGraphAsync();
const image = await drawableGraph.drawMermaidPng();
const imageBuffer = new Uint8Array(await image.arrayBuffer());

await fs.writeFile("graph.png", imageBuffer);