本指南解释使用子图的机制。子图是用作另一个图中 节点 的 图 。
子图适用于:
构建 多智能体系统
在多个图中重用一组节点
分配开发工作:当你希望不同团队独立处理图的不同部分时,你可以将每个部分定义为子图,只要遵守子图接口(输入和输出模式),父图就可以在不了解子图任何细节的情况下构建
npm install @langchain/langgraph
为 LangGraph 开发设置 LangSmith
注册 LangSmith 以快速发现问题并提高 LangGraph 项目的性能。LangSmith 让你能够使用跟踪数据来调试、测试和监控使用 LangGraph 构建的 LLM 应用——阅读更多关于 如何开始使用 LangSmith 的信息。
定义子图通信
添加子图时,你需要定义父图和子图之间如何通信:
模式 何时使用 状态模式 在节点内调用子图 父图和子图具有不同的状态模式 (没有共享键),或者你需要在它们之间转换状态 你编写一个包装函数,将父图状态映射到子图输入,并将子图输出映射回父图状态 将子图添加为节点 父图和子图共享状态键 ——子图从与父图相同的通道读取和写入 你将编译后的子图直接传递给 add_node——不需要包装函数
在节点内调用子图
当父图和子图具有不同的状态模式 (没有共享键)时,在节点函数中调用子图。这在你想为 多智能体 系统中的每个代理保留私有消息历史时很常见。
节点函数在调用子图之前将父图状态转换为子图状态,并在返回之前将结果转换回父图状态。
import { StateGraph , StateSchema , START } from "@langchain/langgraph" ;
import * as z from "zod" ;
const SubgraphState = new StateSchema ( {
bar : z . string () ,
} ) ;
// 子图
const subgraphBuilder = new StateGraph (SubgraphState)
. addNode ( "subgraphNode1" , ( state ) => {
return { bar : "hi! " + state . bar };
} )
. addEdge (START , "subgraphNode1" ) ;
const subgraph = subgraphBuilder . compile () ;
// 父图
const State = new StateSchema ( {
foo : z . string () ,
} ) ;
// 将状态转换为子图状态并转换回来
const builder = new StateGraph (State)
. addNode ( "node1" , async ( state ) => {
const subgraphOutput = await subgraph . invoke ( { bar : state . foo } ) ;
return { foo : subgraphOutput . bar };
} )
. addEdge (START , "node1" ) ;
const graph = builder . compile () ;
import { StateGraph , StateSchema , START } from "@langchain/langgraph" ;
import * as z from "zod" ;
// 定义子图
const SubgraphState = new StateSchema ( {
// 注意这些键都没有与父图状态共享
bar : z . string () ,
baz : z . string () ,
} ) ;
const subgraphBuilder = new StateGraph (SubgraphState)
. addNode ( "subgraphNode1" , ( state ) => {
return { baz : "baz" };
} )
. addNode ( "subgraphNode2" , ( state ) => {
return { bar : state . bar + state . baz };
} )
. addEdge (START , "subgraphNode1" )
. addEdge ( "subgraphNode1" , "subgraphNode2" ) ;
const subgraph = subgraphBuilder . compile () ;
// 定义父图
const ParentState = new StateSchema ( {
foo : z . string () ,
} ) ;
const builder = new StateGraph (ParentState)
. addNode ( "node1" , ( state ) => {
return { foo : "hi! " + state . foo };
} )
. addNode ( "node2" , async ( state ) => {
const response = await subgraph . invoke ( { bar : state . foo } ) ;
return { foo : response . bar };
} )
. addEdge (START , "node1" )
. addEdge ( "node1" , "node2" ) ;
const graph = builder . compile () ;
for await ( const chunk of await graph . stream (
{ foo : "foo" },
{ subgraphs : true }
)) {
console . log (chunk) ;
}
将状态转换为子图状态
将响应转换回父图状态
[[], { node1: { foo: 'hi! foo' } }]
[['node2:9c36dd0f-151a-cb42-cbad-fa2f851f9ab7'], { subgraphNode1: { baz: 'baz' } }]
[['node2:9c36dd0f-151a-cb42-cbad-fa2f851f9ab7'], { subgraphNode2: { bar: 'hi! foobaz' } }]
[[], { node2: { foo: 'hi! foobaz' } }]
将子图添加为节点
当父图和子图共享状态键 时,你可以直接将编译后的子图传递给 add_node。不需要包装函数——子图会自动从父图的状态通道读取和写入。例如,在 多智能体 系统中,代理通常通过共享的 messages 键进行通信。
如果你的子图与父图共享状态键,可以按照以下步骤将其添加到你的图中:
定义子图工作流(下面的示例中的 subgraphBuilder)并编译它
在定义父图工作流时,将编译后的子图传递给 .addNode 方法
import { StateGraph , StateSchema , START } from "@langchain/langgraph" ;
import * as z from "zod" ;
const State = new StateSchema ( {
foo : z . string () ,
} ) ;
// 子图
const subgraphBuilder = new StateGraph (State)
. addNode ( "subgraphNode1" , ( state ) => {
return { foo : "hi! " + state . foo };
} )
. addEdge (START , "subgraphNode1" ) ;
const subgraph = subgraphBuilder . compile () ;
// 父图
const builder = new StateGraph (State)
. addNode ( "node1" , subgraph)
. addEdge (START , "node1" ) ;
const graph = builder . compile () ;
import { StateGraph , StateSchema , START } from "@langchain/langgraph" ;
import * as z from "zod" ;
// 定义子图
const SubgraphState = new StateSchema ( {
foo : z . string () ,
bar : z . string () ,
} ) ;
const subgraphBuilder = new StateGraph (SubgraphState)
. addNode ( "subgraphNode1" , ( state ) => {
return { bar : "bar" };
} )
. addNode ( "subgraphNode2" , ( state ) => {
// 注意此节点正在使用仅在子图中可用的状态键 ('bar')
// 并且正在更新共享状态键 ('foo')
return { foo : state . foo + state . bar };
} )
. addEdge (START , "subgraphNode1" )
. addEdge ( "subgraphNode1" , "subgraphNode2" ) ;
const subgraph = subgraphBuilder . compile () ;
// 定义父图
const ParentState = new StateSchema ( {
foo : z . string () ,
} ) ;
const builder = new StateGraph (ParentState)
. addNode ( "node1" , ( state ) => {
return { foo : "hi! " + state . foo };
} )
. addNode ( "node2" , subgraph)
. addEdge (START , "node1" )
. addEdge ( "node1" , "node2" ) ;
const graph = builder . compile () ;
for await ( const chunk of await graph . stream ( { foo : "foo" } )) {
console . log (chunk) ;
}
此键与父图状态共享
此键仅对 SubgraphState 私有,父图不可见
{ node1: { foo: 'hi! foo' } }
{ node2: { foo: 'hi! foobar' } }
子图持久性
使用子图时,你需要决定其内部数据在调用之间如何处理。考虑一个委托给专家子代理的客户支持机器人:“billing expert” 子代理应该记住客户之前的问题,还是每次被调用时重新开始?
.compile() 上的 checkpointer 参数控制子图持久性:
模式 checkpointer=行为 每次调用 None(默认)每次调用都重新开始,并继承父图的检查点器以支持单次调用内的 中断 和 持久执行 。 每个线程 True状态在同一线程的调用之间累积。每次调用从上一次结束的地方继续。 无状态 False完全不进行检查点——像普通函数调用一样运行。不支持中断或持久执行。
每次调用是最适合大多数应用程序的选择,包括 多智能体 系统,其中子代理处理独立的请求。当子代理需要多轮对话记忆时(例如,在几次交互中建立上下文的助理),请使用每个线程。
父图必须使用检查点器编译,以便子图持久性功能(中断、状态检查、每个线程的记忆)正常工作。请参阅 持久性 。
下面的示例使用 LangChain 的 create_agent ,这是构建代理的常用方式。create_agent 在底层生成一个 LangGraph 图 ,因此所有子图持久性概念都直接适用。如果你使用原始 LangGraph StateGraph 构建,则应用相同的模式和配置选项——有关详细信息,请参阅 图 API 。
有状态
有状态子图继承父图的检查点器,这启用了 中断 、持久执行 和状态检查。两种有状态模式的区别在于状态保留的时间长度。
每次调用(默认)
这是推荐的大多数应用程序的模式,包括 多智能体 系统,其中子代理作为工具被调用。它支持中断、持久执行 和并行调用,同时保持每次调用的隔离。
当每次对子图的调用都是独立的,且子代理不需要记住之前调用的任何内容时,使用每次调用持久性。这是最常见的模式,特别是对于 多智能体 系统,其中子代理处理一次性请求,如“查找此客户的订单”或“总结此文档”。
省略 checkpointer 或将其设置为 None。每次调用都重新开始,但在单次调用内,子图继承父图的检查点器,并可以使用 interrupt() 暂停和恢复。
以下示例使用两个子代理(水果专家、蔬菜专家)作为外部代理的工具进行包装:
import { createAgent , tool } from "langchain" ;
import { MemorySaver , Command , interrupt } from "@langchain/langgraph" ;
import * as z from "zod" ;
const fruitInfo = tool (
( input ) => `Info about ${ input . fruitName } ` ,
{
name : "fruit_info" ,
description : "Look up fruit info." ,
schema : z . object ( { fruitName : z . string () } ) ,
}
) ;
const veggieInfo = tool (
( input ) => `Info about ${ input . veggieName } ` ,
{
name : "veggie_info" ,
description : "Look up veggie info." ,
schema : z . object ( { veggieName : z . string () } ) ,
}
) ;
// 子代理 - 没有设置 checkpointer(继承父级)
const fruitAgent = createAgent ( {
model : "gpt-4.1-mini" ,
tools : [fruitInfo] ,
prompt : "You are a fruit expert. Use the fruit_info tool. Respond in one sentence." ,
} ) ;
const veggieAgent = createAgent ( {
model : "gpt-4.1-mini" ,
tools : [veggieInfo] ,
prompt : "You are a veggie expert. Use the veggie_info tool. Respond in one sentence." ,
} ) ;
// 将子代理包装为外部代理的工具
const askFruitExpert = tool (
async ( input ) => {
const response = await fruitAgent . invoke ( {
messages : [ { role : "user" , content : input . question } ] ,
} ) ;
return response . messages[response . messages . length - 1 ] . content ;
},
{
name : "ask_fruit_expert" ,
description : "Ask the fruit expert. Use for ALL fruit questions." ,
schema : z . object ( { question : z . string () } ) ,
}
) ;
const askVeggieExpert = tool (
async ( input ) => {
const response = await veggieAgent . invoke ( {
messages : [ { role : "user" , content : input . question } ] ,
} ) ;
return response . messages[response . messages . length - 1 ] . content ;
},
{
name : "ask_veggie_expert" ,
description : "Ask the veggie expert. Use for ALL veggie questions." ,
schema : z . object ( { question : z . string () } ) ,
}
) ;
// 带有检查点器的外部代理
const agent = createAgent ( {
model : "gpt-4.1-mini" ,
tools : [askFruitExpert , askVeggieExpert] ,
prompt :
"You have two experts: ask_fruit_expert and ask_veggie_expert. " +
"ALWAYS delegate questions to the appropriate expert." ,
checkpointer : new MemorySaver () ,
} ) ;
每次调用都可以使用 interrupt() 暂停和恢复。向工具函数添加 interrupt() 以在执行前要求用户批准: const fruitInfo = tool (
( input ) => {
interrupt ( "continue?" ) ;
return `Info about ${ input . fruitName } ` ;
},
{
name : "fruit_info" ,
description : "Look up fruit info." ,
schema : z . object ( { fruitName : z . string () } ) ,
}
) ;
const config = { configurable : { thread_id : "1" } };
// Invoke - the subagent's tool calls interrupt()
let response = await agent . invoke (
{ messages : [ { role : "user" , content : "Tell me about apples" } ] },
config ,
) ;
// response contains __interrupt__
// Resume - approve the interrupt
response = await agent . invoke ( new Command ( { resume : true } ) , config) ;
// Subagent message count: 4
每次调用都以新的子代理状态开始。子代理不记得之前的调用: const config = { configurable : { thread_id : "1" } };
// First call
let response = await agent . invoke (
{ messages : [ { role : "user" , content : "Tell me about apples" } ] },
config ,
) ;
// Subagent message count: 4
// Second call - subagent starts fresh, no memory of apples
response = await agent . invoke (
{ messages : [ { role : "user" , content : "Now tell me about bananas" } ] },
config ,
) ;
// Subagent message count: 4 (still fresh!)
对同一子图的多次调用可以无冲突地工作,因为每次调用都有自己独立的检查点命名空间: const config = { configurable : { thread_id : "1" } };
// LLM calls ask_fruit_expert for both apples and bananas
const response = await agent . invoke (
{ messages : [ { role : "user" , content : "Tell me about apples and bananas" } ] },
config ,
) ;
// Subagent message count: 4 (apples - fresh)
// Subagent message count: 4 (bananas - fresh)
每个线程
当子代理需要记住之前的交互时,使用每个线程持久性。例如,在几次交互中建立上下文的研究助理,或跟踪已编辑文件的编码助理。子代理的对话历史和数据在同一线程的调用之间累积。每次调用从上一次结束的地方继续。
使用 checkpointer=True 编译以启用此行为。
每个线程的子图不支持并行工具调用。当 LLM 可以将每个线程的子代理作为工具访问时,它可能会尝试并行多次调用该工具(例如,同时询问水果专家关于苹果和香蕉的事)。这会导致检查点冲突,因为两个调用都写入同一个命名空间。 下面的示例使用 LangChain 的 ToolCallLimitMiddleware 来防止这种情况。如果你使用纯 LangGraph StateGraph 构建,则需要自己防止并行工具调用——例如,通过配置模型禁用并行工具调用,或通过添加逻辑确保不会在并行中多次调用同一子图。
以下示例使用使用 checkpointer=True 编译的水果专家子代理:
import { createAgent , tool , toolCallLimitMiddleware } from "langchain" ;
import { MemorySaver , Command , interrupt } from "@langchain/langgraph" ;
import * as z from "zod" ;
const fruitInfo = tool (
( input ) => `Info about ${ input . fruitName } ` ,
{
name : "fruit_info" ,
description : "Look up fruit info." ,
schema : z . object ( { fruitName : z . string () } ) ,
}
) ;
// 带有 checkpointer=true 的子代理,用于持久状态
const fruitAgent = createAgent ( {
model : "gpt-4.1-mini" ,
tools : [fruitInfo] ,
prompt : "You are a fruit expert. Use the fruit_info tool. Respond in one sentence." ,
checkpointer : true ,
} ) ;
// 将子代理包装为外部代理的工具
const askFruitExpert = tool (
async ( input ) => {
const response = await fruitAgent . invoke ( {
messages : [ { role : "user" , content : input . question } ] ,
} ) ;
return response . messages[response . messages . length - 1 ] . content ;
},
{
name : "ask_fruit_expert" ,
description : "Ask the fruit expert. Use for ALL fruit questions." ,
schema : z . object ( { question : z . string () } ) ,
}
) ;
// 带有检查点器的外部代理
// 使用 toolCallLimitMiddleware 防止对每个线程的子代理进行并行调用,
// 否则会导致检查点冲突。
const agent = createAgent ( {
model : "gpt-4.1-mini" ,
tools : [askFruitExpert] ,
prompt : "You have a fruit expert. ALWAYS delegate fruit questions to ask_fruit_expert." ,
middleware : [
toolCallLimitMiddleware ( { toolName : "ask_fruit_expert" , runLimit : 1 } ) ,
] ,
checkpointer : new MemorySaver () ,
} ) ;
每个线程的子代理支持 interrupt(),就像每次调用一样。向工具函数添加 interrupt() 以要求用户批准: const fruitInfo = tool (
( input ) => {
interrupt ( "continue?" ) ;
return `Info about ${ input . fruitName } ` ;
},
{
name : "fruit_info" ,
description : "Look up fruit info." ,
schema : z . object ( { fruitName : z . string () } ) ,
}
) ;
const config = { configurable : { thread_id : "1" } };
// Invoke - the subagent's tool calls interrupt()
let response = await agent . invoke (
{ messages : [ { role : "user" , content : "Tell me about apples" } ] },
config ,
) ;
// response contains __interrupt__
// Resume - approve the interrupt
response = await agent . invoke ( new Command ( { resume : true } ) , config) ;
// Subagent message count: 4
状态在调用之间累积——子代理记得过去的对话: const config = { configurable : { thread_id : "1" } };
// First call
let response = await agent . invoke (
{ messages : [ { role : "user" , content : "Tell me about apples" } ] },
config ,
) ;
// Subagent message count: 4
// Second call - subagent REMEMBERS apples conversation
response = await agent . invoke (
{ messages : [ { role : "user" , content : "Now tell me about bananas" } ] },
config ,
) ;
// Subagent message count: 8 (accumulated!)
当你有多个不同 的每个线程子图(例如,水果专家和蔬菜专家)时,每个都需要自己的存储空间,这样它们的检查点就不会相互覆盖。这称为命名空间隔离 。 如果你在 节点内调用子图 ,LangGraph 根据调用顺序分配命名空间(第一次调用、第二次调用等)。这意味着重新排序调用会混淆哪个子图加载哪个状态。为了避免这种情况,将每个子代理包装在自己的 StateGraph 中,并使用唯一的节点名称——这为每个子图提供稳定、唯一的命名空间: import { StateGraph , StateSchema , MessagesValue , START } from "@langchain/langgraph" ;
function createSubAgent ( model : string , { name , ... kwargs } : { name : string ; [ key : string ] : any }) {
const agent = createAgent ( { model , name , ... kwargs } ) ;
return new StateGraph ( new StateSchema ( { messages : MessagesValue } ))
. addNode (name , agent) // unique name → stable namespace
. addEdge (START , name)
. compile () ;
}
const fruitAgent = createSubAgent ( "gpt-4.1-mini" , {
name : "fruit_agent" , tools : [fruitInfo] , prompt : "..." , checkpointer : true ,
} ) ;
const veggieAgent = createSubAgent ( "gpt-4.1-mini" , {
name : "veggie_agent" , tools : [veggieInfo] , prompt : "..." , checkpointer : true ,
} ) ;
const config = { configurable : { thread_id : "1" } };
// First call - LLM calls both fruit and veggie experts
let response = await agent . invoke (
{ messages : [ { role : "user" , content : "Tell me about cherries and broccoli" } ] },
config ,
) ;
// Fruit subagent message count: 4
// Veggie subagent message count: 4
// Second call - both agents accumulate independently
response = await agent . invoke (
{ messages : [ { role : "user" , content : "Now tell me about oranges and carrots" } ] },
config ,
) ;
// Fruit subagent message count: 8 (remembers cherries!)
// Veggie subagent message count: 8 (remembers broccoli!)
作为 节点添加 的子图已经自动获得基于名称的命名空间,所以它们不需要这个包装。
无状态
当你想要像普通函数调用一样运行子代理,没有任何检查点开销时使用此选项。子图无法暂停/恢复,也不受益于 持久执行 。使用 checkpointer=False 编译。
没有检查点,子图就没有持久执行。如果进程在运行中途崩溃,子图无法恢复,必须从头重新运行。
const subgraphBuilder = new StateGraph ( ... ) ;
const subgraph = subgraphBuilder . compile ( { checkpointer : false } ) ;
检查点器参考
使用 .compile() 上的 checkpointer 参数控制子图持久性:
const subgraph = builder . compile ( { checkpointer : false } ) ; # or true , or null
功能 每次调用(默认) 每个线程 无状态 checkpointer=NoneTrueFalse中断 (HITL) ✅ ✅ ❌ 多轮记忆 ❌ ✅ ❌ 多次调用(不同子图) ✅ ⚠️ ✅ 多次调用(相同子图) ✅ ❌ ✅ 状态检查 ⚠️ ✅ ❌
中断 (HITL) :子图可以使用 interrupt() 暂停执行并等待用户输入,然后从中断处恢复。
多轮记忆 :子图在同一个 线程 内的多次调用之间保留其状态。每次调用从上一次结束的地方继续,而不是重新开始。
多次调用(不同子图) :可以在单个节点内调用多个不同的子图实例,而不会发生检查点命名空间冲突。
多次调用(相同子图) :可以在单个节点内多次调用同一子图实例。使用有状态持久性时,这些调用写入相同的检查点命名空间并发生冲突——改用每次调用持久性。
状态检查 :子图的状态可通过 get_state(config, subgraphs=True) 获取,用于调试和监控。
查看子图状态
当你启用 持久性 时,你可以使用 subgraphs 选项检查子图状态。使用 无状态 检查点 (checkpointer=False),不会保存子图检查点,因此子图状态不可用。
查看子图状态需要 LangGraph 能够静态发现 子图——即,它是 作为节点添加 或 在节点内调用 。当子图在 工具 函数或其他间接调用(例如 子代理 模式)内调用时不起作用。无论嵌套如何,中断仍传播到顶级图。
返回当前调用 的子图状态。每次调用都重新开始。 import { StateGraph , StateSchema , START , MemorySaver , interrupt , Command } from "@langchain/langgraph" ;
import * as z from "zod" ;
const State = new StateSchema ( {
foo : z . string () ,
} ) ;
// 子图
const subgraphBuilder = new StateGraph (State)
. addNode ( "subgraphNode1" , ( state ) => {
const value = interrupt ( "Provide value:" ) ;
return { foo : state . foo + value };
} )
. addEdge (START , "subgraphNode1" ) ;
const subgraph = subgraphBuilder . compile () ; // inherits parent checkpointer
// 父图
const builder = new StateGraph (State)
. addNode ( "node1" , subgraph)
. addEdge (START , "node1" ) ;
const checkpointer = new MemorySaver () ;
const graph = builder . compile ( { checkpointer } ) ;
const config = { configurable : { thread_id : "1" } };
await graph . invoke ( { foo : "" }, config) ;
// View subgraph state for the current invocation
const subgraphState = ( await graph . getState (config , { subgraphs : true } )) . tasks[ 0 ] . state ;
// Resume the subgraph
await graph . invoke ( new Command ( { resume : "bar" } ) , config) ;
返回此线程上所有调用之间的累积 子图状态。 import { StateGraph , StateSchema , MessagesValue , START , MemorySaver } from "@langchain/langgraph" ;
// 带有自己持久状态的子图
const SubgraphState = new StateSchema ( {
messages : MessagesValue ,
} ) ;
const subgraphBuilder = new StateGraph (SubgraphState) ;
// ... add nodes and edges
const subgraph = subgraphBuilder . compile ( { checkpointer : true } ) ;
// 父图
const builder = new StateGraph (SubgraphState)
. addNode ( "agent" , subgraph)
. addEdge (START , "agent" ) ;
const checkpointer = new MemorySaver () ;
const graph = builder . compile ( { checkpointer } ) ;
const config = { configurable : { thread_id : "1" } };
await graph . invoke ( { messages : [ { role : "user" , content : "hi" } ] }, config) ;
await graph . invoke ( { messages : [ { role : "user" , content : "what did I say?" } ] }, config) ;
// View accumulated subgraph state (includes messages from both invocations)
const subgraphState = ( await graph . getState (config , { subgraphs : true } )) . tasks[ 0 ] . state ;
流式传输子图输出
要将子图的输出包含在流式输出中,你可以在父图的 stream 方法中设置 subgraphs 选项。这将流式传输来自父图和任何子图的输出。
for await ( const chunk of await graph . stream (
{ foo : "foo" },
{
subgraphs : true ,
streamMode : "updates" ,
}
)) {
console . log (chunk) ;
}
设置 subgraphs: true 以流式传输来自子图的输出。
import { StateGraph , StateSchema , START } from "@langchain/langgraph" ;
import * as z from "zod" ;
// 定义子图
const SubgraphState = new StateSchema ( {
foo : z . string () ,
bar : z . string () ,
} ) ;
const subgraphBuilder = new StateGraph (SubgraphState)
. addNode ( "subgraphNode1" , ( state ) => {
return { bar : "bar" };
} )
. addNode ( "subgraphNode2" , ( state ) => {
// note that this node is using a state key ('bar') that is only available in the subgraph
// and is sending update on the shared state key ('foo')
return { foo : state . foo + state . bar };
} )
. addEdge (START , "subgraphNode1" )
. addEdge ( "subgraphNode1" , "subgraphNode2" ) ;
const subgraph = subgraphBuilder . compile () ;
// 定义父图
const ParentState = new StateSchema ( {
foo : z . string () ,
} ) ;
const builder = new StateGraph (ParentState)
. addNode ( "node1" , ( state ) => {
return { foo : "hi! " + state . foo };
} )
. addNode ( "node2" , subgraph)
. addEdge (START , "node1" )
. addEdge ( "node1" , "node2" ) ;
const graph = builder . compile () ;
for await ( const chunk of await graph . stream (
{ foo : "foo" },
{
streamMode : "updates" ,
subgraphs : true ,
}
)) {
console . log (chunk) ;
}
设置 subgraphs: true 以流式传输来自子图的输出。
[[], { node1: { foo: 'hi! foo' } }]
[['node2:e58e5673-a661-ebb0-70d4-e298a7fc28b7'], { subgraphNode1: { bar: 'bar' } }]
[['node2:e58e5673-a661-ebb0-70d4-e298a7fc28b7'], { subgraphNode2: { foo: 'hi! foobar' } }]
[[], { node2: { foo: 'hi! foobar' } }]