Skip to main content
LangSmith 可以通过 OpenTelemetry 工具捕获 Pipecat 生成的追踪信息。本指南将展示如何自动捕获来自 Pipecat 语音 AI 流水线的追踪数据,并将其发送到 LangSmith 进行监控和分析。 完整实现请参考 演示仓库

安装

安装所需的包:
pip install langsmith "pipecat-ai[whisper,openai,local]" opentelemetry-exporter-otlp python-dotenv
如果你计划使用高级音频录制功能,还需安装:pip install scipy numpy

快速入门教程

按照这个分步教程,使用 Pipecat 和 LangSmith 追踪创建一个语音 AI 智能体。你将通过复制粘贴代码片段来构建一个完整的工作示例。

步骤 1:设置环境

在你的项目目录中创建一个 .env 文件:
.env
OTEL_EXPORTER_OTLP_ENDPOINT=https://api.smith.langchain.com/otel
OTEL_EXPORTER_OTLP_HEADERS=x-api-key=<你的-langsmith-api-密钥>, Langsmith-Project=pipecat-voice
OPENAI_API_KEY=<你的-openai-api-密钥>

步骤 2:下载跨度处理器

添加启用 LangSmith 追踪的 自定义跨度处理器文件。将其保存为项目目录中的 langsmith_processor.py
该跨度处理器使用 LangSmith 兼容的属性来丰富 Pipecat 的 OpenTelemetry 跨度,以便你的追踪信息能在 LangSmith 中正确显示。主要功能:
  • 将 Pipecat 跨度类型(stt、llm、tts、turn、conversation)转换为 LangSmith 格式。
  • 为消息可视化添加 gen_ai.prompt.*gen_ai.completion.* 属性。
  • 跨轮次跟踪和聚合对话消息。
  • 处理音频文件附件(用于高级用法)。
当你将其导入代码时,处理器会自动激活。

步骤 3:创建你的语音智能体文件

创建一个名为 agent.py 的新文件,并添加以下代码。我们将分部分构建它,以便你可以复制粘贴每个部分。

第 1 部分:导入依赖项

import asyncio
import uuid
from dotenv import load_dotenv

# 加载环境变量
load_dotenv()

# 导入 Pipecat 组件
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.whisper.stt import WhisperSTTService
from pipecat.services.openai import OpenAILLMService, OpenAITTSService
from pipecat.transports.local.audio import LocalAudioTransport, LocalAudioTransportParams

# 导入跨度处理器以启用 LangSmith 追踪
from langsmith_processor import span_processor

第 2 部分:定义主函数

async def main():
    # 为 LangSmith 生成唯一的对话 ID
    conversation_id = str(uuid.uuid4())
    print(f"开始对话: {conversation_id}")

    # 配置带有语音活动检测的音频输入/输出
    transport = LocalAudioTransport(
        LocalAudioTransportParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
        )
    )

    # 初始化 AI 服务
    stt = WhisperSTTService()
    llm = OpenAILLMService(model="gpt-4.1-mini")
    tts = OpenAITTSService(voice="alloy")

    # 使用系统提示设置对话上下文
    context = OpenAILLMContext(
        messages=[
            {
                "role": "system",
                "content": "你是一个有用的语音助手。保持回答简洁且口语化。"
            }
        ]
    )
    context_aggregator = llm.create_context_aggregator(context)

    # 构建处理流水线
    pipeline = Pipeline([
        transport.input(),           # 捕获麦克风输入
        stt,                         # 将语音转换为文本
        context_aggregator.user(),   # 将用户消息添加到上下文
        llm,                         # 生成 AI 响应
        tts,                         # 将响应转换为语音
        transport.output(),          # 通过扬声器播放
        context_aggregator.assistant(),  # 将助手响应添加到上下文
    ])

    # 创建启用追踪的任务
    task = PipelineTask(
        pipeline,
        params=PipelineParams(enable_metrics=True),
        enable_tracing=True,
        enable_turn_tracking=True,
        conversation_id=conversation_id,
    )

    # 运行智能体
    runner = PipelineRunner()
    await runner.run(task)

第 3 部分:添加入口点

if __name__ == "__main__":
    asyncio.run(main())

步骤 4:运行你的智能体

运行你的语音智能体:
python agent.py
通过麦克风与智能体对话。所有追踪信息将自动出现在 LangSmith 中。以下是 LangSmith 中追踪信息的示例:LangSmith 追踪与 Pipecat 查看完整的 agent.py 代码

高级用法

自定义元数据和标签

你可以使用跨度属性向追踪信息添加自定义元数据:
from opentelemetry import trace

tracer = trace.get_tracer(__name__)

async def run_voice_session():
    with tracer.start_as_current_span("voice_conversation") as span:
        # 添加自定义元数据
        span.set_attribute("langsmith.metadata.session_type", "voice_assistant")
        span.set_attribute("langsmith.metadata.user_id", "user_123")
        span.set_attribute("langsmith.span.tags", "pipecat,voice-ai,stt-llm-tts")

        # 你的 Pipecat 流水线代码放在这里
        task = PipelineTask(pipeline, enable_tracing=True)
        await task.queue_frames([TextFrame("Hello")])

