Skip to main content
查询 运行记录(run)(即 LangSmith 追踪中的 span 数据)的推荐方式是使用 SDK 中的 list_runs 方法,或 API 中的 /runs/query 端点。LangSmith 将追踪记录存储为 Run(span)数据格式 中指定的简单格式。 本页内容包括:
如果你需要导出大量追踪记录,我们建议使用批量数据导出功能,因为它能更好地处理大数据量,并支持自动重试和跨分区并行处理。

使用过滤参数

对于简单查询,你可以直接使用过滤参数参考中指定的过滤参数,而无需依赖我们的查询语法。
前置条件在运行以下代码片段之前,请先初始化客户端。
from langsmith import Client

client = Client()
以下是一些使用关键字参数列出运行记录的示例:

列出项目中的所有运行记录

project_runs = client.list_runs(project_name="<your_project>")

列出过去 24 小时内的 LLM 和聊天运行记录

todays_llm_runs = client.list_runs(
    project_name="<your_project>",
    start_time=datetime.now() - timedelta(days=1),
    run_type="llm",
)

列出项目中的根运行记录

根运行记录是没有父级的运行记录。它们的 is_root 值为 True。你可以使用此字段过滤根运行记录。
root_runs = client.list_runs(
    project_name="<your_project>",
    is_root=True
)

列出无错误的运行记录

correct_runs = client.list_runs(project_name="<your_project>", error=False)

按运行 ID 列出运行记录

忽略其他参数如果你按上述方式提供运行 ID 列表,则会忽略所有其他过滤参数(如 project_namerun_type 等),并直接返回与给定 ID 匹配的运行记录。
如果你有一个运行 ID 列表,可以直接列出它们:
run_ids = ['a36092d2-4ad5-4fb4-9c0d-0dba9a2ed836','9398e6be-964f-4aa4-8ae9-ad78cd4b7074']
selected_runs = client.list_runs(id=run_ids)

按 ID 获取单条运行记录

要通过 ID 获取单条运行记录(trace),请使用 read_run 方法。当你有一个特定的追踪 ID(例如,来自 LangSmith 分享链接,如 https://smith.langchain.com/public/<trace-id>/r)并希望检索其完整数据时,此方法非常有用。
run_id = "a36092d2-4ad5-4fb4-9c0d-0dba9a2ed836"
run = client.read_run(run_id)

# 访问运行数据
print(run.inputs)
print(run.outputs)
print(run.name)
使用 LangGraph 在本地重放追踪记录如果你在使用带检查点(checkpointing)功能的 LangGraph,可以从 LangSmith 获取追踪记录并在本地重放以进行调试。有关从检查点恢复执行的详细信息,请参阅 LangGraph 的时间旅行与重放文档

使用过滤查询语言

对于更复杂的查询,你可以使用过滤查询语言参考中描述的查询语言。

列出对话线程中的所有根运行记录

这是获取对话线程中运行记录的方法。有关设置线程的更多信息,请参阅我们的如何设置线程操作指南。 线程通过设置共享线程 ID 进行分组。LangSmith UI 允许你使用以下三个元数据键中的任意一个:session_idconversation_idthread_id。会话 ID 也称为追踪项目 ID。以下查询将匹配其中任意一个。
group_key = "<your_thread_id>"
filter_string = f'and(in(metadata_key, ["session_id","conversation_id","thread_id"]), eq(metadata_value, "{group_key}"))'
thread_runs = client.list_runs(
    project_name="<your_project>",
    filter=filter_string,
    is_root=True
)

列出所有名为 “extractor” 且其追踪根被分配了 “user_score” 反馈分数为 1 的运行记录

client.list_runs(
    project_name="<your_project>",
    filter='eq(name, "extractor")',
    trace_filter='and(eq(feedback_key, "user_score"), eq(feedback_score, 1))'
)

列出带有 “star_rating” 键且分数大于 4 的运行记录

client.list_runs(
    project_name="<your_project>",
    filter='and(eq(feedback_key, "star_rating"), gt(feedback_score, 4))'
)

列出执行时间超过 5 秒的运行记录

client.list_runs(project_name="<your_project>", filter='gt(latency, "5s")')

列出所有 “error” 不为 null 的运行记录

client.list_runs(project_name="<your_project>", filter='neq(error, null)')

列出所有 start_time 大于特定时间戳的运行记录

client.list_runs(project_name="<your_project>", filter='gt(start_time, "2023-07-15T12:34:56Z")')

列出所有包含子串 “substring” 的运行记录

client.list_runs(project_name="<your_project>", filter='search("substring")')

列出所有带有 git 哈希 “2aa1cf4” 标签的运行记录

client.list_runs(project_name="<your_project>", filter='has(tags, "2aa1cf4")')

列出所有在特定时间戳之后开始,并且要么 “error” 不为 null,要么 “Correctness” 反馈分数等于 0 的运行记录

client.list_runs(
  project_name="<your_project>",
  filter='and(gt(start_time, "2023-07-15T12:34:56Z"), or(neq(error, null), and(eq(feedback_key, "Correctness"), eq(feedback_score, 0.0))))'
)

复杂查询:列出所有标签包含 “experimental” 或 “beta” 且延迟大于 2 秒的运行记录

client.list_runs(
  project_name="<your_project>",
  filter='and(or(has(tags, "experimental"), has(tags, "beta")), gt(latency, 2))'
)

按全文搜索追踪树

你可以使用不带特定字段的 search() 函数,对运行记录中的所有字符串字段进行全文搜索。这让你能够快速找到与搜索词匹配的追踪记录。
client.list_runs(
  project_name="<your_project>",
  filter='search("image classification")'
)

检查元数据是否存在

如果你想检查元数据是否存在,可以使用 eq 操作符,并可选择性地结合 and 语句来按值匹配。如果你想记录有关运行记录的更结构化信息,这将非常有用。
to_search = {
    "user_id": ""
}

# 检查任何带有 "user_id" 元数据键的运行记录
client.list_runs(
  project_name="default",
  filter="eq(metadata_key, 'user_id')"
)
# 检查 user_id=4070f233-f61e-44eb-bff1-da3c163895a3 的运行记录
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'user_id'), eq(metadata_value, '4070f233-f61e-44eb-bff1-da3c163895a3'))"
)

