Skip to main content
Temporal 是一个持久化执行平台,使开发者能够构建弹性的分布式应用程序。本指南将向您展示如何使用 OpenTelemetry 在 LangSmith 中追踪 Temporal 工作流和活动。 LangSmith 支持 OpenTelemetry (OTEL) 追踪数据摄取,可与 Temporal 原生的 OpenTelemetry 拦截器无缝集成。这使得您可以在工作流执行、活动以及其中的任何 LLM 调用之间实现完整的分布式追踪。

先决条件

  • 一个 LangSmith 账户 和 API 密钥
  • 正在运行的 Temporal 服务器(本地或云端)
  • 适用于您所用语言的 OpenTelemetry SDK

环境变量

为所有实现设置以下环境变量:
变量必需描述
LANGSMITH_API_KEY来自设置页面的 LangSmith API 密钥。
LANGSMITH_PROJECT项目名称(默认为 "default")。
对于欧盟区域或自托管的 LangSmith 安装,还需将 LANGCHAIN_BASE_URL 设置为您的 LangSmith 实例 URL。

设置追踪

Go 使用 langsmith-go SDK 和 Temporal 的 OpenTelemetry 拦截器来自动追踪工作流和活动。
1

安装

安装 LangSmith Go SDK、Temporal SDK 和 OpenTelemetry 拦截器:
go get github.com/langchain-ai/langsmith-go@v0.1.0-alpha.7
go get go.temporal.io/sdk
go get go.temporal.io/sdk/contrib/opentelemetry
2

初始化追踪器

初始化 LangSmith 追踪器,创建 Temporal 的 OpenTelemetry 拦截器,并将其注册到 Temporal 客户端和工作线程:
package main

import (
	"context"
	"log"

	"github.com/langchain-ai/langsmith-go"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/contrib/opentelemetry"
	"go.temporal.io/sdk/interceptor"
	"go.temporal.io/sdk/worker"
)

func main() {
	ctx := context.Background()

	// 初始化 LangSmith 追踪器(读取 LANGSMITH_API_KEY 和 LANGSMITH_PROJECT)
	ls, err := langsmith.NewTracer(
		langsmith.WithServiceName("temporal-worker"),
	)
	if err != nil {
		log.Fatal("Failed to initialize LangSmith tracer:", err)
	}
	defer ls.Shutdown(ctx)

	// 创建 Temporal 追踪拦截器
	tracer := ls.Tracer("temporal-app")
	tracingInterceptor, err := opentelemetry.NewTracingInterceptor(
		opentelemetry.TracerOptions{Tracer: tracer},
	)
	if err != nil {
		log.Fatal("Failed to create tracing interceptor:", err)
	}

	// 创建带追踪的 Temporal 客户端
	c, err := client.Dial(client.Options{
		Interceptors: []interceptor.ClientInterceptor{tracingInterceptor},
	})
	if err != nil {
		log.Fatal("Failed to create Temporal client:", err)
	}
	defer c.Close()

	// 创建带追踪的工作线程(使用相同的客户端)
	w := worker.New(c, "my-task-queue", worker.Options{})
	w.RegisterWorkflow(MyWorkflow)
	w.RegisterActivity(MyActivity)

	// 启动工作线程
	if err := w.Run(worker.InterruptCh()); err != nil {
		log.Fatal("Worker failed:", err)
	}
}
3

定义工作流和活动

定义一个执行活动的工作流。该活动演示了如何为 LangSmith 可见性添加自定义跨度属性:
package main

import (
	"context"
	"fmt"
	"time"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"go.temporal.io/sdk/activity"
	"go.temporal.io/sdk/workflow"
)

// MyWorkflow 执行一个活动
func MyWorkflow(ctx workflow.Context, input string) (string, error) {
	ao := workflow.ActivityOptions{
		StartToCloseTimeout: 10 * time.Second,
	}
	ctx = workflow.WithActivityOptions(ctx, ao)

	var result string
	err := workflow.ExecuteActivity(ctx, MyActivity, input).Get(ctx, &result)
	return result, err
}

