POST /runs 和 PATCH /runs 端点进行基本追踪,以及使用 POST /runs/multipart 进行批量摄取以实现更高吞吐量。有关完整的端点列表和请求/响应模式,请参阅 API 参考。
我们强烈建议使用 Python 或 TypeScript SDK 将追踪数据发送到 LangSmith,而不是直接使用 REST API。SDK 包含批处理和后台发送优化,可防止追踪影响应用程序的性能。
我们建议使用 UUID v7 作为运行 ID。UUIDv7 嵌入了时间戳,可以保留追踪中运行的正确时间顺序。使用 LangSmith SDK 中的
uuid7() 来生成它们,或参阅 指定自定义运行 ID 了解更多详情。基本追踪
记录运行的最简单方法是通过POST /runs 和 PATCH /runs 端点。这种方法只需最少的信息即可建立追踪层次结构。
使用 LangSmith REST API 时,请在请求头中以
"x-api-key" 提供您的 API 密钥。如果您的 API 密钥链接到多个工作区,请在请求头中使用 "x-tenant-id" 指定工作区。在这种方法中,您无需设置 dotted_order 或 trace_id 字段——系统会自动生成它们。虽然更简单,但它的速度较慢,并且比批量摄取有更低的速率限制。import openai
import os
import requests
from datetime import datetime, timezone
from langsmith import uuid7
# 在请求头中发送您的 API 密钥
headers = {
"x-api-key": os.environ["LANGSMITH_API_KEY"],
"x-tenant-id": os.environ["LANGSMITH_WORKSPACE_ID"]
}
def post_run(run_id, name, run_type, inputs, parent_id=None):
"""向 API 发布新运行的函数。"""
data = {
"id": run_id.hex,
"name": name,
"run_type": run_type,
"inputs": inputs,
"start_time": datetime.utcnow().isoformat(),
# "session_name": "project-name", # 要追踪到的项目名称
# "session_id": "project-id", # 要追踪到的项目 ID。指定 session_name 或 session_id 之一
}
if parent_id:
data["parent_run_id"] = parent_id.hex
requests.post(
"https://api.smith.langchain.com/runs", # 针对自托管安装或欧盟区域,请相应更新
json=data,
headers=headers
)
def patch_run(run_id, outputs):
"""用输出修补运行的函数。"""
requests.patch(
f"https://api.smith.langchain.com/runs/{run_id}",
json={
"outputs": outputs,
"end_time": datetime.now(timezone.utc).isoformat(),
},
headers=headers,
)
# 这可以是您应用程序的用户输入
question = "你能总结一下今天早上的会议吗?"
# 这可以在检索步骤中获取
context = "在今天早上的会议中,我们解决了世界上所有的冲突。"
messages = [
{"role": "system", "content": "你是一个乐于助人的助手。请仅根据给定的上下文回应用户的请求。"},
{"role": "user", "content": f"问题:{question}\n上下文:{context}"}
]
# 创建父运行
parent_run_id = uuid7()
post_run(parent_run_id, "聊天管道", "chain", {"question": question})
# 创建子运行
child_run_id = uuid7()
post_run(child_run_id, "OpenAI 调用", "llm", {"messages": messages}, parent_run_id)
# 生成完成
client = openai.Client()
chat_completion = client.chat.completions.create(
model="gpt-4.1-mini",
messages=messages
)
# 结束运行
patch_run(child_run_id, chat_completion.dict())
patch_run(parent_run_id, {"answer": chat_completion.choices[0].message.content})
批量摄取
为了更快的摄取速度和更高的速率限制,请使用POST /runs/multipart 端点。这需要 requests-toolbelt 和 uuid-utils 包。
此端点要求您计算并设置 dotted_order 和 trace_id。dotted_order 是一个字符串,编码了每个运行的时间戳和 UUID,父条目和子条目之间用点连接(例如 20240101T000000Z<parent-uuid>.20240101T000001Z<child-uuid>)。这告诉 LangSmith 运行之间如何关联以及它们发生的顺序。trace_id 是追踪中根运行的 UUID。
以下示例创建一个父运行和一个子运行,在单个批量请求中发送它们,然后用它们的输出修补两者:
import json
import os
import uuid
from datetime import datetime, timezone
from typing import Dict, List
import requests
from requests_toolbelt import MultipartEncoder
from uuid_utils.compat import uuid7
def create_dotted_order(
start_time: datetime | None = None,
run_id: uuid.UUID | None = None
) -> str:
"""为运行排序和层次结构创建点序字符串。
点序用于建立运行之间的顺序和关系。
它结合了时间戳和唯一标识符,以确保正确的排序和追踪。
"""
st = start_time or datetime.now(timezone.utc)
id_ = run_id or uuid7()
return f"{st.strftime('%Y%m%dT%H%M%S%fZ')}{id_}"
def create_run_base(
name: str,
run_type: str,
inputs: dict,
start_time: datetime
) -> dict:
"""创建运行的基础结构。"""
run_id = uuid7()
return {
"id": str(run_id),
"trace_id": str(run_id),
"name": name,
"start_time": start_time.isoformat(),
"inputs": inputs,
"run_type": run_type,
}
def construct_run(
name: str,
run_type: str,
inputs: dict,
parent_dotted_order: str | None = None,
) -> dict:
"""使用给定参数构造运行字典。
此函数创建一个具有唯一 ID 和点序的运行,如果它是子运行,则建立其在追踪层次结构中的位置。
"""
start_time = datetime.now(timezone.utc)
run = create_run_base(name, run_type, inputs, start_time)
current_dotted_order = create_dotted_order(start_time, uuid.UUID(run["id"]))
if parent_dotted_order:
current_dotted_order = f"{parent_dotted_order}.{current_dotted_order}"
run["trace_id"] = parent_dotted_order.split(".")[0].split("Z")[1]
run["parent_run_id"] = parent_dotted_order.split(".")[-1].split("Z")[1]
run["dotted_order"] = current_dotted_order
return run
def serialize_run(operation: str, run_data: dict) -> List[tuple]:
"""为多部分请求序列化运行。
此函数将运行数据分离为多个部分,以便高效传输和存储。
主要的运行数据和可选字段(输入、输出、事件)被分别序列化。
"""
run_id = run_data.get("id", str(uuid7()))
# 分离可选字段
inputs = run_data.pop("inputs", None)
outputs = run_data.pop("outputs", None)
events = run_data.pop("events", None)
parts = []
# 序列化主要运行数据
run_data_json = json.dumps(run_data).encode("utf-8")
parts.append(
(
f"{operation}.{run_id}",
(
None,
run_data_json,
"application/json",
{"Content-Length": str(len(run_data_json))},
),
)
)
# 序列化可选字段
for key, value in [("inputs", inputs), ("outputs", outputs), ("events", events)]:
if value:
serialized_value = json.dumps(value).encode("utf-8")
parts.append(
(
f"{operation}.{run_id}.{key}",
(
None,
serialized_value,
"application/json",
{"Content-Length": str(len(serialized_value))},
),
)
)
return parts
def batch_ingest_runs(
api_url: str,
api_key: str,
posts: list[dict] | None = None,
patches: list[dict] | None = None,
) -> None:
"""在单个批量请求中摄取多个运行。
此函数处理创建新运行(发布)和更新现有运行(修补)。
与单独的 API 调用相比,它在摄取多个运行时效率更高。
"""
boundary = uuid.uuid4().hex
all_parts = []
for operation, runs in zip(("post", "patch"), (posts, patches)):
if runs:
all_parts.extend(
[part for run in runs for part in serialize_run(operation, run)]
)
encoder = MultipartEncoder(fields=all_parts, boundary=boundary)
headers = {"Content-Type": encoder.content_type, "x-api-key": api_key}
try:
response = requests.post(
f"{api_url}/runs/multipart",
data=encoder,
headers=headers
)
response.raise_for_status()
print("成功摄取运行。")
except requests.RequestException as e:
print(f"摄取运行时出错:{e}")
# 在生产环境中,您可能希望记录此错误或更稳健地处理它
# 配置 API URL 和密钥
# 对于生产用途,请考虑使用配置文件或环境变量
api_url = "https://api.smith.langchain.com"
api_key = os.environ.get("LANGSMITH_API_KEY")
if not api_key:
raise ValueError("未设置 LANGSMITH_API_KEY 环境变量")
# 创建一个父运行
parent_run = construct_run(
name="父运行",
run_type="chain",
inputs={"main_question": "告诉我关于法国"},
)
# 创建一个子运行,链接到父运行
child_run = construct_run(
name="子运行",
run_type="llm",
inputs={"question": "法国的首都是什么?"},
parent_dotted_order=parent_run["dotted_order"],
)
# 首先,发布运行以创建它们
posts = [parent_run, child_run]
batch_ingest_runs(api_url, api_key, posts=posts)
# 然后,用结束时间和任何输出更新运行
child_run_update = {
**child_run,
"end_time": datetime.now(timezone.utc).isoformat(),
"outputs": {"answer": "巴黎是法国的首都。"},
}
parent_run_update = {
**parent_run,
"end_time": datetime.now(timezone.utc).isoformat(),
"outputs": {"summary": "关于法国的讨论,包括其首都。"},
}
patches = [parent_run_update, child_run_update]
batch_ingest_runs(api_url, api_key, patches=patches)
# 注意:此示例需要 `requests` 和 `requests_toolbelt` 库。
# 您可以使用 pip 安装它们:
# pip install requests requests_toolbelt
Connect these docs to Claude, VSCode, and more via MCP for real-time answers.

