先决条件
- 一个 LangSmith 账户 和 API 密钥
- 正在运行的 Temporal 服务器(本地或云端)
- 适用于您所用语言的 OpenTelemetry SDK
环境变量
为所有实现设置以下环境变量:| 变量 | 必需 | 描述 |
|---|---|---|
LANGSMITH_API_KEY | 是 | 来自设置页面的 LangSmith API 密钥。 |
LANGSMITH_PROJECT | 否 | 项目名称(默认为 "default")。 |
对于欧盟区域或自托管的 LangSmith 安装,还需将
LANGCHAIN_BASE_URL 设置为您的 LangSmith 实例 URL。设置追踪
- Go
- Python
- TypeScript / JavaScript
Go 使用
langsmith-go SDK 和 Temporal 的 OpenTelemetry 拦截器来自动追踪工作流和活动。安装
安装 LangSmith Go SDK、Temporal SDK 和 OpenTelemetry 拦截器:
go get github.com/langchain-ai/langsmith-go@v0.1.0-alpha.7
go get go.temporal.io/sdk
go get go.temporal.io/sdk/contrib/opentelemetry
初始化追踪器
初始化 LangSmith 追踪器,创建 Temporal 的 OpenTelemetry 拦截器,并将其注册到 Temporal 客户端和工作线程:
package main
import (
"context"
"log"
"github.com/langchain-ai/langsmith-go"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/contrib/opentelemetry"
"go.temporal.io/sdk/interceptor"
"go.temporal.io/sdk/worker"
)
func main() {
ctx := context.Background()
// 初始化 LangSmith 追踪器(读取 LANGSMITH_API_KEY 和 LANGSMITH_PROJECT)
ls, err := langsmith.NewTracer(
langsmith.WithServiceName("temporal-worker"),
)
if err != nil {
log.Fatal("Failed to initialize LangSmith tracer:", err)
}
defer ls.Shutdown(ctx)
// 创建 Temporal 追踪拦截器
tracer := ls.Tracer("temporal-app")
tracingInterceptor, err := opentelemetry.NewTracingInterceptor(
opentelemetry.TracerOptions{Tracer: tracer},
)
if err != nil {
log.Fatal("Failed to create tracing interceptor:", err)
}
// 创建带追踪的 Temporal 客户端
c, err := client.Dial(client.Options{
Interceptors: []interceptor.ClientInterceptor{tracingInterceptor},
})
if err != nil {
log.Fatal("Failed to create Temporal client:", err)
}
defer c.Close()
// 创建带追踪的工作线程(使用相同的客户端)
w := worker.New(c, "my-task-queue", worker.Options{})
w.RegisterWorkflow(MyWorkflow)
w.RegisterActivity(MyActivity)
// 启动工作线程
if err := w.Run(worker.InterruptCh()); err != nil {
log.Fatal("Worker failed:", err)
}
}
定义工作流和活动
定义一个执行活动的工作流。该活动演示了如何为 LangSmith 可见性添加自定义跨度属性:
package main
import (
"context"
"fmt"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)
// MyWorkflow 执行一个活动
func MyWorkflow(ctx workflow.Context, input string) (string, error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)
var result string
err := workflow.ExecuteActivity(ctx, MyActivity, input).Get(ctx, &result)
return result, err
}
// MyActivity 处理输入并添加自定义跨度属性
func MyActivity(ctx context.Context, input string) (string, error) {
logger := activity.GetLogger(ctx)
logger.Info("Processing", "input", input)
// 获取由 Temporal 拦截器创建的跨度
span := trace.SpanFromContext(ctx)
// 为 LangSmith 可见性添加 Gen AI 属性
span.SetAttributes(
attribute.String("gen_ai.prompt", input),
attribute.String("gen_ai.operation.name", "chat"),
)
result := fmt.Sprintf("Processed: %s", input)
// 设置完成属性
span.SetAttributes(
attribute.String("gen_ai.completion", result),
)
return result, nil
}
执行工作流
在一个单独的客户端应用程序中,初始化追踪器并执行工作流:
client.go
// 在单独的函数或客户端应用程序中
func executeWorkflow() {
ctx := context.Background()
// 为客户端初始化追踪器
ls, err := langsmith.NewTracer(
langsmith.WithServiceName("temporal-client"),
)
if err != nil {
log.Fatal(err)
}
defer ls.Shutdown(ctx)
// 创建带追踪的客户端
tracer := ls.Tracer("temporal-app")
tracingInterceptor, err := opentelemetry.NewTracingInterceptor(
opentelemetry.TracerOptions{Tracer: tracer},
)
if err != nil {
log.Fatal(err)
}
c, err := client.Dial(client.Options{
Interceptors: []interceptor.ClientInterceptor{tracingInterceptor},
})
if err != nil {
log.Fatal(err)
}
defer c.Close()
// 执行工作流
workflowOptions := client.StartWorkflowOptions{
ID: "my-workflow-1",
TaskQueue: "my-task-queue",
}
we, err := c.ExecuteWorkflow(ctx, workflowOptions, MyWorkflow, "Hello World")
if err != nil {
log.Fatal(err)
}
var result string
if err := we.Get(ctx, &result); err != nil {
log.Fatal(err)
}
log.Printf("Workflow result: %s", result)
}
Python 使用
temporalio SDK 和 OpenTelemetry 拦截器,通过 OTLP 将追踪数据导出到 LangSmith。安装
安装 Temporal SDK、LangSmith SDK 和 OpenTelemetry 包:
pip install temporalio
pip install langsmith
pip install opentelemetry-sdk
pip install opentelemetry-exporter-otlp-proto-http
初始化追踪器
创建一个配置为向 LangSmith 发送追踪数据的 OTLP 导出器的 OpenTelemetry
TracerProvider:import asyncio
import os
from datetime import timedelta
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.contrib.opentelemetry import TracingInterceptor
from temporalio.worker import Worker
def init_tracer_provider() -> TracerProvider:
"""使用 LangSmith 导出器初始化 OpenTelemetry。"""
# 为 LangSmith 创建 OTLP 导出器
exporter = OTLPSpanExporter(
endpoint="https://api.smith.langchain.com/otel/v1/traces",
headers={
"x-api-key": os.environ.get("LANGSMITH_API_KEY", ""),
"Langsmith-Project": os.environ.get("LANGSMITH_PROJECT", "default"),
},
)
# 创建带有资源属性的 TracerProvider
resource = Resource.create({
SERVICE_NAME: "temporal-worker",
})
provider = TracerProvider(resource=resource)
provider.add_span_processor(BatchSpanProcessor(exporter))
# 设置为全局提供者
trace.set_tracer_provider(provider)
return provider
定义工作流和活动
定义一个工作流类和活动函数。该活动演示了如何为 LangSmith 可见性添加自定义跨度属性:
@activity.defn
async def process_activity(input: str) -> str:
"""处理输入并添加自定义跨度属性的活动。"""
activity.logger.info(f"Processing: {input}")
# 获取当前跨度并添加 Gen AI 属性
span = trace.get_current_span()
span.set_attribute("gen_ai.prompt", input)
span.set_attribute("gen_ai.operation.name", "chat")
result = f"Processed: {input}"
span.set_attribute("gen_ai.completion", result)
return result
@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self, input: str) -> str:
return await workflow.execute_activity(
process_activity,
input,
start_to_close_timeout=timedelta(seconds=10),
)
运行工作线程
创建一个带有
TracingInterceptor 的 Temporal 客户端并启动工作线程:async def main():
# 初始化追踪
provider = init_tracer_provider()
try:
# 创建带追踪拦截器的 Temporal 客户端
client = await Client.connect(
"localhost:7233",
interceptors=[TracingInterceptor()],
)
# 运行工作线程
worker = Worker(
client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[process_activity],
)
print("Starting worker...")
await worker.run()
finally:
# 关闭追踪器提供者以刷新追踪数据
provider.shutdown()
if __name__ == "__main__":
asyncio.run(main())
执行工作流
在一个单独的脚本中,使用追踪拦截器连接到 Temporal 并执行工作流:
client.py
import asyncio
from temporalio.client import Client
from temporalio.contrib.opentelemetry import TracingInterceptor
# 导入相同的追踪器设置
from worker import init_tracer_provider
async def main():
provider = init_tracer_provider()
try:
client = await Client.connect(
"localhost:7233",
interceptors=[TracingInterceptor()],
)
# 执行工作流
result = await client.execute_workflow(
MyWorkflow.run,
"Hello World",
id="my-workflow-1",
task_queue="my-task-queue",
)
print(f"Workflow result: {result}")
finally:
provider.shutdown()
if __name__ == "__main__":
asyncio.run(main())
TypeScript 使用
@temporalio/sdk 和 OpenTelemetry 拦截器将追踪数据发送到 LangSmith。安装
安装 Temporal SDK、OpenTelemetry 拦截器和追踪包:
npm install @temporalio/client @temporalio/worker @temporalio/activity @temporalio/workflow
npm install @temporalio/interceptors-opentelemetry
npm install @opentelemetry/sdk-node @opentelemetry/sdk-trace-node
npm install @opentelemetry/exporter-trace-otlp-http
npm install @opentelemetry/resources @opentelemetry/semantic-conventions
初始化追踪器
创建一个配置为向 LangSmith 发送追踪数据的 OTLP 导出器的
NodeTracerProvider:tracer.ts
import { Resource } from '@opentelemetry/resources';
import { ATTR_SERVICE_NAME } from '@opentelemetry/semantic-conventions';
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
export function initTracerProvider(): NodeTracerProvider {
// 为 LangSmith 创建 OTLP 导出器
const exporter = new OTLPTraceExporter({
url: 'https://api.smith.langchain.com/otel/v1/traces',
headers: {
'x-api-key': process.env.LANGSMITH_API_KEY || '',
'Langsmith-Project': process.env.LANGSMITH_PROJECT || 'default',
},
});
// 创建 TracerProvider
const provider = new NodeTracerProvider({
resource: new Resource({
[ATTR_SERVICE_NAME]: 'temporal-worker',
}),
});
provider.addSpanProcessor(new BatchSpanProcessor(exporter));
provider.register();
return provider;
}
定义工作流
定义一个具有超时配置的代理活动的工作流:
workflows.ts
import { proxyActivities } from '@temporalio/workflow';
import type * as activities from './activities';
const { processActivity } = proxyActivities<typeof activities>({
startToCloseTimeout: '10 seconds',
});
export async function myWorkflow(input: string): Promise<string> {
return await processActivity(input);
}
定义活动
定义一个演示如何为 LangSmith 可见性添加自定义跨度属性的活动:
activities.ts
import { log } from '@temporalio/activity';
import { trace } from '@opentelemetry/api';
export async function processActivity(input: string): Promise<string> {
log.info('Processing', { input });
// 获取当前跨度并添加 Gen AI 属性
const span = trace.getActiveSpan();
span?.setAttribute('gen_ai.prompt', input);
span?.setAttribute('gen_ai.operation.name', 'chat');
const result = `Processed: ${input}`;
span?.setAttribute('gen_ai.completion', result);
return result;
}
运行工作线程
创建一个带有用于活动的 OpenTelemetry 拦截器和用于工作流跨度的工作流导出器的工作线程:
worker.ts
import { Worker, NativeConnection } from '@temporalio/worker';
import { Resource } from '@opentelemetry/resources';
import { ATTR_SERVICE_NAME } from '@opentelemetry/semantic-conventions';
import {
makeWorkflowExporter,
OpenTelemetryActivityInboundInterceptor,
} from '@temporalio/interceptors-opentelemetry';
import { trace } from '@opentelemetry/api';
import * as activities from './activities';
import { initTracerProvider } from './tracer';
async function run() {
const provider = initTracerProvider();
try {
const connection = await NativeConnection.connect({
address: 'localhost:7233',
});
const worker = await Worker.create({
connection,
namespace: 'default',
taskQueue: 'my-task-queue',
workflowsPath: require.resolve('./workflows'),
activities,
sinks: {
exporter: makeWorkflowExporter(
trace.getTracer('temporal-app'),
new Resource({ [ATTR_SERVICE_NAME]: 'temporal-worker' })
),
},
interceptors: {
activity: [() => ({ inbound: new OpenTelemetryActivityInboundInterceptor() })],
},
});
console.log('Starting worker...');
await worker.run();
} finally {
await provider.shutdown();
}
}
run().catch((err) => {
console.error(err);
process.exit(1);
});
执行工作流
在一个单独的客户端文件中,连接到 Temporal 并执行工作流:
client.ts
import { Client, Connection } from '@temporalio/client';
import { initTracerProvider } from './tracer';
async function run() {
// 初始化追踪
const provider = initTracerProvider();
try {
const connection = await Connection.connect({ address: 'localhost:7233' });
const client = new Client({ connection });
const result = await client.workflow.execute('myWorkflow', {
taskQueue: 'my-task-queue',
workflowId: 'my-workflow-1',
args: ['Hello World'],
});
console.log('Workflow result:', result);
} finally {
await provider.shutdown();
}
}
run().catch(console.error);
在 LangSmith 中查看追踪数据
配置完成后,追踪数据将出现在您的 LangSmith 项目中:- 导航到您的 LangSmith 实例。
- 选择您的项目。
- 在 Tracing 选项卡中查看追踪数据。
- 点击单个追踪以查看完整的跨度层次结构。
配置选项
设置自定义服务名称
设置自定义服务名称以区分不同的 Temporal 工作线程或服务:ls, err := langsmith.NewTracer(
langsmith.WithServiceName("my-temporal-worker"),
)
添加自定义跨度属性
添加自定义属性以丰富您的追踪数据:import "go.opentelemetry.io/otel/attribute"
span := trace.SpanFromContext(ctx)
span.SetAttributes(
attribute.String("user.id", userID),
attribute.String("workflow.version", "v2"),
)
配置采样
对于高流量的工作流,配置采样以减少追踪数据量:// 注意:langsmith.NewTracer() 使用默认采样
// 对于自定义采样,请直接使用 TracerProvider
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
sdktrace.WithSampler(sdktrace.TraceIDRatioBased(0.1)), // 10% 采样率
)
故障排除
追踪数据未出现
- 验证 API 密钥:确保
LANGSMITH_API_KEY设置正确 - 检查端点:确认您使用的是
https://api.smith.langchain.com/otel/v1/traces - 在关闭时刷新:在应用程序退出前调用
provider.shutdown()以刷新待处理的跨度 - 检查项目:验证追踪数据是否发送到正确的项目(默认为
"default")
缺少活动跨度
确保在客户端和工作线程上都配置了追踪拦截器:- 客户端:需要拦截器来启动工作流
- 工作线程:需要拦截器来执行活动
上下文传播问题
验证传播器是否正确配置:- Go:
langsmith.NewTracer()自动配置传播器 - Python/TypeScript:确保 OpenTelemetry SDK 已正确初始化并配置了追踪传播器
工作线程关闭挂起
如果追踪数据未刷新,请确保使用适当的超时调用关闭方法:defer ls.Shutdown(context.Background())
后续步骤
其他资源
- Temporal 文档
- Temporal Go SDK
- Temporal Python SDK
- Temporal TypeScript SDK
- OpenTelemetry 文档
- LangSmith Go SDK
Connect these docs to Claude, VSCode, and more via MCP for real-time answers.

