Skip to main content
Functional API 允许你在对现有代码进行最小更改的情况下,将 LangGraph 的关键功能(持久化内存人机回环流式传输)添加到你的应用程序中。 它旨在将这些功能集成到可能使用标准语言原语进行分支和流程控制的现有代码中,例如 if 语句、for 循环和函数调用。与许多要求将代码重构为显式管道或 DAG 的数据编排框架不同,Functional API 允许你整合这些能力,而无需强制执行僵化的执行模型。 Functional API 使用两个关键构建块:
  • entrypoint:Entrypoint 封装工作流逻辑并管理执行流,包括处理长时间运行的任务和中断。
  • task:代表一个离散的工作单元,例如 API 调用或数据处理步骤,可以在 entrypoint 内异步执行。任务返回一个类似 Future 的对象,可以等待或同步解析。
这为构建具有状态管理和流式传输的工作流提供了最小的抽象。
有关如何使用 Functional API 的信息,请参阅 Use Functional API

Functional API 与 Graph API

对于更喜欢声明式方法的用户,LangGraph 的 Graph API 允许你使用图范式定义工作流。两种 API 共享相同的底层运行时,因此你可以在同一个应用程序中一起使用它们。 以下是一些关键区别:
  • 控制流:Functional API 不需要考虑图结构。你可以使用标准的 Python 构造来定义工作流。这通常会减少你需要编写的代码量。
  • 短期记忆Graph API 需要声明 State,并且可能需要定义 reducers 来管理图状态的更新。@entrypoint@tasks 不需要显式的状态管理,因为它们的状态限定在函数内,且不跨函数共享。
  • 检查点:两种 API 都会生成和使用检查点。在 Graph API 中,每个 superstep 之后都会生成一个新的检查点。在 Functional API 中,当任务执行时,其结果会保存到与给定 entrypoint 关联的现有检查点中,而不是创建新的检查点。
  • 可视化:Graph API 使得将工作流可视化为图变得容易,这对于调试、理解工作流和与他人分享很有用。Functional API 不支持可视化,因为图是在运行时动态生成的。

示例

下面我们演示一个简单的应用程序,该程序撰写一篇文章并 中断 以请求人工审查。
import { MemorySaver, entrypoint, task, interrupt } from "@langchain/langgraph";

const writeEssay = task("writeEssay", async (topic: string) => {
  // A placeholder for a long-running task.
  await new Promise((resolve) => setTimeout(resolve, 1000));
  return `An essay about topic: ${topic}`;
});

const workflow = entrypoint(
  { checkpointer: new MemorySaver(), name: "workflow" },
  async (topic: string) => {
    const essay = await writeEssay(topic);
    const isApproved = interrupt({
      // Any json-serializable payload provided to interrupt as argument.
      // It will be surfaced on the client side as an Interrupt when streaming data
      // from the workflow.
      essay, // The essay we want reviewed.
      // We can add any additional information that we need.
      // For example, introduce a key called "action" with some instructions.
      action: "Please approve/reject the essay",
    });

    return {
      essay, // The essay that was generated
      isApproved, // Response from HIL
    };
  }
);
此工作流将撰写一篇关于主题“猫”的文章,然后暂停以获取人工审查。工作流可以无限期地中断,直到提供审查。当工作流恢复时,它将从头开始执行,但由于 writeEssay 任务的结果已经保存,任务结果将从检查点加载,而不是重新计算。
import { v4 as uuidv4 } from "uuid";
import { MemorySaver, entrypoint, task, interrupt } from "@langchain/langgraph";

const writeEssay = task("writeEssay", async (topic: string) => {
  // This is a placeholder for a long-running task.
  await new Promise(resolve => setTimeout(resolve, 1000));
  return `An essay about topic: ${topic}`;
});

const workflow = entrypoint(
  { checkpointer: new MemorySaver(), name: "workflow" },
  async (topic: string) => {
    const essay = await writeEssay(topic);
    const isApproved = interrupt({
      // Any json-serializable payload provided to interrupt as argument.
      // It will be surfaced on the client side as an Interrupt when streaming data
      // from the workflow.
      essay, // The essay we want reviewed.
      // We can add any additional information that we need.
      // For example, introduce a key called "action" with some instructions.
      action: "Please approve/reject the essay",
    });

    return {
      essay, // The essay that was generated
      isApproved, // Response from HIL
    };
  }
);

const threadId = uuidv4();

const config = {
  configurable: {
    thread_id: threadId
  }
};

for await (const item of workflow.stream("cat", config)) {
  console.log(item);
}
{ writeEssay: 'An essay about topic: cat' }
{
  __interrupt__: [{
    value: { essay: 'An essay about topic: cat', action: 'Please approve/reject the essay' },
    resumable: true,
    ns: ['workflow:f7b8508b-21c0-8b4c-5958-4e8de74d2684'],
    when: 'during'
  }]
}
文章已写好,准备审查。一旦提供审查,我们可以恢复工作流:
import { Command } from "@langchain/langgraph";

