Skip to main content
本指南介绍如何通过 LangSmith 部署 API 取消代理的运行。您可以通过 ID 取消单个运行,或按线程或状态取消多个运行。取消功能适用于停止长时间运行或卡住的运行,或在用户放弃请求时使用。

设置

创建客户端和线程:
from langgraph_sdk import get_client

client = get_client(url=<DEPLOYMENT_URL>)
assistant_id = "agent"
thread = await client.threads.create()

取消单个运行

以下示例创建运行,使用不同选项取消它,并打印运行状态以展示每种情况下的结果。您可以取消处于 pendingrunning 状态的运行。尝试取消不处于 pendingrunning 状态的运行将导致错误。

使用中断取消(默认)

interrupt 会停止执行该运行的工作线程,并将运行标记为 interrupted。不会删除任何内容:
  • 运行记录保留(状态为 interrupted)。您可以获取它、检查输入/输出并查看执行历史。
  • 该运行的所有检查点保持存储。最后一个完成步骤的线程状态得以保留。
  • 您之后可以从检查点恢复(例如使用时间旅行)或检查部分状态。
当您想停止运行但保留它以进行调试、审计或从检查点恢复时,请使用 interrupt
run = await client.runs.create(
    thread["thread_id"],
    assistant_id,
    input={"messages": [{"role": "user", "content": "Long task"}]},
)
await client.runs.cancel(thread["thread_id"], run["run_id"])

run_after = await client.runs.get(thread["thread_id"], run["run_id"], wait=True)
print(run_after["status"])   # "interrupted"

使用回滚取消

rollback 会停止运行,然后将其及其检查点从存储中删除:
  • 运行记录被删除。该运行不再出现在该线程的运行列表或历史中。
  • 该运行创建的所有检查点被删除。线程状态恢复到运行开始之前的状态(就像该运行从未执行过一样)。
  • 回滚后无法恢复或检查该运行。
当您想完全丢弃运行及其影响时(例如,在用户放弃请求且您不需要保留部分工作时),请使用 rollback
run = await client.runs.create(
    thread["thread_id"],
    assistant_id,
    input={"messages": [{"role": "user", "content": "Long task"}]},
)
await client.runs.cancel(thread["thread_id"], run["run_id"], action="rollback", wait=True)

# 抛出错误,因为运行已被删除
try:
    await client.runs.get(thread["thread_id"], run["run_id"])
except Exception:
    print("Run was correctly deleted")

使用等待取消

默认情况下,取消请求在取消操作被请求后立即返回,运行会异步取消。wait=True 使取消请求阻塞,直到运行被完全取消。这在您想知道运行取消后的最终状态时(例如,创建了哪些检查点、最终输出是什么)很有用。
run = await client.runs.create(
    thread["thread_id"],
    assistant_id,
    input={"messages": [{"role": "user", "content": "Long task"}]},
)
# 异步取消运行
await client.runs.cancel(thread["thread_id"], run["run_id"])
# 获取运行状态
run_after = await client.runs.get(thread["thread_id"], run["run_id"])
print(run_after["status"])  # "pending" 或 "running"

# 等待运行被正确取消
await client.runs.join(thread["thread_id"], run["run_id"])
run_after = await client.runs.get(thread["thread_id"], run["run_id"])
print(run_after["status"])  # "interrupted"

取消多个运行

使用批量取消端点在一个请求中取消多个运行。支持中断和回滚两种操作。

按线程 ID 和运行 ID 取消

通过传递运行 ID 来取消特定运行。
run1 = await client.runs.create(
    thread["thread_id"],
    assistant_id,
    input={"messages": [{"role": "user", "content": "First request"}]},
)
run2 = await client.runs.create(
    thread["thread_id"],
    assistant_id,
    input={"messages": [{"role": "user", "content": "Second request"}]},
    multitask_strategy="enqueue",
)

await client.runs.cancel_many(
    thread_id=thread["thread_id"],
    run_ids=[run1["run_id"], run2["run_id"]]
)

# 等待运行被取消
await client.runs.join(thread["thread_id"], run2["run_id"])
runs_after = await client.runs.list(thread["thread_id"])
for run in runs_after:
    if run["run_id"] in (run1["run_id"], run2["run_id"]):
        print(run["run_id"], run["status"])  # "interrupted"

按状态取消

取消部署中所有线程内匹配特定状态的所有运行。有效的状态选项为 pendingrunningall
run1 = await client.runs.create(
    thread["thread_id"],
    assistant_id,
    input={"messages": [{"role": "user", "content": "First request"}]},
)
thread2 = await client.threads.create()
run2 = await client.runs.create(
    thread2["thread_id"],
    assistant_id,
    input={"messages": [{"role": "user", "content": "Second request"}]},
)

await client.runs.cancel_many(
    status="running",
)

# 等待运行被取消
await client.runs.join(thread2["thread_id"], run2["run_id"])
run_after = await client.runs.get(thread["thread_id"], run1["run_id"])
print(run_after["status"])  # 正在运行的运行现在变为 "interrupted"
run_after2 = await client.runs.get(thread2["thread_id"], run2["run_id"])
print(run_after2["status"])  # 所有线程中的运行都被取消

断开连接时取消

当使用流式传输启动运行或等待运行时,您可以设置 on_disconnect="cancel",以便在客户端断开连接时取消运行。这可以避免在用户关闭应用或失去连接时留下正在进行的运行。
# 使用 runs.wait:如果客户端断开连接,运行将被取消
result = await client.runs.wait(
    thread["thread_id"],
    assistant_id,
    input={"messages": [{"role": "user", "content": "Long task"}]},
    on_disconnect="cancel",
)

# 使用 runs.stream:如果客户端断开连接,运行将被取消
async for chunk in client.runs.stream(
    thread["thread_id"],
    assistant_id,
    input={"messages": [{"role": "user", "content": "Long task"}]},
    on_disconnect="cancel",
):
    print(chunk)

# 使用 runs.join:等待现有运行;如果客户端断开连接则取消
run = await client.runs.create(
    thread["thread_id"],
    assistant_id,
    input={"messages": [{"role": "user", "content": "Long task"}]},
)
await client.runs.join(
    thread["thread_id"],
    run["run_id"],
    on_disconnect="cancel",
)

# 使用 runs.join_stream:加入现有运行并流式传输;如果客户端断开连接则取消
async for chunk in client.runs.join_stream(
    thread["thread_id"],
    run["run_id"],
    on_disconnect="cancel",
):
    print(chunk)

常见场景

  • 人机交互与中断:代理可以在中断点暂停以等待人工输入。取消运行会停止执行;这与中断不同,中断时运行会暂停并可以通过新输入恢复。
  • 时间旅行:使用 interrupt 操作取消后,运行和检查点仍然可用。您可以从检查点恢复(时间旅行)以重放或分支执行。
  • 重复发送消息:当用户在运行进行中发送新输入时,多任务策略(enqueue、reject、interrupt、rollback)决定现有运行是被中断还是回滚,以及如何处理新运行。要从您的应用程序显式取消运行,请使用本页描述的取消 API。
  • Studio:在 Studio 中,使用运行 UI 中的 Cancel 按钮取消当前运行。