- 流式子智能体进度——在每个子智能体并行运行时跟踪其执行情况。
- 流式传输 LLM 令牌——从主智能体和每个子智能体流式传输令牌。
- 流式传输工具调用——查看子智能体执行过程中的工具调用及结果。
- 流式传输自定义更新——从子智能体节点内部发出用户定义的信号。
启用子图流式传输
深度智能体使用 LangGraph 的子图流式传输来呈现子智能体执行中的事件。要接收子智能体事件,请在流式传输时启用stream_subgraphs。
from deepagents import create_deep_agent
agent = create_deep_agent(
system_prompt="你是一个有用的研究助手",
subagents=[
{
"name": "researcher",
"description": "深度研究一个主题",
"system_prompt": "你是一个细致的研究者。",
},
],
)
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "研究量子计算进展"}]},
stream_mode="updates",
subgraphs=True,
version="v2",
):
if chunk["type"] == "updates":
if chunk["ns"]:
# 子智能体事件 - 命名空间标识来源
print(f"[子智能体: {chunk['ns']}]")
else:
# 主智能体事件
print("[主智能体]")
print(chunk["data"])
命名空间
当启用subgraphs 时,每个流式事件都包含一个命名空间,用于标识产生该事件的智能体。命名空间是一个由节点名称和任务 ID 组成的路径,表示智能体的层级结构。
| 命名空间 | 来源 |
|---|---|
() (空) | 主智能体 |
("tools:abc123",) | 由主智能体的 task 工具调用 abc123 生成的子智能体 |
("tools:abc123", "model_request:def456") | 子智能体内部的模型请求节点 |
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "规划我的假期"}]},
stream_mode="updates",
subgraphs=True,
version="v2",
):
if chunk["type"] == "updates":
# 检查此事件是否来自子智能体
is_subagent = any(
segment.startswith("tools:") for segment in chunk["ns"]
)
if is_subagent:
# 从命名空间中提取工具调用 ID
tool_call_id = next(
s.split(":")[1] for s in chunk["ns"] if s.startswith("tools:")
)
print(f"子智能体 {tool_call_id}: {chunk['data']}")
else:
print(f"主智能体: {chunk['data']}")
子智能体进度
使用stream_mode="updates" 在每个步骤完成时跟踪子智能体的进度。这对于显示哪些子智能体处于活动状态以及它们完成了哪些工作非常有用。
from deepagents import create_deep_agent
agent = create_deep_agent(
system_prompt=(
"你是一个项目协调员。始终使用 task 工具将研究任务委派给 "
"你的研究员子智能体。将你的最终回答控制在一句话以内。"
),
subagents=[
{
"name": "researcher",
"description": "彻底研究主题",
"system_prompt": (
"你是一个细致的研究者。研究给定的主题 "
"并提供 2-3 句话的简洁总结。"
),
},
],
)
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "写一篇关于 AI 安全的简短总结"}]},
stream_mode="updates",
subgraphs=True,
version="v2",
):
if chunk["type"] == "updates":
# 主智能体更新(空命名空间)
if not chunk["ns"]:
for node_name, data in chunk["data"].items():
if node_name == "tools":
# 返回给主智能体的子智能体结果
for msg in data.get("messages", []):
if msg.type == "tool":
print(f"\n子智能体完成: {msg.name}")
print(f" 结果: {str(msg.content)[:200]}...")
else:
print(f"[主智能体] 步骤: {node_name}")
# 子智能体更新(非空命名空间)
else:
for node_name, data in chunk["data"].items():
print(f" [{chunk['ns'][0]}] 步骤: {node_name}")
输出
[主智能体] 步骤: model_request
[tools:call_abc123] 步骤: model_request
[tools:call_abc123] 步骤: tools
[tools:call_abc123] 步骤: model_request
子智能体完成: task
结果: ## AI 安全报告...
[主智能体] 步骤: model_request
LLM 令牌
使用stream_mode="messages" 从主智能体和子智能体流式传输单个令牌。每个消息事件都包含标识来源智能体的元数据。
current_source = ""
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "研究量子计算进展"}]},
stream_mode="messages",
subgraphs=True,
version="v2",
):
if chunk["type"] == "messages":
token, metadata = chunk["data"]
# 检查此事件是否来自子智能体(命名空间包含 "tools:")
is_subagent = any(s.startswith("tools:") for s in chunk["ns"])
if is_subagent:
# 来自子智能体的令牌
subagent_ns = next(s for s in chunk["ns"] if s.startswith("tools:"))
if subagent_ns != current_source:
print(f"\n\n--- [子智能体: {subagent_ns}] ---")
current_source = subagent_ns
if token.content:
print(token.content, end="", flush=True)
else:
# 来自主智能体的令牌
if "main" != current_source:
print("\n\n--- [主智能体] ---")
current_source = "main"
if token.content:
print(token.content, end="", flush=True)
print()
工具调用
当子智能体使用工具时,您可以流式传输工具调用事件来显示每个子智能体正在做什么。工具调用块出现在messages 流模式中。
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "研究近期量子计算进展"}]},
stream_mode="messages",
subgraphs=True,
version="v2",
):
if chunk["type"] == "messages":
token, metadata = chunk["data"]
# 标识来源:"main" 或子智能体命名空间段
is_subagent = any(s.startswith("tools:") for s in chunk["ns"])
source = next((s for s in chunk["ns"] if s.startswith("tools:")), "main") if is_subagent else "main"
# 工具调用块(流式工具调用)
if token.tool_call_chunks:
for tc in token.tool_call_chunks:
if tc.get("name"):
print(f"\n[{source}] 工具调用: {tc['name']}")
# 参数分块流式传输——增量写入
if tc.get("args"):
print(tc["args"], end="", flush=True)
# 工具结果
if token.type == "tool":
print(f"\n[{source}] 工具结果 [{token.name}]: {str(token.content)[:150]}")
# 常规 AI 内容(跳过工具调用消息)
if token.type == "ai" and token.content and not token.tool_call_chunks:
print(token.content, end="", flush=True)
print()
自定义更新
在子智能体工具内部使用get_stream_writer 来发出自定义进度事件:
import time
from langchain.tools import tool
from langgraph.config import get_stream_writer
from deepagents import create_deep_agent
@tool
def analyze_data(topic: str) -> str:
"""对给定主题运行数据分析。
此工具执行实际分析并发出进度更新。
对于任何分析请求,你必须调用此工具。
"""
writer = get_stream_writer()
writer({"status": "starting", "topic": topic, "progress": 0})
time.sleep(0.5)
writer({"status": "analyzing", "progress": 50})
time.sleep(0.5)
writer({"status": "complete", "progress": 100})
return (
f'对 "{topic}" 的分析:客户情绪 85% 为正面,'
"主要受产品质量和支持响应时间驱动。"
)
agent = create_deep_agent(
system_prompt=(
"你是一个协调员。对于任何分析请求,你必须使用 task 工具委派给 "
"分析师子智能体。永远不要尝试直接回答。"
"收到结果后,用一句话总结。"
),
subagents=[
{
"name": "analyst",
"description": "执行带有实时进度跟踪的数据分析",
"system_prompt": (
"你是一个数据分析师。对于每个分析请求,你必须调用 analyze_data 工具。"
"不要使用任何其他工具。"
"分析完成后,报告结果。"
),
"tools": [analyze_data],
},
],
)
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "分析客户满意度趋势"}]},
stream_mode="custom",
subgraphs=True,
version="v2",
):
if chunk["type"] == "custom":
is_subagent = any(s.startswith("tools:") for s in chunk["ns"])
if is_subagent:
subagent_ns = next(s for s in chunk["ns"] if s.startswith("tools:"))
print(f"[{subagent_ns}]", chunk["data"])
else:
print("[main]", chunk["data"])
输出
[tools:call_abc123] {'status': 'starting', 'topic': 'customer satisfaction trends', 'progress': 0}
[tools:call_abc123] {'status': 'analyzing', 'progress': 50}
[tools:call_abc123] {'status': 'complete', 'progress': 100}
多模式流式传输
组合多种流模式以获得智能体执行的完整视图:# 跳过内部中间件步骤——仅显示有意义的节点名称
INTERESTING_NODES = {"model_request", "tools"}
last_source = ""
mid_line = False # 当写入的令牌没有尾随换行符时为 True
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "分析远程工作对团队生产力的影响"}]},
stream_mode=["updates", "messages", "custom"],
subgraphs=True,
version="v2",
):
is_subagent = any(s.startswith("tools:") for s in chunk["ns"])
source = "子智能体" if is_subagent else "主智能体"
if chunk["type"] == "updates":
for node_name in chunk["data"]:
if node_name not in INTERESTING_NODES:
continue
if mid_line:
print()
mid_line = False
print(f"[{source}] 步骤: {node_name}")
elif chunk["type"] == "messages":
token, metadata = chunk["data"]
if token.content:
# 当来源更改时打印标题
if source != last_source:
if mid_line:
print()
mid_line = False
print(f"\n[{source}] ", end="")
last_source = source
print(token.content, end="", flush=True)
mid_line = True
elif chunk["type"] == "custom":
if mid_line:
print()
mid_line = False
print(f"[{source}] 自定义事件:", chunk["data"])
print()
常见模式
跟踪子智能体生命周期
监控子智能体的启动、运行和完成:active_subagents = {}
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "研究最新的 AI 安全发展"}]},
stream_mode="updates",
subgraphs=True,
version="v2",
):
if chunk["type"] == "updates":
for node_name, data in chunk["data"].items():
# ─── 阶段 1:检测子智能体启动 ────────────────────────
# 当主智能体的 model_request 包含 task 工具调用时,
# 子智能体已被生成。
if not chunk["ns"] and node_name == "model_request":
for msg in data.get("messages", []):
for tc in getattr(msg, "tool_calls", []):
if tc["name"] == "task":
active_subagents[tc["id"]] = {
"type": tc["args"].get("subagent_type"),
"description": tc["args"].get("description", "")[:80],
"status": "pending",
}
print(
f'[生命周期] 等待中 → 子智能体 "{tc["args"].get("subagent_type")}" '
f'({tc["id"]})'
)
# ─── 阶段 2:检测子智能体运行 ─────────────────────────
# 当我们从 tools:UUID 命名空间收到事件时,该子智能体正在执行。
if chunk["ns"] and chunk["ns"][0].startswith("tools:"):
pregel_id = chunk["ns"][0].split(":")[1]
# 检查是否有任何等待中的子智能体需要标记为运行中。
# 注意:pregel 任务 ID 与 tool_call_id 不同,
# 因此我们在第一个子智能体事件上将任何等待中的子智能体标记为运行中。
for sub_id, sub in active_subagents.items():
if sub["status"] == "pending":
sub["status"] = "running"
print(
f'[生命周期] 运行中 → 子智能体 "{sub["type"]}" '
f"(pregel: {pregel_id})"
)
break
# ─── 阶段 3:检测子智能体完成 ──────────────────────
# 当主智能体的 tools 节点返回工具消息时,
# 子智能体已完成并返回其结果。
if not chunk["ns"] and node_name == "tools":
for msg in data.get("messages", []):
if msg.type == "tool":
sub = active_subagents.get(msg.tool_call_id)
if sub:
sub["status"] = "complete"
print(
f'[生命周期] 完成 → 子智能体 "{sub["type"]}" '
f"({msg.tool_call_id})"
)
print(f" 结果预览: {str(msg.content)[:120]}...")
# 打印最终状态
print("\n--- 最终子智能体状态 ---")
for sub_id, sub in active_subagents.items():
print(f" {sub['type']}: {sub['status']}")
v2 流式传输格式
需要 LangGraph >= 1.1。
version="v2"),这是推荐的方法。每个块都是一个带有 type、ns 和 data 键的 StreamPart 字典——无论流模式、模式数量或子图设置如何,其形状都相同。
v2 格式消除了嵌套的元组解包,使得在深度智能体中处理子图流式传输变得简单。比较两种格式:
# 统一格式——无需嵌套元组解包
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "研究量子计算"}]},
stream_mode=["updates", "messages", "custom"],
subgraphs=True,
version="v2",
):
print(chunk["type"]) # "updates", "messages", 或 "custom"
print(chunk["ns"]) # () 表示主智能体,("tools:<id>",) 表示子智能体
print(chunk["data"]) # 负载
相关
- 子智能体——使用深度智能体配置和使用子智能体
- 前端流式传输——使用
useStream为深度智能体构建 React UI - LangChain 流式传输概述——LangChain 智能体的一般流式传输概念
Connect these docs to Claude, VSCode, and more via MCP for real-time answers.