// Get review from a user (e.g., via a UI)
// In this case, we're using a bool, but this can be any json-serializable value.
const humanReview = true;

const stream = await workflow.stream(
  new Command({ resume: humanReview }),
  config
);
for await (const item of stream) {
  console.log(item);
}
{ workflow: { essay: 'An essay about topic: cat', isApproved: true } }
工作流已完成,审查已添加到文章中。

入口点

entrypoint 函数可用于从函数创建工作流。它封装工作流逻辑并管理执行流,包括处理 长时间运行的任务中断

定义

入口点 通过调用 entrypoint 函数并传入配置和函数来定义。 该函数 必须接受单个位置参数,作为工作流输入。如果需要传递多个数据片段,请使用对象作为第一个参数的输入类型。 使用函数创建入口点会产生一个工作流实例,有助于管理工作流的执行(例如,处理流式传输、恢复和检查点)。 你通常希望向 entrypoint 函数传递一个 checkpointer 以启用持久化并使用 人机回环 等功能。
import { entrypoint } from "@langchain/langgraph";

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (someInput: Record<string, any>): Promise<number> => {
    // some logic that may involve long-running tasks like API calls,
    // and may be interrupted for human-in-the-loop
    return result;
  }
);
序列化 入口点的 输入输出 必须是 JSON 可序列化的,以支持检查点。有关更多详细信息,请参阅 序列化 部分。

执行

使用 entrypoint 函数将返回一个可以使用 invokestream 方法执行的对象。
const config = {
  configurable: {
    thread_id: "some_thread_id"
  }
};
await myWorkflow.invoke(someInput, config); // Wait for the result

恢复

interrupt 后恢复执行可以通过向 Command 原语传递 resume 值来完成。
import { Command } from "@langchain/langgraph";

const config = {
  configurable: {
    thread_id: "some_thread_id"
  }
};

await myWorkflow.invoke(new Command({ resume: someResumeValue }), config);
错误后恢复 要在错误后恢复,请使用 null 和相同的 thread id (config) 运行 entrypoint 这假设底层的 error 已解决,执行可以成功继续。
const config = {
  configurable: {
    thread_id: "some_thread_id"
  }
};

await myWorkflow.invoke(null, config);

短期记忆

entrypoint 使用 checkpointer 定义时,它会在相同 thread id 的连续调用之间存储信息到 checkpoints 这允许使用 getPreviousState 函数访问前一次调用的状态。 默认情况下,getPreviousState 函数返回前一次调用的返回值。
import { entrypoint, getPreviousState } from "@langchain/langgraph";

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (number: number) => {
    const previous = getPreviousState<number>() ?? 0;
    return number + previous;
  }
);

const config = {
  configurable: {
    thread_id: "some_thread_id",
  },
};

await myWorkflow.invoke(1, config); // 1 (previous was undefined)
await myWorkflow.invoke(2, config); // 3 (previous was 1 from the previous invocation)

entrypoint.final

entrypoint.final 是一个特殊原语,可以从 entrypoint 返回,允许 解耦 保存在 检查点 中的值和 entrypoint 的返回值 第一个值是 entrypoint 的返回值,第二个值是将保存在检查点中的值。
import { entrypoint, getPreviousState } from "@langchain/langgraph";

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (number: number) => {
    const previous = getPreviousState<number>() ?? 0;
    // This will return the previous value to the caller, saving
    // 2 * number to the checkpoint, which will be used in the next invocation
    // for the `previous` parameter.
    return entrypoint.final({
      value: previous,
      save: 2 * number,
    });
  }
);

const config = {
  configurable: {
    thread_id: "1",
  },
};

await myWorkflow.invoke(3, config); // 0 (previous was undefined)
await myWorkflow.invoke(1, config); // 6 (previous was 3 * 2 from the previous invocation)

任务

任务 代表一个离散的工作单元,例如 API 调用或数据处理步骤。它具有两个关键特征:
  • 异步执行:任务设计为异步执行,允许多个操作并发运行而不阻塞。
  • 检查点:任务结果保存到检查点,使工作流能够从上次保存的状态恢复。(有关更多详细信息,请参阅 持久化)。

定义

任务使用 task 函数定义,该函数包装常规函数。
import { task } from "@langchain/langgraph";

const slowComputation = task("slowComputation", async (inputValue: any) => {
  // Simulate a long-running operation
  return result;
});
序列化 任务的 输出 必须是 JSON 可序列化的,以支持检查点。

执行

任务 只能从 入口点、另一个 任务状态图节点 内部调用。 任务 不能 直接从主应用程序代码中调用。 当你调用 任务 时,它会返回一个可以等待的 Promise。
const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (someInput: number): Promise<number> => {
    return await slowComputation(someInput);
  }
);

