LangSmith 可以通过 OpenTelemetry 工具捕获 Pipecat 生成的追踪信息。本指南将展示如何自动捕获来自 Pipecat 语音 AI 流水线的追踪数据,并将其发送到 LangSmith 进行监控和分析。
完整实现请参考 演示仓库。
安装所需的包:
pip install langsmith "pipecat-ai[whisper,openai,local]" opentelemetry-exporter-otlp python-dotenv
如果你计划使用高级音频录制功能,还需安装:pip install scipy numpy
快速入门教程
按照这个分步教程,使用 Pipecat 和 LangSmith 追踪创建一个语音 AI 智能体。你将通过复制粘贴代码片段来构建一个完整的工作示例。
步骤 1:设置环境
在你的项目目录中创建一个 .env 文件:
OTEL_EXPORTER_OTLP_ENDPOINT=https://api.smith.langchain.com/otel
OTEL_EXPORTER_OTLP_HEADERS=x-api-key=<你的-langsmith-api-密钥>, Langsmith-Project=pipecat-voice
OPENAI_API_KEY=<你的-openai-api-密钥>
步骤 2:下载跨度处理器
添加启用 LangSmith 追踪的 自定义跨度处理器文件。将其保存为项目目录中的 langsmith_processor.py。
该跨度处理器使用 LangSmith 兼容的属性来丰富 Pipecat 的 OpenTelemetry 跨度,以便你的追踪信息能在 LangSmith 中正确显示。主要功能:
- 将 Pipecat 跨度类型(stt、llm、tts、turn、conversation)转换为 LangSmith 格式。
- 为消息可视化添加
gen_ai.prompt.* 和 gen_ai.completion.* 属性。
- 跨轮次跟踪和聚合对话消息。
- 处理音频文件附件(用于高级用法)。
当你将其导入代码时,处理器会自动激活。
步骤 3:创建你的语音智能体文件
创建一个名为 agent.py 的新文件,并添加以下代码。我们将分部分构建它,以便你可以复制粘贴每个部分。
第 1 部分:导入依赖项
import asyncio
import uuid
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
# 导入 Pipecat 组件
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.whisper.stt import WhisperSTTService
from pipecat.services.openai import OpenAILLMService, OpenAITTSService
from pipecat.transports.local.audio import LocalAudioTransport, LocalAudioTransportParams
# 导入跨度处理器以启用 LangSmith 追踪
from langsmith_processor import span_processor
第 2 部分:定义主函数
async def main():
# 为 LangSmith 生成唯一的对话 ID
conversation_id = str(uuid.uuid4())
print(f"开始对话: {conversation_id}")
# 配置带有语音活动检测的音频输入/输出
transport = LocalAudioTransport(
LocalAudioTransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
)
)
# 初始化 AI 服务
stt = WhisperSTTService()
llm = OpenAILLMService(model="gpt-4.1-mini")
tts = OpenAITTSService(voice="alloy")
# 使用系统提示设置对话上下文
context = OpenAILLMContext(
messages=[
{
"role": "system",
"content": "你是一个有用的语音助手。保持回答简洁且口语化。"
}
]
)
context_aggregator = llm.create_context_aggregator(context)
# 构建处理流水线
pipeline = Pipeline([
transport.input(), # 捕获麦克风输入
stt, # 将语音转换为文本
context_aggregator.user(), # 将用户消息添加到上下文
llm, # 生成 AI 响应
tts, # 将响应转换为语音
transport.output(), # 通过扬声器播放
context_aggregator.assistant(), # 将助手响应添加到上下文
])
# 创建启用追踪的任务
task = PipelineTask(
pipeline,
params=PipelineParams(enable_metrics=True),
enable_tracing=True,
enable_turn_tracking=True,
conversation_id=conversation_id,
)
# 运行智能体
runner = PipelineRunner()
await runner.run(task)
第 3 部分:添加入口点
if __name__ == "__main__":
asyncio.run(main())
步骤 4:运行你的智能体
运行你的语音智能体:
通过麦克风与智能体对话。所有追踪信息将自动出现在 LangSmith 中。以下是 LangSmith 中追踪信息的示例:LangSmith 追踪与 Pipecat。
查看完整的 agent.py 代码。
高级用法
自定义元数据和标签
你可以使用跨度属性向追踪信息添加自定义元数据:
from opentelemetry import trace
tracer = trace.get_tracer(__name__)
async def run_voice_session():
with tracer.start_as_current_span("voice_conversation") as span:
# 添加自定义元数据
span.set_attribute("langsmith.metadata.session_type", "voice_assistant")
span.set_attribute("langsmith.metadata.user_id", "user_123")
span.set_attribute("langsmith.span.tags", "pipecat,voice-ai,stt-llm-tts")
# 你的 Pipecat 流水线代码放在这里
task = PipelineTask(pipeline, enable_tracing=True)
await task.queue_frames([TextFrame("Hello")])
录制音频并附加到追踪信息
你可以捕获语音对话中的音频,并将其附加到 LangSmith 的追踪信息中。这允许你边听实际音频边查看转录文本和 AI 响应。
完整对话录制
参考 AudioRecorder 实现,它处理输入(麦克风)和输出(TTS)音频之间的采样率不匹配问题。
从头到尾捕获所有音频,并将其附加到对话跨度:
from pathlib import Path
from datetime import datetime
from audio_recorder import AudioRecorder
# 设置录制目录
recordings_dir = Path(__file__).parent / "recordings"
recordings_dir.mkdir(exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
recording_path = recordings_dir / f"conversation_{timestamp}.wav"
# 创建音频录制器
audio_recorder = AudioRecorder(str(recording_path))
# 注册到跨度处理器,以便附加到对话跨度
span_processor.register_recording(
conversation_id,
str(recording_path),
audio_recorder=audio_recorder
)
# 添加到你的流水线
pipeline = Pipeline([
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
audio_recorder, # 将音频录制器添加到流水线
transport.output(),
context_aggregator.assistant(),
])
# 运行流水线
runner = PipelineRunner()
try:
await runner.run(task)
finally:
# 重要:在对话跨度完成之前保存录制
audio_recorder.save_recording()
每轮对话录制
参考 TurnAudioRecorder 实现,它为每轮对话分别捕获用户语音和 AI 响应。
为每轮对话捕获单独的音频片段,用户语音和 AI 响应保存为单独的文件:
from turn_audio_recorder import TurnAudioRecorder
# 创建每轮对话音频录制器
turn_audio_recorder = TurnAudioRecorder(
span_processor=span_processor,
conversation_id=conversation_id,
recordings_dir=recordings_dir,
turn_tracker=None, # 将在任务创建后设置
)
# 注册到跨度处理器
span_processor.register_turn_audio_recorder(conversation_id, turn_audio_recorder)
# 添加到你的流水线
pipeline = Pipeline([
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
audio_recorder, # 完整对话录制
turn_audio_recorder, # 每轮对话音频片段
transport.output(),
context_aggregator.assistant(),
])
# 创建任务
task = PipelineTask(
pipeline,
params=PipelineParams(enable_metrics=True),
enable_tracing=True,
enable_turn_tracking=True, # 每轮对话音频录制所需
conversation_id=conversation_id,
)
# 任务创建后连接轮次跟踪器
if task.turn_tracking_observer:
turn_audio_recorder.connect_to_turn_tracker(task.turn_tracking_observer)
# 运行流水线
runner = PipelineRunner()
try:
await runner.run(task)
finally:
audio_recorder.save_recording()
故障排除
跨度未出现在 LangSmith 中
如果追踪信息未显示在 LangSmith 中:
- 验证环境变量:确保
.env 文件中正确设置了 OTEL_EXPORTER_OTLP_ENDPOINT 和 OTEL_EXPORTER_OTLP_HEADERS。
- 检查 API 密钥:确认你的 LangSmith API 密钥具有写入权限。
- 验证导入:确保你从
langsmith_processor.py 导入了 span_processor。
- 检查 .env 加载:确保在导入 Pipecat 组件之前调用了
load_dotenv()。
消息未正确显示
如果对话消息显示不正常:
- 检查跨度处理器:确认
langsmith_processor.py 在你的项目目录中且导入正确。
- 验证对话 ID:确保在
PipelineTask 中设置了唯一的 conversation_id。
- 启用轮次跟踪:确保在
PipelineTask 中设置了 enable_turn_tracking=True。
音频无法工作
如果你的麦克风或扬声器无法工作:
- 检查权限:确保你的终端/IDE 有麦克风访问权限。
- 测试音频设备:验证你的麦克风和扬声器在其他应用程序中正常工作。
- VAD 设置:如果未检测到语音,尝试调整
SileroVADAnalyzer() 的设置。
- 检查服务:确保 OpenAI API 密钥有效且有权访问 Whisper 和 TTS。
导入错误
如果你遇到导入错误:
- 安装依赖项:运行
pip install langsmith "pipecat-ai[whisper,openai,local]" opentelemetry-exporter-otlp python-dotenv。
- 检查 Python 版本:确保你使用的是 Python 3.9 或更高版本。
- 验证 langsmith_processor:确保
langsmith_processor.py 已下载且与你的 agent.py 在同一目录中。
性能问题
如果响应缓慢:
- 使用更快的模型:为 LLM 切换到
gpt-4.1-mini(教程中已使用)。
- 检查网络:确保 API 调用的网络连接稳定。
- 本地 STT:考虑使用本地 Whisper 而不是基于 API 的服务。
高级:音频录制故障排除
有关高级音频录制功能的问题,请参阅 完整演示文档。