Skip to main content
本指南演示了 LangGraph 的 Graph API 的基础知识。它将介绍 状态,以及组合常见的图结构,如 序列分支循环。它还涵盖了 LangGraph 的控制功能,包括用于 map-reduce 工作流的 Send API 以及用于将状态更新与节点间的“跳转”相结合的 Command API

设置

安装 langgraph
pip install -U langgraph
设置 LangSmith 以获得更好的调试体验注册 LangSmith 以快速发现问题并提高 LangGraph 项目的性能。LangSmith 允许您使用跟踪数据来调试、测试和监控使用 LangGraph 构建的 LLM 应用——有关如何入门的更多信息,请阅读 文档

定义和更新状态

在此我们展示如何在 LangGraph 中定义和更新 状态。我们将演示:
  1. 如何使用状态来定义图的 模式
  2. 如何使用 归约器 来控制如何处理状态更新。

定义状态

LangGraph 中的 状态 可以是 TypedDictPydantic 模型或 dataclass。下面我们将使用 TypedDict。有关使用 Pydantic 的详细信息,请参阅 为图状态使用 Pydantic 模型 默认情况下,图将具有相同的输入和输出模式,状态决定了该模式。有关如何定义不同的输入和输出模式,请参阅 定义输入和输出模式 让我们考虑一个使用 消息 的简单示例。这代表了适用于许多 LLM 应用的通用状态表述。有关更多详细信息,请参阅我们的 概念页面
from langchain.messages import AnyMessage
from typing_extensions import TypedDict

class State(TypedDict):
    messages: list[AnyMessage]
    extra_field: int
此状态跟踪 消息 对象列表,以及一个额外的整数字段。

更新状态

让我们构建一个包含单个节点的示例图。我们的 节点 只是一个读取图的 state 并对其进行更新的 Python 函数。此函数的第一个参数始终是 state:
from langchain.messages import AIMessage

def node(state: State):
    messages = state["messages"]
    new_message = AIMessage("Hello!")
    return {"messages": messages + [new_message], "extra_field": 10}
此节点只是将一条消息附加到我们的消息列表中,并填充一个额外字段。
节点应直接返回对状态的更新,而不是修改状态。
接下来让我们定义一个包含此节点的简单图。我们使用 StateGraph 来定义一个在此状态下操作的图。然后我们使用 add_node 来填充我们的图。
from langgraph.graph import StateGraph

builder = StateGraph(State)
builder.add_node(node)
builder.set_entry_point("node")
graph = builder.compile()
LangGraph 提供了用于可视化您的图的内置实用程序。让我们检查我们的图。有关可视化的详细信息,请参阅 可视化您的图
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))
包含单个节点的简单图 在这种情况下,我们的图只执行单个节点。让我们继续进行简单的调用:
from langchain.messages import HumanMessage

result = graph.invoke({"messages": [HumanMessage("Hi")]})
result
{'messages': [HumanMessage(content='Hi'), AIMessage(content='Hello!')], 'extra_field': 10}
请注意:
  • 我们通过更新状态的单个键来启动调用。
  • 我们在调用结果中接收整个状态。
为了方便起见,我们经常通过美化打印来检查 消息对象 的内容:
for message in result["messages"]:
    message.pretty_print()
================================ Human Message ================================

Hi
================================== Ai Message ==================================

Hello!

使用归约器处理状态更新

状态中的每个键都可以有自己的独立 归约器 函数,它控制如何应用来自节点的更新。如果没有显式指定归约器函数,则假设对该键的所有更新都应覆盖它。 对于 TypedDict 状态模式,我们可以通过使用归约器函数注释状态的相应字段来定义归约器。 在之前的示例中,我们的节点通过在消息列表中追加一条消息来更新状态中的 "messages" 键。下面,我们为此键添加一个归约器,以便自动追加更新:
from typing_extensions import Annotated

def add(left, right):
    """也可以从内置的 `operator` 导入 `add`。"""
    return left + right

class State(TypedDict):
    messages: Annotated[list[AnyMessage], add]
    extra_field: int
现在我们的节点可以简化为:
def node(state: State):
    new_message = AIMessage("Hello!")
    return {"messages": [new_message], "extra_field": 10}
from langgraph.graph import START

graph = StateGraph(State).add_node(node).add_edge(START, "node").compile()

result = graph.invoke({"messages": [HumanMessage("Hi")]})

for message in result["messages"]:
    message.pretty_print()
================================ Human Message ================================

Hi
================================== Ai Message ==================================

Hello!

MessagesState

在实践中,更新消息列表还有其他注意事项:
  • 我们可能希望更新状态中的现有消息。
  • 我们可能希望接受 消息格式 的简写,例如 OpenAI 格式
LangGraph 包含一个内置归约器 add_messages,可处理这些注意事项:
from langgraph.graph.message import add_messages

class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
    extra_field: int

def node(state: State):
    new_message = AIMessage("Hello!")
    return {"messages": [new_message], "extra_field": 10}

graph = StateGraph(State).add_node(node).set_entry_point("node").compile()
input_message = {"role": "user", "content": "Hi"}

result = graph.invoke({"messages": [input_message]})

for message in result["messages"]:
    message.pretty_print()
================================ Human Message ================================

Hi
================================== Ai Message ==================================

Hello!
这是涉及 聊天模型 的应用程序的通用状态表示。LangGraph 包含预构建的 MessagesState 以供方便使用,因此我们可以拥有:
from langgraph.graph import MessagesState

class State(MessagesState):
    extra_field: int

使用 Overwrite 绕过归约器

在某些情况下,您可能想要绕过归约器并直接覆盖状态值。LangGraph 为此目的提供了 Overwrite 类型。当节点返回用 Overwrite 包装的值时,归约器将被绕过,通道将直接设置为该值。 当您想要重置或替换累积的状态而不是将其与现有值合并时,这很有用。
from langgraph.graph import StateGraph, START, END
from langgraph.types import Overwrite
from typing_extensions import Annotated, TypedDict
import operator

class State(TypedDict):
    messages: Annotated[list, operator.add]

def add_message(state: State):
    return {"messages": ["first message"]}

def replace_messages(state: State):
    # 绕过归约器并替换整个消息列表
    return {"messages": Overwrite(["replacement message"])}

builder = StateGraph(State)
builder.add_node("add_message", add_message)
builder.add_node("replace_messages", replace_messages)
builder.add_edge(START, "add_message")
builder.add_edge("add_message", "replace_messages")
builder.add_edge("replace_messages", END)

graph = builder.compile()

result = graph.invoke({"messages": ["initial"]})
print(result["messages"])
['replacement message']
您也可以使用带有特殊键 "__overwrite__" 的 JSON 格式:
def replace_messages(state: State):
    return {"messages": {"__overwrite__": ["replacement message"]}}
