其核心在于,LangGraph 将智能体工作流建模为图。您使用三个关键组件来定义智能体的行为:
-
State:表示应用程序当前快照的共享数据结构。它可以是任何数据类型,但通常使用共享状态模式定义。
-
Nodes:编码智能体逻辑的函数。它们接收当前状态作为输入,执行某些计算或副作用,并返回更新后的状态。
-
Edges:根据当前状态确定下一个要执行的 Node 的函数。它们可以是条件分支或固定转换。
通过组合 Nodes 和 Edges,您可以创建复杂的、循环的工作流,使状态随时间演变。然而,真正的力量来自于 LangGraph 如何管理该状态。
强调一下:Nodes 和 Edges 仅仅是函数——它们可以包含 LLM 或只是普通的代码。
简而言之:节点做工作,边告诉下一步做什么。
LangGraph 的底层图算法使用 消息传递 来定义通用程序。当一个 Node 完成其操作时,它沿着一条或多条边向其他节点发送消息。这些接收节点随后执行它们的函数,将结果消息传递给下一组节点,过程继续。受 Google 的 Pregel 系统启发,程序以离散的“超级步骤”进行。
一个超级步骤可以被视为对图节点的一次单轮迭代。并行运行的节点属于同一个超级步骤,而顺序运行的节点属于不同的超级步骤。在图执行开始时,所有节点都处于 inactive 状态。当节点在其任何传入边(或“通道”)上收到新消息(状态)时,该节点变为 active。活动节点随后运行其函数并响应更新。在每个超级步骤结束时,没有传入消息的节点通过将自己标记为 inactive 来投票 halt。当所有节点都 inactive 且没有消息在传输中时,图执行终止。
StateGraph
StateGraph 类是要使用的主要图类。这是由用户定义的 State 对象参数化的。
编译您的图
要构建您的图,首先定义 state,然后添加 nodes 和 edges,最后编译它。究竟什么是编译您的图以及为什么需要它?
编译是一个相当简单的步骤。它对图的结构提供一些基本检查(例如没有孤立的节点等)。这也是您可以指定运行时参数的地方,如 checkpointers 和 breakpoints。您只需调用 .compile 方法即可编译您的图:
graph = graph_builder.compile(...)
定义图时做的第一件事就是定义图的 State。State 包括 图的模式 以及 reducer 函数,后者指定如何将更新应用到状态。State 的模式将是图中所有 Nodes 和 Edges 的输入模式,可以是 TypedDict 或 Pydantic 模型。所有 Nodes 都会向 State 发出更新,然后使用指定的 reducer 函数应用这些更新。
指定图模式的主要文档方式是使用 TypedDict。如果您想在状态中提供默认值,请使用 dataclass。我们还支持使用 Pydantic BaseModel 作为图状态,如果您想要递归数据验证(尽管请注意 Pydantic 的性能不如 TypedDict 或 dataclass)。
默认情况下,图将具有相同的输入和输出模式。如果您想更改此内容,也可以直接指定显式的输入和输出模式。当您有很多键,其中一些明确用于输入,另一些用于输出时,这很有用。有关更多信息,请参阅 指南。
多个模式
通常,所有图节点都与单个模式通信。这意味着它们将读取和写入相同的状态通道。但是,有些情况下我们需要对此有更多的控制:
- 内部节点可以传递不需要在图的输入/输出中的信息。
- 我们也可能希望为图使用不同的输入/输出模式。输出可能仅包含单个相关输出键。
可以在图内部定义私有状态通道供内部节点通信。我们可以简单地定义一个私有模式 PrivateState。
还可以为图定义显式的输入和输出模式。在这些情况下,我们定义一个包含与图操作相关的 所有 键的“内部”模式。但是,我们也定义 input 和 output 模式,它们是“内部”模式的子集,以约束图的输入和输出。有关更多详细信息,请参阅 定义输入和输出模式。
让我们看一个例子:
class InputState(TypedDict):
user_input: str
class OutputState(TypedDict):
graph_output: str
class OverallState(TypedDict):
foo: str
user_input: str
graph_output: str
class PrivateState(TypedDict):
bar: str
def node_1(state: InputState) -> OverallState:
# 写入 OverallState
return {"foo": state["user_input"] + " name"}
def node_2(state: OverallState) -> PrivateState:
# 从 OverallState 读取,写入 PrivateState
return {"bar": state["foo"] + " is"}
def node_3(state: PrivateState) -> OutputState:
# 从 PrivateState 读取,写入 OutputState
return {"graph_output": state["bar"] + " Lance"}
builder = StateGraph(OverallState,input_schema=InputState,output_schema=OutputState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_node("node_3", node_3)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_2", "node_3")
builder.add_edge("node_3", END)
graph = builder.compile()
graph.invoke({"user_input":"My"})
# {'graph_output': 'My name is Lance'}
这里有两点微妙且重要需要注意:
-
我们将
state: InputState 作为输入模式传递给 node_1。但是,我们写入 foo,这是 OverallState 中的一个通道。我们如何写入不在输入模式中的状态通道?这是因为节点 可以写入图状态中的任何状态通道。图状态是在初始化时定义的状态通道的并集,其中包括 OverallState 以及过滤器 InputState 和 OutputState。
-
我们用以下方式初始化图:
StateGraph(
OverallState,
input_schema=InputState,
output_schema=OutputState
)
我们如何在 node_2 中写入 PrivateState?如果未在 StateGraph 初始化中传递,图如何获得对该模式的访问权限?
我们可以这样做是因为 _nodes 也可以声明额外的状态 channels_,只要存在状态模式定义。在这种情况下,定义了 PrivateState 模式,因此我们可以将 bar 作为图中的新状态通道并写入它。
归约器
归约器对于理解如何将从节点发出的更新应用到 State 至关重要。State 中的每个键都有自己独立的归约器函数。如果没有明确指定归约器函数,则假定对该键的所有更新都应覆盖它。有几种不同类型的归约器,从默认类型的归约器开始:
默认归约器
这两个示例展示了如何使用默认归约器:
from typing_extensions import TypedDict
class State(TypedDict):
foo: int
bar: list[str]
在此示例中,未为任何键指定归约器函数。假设图的输入是:
{"foo": 1, "bar": ["hi"]}。然后假设第一个 Node 返回 {"foo": 2}。这被视为对状态的更新。注意 Node 不需要返回整个 State 模式 - 只需要一个更新。应用此更新后,State 将变为 {"foo": 2, "bar": ["hi"]}。如果第二个节点返回 {"bar": ["bye"]},则 State 将变为 {"foo": 2, "bar": ["bye"]}
from typing import Annotated
from typing_extensions import TypedDict
from operator import add
class State(TypedDict):
foo: int
bar: Annotated[list[str], add]
在此示例中,我们使用 Annotated 类型指定了第二个键 (bar) 的归约器函数 (operator.add)。注意第一个键保持不变。假设图的输入是 {"foo": 1, "bar": ["hi"]}。然后假设第一个 Node 返回 {"foo": 2}。这被视为对状态的更新。注意 Node 不需要返回整个 State 模式 - 只需要一个更新。应用此更新后,State 将变为 {"foo": 2, "bar": ["hi"]}。如果第二个节点返回 {"bar": ["bye"]},则 State 将变为 {"foo": 2, "bar": ["hi", "bye"]}。注意这里 bar 键是通过将两个列表相加来更新的。
在图状态中处理消息
为什么要使用消息?
大多数现代 LLM 提供商都具有聊天模型接口,接受消息列表作为输入。特别是 LangChain 的 聊天模型接口 接受消息对象列表作为输入。这些消息有多种形式,如 HumanMessage(用户输入)或 AIMessage(LLM 响应)。
有关消息对象的更多信息,请参阅 消息概念指南。
在图中使用消息
在许多情况下,将先前的对话历史作为消息列表存储在图状态中很有帮助。为此,我们可以向图状态添加一个键(通道)来存储 Message 对象列表,并使用归约器函数对其进行注释(见下面示例中的 messages 键)。归约器函数对于告诉图如何在每次状态更新时(例如当节点发送更新时)更新状态中的 Message 对象列表至关重要。如果您不指定归约器,每次状态更新都将用最近提供的值覆盖消息列表。如果您只想将消息附加到现有列表,可以使用 operator.add 作为归约器。
但是,您可能还希望在图状态中手动更新消息(例如人工干预)。如果您使用 operator.add,您发送给图的手动状态更新将被附加到现有的消息列表中,而不是更新现有消息。为了避免这种情况,您需要一个能够跟踪消息 ID 并在更新时覆盖现有消息的归约器。为了实现这一点,您可以使用预建的 add_messages 函数。对于全新的消息,它将简单地附加到现有列表,但它也会正确处理现有消息的更新。
序列化
除了跟踪消息 ID 外,add_messages 函数还会尝试在接收到 messages 通道上的状态更新时将消息反序列化为 LangChain Message 对象。
有关更多信息,请参阅 LangChain 序列化/反序列化。这允许以下格式发送图输入/状态更新:
# 这是支持的
{"messages": [HumanMessage(content="message")]}
# 这也支持
{"messages": [{"type": "human", "content": "message"}]}
由于使用 add_messages 时状态更新始终反序列化为 LangChain Messages,您应该使用点符号访问消息属性,如 state["messages"][-1].content。
下面是使用 add_messages 作为其归约器函数的图的示例。
from langchain.messages import AnyMessage
from langgraph.graph.message import add_messages
from typing import Annotated
from typing_extensions import TypedDict
class GraphState(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]
MessagesState
由于在状态中拥有消息列表非常常见,因此存在一个预建的状态 MessagesState,使其易于使用消息。MessagesState 定义了一个单一的 messages 键,它是 AnyMessage 对象列表,并使用 add_messages 归约器。通常,需要跟踪的状态不仅仅是消息,所以我们看到人们继承此状态并添加更多字段,例如:
from langgraph.graph import MessagesState
class State(MessagesState):
documents: list[str]
在 LangGraph 中,节点是 Python 函数(同步或异步),接受以下参数:
state—图的 状态
config—包含配置信息(如 thread_id)和追踪信息(如 tags)的 RunnableConfig 对象
runtime—包含 运行时 context 和其他信息(如 store、stream_writer 和 execution_info)的 Runtime 对象
类似于 NetworkX,您使用 add_node 方法将这些节点添加到图中:
from dataclasses import dataclass
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph
from langgraph.runtime import Runtime
class State(TypedDict):
input: str
results: str
@dataclass
class Context:
user_id: str
builder = StateGraph(State)
def plain_node(state: State):
return state
def node_with_runtime(state: State, runtime: Runtime[Context]):
print("In node: ", runtime.context.user_id)
return {"results": f"Hello, {state['input']}!"}
def node_with_config(state: State, config: RunnableConfig):
print("In node with thread_id: ", config["configurable"]["thread_id"])
return {"results": f"Hello, {state['input']}!"}
builder.add_node("plain_node", plain_node)
builder.add_node("node_with_runtime", node_with_runtime)
builder.add_node("node_with_config", node_with_config)
...
在幕后,函数被转换为 RunnableLambda,为您的函数添加批量和异步支持,以及 原生追踪和调试。
如果您在不指定名称的情况下将节点添加到图中,它将获得等同于函数名的默认名称。
builder.add_node(my_node)
# 然后您可以通过将其引用为 `"my_node"` 来创建到此节点/从此节点的边
START 节点
START 节点是一个特殊节点,代表向图发送用户输入的节点。引用此节点的主要目的是确定哪些节点应该首先被调用。
from langgraph.graph import START
graph.add_edge(START, "node_a")
END 节点
END 节点是一个特殊节点,代表终止节点。当您想要表示哪些边在完成之后没有操作时,会引用此节点。
from langgraph.graph import END
graph.add_edge("node_a", END)
节点缓存
LangGraph 支持基于节点输入的任务/节点缓存。要使用缓存:
- 在编译图(或指定入口点)时指定缓存
- 为节点指定缓存策略。每个缓存策略支持:
key_func 用于基于节点输入生成缓存键,默认为输入的 hash 加 pickle。
ttl,缓存的生存时间(秒)。如果未指定,缓存将永不过期。
例如:
import time
from typing_extensions import TypedDict
from langgraph.graph import StateGraph
from langgraph.cache.memory import InMemoryCache
from langgraph.types import CachePolicy
class State(TypedDict):
x: int
result: int
builder = StateGraph(State)
def expensive_node(state: State) -> dict[str, int]:
# 昂贵计算
time.sleep(2)
return {"result": state["x"] * 2}
builder.add_node("expensive_node", expensive_node, cache_policy=CachePolicy(ttl=3))
builder.set_entry_point("expensive_node")
builder.set_finish_point("expensive_node")
graph = builder.compile(cache=InMemoryCache())
print(graph.invoke({"x": 5}, stream_mode='updates'))
# [{'expensive_node': {'result': 10}}]
print(graph.invoke({"x": 5}, stream_mode='updates'))
# [{'expensive_node': {'result': 10}, '__metadata__': {'cached': True}}]
- 第一次运行需要两秒钟(由于模拟的昂贵计算)。
- 第二次运行利用缓存并快速返回。
边定义了逻辑如何路由以及图如何决定停止。这是您的智能体工作方式以及不同节点相互通信的重要部分。有几种关键的边类型:
- 普通边:直接从一节点到下一节点。
- 条件边:调用函数以确定接下来要前往哪个节点。
- 入口点:用户输入到达时首先调用的节点。
- 条件入口点:调用函数以确定用户输入到达时首先调用哪些节点。
一个节点可以有多个传出边。如果一个节点有多个传出边,所有这些目标节点将在下一个超级步骤中作为一部分并行执行。
普通边
如果您总是想从节点 A 到节点 B,可以直接使用 add_edge 方法。
graph.add_edge("node_a", "node_b")
条件边
如果您想可选地路由到一个或多个边(或可选地终止),可以使用 add_conditional_edges 方法。此方法接受一个节点名称和一个在该节点执行后要调用的“路由函数”:
graph.add_conditional_edges("node_a", routing_function)
类似于节点,routing_function 接受图的当前 state 并返回值。
默认情况下,routing_function 的返回值用作要将状态发送到下一个的节点名称(或节点列表)。所有这些节点将在下一个超级步骤中作为一部分并行运行。
您可以选择提供一个字典,将 routing_function 的输出映射到下一个节点的名称。
graph.add_conditional_edges("node_a", routing_function, {True: "node_b", False: "node_c"})
如果您想在单个函数中结合状态更新和路由,请使用 Command 代替条件边。
入口点
入口点是图启动时运行的第一个节点。您可以使用来自虚拟 START 节点到要执行的第一节点的 add_edge 方法来指定进入图的位置。
from langgraph.graph import START
graph.add_edge(START, "node_a")
条件入口点
条件入口点允许您根据自定义逻辑从不同的节点开始。您可以使用来自虚拟 START 节点的 add_conditional_edges 来完成此操作。
from langgraph.graph import START
graph.add_conditional_edges(START, routing_function)
您可以选择提供一个字典,将 routing_function 的输出映射到下一个节点的名称。
graph.add_conditional_edges(START, routing_function, {True: "node_b", False: "node_c"})
Send
默认情况下,Nodes 和 Edges 是预先定义的,并在相同的共享状态上运行。但是,可能存在确切边在事前未知的情况,和/或您可能希望同时存在不同版本的 State。一个常见的例子是 map-reduce 设计模式。在这种设计模式中,第一个节点可能会生成对象列表,您可能希望对所有这些对象应用其他节点。对象的数量可能在事前未知(意味着边的数量可能未知),并且下游 Node 的输入 State 应该不同(每个生成的对象一个)。
为了支持这种设计模式,LangGraph 支持从条件边返回 Send 对象。Send 接受两个参数:第一个是节点名称,第二个是要传递给该节点的状态。
def continue_to_jokes(state: OverallState):
return [Send("generate_joke", {"subject": s}) for s in state['subjects']]
graph.add_conditional_edges("node_a", continue_to_jokes)
Command
Command 是控制图执行的多功能原语。它接受四个参数:
update:应用状态更新(类似于从节点返回更新)。
goto:导航到特定节点(类似于 条件边)。
graph:从 子图 导航时定位父图。
resume:提供值以在 中断 后恢复执行。
Command 在三种上下文中使用:
从节点返回
update 和 goto
从节点函数返回 Command 以在单步中更新状态并路由到下一个节点:
def my_node(state: State) -> Command[Literal["my_other_node"]]:
return Command(
# 状态更新
update={"foo": "bar"},
# 控制流
goto="my_other_node"
)
使用 Command,您还可以实现动态控制流行为(与 条件边 相同):
def my_node(state: State) -> Command[Literal["my_other_node"]]:
if state["foo"] == "bar":
return Command(update={"foo": "baz"}, goto="my_other_node")
当您既需要更新状态又需要路由到不同节点时使用 Command。如果您只需要路由而不更新状态,请使用 条件边。
在节点函数中返回 Command 时,您必须添加返回类型注解,列出节点正在路由到的节点名称,例如 Command[Literal["my_other_node"]]。这对于图渲染是必要的,并告诉 LangGraph my_node 可以导航到 my_other_node。
Command 仅添加动态边—使用 add_edge / addEdge 定义的静态边仍然执行。例如,如果 node_a 返回 Command(goto="my_other_node"),并且您也有 graph.add_edge("node_a", "node_b"),则 node_b 和 my_other_node 都将运行。
查看此 操作指南 以获取如何使用 Command 的端到端示例。
graph
如果您正在使用 子图,可以通过在 Command 中指定 graph=Command.PARENT 从子图中的节点导航到父图中的不同节点:
def my_node(state: State) -> Command[Literal["other_subgraph"]]:
return Command(
update={"foo": "bar"},
goto="other_subgraph", # `other_subgraph` 是父图中的节点
graph=Command.PARENT
)
将 graph 设置为 Command.PARENT 将导航到最近的父图。当您将更新从子图节点发送到父图节点,针对父图和子图 状态模式 共享的键时,您必须在父图状态中为您正在更新的键定义 归约器。请参阅此 示例。
这在实现 多智能体交接 时特别有用。有关详细信息,请查看 导航到父图中的节点。
输入到 invoke/stream
Command(resume=...) 是唯一 intended 作为 invoke()/stream() 输入的 Command 模式。不要使用 Command(update=...) 作为输入来继续多轮对话——因为传递任何 Command 作为输入会从最新的检查点恢复(即最后运行的步骤,而不是 __start__),如果图已经完成,它将看起来卡住。要在现有线程上继续对话,请传递普通输入字典:# 错误 - 图从最新检查点恢复
# (最后运行的步骤),看起来卡住
graph.invoke(Command(update={
"messages": [{"role": "user", "content": "follow up"}]
}), config)
# 正确 - 普通字典从 __start__ 重启
graph.invoke( {
"messages": [{"role": "user", "content": "follow up"}]
}, config)
resume
使用 Command(resume=...) 提供值并在 中断 后恢复图执行。传递给 resume 的值成为暂停节点内 interrupt() 调用的返回值:
from langgraph.types import Command, interrupt
def human_review(state: State):
# 暂停图并等待值
answer = interrupt("Do you approve?")
return {"messages": [{"role": "user", "content": answer}]}
# 第一次调用 - 命中中断并暂停
result = graph.invoke({"messages": [...]}, config)
# 使用值恢复 - interrupt() 调用返回 "yes"
result = graph.invoke(Command(resume="yes"), config)
查看 中断概念指南 以获取中断模式的完整详细信息,包括多个中断和验证循环。
从工具返回
您可以从工具返回 Command 以更新图状态和控制流。使用 update 修改状态(例如,保存对话期间查找的客户信息),并使用 goto 在工具完成后路由到特定节点。
在工具内部使用时,goto 添加动态边—已调用工具的节点上已定义的任何静态边仍将执行。
有关详细信息,请参阅 在工具中使用。
图迁移
即使使用检查点器跟踪状态,LangGraph 也能轻松处理图定义(节点、边和状态)的迁移。
- 对于图末尾的线程(即未中断),您可以更改图的整个拓扑(即所有节点和边,移除、添加、重命名等)
- 对于当前中断的线程,我们支持除重命名/移除节点之外的所有拓扑更改(因为该线程现在可能即将进入不再存在的节点)— 如果这是障碍,请联系我们,我们可以优先考虑解决方案。
- 对于修改状态,我们完全支持添加和移除键的向后和向前兼容性
- 重命名的状态键在现有线程中丢失其保存的状态
- 以不兼容方式更改类型的状态键目前可能导致具有更改前状态的线程出现问题 — 如果这是障碍,请联系我们,我们可以优先考虑解决方案。
运行时上下文
创建图时,您可以为传递给节点的运行时上下文指定 context_schema。这对于传递不属于图状态的信息到节点很有用。例如,您可能想要传递依赖项,如模型名称或数据库连接。
@dataclass
class ContextSchema:
llm_provider: str = "openai"
graph = StateGraph(State, context_schema=ContextSchema)
然后您可以使用 invoke 方法的 context 参数将此上下文传递给图。
graph.invoke(inputs, context={"llm_provider": "anthropic"})
然后您可以在节点或条件边中访问和使用此上下文:
from langgraph.runtime import Runtime
def node_a(state: State, runtime: Runtime[ContextSchema]):
llm = get_llm(runtime.context.llm_provider)
# ...
有关配置的完整分解,请参阅 添加运行时配置。
递归限制
递归限制设置单次执行期间图可以执行的 超级步骤 的最大数量。一旦达到限制,LangGraph 将抛出 GraphRecursionError。从版本 1.0.6 开始,默认递归限制设置为 1000 步。递归限制可以在运行时设置在任何图上,并通过配置字典传递给 invoke/stream。重要的是,recursion_limit 是一个独立的 config 键,不应像所有其他用户定义的配置一样传递在 configurable 键内。参见下面的示例:
graph.invoke(inputs, config={"recursion_limit": 5}, context={"llm": "anthropic"})
阅读 递归限制 以了解更多关于递归限制如何工作的信息。
访问和处理递归计数器
当前步骤计数器可在任何节点内的 config["metadata"]["langgraph_step"] 中访问,允许在达到递归限制之前主动处理递归。这使得您可以在图逻辑中实现优雅降级策略。
工作原理
步骤计数器存储在 config["metadata"]["langgraph_step"] 中。递归限制检查遵循逻辑:step > stop,其中 stop = step + recursion_limit + 1。当超过限制时,LangGraph 抛出 GraphRecursionError。
访问当前步骤计数器
您可以在任何节点内访问当前步骤计数器以监控执行进度。
from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph
def my_node(state: dict, config: RunnableConfig) -> dict:
current_step = config["metadata"]["langgraph_step"]
print(f"Currently on step: {current_step}")
return state
主动递归处理
LangGraph 提供 RemainingSteps 托管值,跟踪在达到递归限制之前还剩多少步。这允许您在图内进行优雅降级。
from typing import Annotated, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.managed import RemainingSteps
class State(TypedDict):
messages: Annotated[list, lambda x, y: x + y]
remaining_steps: RemainingSteps # 托管值 - 跟踪直到限制的步数
def reasoning_node(state: State) -> dict:
# RemainingSteps 由 LangGraph 自动填充
remaining = state["remaining_steps"]
# 检查是否步数不足
if remaining <= 2:
return {"messages": ["Approaching limit, wrapping up..."]}
# 正常处理
return {"messages": ["thinking..."]}
def route_decision(state: State) -> Literal["reasoning_node", "fallback_node"]:
"""根据剩余步数路由"""
if state["remaining_steps"] <= 2:
return "fallback_node"
return "reasoning_node"
def fallback_node(state: State) -> dict:
"""处理接近递归限制的情况"""
return {"messages": ["Reached complexity limit, providing best effort answer"]}
# 构建图
builder = StateGraph(State)
builder.add_node("reasoning_node", reasoning_node)
builder.add_node("fallback_node", fallback_node)
builder.add_edge(START, "reasoning_node")
builder.add_conditional_edges("reasoning_node", route_decision)
builder.add_edge("fallback_node", END)
graph = builder.compile()
# RemainingSteps 适用于任何 recursion_limit
result = graph.invoke({"messages": []}, {"recursion_limit": 10})
主动与被动方法
处理递归限制主要有两种方法:主动(在图内监控)和被动(在外部捕获错误)。
from typing import Annotated, Literal, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.managed import RemainingSteps
from langgraph.errors import GraphRecursionError
class State(TypedDict):
messages: Annotated[list, lambda x, y: x + y]
remaining_steps: RemainingSteps
# 主动方法(推荐)- 使用 RemainingSteps
def agent_with_monitoring(state: State) -> dict:
"""在图内主动监控和处理递归"""
remaining = state["remaining_steps"]
# 早期检测 - 路由到内部处理
if remaining <= 2:
return {
"messages": ["Approaching limit, returning partial result"]
}
# 正常处理
return {"messages": [f"Processing... ({remaining} steps remaining)"]}
def route_decision(state: State) -> Literal["agent", END]:
if state["remaining_steps"] <= 2:
return END
return "agent"
# 构建图
builder = StateGraph(State)
builder.add_node("agent", agent_with_monitoring)
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", route_decision)
graph = builder.compile()
# 主动:图优雅完成
result = graph.invoke({"messages": []}, {"recursion_limit": 10})
# 被动方法(后备)- 在外部捕获错误
try:
result = graph.invoke({"messages": []}, {"recursion_limit": 10})
except GraphRecursionError as e:
# 在图执行失败后在外部处理
result = {"messages": ["Fallback: recursion limit exceeded"]}
这些方法之间的主要区别是:
| 方法 | 检测 | 处理 | 控制流 |
|---|
主动(使用 RemainingSteps) | 达到限制之前 | 通过条件路由在图内 | 图继续到完成节点 |
被动(捕获 GraphRecursionError) | 超过限制之后 | 在 try/catch 外的图外 | 图执行终止 |
主动优势:
- 图内的优雅降级
- 可以在检查点中保存中间状态
- 更好的用户体验和部分结果
- 图正常完成(无异常)
被动优势:
其他可用元数据
除了 langgraph_step,以下元数据也可在 config["metadata"] 中获取:
def inspect_metadata(state: dict, config: RunnableConfig) -> dict:
metadata = config["metadata"]
print(f"Step: {metadata['langgraph_step']}")
print(f"Node: {metadata['langgraph_node']}")
print(f"Triggers: {metadata['langgraph_triggers']}")
print(f"Path: {metadata['langgraph_path']}")
print(f"Checkpoint NS: {metadata['langgraph_checkpoint_ns']}")
return state
可视化
能够可视化图通常很好,尤其是随着它们变得越来越复杂。LangGraph 自带几种内置方式来可视化图。有关更多信息,请参阅 可视化您的图。
可观测性与追踪
要追踪、调试和评估您的智能体,请使用 LangSmith。
了解更多