LangGraph implements a streaming system to surface real-time updates. Streaming is crucial for enhancing the responsiveness of applications built on LLMs. By displaying output progressively, even before a complete response is ready, streaming significantly improves the user experience (UX), particularly when dealing with LLM latency.
Basic usage
LangGraph graphs expose the stream method, which yields streamed outputs as an iterator.
for await (const chunk of await graph.stream(inputs, {
streamMode: "updates",
})) {
console.log(chunk);
}
Stream modes
Pass one or more of the following stream modes to the stream method as an array:
| Mode | Description |
|---|---|
| values | The full state after each step. |
| updates | The updates to the state after each step. If multiple updates are made in the same step, they are streamed separately. |
| messages | 2-tuples of (LLM token, metadata) from LLM calls. |
| custom | Custom data emitted from inside nodes via the writer config parameter. |
| tools | Tool-call lifecycle events (on_tool_start, on_tool_event, on_tool_end, on_tool_error). |
| debug | As much information as possible throughout the execution of the graph. |
Graph state
Use the stream modes updates and values to stream the state of the graph as it executes.
updates streams the updates to the state after each step of the graph.
values streams the full value of the state after each step of the graph.
import { StateGraph, StateSchema, START, END } from "@langchain/langgraph";
import { z } from "zod/v4";
const State = new StateSchema({
topic: z.string(),
joke: z.string(),
});
const graph = new StateGraph(State)
.addNode("refineTopic", (state) => {
return { topic: state.topic + " and cats" };
})
.addNode("generateJoke", (state) => {
return { joke: `This is a joke about ${state.topic}` };
})
.addEdge(START, "refineTopic")
.addEdge("refineTopic", "generateJoke")
.addEdge("generateJoke", END)
.compile();
Use this to stream only the state updates returned by the nodes after each step. The streamed outputs include the name of the node as well as the update.
for await (const chunk of await graph.stream(
{ topic: "ice cream" },
{ streamMode: "updates" }
)) {
for (const [nodeName, state] of Object.entries(chunk)) {
console.log(`Node ${nodeName} updated:`, state);
}
}
Use this to stream the full state of the graph after each step.
for await (const chunk of await graph.stream(
{ topic: "ice cream" },
{ streamMode: "values" }
)) {
console.log(`topic: ${chunk.topic}, joke: ${chunk.joke}`);
}
LLM tokens
Use the messages stream mode to stream Large Language Model (LLM) outputs token by token from any part of your graph, including nodes, tools, subgraphs, or tasks.
The streamed output from messages mode is a tuple [message_chunk, metadata], where:
- message_chunk: the token or message segment from the LLM.
- metadata: a dictionary containing details about the graph node and the LLM invocation.
If your LLM is not available as a LangChain integration, you can stream its outputs using the custom mode instead. See Use with any LLM for details.
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, StateSchema, GraphNode, START } from "@langchain/langgraph";
import * as z from "zod";
const MyState = new StateSchema({
topic: z.string(),
joke: z.string().default(""),
});
const model = new ChatOpenAI({ model: "gpt-4.1-mini" });
const callModel: GraphNode<typeof MyState> = async (state) => {
// Call the LLM to generate a joke about the topic
// Note that message events are emitted even when the LLM is run with .invoke rather than .stream
const modelResponse = await model.invoke([
{ role: "user", content: `Generate a joke about ${state.topic}` },
]);
return { joke: modelResponse.content };
};
const graph = new StateGraph(MyState)
.addNode("callModel", callModel)
.addEdge(START, "callModel")
.compile();
// "messages" 流模式返回一个元组迭代器 [messageChunk, metadata]
// 其中 messageChunk 是 LLM 流式传输的令牌,metadata 是一个字典
// 包含有关调用 LLM 的图节点和其他信息
for await (const [messageChunk, metadata] of await graph.stream(
{ topic: "ice cream" },
{ streamMode: "messages" }
)) {
if (messageChunk.content) {
console.log(messageChunk.content + "|");
}
}
Filter by LLM invocation
You can associate tags with LLM invocations to filter the streamed tokens by LLM invocation.
import { ChatOpenAI } from "@langchain/openai";
// model1 is tagged with "joke"
const model1 = new ChatOpenAI({
model: "gpt-4.1-mini",
tags: ["joke"]
});
// model2 is tagged with "poem"
const model2 = new ChatOpenAI({
model: "gpt-4.1-mini",
tags: ["poem"]
});
const graph = // ... define a graph that uses these LLMs
// streamMode is set to "messages" to stream LLM tokens
// the metadata contains information about the LLM invocation, including the tags
for await (const [msg, metadata] of await graph.stream(
{ topic: "cats" },
{ streamMode: "messages" }
)) {
// filter the streamed tokens by the tags field in the metadata to only include
// the tokens from the LLM invocation tagged with "joke"
if (metadata.tags?.includes("joke")) {
console.log(msg.content + "|");
}
}
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, StateSchema, GraphNode, START } from "@langchain/langgraph";
import * as z from "zod";
// jokeModel is tagged with "joke"
const jokeModel = new ChatOpenAI({
model: "gpt-4.1-mini",
tags: ["joke"]
});
// poemModel is tagged with "poem"
const poemModel = new ChatOpenAI({
model: "gpt-4.1-mini",
tags: ["poem"]
});
const State = new StateSchema({
topic: z.string(),
joke: z.string(),
poem: z.string(),
});
const callModel: GraphNode<typeof State> = async (state) => {
const topic = state.topic;
console.log("Writing joke...");
const jokeResponse = await jokeModel.invoke([
{ role: "user", content: `Write a joke about ${topic}` }
]);
console.log("\n\nWriting poem...");
const poemResponse = await poemModel.invoke([
{ role: "user", content: `Write a short poem about ${topic}` }
]);
return {
joke: jokeResponse.content,
poem: poemResponse.content
};
};
const graph = new StateGraph(State)
.addNode("callModel", callModel)
.addEdge(START, "callModel")
.compile();
// streamMode is set to "messages" to stream LLM tokens
// the metadata contains information about the LLM invocation, including the tags
for await (const [msg, metadata] of await graph.stream(
{ topic: "cats" },
{ streamMode: "messages" }
)) {
// filter the streamed tokens by the tags field in the metadata to only include
// the tokens from the LLM invocation tagged with "joke"
if (metadata.tags?.includes("joke")) {
console.log(msg.content + "|");
}
}
Omit messages from the stream
Use the nostream tag to exclude an LLM's output from the stream entirely. Calls tagged nostream still run and produce output; their tokens are simply not emitted in messages mode.
This is useful when:
- You need the LLM output for internal processing (e.g., structured output) but don't want to stream it to the client
- You stream the same content through a different channel (e.g., custom UI messages) and want to avoid duplicate output in the messages stream
import { ChatAnthropic } from "@langchain/anthropic";
import { StateGraph, StateSchema, START } from "@langchain/langgraph";
import * as z from "zod";
const streamModel = new ChatAnthropic({ model: "claude-3-haiku-20240307" });
const internalModel = new ChatAnthropic({
model: "claude-3-haiku-20240307",
}).withConfig({
tags: ["nostream"],
});
const State = new StateSchema({
topic: z.string(),
answer: z.string().optional(),
notes: z.string().optional(),
});
const writeAnswer = async (state: typeof State.State) => {
const r = await streamModel.invoke([
{ role: "user", content: `Reply briefly about ${state.topic}` },
]);
return { answer: r.content };
};
const internalNotes = async (state: typeof State.State) => {
// Tokens from this model are omitted from streamMode: "messages" because of nostream
const r = await internalModel.invoke([
{ role: "user", content: `Private notes on ${state.topic}` },
]);
return { notes: r.content };
};
const graph = new StateGraph(State)
.addNode("writeAnswer", writeAnswer)
.addNode("internal_notes", internalNotes)
.addEdge(START, "writeAnswer")
.addEdge("writeAnswer", "internal_notes")
.compile();
const stream = await graph.stream({ topic: "AI" }, { streamMode: "messages" });
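To verify the behavior, consume the stream; in a minimal sketch (assuming the graph above), only tokens from the writeAnswer node are printed, because internal_notes uses the nostream-tagged model:
for await (const [msg, metadata] of stream) {
// tokens from internal_notes never appear here; nostream suppresses them
if (msg.content) {
console.log(`${metadata.langgraph_node}: ${msg.content}`);
}
}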
Filter by node
To stream tokens only from specific nodes, use streamMode: "messages" and filter the outputs by the langgraph_node field in the streamed metadata:
// "messages" 流模式返回一个元组 [messageChunk, metadata]
// 其中 messageChunk 是 LLM 流式传输的令牌,metadata 是一个字典
// 包含有关调用 LLM 的图节点和其他信息
for await (const [msg, metadata] of await graph.stream(
inputs,
{ streamMode: "messages" }
)) {
// filter the streamed tokens by the langgraph_node field in the metadata
// to only include the tokens from the specified node
if (msg.content && metadata.langgraph_node === "some_node_name") {
// ...
}
}
import { ChatOpenAI } from "@langchain/openai";
import { StateGraph, StateSchema, GraphNode, START } from "@langchain/langgraph";
import * as z from "zod";
const model = new ChatOpenAI({ model: "gpt-4.1-mini" });
const State = new StateSchema({
topic: z.string(),
joke: z.string(),
poem: z.string(),
});
const writeJoke: GraphNode<typeof State> = async (state) => {
const topic = state.topic;
const jokeResponse = await model.invoke([
{ role: "user", content: `Write a joke about ${topic}` }
]);
return { joke: jokeResponse.content };
};
const writePoem: GraphNode<typeof State> = async (state) => {
const topic = state.topic;
const poemResponse = await model.invoke([
{ role: "user", content: `Write a short poem about ${topic}` }
]);
return { poem: poemResponse.content };
};
const graph = new StateGraph(State)
.addNode("writeJoke", writeJoke)
.addNode("writePoem", writePoem)
// write the joke and the poem concurrently
.addEdge(START, "writeJoke")
.addEdge(START, "writePoem")
.compile();
// "messages" 流模式返回一个元组 [messageChunk, metadata]
// 其中 messageChunk 是 LLM 流式传输的令牌,metadata 是一个字典
// 包含有关调用 LLM 的图节点和其他信息
for await (const [msg, metadata] of await graph.stream(
{ topic: "cats" },
{ streamMode: "messages" }
)) {
// filter the streamed tokens by the langgraph_node field in the metadata
// to only include the tokens from the writePoem node
if (msg.content && metadata.langgraph_node === "writePoem") {
console.log(msg.content + "|");
}
}
Custom data
To send custom user-defined data from inside a LangGraph node or tool, follow these steps:
- Use the writer parameter from LangGraphRunnableConfig to emit custom data.
- Set streamMode: "custom" when calling .stream() to receive the custom data in the stream. You can combine multiple modes (e.g., ["updates", "custom"]), but at least one of them must be "custom".
import { StateGraph, StateSchema, GraphNode, START, LangGraphRunnableConfig } from "@langchain/langgraph";
import * as z from "zod";
const State = new StateSchema({
query: z.string(),
answer: z.string(),
});
const node: GraphNode<typeof State> = async (state, config) => {
// use the writer to emit a custom key-value pair (e.g., progress updates)
config.writer({ custom_key: "Generating custom data inside node" });
return { answer: "some data" };
};
const graph = new StateGraph(State)
.addNode("node", node)
.addEdge(START, "node")
.compile();
const inputs = { query: "example" };
// set streamMode: "custom" to receive the custom data in the stream
for await (const chunk of await graph.stream(inputs, { streamMode: "custom" })) {
console.log(chunk);
}
import { tool } from "@langchain/core/tools";
import { LangGraphRunnableConfig } from "@langchain/langgraph";
import * as z from "zod";
const queryDatabase = tool(
async (input, config: LangGraphRunnableConfig) => {
// use the writer to emit a custom key-value pair (e.g., progress updates)
config.writer({ data: "Retrieved 0/100 records", type: "progress" });
// perform the query
// emit another custom key-value pair
config.writer({ data: "Retrieved 100/100 records", type: "progress" });
return "some-answer";
},
{
name: "query_database",
description: "Query the database.",
schema: z.object({
query: z.string().describe("The query to execute."),
}),
}
);
const graph = // ... define a graph that uses this tool
// set streamMode: "custom" to receive the custom data in the stream
for await (const chunk of await graph.stream(inputs, { streamMode: "custom" })) {
console.log(chunk);
}
Tool progress
Use the tools stream mode to receive real-time lifecycle events from tool execution. This is useful for showing progress indicators, partial results, and error states in your UI while tools run.
The tools stream mode emits four event types:
| Event | When | Payload |
|---|---|---|
| on_tool_start | A tool call starts | name, input, toolCallId |
| on_tool_event | The tool yields intermediate data | name, data, toolCallId |
| on_tool_end | The tool returns its final result | name, output, toolCallId |
| on_tool_error | The tool throws an error | name, error, toolCallId |
Define a tool that streams progress
To emit on_tool_event events, define your tool function as an async generator (async function*). Each yield sends intermediate data to the stream, and the return value is used as the tool's final result.
import { tool } from "@langchain/core/tools";
import { z } from "zod/v4";
const searchFlights = tool(
async function* (input) {
const airlines = ["United", "Delta", "American", "JetBlue"];
const completed: string[] = [];
for (let i = 0; i < airlines.length; i++) {
await new Promise((r) => setTimeout(r, 500));
completed.push(airlines[i]);
// each yield emits an on_tool_event to the stream
yield {
message: `Searching ${airlines[i]}...`,
progress: (i + 1) / airlines.length,
completed,
};
}
// the return value becomes the tool result (ToolMessage.content)
return JSON.stringify({
flights: [
{ airline: "United", price: 450, duration: "5h 30m" },
{ airline: "Delta", price: 520, duration: "5h 15m" },
],
});
},
{
name: "search_flights",
description: "Search for available flights to a destination.",
schema: z.object({
destination: z.string(),
date: z.string(),
}),
}
);
Existing tools that return promises are fully compatible. They emit on_tool_start and on_tool_end events, but no on_tool_event events.
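For contrast, here is a minimal sketch of such a promise-returning tool (a hypothetical lookup_weather, not part of the example above):
import { tool } from "@langchain/core/tools";
import { z } from "zod/v4";
// A plain async tool: it emits on_tool_start and on_tool_end,
// but never on_tool_event, because it does not yield
const lookupWeather = tool(
async (input) => {
return `It is sunny in ${input.city}.`;
},
{
name: "lookup_weather",
description: "Look up the weather for a city.",
schema: z.object({ city: z.string() }),
}
);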
Consume tool events on the server
Pass streamMode: ["tools"] to graph.stream() (alone or combined with other modes):
for await (const [mode, chunk] of await graph.stream(
{ messages: [{ role: "user", content: "Find flights to Tokyo" }] },
{ streamMode: ["updates", "tools"] }
)) {
if (mode === "tools") {
switch (chunk.event) {
case "on_tool_start":
console.log(`Tool started: ${chunk.name}`, chunk.input);
break;
case "on_tool_event":
console.log(`Tool progress: ${chunk.name}`, chunk.data);
break;
case "on_tool_end":
console.log(`Tool finished: ${chunk.name}`, chunk.output);
break;
case "on_tool_error":
console.error(`Tool failed: ${chunk.name}`, chunk.error);
break;
}
}
}
Tool progress with useStream in React
The useStream hook from @langchain/langgraph-sdk/react exposes a toolProgress array when you include "tools" in your stream modes. Each entry is a ToolProgress object that tracks the current state of a running tool:
| Field | Description |
|---|---|
| name | The tool name |
| state | The current lifecycle state: "starting", "running", "completed", or "error" |
| toolCallId | The tool call ID from the LLM |
| input | The tool's input arguments |
| data | The most recent data yielded via on_tool_event |
| result | The final result, set on on_tool_end |
| error | The error, set on on_tool_error |
import { useStream } from "@langchain/langgraph-sdk/react";
function Chat() {
const stream = useStream({
assistantId: "my-agent",
streamMode: ["values", "tools"],
});
// filter for tools that are currently active
const activeTools = stream.toolProgress.filter(
(t) => t.state === "starting" || t.state === "running"
);
return (
<div>
{stream.messages.map((msg) => (
<MessageBubble key={msg.id} message={msg} />
))}
{/* show progress cards for running tools */}
{activeTools.map((tool) => (
<ToolProgressCard
key={tool.toolCallId ?? tool.name}
name={tool.name}
state={tool.state}
data={tool.data}
/>
))}
</div>
);
}
This example shows a complete agent whose async generator tools stream search progress to a React UI. The agent definition:
import { tool } from "@langchain/core/tools";
import { ChatOpenAI } from "@langchain/openai";
import { createAgent } from "@langchain/langgraph";
import { MemorySaver } from "@langchain/langgraph-checkpoint-memory";
import { z } from "zod/v4";
const searchFlights = tool(
async function* (input) {
const airlines = ["United", "Delta", "American", "JetBlue"];
const completed: string[] = [];
for (let i = 0; i < airlines.length; i++) {
await new Promise((r) => setTimeout(r, 600));
completed.push(`${airlines[i]}: checked`);
yield {
message: `Searching ${airlines[i]}...`,
progress: (i + 1) / airlines.length,
completed,
};
}
return JSON.stringify({
flights: [
{ airline: "United", price: 450, duration: "5h 30m" },
{ airline: "Delta", price: 520, duration: "5h 15m" },
],
});
},
{
name: "search_flights",
description: "Search for available flights.",
schema: z.object({
destination: z.string(),
departure_date: z.string(),
}),
}
);
const checkHotels = tool(
async function* (input) {
const hotels = ["Grand Hyatt", "Marriott", "Hilton"];
const completed: string[] = [];
for (let i = 0; i < hotels.length; i++) {
await new Promise((r) => setTimeout(r, 400));
completed.push(`${hotels[i]}: available`);
yield {
message: `Checking ${hotels[i]}...`,
progress: (i + 1) / hotels.length,
completed,
};
}
return JSON.stringify({
hotels: [
{ name: "Grand Hyatt", price: 250, rating: 4.5 },
{ name: "Marriott", price: 180, rating: 4.2 },
],
});
},
{
name: "check_hotels",
description: "Check hotel availability.",
schema: z.object({
city: z.string(),
check_in: z.string(),
nights: z.number(),
}),
}
);
export const agent = createAgent({
model: new ChatOpenAI({ model: "gpt-4o-mini" }),
tools: [searchFlights, checkHotels],
checkpointer: new MemorySaver(),
});
The React component with progress cards:
import { useStream } from "@langchain/langgraph-sdk/react";
function TravelPlanner() {
const stream = useStream<typeof agent>({
assistantId: "travel-agent",
streamMode: ["values", "tools"],
});
const activeTools = stream.toolProgress.filter(
(t) => t.state === "starting" || t.state === "running"
);
return (
<div>
{stream.messages.map((msg) => (
<div key={msg.id}>{msg.content}</div>
))}
{activeTools.map((tool) => {
const data = tool.data as {
message?: string;
progress?: number;
completed?: string[];
} | undefined;
return (
<div key={tool.toolCallId ?? tool.name}>
<strong>{tool.name}</strong>
{data?.message && <p>{data.message}</p>}
{data?.progress != null && (
<div style={{ width: "100%", background: "#eee" }}>
<div
style={{
width: `${data.progress * 100}%`,
background: "#4CAF50",
height: 8,
transition: "width 0.3s ease",
}}
/>
</div>
)}
{data?.completed?.map((step, i) => (
<div key={i}>✓ {step}</div>
))}
</div>
);
})}
</div>
);
}
Both stream modes can surface tool progress, but they serve different purposes:
tools: automatically emits structured lifecycle events (on_tool_start, on_tool_event, on_tool_end, on_tool_error) with no changes to your tool code beyond using async function*. The useStream hook provides a reactive toolProgress array out of the box.
custom: gives you full control over what data is emitted and when, via config.writer(). Use it when you need free-form data that doesn't map onto the tool lifecycle, or when you want to stream from nodes rather than just tools.
Subgraph outputs
To include outputs from subgraphs in the streamed output, set subgraphs: true in the .stream() method of the parent graph. This streams outputs from both the parent graph and any subgraphs.
Outputs are streamed as tuples [namespace, data], where namespace is a tuple with the path to the node that invoked the subgraph, e.g. ["parent_node:<task_id>", "child_node:<task_id>"].
for await (const chunk of await graph.stream(
{ foo: "foo" },
{
// set subgraphs: true to stream outputs from subgraphs
subgraphs: true,
streamMode: "updates",
}
)) {
console.log(chunk);
}
import { StateGraph, StateSchema, START } from "@langchain/langgraph";
import { z } from "zod/v4";
// Define the subgraph
const SubgraphState = new StateSchema({
foo: z.string(), // note that this key is shared with the parent graph state
bar: z.string(),
});
const subgraphBuilder = new StateGraph(SubgraphState)
.addNode("subgraphNode1", (state) => {
return { bar: "bar" };
})
.addNode("subgraphNode2", (state) => {
return { foo: state.foo + state.bar };
})
.addEdge(START, "subgraphNode1")
.addEdge("subgraphNode1", "subgraphNode2");
const subgraph = subgraphBuilder.compile();
// Define the parent graph
const ParentState = new StateSchema({
foo: z.string(),
});
const builder = new StateGraph(ParentState)
.addNode("node1", (state) => {
return { foo: "hi! " + state.foo };
})
.addNode("node2", subgraph)
.addEdge(START, "node1")
.addEdge("node1", "node2");
const graph = builder.compile();
for await (const chunk of await graph.stream(
{ foo: "foo" },
{
streamMode: "updates",
// set subgraphs: true to stream outputs from subgraphs
subgraphs: true,
}
)) {
console.log(chunk);
}
[ [], { node1: { foo: 'hi! foo' } } ]
[ [ 'node2:dfddc4ba-c3c5-6887-5012-a243b5b377c2' ], { subgraphNode1: { bar: 'bar' } } ]
[ [ 'node2:dfddc4ba-c3c5-6887-5012-a243b5b377c2' ], { subgraphNode2: { foo: 'hi! foobar' } } ]
[ [], { node2: { foo: 'hi! foobar' } } ]
Note that we receive not only the node updates but also the namespaces, which tell us which graph (or subgraph) we are streaming from.
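If you only care about updates from a particular subgraph, you can filter on the namespace. A minimal sketch, assuming the graph above (where the subgraph runs inside node2):
for await (const [namespace, data] of await graph.stream(
{ foo: "foo" },
{ streamMode: "updates", subgraphs: true }
)) {
// an empty namespace means the update came from the parent graph
if (namespace.length > 0 && namespace[0].startsWith("node2:")) {
console.log("subgraph update:", data);
}
}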
Debug
Use the debug stream mode to stream as much information as possible throughout the execution of the graph. The streamed outputs include the name of the node as well as the full state.
for await (const chunk of await graph.stream(
{ topic: "ice cream" },
{ streamMode: "debug" }
)) {
console.log(chunk);
}
Stream multiple modes
You can pass an array as the streamMode parameter to stream multiple modes at once.
The streamed outputs will be tuples [mode, chunk], where mode is the name of the stream mode and chunk is the data streamed by that mode.
for await (const [mode, chunk] of await graph.stream(inputs, {
streamMode: ["updates", "custom"],
})) {
console.log(chunk);
}
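When combining modes you typically branch on the mode name. A minimal sketch, assuming the same inputs and a graph that emits both kinds of chunks:
for await (const [mode, chunk] of await graph.stream(inputs, {
streamMode: ["updates", "custom"],
})) {
if (mode === "updates") {
// chunk maps node names to their state updates
console.log("update:", chunk);
} else if (mode === "custom") {
// chunk is whatever the node emitted via config.writer()
console.log("custom:", chunk);
}
}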
Use with any LLM
You can use streamMode: "custom" to stream data from any LLM API, even one that does not implement the LangChain chat model interface.
This lets you integrate raw LLM clients or external services that provide their own streaming interfaces, making LangGraph highly flexible for custom setups.
import { StateGraph, GraphNode, StateSchema } from "@langchain/langgraph";
import * as z from "zod";
const State = new StateSchema({ topic: z.string(), result: z.string() });
const callArbitraryModel: GraphNode<typeof State> = async (state, config) => {
// example node that calls an arbitrary model and streams its output
// assume you have a streaming client that yields chunks
// generate LLM tokens using your custom streaming client
for await (const chunk of yourCustomStreamingClient(state.topic)) {
// use the writer to send custom data to the stream
config.writer({ custom_llm_chunk: chunk });
}
return { result: "completed" };
};
const graph = new StateGraph(State)
.addNode("callArbitraryModel", callArbitraryModel)
// add other nodes and edges as needed
.compile();
// set streamMode: "custom" to receive the custom data in the stream
for await (const chunk of await graph.stream(
{ topic: "cats" },
{ streamMode: "custom" }
)) {
// the chunks will contain the custom data streamed from the LLM
console.log(chunk);
}
import { StateGraph, StateSchema, MessagesValue, GraphNode, START, LangGraphRunnableConfig } from "@langchain/langgraph";
import { tool } from "@langchain/core/tools";
import * as z from "zod";
import OpenAI from "openai";
const openaiClient = new OpenAI();
const modelName = "gpt-4.1-mini";
async function* streamTokens(modelName: string, messages: any[]) {
const response = await openaiClient.chat.completions.create({
messages,
model: modelName,
stream: true,
});
let role: string | null = null;
for await (const chunk of response) {
const delta = chunk.choices[0]?.delta;
if (delta?.role) {
role = delta.role;
}
if (delta?.content) {
yield { role, content: delta.content };
}
}
}
// this is our tool
const getItems = tool(
async (input, config: LangGraphRunnableConfig) => {
let response = "";
for await (const msgChunk of streamTokens(
modelName,
[
{
role: "user",
content: `Can you tell me what kind of items i might find in the following place: '${input.place}'. List at least 3 such items separating them by a comma. And include a brief description of each item.`,
},
]
)) {
response += msgChunk.content;
config.writer?.(msgChunk);
}
return response;
},
{
name: "get_items",
description: "Use this tool to list items one might find in a place you're asked about.",
schema: z.object({
place: z.string().describe("The place to look up items for."),
}),
}
);
const State = new StateSchema({
messages: MessagesValue,
});
const callTool: GraphNode<typeof State> = async (state) => {
const aiMessage = state.messages.at(-1);
const toolCall = aiMessage.tool_calls?.at(-1);
const functionName = toolCall?.function?.name;
if (functionName !== "get_items") {
throw new Error(`Tool ${functionName} not supported`);
}
const functionArguments = toolCall?.function?.arguments;
const args = JSON.parse(functionArguments);
const functionResponse = await getItems.invoke(args);
const toolMessage = {
tool_call_id: toolCall.id,
role: "tool",
name: functionName,
content: functionResponse,
};
return { messages: [toolMessage] };
};
const graph = new StateGraph(State)
// this is the tool-calling graph node
.addNode("callTool", callTool)
.addEdge(START, "callTool")
.compile();
Let's invoke the graph with an AIMessage that includes a tool call:
const inputs = {
messages: [
{
content: null,
role: "assistant",
tool_calls: [
{
id: "1",
function: {
arguments: '{"place":"bedroom"}',
name: "get_items",
},
type: "function",
}
],
}
]
};
for await (const chunk of await graph.stream(
inputs,
{ streamMode: "custom" }
)) {
console.log(chunk.content + "|");
}
Disable streaming for specific chat models
If your application mixes models that support streaming with models that do not, you may need to explicitly disable streaming for the models that do not support it.
Set streaming: false when initializing the model.
import { ChatOpenAI } from "@langchain/openai";
const model = new ChatOpenAI({
model: "o1-preview",
// set streaming: false to disable streaming for the chat model
streaming: false,
});
Not all chat model integrations support the streaming parameter. If your model does not support it, use disableStreaming: true instead; this parameter is available on all chat models via the base class.
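A minimal sketch of the disableStreaming alternative (using ChatAnthropic purely as an example integration):
import { ChatAnthropic } from "@langchain/anthropic";
const model = new ChatAnthropic({
model: "claude-3-haiku-20240307",
// disableStreaming lives on the base chat model class,
// so it works regardless of the integration
disableStreaming: true,
});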