当节点并行执行时,在同一个超级步骤中,只有一个节点可以在相同的状态键上使用 Overwrite。如果多个节点尝试在同一个超级步骤中覆盖相同的键,将引发 InvalidUpdateError

定义输入和输出模式

默认情况下,StateGraph 使用单一模式运行,所有节点都预期使用该模式进行通信。但是,也可以为图定义不同的输入和输出模式。 当指定不同的模式时,内部仍将使用模式用于节点之间的通信。输入模式确保提供的输入符合预期的结构,而输出模式过滤内部数据,仅根据定义的模式返回相关信息。 下面,我们将看到如何定义不同的输入和输出模式。
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

# 定义输入模式
class InputState(TypedDict):
    question: str

# 定义输出模式
class OutputState(TypedDict):
    answer: str

# 定义整体模式,结合输入和输出
class OverallState(InputState, OutputState):
    pass

# 定义处理输入并生成答案的节点
def answer_node(state: InputState):
    # 示例答案和一个额外键
    return {"answer": "bye", "question": state["question"]}

# 构建图,指定输入和输出模式
builder = StateGraph(OverallState, input_schema=InputState, output_schema=OutputState)
builder.add_node(answer_node)  # 添加答案节点
builder.add_edge(START, "answer_node")  # 定义起始边
builder.add_edge("answer_node", END)  # 定义结束边
graph = builder.compile()  # 编译图

# 使用输入调用图并打印结果
print(graph.invoke({"question": "hi"}))
{'answer': 'bye'}
请注意,调用的输出仅包含输出模式。

在节点之间传递私有状态

在某些情况下,您可能希望节点交换对中间逻辑至关重要但不需要成为图主模式一部分的信息。这种私有数据与图的整体输入/输出无关,应仅在特定节点之间共享。 下面,我们将创建一个由三个节点(node_1、node_2 和 node_3)组成的示例顺序图,其中私有数据在前两个步骤(node_1 和 node_2)之间传递,而第三个步骤(node_3)只能访问公共整体状态。
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict

# 图的整体状态(这是在节点间共享的公共状态)
class OverallState(TypedDict):
    a: str

# node_1 的输出包含不属于整体状态的私有数据
class Node1Output(TypedDict):
    private_data: str

# 私有数据仅在 node_1 和 node_2 之间共享
def node_1(state: OverallState) -> Node1Output:
    output = {"private_data": "set by node_1"}
    print(f"进入节点 `node_1`:\n\t输入:{state}\n\t返回:{output}")
    return output

# node_2 的输入仅请求 node_1 之后可用的私有数据
class Node2Input(TypedDict):
    private_data: str

def node_2(state: Node2Input) -> OverallState:
    output = {"a": "set by node_2"}
    print(f"进入节点 `node_2`:\n\t输入:{state}\n\t返回:{output}")
    return output

# node_3 只能访问整体状态(无法访问 node_1 的私有数据)
def node_3(state: OverallState) -> OverallState:
    output = {"a": "set by node_3"}
    print(f"进入节点 `node_3`:\n\t输入:{state}\n\t返回:{output}")
    return output

# 按顺序连接节点
# node_2 接受来自 node_1 的私有数据,而
# node_3 看不到私有数据。
builder = StateGraph(OverallState).add_sequence([node_1, node_2, node_3])
builder.add_edge(START, "node_1")
graph = builder.compile()

# 使用初始状态调用图
response = graph.invoke(
    {
        "a": "set at start",
    }
)

print()
print(f"图调用输出:{response}")
进入节点 `node_1`:
    输入:{'a': 'set at start'}。
    返回:{'private_data': 'set by node_1'}
进入节点 `node_2`:
    输入:{'private_data': 'set by node_1'}。
    返回:{'a': 'set by node_2'}
进入节点 `node_3`:
    输入:{'a': 'set by node_2'}。
    返回:{'a': 'set by node_3'}

图调用输出:{'a': 'set by node_3'}

为图状态使用 Pydantic 模型

StateGraph 在初始化时接受 state_schema 参数,指定图中节点可以访问和更新的 state 的“形状”。 在我们的示例中,我们通常使用 Python 原生的 TypedDictdataclass 作为 state_schema,但 state_schema 可以是任何 类型 在这里,我们将看到如何使用 Pydantic BaseModel 作为 state_schema 来为 输入 添加运行时验证。
已知限制
  • 目前,图的输出 不会 是 pydantic 模型的实例。
  • 运行时验证仅发生在图中第一个节点的输入上,而不发生在后续节点或输出上。
  • pydantic 的验证错误跟踪不显示错误出现在哪个节点。
  • Pydantic 的递归验证可能很慢。对于性能敏感的应用程序,您可能需要考虑使用 dataclass
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
from pydantic import BaseModel

# 图的整体状态(这是在节点间共享的公共状态)
class OverallState(BaseModel):
    a: str

def node(state: OverallState):
    return {"a": "goodbye"}

# 构建状态图
builder = StateGraph(OverallState)
builder.add_node(node)  # node_1 是第一个节点
builder.add_edge(START, "node")  # 用 node_1 启动图
builder.add_edge("node", END)  # 在 node_1 后结束图
graph = builder.compile()

# 使用有效输入测试图
graph.invoke({"a": "hello"})
使用 无效 输入调用图
try:
    graph.invoke({"a": 123})  # 应该是字符串
except Exception as e:
    print("由于 `a` 是整数而不是字符串,引发了异常。")
    print(e)
由于 `a` 是整数而不是字符串,引发了异常。
1 validation error for OverallState
a
  Input should be a valid string [type=string_type, input_value=123, input_type=int]
    For further information visit https://errors.pydantic.dev/2.9/v/string_type
有关 Pydantic 模型状态的附加功能,请参见下方:
当使用 Pydantic 模型作为状态模式时,了解序列化如何工作非常重要,特别是在以下情况:
  • 将 Pydantic 对象作为输入传递
  • 接收来自图的输出
  • 处理嵌套的 Pydantic 模型
让我们看看这些行为的实际效果。
from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel

class NestedModel(BaseModel):
    value: str

class ComplexState(BaseModel):
    text: str
    count: int
    nested: NestedModel

def process_node(state: ComplexState):
    # 节点接收已验证的 Pydantic 对象
    print(f"输入状态类型:{type(state)}")
    print(f"嵌套类型:{type(state.nested)}")
    # 返回字典更新
    return {"text": state.text + " processed", "count": state.count + 1}

# 构建图
builder = StateGraph(ComplexState)
builder.add_node("process", process_node)
builder.add_edge(START, "process")
builder.add_edge("process", END)
graph = builder.compile()

# 创建 Pydantic 实例作为输入
input_state = ComplexState(text="hello", count=0, nested=NestedModel(value="test"))
print(f"输入对象类型:{type(input_state)}")

# 使用 Pydantic 实例调用图
result = graph.invoke(input_state)
print(f"输出类型:{type(result)}")
print(f"输出内容:{result}")

