Skip to main content
智能体需要实时展示其工作过程。Agent Server 提供了可恢复的流式传输功能:如果客户端在流传输过程中断开连接(网络切换、标签页休眠、移动端后台运行),重新连接后可以从断点处继续。多种流式模式让您能够控制粒度,从每个步骤后的完整状态快照,到从提供商实时到达的逐令牌 LLM 输出。 LangGraph SDK 允许您从 LangSmith 部署 API 流式传输输出
LangGraph SDK 和 Agent Server 是 LangSmith 的一部分。

基本用法

基本用法示例:
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)

# 使用名为 "agent" 的已部署图
assistant_id = "agent"

# 创建一个线程
thread = await client.threads.create()
thread_id = thread["thread_id"]

# 创建一个流式运行
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input=inputs,
    stream_mode="updates"
):
    print(chunk.data)
这是一个可以在 Agent Server 中运行的示例图。 更多详情请参阅 LangSmith 快速入门
# graph.py
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    topic: str
    joke: str

def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}

def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}

graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .add_edge("generate_joke", END)
    .compile()
)
一旦您拥有一个正在运行的 Agent Server,就可以使用 LangGraph SDK 与其交互。
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)

# 使用名为 "agent" 的已部署图
assistant_id = "agent"

# 创建一个线程
thread = await client.threads.create()
thread_id = thread["thread_id"]

# 创建一个流式运行
async for chunk in client.runs.stream(  # (1)!
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="updates"  # (2)!
):
    print(chunk.data)
  1. client.runs.stream() 方法返回一个迭代器,产生流式输出。
  2. 设置 stream_mode="updates" 以仅流式传输每个节点后图状态的更新。其他流模式也可用。详情请参阅支持的流模式
{'run_id': '1f02c2b3-3cef-68de-b720-eec2a4a8e920', 'attempt': 1}
{'refine_topic': {'topic': 'ice cream and cats'}}
{'generate_joke': {'joke': 'This is a joke about ice cream and cats'}}

支持的流模式

模式描述LangGraph 库方法
values在每个超级步骤后流式传输完整的图状态。.stream() / .astream() 配合 stream_mode="values"
updates流式传输图每个步骤后状态的更新。如果同一步骤中有多个更新(例如,运行了多个节点),这些更新会分别流式传输。.stream() / .astream() 配合 stream_mode="updates"
messages-tuple流式传输调用 LLM 的图节点的令牌和元数据(适用于聊天应用)。.stream() / .astream() 配合 stream_mode="messages"
debug在图执行过程中流式传输尽可能多的信息。.stream() / .astream() 配合 stream_mode="debug"
custom从图内部流式传输自定义数据.stream() / .astream() 配合 stream_mode="custom"
events流式传输所有事件(包括图的状态);主要用于迁移大型 LCEL 应用时。.astream_events()

流式传输多种模式

您可以将列表作为 stream_mode 参数传递,以同时流式传输多种模式。 流式输出将是 (mode, chunk) 的元组,其中 mode 是流模式的名称,chunk 是该模式流式传输的数据。
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input=inputs,
    stream_mode=["updates", "custom"]
):
    print(chunk)

流式传输图状态

使用流模式 updatesvalues 来流式传输图执行时的状态。
  • updates 流式传输图每个步骤后状态的更新
  • values 流式传输图每个步骤后状态的完整值
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
  topic: str
  joke: str

def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}

def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}

graph = (
  StateGraph(State)
  .add_node(refine_topic)
  .add_node(generate_joke)
  .add_edge(START, "refine_topic")
  .add_edge("refine_topic", "generate_joke")
  .add_edge("generate_joke", END)
  .compile()
)
有状态运行 以下示例假设您希望将流式运行的输出持久化到检查点数据库,并且已经创建了一个线程。要创建一个线程:
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)

# 使用名为 "agent" 的已部署图
assistant_id = "agent"
# 创建一个线程
thread = await client.threads.create()
thread_id = thread["thread_id"]
如果您不需要持久化运行的输出,可以在流式传输时传递 None 而不是 thread_id

流模式:updates

使用此模式仅流式传输每个步骤后节点返回的状态更新。流式输出包括节点名称和更新内容。
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="updates"
):
    print(chunk.data)

流模式:values

使用此模式流式传输每个步骤后图的完整状态
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="values"
):
    print(chunk.data)

子图