录制音频并附加到追踪信息

你可以捕获语音对话中的音频,并将其附加到 LangSmith 的追踪信息中。这允许你边听实际音频边查看转录文本和 AI 响应。

完整对话录制

参考 AudioRecorder 实现,它处理输入(麦克风)和输出(TTS)音频之间的采样率不匹配问题。 从头到尾捕获所有音频,并将其附加到对话跨度:
from pathlib import Path
from datetime import datetime
from audio_recorder import AudioRecorder

# 设置录制目录
recordings_dir = Path(__file__).parent / "recordings"
recordings_dir.mkdir(exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
recording_path = recordings_dir / f"conversation_{timestamp}.wav"

# 创建音频录制器
audio_recorder = AudioRecorder(str(recording_path))

# 注册到跨度处理器,以便附加到对话跨度
span_processor.register_recording(
    conversation_id,
    str(recording_path),
    audio_recorder=audio_recorder
)

# 添加到你的流水线
pipeline = Pipeline([
    transport.input(),
    stt,
    context_aggregator.user(),
    llm,
    tts,
    audio_recorder,              # 将音频录制器添加到流水线
    transport.output(),
    context_aggregator.assistant(),
])

# 运行流水线
runner = PipelineRunner()
try:
    await runner.run(task)
finally:
    # 重要:在对话跨度完成之前保存录制
    audio_recorder.save_recording()

每轮对话录制

参考 TurnAudioRecorder 实现,它为每轮对话分别捕获用户语音和 AI 响应。 为每轮对话捕获单独的音频片段,用户语音和 AI 响应保存为单独的文件:
from turn_audio_recorder import TurnAudioRecorder

# 创建每轮对话音频录制器
turn_audio_recorder = TurnAudioRecorder(
    span_processor=span_processor,
    conversation_id=conversation_id,
    recordings_dir=recordings_dir,
    turn_tracker=None,  # 将在任务创建后设置
)

# 注册到跨度处理器
span_processor.register_turn_audio_recorder(conversation_id, turn_audio_recorder)

# 添加到你的流水线
pipeline = Pipeline([
    transport.input(),
    stt,
    context_aggregator.user(),
    llm,
    tts,
    audio_recorder,              # 完整对话录制
    turn_audio_recorder,         # 每轮对话音频片段
    transport.output(),
    context_aggregator.assistant(),
])

# 创建任务
task = PipelineTask(
    pipeline,
    params=PipelineParams(enable_metrics=True),
    enable_tracing=True,
    enable_turn_tracking=True,  # 每轮对话音频录制所需
    conversation_id=conversation_id,
)

# 任务创建后连接轮次跟踪器
if task.turn_tracking_observer:
    turn_audio_recorder.connect_to_turn_tracker(task.turn_tracking_observer)

# 运行流水线
runner = PipelineRunner()
try:
    await runner.run(task)
finally:
    audio_recorder.save_recording()

故障排除

跨度未出现在 LangSmith 中

如果追踪信息未显示在 LangSmith 中:
  1. 验证环境变量:确保 .env 文件中正确设置了 OTEL_EXPORTER_OTLP_ENDPOINTOTEL_EXPORTER_OTLP_HEADERS
  2. 检查 API 密钥:确认你的 LangSmith API 密钥具有写入权限。
  3. 验证导入:确保你从 langsmith_processor.py 导入了 span_processor
  4. 检查 .env 加载:确保在导入 Pipecat 组件之前调用了 load_dotenv()

消息未正确显示

如果对话消息显示不正常:
  1. 检查跨度处理器:确认 langsmith_processor.py 在你的项目目录中且导入正确。
  2. 验证对话 ID:确保在 PipelineTask 中设置了唯一的 conversation_id
  3. 启用轮次跟踪:确保在 PipelineTask 中设置了 enable_turn_tracking=True

音频无法工作

如果你的麦克风或扬声器无法工作:
  1. 检查权限:确保你的终端/IDE 有麦克风访问权限。
  2. 测试音频设备:验证你的麦克风和扬声器在其他应用程序中正常工作。
  3. VAD 设置:如果未检测到语音,尝试调整 SileroVADAnalyzer() 的设置。
  4. 检查服务:确保 OpenAI API 密钥有效且有权访问 Whisper 和 TTS。

导入错误

如果你遇到导入错误:
  1. 安装依赖项:运行 pip install langsmith "pipecat-ai[whisper,openai,local]" opentelemetry-exporter-otlp python-dotenv
  2. 检查 Python 版本:确保你使用的是 Python 3.9 或更高版本。
  3. 验证 langsmith_processor:确保 langsmith_processor.py 已下载且与你的 agent.py 在同一目录中。

性能问题

如果响应缓慢:
  1. 使用更快的模型:为 LLM 切换到 gpt-4.1-mini(教程中已使用)。
  2. 检查网络:确保 API 调用的网络连接稳定。
  3. 本地 STT:考虑使用本地 Whisper 而不是基于 API 的服务。

高级:音频录制故障排除

有关高级音频录制功能的问题,请参阅 完整演示文档