# 如果需要,转换回 Pydantic 模型
output_model = ComplexState(**result)
print(f"转换回 Pydantic:{type(output_model)}")
Pydantic 会对某些数据类型执行运行时类型强制转换。这可能很有帮助,但如果不知道这一点,也可能导致意外行为。
from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel

class CoercionExample(BaseModel):
    # Pydantic 会将字符串数字强制转换为整数
    number: int
    # Pydantic 会将字符串布尔值解析为 bool
    flag: bool

def inspect_node(state: CoercionExample):
    print(f"number: {state.number} (类型:{type(state.number)})")
    print(f"flag: {state.flag} (类型:{type(state.flag)})")
    return {}

builder = StateGraph(CoercionExample)
builder.add_node("inspect", inspect_node)
builder.add_edge(START, "inspect")
builder.add_edge("inspect", END)
graph = builder.compile()

# 演示将转换为字符串输入
result = graph.invoke({"number": "42", "flag": "true"})

# 这将因验证错误而失败
try:
    graph.invoke({"number": "not-a-number", "flag": "true"})
except Exception as e:
    print(f"\n预期的验证错误:{e}")
当在状态模式中使用 LangChain 消息类型时,序列化方面有重要的注意事项。在使用消息对象通过网络传输时,您应该使用 AnyMessage(而不是 BaseMessage)来进行正确的序列化/反序列化。
from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel
from langchain.messages import HumanMessage, AIMessage, AnyMessage
from typing import List

class ChatState(BaseModel):
    messages: List[AnyMessage]
    context: str

def add_message(state: ChatState):
    return {"messages": state.messages + [AIMessage(content="Hello there!")]}

builder = StateGraph(ChatState)
builder.add_node("add_message", add_message)
builder.add_edge(START, "add_message")
builder.add_edge("add_message", END)
graph = builder.compile()

# 使用消息创建输入
initial_state = ChatState(
    messages=[HumanMessage(content="Hi")], context="Customer support chat"
)

result = graph.invoke(initial_state)
print(f"输出:{result}")

# 转换回 Pydantic 模型以查看消息类型
output_model = ChatState(**result)
for i, msg in enumerate(output_model.messages):
    print(f"消息 {i}: {type(msg).__name__} - {msg.content}")

添加运行时配置

有时您希望在调用图时能够配置它。例如,您可能希望在运行时指定要使用什么 LLM 或系统提示,而 无需将这些参数污染到图状态中 要添加运行时配置:
  1. 指定配置的 schema
  2. 将配置添加到节点或条件边的函数签名中
  3. 将配置传递给图。
下面是一个简单示例:
from langgraph.graph import END, StateGraph, START
from langgraph.runtime import Runtime
from typing_extensions import TypedDict

# 1. 指定配置 schema
class ContextSchema(TypedDict):
    my_runtime_value: str

# 2. 定义一个在节点中访问配置的图
class State(TypedDict):
    my_state_value: str

def node(state: State, runtime: Runtime[ContextSchema]):
    if runtime.context["my_runtime_value"] == "a":
        return {"my_state_value": 1}
    elif runtime.context["my_runtime_value"] == "b":
        return {"my_state_value": 2}
    else:
        raise ValueError("未知值。")

builder = StateGraph(State, context_schema=ContextSchema)
builder.add_node(node)
builder.add_edge(START, "node")
builder.add_edge("node", END)

graph = builder.compile()

# 3. 在运行时传入配置:
print(graph.invoke({}, context={"my_runtime_value": "a"}))
print(graph.invoke({}, context={"my_runtime_value": "b"}))
{'my_state_value': 1}
{'my_state_value': 2}
下面我们演示一个实际示例,在其中配置在运行时使用什么 LLM。我们将同时使用 OpenAI 和 Anthropic 模型。
from dataclasses import dataclass

from langchain.chat_models import init_chat_model
from langgraph.graph import MessagesState, END, StateGraph, START
from langgraph.runtime import Runtime
from typing_extensions import TypedDict

@dataclass
class ContextSchema:
    model_provider: str = "anthropic"

MODELS = {
    "anthropic": init_chat_model("claude-haiku-4-5-20251001"),
    "openai": init_chat_model("gpt-4.1-mini"),
}

def call_model(state: MessagesState, runtime: Runtime[ContextSchema]):
    model = MODELS[runtime.context.model_provider]
    response = model.invoke(state["messages"])
    return {"messages": [response]}

builder = StateGraph(MessagesState, context_schema=ContextSchema)
builder.add_node("model", call_model)
builder.add_edge(START, "model")
builder.add_edge("model", END)

graph = builder.compile()

# 用法
input_message = {"role": "user", "content": "hi"}
# 没有配置时,使用默认值(Anthropic)
response_1 = graph.invoke({"messages": [input_message]}, context=ContextSchema())["messages"][-1]
# 或者,可以设置 OpenAI
response_2 = graph.invoke({"messages": [input_message]}, context={"model_provider": "openai"})["messages"][-1]

print(response_1.response_metadata["model_name"])
print(response_2.response_metadata["model_name"])
claude-haiku-4-5-20251001
gpt-4.1-mini-2025-04-14
下面我们演示一个实际示例,在其中配置两个参数:在运行时使用的 LLM 和系统消息。
from dataclasses import dataclass
from langchain.chat_models import init_chat_model
from langchain.messages import SystemMessage
from langgraph.graph import END, MessagesState, StateGraph, START
from langgraph.runtime import Runtime
from typing_extensions import TypedDict

@dataclass
class ContextSchema:
    model_provider: str = "anthropic"
    system_message: str | None = None

MODELS = {
    "anthropic": init_chat_model("claude-haiku-4-5-20251001"),
    "openai": init_chat_model("gpt-4.1-mini"),
}

def call_model(state: MessagesState, runtime: Runtime[ContextSchema]):
    model = MODELS[runtime.context.model_provider]
    messages = state["messages"]
    if (system_message := runtime.context.system_message):
        messages = [SystemMessage(system_message)] + messages
    response = model.invoke(messages)
    return {"messages": [response]}

builder = StateGraph(MessagesState, context_schema=ContextSchema)
builder.add_node("model", call_model)
builder.add_edge(START, "model")
builder.add_edge("model", END)

graph = builder.compile()

# 用法
input_message = {"role": "user", "content": "hi"}
response = graph.invoke({"messages": [input_message]}, context={"model_provider": "openai", "system_message": "Respond in Italian."})
for message in response["messages"]:
    message.pretty_print()
================================ Human Message ================================

hi
================================== Ai Message ==================================

Ciao! Come posso aiutarti oggi?

添加重试策略