要在流式输出中包含子图的输出,可以在父图的 .stream() 方法中设置 subgraphs=True。这将同时流式传输父图和任何子图的输出。
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"foo": "foo"},
    stream_subgraphs=True, # (1)!
    stream_mode="updates",
):
    print(chunk)
  1. 设置 stream_subgraphs=True 以流式传输子图的输出。
这是一个可以在 Agent Server 中运行的示例图。 更多详情请参阅 LangSmith 快速入门
# graph.py
from langgraph.graph import START, StateGraph
from typing import TypedDict

# 定义子图
class SubgraphState(TypedDict):
    foo: str  # 注意此键与父图状态共享
    bar: str

def subgraph_node_1(state: SubgraphState):
    return {"bar": "bar"}

def subgraph_node_2(state: SubgraphState):
    return {"foo": state["foo"] + state["bar"]}

subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()

# 定义父图
class ParentState(TypedDict):
    foo: str

def node_1(state: ParentState):
    return {"foo": "hi! " + state["foo"]}

builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()
一旦您拥有一个正在运行的 Agent Server,就可以使用 LangGraph SDK 与其交互。
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>)

# 使用名为 "agent" 的已部署图
assistant_id = "agent"

# 创建一个线程
thread = await client.threads.create()
thread_id = thread["thread_id"]

async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"foo": "foo"},
    stream_subgraphs=True, # (1)!
    stream_mode="updates",
):
    print(chunk)
  1. 设置 stream_subgraphs=True 以流式传输子图的输出。
注意,我们不仅接收节点更新,还接收命名空间信息,告诉我们正在从哪个图(或子图)流式传输。

调试

使用 debug 流模式在图执行过程中流式传输尽可能多的信息。流式输出包括节点名称和完整状态。
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="debug"
):
    print(chunk.data)

LLM 令牌

使用 messages-tuple 流模式从图的任何部分(包括节点、工具、子图或任务)逐令牌流式传输大语言模型(LLM)的输出。 messages-tuple 模式的流式输出是一个元组 (message_chunk, metadata),其中:
  • message_chunk:来自 LLM 的令牌或消息片段。
  • metadata:包含图节点和 LLM 调用详细信息的字典。
from dataclasses import dataclass

from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START

@dataclass
class MyState:
    topic: str
    joke: str = ""

model = init_chat_model(model="gpt-4.1-mini")

def call_model(state: MyState):
    """调用 LLM 生成关于某个主题的笑话"""
    model_response = model.invoke( # (1)!
        [
            {"role": "user", "content": f"Generate a joke about {state.topic}"}
        ]
    )
    return {"joke": model_response.content}

graph = (
    StateGraph(MyState)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)
  1. 注意,即使 LLM 使用 invoke 而不是 stream 运行,消息事件也会被发出。
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="messages-tuple",
):
    if chunk.event != "messages":
        continue

    message_chunk, metadata = chunk.data  # (1)!
    if message_chunk["content"]:
        print(message_chunk["content"], end="|", flush=True)
  1. “messages-tuple” 流模式返回一个元组迭代器 (message_chunk, metadata),其中 message_chunk 是 LLM 流式传输的令牌,metadata 是一个字典,包含有关调用 LLM 的图节点和其他信息。

过滤 LLM 令牌

流式传输自定义数据

要发送自定义用户定义的数据
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"query": "example"},
    stream_mode="custom"
):
    print(chunk.data)

流式传输事件

要流式传输所有事件,包括图的状态:
async for chunk in client.runs.stream(
    thread_id,
    assistant_id,
    input={"topic": "ice cream"},
    stream_mode="events"
):
    print(chunk.data)

无状态运行

如果您不希望将流式运行的输出持久化到检查点数据库,可以在不创建线程的情况下创建无状态运行:
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)

async for chunk in client.runs.stream(
    None,  # (1)!
    assistant_id,
    input=inputs,
    stream_mode="updates"
):
    print(chunk.data)
  1. 我们传递 None 而不是 thread_id UUID。

加入并流式传输

LangSmith 允许您加入一个活跃的后台运行并从中流式传输输出。为此,您可以使用 LangGraph SDK 的 client.runs.join_stream 方法:
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)

async for chunk in client.runs.join_stream(
    thread_id,
    run_id,  # (1)!
):
    print(chunk)
  1. 这是您要加入的现有运行的 run_id
输出未缓冲 当您使用 .join_stream 时,输出不会被缓冲,因此在加入之前产生的任何输出都不会被接收。

API 参考

关于 API 使用和实现,请参阅 API 参考