// MyActivity 处理输入并添加自定义跨度属性
func MyActivity(ctx context.Context, input string) (string, error) {
	logger := activity.GetLogger(ctx)
	logger.Info("Processing", "input", input)

	// 获取由 Temporal 拦截器创建的跨度
	span := trace.SpanFromContext(ctx)

	// 为 LangSmith 可见性添加 Gen AI 属性
	span.SetAttributes(
		attribute.String("gen_ai.prompt", input),
		attribute.String("gen_ai.operation.name", "chat"),
	)

	result := fmt.Sprintf("Processed: %s", input)

	// 设置完成属性
	span.SetAttributes(
		attribute.String("gen_ai.completion", result),
	)

	return result, nil
}
4

执行工作流

在一个单独的客户端应用程序中,初始化追踪器并执行工作流:
client.go
// 在单独的函数或客户端应用程序中
func executeWorkflow() {
	ctx := context.Background()

	// 为客户端初始化追踪器
	ls, err := langsmith.NewTracer(
		langsmith.WithServiceName("temporal-client"),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer ls.Shutdown(ctx)

	// 创建带追踪的客户端
	tracer := ls.Tracer("temporal-app")
	tracingInterceptor, err := opentelemetry.NewTracingInterceptor(
		opentelemetry.TracerOptions{Tracer: tracer},
	)
	if err != nil {
		log.Fatal(err)
	}

	c, err := client.Dial(client.Options{
		Interceptors: []interceptor.ClientInterceptor{tracingInterceptor},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	// 执行工作流
	workflowOptions := client.StartWorkflowOptions{
		ID:        "my-workflow-1",
		TaskQueue: "my-task-queue",
	}

	we, err := c.ExecuteWorkflow(ctx, workflowOptions, MyWorkflow, "Hello World")
	if err != nil {
		log.Fatal(err)
	}

	var result string
	if err := we.Get(ctx, &result); err != nil {
		log.Fatal(err)
	}

	log.Printf("Workflow result: %s", result)
}

在 LangSmith 中查看追踪数据

配置完成后,追踪数据将出现在您的 LangSmith 项目中:
  1. 导航到您的 LangSmith 实例。
  2. 选择您的项目。
  3. Tracing 选项卡中查看追踪数据。
  4. 点击单个追踪以查看完整的跨度层次结构。

配置选项

设置自定义服务名称

设置自定义服务名称以区分不同的 Temporal 工作线程或服务:
ls, err := langsmith.NewTracer(
    langsmith.WithServiceName("my-temporal-worker"),
)

添加自定义跨度属性

添加自定义属性以丰富您的追踪数据:
import "go.opentelemetry.io/otel/attribute"

span := trace.SpanFromContext(ctx)
span.SetAttributes(
    attribute.String("user.id", userID),
    attribute.String("workflow.version", "v2"),
)

配置采样

对于高流量的工作流,配置采样以减少追踪数据量:
// 注意:langsmith.NewTracer() 使用默认采样
// 对于自定义采样,请直接使用 TracerProvider
tp := sdktrace.NewTracerProvider(
    sdktrace.WithBatcher(exporter),
    sdktrace.WithSampler(sdktrace.TraceIDRatioBased(0.1)), // 10% 采样率
)

故障排除

追踪数据未出现

  1. 验证 API 密钥:确保 LANGSMITH_API_KEY 设置正确
  2. 检查端点:确认您使用的是 https://api.smith.langchain.com/otel/v1/traces
  3. 在关闭时刷新:在应用程序退出前调用 provider.shutdown() 以刷新待处理的跨度
  4. 检查项目:验证追踪数据是否发送到正确的项目(默认为 "default"

缺少活动跨度

确保在客户端和工作线程上都配置了追踪拦截器:
  • 客户端:需要拦截器来启动工作流
  • 工作线程:需要拦截器来执行活动

上下文传播问题

验证传播器是否正确配置:
  • Golangsmith.NewTracer() 自动配置传播器
  • Python/TypeScript:确保 OpenTelemetry SDK 已正确初始化并配置了追踪传播器

工作线程关闭挂起

如果追踪数据未刷新,请确保使用适当的超时调用关闭方法:
defer ls.Shutdown(context.Background())

后续步骤

其他资源