有许多用例可能需要您的节点具有自定义重试策略,例如如果您正在调用 API、查询数据库或调用 LLM 等。LangGraph 允许您将重试策略添加到节点。 要配置重试策略,请将 retry_policy 参数传递给 add_noderetry_policy 参数接受 RetryPolicy 命名元组对象。下面我们使用默认参数实例化 RetryPolicy 对象并将其与节点关联:
from langgraph.types import RetryPolicy

builder.add_node(
    "node_name",
    node_function,
    retry_policy=RetryPolicy(),
)
默认情况下,retry_on 参数使用 default_retry_on 函数,它在除以下之外的任何异常上重试:
  • ValueError
  • TypeError
  • ArithmeticError
  • ImportError
  • LookupError
  • NameError
  • SyntaxError
  • RuntimeError
  • ReferenceError
  • StopIteration
  • StopAsyncIteration
  • OSError
此外,对于来自流行的 http 请求库(如 requestshttpx)的异常,它仅在 5xx 状态码上重试。
考虑一个我们从 SQL 数据库读取的示例。下面我们将两个不同的重试策略传递给节点:
import sqlite3
from typing_extensions import TypedDict
from langchain.chat_models import init_chat_model
from langgraph.graph import END, MessagesState, StateGraph, START
from langgraph.types import RetryPolicy
from langchain_community.utilities import SQLDatabase
from langchain.messages import AIMessage

db = SQLDatabase.from_uri("sqlite:///:memory:")
model = init_chat_model("claude-haiku-4-5-20251001")

def query_database(state: MessagesState):
    query_result = db.run("SELECT * FROM Artist LIMIT 10;")
    return {"messages": [AIMessage(content=query_result)]}

def call_model(state: MessagesState):
    response = model.invoke(state["messages"])
    return {"messages": [response]}

# 定义一个新图
builder = StateGraph(MessagesState)
builder.add_node(
    "query_database",
    query_database,
    retry_policy=RetryPolicy(retry_on=sqlite3.OperationalError),
)
builder.add_node("model", call_model, retry_policy=RetryPolicy(max_attempts=5))
builder.add_edge(START, "model")
builder.add_edge("model", "query_database")
builder.add_edge("query_database", END)
graph = builder.compile()

在节点内访问执行信息

需要 langgraph>=1.1.3。当节点具有重试策略时,您可以通过 runtime.execution_info 检查当前尝试次数和第一次尝试的时间。这对于根据重试状态调整行为非常有用(例如,在第一次尝试失败后切换到备用提供商)。
属性描述
node_attempt当前执行尝试次数(从 1 开始索引)。第一次尝试时为 1,第一次重试时为 2,依此类推。
node_first_attempt_time第一次尝试开始的 Unix 时间戳(秒)。在重试期间保持不变。
from langgraph.graph import StateGraph, START, END
from langgraph.runtime import Runtime
from langgraph.types import RetryPolicy
from typing_extensions import TypedDict

class State(TypedDict):
    result: str

def my_node(state: State, runtime: Runtime):
    info = runtime.execution_info
    if info.node_attempt > 1:
        # 在重试时使用备用方案
        return {"result": call_fallback_api()}
    return {"result": call_primary_api()}

builder = StateGraph(State)
builder.add_node("my_node", my_node, retry_policy=RetryPolicy(max_attempts=3))
builder.add_edge(START, "my_node")
builder.add_edge("my_node", END)
graph = builder.compile()
即使没有重试策略,execution_info 也可用于 Runtime 对象——node_attempt 默认为 1node_first_attempt_time 设置为节点开始执行的时间。

添加节点缓存

节点缓存在您希望避免重复操作的情况下非常有用,例如在执行昂贵操作时(无论是时间还是成本方面)。LangGraph 允许您在图中的节点中添加个性化的缓存策略。 要配置缓存策略,请将 cache_policy 参数传递给 add_node 函数。在下面的示例中,使用 120 秒的生存时间和默认 key_func 生成器实例化 CachePolicy 对象。然后将其与节点关联:
from langgraph.types import CachePolicy

builder.add_node(
    "node_name",
    node_function,
    cache_policy=CachePolicy(ttl=120),
)
然后,要为图启用节点级缓存,请在编译图时设置 cache 参数。下面的示例使用 InMemoryCache 来设置具有内存缓存的图,但也提供 SqliteCache
from langgraph.cache.memory import InMemoryCache

graph = builder.compile(cache=InMemoryCache())

创建步骤序列

先决条件 本指南假设您熟悉上面关于 状态 的部分。
在此我们演示如何构建简单的步骤序列。我们将展示:
  1. 如何构建顺序图
  2. 构建类似图的内置简写。
要添加节点序列,我们使用 add_nodeadd_edge 方法:
from langgraph.graph import START, StateGraph

builder = StateGraph(State)

# 添加节点
builder.add_node(step_1)
builder.add_node(step_2)
builder.add_node(step_3)

# 添加边
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
我们也可以使用内置简写 .add_sequence
builder = StateGraph(State).add_sequence([step_1, step_2, step_3])
builder.add_edge(START, "step_1")
LangGraph 使为您的应用程序添加底层持久层变得容易。 这允许在节点执行之间对状态进行检查点,因此您的 LangGraph 节点管理:它们还决定执行步骤如何 流式传输,以及如何使用 Studio 可视化和调试您的应用程序。让我们演示一个端到端示例。我们将创建三个步骤的序列:
  1. 在状态的键中填充值
  2. 更新相同的值
  3. 填充不同的值
首先让我们定义我们的 状态。这管理图的 模式,还可以指定如何应用更新。有关更多详细信息,请参阅 使用归约器处理状态更新在我们的例子中,我们将只跟踪两个值:
from typing_extensions import TypedDict

class State(TypedDict):
    value_1: str
    value_2: int
我们的 节点 只是读取图的 state 并对其进行更新的 Python 函数。此函数的第一个参数始终是 state:
def step_1(state: State):
    return {"value_1": "a"}

def step_2(state: State):
    current_value_1 = state["value_1"]
    return {"value_1": f"{current_value_1} b"}

def step_3(state: State):
    return {"value_2": 10}
请注意,在向状态发出更新时,每个节点只需指定其希望更新的键的值。默认情况下,这将 覆盖 相应键的值。您还可以使用 归约器 来控制如何处理更新——例如,您可以将连续更新附加到键。有关更多详细信息,请参阅 使用归约器处理状态更新
最后,我们定义图。我们使用 StateGraph 来定义一个在此状态下操作的图。然后我们将使用 add_nodeadd_edge 来填充我们的图并定义其控制流。
from langgraph.graph import START, StateGraph

builder = StateGraph(State)

# 添加节点
builder.add_node(step_1)
builder.add_node(step_2)
builder.add_node(step_3)

# 添加边
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
指定自定义名称 您可以使用 add_node 为节点指定自定义名称:
builder.add_node("my_node", step_1)
请注意:
  • add_edge 接受节点的名称,对于函数默认为 node.__name__
  • 我们必须指定图的入口点。为此,我们添加一条带有 START 节点 的边。
  • 当没有更多节点可执行时,图停止。
