加入与重新加入功能允许你在不停止智能体的情况下断开与正在运行的智能体流的连接,稍后再重新连接。当客户端离开时,智能体在服务器端继续执行,而你可以在离开时的确切位置重新接入流。
为什么需要加入与重新加入?
传统的流式 API 将客户端和服务器紧密耦合:如果客户端断开连接,流就会丢失。加入与重新加入打破了这种耦合,实现了几个重要的模式:
- 网络中断:在蜂窝基站或 Wi-Fi 网络之间移动的移动用户可以无缝恢复
- 页面导航:用户离开聊天页面并在稍后返回,不会丢失进度
- 移动端后台运行:被操作系统挂起的应用可以在回到前台时重新加入流
- 长时间运行的任务:智能体执行多分钟的操作(研究、代码生成、数据分析),用户无需保持页面打开
- 多设备切换:在手机上开始对话,在桌面上重新加入
核心概念
加入/重新加入模式涉及三个关键机制:
| 方法 / 选项 | 用途 |
|---|
stream.stop() | 断开客户端与流的连接,但不停止智能体 |
stream.joinStream(runId) | 通过运行 ID 重新连接到现有流 |
onDisconnect: "continue" | 提交选项,告诉服务器在客户端断开连接后继续运行 |
streamResumable: true | 提交选项,使流可以在稍后被重新加入 |
stream.stop() 与取消运行有根本区别。停止仅断开客户端的连接。智能体在服务器端继续处理。要实际取消智能体的执行,应使用中断或取消机制。
设置 useStream
关键设置步骤是从 onCreated 回调中捕获 run_id,以便稍后重新加入。
定义一个与你的智能体状态模式匹配的 TypeScript 接口,并将其作为类型参数传递给 useStream,以便对状态值进行类型安全访问。在下面的示例中,将 typeof myAgent 替换为你的接口名称:
import type { BaseMessage } from "@langchain/core/messages";
interface AgentState {
messages: BaseMessage[];
}
import { useStream } from "@langchain/react";
import { useState } from "react";
function Chat() {
const [savedRunId, setSavedRunId] = useState<string | null>(null);
const stream = useStream<typeof myAgent>({
apiUrl: "http://localhost:2024",
assistantId: "join_rejoin",
onCreated(run) {
setSavedRunId(run.run_id);
},
});
const isConnected = stream.isLoading;
return (
<div>
<ConnectionStatus connected={isConnected} />
<MessageList messages={stream.messages} />
<ChatControls
stream={stream}
savedRunId={savedRunId}
isConnected={isConnected}
/>
</div>
);
}
使用可恢复选项提交
提交消息时,传递 onDisconnect: "continue" 和 streamResumable: true 以启用加入/重新加入流程:
stream.submit(
{ messages: [{ type: "human", content: text }] },
{
onDisconnect: "continue",
streamResumable: true,
}
);
| 选项 | 默认值 | 描述 |
|---|
onDisconnect | "cancel" | 客户端断开连接时发生的情况。"continue" 保持智能体运行;"cancel" 停止它。 |
streamResumable | false | 当为 true 时,服务器保留流状态,以便客户端稍后可以重新加入。 |
始终同时使用这两个选项。设置 onDisconnect: "continue" 而不设置 streamResumable: true 意味着智能体继续运行,但你无法重新加入流以查看其输出。
断开与流的连接
调用 stream.stop() 以断开客户端连接。智能体在服务器端继续处理。
调用 stop() 后:
stream.isLoading 变为 false
- 消息列表保留到断开连接点为止收到的所有消息
- 智能体在服务器上继续运行
- 在重新加入之前不会收到新消息
重新加入流
使用保存的运行 ID 调用 stream.joinStream(runId) 以重新连接:
stream.joinStream(savedRunId);
重新加入后:
stream.isLoading 再次变为 true
- 断开连接期间生成的任何消息都会被传递
- 新的流式消息实时恢复
- 如果智能体已经完成,你会立即收到最终状态
构建连接状态指示器
视觉指示器帮助用户了解他们是否正在主动接收来自智能体的更新。
function ConnectionStatus({ connected }: { connected: boolean }) {
return (
<div className="connection-status">
<span
className={`status-dot ${connected ? "connected" : "disconnected"}`}
/>
<span className="status-text">
{connected ? "已连接" : "已断开连接"}
</span>
</div>
);
}
使用绿色/红色圆点样式化指示器:
.status-dot {
width: 8px;
height: 8px;
border-radius: 50%;
display: inline-block;
margin-right: 6px;
}
.status-dot.connected {
background-color: #22c55e;
box-shadow: 0 0 4px #22c55e;
}
.status-dot.disconnected {
background-color: #ef4444;
box-shadow: 0 0 4px #ef4444;
}
断开连接和重新加入控件
提供明确的断开连接和重新加入按钮,使用户拥有完全控制权:
function ChatControls({ stream, savedRunId, isConnected }) {
const [input, setInput] = useState("");
const handleSend = () => {
if (!input.trim()) return;
stream.submit(
{ messages: [{ type: "human", content: input.trim() }] },
{ onDisconnect: "continue", streamResumable: true }
);
setInput("");
};
return (
<div className="controls">
<div className="input-row">
<input
value={input}
onChange={(e) => setInput(e.target.value)}
placeholder="输入消息..."
onKeyDown={(e) => e.key === "Enter" && handleSend()}
/>
<button onClick={handleSend}>发送</button>
</div>
<div className="stream-controls">
{isConnected ? (
<button onClick={() => stream.stop()} className="disconnect-btn">
断开连接
</button>
) : (
savedRunId && (
<button
onClick={() => stream.joinStream(savedRunId)}
className="rejoin-btn"
>
重新加入流
</button>
)
)}
</div>
</div>
);
}
持久化运行 ID
对于跨会话重新加入(例如,用户关闭浏览器并在稍后返回),将运行 ID 持久化到存储中:
const stream = useStream<typeof myAgent>({
apiUrl: "http://localhost:2024",
assistantId: "join_rejoin",
onCreated(run) {
localStorage.setItem("activeRunId", run.run_id);
},
});
// 页面加载时,检查是否有活动的运行
const existingRunId = localStorage.getItem("activeRunId");
if (existingRunId) {
stream.joinStream(existingRunId);
}
持久化的运行 ID 应在运行完成时清理。监听流完成并移除存储的 ID,以避免尝试重新加入已完成的运行。
错误处理
如果运行已过期、被删除或服务器已重启,重新加入可能会失败。优雅地处理这些情况:
try {
stream.joinStream(savedRunId);
} catch (error) {
console.error("重新加入流失败:", error);
// 清除过期的运行 ID 并通知用户
setSavedRunId(null);
localStorage.removeItem("activeRunId");
}
完整示例
function JoinRejoinChat() {
const [savedRunId, setSavedRunId] = useState<string | null>(null);
const [input, setInput] = useState("");
const stream = useStream<typeof myAgent>({
apiUrl: "http://localhost:2024",
assistantId: "join_rejoin",
onCreated(run) {
setSavedRunId(run.run_id);
},
});
const isConnected = stream.isLoading;
const handleSend = () => {
if (!input.trim()) return;
stream.submit(
{ messages: [{ type: "human", content: input.trim() }] },
{ onDisconnect: "continue", streamResumable: true }
);
setInput("");
};
return (
<div className="chat-container">
<header>
<h2>加入与重新加入演示</h2>
<ConnectionStatus connected={isConnected} />
</header>
<div className="messages">
{stream.messages.map((msg, i) => (
<MessageBubble key={i} message={msg} />
))}
</div>
<div className="controls">
<form onSubmit={(e) => { e.preventDefault(); handleSend(); }}>
<input
value={input}
onChange={(e) => setInput(e.target.value)}
placeholder="输入消息..."
/>
<button type="submit">发送</button>
</form>
<div className="stream-actions">
{isConnected ? (
<button onClick={() => stream.stop()}>
断开连接
</button>
) : (
savedRunId && (
<button onClick={() => stream.joinStream(savedRunId)}>
重新加入流
</button>
)
)}
</div>
</div>
</div>
);
}
最佳实践
- 始终保存运行 ID:没有它,重新加入是不可能的。同时使用组件状态和持久化存储以提高弹性。
- 显示清晰的连接状态:用户应始终知道他们是在接收实时更新还是在查看快照。
- 在可见性变化时自动重新加入:使用页面可见性 API 在用户返回标签页时自动重新加入。
- 设置合理的超时:如果重新加入尝试耗时过长,则回退到获取线程历史记录。
- 清理已完成的运行:当智能体完成时,移除持久化的运行 ID,以避免尝试重新加入过期的运行。