Skip to main content
持久化执行是一种技术,通过该技术,进程或工作流在关键节点保存其进度,使其能够暂停并在之后从离开的位置准确恢复。这在需要人工介入的场景中特别有用,用户可以在继续之前检查、验证或修改流程;同时,它也适用于可能遇到中断或错误(例如,调用LLM超时)的长时间运行任务。通过保留已完成的工作,持久化执行使得流程能够在无需重新处理先前步骤的情况下恢复——即使在经历显著延迟(例如,一周后)之后。 LangGraph内置的持久化层为工作流提供了持久化执行功能,确保每个执行步骤的状态都保存到持久化存储中。这一能力保证了如果工作流被中断——无论是由于系统故障还是为了人工介入交互——它都可以从最后记录的状态恢复。
如果你在使用带有检查点的LangGraph,那么你已经启用了持久化执行。你可以在任何点暂停和恢复工作流,即使是在中断或故障之后。 为了充分利用持久化执行,请确保你的工作流被设计为确定性的幂等的,并将任何副作用或非确定性操作包装在任务中。你可以同时使用StateGraph(Graph API)Functional API中的任务

要求

要在LangGraph中利用持久化执行,你需要:
  1. 通过指定一个将保存工作流进度的检查点来在工作流中启用持久化
  2. 在执行工作流时指定一个线程标识符。这将跟踪特定工作流实例的执行历史。
  3. 将任何非确定性操作(例如,随机数生成)或具有副作用的操作(例如,文件写入、API调用)包装在任务中,以确保当工作流恢复时,这些操作不会在特定运行中重复执行,而是从持久化层检索其结果。更多信息请参见确定性与一致重放

确定性与一致重放

当你恢复一个工作流运行时,代码不会从执行停止的同一行代码恢复;相反,它将识别一个适当的起始点,从中继续执行。这意味着工作流将从起始点重放所有步骤,直到到达它停止的位置。 因此,在为持久化执行编写工作流时,你必须将任何非确定性操作(例如,随机数生成)和任何具有副作用的操作(例如,文件写入、API调用)包装在任务节点中。 为确保你的工作流是确定性的并且可以一致地重放,请遵循以下指南:
  • 避免重复工作:如果一个节点包含多个具有副作用的操作(例如,日志记录、文件写入或网络调用),请将每个操作包装在单独的任务中。这确保了当工作流恢复时,操作不会重复执行,并且它们的结果从持久化层检索。
  • 封装非确定性操作:将任何可能产生非确定性结果的代码(例如,随机数生成)包装在任务节点中。这确保了在恢复时,工作流遵循完全相同的记录步骤序列,并产生相同的结果。
  • 使用幂等操作:在可能的情况下,确保副作用(例如,API调用、文件写入)是幂等的。这意味着如果操作在工作流失败后重试,它将产生与第一次执行相同的效果。这对于导致数据写入的操作尤其重要。如果任务开始但未能成功完成,工作流的恢复将重新运行该任务,并依赖记录的结果来保持一致性。使用幂等键或验证现有结果,以避免意外的重复,确保工作流执行平稳且可预测。
有关应避免的常见陷阱示例,请参见Functional API中的常见陷阱部分,其中展示了如何使用任务来构建代码以避免这些问题。同样的原则也适用于StateGraph(Graph API)

持久化模式

LangGraph支持三种持久化模式,允许你根据应用程序的需求在性能和数据一致性之间进行权衡。更高的持久化模式会增加工作流执行的开销。你可以在调用任何图执行方法时指定持久化模式:
await graph.stream(
  { input: "test" },
  { durability: "sync" }
)
持久化模式从最不持久到最持久,如下所示:
  • "exit":LangGraph仅在图执行成功退出、出错或因人工介入中断时持久化更改。这为长时间运行的图提供了最佳性能,但意味着中间状态不会被保存,因此你无法从执行过程中发生的系统故障(如进程崩溃)中恢复。
  • "async":LangGraph在下一步执行时异步持久化更改。这提供了良好的性能和持久性,但存在一个小的风险,即如果进程在执行期间崩溃,LangGraph可能不会写入检查点。
  • "sync":LangGraph在下一步开始之前同步持久化更改。这确保了LangGraph在继续执行之前写入每个检查点,以一定的性能开销为代价提供了高持久性。

在节点中使用任务

如果一个节点包含多个操作,你可能会发现将每个操作转换为任务比将操作重构为单独的节点更容易。
import { StateGraph, StateSchema, GraphNode, START, END, MemorySaver } from "@langchain/langgraph";
import { v4 as uuidv4 } from "uuid";
import * as z from "zod";

// 定义一个StateSchema来表示状态
const State = new StateSchema({
  url: z.string(),
  result: z.string().optional(),
});

const callApi: GraphNode<typeof State> = async (state) => {
  const response = await fetch(state.url);
  const text = await response.text();
  const result = text.slice(0, 100); // 副作用
  return {
    result,
  };
};

// 创建一个StateGraph构建器,并为callApi函数添加一个节点
const builder = new StateGraph(State)
  .addNode("callApi", callApi)
  .addEdge(START, "callApi")
  .addEdge("callApi", END);

// 指定一个检查点
const checkpointer = new MemorySaver();

// 使用检查点编译图
const graph = builder.compile({ checkpointer });

// 定义一个包含线程ID的配置。
const threadId = uuidv4();
const config = { configurable: { thread_id: threadId } };

// 调用图
await graph.invoke({ url: "https://www.example.com" }, config);

恢复工作流

一旦你在工作流中启用了持久化执行,你可以在以下场景中恢复执行:
  • 暂停和恢复工作流:使用interrupt函数在特定点暂停工作流,并使用Command原语以更新后的状态恢复它。更多详情请参见中断
  • 从故障中恢复:在异常(例如,LLM提供商中断)后自动从最后一个成功的检查点恢复工作流。这涉及通过提供相同的线程标识符并输入null作为输入值来执行工作流(请参见Functional API中的这个示例)。

恢复工作流的起始点

  • 如果你使用的是StateGraph(Graph API),起始点是执行停止的节点的开始处。
  • 如果你在节点内部调用子图,起始点将是调用被暂停子图的节点。 在子图内部,起始点将是执行停止的特定节点
  • 如果你使用的是Functional API,起始点是执行停止的入口点的开始处。