我们接下来 编译 我们的图。这提供了对图结构的几个基本检查(例如,识别孤立节点)。如果我们通过 检查点器 为我们的应用程序添加持久性,它也将在此处传递。
graph = builder.compile()
LangGraph 提供了用于可视化您的图的内置实用程序。让我们检查我们的序列。有关可视化的详细信息,请参阅 可视化您的图
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))
步骤序列图让我们继续进行简单的调用:
graph.invoke({"value_1": "c"})
{'value_1': 'a b', 'value_2': 10}
请注意:
  • 我们通过为单个状态键提供值来启动调用。我们必须始终至少提供一个键的值。
  • 我们传入的值被第一个节点覆盖。
  • 第二个节点更新了该值。
  • 第三个节点填充了不同的值。
内置简写 langgraph>=0.2.46 包含用于添加节点序列的内置简写 add_sequence。您可以如下编译相同的图:
builder = StateGraph(State).add_sequence([step_1, step_2, step_3])
builder.add_edge(START, "step_1")

graph = builder.compile()

graph.invoke({"value_1": "c"})

创建分支

节点的并行执行对于加快整体图操作至关重要。LangGraph 原生支持节点的并行执行,可以显著提高基于图的工作流的性能。这种并行化是通过扇出和扇入机制实现的,利用标准边和 条件边。以下是一些示例,展示如何添加适合您的分支数据流。

并行运行图节点

在此示例中,我们从 Node A 扇出到 B 和 C,然后扇入到 D。使用我们的状态,我们指定归约器 add 操作。这将组合或累积特定键的值,而不是简单地覆盖现有值。对于列表,这意味着将新列表与现有列表连接。有关使用归约器更新状态的更多详细信息,请参阅上面的 状态归约器 部分。
import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    # operator.add 归约器函数使其变为追加模式
    aggregate: Annotated[list, operator.add]

def a(state: State):
    print(f'向 {state["aggregate"]} 添加 "A"')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'向 {state["aggregate"]} 添加 "B"')
    return {"aggregate": ["B"]}

def c(state: State):
    print(f'向 {state["aggregate"]} 添加 "C"')
    return {"aggregate": ["C"]}

def d(state: State):
    print(f'向 {state["aggregate"]} 添加 "D"')
    return {"aggregate": ["D"]}

builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))
并行执行图 使用归约器,您可以看到每个节点中添加的值都被累积了。
graph.invoke({"aggregate": []}, {"configurable": {"thread_id": "foo"}})
Adding "A" to []
Adding "B" to ['A']
Adding "C" to ['A']
Adding "D" to ['A', 'B', 'C']
在上述示例中,节点 "b""c" 在同一个 超级步骤 中并发执行。因为它们在同一步骤中,所以节点 "d""b""c" 完成后执行。重要的是,来自并行超级步骤的更新可能不会按一致的顺序排列。如果您需要来自并行超级步骤的一致、预定顺序的更新,您应该将输出写入状态中的单独字段,并附带用于排序的值。
LangGraph 在 超级步骤 内执行节点,这意味着虽然并行分支并行执行,但整个超级步骤是 事务性 的。如果这些分支中的任何一个抛出异常,没有任何 更新应用于状态(整个超级步骤出错)。重要的是,当使用 检查点器 时,超级步骤内成功节点的結果会被保存,并且在恢复时不会重复。如果您有容易出错的(也许想处理不稳定的 API 调用)节点,LangGraph 提供两种解决方法:
  1. 您可以在节点内编写常规 Python 代码来捕获和处理异常。
  2. 您可以设置 retry_policy 以指导图重试抛出某些类型异常的节点。只有失败的分支会被重试,因此您无需担心执行冗余工作。
结合这两者,您可以执行并行执行并完全控制异常处理。
设置最大并发数 您可以通过在调用图时在 配置 中设置 max_concurrency 来控制最大并发任务数。
graph.invoke({"value_1": "c"}, {"configurable": {"max_concurrency": 10}})

延迟节点执行

延迟节点执行在您希望延迟节点执行直到所有其他待处理任务完成时非常有用。这在分支长度不同的情况下特别相关,这在 map-reduce 流等工作流中很常见。 上面的示例展示了当每条路径只有一步时如何扇出和扇入。但是如果一个分支有多步怎么办?让我们在 "b" 分支中添加一个节点 "b_2"
import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    # operator.add 归约器函数使其变为追加模式
    aggregate: Annotated[list, operator.add]

def a(state: State):
    print(f'向 {state["aggregate"]} 添加 "A"')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'向 {state["aggregate"]} 添加 "B"')
    return {"aggregate": ["B"]}

def b_2(state: State):
    print(f'向 {state["aggregate"]} 添加 "B_2"')
    return {"aggregate": ["B_2"]}

def c(state: State):
    print(f'向 {state["aggregate"]} 添加 "C"')
    return {"aggregate": ["C"]}

def d(state: State):
    print(f'向 {state["aggregate"]} 添加 "D"')
    return {"aggregate": ["D"]}

builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(b_2)
builder.add_node(c)
builder.add_node(d, defer=True)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b_2")
builder.add_edge("b_2", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))
延迟执行图
graph.invoke({"aggregate": []})
Adding "A" to []
Adding "B" to ['A']
Adding "C" to ['A']
Adding "B_2" to ['A', 'B', 'C']
Adding "D" to ['A', 'B', 'C', 'B_2']
在上述示例中,节点 "b""c" 在同一个超级步骤中并发执行。我们在节点 d 上设置了 defer=True,因此它不会执行直到所有待处理任务完成。在这种情况下,这意味着 "d" 等待执行直到整个 "b" 分支完成。

条件分支

如果您的扇出应根据状态在运行时变化,您可以使用 add_conditional_edges 使用图状态选择一个或多个路径。见下方示例,其中节点 a 生成确定下一个节点的状态更新。
import operator
from typing import Annotated, Literal, Sequence
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    aggregate: Annotated[list, operator.add]
    # 向状态添加一个键。我们将设置此键以确定
    # 我们如何分支。
    which: str

def a(state: State):
    print(f'向 {state["aggregate"]} 添加 "A"')
    return {"aggregate": ["A"], "which": "c"}

def b(state: State):
    print(f'向 {state["aggregate"]} 添加 "B"')
    return {"aggregate": ["B"]}

def c(state: State):
    print(f'向 {state["aggregate"]} 添加 "C"')
    return {"aggregate": ["C"]}

builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_edge(START, "a")
builder.add_edge("b", END)
builder.add_edge("c", END)

def conditional_edge(state: State) -> Literal["b", "c"]:
    # 在此填写任意逻辑,使用状态
    # 来确定下一个节点
    return state["which"]

builder.add_conditional_edges("a", conditional_edge)

graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))
条件分支图
result = graph.invoke({"aggregate": []})
print(result)
Adding "A" to []
Adding "C" to ['A']
{'aggregate': ['A', 'C'], 'which': 'c'}
您的条件边可以路由到多个目标节点。例如:
def route_bc_or_cd(state: State) -> Sequence[str]:
    if state["which"] == "cd":
        return ["c", "d"]
    return ["b", "c"]

Map-Reduce 和 Send API

LangGraph 支持使用 Send API 进行 map-reduce 和其他高级分支模式。以下是如何使用它的示例:
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
from typing_extensions import TypedDict, Annotated
import operator

class OverallState(TypedDict):
    topic: str
    subjects: list[str]
    jokes: Annotated[list[str], operator.add]
    best_selected_joke: str

def generate_topics(state: OverallState):
    return {"subjects": ["lions", "elephants", "penguins"]}

def generate_joke(state: OverallState):
    joke_map = {
        "lions": "Why don't lions like fast food? Because they can't catch it!",
        "elephants": "Why don't elephants use computers? They're afraid of the mouse!",
        "penguins": "Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice."
    }
    return {"jokes": [joke_map[state["subject"]]]}

def continue_to_jokes(state: OverallState):
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]

def best_joke(state: OverallState):
    return {"best_selected_joke": "penguins"}

builder = StateGraph(OverallState)
builder.add_node("generate_topics", generate_topics)
builder.add_node("generate_joke", generate_joke)
builder.add_node("best_joke", best_joke)
builder.add_edge(START, "generate_topics")
builder.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
builder.add_edge("generate_joke", "best_joke")
builder.add_edge("best_joke", END)
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))
带有扇出的 map-reduce 图
# 调用图:这里我们调用它以生成笑话列表
for step in graph.stream({"topic": "animals"}):
    print(step)
{'generate_topics': {'subjects': ['lions', 'elephants', 'penguins']}}
{'generate_joke': {'jokes': ["Why don't lions like fast food? Because they can't catch it!"]}}
{'generate_joke': {'jokes': ["Why don't elephants use computers? They're afraid of the mouse!"]}}
{'generate_joke': {'jokes': ['Why don't penguins like talking to strangers at parties? Because they find it hard to break the ice.']}}
{'best_joke': {'best_selected_joke': 'penguins'}}

创建和控制循环

在创建带有循环的图时,我们需要一种终止执行的机制。这通常是通过添加 条件边 来实现的,一旦达到某些终止条件,该边就会路由到 END 节点。 您也可以在调用或流式传输图时设置图递归限制。递归限制设置图在抛出错误之前允许执行的 超级步骤 数量。有关 递归限制概念 的更多信息。 让我们考虑一个简单的带有循环的图,以更好地了解这些机制如何工作。
若要返回状态的最后一个值而不是收到递归限制错误,请参阅 下一节
创建循环时,您可以包含指定终止条件的条件边:
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)

def route(state: State) -> Literal["b", END]:
    if termination_condition(state):
        return END
    else:
        return "b"

builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()
要控制递归限制,请在配置中指定 "recursionLimit"。这将抛出 GraphRecursionError,您可以捕获并处理:
from langgraph.errors import GraphRecursionError

try:
    graph.invoke(inputs, {"recursion_limit": 3})
except GraphRecursionError:
    print("Recursion Error")
让我们定义一个带有简单循环的图。请注意,我们使用条件边来实现终止条件。
import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    # operator.add 归约器函数使其变为追加模式
    aggregate: Annotated[list, operator.add]

def a(state: State):
    print(f'节点 A 看到 {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'节点 B 看到 {state["aggregate"]}')
    return {"aggregate": ["B"]}

# 定义节点
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)

# 定义边
def route(state: State) -> Literal["b", END]:
    if len(state["aggregate"]) < 7:
        return "b"
    else:
        return END

builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))
简单循环图 此架构类似于 ReAct agent,其中节点 "a" 是调用工具的模型,节点 "b" 代表工具。 在我们的 route 条件边中,我们指定在状态中的 "aggregate" 列表超过阈值长度后我们应该结束。 调用图时,我们看到我们在达到终止条件之前在节点 "a""b" 之间交替。
graph.invoke({"aggregate": []})
Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
Node B sees ['A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B']
Node B sees ['A', 'B', 'A', 'B', 'A']
Node A sees ['A', 'B', 'A', 'B', 'A', 'B']

施加递归限制

在某些应用中,我们无法保证将达到给定的终止条件。在这些情况下,我们可以设置图的 递归限制。这将在给定数量的 超级步骤 后抛出 GraphRecursionError。然后我们可以捕获并处理此异常:
from langgraph.errors import GraphRecursionError

try:
    graph.invoke({"aggregate": []}, {"recursion_limit": 4})
except GraphRecursionError:
    print("Recursion Error")
Node A sees []
Node B sees ['A']
Node C sees ['A', 'B']
Node D sees ['A', 'B']
Node A sees ['A', 'B', 'C', 'D']
Recursion Error
与其抛出 GraphRecursionError,我们可以向状态引入一个新键来跟踪到达递归限制所需的剩余步骤数。然后我们可以使用此键来确定是否应该结束运行。LangGraph 实现了特殊的 RemainingSteps 注解。在底层,它创建一个 ManagedValue 通道——一个在我们图运行期间存在且不再存在的状态通道。
import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.managed.is_last_step import RemainingSteps

class State(TypedDict):
    aggregate: Annotated[list, operator.add]
    remaining_steps: RemainingSteps

def a(state: State):
    print(f'节点 A 看到 {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'节点 B 看到 {state["aggregate"]}')
    return {"aggregate": ["B"]}

# 定义节点
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)

# 定义边
def route(state: State) -> Literal["b", END]:
    if state["remaining_steps"] <= 2:
        return END
    else:
        return "b"

builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "a")
graph = builder.compile()

# 测试一下
result = graph.invoke({"aggregate": []}, {"recursion_limit": 4})
print(result)
Node A sees []
Node B sees ['A']
Node A sees ['A', 'B']
{'aggregate': ['A', 'B', 'A']}
为了更好地理解递归限制如何工作,让我们考虑一个更复杂的示例。下面我们实现一个循环,但一步扇出到两个节点:
import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    aggregate: Annotated[list, operator.add]

def a(state: State):
    print(f'节点 A 看到 {state["aggregate"]}')
    return {"aggregate": ["A"]}

def b(state: State):
    print(f'节点 B 看到 {state["aggregate"]}')
    return {"aggregate": ["B"]}

def c(state: State):
    print(f'节点 C 看到 {state["aggregate"]}')
    return {"aggregate": ["C"]}

def d(state: State):
    print(f'节点 D 看到 {state["aggregate"]}')
    return {"aggregate": ["D"]}

# 定义节点
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)

