BigQuery 回调处理器
社区Python预览版
Google BigQuery 是一个跨云工作、随数据扩展的无服务器且经济高效的企业数据仓库。
BigQueryCallbackHandler 允许您将 LangChain 和 LangGraph 的事件记录到 Google BigQuery。这对于监控、审计和分析 LLM 应用程序的性能非常有用。
主要特性:
- LangGraph 支持:自动检测 LangGraph 节点,记录
NODE_STARTING、NODE_COMPLETED 和 GRAPH_START/END 事件
- 延迟跟踪:为所有 LLM 和工具调用内置延迟测量
- 事件过滤:可配置的允许列表/拒绝列表,以控制记录哪些事件
- 图上下文管理器:明确的图执行边界和精确计时
- 实时仪表板:基于 FastAPI 的监控 Web 应用,支持实时事件流
预览版发布BigQuery 回调处理器目前处于预览版。API 和功能可能会发生变化。
更多信息,请参阅
发布阶段描述。
BigQuery Storage Write API此功能使用BigQuery Storage Write API,这是一项付费服务。
有关费用信息,请参阅
BigQuery 文档。
您需要安装带有 bigquery 额外依赖的 langchain-google-community。对于此示例,您还需要 langchain-google-genai 和 langgraph。
pip install "langchain-google-community[bigquery]" langchain langchain-google-genai langgraph
先决条件
- Google Cloud 项目,并已启用 BigQuery API。
- BigQuery 数据集:在使用回调处理器之前,创建一个数据集来存储日志表。如果表不存在,回调处理器会自动在数据集内创建必要的事件表。
- Google Cloud Storage 存储桶(可选):如果您计划记录多模态内容(图像、音频等),建议创建一个 GCS 存储桶来卸载大文件。
- 身份验证:
- 本地:运行
gcloud auth application-default login。
- 云端:确保您的服务帐户具有所需权限。
IAM 权限
为了使回调处理器正常工作,运行应用程序的主体(例如,服务帐户、用户帐户)需要以下 Google Cloud 角色:
- 项目级别的
roles/bigquery.jobUser 以运行 BigQuery 查询。
- 表级别的
roles/bigquery.dataEditor 以写入日志/事件数据。
- 如果使用 GCS 卸载:目标存储桶上的
roles/storage.objectCreator 和 roles/storage.objectViewer。
与 LangGraph 智能体一起使用
要将 BigQueryCallbackHandler 与 LangGraph 智能体一起使用,请使用您的 Google Cloud 项目 ID、数据集 ID 和表 ID 实例化它。使用 graph_context() 方法来跟踪图执行边界,并启用带有延迟测量的 GRAPH_START/GRAPH_END 事件。
在调用智能体时,通过 config 对象中的 metadata 字典传递 session_id、user_id 和 agent。
import os
from datetime import datetime
from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain.tools import tool
from langchain_google_community.callbacks.bigquery_callback import (
BigQueryCallbackHandler,
BigQueryLoggerConfig,
)
from langchain_google_genai import ChatGoogleGenerativeAI
# 1. 为智能体定义工具
@tool
def get_current_time() -> str:
"""返回当前本地时间。"""
now = datetime.now()
return f"当前时间:{now.strftime('%I:%M:%S %p')},日期:{now.strftime('%B %d, %Y')}"
@tool
def get_weather(city: str) -> str:
"""返回特定城市的当前天气。"""
# 模拟天气数据(生产环境中使用真实 API)
weather_data = {
"new york": {"temp": 22, "condition": "晴朗"},
"tokyo": {"temp": 24, "condition": "晴天"},
"london": {"temp": 14, "condition": "阴天"},
}
city_lower = city.lower()
if city_lower in weather_data:
data = weather_data[city_lower]
return f"{city.title()} 的天气:{data['temp']}°C,{data['condition']}"
return f"'{city}' 的天气数据不可用。"
@tool
def convert_currency(amount: float, from_currency: str, to_currency: str) -> str:
"""在不同货币之间转换金额。"""
rates = {"USD": 1.0, "EUR": 1.08, "GBP": 1.27, "JPY": 0.0067}
from_curr, to_curr = from_currency.upper(), to_currency.upper()
if from_curr not in rates or to_curr not in rates:
return f"未知货币"
result = amount * rates[from_curr] / rates[to_curr]
return f"{amount} {from_curr} = {result:.2f} {to_curr}"
def run_agent_example(project_id: str):
"""运行带有 BigQuery 日志记录的 LangGraph 智能体。"""
dataset_id = "agent_analytics"
table_id = "agent_events_v2"
# 2. 配置回调处理器
config = BigQueryLoggerConfig(
batch_size=1,
batch_flush_interval=0.5,
)
handler = BigQueryCallbackHandler(
project_id=project_id,
dataset_id=dataset_id,
table_id=table_id,
config=config,
graph_name="travel_assistant", # 启用 LangGraph 跟踪
)
# 3. 创建 LLM 和智能体
# 对于 Vertex AI 使用 project 参数,对于 Gemini Developer API 使用 api_key
llm = ChatGoogleGenerativeAI(
model="gemini-2.5-flash",
project=project_id, # 用于 Vertex AI
)
tools = [get_current_time, get_weather, convert_currency]
agent = create_agent(llm, tools)
# 4. 使用 graph_context 运行以获取 GRAPH_START/GRAPH_END 事件
query = "现在几点了?东京的天气怎么样?100 美元等于多少欧元?"
run_metadata = {
"session_id": "session-001",
"user_id": "user-123",
"agent": "travel_assistant",
}
with handler.graph_context("travel_assistant", metadata=run_metadata):
result = agent.invoke(
{"messages": [HumanMessage(content=query)]},
config={
"callbacks": [handler],
"metadata": run_metadata,
},
)
print(f"响应:{result['messages'][-1].content}")
# 5. 清理
handler.shutdown()
if __name__ == "__main__":
project_id = os.environ.get("GCP_PROJECT_ID", "your-project-id")
run_agent_example(project_id)
配置选项
您可以使用 BigQueryLoggerConfig 自定义回调处理器。
要将处理程序禁用,使其不将数据记录到 BigQuery 表,请将此参数设置为 False。
clustering_fields
List[str]
default:"['event_type', 'agent', 'user_id']"
自动创建 BigQuery 表时用于聚类的字段。
用于卸载大内容(图像、二进制大对象、大文本)的 GCS 存储桶名称。如果未提供,大内容可能会被截断或替换为占位符。
用作 ObjectRef 列授权器的 BigQuery 连接 ID(例如 us.my-connection)。将 ObjectRef 与 BigQuery ML 一起使用时需要。
(500 KB) 在卸载到 GCS(如果已配置)或截断之前,存储在 BigQuery 内联中的文本内容的最大长度(以字符为单位)。
写入 BigQuery 之前要批量处理的事件数量。
要记录的事件类型列表。如果为 None,则记录除 event_denylist 中事件之外的所有事件。
table_id
str
default:"agent_events_v2"
如果未显式提供给回调处理器构造函数,则使用的默认表 ID。
retry_config
RetryConfig
default:"RetryConfig()"
写入 BigQuery 失败时重试逻辑(最大重试次数、延迟、乘数)的配置。
在丢弃新事件之前,内部缓冲队列中可容纳的最大事件数。
以下代码示例展示了如何为 BigQuery 回调处理器定义带有事件过滤的配置:
from langchain_google_community.callbacks.bigquery_callback import (
BigQueryCallbackHandler,
BigQueryLoggerConfig,
)
# 1. 配置 BigQueryLoggerConfig
config = BigQueryLoggerConfig(
enabled=True,
event_allowlist=["LLM_REQUEST", "LLM_RESPONSE"], # 仅记录这些特定事件
shutdown_timeout=10.0, # 退出时最多等待 10 秒让日志刷新
max_content_length=500, # 将内容截断为 500 个字符
)
# 2. 初始化回调处理器
handler = BigQueryCallbackHandler(
project_id="your-project-id",
dataset_id="your_dataset",
table_id="your_table",
config=config,
)
架构和生产设置
插件会在表不存在时自动创建它。但是,对于生产环境,我们建议使用以下 DDL 手动创建表,该 DDL 利用 JSON 类型实现灵活性,并利用 REPEATED RECORD 处理多模态内容。
推荐的 DDL:
CREATE TABLE `your-gcp-project-id.adk_agent_logs.agent_events_v2`
(
timestamp TIMESTAMP NOT NULL OPTIONS(description="事件发生的 UTC 时间戳。"),
event_type STRING OPTIONS(description="事件的类别。"),
agent STRING OPTIONS(description="智能体的名称。"),
session_id STRING OPTIONS(description="对话会话的唯一标识符。"),
invocation_id STRING OPTIONS(description="单轮调用的唯一标识符。"),
user_id STRING OPTIONS(description="最终用户的标识符。"),
trace_id STRING OPTIONS(description="OpenTelemetry 跟踪 ID。"),
span_id STRING OPTIONS(description="OpenTelemetry 跨度 ID。"),
parent_span_id STRING OPTIONS(description="OpenTelemetry 父跨度 ID。"),
content JSON OPTIONS(description="事件的主要负载。"),
content_parts ARRAY<STRUCT<
mime_type STRING,
uri STRING,
object_ref STRUCT<
uri STRING,
version STRING,
authorizer STRING,
details JSON
>,
text STRING,
part_index INT64,
part_attributes STRING,
storage_mode STRING
>> OPTIONS(description="对于多模态事件,包含内容部分的列表。"),
attributes JSON OPTIONS(description="任意键值对。"),
latency_ms JSON OPTIONS(description="延迟测量值。"),
status STRING OPTIONS(description="事件的结果。"),
error_message STRING OPTIONS(description="详细的错误消息。"),
is_truncated BOOLEAN OPTIONS(description="指示内容是否被截断的标志。")
)
PARTITION BY DATE(timestamp)
CLUSTER BY event_type, agent, user_id;
事件类型和负载
content 列包含一个特定于 event_type 的 JSON 对象。
content_parts 列提供了内容的结构化视图,对于图像或卸载的数据尤其有用。
内容截断
- 可变内容字段被截断为
max_content_length(在 BigQueryLoggerConfig 中配置,默认为 500KB)。
- 如果配置了
gcs_bucket_name,大内容将被卸载到 GCS 而不是被截断,并且引用存储在 content_parts.object_ref 中。
LLM 交互
这些事件跟踪发送到 LLM 的原始请求和从 LLM 接收的响应。
| 事件类型 | 内容(JSON)结构 | 属性(JSON) | 示例内容(简化) |
|---|
LLM_REQUEST | 聊天模型: {"messages": [{"content": "..."}]} 传统模型: {"prompts": ["..."]} | {"tags": ["tag1"], "model": "gemini-2.5-flash"} | {"messages": [{"content": "天气怎么样?"}]} |
LLM_RESPONSE | "天气晴朗。"(存储为 JSON 字符串) | {"usage": {"total_tokens": 20}} | "天气晴朗。" |
LLM_ERROR | null | {} | null |
工具使用
这些事件跟踪智能体执行工具的情况。
| 事件类型 | 内容(JSON)结构 |
|---|
TOOL_STARTING | 输入字符串: "city='Paris'" |
TOOL_COMPLETED | 输出字符串: "25°C, 晴天" |
TOOL_ERROR | "错误:连接超时" |
链执行
这些事件跟踪高级链/图的开始和结束。
| 事件类型 | 内容(JSON)结构 |
|---|
CHAIN_START | {"messages": [...]} |
CHAIN_END | {"output": "..."} |
CHAIN_ERROR | null(参见 error_message 列) |
检索器使用
这些事件跟踪检索器的执行。
| 事件类型 | 内容(JSON)结构 |
|---|
RETRIEVER_START | 查询字符串: "法国的首都是什么?" |
RETRIEVER_END | 文档列表: [{"page_content": "巴黎是首都...", "metadata": {"source": "wiki"}}] |
RETRIEVER_ERROR | null(参见 error_message 列) |
智能体操作
这些事件跟踪智能体采取的特定操作。
| 事件类型 | 内容(JSON)结构 |
|---|
AGENT_ACTION | {"tool": "Calculator", "input": "2 + 2"} |
AGENT_FINISH | {"output": "答案是 4"} |
其他事件
| 事件类型 | 内容(JSON)结构 |
|---|
TEXT | 任意文本: "一些日志文本..." |
高级分析查询
一旦您的智能体运行并记录事件,您就可以对 agent_events_v2 表执行强大的分析。
1. 重建跟踪(对话轮次)
使用 trace_id 对属于单个执行流的所有事件(链、LLM、工具)进行分组。
SELECT
timestamp,
event_type,
span_id,
parent_span_id,
-- 根据事件类型提取摘要或特定内容
COALESCE(
JSON_VALUE(content, '$.messages[0].content'),
JSON_VALUE(content, '$.summary'),
JSON_VALUE(content)
) AS summary,
JSON_VALUE(latency_ms, '$.total_ms') AS duration_ms
FROM
`your-gcp-project-id.adk_agent_logs.agent_events_v2`
WHERE
-- 替换为您的日志中的特定 trace_id
trace_id = '019bb986-a0db-7da1-802d-2725795ab340'
ORDER BY
timestamp ASC;
2. 分析 LLM 延迟和令牌使用情况
计算 LLM 调用的平均延迟和总令牌使用量。
SELECT
JSON_VALUE(attributes, '$.model') AS model,
COUNT(*) AS total_calls,
AVG(CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64)) AS avg_latency_ms,
SUM(CAST(JSON_VALUE(attributes, '$.usage.total_tokens') AS INT64)) AS total_tokens
FROM
`your-gcp-project-id.adk_agent_logs.agent_events_v2`
WHERE
event_type = 'LLM_RESPONSE'
GROUP BY
1;
3. 使用 BigQuery 远程模型(Gemini)分析多模态内容
如果您将图像卸载到 GCS,可以直接使用 BigQuery ML 分析它们。
SELECT
logs.session_id,
-- 获取图像的签名 URL(可选,用于查看)
STRING(OBJ.GET_ACCESS_URL(parts.object_ref, "r").access_urls.read_url) as signed_url,
-- 使用远程模型(例如 gemini-2.5-flash)分析图像
AI.GENERATE(
('简要描述此图像。是什么公司标志?', parts.object_ref)
) AS generated_result
FROM
`your-gcp-project-id.adk_agent_logs.agent_events_v2` logs,
UNNEST(logs.content_parts) AS parts
WHERE
parts.mime_type LIKE 'image/%'
ORDER BY logs.timestamp DESC
LIMIT 1;
4. 分析跨度层次结构和持续时间
使用跨度 ID 可视化智能体操作(LLM 调用、工具使用)的执行流程和性能。
SELECT
span_id,
parent_span_id,
event_type,
timestamp,
-- 从 latency_ms 中提取已完成操作的持续时间
CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64) as duration_ms,
-- 识别特定的工具或操作
COALESCE(
JSON_VALUE(content, '$.tool'),
'LLM_CALL'
) as operation
FROM `your-gcp-project-id.adk_agent_logs.agent_events_v2`
WHERE trace_id = 'your-trace-id'
AND event_type IN ('LLM_RESPONSE', 'TOOL_COMPLETED')
ORDER BY timestamp ASC;
5. 查询卸载的内容(获取签名 URL)
SELECT
timestamp,
event_type,
part.mime_type,
part.storage_mode,
part.object_ref.uri AS gcs_uri,
-- 生成签名 URL 以直接读取内容(需要 connection_id 配置)
STRING(OBJ.GET_ACCESS_URL(part.object_ref, 'r').access_urls.read_url) AS signed_url
FROM `your-gcp-project-id.adk_agent_logs.agent_events_v2`,
UNNEST(content_parts) AS part
WHERE part.storage_mode = 'GCS_REFERENCE'
ORDER BY timestamp DESC
LIMIT 10;
6. 高级 SQL 场景
这些高级模式演示了如何使用 BigQuery ML 对数据进行会话化、分析工具使用情况以及执行根本原因分析。
-- 1. 会话化对话历史记录(创建视图)
-- 将所有事件合并为每个会话一行,并带有格式化的历史记录。
CREATE OR REPLACE VIEW `your-project.your-dataset.agent_sessions` AS
SELECT
session_id,
user_id,
MIN(timestamp) AS session_start,
MAX(timestamp) AS session_end,
ARRAY_AGG(
STRUCT(timestamp, event_type, TO_JSON_STRING(content) as content, error_message)
ORDER BY timestamp ASC
) AS events,
STRING_AGG(
CASE
WHEN event_type = 'USER_MESSAGE_RECEIVED' THEN CONCAT('用户:', JSON_VALUE(content, '$.input'))
WHEN event_type = 'LLM_RESPONSE' THEN CONCAT('智能体:', JSON_VALUE(content, '$.text'))
WHEN event_type = 'TOOL_STARTING' THEN CONCAT('系统:正在调用 ', JSON_VALUE(content, '$.tool_name'))
WHEN event_type = 'TOOL_COMPLETED' THEN CONCAT('系统:来自 ', JSON_VALUE(content, '$.tool_name'), ' 的结果')
WHEN event_type = 'TOOL_ERROR' THEN CONCAT('系统:', JSON_VALUE(content, '$.tool_name'), ' 中的错误')
ELSE NULL
END,
'\n' ORDER BY timestamp ASC
) AS full_conversation
FROM
`your-project.your-dataset.agent_events_v2`
GROUP BY
session_id, user_id;
-- 2. 工具使用情况分析
-- 提取工具名称并统计执行状态
SELECT
JSON_VALUE(content, '$.tool_name') AS tool_name,
event_type,
COUNT(*) as count
FROM `your-project.your-dataset.agent_events_v2`
WHERE event_type IN ('TOOL_STARTING', 'TOOL_COMPLETED', 'TOOL_ERROR')
GROUP BY 1, 2
ORDER BY tool_name, event_type;
-- 3. 细粒度成本和令牌估算
-- 根据内容字符长度估算令牌(大约 4 个字符/令牌)
SELECT
session_id,
COUNT(*) as interaction_count,
SUM(LENGTH(TO_JSON_STRING(content))) / 4 AS estimated_tokens,
-- 示例成本:每 1k 令牌 $0.0001
ROUND((SUM(LENGTH(TO_JSON_STRING(content))) / 4) / 1000 * 0.0001, 6) AS estimated_cost_usd
FROM `your-project.your-dataset.agent_events_v2`
GROUP BY session_id
ORDER BY estimated_cost_usd DESC
LIMIT 5;
-- 4. AI 驱动的根本原因分析(需要 BigQuery ML)
-- 使用 Gemini 分析失败的会话
SELECT
session_id,
AI.GENERATE(
('分析此对话并解释失败的根本原因。日志:', full_conversation),
connection_id => 'your-project.us.bqml_connection',
endpoint => 'gemini-2.5-flash'
).result AS root_cause_explanation
FROM `your-project.your-dataset.agent_sessions`
WHERE error_message IS NOT NULL
LIMIT 5;
BigQuery 中的对话分析
对话分析您还可以使用 BigQuery 对话分析 以自然语言分析您的智能体日志。
只需提出以下问题:
- “显示一段时间内的错误率”
- “最常见的工具调用是什么?”
- “识别令牌使用量高的会话”
Looker Studio 仪表板
您可以使用我们预构建的 Looker Studio 仪表板模板 可视化智能体的性能。
要将此仪表板连接到您自己的 BigQuery 表,请使用以下链接格式,将占位符替换为您的特定项目、数据集和表 ID:
https://lookerstudio.google.com/reporting/create?c.reportId=f1c5b513-3095-44f8-90a2-54953d41b125&ds.ds3.connector=bigQuery&ds.ds3.type=TABLE&ds.ds3.projectId=<your-project-id>&ds.ds3.datasetId=<your-dataset-id>&ds.ds3.tableId=<your-table-id>
LangGraph 集成
BigQueryCallbackHandler 为 LangGraph 智能体提供了增强支持,包括自动节点检测、图级跟踪和延迟测量。
LangGraph 事件类型
除了标准的 LangChain 事件外,回调处理器还会自动检测并记录 LangGraph 特定事件:
| 事件类型 | 描述 |
|---|
NODE_STARTING | 当 LangGraph 节点开始执行时发出 |
NODE_COMPLETED | 当 LangGraph 节点成功完成时发出 |
NODE_ERROR | 当 LangGraph 节点失败时发出 |
GRAPH_START | 当图执行开始时发出(通过上下文管理器) |
GRAPH_END | 当图执行完成时发出 |
GRAPH_ERROR | 当图执行失败时发出 |
图上下文管理器
使用 graph_context() 方法显式标记图执行边界。这可以启用带有精确延迟测量的 GRAPH_START 和 GRAPH_END 事件:
from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain_google_community.callbacks.bigquery_callback import (
BigQueryCallbackHandler,
BigQueryLoggerConfig,
)
# 使用图名称初始化处理器
handler = BigQueryCallbackHandler(
project_id="your-project-id",
dataset_id="agent_analytics",
table_id="agent_events_v2",
graph_name="my_agent",
)
# 创建您的智能体
agent = create_agent(llm, tools)
# 使用图上下文管理器以获取正确的 GRAPH_START/GRAPH_END 事件
run_metadata = {
"session_id": "session-123",
"user_id": "user-456",
"agent": "my_agent",
}
with handler.graph_context("my_agent", metadata=run_metadata):
result = agent.invoke(
{"messages": [HumanMessage(content="东京的天气怎么样?")]},
config={
"callbacks": [handler],
"metadata": run_metadata,
},
)
延迟跟踪
回调处理器会自动跟踪所有操作的延迟,并将测量值存储在 latency_ms JSON 列中:
-- 按事件类型查询延迟
SELECT
event_type,
agent,
COUNT(*) as count,
ROUND(AVG(CAST(JSON_EXTRACT_SCALAR(latency_ms, '$.total_ms') AS FLOAT64)), 2) as avg_latency_ms,
ROUND(APPROX_QUANTILES(CAST(JSON_EXTRACT_SCALAR(latency_ms, '$.total_ms') AS FLOAT64), 100)[OFFSET(95)], 2) as p95_latency_ms
FROM `your-project.your-dataset.agent_events_v2`
WHERE DATE(timestamp) = CURRENT_DATE()
AND event_type IN ('LLM_RESPONSE', 'TOOL_COMPLETED', 'GRAPH_END')
GROUP BY event_type, agent
ORDER BY avg_latency_ms DESC;
事件过滤
使用 event_allowlist 和 event_denylist 来控制记录哪些事件:
from langchain_google_community.callbacks.bigquery_callback import (
BigQueryCallbackHandler,
BigQueryLoggerConfig,
)
# 生产配置:仅记录重要事件
config = BigQueryLoggerConfig(
event_allowlist=[
"LLM_RESPONSE",
"LLM_ERROR",
"TOOL_COMPLETED",
"TOOL_ERROR",
"GRAPH_END",
"GRAPH_ERROR",
],
)
handler = BigQueryCallbackHandler(
project_id="your-project-id",
dataset_id="agent_analytics",
config=config,
)
或者排除嘈杂的事件:
# 排除链事件但记录其他所有事件
config = BigQueryLoggerConfig(
event_denylist=["CHAIN_START", "CHAIN_END"],
)
示例和资源
示例代码
以下示例演示了 BigQuery 回调处理器的各种功能:
| 示例 | 描述 |
|---|
| 基础示例 | 带有 LLM 调用的基本回调用法 |
| LangGraph 智能体 | 带有 6 个现实工具的完整 ReAct 智能体 |
| 异步示例 | 带有并发查询的异步处理器 |
| 事件过滤 | 允许列表/拒绝列表配置 |
| 样本数据生成器 | 跨多种智能体类型生成样本数据 |
分析笔记本
LangGraph 智能体分析笔记本 提供了全面的 BigQuery 分析查询,用于:
- 实时事件监控
- 工具使用情况分析
- 延迟分析和趋势
- 错误调试
- 用户参与度指标
- 时间序列可视化
实时监控仪表板
提供了一个 基于 FastAPI 的监控仪表板,用于实时智能体监控:
特性:
- 通过服务器发送事件 (SSE) 的实时事件流
- 事件分布和延迟趋势的交互式图表
- 带有详细时间线视图的会话跟踪
- 20 多个用于分析查询的 REST API 端点
- 每 5 秒自动刷新
# 运行仪表板
cd libs/community/examples/bigquery_callback/webapp
pip install -r requirements.txt
uvicorn main:app --port 8001
# 打开 http://localhost:8001
我们欢迎您对 BigQuery 智能体分析提出反馈。如果您有任何问题、建议或遇到任何问题,请通过 bqaa-feedback@google.com 联系团队。
其他资源