Skip to main content
深度智能体构建在 LangGraph 的流式传输基础设施之上,并为一流的子智能体流提供支持。当深度智能体将工作委派给子智能体时,您可以独立地从每个子智能体流式传输更新——实时跟踪进度、LLM 令牌和工具调用。 深度智能体流式传输可以实现:

启用子图流式传输

深度智能体使用 LangGraph 的子图流式传输来呈现子智能体执行中的事件。要接收子智能体事件,请在流式传输时启用 stream_subgraphs
import { createDeepAgent } from "deepagents";

const agent = createDeepAgent({
  systemPrompt: "你是一个有用的研究助手",
  subagents: [
    {
      name: "researcher",
      description: "深度研究一个主题",
      systemPrompt: "你是一个细致的研究者。",
    },
  ],
});

for await (const [namespace, chunk] of await agent.stream(
  { messages: [{ role: "user", content: "研究量子计算进展" }] },
  {
    streamMode: "updates",
    subgraphs: true,
  }
)) {
  if (namespace.length > 0) {
    // 子智能体事件 - 命名空间标识来源
    console.log(`[子智能体: ${namespace.join("|")}]`);
  } else {
    // 主智能体事件
    console.log("[主智能体]");
  }
  console.log(chunk);
}

命名空间

当启用 subgraphs 时,每个流式事件都包含一个命名空间,用于标识产生该事件的智能体。命名空间是一个由节点名称和任务 ID 组成的路径,表示智能体的层级结构。
命名空间来源
() (空)主智能体
("tools:abc123",)由主智能体的 task 工具调用 abc123 生成的子智能体
("tools:abc123", "model_request:def456")子智能体内部的模型请求节点
使用命名空间将事件路由到正确的 UI 组件:
for await (const [namespace, chunk] of await agent.stream(
  { messages: [{ role: "user", content: "规划我的假期" }] },
  { streamMode: "updates", subgraphs: true }
)) {
  // 检查此事件是否来自子智能体
  const isSubagent = namespace.some(
    (segment: string) => segment.startsWith("tools:")
  );

  if (isSubagent) {
    // 从命名空间中提取工具调用 ID
    const toolCallId = namespace
      .find((s: string) => s.startsWith("tools:"))
      ?.split(":")[1];
    console.log(`子智能体 ${toolCallId}:`, chunk);
  } else {
    console.log("主智能体:", chunk);
  }
}

子智能体进度

使用 stream_mode="updates" 在每个步骤完成时跟踪子智能体的进度。这对于显示哪些子智能体处于活动状态以及它们完成了哪些工作非常有用。
import { createDeepAgent } from "deepagents";

const agent = createDeepAgent({
  systemPrompt:
    "你是一个项目协调员。始终使用 task 工具将研究任务委派给 " +
    "你的研究员子智能体。将你的最终回答控制在一句话以内。",
  subagents: [
    {
      name: "researcher",
      description: "彻底研究主题",
      systemPrompt:
        "你是一个细致的研究者。研究给定的主题 " +
        "并提供 2-3 句话的简洁总结。",
    },
  ],
});

for await (const [namespace, chunk] of await agent.stream(
  {
    messages: [
      { role: "user", content: "写一篇关于 AI 安全的简短总结" },
    ],
  },
  { streamMode: "updates", subgraphs: true },
)) {
  // 主智能体更新(空命名空间)
  if (namespace.length === 0) {
    for (const [nodeName, data] of Object.entries(chunk)) {
      if (nodeName === "tools") {
        // 返回给主智能体的子智能体结果
        for (const msg of (data as any).messages ?? []) {
          if (msg.type === "tool") {
            console.log(`\n子智能体完成: ${msg.name}`);
            console.log(`  结果: ${String(msg.content).slice(0, 200)}...`);
          }
        }
      } else {
        console.log(`[主智能体] 步骤: ${nodeName}`);
      }
    }
  }
  // 子智能体更新(非空命名空间)
  else {
    for (const [nodeName] of Object.entries(chunk)) {
      console.log(`  [${namespace[0]}] 步骤: ${nodeName}`);
    }
  }
}
输出
主智能体步骤: model_request
  [tools:call_abc123] 步骤: model_request
  [tools:call_abc123] 步骤: tools
  [tools:call_abc123] 步骤: model_request
子智能体完成: task
结果: ## AI 安全报告...
主智能体步骤: model_request
  [tools:call_def456] 步骤: model_request
  [tools:call_def456] 步骤: model_request
子智能体完成: task
结果: # 关于 AI 安全的综合报告...
主智能体步骤: model_request

LLM 令牌

使用 stream_mode="messages" 从主智能体和子智能体流式传输单个令牌。每个消息事件都包含标识来源智能体的元数据。
let currentSource = "";

