Skip to main content
CockroachDB 是一个基于事务性和强一致性键值存储构建的分布式 SQL 数据库。它支持水平扩展,能够容忍磁盘、机器、机架甚至数据中心故障,且延迟干扰极小,无需人工干预。
关键功能:
  • 分布式 SQL:在保持 ACID 保证的同时进行横向扩展
  • 原生向量支持:内置 VECTOR 类型(v24.2+)和 C-SPANN 索引(v25.2+)
  • 兼容 PostgreSQL:PostgreSQL 应用的即插即用替代品
  • 全球复制:多区域部署,低延迟
  • 自动分片:数据自动分布在节点之间
  • 可串行化隔离:默认采用最强的隔离级别

安装和设置

安装 LangChain 集成:
pip install langchain-cockroachdb

获取您的 CockroachDB 连接字符串

您需要一个 CockroachDB 集群。选择一种选项: 选项 1:CockroachDB Cloud(推荐)
  1. cockroachlabs.cloud 注册
  2. 创建一个免费集群
  3. 获取您的连接字符串:cockroachdb://user:pass@host:26257/db?sslmode=verify-full
选项 2:Docker(开发)
docker run -d --name cockroachdb -p 26257:26257 \
  cockroachdb/cockroach:latest start-single-node --insecure
连接字符串:cockroachdb://root@localhost:26257/defaultdb?sslmode=disable 选项 3:本地二进制文件 cockroachlabs.com/docs/releases 下载

集成

向量存储

CockroachDB 可作为向量存储使用,支持原生 VECTOR 类型和 C-SPANN 分布式索引。 关键功能:
  • 原生向量支持(v24.2+)
  • 针对分布式系统优化的 C-SPANN 索引(v25.2+)
  • 高级元数据过滤
  • 前缀列多租户支持
  • 水平可扩展性
查看 CockroachDB 向量存储文档 以获取详细用法。 快速示例:
from langchain_cockroachdb import AsyncCockroachDBVectorStore, CockroachDBEngine
from langchain_openai import OpenAIEmbeddings

# Initialize
engine = CockroachDBEngine.from_connection_string(
    "cockroachdb://user:pass@host:26257/db"
)

await engine.ainit_vectorstore_table(
    table_name="documents",
    vector_dimension=1536,
)

vectorstore = AsyncCockroachDBVectorStore(
    engine=engine,
    embeddings=OpenAIEmbeddings(),
    collection_name="documents",
)

# Use it
ids = await vectorstore.aadd_texts(["Hello world"])
results = await vectorstore.asimilarity_search("Hi", k=1)

聊天消息历史

将对话历史存储在 CockroachDB 中,用于持久化、分布式的聊天应用。 关键功能:
  • 具有自动复制的分布式存储
  • 强一致性(SERIALIZABLE)
  • 基于会话的组织
  • 高可用性
查看 CockroachDB 聊天历史文档 以获取详细用法。 快速示例:
from langchain_cockroachdb import CockroachDBChatMessageHistory
import uuid

chat_history = CockroachDBChatMessageHistory(
    session_id=str(uuid.uuid4()),
    connection_string=CONNECTION_STRING,
    table_name="chat_history",
)

from langchain.messages import HumanMessage, AIMessage

await chat_history.aadd_message(HumanMessage(content="Hello!"))
await chat_history.aadd_message(AIMessage(content="Hi there!"))

messages = await chat_history.aget_messages()

LangGraph 检查点器

将 LangGraph 工作流状态持久化到 CockroachDB 中,用于短期记忆、人机交互和容错。 同时提供同步(CockroachDBSaver)和异步(AsyncCockroachDBSaver)实现。
首次使用 CockroachDB 检查点器时,请调用 checkpointer.setup()(或 await checkpointer.setup())以创建所需的表。
import os
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, MessagesState, START
from langchain_cockroachdb import CockroachDBSaver

model = init_chat_model(model="claude-haiku-4-5-20251001")

