智能体需要实时展示其工作过程。Agent Server 提供了可恢复的流式传输功能:如果客户端在流传输过程中断开连接(网络切换、标签页休眠、移动端后台运行),重新连接后可以从断点处继续。多种流式模式让您能够控制粒度,从每个步骤后的完整状态快照,到从提供商实时到达的逐令牌 LLM 输出。
LangGraph SDK 允许您从 LangSmith 部署 API 流式传输输出 。
LangGraph SDK 和 Agent Server 是 LangSmith 的一部分。
基本用法
基本用法示例:
from langgraph_sdk import get_client
client = get_client ( url =< DEPLOYMENT_URL > , api_key =< API_KEY > )
# 使用名为 "agent" 的已部署图
assistant_id = "agent"
# 创建一个线程
thread = await client . threads . create ()
thread_id = thread [ " thread_id " ]
# 创建一个流式运行
async for chunk in client . runs . stream (
thread_id ,
assistant_id ,
input = inputs ,
stream_mode = "updates"
):
print ( chunk . data )
import { Client } from "@langchain/langgraph-sdk" ;
const client = new Client ( { apiUrl : < DEPLOYMENT_URL > , apiKey: < API_KEY > });
// 使用名为 "agent" 的已部署图
const assistantID = "agent";
// 创建一个线程
const thread = await client.threads.create();
const threadID = thread["thread_id"];
// 创建一个流式运行
const streamResponse = client.runs.stream(
threadID,
assistantID,
{
input ,
streamMode: "updates"
}
);
for await (const chunk of streamResponse) {
console . log (chunk . data);
}
创建一个线程: curl --request POST \
--url < DEPLOYMENT_UR L > /threads \
--header 'Content-Type: application/json' \
--data '{}'
创建一个流式运行: curl --request POST \
--url < DEPLOYMENT_UR L > /threads/ < THREAD_I D > /runs/stream \
--header 'Content-Type: application/json' \
--header 'x-api-key: <API_KEY>'
--data "{
\" assistant_id \" : \" agent \" ,
\" input \" : <inputs>,
\" stream_mode \" : \" updates \"
}"
这是一个可以在 Agent Server 中运行的示例图。
更多详情请参阅 LangSmith 快速入门 。 # graph.py
from typing import TypedDict
from langgraph . graph import StateGraph , START , END
class State ( TypedDict ):
topic : str
joke : str
def refine_topic ( state : State ):
return { "topic" : state [ " topic " ] + " and cats" }
def generate_joke ( state : State ):
return { "joke" : f "This is a joke about { state [ ' topic ' ] } " }
graph = (
StateGraph ( State )
. add_node ( refine_topic )
. add_node ( generate_joke )
. add_edge ( START , "refine_topic" )
. add_edge ( "refine_topic" , "generate_joke" )
. add_edge ( "generate_joke" , END )
. compile ()
)
一旦您拥有一个正在运行的 Agent Server,就可以使用
LangGraph SDK 与其交互。 from langgraph_sdk import get_client
client = get_client ( url =< DEPLOYMENT_URL > )
# 使用名为 "agent" 的已部署图
assistant_id = "agent"
# 创建一个线程
thread = await client . threads . create ()
thread_id = thread [ " thread_id " ]
# 创建一个流式运行
async for chunk in client . runs . stream ( # (1)!
thread_id ,
assistant_id ,
input = { "topic" : "ice cream" },
stream_mode = "updates" # (2)!
):
print ( chunk . data )
client.runs.stream() 方法返回一个迭代器,产生流式输出。
设置 stream_mode="updates" 以仅流式传输每个节点后图状态的更新。其他流模式也可用。详情请参阅支持的流模式 。
import { Client } from "@langchain/langgraph-sdk" ;
const client = new Client ( { apiUrl : < DEPLOYMENT_URL > });
# 使用名为 "agent" 的已部署图
const assistantID = "agent";
# 创建一个线程
const thread = await client.threads.create();
const threadID = thread["thread_id"];
# 创建一个流式运行
const streamResponse = client.runs.stream( // (1)!
threadID,
assistantID,
{
input: { topic : "ice cream" },
streamMode: "updates" // (2)!
}
);
for await (const chunk of streamResponse) {
console . log (chunk . data);
}
client.runs.stream() 方法返回一个迭代器,产生流式输出。
设置 streamMode: "updates" 以仅流式传输每个节点后图状态的更新。其他流模式也可用。详情请参阅支持的流模式 。
创建一个线程: curl --request POST \
--url < DEPLOYMENT_UR L > /threads \
--header 'Content-Type: application/json' \
--data '{}'
创建一个流式运行: curl --request POST \
--url < DEPLOYMENT_UR L > /threads/ < THREAD_I D > /runs/stream \
--header 'Content-Type: application/json' \
--data "{
\" assistant_id \" : \" agent \" ,
\" input \" : { \" topic \" : \" ice cream \" },
\" stream_mode \" : \" updates \"
}"
{ 'run_id' : '1f02c2b3-3cef-68de-b720-eec2a4a8e920' , 'attempt' : 1 }
{ 'refine_topic' : { 'topic' : 'ice cream and cats' }}
{ 'generate_joke' : { 'joke' : 'This is a joke about ice cream and cats' }}
支持的流模式
模式 描述 LangGraph 库方法 values在每个超级步骤 后流式传输完整的图状态。 .stream() / .astream() 配合 stream_mode="values"updates流式传输图每个步骤后状态的更新。如果同一步骤中有多个更新(例如,运行了多个节点),这些更新会分别流式传输。 .stream() / .astream() 配合 stream_mode="updates"messages-tuple流式传输调用 LLM 的图节点的令牌和元数据(适用于聊天应用)。 .stream() / .astream() 配合 stream_mode="messages"debug在图执行过程中流式传输尽可能多的信息。 .stream() / .astream() 配合 stream_mode="debug"custom从图内部流式传输自定义数据 .stream() / .astream() 配合 stream_mode="custom"events流式传输所有事件(包括图的状态);主要用于迁移大型 LCEL 应用时。 .astream_events()
流式传输多种模式
您可以将列表作为 stream_mode 参数传递,以同时流式传输多种模式。
流式输出将是 (mode, chunk) 的元组,其中 mode 是流模式的名称,chunk 是该模式流式传输的数据。
async for chunk in client . runs . stream (
thread_id ,
assistant_id ,
input = inputs ,
stream_mode = [ "updates" , "custom" ]
):
print ( chunk )
const streamResponse = client . runs . stream (
threadID ,
assistantID ,
{
input ,
streamMode : [ "updates" , "custom" ]
}
) ;
for await ( const chunk of streamResponse) {
console . log (chunk) ;
}
curl --request POST \
--url < DEPLOYMENT_UR L > /threads/ < THREAD_I D > /runs/stream \
--header 'Content-Type: application/json' \
--data "{
\" assistant_id \" : \" agent \" ,
\" input \" : <inputs>,
\" stream_mode \" : [
\" updates \"
\" custom \"
]
}"
流式传输图状态
使用流模式 updates 和 values 来流式传输图执行时的状态。
updates 流式传输图每个步骤后状态的更新 。
values 流式传输图每个步骤后状态的完整值 。
from typing import TypedDict
from langgraph . graph import StateGraph , START , END
class State ( TypedDict ):
topic : str
joke : str
def refine_topic ( state : State ):
return { "topic" : state [ " topic " ] + " and cats" }
def generate_joke ( state : State ):
return { "joke" : f "This is a joke about { state [ ' topic ' ] } " }
graph = (
StateGraph ( State )
. add_node ( refine_topic )
. add_node ( generate_joke )
. add_edge ( START , "refine_topic" )
. add_edge ( "refine_topic" , "generate_joke" )
. add_edge ( "generate_joke" , END )
. compile ()
)
有状态运行
以下示例假设您希望将流式运行的输出持久化到检查点 数据库,并且已经创建了一个线程。要创建一个线程:from langgraph_sdk import get_client
client = get_client ( url =< DEPLOYMENT_URL > )
# 使用名为 "agent" 的已部署图
assistant_id = "agent"
# 创建一个线程
thread = await client . threads . create ()
thread_id = thread [ " thread_id " ]
import { Client } from "@langchain/langgraph-sdk" ;
const client = new Client ( { apiUrl : < DEPLOYMENT_URL > });
// 使用名为 "agent" 的已部署图
const assistantID = "agent";
// 创建一个线程
const thread = await client.threads.create();
const threadID = thread["thread_id"]
curl --request POST \
--url < DEPLOYMENT_UR L > /threads \
--header 'Content-Type: application/json' \
--data '{}'
如果您不需要持久化运行的输出,可以在流式传输时传递 None 而不是 thread_id。
流模式:updates
使用此模式仅流式传输每个步骤后节点返回的状态更新 。流式输出包括节点名称和更新内容。
async for chunk in client . runs . stream (
thread_id ,
assistant_id ,
input = { "topic" : "ice cream" },
stream_mode = "updates"
):
print ( chunk . data )
const streamResponse = client . runs . stream (
threadID ,
assistantID ,
{
input : { topic : "ice cream" },
streamMode : "updates"
}
) ;
for await ( const chunk of streamResponse) {
console . log (chunk . data) ;
}
curl --request POST \
--url < DEPLOYMENT_UR L > /threads/ < THREAD_I D > /runs/stream \
--header 'Content-Type: application/json' \
--data "{
\" assistant_id \" : \" agent \" ,
\" input \" : { \" topic \" : \" ice cream \" },
\" stream_mode \" : \" updates \"
}"
流模式:values
使用此模式流式传输每个步骤后图的完整状态 。
async for chunk in client . runs . stream (
thread_id ,
assistant_id ,
input = { "topic" : "ice cream" },
stream_mode = "values"
):
print ( chunk . data )
const streamResponse = client . runs . stream (
threadID ,
assistantID ,
{
input : { topic : "ice cream" },
streamMode : "values"
}
) ;
for await ( const chunk of streamResponse) {
console . log (chunk . data) ;
}
curl --request POST \
--url < DEPLOYMENT_UR L > /threads/ < THREAD_I D > /runs/stream \
--header 'Content-Type: application/json' \
--data "{
\" assistant_id \" : \" agent \" ,
\" input \" : { \" topic \" : \" ice cream \" },
\" stream_mode \" : \" values \"
}"
要在流式输出中包含子图 的输出,可以在父图的 .stream() 方法中设置 subgraphs=True。这将同时流式传输父图和任何子图的输出。
async for chunk in client . runs . stream (
thread_id ,
assistant_id ,
input = { "foo" : "foo" },
stream_subgraphs = True , # (1)!
stream_mode = "updates" ,
):
print ( chunk )
设置 stream_subgraphs=True 以流式传输子图的输出。
使用 debug 流模式在图执行过程中流式传输尽可能多的信息。流式输出包括节点名称和完整状态。
async for chunk in client . runs . stream (
thread_id ,
assistant_id ,
input = { "topic" : "ice cream" },
stream_mode = "debug"
):
print ( chunk . data )
const streamResponse = client . runs . stream (
threadID ,
assistantID ,
{
input : { topic : "ice cream" },
streamMode : "debug"
}
) ;
for await ( const chunk of streamResponse) {
console . log (chunk . data) ;
}
curl --request POST \
--url < DEPLOYMENT_UR L > /threads/ < THREAD_I D > /runs/stream \
--header 'Content-Type: application/json' \
--data "{
\" assistant_id \" : \" agent \" ,
\" input \" : { \" topic \" : \" ice cream \" },
\" stream_mode \" : \" debug \"
}"
LLM 令牌
使用 messages-tuple 流模式从图的任何部分(包括节点、工具、子图或任务)逐令牌 流式传输大语言模型(LLM)的输出。
messages-tuple 模式 的流式输出是一个元组 (message_chunk, metadata),其中:
message_chunk:来自 LLM 的令牌或消息片段。
metadata:包含图节点和 LLM 调用详细信息的字典。
from dataclasses import dataclass
from langchain . chat_models import init_chat_model
from langgraph . graph import StateGraph , START
@dataclass
class MyState :
topic : str
joke : str = ""
model = init_chat_model ( model = "gpt-4.1-mini" )
def call_model ( state : MyState ):
"""调用 LLM 生成关于某个主题的笑话"""
model_response = model . invoke ( # (1)!
[
{ "role" : "user" , "content" : f "Generate a joke about { state . topic } " }
]
)
return { "joke" : model_response . content }
graph = (
StateGraph ( MyState )
. add_node ( call_model )
. add_edge ( START , "call_model" )
. compile ()
)
注意,即使 LLM 使用 invoke 而不是 stream 运行,消息事件也会被发出。
async for chunk in client . runs . stream (
thread_id ,
assistant_id ,
input = { "topic" : "ice cream" },
stream_mode = "messages-tuple" ,
):
if chunk . event != "messages" :
continue
message_chunk , metadata = chunk . data # (1)!
if message_chunk [ " content " ]:
print ( message_chunk [ " content " ], end = "|" , flush = True )
“messages-tuple” 流模式返回一个元组迭代器 (message_chunk, metadata),其中 message_chunk 是 LLM 流式传输的令牌,metadata 是一个字典,包含有关调用 LLM 的图节点和其他信息。
const streamResponse = client . runs . stream (
threadID ,
assistantID ,
{
input : { topic : "ice cream" },
streamMode : "messages-tuple"
}
) ;
for await ( const chunk of streamResponse) {
if (chunk . event !== "messages" ) {
continue ;
}
console . log (chunk . data[ 0 ][ "content" ]) ; // (1)!
}
“messages-tuple” 流模式返回一个元组迭代器 (message_chunk, metadata),其中 message_chunk 是 LLM 流式传输的令牌,metadata 是一个字典,包含有关调用 LLM 的图节点和其他信息。
curl --request POST \
--url < DEPLOYMENT_UR L > /threads/ < THREAD_I D > /runs/stream \
--header 'Content-Type: application/json' \
--data "{
\" assistant_id \" : \" agent \" ,
\" input \" : { \" topic \" : \" ice cream \" },
\" stream_mode \" : \" messages-tuple \"
}"
过滤 LLM 令牌
流式传输自定义数据
要发送自定义用户定义的数据 :
async for chunk in client . runs . stream (
thread_id ,
assistant_id ,
input = { "query" : "example" },
stream_mode = "custom"
):
print ( chunk . data )
const streamResponse = client . runs . stream (
threadID ,
assistantID ,
{
input : { query : "example" },
streamMode : "custom"
}
) ;
for await ( const chunk of streamResponse) {
console . log (chunk . data) ;
}
curl --request POST \
--url < DEPLOYMENT_UR L > /threads/ < THREAD_I D > /runs/stream \
--header 'Content-Type: application/json' \
--data "{
\" assistant_id \" : \" agent \" ,
\" input \" : { \" query \" : \" example \" },
\" stream_mode \" : \" custom \"
}"
流式传输事件
要流式传输所有事件,包括图的状态:
async for chunk in client . runs . stream (
thread_id ,
assistant_id ,
input = { "topic" : "ice cream" },
stream_mode = "events"
):
print ( chunk . data )
const streamResponse = client . runs . stream (
threadID ,
assistantID ,
{
input : { topic : "ice cream" },
streamMode : "events"
}
) ;
for await ( const chunk of streamResponse) {
console . log (chunk . data) ;
}
curl --request POST \
--url < DEPLOYMENT_UR L > /threads/ < THREAD_I D > /runs/stream \
--header 'Content-Type: application/json' \
--data "{
\" assistant_id \" : \" agent \" ,
\" input \" : { \" topic \" : \" ice cream \" },
\" stream_mode \" : \" events \"
}"
无状态运行
如果您不希望将流式运行的输出持久化到检查点 数据库,可以在不创建线程的情况下创建无状态运行:
from langgraph_sdk import get_client
client = get_client ( url =< DEPLOYMENT_URL > , api_key =< API_KEY > )
async for chunk in client . runs . stream (
None , # (1)!
assistant_id ,
input = inputs ,
stream_mode = "updates"
):
print ( chunk . data )
我们传递 None 而不是 thread_id UUID。
import { Client } from "@langchain/langgraph-sdk" ;
const client = new Client ( { apiUrl : < DEPLOYMENT_URL > , apiKey: < API_KEY > });
// 创建一个流式运行
const streamResponse = client.runs.stream(
null, // (1)!
assistantID,
{
input ,
streamMode: "updates"
}
);
for await (const chunk of streamResponse) {
console . log (chunk . data);
}
我们传递 None 而不是 thread_id UUID。
curl --request POST \
--url < DEPLOYMENT_UR L > /runs/stream \
--header 'Content-Type: application/json' \
--header 'x-api-key: <API_KEY>'
--data "{
\" assistant_id \" : \" agent \" ,
\" input \" : <inputs>,
\" stream_mode \" : \" updates \"
}"
加入并流式传输
LangSmith 允许您加入一个活跃的后台运行 并从中流式传输输出。为此,您可以使用 LangGraph SDK 的 client.runs.join_stream 方法:
from langgraph_sdk import get_client
client = get_client ( url =< DEPLOYMENT_URL > , api_key =< API_KEY > )
async for chunk in client . runs . join_stream (
thread_id ,
run_id , # (1)!
):
print ( chunk )
这是您要加入的现有运行的 run_id。
import { Client } from "@langchain/langgraph-sdk" ;
const client = new Client ( { apiUrl : < DEPLOYMENT_URL > , apiKey: < API_KEY > });
const streamResponse = client.runs.joinStream(
threadID,
runId // (1)!
);
for await (const chunk of streamResponse) {
console . log (chunk);
}
这是您要加入的现有运行的 run_id。
curl --request GET \
--url < DEPLOYMENT_UR L > /threads/ < THREAD_I D > /runs/ < RUN_I D > /stream \
--header 'Content-Type: application/json' \
--header 'x-api-key: <API_KEY>'
输出未缓冲
当您使用 .join_stream 时,输出不会被缓冲,因此在加入之前产生的任何输出都不会被接收。
API 参考
关于 API 使用和实现,请参阅 API 参考 。