# 定义边
def route(state: State) -> Literal["b", END]:
    if len(state["aggregate"]) < 7:
        return "b"
    else:
        return END

builder.add_edge(START, "a")
builder.add_conditional_edges("a", route)
builder.add_edge("b", "c")
builder.add_edge("b", "d")
builder.add_edge(["c", "d"], "a")
graph = builder.compile()
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))
带有分支的复杂循环图此图看起来很复杂,但可以概念化为 超级步骤 的循环:
  1. 节点 A
  2. 节点 B
  3. 节点 C 和 D
  4. 节点 A
我们有一个四个超级步骤的循环,其中节点 C 和 D 并发执行。像以前一样调用图时,我们看到我们在达到终止条件之前完成了两个完整的“圈”:
result = graph.invoke({"aggregate": []})
Node A sees []
Node B sees ['A']
Node D sees ['A', 'B']
Node C sees ['A', 'B']
Node A sees ['A', 'B', 'C', 'D']
Node B sees ['A', 'B', 'C', 'D', 'A']
Node D sees ['A', 'B', 'C', 'D', 'A', 'B']
Node C sees ['A', 'B', 'C', 'D', 'A', 'B']
Node A sees ['A', 'B', 'C', 'D', 'A', 'B', 'C', 'D']
但是,如果我们将递归限制设置为四,我们只完成一圈,因为每圈是四个超级步骤:
from langgraph.errors import GraphRecursionError

try:
    result = graph.invoke({"aggregate": []}, {"recursion_limit": 4})
except GraphRecursionError:
    print("Recursion Error")
Node A sees []
Node B sees ['A']
Node C sees ['A', 'B']
Node D sees ['A', 'B']
Node A sees ['A', 'B', 'C', 'D']
Recursion Error

异步

使用异步编程范式可以在并发运行 IO 绑定 代码时产生显著的性能提升(例如,并发 API 请求到聊天模型提供商)。 要将图的 sync 实现转换为 async 实现,您需要:
  1. 更新 nodes 使用 async def 而不是 def
  2. 更新代码内部以适当使用 await
  3. 根据需要调用图使用 .ainvoke.astream
由于许多 LangChain 对象实现了 Runnable Protocol,它具有所有 sync 方法的 async 变体,因此将 sync 图升级为 async 图通常很快。 见下方示例。为了演示底层 LLM 的异步调用,我们将包含一个聊天模型:
👉 Read the OpenAI chat model integration docs
pip install -U "langchain[openai]"
import os
from langchain.chat_models import init_chat_model

os.environ["OPENAI_API_KEY"] = "sk-..."

model = init_chat_model("gpt-5.2")
from langchain.chat_models import init_chat_model
from langgraph.graph import MessagesState, StateGraph

async def node(state: MessagesState):
    new_message = await llm.ainvoke(state["messages"])
    return {"messages": [new_message]}

builder = StateGraph(MessagesState).add_node(node).set_entry_point("node")
graph = builder.compile()

input_message = {"role": "user", "content": "Hello"}
result = await graph.ainvoke({"messages": [input_message]})
异步流式传输 有关异步流式传输的示例,请参阅 流式传输指南

使用 Command 结合控制流和状态更新

结合控制流(边)和状态更新(节点)可能很有用。例如,您可能希望 同时 执行状态更新 决定下一个节点去哪里。LangGraph 提供了一种方法,即从节点函数返回 Command 对象:
def my_node(state: State) -> Command[Literal["my_other_node"]]:
    return Command(
        # 状态更新
        update={"foo": "bar"},
        # 控制流
        goto="my_other_node"
    )
我们在下面展示了一个端到端示例。让我们创建一个包含 3 个节点:A、B 和 C 的简单图。我们将首先执行节点 A,然后根据节点 A 的输出决定下一步是去节点 B 还是节点 C。
import random
from typing_extensions import TypedDict, Literal
from langgraph.graph import StateGraph, START
from langgraph.types import Command

# 定义图状态
class State(TypedDict):
    foo: str

# 定义节点

def node_a(state: State) -> Command[Literal["node_b", "node_c"]]:
    print("Called A")
    value = random.choice(["b", "c"])
    # 这是条件边函数的替代
    if value == "b":
        goto = "node_b"
    else:
        goto = "node_c"

    # 注意 Command 允许您 BOTH 更新图状态 AND 路由到下一个节点
    return Command(
        # 这是状态更新
        update={"foo": value},
        # 这是边的替代
        goto=goto,
    )

def node_b(state: State):
    print("Called B")
    return {"foo": state["foo"] + "b"}

def node_c(state: State):
    print("Called C")
    return {"foo": state["foo"] + "c"}
我们现在可以使用上述节点创建 StateGraph。请注意,图没有 条件边 用于路由!这是因为控制流是在 node_a 中使用 Command 定义的。
builder = StateGraph(State)
builder.add_edge(START, "node_a")
builder.add_node(node_a)
builder.add_node(node_b)
builder.add_node(node_c)
# 注意:节点 A、B 和 C 之间没有边!

graph = builder.compile()
您可能注意到我们使用了 Command 作为返回类型注解,例如 Command[Literal["node_b", "node_c"]]。这对于图渲染是必要的,并告诉 LangGraph node_a 可以导航到 node_bnode_c
from IPython.display import display, Image

display(Image(graph.get_graph().draw_mermaid_png()))
基于 Command 的图导航 如果我们多次运行图,我们会看到它根据节点 A 中的随机选择采取不同的路径(A -> B 或 A -> C)。
graph.invoke({"foo": ""})
Called A
Called C

导航到父图中的节点

如果您正在使用 子图,您可能希望从子图中的节点导航到不同的子图(即父图中的不同节点)。为此,您可以在 Command 中指定 graph=Command.PARENT
def my_node(state: State) -> Command[Literal["my_other_node"]]:
    return Command(
        update={"foo": "bar"},
        goto="other_subgraph",  # 其中 `other_subgraph` 是父图中的节点
        graph=Command.PARENT
    )
让我们使用上面的示例演示此操作。我们将这样做,通过将上面的示例中的 nodeA 更改为我们将其作为子图添加到父图中的单节点图。
使用 Command.PARENT 的状态更新 当您从子图节点向父图节点发送更新时,对于父图和子图 状态模式 共享的键,您 必须 为父图状态中您要更新的键定义 归约器。见下方示例。
import operator
from typing_extensions import Annotated

class State(TypedDict):
    # 注意:我们在这里定义归约器
    foo: Annotated[str, operator.add]

def node_a(state: State):
    print("Called A")
    value = random.choice(["a", "b"])
    # 这是条件边函数的替代
    if value == "a":
        goto = "node_b"
    else:
        goto = "node_c"

    # 注意 Command 允许您 BOTH 更新图状态 AND 路由到下一个节点
    return Command(
        update={"foo": value},
        goto=goto,
        # 这告诉 LangGraph 导航到父图中的 node_b 或 node_c
        # 注意:这将导航到相对于子图最近的父图
        graph=Command.PARENT,
    )