for await (const [namespace, chunk] of await agent.stream(
  {
    messages: [
      {
        role: "user",
        content: "研究量子计算进展",
      },
    ],
  },
  { streamMode: "messages", subgraphs: true },
)) {
  const [message] = chunk;

  // 检查此事件是否来自子智能体(命名空间包含 "tools:")
  const isSubagent = namespace.some((s: string) => s.startsWith("tools:"));

  if (isSubagent) {
    // 来自子智能体的令牌
    const subagentNs = namespace.find((s: string) => s.startsWith("tools:"))!;
    if (subagentNs !== currentSource) {
      process.stdout.write(`\n\n--- [子智能体: ${subagentNs}] ---\n`);
      currentSource = subagentNs;
    }
    if (message.text) {
      process.stdout.write(message.text);
    }
  } else {
    // 来自主智能体的令牌
    if ("main" !== currentSource) {
      process.stdout.write(`\n\n--- [主智能体] ---\n`);
      currentSource = "main";
    }
    if (message.text) {
      process.stdout.write(message.text);
    }
  }
}

process.stdout.write("\n");

工具调用

当子智能体使用工具时,您可以流式传输工具调用事件来显示每个子智能体正在做什么。工具调用块出现在 messages 流模式中。
import { AIMessageChunk, ToolMessage } from "langchain";

for await (const [namespace, chunk] of await agent.stream(
  {
    messages: [
      {
        role: "user",
        content: "研究近期量子计算进展",
      },
    ],
  },
  { streamMode: "messages", subgraphs: true },
)) {
  const [message] = chunk;

  // 标识来源:"main" 或子智能体命名空间段
  const isSubagent = namespace.some((s: string) => s.startsWith("tools:"));
  const source = isSubagent
    ? namespace.find((s: string) => s.startsWith("tools:"))!
    : "main";

  // 工具调用块(流式工具调用)
  if (AIMessageChunk.isInstance(message) && message.tool_call_chunks?.length) {
    for (const tc of message.tool_call_chunks) {
      if (tc.name) {
        console.log(`\n[${source}] 工具调用: ${tc.name}`);
      }
      // 参数分块流式传输——增量写入
      if (tc.args) {
        process.stdout.write(tc.args);
      }
    }
  }

  // 工具结果
  if (ToolMessage.isInstance(message)) {
    console.log(
      `\n[${source}] 工具结果 [${message.name}]: ${message.text?.slice(0, 150)}`,
    );
  }

  // 常规 AI 内容(跳过工具调用消息)
  if (
    AIMessageChunk.isInstance(message) &&
    message.text &&
    !message.tool_call_chunks?.length
  ) {
    process.stdout.write(message.text);
  }
}

process.stdout.write("\n");

自定义更新

在子智能体工具内部使用 config.writer 来发出自定义进度事件:
import { createDeepAgent } from "deepagents";
import { tool, type ToolRuntime } from "langchain";
import { z } from "zod";

/**
 * 一个通过 config.writer 发出自定义进度事件的工具。
 * writer 将数据发送到 "custom" 流模式。
 */
const analyzeData = tool(
  async ({ topic }: { topic: string }, config: ToolRuntime) => {
    const writer = config.writer;

    writer?.({ status: "starting", topic, progress: 0 });
    await new Promise((r) => setTimeout(r, 500));

    writer?.({ status: "analyzing", progress: 50 });
    await new Promise((r) => setTimeout(r, 500));

    writer?.({ status: "complete", progress: 100 });
    return `对 "${topic}" 的分析:客户情绪 85% 为正面,主要受产品质量和支持响应时间驱动。`;
  },
  {
    name: "analyze_data",
    description:
      "对给定主题运行数据分析。" +
      "此工具执行实际分析并发出进度更新。" +
      "对于任何分析请求,你必须调用此工具。",
    schema: z.object({
      topic: z.string().describe("要分析的主题或对象"),
    }),
  },
);

const agent = createDeepAgent({
  systemPrompt:
    "你是一个协调员。对于任何分析请求,你必须使用 task 工具委派给 " +
    "分析师子智能体。永远不要尝试直接回答。" +
    "收到结果后,用一句话总结。",
  subagents: [
    {
      name: "analyst",
      description: "执行带有实时进度跟踪的数据分析",
      systemPrompt:
        "你是一个数据分析师。对于每个分析请求,你必须调用 analyze_data 工具。" +
        "不要使用任何其他工具。" +
        "分析完成后,报告结果。",
      tools: [analyzeData],
    },
  ],
});

