LangGraph implements a streaming system to surface real-time updates. Streaming is crucial for making applications built on LLMs feel responsive. By displaying output progressively, even before a complete response is ready, streaming significantly improves the user experience (UX), particularly given LLM latency.

Get started

Basic usage

LangGraph graphs expose the stream method, which generates streamed output as an iterator.
for await (const chunk of await graph.stream(inputs, {
  streamMode: "updates",
})) {
  console.log(chunk);
}

Stream modes

Pass one or more of the following stream modes as a list to the stream method:

Mode      Description
values    The complete state after each step.
updates   The updates to the state after each step. If multiple updates are made in the same step, they are streamed separately.
messages  Two-tuples (LLM token, metadata) from LLM calls.
custom    Custom data emitted from inside nodes via the writer config parameter.
tools     Tool-call lifecycle events (on_tool_start, on_tool_event, on_tool_end, on_tool_error).
debug     As much information as possible throughout graph execution.

Graph state

Use the stream modes updates and values to stream the state of the graph as it executes.
  • updates streams the updates to the state after each step of the graph
  • values streams the full value of the state after each step of the graph
import { StateGraph, StateSchema, START, END } from "@langchain/langgraph";
import { z } from "zod/v4";

const State = new StateSchema({
  topic: z.string(),
  joke: z.string(),
});

const graph = new StateGraph(State)
  .addNode("refineTopic", (state) => {
    return { topic: state.topic + " and cats" };
  })
  .addNode("generateJoke", (state) => {
    return { joke: `This is a joke about ${state.topic}` };
  })
  .addEdge(START, "refineTopic")
  .addEdge("refineTopic", "generateJoke")
  .addEdge("generateJoke", END)
  .compile();
Use this to stream only the state updates returned by the nodes after each step. The streamed output includes the name of the node as well as the update.
for await (const chunk of await graph.stream(
  { topic: "ice cream" },
  { streamMode: "updates" }
)) {
  for (const [nodeName, state] of Object.entries(chunk)) {
    console.log(`Node ${nodeName} updated:`, state);
  }
}
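For comparison, a minimal sketch of the values mode against the same graph; each chunk is the complete state rather than just the delta:
for await (const chunk of await graph.stream(
  { topic: "ice cream" },
  { streamMode: "values" }
)) {
  // chunk is the full state, e.g. { topic: "ice cream and cats", joke: "..." }
  console.log(chunk);
}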

LLM tokens

Use the messages streaming mode to stream large language model (LLM) output token by token from any part of your graph, including nodes, tools, subgraphs, or tasks. The streamed output from messages mode is a tuple [message_chunk, metadata], where:
  • message_chunk: the token or message segment from the LLM.
  • metadata: a dictionary containing details about the graph node and the LLM invocation.
If your LLM is not available as a LangChain integration, you can stream its output using the custom mode instead. See Use with any LLM for details.
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, StateSchema, GraphNode, START } from "@langchain/langgraph";
import * as z from "zod";

const MyState = new StateSchema({
  topic: z.string(),
  joke: z.string().default(""),
});

const model = new ChatOpenAI({ model: "gpt-4.1-mini" });

const callModel: GraphNode<typeof MyState> = async (state) => {
  // Call the LLM to generate a joke about the topic
  // Note that message events are emitted even when the LLM is run via .invoke rather than .stream
  const modelResponse = await model.invoke([
    { role: "user", content: `Generate a joke about ${state.topic}` },
  ]);
  return { joke: modelResponse.content };
};

const graph = new StateGraph(MyState)
  .addNode("callModel", callModel)
  .addEdge(START, "callModel")
  .compile();

// "messages" 流模式返回一个元组迭代器 [messageChunk, metadata]
// 其中 messageChunk 是 LLM 流式传输的令牌,metadata 是一个字典
// 包含有关调用 LLM 的图节点和其他信息
for await (const [messageChunk, metadata] of await graph.stream(
  { topic: "ice cream" },
  { streamMode: "messages" }
)) {
  if (messageChunk.content) {
    console.log(messageChunk.content + "|");
  }
}

Filter by LLM invocation

You can associate tags with LLM invocations to filter the streamed tokens by which LLM call produced them.
import { ChatOpenAI } from "@langchain/openai";

// model1 is tagged with "joke"
const model1 = new ChatOpenAI({
  model: "gpt-4.1-mini",
  tags: ['joke']
});
// model2 is tagged with "poem"
const model2 = new ChatOpenAI({
  model: "gpt-4.1-mini",
  tags: ['poem']
});

const graph = // ... define a graph that uses these LLMs

// streamMode is set to "messages" to stream LLM tokens
// the metadata contains information about the LLM invocation, including its tags
for await (const [msg, metadata] of await graph.stream(
  { topic: "cats" },
  { streamMode: "messages" }
)) {
  // Filter the streamed tokens by the tags field in the metadata to only include
  // the tokens from LLM invocations tagged with "joke"
  if (metadata.tags?.includes("joke")) {
    console.log(msg.content + "|");
  }
}
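Extended example: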
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, StateSchema, GraphNode, START } from "@langchain/langgraph";
import * as z from "zod";

// jokeModel is tagged with "joke"
const jokeModel = new ChatOpenAI({
  model: "gpt-4.1-mini",
  tags: ["joke"]
});
// poemModel is tagged with "poem"
const poemModel = new ChatOpenAI({
  model: "gpt-4.1-mini",
  tags: ["poem"]
});

const State = new StateSchema({
  topic: z.string(),
  joke: z.string(),
  poem: z.string(),
});

const callModel: GraphNode<typeof State> = async (state) => {
  const topic = state.topic;
  console.log("Writing joke...");

  const jokeResponse = await jokeModel.invoke([
    { role: "user", content: `Write a joke about ${topic}` }
  ]);

  console.log("\n\nWriting poem...");
  const poemResponse = await poemModel.invoke([
    { role: "user", content: `Write a short poem about ${topic}` }
  ]);

  return {
    joke: jokeResponse.content,
    poem: poemResponse.content
  };
};

const graph = new StateGraph(State)
  .addNode("callModel", callModel)
  .addEdge(START, "callModel")
  .compile();

// streamMode is set to "messages" to stream LLM tokens
// the metadata contains information about the LLM invocation, including its tags
for await (const [msg, metadata] of await graph.stream(
  { topic: "cats" },
  { streamMode: "messages" }
)) {
  // Filter the streamed tokens by the tags field in the metadata to only include
  // the tokens from LLM invocations tagged with "joke"
  if (metadata.tags?.includes("joke")) {
    console.log(msg.content + "|");
  }
}

Omit messages from the stream

Use the nostream tag to exclude an LLM's output from the stream entirely. Calls tagged with nostream still run and produce output; their tokens are simply not emitted in messages mode. This is useful when:
  • You need an LLM's output for internal processing (e.g., structured output) but don't want to stream it to the client
  • You stream the same content through a different channel (e.g., custom UI messages) and want to avoid duplicate output in the messages stream
import { ChatAnthropic } from "@langchain/anthropic";
import { StateGraph, StateSchema, START } from "@langchain/langgraph";
import * as z from "zod";

const streamModel = new ChatAnthropic({ model: "claude-3-haiku-20240307" });
const internalModel = new ChatAnthropic({
  model: "claude-3-haiku-20240307",
}).withConfig({
  tags: ["nostream"],
});

const State = new StateSchema({
  topic: z.string(),
  answer: z.string().optional(),
  notes: z.string().optional(),
});

const writeAnswer = async (state: typeof State.State) => {
  const r = await streamModel.invoke([
    { role: "user", content: `Reply briefly about ${state.topic}` },
  ]);
  return { answer: r.content };
};

const internalNotes = async (state: typeof State.State) => {
  // Tokens from this model are omitted from streamMode: "messages" because of nostream
  const r = await internalModel.invoke([
    { role: "user", content: `Private notes on ${state.topic}` },
  ]);
  return { notes: r.content };
};

const graph = new StateGraph(State)
  .addNode("writeAnswer", writeAnswer)
  .addNode("internal_notes", internalNotes)
  .addEdge(START, "writeAnswer")
  .addEdge("writeAnswer", "internal_notes")
  .compile();

const stream = await graph.stream({ topic: "AI" }, { streamMode: "messages" });
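Consuming the stream then surfaces only the tokens from streamModel; the internalModel call still runs, but its tokens are withheld by the nostream tag. A minimal sketch:
for await (const [msg] of stream) {
  // Only tokens from the writeAnswer node's model appear here
  if (msg.content) {
    console.log(msg.content + "|");
  }
}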

Filter by node

To stream tokens only from specific nodes, use streamMode: "messages" and filter the output by the langgraph_node field in the streamed metadata:
// "messages" 流模式返回一个元组 [messageChunk, metadata]
// 其中 messageChunk 是 LLM 流式传输的令牌,metadata 是一个字典
// 包含有关调用 LLM 的图节点和其他信息
for await (const [msg, metadata] of await graph.stream(
  inputs,
  { streamMode: "messages" }
)) {
  // Filter the streamed tokens by the langgraph_node field in the metadata
  // to only include tokens from the specified node
  if (msg.content && metadata.langgraph_node === "some_node_name") {
    // ...
  }
}
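Extended example: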
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, StateSchema, GraphNode, START } from "@langchain/langgraph";
import * as z from "zod";

const model = new ChatOpenAI({ model: "gpt-4.1-mini" });

const State = new StateSchema({
  topic: z.string(),
  joke: z.string(),
  poem: z.string(),
});

const writeJoke: GraphNode<typeof State> = async (state) => {
  const topic = state.topic;
  const jokeResponse = await model.invoke([
    { role: "user", content: `Write a joke about ${topic}` }
  ]);
  return { joke: jokeResponse.content };
};

const writePoem: GraphNode<typeof State> = async (state) => {
  const topic = state.topic;
  const poemResponse = await model.invoke([
    { role: "user", content: `Write a short poem about ${topic}` }
  ]);
  return { poem: poemResponse.content };
};

const graph = new StateGraph(State)
  .addNode("writeJoke", writeJoke)
  .addNode("writePoem", writePoem)
  // write the joke and the poem concurrently
  .addEdge(START, "writeJoke")
  .addEdge(START, "writePoem")
  .compile();

// "messages" 流模式返回一个元组 [messageChunk, metadata]
// 其中 messageChunk 是 LLM 流式传输的令牌,metadata 是一个字典
// 包含有关调用 LLM 的图节点和其他信息
for await (const [msg, metadata] of await graph.stream(
  { topic: "cats" },
  { streamMode: "messages" }
)) {
  // Filter the streamed tokens by the langgraph_node field in the metadata
  // to only include tokens from the writePoem node
  if (msg.content && metadata.langgraph_node === "writePoem") {
    console.log(msg.content + "|");
  }
}

Custom data

To send custom user-defined data from inside a LangGraph node or tool:
  1. Emit the custom data using the writer parameter from LangGraphRunnableConfig.
  2. Set streamMode: "custom" when calling .stream() to receive the custom data in the stream. You can combine multiple modes (e.g., ["updates", "custom"]), but at least one of them must be "custom".
import { StateGraph, StateSchema, GraphNode, START, LangGraphRunnableConfig } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({
  query: z.string(),
  answer: z.string(),
});

const node: GraphNode<typeof State> = async (state, config) => {
  // Use the writer to emit a custom key-value pair (e.g., a progress update)
  config.writer({ custom_key: "Generating custom data inside node" });
  return { answer: "some data" };
};

const graph = new StateGraph(State)
  .addNode("node", node)
  .addEdge(START, "node")
  .compile();

const inputs = { query: "example" };

// Set streamMode: "custom" to receive the custom data in the stream
for await (const chunk of await graph.stream(inputs, { streamMode: "custom" })) {
  console.log(chunk);
}

Tool progress

Use the tools stream mode to receive real-time lifecycle events for tool execution. This is useful for showing progress indicators, partial results, and error states in a UI while tools run. The tools stream mode emits four event types:

Event          When                             Payload
on_tool_start  A tool call starts               name, input, toolCallId
on_tool_event  A tool yields intermediate data  name, data, toolCallId
on_tool_end    A tool returns its final result  name, output, toolCallId
on_tool_error  A tool throws an error           name, error, toolCallId

Define a tool that streams progress

To emit on_tool_event events, define your tool function as an async generator (async function*). Each yield sends intermediate data to the stream, and the return value is used as the tool's final result.
import { tool } from "@langchain/core/tools";
import { z } from "zod/v4";

const searchFlights = tool(
  async function* (input) {
    const airlines = ["United", "Delta", "American", "JetBlue"];
    const completed: string[] = [];

    for (let i = 0; i < airlines.length; i++) {
      await new Promise((r) => setTimeout(r, 500));
      completed.push(airlines[i]);

      // Each yield emits an on_tool_event to the stream
      yield {
        message: `Searching ${airlines[i]}...`,
        progress: (i + 1) / airlines.length,
        completed,
      };
    }

    // The return value becomes the tool result (ToolMessage.content)
    return JSON.stringify({
      flights: [
        { airline: "United", price: 450, duration: "5h 30m" },
        { airline: "Delta", price: 520, duration: "5h 15m" },
      ],
    });
  },
  {
    name: "search_flights",
    description: "Search for available flights to a destination.",
    schema: z.object({
      destination: z.string(),
      date: z.string(),
    }),
  }
);
Existing tools that return a Promise are fully compatible. They emit on_tool_start and on_tool_end events, but no on_tool_event events.
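For instance, a conventional Promise-returning tool such as the hypothetical one below still shows up in the tools stream with start and end events, just without intermediate progress; a minimal sketch:
import { tool } from "@langchain/core/tools";
import { z } from "zod/v4";

const getWeather = tool(
  async (input) => {
    // Emits on_tool_start when invoked and on_tool_end when it resolves,
    // but never on_tool_event, since nothing is yielded along the way
    return `It's sunny in ${input.city}.`;
  },
  {
    name: "get_weather",
    description: "Get the weather for a city.",
    schema: z.object({ city: z.string() }),
  }
);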

Consume tool events on the server

graph.stream() 传递 streamMode: ["tools"](或与其他模式组合):
for await (const [mode, chunk] of await graph.stream(
  { messages: [{ role: "user", content: "Find flights to Tokyo" }] },
  { streamMode: ["updates", "tools"] }
)) {
  if (mode === "tools") {
    switch (chunk.event) {
      case "on_tool_start":
        console.log(`Tool started: ${chunk.name}`, chunk.input);
        break;
      case "on_tool_event":
        console.log(`Tool progress: ${chunk.name}`, chunk.data);
        break;
      case "on_tool_end":
        console.log(`Tool finished: ${chunk.name}`, chunk.output);
        break;
      case "on_tool_error":
        console.error(`Tool failed: ${chunk.name}`, chunk.error);
        break;
    }
  }
}

Tool progress with useStream in React

The useStream hook from @langchain/langgraph-sdk/react exposes a toolProgress array when you include "tools" in the stream modes. Each entry is a ToolProgress object that tracks the current state of a running tool:

Field       Description
name        The tool name
state       The current lifecycle state: "starting", "running", "completed", or "error"
toolCallId  The tool call ID from the LLM
input       The tool's input arguments
data        The most recently yielded data from on_tool_event
result      The final result, set on on_tool_end
error       The error, set on on_tool_error
import { useStream } from "@langchain/langgraph-sdk/react";

function Chat() {
  const stream = useStream({
    assistantId: "my-agent",
    streamMode: ["values", "tools"],
  });

  // Filter for tools that are currently active
  const activeTools = stream.toolProgress.filter(
    (t) => t.state === "starting" || t.state === "running"
  );

  return (
    <div>
      {stream.messages.map((msg) => (
        <MessageBubble key={msg.id} message={msg} />
      ))}

      {/* Show progress cards for running tools */}
      {activeTools.map((tool) => (
        <ToolProgressCard
          key={tool.toolCallId ?? tool.name}
          name={tool.name}
          state={tool.state}
          data={tool.data}
        />
      ))}
    </div>
  );
}
This example shows a complete agent whose async generator tools stream search progress to a React UI. The agent definition:
import { tool } from "@langchain/core/tools";
import { ChatOpenAI } from "@langchain/openai";
import { createAgent } from "@langchain/langgraph";
import { MemorySaver } from "@langchain/langgraph-checkpoint-memory";
import { z } from "zod/v4";

const searchFlights = tool(
  async function* (input) {
    const airlines = ["United", "Delta", "American", "JetBlue"];
    const completed: string[] = [];

    for (let i = 0; i < airlines.length; i++) {
      await new Promise((r) => setTimeout(r, 600));
      completed.push(`${airlines[i]}: checked`);
      yield {
        message: `Searching ${airlines[i]}...`,
        progress: (i + 1) / airlines.length,
        completed,
      };
    }

    return JSON.stringify({
      flights: [
        { airline: "United", price: 450, duration: "5h 30m" },
        { airline: "Delta", price: 520, duration: "5h 15m" },
      ],
    });
  },
  {
    name: "search_flights",
    description: "Search for available flights.",
    schema: z.object({
      destination: z.string(),
      departure_date: z.string(),
    }),
  }
);

const checkHotels = tool(
  async function* (input) {
    const hotels = ["Grand Hyatt", "Marriott", "Hilton"];
    const completed: string[] = [];

    for (let i = 0; i < hotels.length; i++) {
      await new Promise((r) => setTimeout(r, 400));
      completed.push(`${hotels[i]}: available`);
      yield {
        message: `Checking ${hotels[i]}...`,
        progress: (i + 1) / hotels.length,
        completed,
      };
    }

    return JSON.stringify({
      hotels: [
        { name: "Grand Hyatt", price: 250, rating: 4.5 },
        { name: "Marriott", price: 180, rating: 4.2 },
      ],
    });
  },
  {
    name: "check_hotels",
    description: "Check hotel availability.",
    schema: z.object({
      city: z.string(),
      check_in: z.string(),
      nights: z.number(),
    }),
  }
);

export const agent = createAgent({
  model: new ChatOpenAI({ model: "gpt-4o-mini" }),
  tools: [searchFlights, checkHotels],
  checkpointer: new MemorySaver(),
});
The React component with progress cards:
import { useStream } from "@langchain/langgraph-sdk/react";

function TravelPlanner() {
  const stream = useStream<typeof agent>({
    assistantId: "travel-agent",
    streamMode: ["values", "tools"],
  });

  const activeTools = stream.toolProgress.filter(
    (t) => t.state === "starting" || t.state === "running"
  );

  return (
    <div>
      {stream.messages.map((msg) => (
        <div key={msg.id}>{msg.content}</div>
      ))}

      {activeTools.map((tool) => {
        const data = tool.data as {
          message?: string;
          progress?: number;
          completed?: string[];
        } | undefined;

        return (
          <div key={tool.toolCallId ?? tool.name}>
            <strong>{tool.name}</strong>
            {data?.message && <p>{data.message}</p>}
            {data?.progress != null && (
              <div style={{ width: "100%", background: "#eee" }}>
                <div
                  style={{
                    width: `${data.progress * 100}%`,
                    background: "#4CAF50",
                    height: 8,
                    transition: "width 0.3s ease",
                  }}
                />
              </div>
            )}
            {data?.completed?.map((step, i) => (
              <div key={i}>&#10003; {step}</div>
            ))}
          </div>
        );
      })}
    </div>
  );
}

toolscustom 流模式

Both stream modes can surface tool progress, but they serve different purposes:
  • tools emits structured lifecycle events (on_tool_start, on_tool_event, on_tool_end, on_tool_error) automatically, with no code changes in your tools beyond using an async function*. The useStream hook provides a reactive toolProgress array out of the box.
  • custom gives you full control over what data is emitted and when, via config.writer(). Use it when you need free-form data that doesn't map onto the tool lifecycle, or when you want to stream from nodes rather than just tools (see the sketch below).
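As a point of contrast, a minimal sketch of the custom approach, emitting free-form progress from a tool body via config.writer (the slow_lookup tool itself is hypothetical):
import { tool } from "@langchain/core/tools";
import { LangGraphRunnableConfig } from "@langchain/langgraph";
import * as z from "zod";

const slowLookup = tool(
  async (input, config: LangGraphRunnableConfig) => {
    // Free-form payloads: whatever you write here is surfaced under streamMode: "custom"
    config.writer?.({ status: "looking up", id: input.id });
    const record = `record ${input.id}`;
    config.writer?.({ status: "done" });
    return record;
  },
  {
    name: "slow_lookup",
    description: "Look up a record by id.",
    schema: z.object({ id: z.string() }),
  }
);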

Subgraph outputs

To include outputs from subgraphs in the streamed output, set subgraphs: true in the parent graph's .stream() method. This streams outputs from both the parent graph and any subgraphs. Outputs are streamed as tuples [namespace, data], where namespace is a tuple with the path to the node that invoked the subgraph, e.g. ["parent_node:<task_id>", "child_node:<task_id>"].
for await (const chunk of await graph.stream(
  { foo: "foo" },
  {
    // Set subgraphs: true to stream outputs from the subgraphs
    subgraphs: true,
    streamMode: "updates",
  }
)) {
  console.log(chunk);
}
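Extended example: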
import { StateGraph, StateSchema, START } from "@langchain/langgraph";
import { z } from "zod/v4";

// Define the subgraph
const SubgraphState = new StateSchema({
  foo: z.string(), // note that this key is shared with the parent graph state
  bar: z.string(),
});

const subgraphBuilder = new StateGraph(SubgraphState)
  .addNode("subgraphNode1", (state) => {
    return { bar: "bar" };
  })
  .addNode("subgraphNode2", (state) => {
    return { foo: state.foo + state.bar };
  })
  .addEdge(START, "subgraphNode1")
  .addEdge("subgraphNode1", "subgraphNode2");
const subgraph = subgraphBuilder.compile();

// Define the parent graph
const ParentState = new StateSchema({
  foo: z.string(),
});

const builder = new StateGraph(ParentState)
  .addNode("node1", (state) => {
    return { foo: "hi! " + state.foo };
  })
  .addNode("node2", subgraph)
  .addEdge(START, "node1")
  .addEdge("node1", "node2");
const graph = builder.compile();

for await (const chunk of await graph.stream(
  { foo: "foo" },
  {
    streamMode: "updates",
    // Set subgraphs: true to stream outputs from the subgraphs
    subgraphs: true,
  }
)) {
  console.log(chunk);
}
[[], { node1: { foo: 'hi! foo' } }]
[['node2:dfddc4ba-c3c5-6887-5012-a243b5b377c2'], { subgraphNode1: { bar: 'bar' } }]
[['node2:dfddc4ba-c3c5-6887-5012-a243b5b377c2'], { subgraphNode2: { foo: 'hi! foobar' } }]
[[], { node2: { foo: 'hi! foobar' } }]
Note that we receive not just the node updates but also the namespaces, which tell us which graph (or subgraph) we are streaming from.
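Since the parent graph's own updates arrive with an empty namespace, the namespace also gives a simple way to separate subgraph output; a minimal sketch against the graph above:
for await (const [namespace, data] of await graph.stream(
  { foo: "foo" },
  { streamMode: "updates", subgraphs: true }
)) {
  if (namespace.length > 0) {
    // Updates coming from inside the subgraph
    console.log("subgraph update:", data);
  }
}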

Debug

Use the debug stream mode to stream as much information as possible throughout the execution of the graph. The streamed output includes the name of the node as well as the full state.
for await (const chunk of await graph.stream(
  { topic: "ice cream" },
  { streamMode: "debug" }
)) {
  console.log(chunk);
}

Stream multiple modes

You can pass an array as the streamMode parameter to stream multiple modes at once. The streamed output will be [mode, chunk] tuples, where mode is the name of the stream mode and chunk is the data streamed by that mode.
for await (const [mode, chunk] of await graph.stream(inputs, {
  streamMode: ["updates", "custom"],
})) {
  console.log(chunk);
}

Advanced

Use with any LLM

You can use streamMode: "custom" to stream data from any LLM API, even one that does not implement the LangChain chat model interface. This lets you integrate raw LLM clients or external services that provide their own streaming interfaces, making LangGraph highly flexible for custom setups.
import { StateGraph, GraphNode, StateSchema } from "@langchain/langgraph";
import * as z from "zod";

const State = new StateSchema({ topic: z.string(), result: z.string() });

const callArbitraryModel: GraphNode<typeof State> = async (state, config) => {
  // Example node that calls an arbitrary model and streams its output.
  // Assume you have a custom streaming client that yields chunks of LLM tokens.
  for await (const chunk of yourCustomStreamingClient(state.topic)) {
    // Use the writer to send custom data to the stream
    config.writer({ custom_llm_chunk: chunk });
  }
  return { result: "completed" };
};

const graph = new StateGraph(State)
  .addNode("callArbitraryModel", callArbitraryModel)
  // Add other nodes and edges as needed
  .compile();

// Set streamMode: "custom" to receive the custom data in the stream
for await (const chunk of await graph.stream(
  { topic: "cats" },
  { streamMode: "custom" }
)) {
  // The chunks will contain the custom data streamed from the LLM
  console.log(chunk);
}
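Extended example: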
import { StateGraph, StateSchema, MessagesValue, GraphNode, START, LangGraphRunnableConfig } from "@langchain/langgraph";
import { tool } from "@langchain/core/tools";
import * as z from "zod";
import OpenAI from "openai";

const openaiClient = new OpenAI();
const modelName = "gpt-4.1-mini";

async function* streamTokens(modelName: string, messages: any[]) {
  const response = await openaiClient.chat.completions.create({
    messages,
    model: modelName,
    stream: true,
  });

  let role: string | null = null;
  for await (const chunk of response) {
    const delta = chunk.choices[0]?.delta;

    if (delta?.role) {
      role = delta.role;
    }

    if (delta?.content) {
      yield { role, content: delta.content };
    }
  }
}

// This is our tool
const getItems = tool(
  async (input, config: LangGraphRunnableConfig) => {
    let response = "";
    for await (const msgChunk of streamTokens(
      modelName,
      [
        {
          role: "user",
          content: `Can you tell me what kind of items i might find in the following place: '${input.place}'. List at least 3 such items separating them by a comma. And include a brief description of each item.`,
        },
      ]
    )) {
      response += msgChunk.content;
      config.writer?.(msgChunk);
    }
    return response;
  },
  {
    name: "get_items",
    description: "Use this tool to list items one might find in a place you're asked about.",
    schema: z.object({
      place: z.string().describe("The place to look up items for."),
    }),
  }
);

const State = new StateSchema({
  messages: MessagesValue,
});

const callTool: GraphNode<typeof State> = async (state) => {
  const aiMessage = state.messages.at(-1);
  const toolCall = aiMessage.tool_calls?.at(-1);

  const functionName = toolCall?.function?.name;
  if (functionName !== "get_items") {
    throw new Error(`Tool ${functionName} not supported`);
  }

  const functionArguments = toolCall?.function?.arguments;
  const args = JSON.parse(functionArguments);

  const functionResponse = await getItems.invoke(args);
  const toolMessage = {
    tool_call_id: toolCall.id,
    role: "tool",
    name: functionName,
    content: functionResponse,
  };
  return { messages: [toolMessage] };
};

const graph = new StateGraph(State)
  // This is the tool-calling graph node
  .addNode("callTool", callTool)
  .addEdge(START, "callTool")
  .compile();
Let's invoke the graph with an AIMessage that includes a tool call:
const inputs = {
  messages: [
    {
      content: null,
      role: "assistant",
      tool_calls: [
        {
          id: "1",
          function: {
            arguments: '{"place":"bedroom"}',
            name: "get_items",
          },
          type: "function",
        }
      ],
    }
  ]
};

for await (const chunk of await graph.stream(
  inputs,
  { streamMode: "custom" }
)) {
  console.log(chunk.content + "|");
}

Disable streaming for specific chat models

If your application mixes models that support streaming with models that do not, you may need to explicitly disable streaming for the models that don't support it. Set streaming: false when initializing the model.
import { ChatOpenAI } from "@langchain/openai";

const model = new ChatOpenAI({
  model: "o1-preview",
  // Set streaming: false to disable streaming for the chat model
  streaming: false,
});
Not all chat model integrations support the streaming parameter. If your model doesn't support it, use disableStreaming: true instead; this parameter is available on all chat models via the base class.
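A minimal sketch of that fallback, assuming the integration accepts base chat model params in its constructor:
import { ChatOpenAI } from "@langchain/openai";

const model = new ChatOpenAI({
  model: "o1-preview",
  // Available on all chat models via the base class
  disableStreaming: true,
});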