DB_URI = os.environ["COCKROACHDB_URI"]
# Example: "cockroachdb://user:password@host:26257/defaultdb?sslmode=verify-full"
with CockroachDBSaver.from_conn_string(DB_URI) as checkpointer:
    # checkpointer.setup()

    def call_model(state: MessagesState):
        response = model.invoke(state["messages"])
        return {"messages": response}

    builder = StateGraph(MessagesState)
    builder.add_node(call_model)
    builder.add_edge(START, "call_model")

    graph = builder.compile(checkpointer=checkpointer)

    config = {"configurable": {"thread_id": "1"}}

    for chunk in graph.stream(
        {"messages": [{"role": "user", "content": "hi! I'm bob"}]},
        config,
        stream_mode="values"
    ):
        chunk["messages"][-1].pretty_print()

    for chunk in graph.stream(
        {"messages": [{"role": "user", "content": "what's my name?"}]},
        config,
        stream_mode="values"
    ):
        chunk["messages"][-1].pretty_print()
请参阅上面的 [获取您的 CockroachDB 连接字符串] 以了解连接选项,包括 CockroachDB Cloud(生产环境推荐)、Docker 和本地二进制安装。对于本地开发,sslmode=disable 是可接受的;在生产环境中始终使用 sslmode=verify-full
检查点器使用原始 psycopg3 连接(而非 SQLAlchemy)以与 LangGraph 的检查点接口兼容。from_conn_string 工厂接受 cockroachdb:// URL 并自动转换它们。

行级 TTL(v0.2.1+)

使用 CockroachDB 的 行级 TTL 自动过期旧的检查点数据:
with CockroachDBSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()

    # Expire checkpoints older than 30 days, clean up daily
    checkpointer.enable_ttl(ttl_interval="30 days", cron="@daily")

    # Use the checkpointer normally -- old data is cleaned up automatically
    graph = builder.compile(checkpointer=checkpointer)

    # To disable TTL later:
    # checkpointer.disable_ttl()
异步变体:await checkpointer.aenable_ttl(ttl_interval="7 days", cron="@hourly")
TTL 删除是最终一致的——过期的行在 CockroachDB 后台作业按指定 cron 计划运行之前仍可查询。

性能优化(v0.2.1+)

检查点器包含多项针对低延迟读取的优化:
  • 批量获取list() 在所有 blob 和写入上使用 2 个批量查询,而不是每个检查点 2 个
  • 原始 BYTEA:使用 psycopg3 二进制协议,而不是在 SQL 中对 blobs 进行十六进制编码
  • 预编译语句from_conn_string() 启用服务器端查询计划缓存(prepare_threshold=5

多租户

通过可选的命名空间列按租户隔离向量数据。启用后,所有 CRUD 和搜索操作都限定在指定的命名空间中。 CockroachDB 的 C-SPANN 索引支持前缀列,因此命名空间过滤直接使用向量索引,无需单独扫描。
from langchain_cockroachdb import AsyncCockroachDBVectorStore, CockroachDBEngine
from langchain_openai import OpenAIEmbeddings

engine = CockroachDBEngine.from_connection_string(CONNECTION_STRING)

# Create the table with a namespace column
await engine.ainit_vectorstore_table(
    table_name="documents",
    vector_dimension=1536,
    namespace_column="namespace",
)

# Create a vectorstore scoped to a specific tenant
vectorstore = AsyncCockroachDBVectorStore(
    engine=engine,
    embeddings=OpenAIEmbeddings(),
    collection_name="documents",
    namespace="tenant_a",
)

# All operations are scoped to tenant_a
ids = await vectorstore.aadd_texts(["Tenant A document"])
results = await vectorstore.asimilarity_search("query", k=5)

为什么为 AI 应用选择 CockroachDB?

设计即为分布式

  • 水平可扩展性:添加节点以处理更多负载
  • 多区域部署:以低延迟为全球用户提供服务
  • 自动重新平衡:数据自动分布在节点之间

生产就绪的可靠性

  • 高可用性:容忍节点、机架和数据中心故障
  • 零停机升级:滚动更新无需停机
  • 备份和恢复:时间点恢复

大规模向量搜索

  • C-SPANN 索引:分布式近似最近邻搜索
  • 原生向量类型:对嵌入的一等支持
  • 实时索引:新向量无需重建
  • 多租户:前缀列用于高效租户隔离

PostgreSQL 兼容性

  • 轻松迁移:PostgreSQL 的即插即用替代品
  • 熟悉的 SQL:标准 PostgreSQL 语法
  • 现有工具:适用于 PostgreSQL 驱动程序和工具

资源

支持