何时使用任务

任务 在以下场景中很有用:
  • 检查点:当你需要将长时间运行的操作结果保存到检查点时,这样在恢复工作流时就不需要重新计算它。
  • 人机回环:如果你正在构建需要人工干预的工作流,你必须使用 任务 来封装任何随机性(例如 API 调用),以确保工作流可以正确恢复。有关更多详细信息,请参阅 确定性 部分。
  • 并行执行:对于 I/O 绑定任务,任务 支持并行执行,允许多个操作并发运行而不阻塞(例如,调用多个 API)。
  • 可观测性:将操作包装在 任务 中提供了一种跟踪工作流进度并使用 LangSmith 监控单个操作执行的方法。
  • 可重试工作:当工作需要重试以处理失败或不一致时,任务 提供了一种封装和管理重试逻辑的方法。

序列化

LangGraph 中的序列化有两个关键方面:
  1. entrypoint 的输入和输出必须是 JSON 可序列化的。
  2. task 的输出必须是 JSON 可序列化的。
这些要求对于启用检查点和工作流恢复是必要的。使用原语如对象、数组、字符串、数字和布尔值,以确保你的输入和输出是可序列化的。 序列化确保工作流状态(如任务结果和中间值)可以可靠地保存和恢复。这对于启用人机回环交互、容错性和并行执行至关重要。 提供不可序列化的输入或输出将在配置了 checkpointer 的工作流运行时导致错误。

确定性

为了利用 人机回环 等功能,任何随机性都应封装在 任务 内部。这保证了当执行被中止(例如,用于人机回环)然后恢复时,它将遵循相同的 步骤序列,即使 任务 结果是非确定性的。 LangGraph 通过在执行时持久化 任务子图 结果来实现此行为。设计良好的工作流确保恢复执行遵循 相同的步骤序列,允许正确检索先前计算的结果而无需重新执行它们。这对于长时间运行的 任务 或具有非确定性结果的 任务 特别有用,因为它避免了重复之前完成的工作,并允许从本质上相同的地方恢复。 虽然工作流的不同运行可能会产生不同的结果,但恢复 特定 运行应始终遵循相同的记录步骤序列。这允许 LangGraph 高效查找在图被中断之前执行的 任务子图 结果,并避免重新计算它们。

幂等性

幂等性确保多次运行同一操作会产生相同的结果。这有助于防止如果步骤因失败而重新运行而导致重复的 API 调用和冗余处理。始终将 API 调用放在 任务 函数中以进行检查点,并设计它们在重新执行时是幂等的。如果 任务 开始但未成功完成,则可能会发生重新执行。然后,如果工作流恢复,任务 将再次运行。使用幂等键或验证现有结果以避免重复。

常见陷阱

处理副作用

将副作用(例如,写入文件、发送邮件)封装在任务中,以确保在恢复工作流时不会多次执行它们。
在此示例中,副作用(写入文件)直接包含在工作流中,因此在恢复工作流时将再次执行。
import { entrypoint, interrupt } from "@langchain/langgraph";
import fs from "fs";

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow },
  async (inputs: Record<string, any>) => {
    // This code will be executed a second time when resuming the workflow.
    // Which is likely not what you want.
    fs.writeFileSync("output.txt", "Side effect executed");
    const value = interrupt("question");
    return value;
  }
);

非确定性控制流

每次可能给出不同结果的操作(如获取当前时间或随机数)应封装在任务中,以确保在恢复时返回相同的结果。
  • 在任务中:获取随机数 (5) → 中断 → 恢复 → (再次返回 5) → …
  • 不在任务中:获取随机数 (5) → 中断 → 恢复 → 获取新随机数 (7) → …
这在具有多个中断调用的 人机回环 工作流中尤为重要。LangGraph 为每个任务/入口点保留恢复值列表。遇到中断时,它将与相应的恢复值匹配。此匹配严格 基于索引,因此恢复值的顺序应与中断的顺序匹配。 如果在恢复时未保持执行顺序,一个 interrupt 调用可能与错误的 resume 值匹配,导致不正确的结果。 请阅读 确定性 部分以了解更多信息。
在此示例中,工作流使用当前时间来确定执行哪个任务。这是非确定性的,因为工作流的结果取决于执行时的时间。
import { entrypoint, interrupt } from "@langchain/langgraph";

const myWorkflow = entrypoint(
  { checkpointer, name: "workflow" },
  async (inputs: { t0: number }) => {
    const t1 = Date.now();

    const deltaT = t1 - inputs.t0;

    if (deltaT > 1000) {
      const result = await slowTask(1);
      const value = interrupt("question");
      return { result, value };
    } else {
      const result = await slowTask(2);
      const value = interrupt("question");
      return { result, value };
    }
  }
);

了解更多