检查元数据中的环境详情

一种常见的模式是通过元数据将环境信息添加到追踪记录中。如果你想过滤包含环境元数据的运行记录,可以使用与上面相同的模式:
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'environment'), eq(metadata_value, 'production'))"
)

检查元数据中的对话 ID

另一种关联同一对话中追踪记录的常见方式是使用共享的对话 ID。如果你想以此方式基于对话 ID 过滤运行记录,可以在元数据中搜索该 ID。
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'conversation_id'), eq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

键值对上的反向过滤

你可以对元数据、输入和输出键值对使用反向过滤,以从结果中排除特定的运行记录。以下是一些针对元数据键值对的示例,但同样的逻辑也适用于输入和输出键值对。
# 查找所有元数据中不包含 "conversation_id" 键的运行记录
client.list_runs(
  project_name="default",
  filter="and(neq(metadata_key, 'conversation_id'))"
)

# 查找所有元数据中 conversation_id 不是 "a1b2c3d4-e5f6-7890" 的运行记录
client.list_runs(
  project_name="default",
  filter="and(eq(metadata_key, 'conversation_id'), neq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

# 查找所有没有 "conversation_id" 元数据键且不存在 "a1b2c3d4-e5f6-7890" 值的运行记录
client.list_runs(
  project_name="default",
  filter="and(neq(metadata_key, 'conversation_id'), neq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

# 查找所有没有 "conversation_id" 元数据键但存在 "a1b2c3d4-e5f6-7890" 值的运行记录
client.list_runs(
  project_name="default",
  filter="and(neq(metadata_key, 'conversation_id'), eq(metadata_value, 'a1b2c3d4-e5f6-7890'))"
)

组合多个过滤器

如果你想组合多个条件来细化搜索,可以使用 and 操作符以及其他过滤函数。以下是如何搜索名为 “ChatOpenAI” 且其元数据中包含特定 conversation_id 的运行记录:
client.list_runs(
  project_name="default",
  filter="and(eq(name, 'ChatOpenAI'), eq(metadata_key, 'conversation_id'), eq(metadata_value, '69b12c91-b1e2-46ce-91de-794c077e8151'))"
)

树过滤器

列出所有名为 “RetrieveDocs” 的运行记录,要求其根运行具有 “user_score” 反馈为 1,并且整个追踪中的任何运行记录名为 “ExpandQuery”。 当你想根据追踪中达到的各种状态或步骤来提取特定运行记录时,这种类型的查询非常有用。
client.list_runs(
    project_name="<your_project>",
    filter='eq(name, "RetrieveDocs")',
    trace_filter='and(eq(feedback_key, "user_score"), eq(feedback_score, 1))',
    tree_filter='eq(name, "ExpandQuery")'
)

高级:导出带子工具使用情况的扁平化追踪视图

以下 Python 示例演示了如何导出追踪记录的扁平化视图,包括每个追踪中代理使用的工具(来自嵌套运行记录)的信息。 这可用于分析代理在多个追踪中的行为。 此示例查询指定天数内的所有工具运行记录,并按父级(根)运行 ID 对它们进行分组。然后,它获取每个根运行记录的相关信息,如运行名称、输入、输出,并将这些信息与子运行记录的信息合并。 为了优化查询,该示例:
  1. 在查询工具运行记录时只选择必要字段,以减少查询时间。
  2. 批量获取根运行记录,同时并发处理工具运行记录。
from collections import defaultdict
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime, timedelta

from langsmith import Client
from tqdm.auto import tqdm

client = Client()
project_name = "my-project"
num_days = 30

# 列出所有工具运行记录
tool_runs = client.list_runs(
    project_name=project_name,
    start_time=datetime.now() - timedelta(days=num_days),
    run_type="tool",
    # 我们不需要获取输入、输出以及其他可能增加查询时间的值
    select=["trace_id", "name", "run_type"],
)

data = []
futures: list[Future] = []
trace_cursor = 0
trace_batch_size = 50

tool_runs_by_parent = defaultdict(lambda: defaultdict(set))
# 不要超过速率限制
with ThreadPoolExecutor(max_workers=2) as executor:
    # 按父运行 ID 分组工具运行记录
    for run in tqdm(tool_runs):
        # 收集给定追踪中调用的所有工具
        tool_runs_by_parent[run.trace_id]["tools_involved"].add(run.name)
        # 可能将一批父运行 ID 发送到服务器
        # 这让我们可以分批查询根运行记录
        # 同时仍然处理工具运行记录
        if len(tool_runs_by_parent) % trace_batch_size == 0:
            if this_batch := list(tool_runs_by_parent.keys())[
                trace_cursor : trace_cursor + trace_batch_size
            ]:
                trace_cursor += trace_batch_size
                futures.append(
                    executor.submit(
                        client.list_runs,
                        project_name=project_name,
                        run_ids=this_batch,
                        select=["name", "inputs", "outputs", "run_type"],
                    )
                )
    if this_batch := list(tool_runs_by_parent.keys())[trace_cursor:]:
        futures.append(
            executor.submit(
                client.list_runs,
                project_name=project_name,
                run_ids=this_batch,
                select=["name", "inputs", "outputs", "run_type"],
            )
        )

for future in tqdm(futures):
    root_runs = future.result()
    for root_run in root_runs:
        root_data = tool_runs_by_parent[root_run.id]
        data.append(
            {
                "run_id": root_run.id,
                "run_name": root_run.name,
                "run_type": root_run.run_type,
                "inputs": root_run.inputs,
                "outputs": root_run.outputs,
                "tools_involved": list(root_data["tools_involved"]),
            }
        )

# (可选):转换为 pandas DataFrame
import pandas as pd

df = pd.DataFrame(data)
df.head()

高级:导出带反馈的追踪中的检索器输入输出

当你希望基于检索器行为来微调嵌入或诊断端到端系统性能问题时,此查询非常有用。 以下 Python 示例演示了如何导出具有特定反馈分数的追踪中的检索器输入和输出。
from collections import defaultdict
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime, timedelta

import pandas as pd
from langsmith import Client
from tqdm.auto import tqdm

client = Client()
project_name = "your-project-name"
num_days = 1

# 列出所有工具运行记录
retriever_runs = client.list_runs(
    project_name=project_name,
    start_time=datetime.now() - timedelta(days=num_days),
    run_type="retriever",
    # 这次我们确实想要获取输入和输出,因为它们可能被查询扩展步骤调整
    select=["trace_id", "name", "run_type", "inputs", "outputs"],
    trace_filter='eq(feedback_key, "user_score")',
)

data = []
futures: list[Future] = []
trace_cursor = 0
trace_batch_size = 50

retriever_runs_by_parent = defaultdict(lambda: defaultdict(list))
# 不要超过速率限制
with ThreadPoolExecutor(max_workers=2) as executor:
    # 按父运行 ID 分组检索器运行记录
    for run in tqdm(retriever_runs):
        # 收集给定追踪中调用的所有检索器调用
        for k, v in run.inputs.items():
            retriever_runs_by_parent[run.trace_id][f"retriever.inputs.{k}"].append(v)
        for k, v in (run.outputs or {}).items():
            # 扩展文档
            retriever_runs_by_parent[run.trace_id][f"retriever.outputs.{k}"].extend(v)
        # 可能将一批父运行 ID 发送到服务器
        # 这让我们可以分批查询根运行记录
        # 同时仍然处理检索器运行记录
        if len(retriever_runs_by_parent) % trace_batch_size == 0:
            if this_batch := list(retriever_runs_by_parent.keys())[
                trace_cursor : trace_cursor + trace_batch_size
            ]:
                trace_cursor += trace_batch_size
                futures.append(
                    executor.submit(
                        client.list_runs,
                        project_name=project_name,
                        run_ids=this_batch,
                        select=[
                            "name",
                            "inputs",
                            "outputs",
                            "run_type",
                            "feedback_stats",
                        ],
                    )
                )
    if this_batch := list(retriever_runs_by_parent.keys())[trace_cursor:]:
        futures.append(
            executor.submit(
                client.list_runs,
                project_name=project_name,
                run_ids=this_batch,
                select=["name", "inputs", "outputs", "run_type"],
            )
        )

for future in tqdm(futures):
    root_runs = future.result()
    for root_run in root_runs:
        root_data = retriever_runs_by_parent[root_run.id]
        feedback = {
            f"feedback.{k}": v.get("avg")
            for k, v in (root_run.feedback_stats or {}).items()
        }
        inputs = {f"inputs.{k}": v for k, v in root_run.inputs.items()}
        outputs = {f"outputs.{k}": v for k, v in (root_run.outputs or {}).items()}
        data.append(
            {
                "run_id": root_run.id,
                "run_name": root_run.name,
                **inputs,
                **outputs,
                **feedback,
                **root_data,
            }
        )

# (可选):转换为 pandas DataFrame
import pandas as pd
df = pd.DataFrame(data)
df.head()

速率限制

POST /runs/query 端点(Python 中的 list_runs,JavaScript 中的 listRuns)具有基于查询参数而变化的每个租户速率限制:
查询类型限制窗口
短时间窗口(≤ 7 天)10 个请求10 秒
长时间窗口(> 7 天)3 个请求10 秒
全文搜索,短时间窗口(≤ 7 天)3 个请求10 秒
全文搜索,长时间窗口(> 7 天)1 个请求10 秒
选择 child_run_ids,短时间窗口(≤ 7 天)3 个请求10 秒
选择 child_run_ids,长时间窗口(> 7 天)1 个请求10 秒
时间窗口由 end_time - start_time 决定。如果未提供 end_time,LangSmith 将使用当前时间。没有 start_time 的查询被视为长时间窗口查询。###最佳实践 为了避免触发速率限制并减少查询时间,特别是对于输入/输出较大的运行记录:
  • 设置 start_time:省略该参数会触发大时间窗口的速率限制层级(每10秒3次请求,而非10次)。请尽可能使用7天或更短的时间窗口。
  • 使用 select:默认情况下会返回所有字段。仅指定你需要的字段(例如 select=["inputs", "outputs"])可大幅减小响应体积和查询时间,尤其适用于输入/输出较大的运行记录。
  • 设置 limit:如果不需要对所有结果进行分页,请限制返回结果的数量。
  • 避免全文搜索filter='search("...")' 具有最严格的速率限制;请尽可能使用结构化过滤器(例如 eq()has())。
  • 避免选择 child_run_ids:该操作也会触发更严格的速率限制层级。
当超过这些限制时,API 会返回 429 Too Many Requests 响应。有关一般速率限制的信息,请参阅管理概览