subgraph = StateGraph(State).add_node(node_a).add_edge(START, "node_a").compile()

def node_b(state: State):
    print("Called B")
    # 注意:由于我们定义了归约器,我们不需要手动附加
    # 新字符到现有的 'foo' 值。相反,归约器将自动附加这些
    # (通过 operator.add)
    return {"foo": "b"}

def node_c(state: State):
    print("Called C")
    return {"foo": "c"}

builder = StateGraph(State)
builder.add_edge(START, "subgraph")
builder.add_node("subgraph", subgraph)
builder.add_node(node_b)
builder.add_node(node_c)

graph = builder.compile()
graph.invoke({"foo": ""})
Called A
Called C

在工具中使用

一个常见的用例是从工具内部更新图状态。例如,在客户支持应用中,您可能希望在对话开始时根据账户号或 ID 查找客户信息。要从工具更新图状态,您可以从工具返回 Command(update={"my_custom_key": "foo", "messages": [...]})
@tool
def lookup_user_info(tool_call_id: Annotated[str, InjectedToolCallId], config: RunnableConfig):
    """使用此来查找用户信息以更好地协助他们的问题。"""
    user_info = get_user_info(config.get("configurable", {}).get("user_id"))
    return Command(
        update={
            # 更新状态键
            "user_info": user_info,
            # 更新消息历史
            "messages": [ToolMessage("Successfully looked up user information", tool_call_id=tool_call_id)]
        }
    )
当您从工具返回 Command 时,您 必须Command.update 中包含 messages(或用于消息历史的任何状态键),并且 messages 中的消息列表 必须 包含 ToolMessage。这对于生成的消息历史有效是必要的(LLM 提供商要求带有工具调用的 AI 消息后跟工具结果消息)。
如果您正在使用通过 Command 更新状态的工具,我们建议使用预构建的 ToolNode,它自动处理工具返回 Command 对象并将它们传播到图状态。如果您正在编写调用工具的自定义节点,则需要手动传播工具返回的 Command 对象作为节点的更新。

可视化您的图

在此我们演示如何可视化您创建的图。 您可以可视化任何任意 Graph,包括 StateGraph 让我们画分形图吧 :)。
import random
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

class State(TypedDict):
    messages: Annotated[list, add_messages]

class MyNode:
    def __init__(self, name: str):
        self.name = name
    def __call__(self, state: State):
        return {"messages": [("assistant", f"Called node {self.name}")]}

def route(state) -> Literal["entry_node", END]:
    if len(state["messages"]) > 10:
        return END
    return "entry_node"

def add_fractal_nodes(builder, current_node, level, max_level):
    if level > max_level:
        return
    # 此级别要创建的节点数
    num_nodes = random.randint(1, 3)  # 根据需要调整随机性
    for i in range(num_nodes):
        nm = ["A", "B", "C"][i]
        node_name = f"node_{current_node}_{nm}"
        builder.add_node(node_name, MyNode(node_name))
        builder.add_edge(current_node, node_name)
        # 递归添加更多节点
        r = random.random()
        if r > 0.2 and level + 1 < max_level:
            add_fractal_nodes(builder, node_name, level + 1, max_level)
        elif r > 0.05:
            builder.add_conditional_edges(node_name, route, node_name)
        else:
            # 结束
            builder.add_edge(node_name, END)

def build_fractal_graph(max_level: int):
    builder = StateGraph(State)
    entry_point = "entry_node"
    builder.add_node(entry_point, MyNode(entry_point))
    builder.add_edge(START, entry_point)
    add_fractal_nodes(builder, entry_point, 1, max_level)
    # 可选:如果需要设置结束点
    builder.add_edge(entry_point, END)  # 或任何特定节点
    return builder.compile()

app = build_fractal_graph(3)

Mermaid

我们还可以将图类转换为 Mermaid 语法。
print(app.get_graph().draw_mermaid())
%%{init: {'flowchart': {'curve': 'linear'}}}%%
graph TD;
    tart__([<p>__start__</p>]):::first
    ry_node(entry_node)
    e_entry_node_A(node_entry_node_A)
    e_entry_node_B(node_entry_node_B)
    e_node_entry_node_B_A(node_node_entry_node_B_A)
    e_node_entry_node_B_B(node_node_entry_node_B_B)
    e_node_entry_node_B_C(node_node_entry_node_B_C)
    nd__([<p>__end__</p>]):::last
    tart__ --> entry_node;
    ry_node --> __end__;
    ry_node --> node_entry_node_A;
    ry_node --> node_entry_node_B;
    e_entry_node_B --> node_node_entry_node_B_A;
    e_entry_node_B --> node_node_entry_node_B_B;
    e_entry_node_B --> node_node_entry_node_B_C;
    e_entry_node_A -.-> entry_node;
    e_entry_node_A -.-> __end__;
    e_node_entry_node_B_A -.-> entry_node;
    e_node_entry_node_B_A -.-> __end__;
    e_node_entry_node_B_B -.-> entry_node;
    e_node_entry_node_B_B -.-> __end__;
    e_node_entry_node_B_C -.-> entry_node;
    e_node_entry_node_B_C -.-> __end__;
    ssDef default fill:#f2f0ff,line-height:1.2
    ssDef first fill-opacity:0
    ssDef last fill:#bfb6fc

PNG

如果更喜欢,我们可以将图渲染为 .png。这里有三种选择:
  • 使用 Mermaid.ink API(不需要额外的包)
  • 使用 Mermaid + Pyppeteer(需要 pip install pyppeteer
  • 使用 graphviz(需要 pip install graphviz
使用 Mermaid.Ink 默认情况下,draw_mermaid_png() 使用 Mermaid.Ink 的 API 生成图表。
from IPython.display import Image, display
from langchain_core.runnables.graph import CurveStyle, MermaidDrawMethod, NodeStyles

display(Image(app.get_graph().draw_mermaid_png()))
分形图可视化 使用 Mermaid + Pyppeteer
import nest_asyncio

nest_asyncio.apply()  # Jupyter Notebook 运行异步函数所需

display(
    Image(
        app.get_graph().draw_mermaid_png(
            curve_style=CurveStyle.LINEAR,
            node_colors=NodeStyles(first="#ffdfba", last="#baffc9", default="#fad7de"),
            wrap_label_n_words=9,
            output_file_path=None,
            draw_method=MermaidDrawMethod.PYPPETEER,
            background_color="white",
            padding=10,
        )
    )
)
使用 Graphviz
try:
    display(Image(app.get_graph().draw_png()))
except ImportError:
    print(
        "您可能需要安装 pygraphviz 的依赖项,更多信息请见 https://github.com/pygraphviz/pygraphviz/blob/main/INSTALL.txt"
    )