CockroachDB 是一个基于事务性和强一致性键值存储构建的分布式 SQL 数据库。它支持水平扩展,能够容忍磁盘、机器、机架甚至数据中心故障,且延迟干扰极小,无需人工干预。
关键功能:
- 分布式 SQL:在保持 ACID 保证的同时进行横向扩展
- 原生向量支持:内置
VECTOR 类型(v24.2+)和 C-SPANN 索引(v25.2+)
- 兼容 PostgreSQL:PostgreSQL 应用的即插即用替代品
- 全球复制:多区域部署,低延迟
- 自动分片:数据自动分布在节点之间
- 可串行化隔离:默认采用最强的隔离级别
安装和设置
安装 LangChain 集成:
pip install langchain-cockroachdb
获取您的 CockroachDB 连接字符串
您需要一个 CockroachDB 集群。选择一种选项:
选项 1:CockroachDB Cloud(推荐)
- 在 cockroachlabs.cloud 注册
- 创建一个免费集群
- 获取您的连接字符串:
cockroachdb://user:pass@host:26257/db?sslmode=verify-full
选项 2:Docker(开发)
docker run -d --name cockroachdb -p 26257:26257 \
cockroachdb/cockroach:latest start-single-node --insecure
连接字符串:cockroachdb://root@localhost:26257/defaultdb?sslmode=disable
选项 3:本地二进制文件
从 cockroachlabs.com/docs/releases 下载
向量存储
CockroachDB 可作为向量存储使用,支持原生 VECTOR 类型和 C-SPANN 分布式索引。
关键功能:
- 原生向量支持(v24.2+)
- 针对分布式系统优化的 C-SPANN 索引(v25.2+)
- 高级元数据过滤
- 前缀列多租户支持
- 水平可扩展性
查看 CockroachDB 向量存储文档 以获取详细用法。
快速示例:
from langchain_cockroachdb import AsyncCockroachDBVectorStore, CockroachDBEngine
from langchain_openai import OpenAIEmbeddings
# Initialize
engine = CockroachDBEngine.from_connection_string(
"cockroachdb://user:pass@host:26257/db"
)
await engine.ainit_vectorstore_table(
table_name="documents",
vector_dimension=1536,
)
vectorstore = AsyncCockroachDBVectorStore(
engine=engine,
embeddings=OpenAIEmbeddings(),
collection_name="documents",
)
# Use it
ids = await vectorstore.aadd_texts(["Hello world"])
results = await vectorstore.asimilarity_search("Hi", k=1)
聊天消息历史
将对话历史存储在 CockroachDB 中,用于持久化、分布式的聊天应用。
关键功能:
- 具有自动复制的分布式存储
- 强一致性(SERIALIZABLE)
- 基于会话的组织
- 高可用性
查看 CockroachDB 聊天历史文档 以获取详细用法。
快速示例:
from langchain_cockroachdb import CockroachDBChatMessageHistory
import uuid
chat_history = CockroachDBChatMessageHistory(
session_id=str(uuid.uuid4()),
connection_string=CONNECTION_STRING,
table_name="chat_history",
)
from langchain.messages import HumanMessage, AIMessage
await chat_history.aadd_message(HumanMessage(content="Hello!"))
await chat_history.aadd_message(AIMessage(content="Hi there!"))
messages = await chat_history.aget_messages()
LangGraph 检查点器
将 LangGraph 工作流状态持久化到 CockroachDB 中,用于短期记忆、人机交互和容错。
同时提供同步(CockroachDBSaver)和异步(AsyncCockroachDBSaver)实现。
首次使用 CockroachDB 检查点器时,请调用 checkpointer.setup()(或 await checkpointer.setup())以创建所需的表。
import os
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, MessagesState, START
from langchain_cockroachdb import CockroachDBSaver
model = init_chat_model(model="claude-haiku-4-5-20251001")
DB_URI = os.environ["COCKROACHDB_URI"]
# Example: "cockroachdb://user:password@host:26257/defaultdb?sslmode=verify-full"
with CockroachDBSaver.from_conn_string(DB_URI) as checkpointer:
# checkpointer.setup()
def call_model(state: MessagesState):
response = model.invoke(state["messages"])
return {"messages": response}
builder = StateGraph(MessagesState)
builder.add_node(call_model)
builder.add_edge(START, "call_model")
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "1"}}
for chunk in graph.stream(
{"messages": [{"role": "user", "content": "hi! I'm bob"}]},
config,
stream_mode="values"
):
chunk["messages"][-1].pretty_print()
for chunk in graph.stream(
{"messages": [{"role": "user", "content": "what's my name?"}]},
config,
stream_mode="values"
):
chunk["messages"][-1].pretty_print()
import os
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, MessagesState, START
from langchain_cockroachdb import AsyncCockroachDBSaver
model = init_chat_model(model="claude-haiku-4-5-20251001")
DB_URI = os.environ["COCKROACHDB_URI"]
# Example: "cockroachdb://user:password@host:26257/defaultdb?sslmode=verify-full"
async with AsyncCockroachDBSaver.from_conn_string(DB_URI) as checkpointer:
# await checkpointer.setup()
async def call_model(state: MessagesState):
response = await model.ainvoke(state["messages"])
return {"messages": response}
builder = StateGraph(MessagesState)
builder.add_node(call_model)
builder.add_edge(START, "call_model")
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "1"}}
async for chunk in graph.astream(
{"messages": [{"role": "user", "content": "hi! I'm bob"}]},
config,
stream_mode="values"
):
chunk["messages"][-1].pretty_print()
async for chunk in graph.astream(
{"messages": [{"role": "user", "content": "what's my name?"}]},
config,
stream_mode="values"
):
chunk["messages"][-1].pretty_print()
请参阅上面的 [获取您的 CockroachDB 连接字符串] 以了解连接选项,包括 CockroachDB Cloud(生产环境推荐)、Docker 和本地二进制安装。对于本地开发,sslmode=disable 是可接受的;在生产环境中始终使用 sslmode=verify-full。
检查点器使用原始 psycopg3 连接(而非 SQLAlchemy)以与 LangGraph 的检查点接口兼容。from_conn_string 工厂接受 cockroachdb:// URL 并自动转换它们。
行级 TTL(v0.2.1+)
使用 CockroachDB 的 行级 TTL 自动过期旧的检查点数据:
with CockroachDBSaver.from_conn_string(DB_URI) as checkpointer:
checkpointer.setup()
# Expire checkpoints older than 30 days, clean up daily
checkpointer.enable_ttl(ttl_interval="30 days", cron="@daily")
# Use the checkpointer normally -- old data is cleaned up automatically
graph = builder.compile(checkpointer=checkpointer)
# To disable TTL later:
# checkpointer.disable_ttl()
异步变体:await checkpointer.aenable_ttl(ttl_interval="7 days", cron="@hourly")
TTL 删除是最终一致的——过期的行在 CockroachDB 后台作业按指定 cron 计划运行之前仍可查询。
性能优化(v0.2.1+)
检查点器包含多项针对低延迟读取的优化:
- 批量获取:
list() 在所有 blob 和写入上使用 2 个批量查询,而不是每个检查点 2 个
- 原始 BYTEA:使用 psycopg3 二进制协议,而不是在 SQL 中对 blobs 进行十六进制编码
- 预编译语句:
from_conn_string() 启用服务器端查询计划缓存(prepare_threshold=5)
多租户
通过可选的命名空间列按租户隔离向量数据。启用后,所有 CRUD 和搜索操作都限定在指定的命名空间中。
CockroachDB 的 C-SPANN 索引支持前缀列,因此命名空间过滤直接使用向量索引,无需单独扫描。
from langchain_cockroachdb import AsyncCockroachDBVectorStore, CockroachDBEngine
from langchain_openai import OpenAIEmbeddings
engine = CockroachDBEngine.from_connection_string(CONNECTION_STRING)
# Create the table with a namespace column
await engine.ainit_vectorstore_table(
table_name="documents",
vector_dimension=1536,
namespace_column="namespace",
)
# Create a vectorstore scoped to a specific tenant
vectorstore = AsyncCockroachDBVectorStore(
engine=engine,
embeddings=OpenAIEmbeddings(),
collection_name="documents",
namespace="tenant_a",
)
# All operations are scoped to tenant_a
ids = await vectorstore.aadd_texts(["Tenant A document"])
results = await vectorstore.asimilarity_search("query", k=5)
为什么为 AI 应用选择 CockroachDB?
设计即为分布式
- 水平可扩展性:添加节点以处理更多负载
- 多区域部署:以低延迟为全球用户提供服务
- 自动重新平衡:数据自动分布在节点之间
生产就绪的可靠性
- 高可用性:容忍节点、机架和数据中心故障
- 零停机升级:滚动更新无需停机
- 备份和恢复:时间点恢复
大规模向量搜索
- C-SPANN 索引:分布式近似最近邻搜索
- 原生向量类型:对嵌入的一等支持
- 实时索引:新向量无需重建
- 多租户:前缀列用于高效租户隔离
PostgreSQL 兼容性
- 轻松迁移:PostgreSQL 的即插即用替代品
- 熟悉的 SQL:标准 PostgreSQL 语法
- 现有工具:适用于 PostgreSQL 驱动程序和工具