for await (const [namespace, chunk] of await agent.stream(
  {
    messages: [
      {
        role: "user",
        content: "分析客户满意度趋势",
      },
    ],
  },
  { streamMode: "custom", subgraphs: true },
)) {
  const isSubagent = namespace.some((s: string) => s.startsWith("tools:"));
  if (isSubagent) {
    const subagentNs = namespace.find((s: string) => s.startsWith("tools:"))!;
    console.log(`[${subagentNs}]`, chunk);
  } else {
    console.log("[main]", chunk);
  }
}
输出
[tools:call_abc123] { status: 'fetching', progress: 0 }
[tools:call_abc123] { status: 'analyzing', progress: 50 }
[tools:call_abc123] { status: 'complete', progress: 100 }

多模式流式传输

组合多种流模式以获得智能体执行的完整视图:
// 跳过内部中间件步骤——仅显示有意义的节点名称
const INTERESTING_NODES = new Set(["model_request", "tools"]);

let lastSource = "";
let midLine = false; // 当写入的令牌没有尾随换行符时为 true

for await (const [namespace, mode, data] of await agent.stream(
  {
    messages: [
      {
        role: "user",
        content: "分析远程工作对团队生产力的影响",
      },
    ],
  },
  { streamMode: ["updates", "messages", "custom"], subgraphs: true },
)) {
  const isSubagent = namespace.some((s: string) => s.startsWith("tools:"));
  const source = isSubagent ? "子智能体" : "主智能体";

  if (mode === "updates") {
    for (const nodeName of Object.keys(data)) {
      if (!INTERESTING_NODES.has(nodeName)) continue;
      if (midLine) {
        process.stdout.write("\n");
        midLine = false;
      }
      console.log(`[${source}] 步骤: ${nodeName}`);
    }
  } else if (mode === "messages") {
    const [message] = data;
    if (message.text) {
      // 当来源更改时打印标题
      if (source !== lastSource) {
        if (midLine) {
          process.stdout.write("\n");
          midLine = false;
        }
        process.stdout.write(`\n[${source}] `);
        lastSource = source;
      }
      process.stdout.write(message.text);
      midLine = true;
    }
  } else if (mode === "custom") {
    if (midLine) {
      process.stdout.write("\n");
      midLine = false;
    }
    console.log(`[${source}] 自定义事件:`, data);
  }
}

process.stdout.write("\n");

常见模式

跟踪子智能体生命周期

监控子智能体的启动、运行和完成:
for await (const [namespace, chunk] of await agent.stream(
  {
    messages: [
      { role: "user", content: "研究最新的 AI 安全发展" },
    ],
  },
  { streamMode: "updates", subgraphs: true },
)) {
  for (const [nodeName, data] of Object.entries(chunk)) {
    // ─── 阶段 1:检测子智能体启动 ────────────────────────
    // 当主智能体的 model_request 包含 task 工具调用时,
    // 子智能体已被生成。
    if (namespace.length === 0 && nodeName === "model_request") {
      for (const msg of (data as any).messages ?? []) {
        for (const tc of msg.tool_calls ?? []) {
          if (tc.name === "task") {
            activeSubagents.set(tc.id, {
              type: tc.args?.subagent_type,
              description: tc.args?.description?.slice(0, 80),
              status: "pending",
            });
            console.log(
              `[生命周期] 等待中  → 子智能体 "${tc.args?.subagent_type}" (${tc.id})`,
            );
          }
        }
      }
    }

    // ─── 阶段 2:检测子智能体运行 ─────────────────────────
    // 当我们从 tools:UUID 命名空间收到事件时,该子智能体正在执行。
    if (namespace.length > 0 && namespace[0].startsWith("tools:")) {
      const pregelId = namespace[0].split(":")[1];
      // 检查是否有任何等待中的子智能体需要标记为运行中。
      // 注意:pregel 任务 ID 与 tool_call_id 不同,
      // 因此我们在第一个子智能体事件上将任何等待中的子智能体标记为运行中。
      for (const [id, sub] of activeSubagents) {
        if (sub.status === "pending") {
          sub.status = "running";
          console.log(
            `[生命周期] 运行中  → 子智能体 "${sub.type}" (pregel: ${pregelId})`,
          );
          break;
        }
      }
    }

    // ─── 阶段 3:检测子智能体完成 ──────────────────────
    // 当主智能体的 tools 节点返回工具消息时,
    // 子智能体已完成并返回其结果。
    if (namespace.length === 0 && nodeName === "tools") {
      for (const msg of (data as any).messages ?? []) {
        if (msg.type === "tool") {
          const subagent = activeSubagents.get(msg.tool_call_id);
          if (subagent) {
            subagent.status = "complete";
            console.log(
              `[生命周期] 完成 → 子智能体 "${subagent.type}" (${msg.tool_call_id})`,
            );
            console.log(
              `  结果预览: ${String(msg.content).slice(0, 120)}...`,
            );
          }
        }
      }
    }
  }
}

// 打印最终状态
console.log("\n--- 最终子智能体状态 ---");
for (const [id, sub] of activeSubagents) {
  console.log(`  ${sub.type}: ${sub.status}`);
}

相关