消息队列允许用户快速连续发送多条消息,而无需等待智能体完成当前消息的处理。每条消息都会在服务器端排队并按顺序处理,让您对等待队列拥有完全的可见性和控制权。
为什么需要消息队列?
在典型的聊天界面中,用户必须等待智能体完成响应后才能发送下一条消息。这在以下几种场景中会造成不便:
- 批量提问:用户希望一次性提出五个相关问题,而不是等待每个答案
- 连续追问:在智能体仍在处理时提交澄清或额外上下文
- 自动化测试序列:以编程方式发送一系列提示以验证智能体行为
- 数据录入工作流:逐个输入结构化数据以供处理
消息队列通过立即接受所有提交并按顺序处理来解决这个问题。
工作原理
在底层,LangGraph 使用 multitaskStrategy: "enqueue" 来管理并发提交。当智能体已在处理时提交消息,该消息会被添加到服务器端队列。当前运行完成后,会自动选取下一个排队的消息。
useStream 钩子暴露了一个 queue 属性,提供对等待消息的实时可见性:
| 属性 | 类型 | 描述 |
|---|
queue.entries | QueueEntry[] | 所有等待队列条目的数组 |
queue.size | number | 当前队列中的条目数量 |
queue.cancel(id) | (id: string) => Promise<void> | 通过 ID 取消特定的排队条目 |
queue.clear() | () => Promise<void> | 取消所有排队条目 |
每个 QueueEntry 对象包含:
| 字段 | 类型 | 描述 |
|---|
id | string | 此队列条目的唯一标识符 |
values | object | 提交的输入值(包括消息) |
options | object | 提交时传递的任何额外选项 |
createdAt | string | 条目创建时的 ISO 时间戳 |
设置 useStream
定义一个与您的智能体状态模式匹配的 TypeScript 接口,并将其作为类型参数传递给 useStream,以便对状态值进行类型安全访问。在下面的示例中,将 typeof myAgent 替换为您的接口名称:
import type { BaseMessage } from "@langchain/core/messages";
interface AgentState {
messages: BaseMessage[];
}
import { useStream } from "@langchain/react";
function Chat() {
const stream = useStream<typeof myAgent>({
apiUrl: "http://localhost:2024",
assistantId: "message_queue",
});
const handleSubmit = (text: string) => {
stream.submit({
messages: [{ type: "human", content: text }],
});
};
// 访问队列状态
const pendingCount = stream.queue.size;
const entries = stream.queue.entries;
return (
<div>
<MessageList messages={stream.messages} />
{pendingCount > 0 && <QueueList entries={entries} queue={stream.queue} />}
<ChatInput onSubmit={handleSubmit} />
</div>
);
}
显示队列
构建一个 QueueList 组件,显示每条等待消息及其取消按钮。这使用户能够看到正在等待的内容,并可以移除不再需要的项目。
function QueueList({ entries, queue }) {
return (
<div className="queue-panel">
<div className="queue-header">
<span>排队消息 ({entries.length})</span>
<button onClick={() => queue.clear()}>全部清除</button>
</div>
<ul className="queue-entries">
{entries.map((entry) => {
const text = entry.values?.messages?.[0]?.content ?? "未知";
return (
<li key={entry.id} className="queue-entry">
<span className="queue-text">{text}</span>
<span className="queue-time">
{new Date(entry.createdAt).toLocaleTimeString()}
</span>
<button
className="queue-cancel"
onClick={() => queue.cancel(entry.id)}
>
取消
</button>
</li>
);
})}
</ul>
</div>
);
}
显示每条排队消息的前几个字符作为预览,以便用户无需阅读完整消息即可快速识别要取消的项目。
取消排队消息
您有两个级别的取消操作:
取消单个条目
通过其 ID 从队列中移除特定消息。智能体将跳过它并处理下一个条目。
await queue.cancel(entryId);
清空整个队列
一次性移除所有等待消息。当用户改变上下文或想要重新开始时非常有用。
取消队列条目仅影响尚未开始处理的消息。如果智能体已经在处理某条消息,从队列中取消它不会产生影响。请使用 stream.stop() 来中断当前运行。
使用 onCreated 链式提交后续消息
onCreated 回调在创建新运行时触发,为您提供了一个以编程方式提交后续消息的钩子。这对于构建多步骤工作流非常有用,其中下一个问题取决于前一个提交是否被接受。
stream.submit(
{ messages: [{ type: "human", content: "什么是量子计算?" }] },
{
onCreated(run) {
console.log("运行已创建:", run.run_id);
// 链式提交后续问题
stream.submit({
messages: [{ type: "human", content: "给我一个简单的类比。" }],
});
},
}
);
这种模式会自然地填充队列。第一条消息立即开始处理,后续问题则排在其后。
开始新对话
当用户想要开始新的对话时,使用 switchThread(null) 来创建新线程。这将清除当前消息历史和队列。
function NewThreadButton() {
const stream = useStream<typeof myAgent>({ /* ... */ });
return (
<button onClick={() => stream.switchThread(null)}>
新对话
</button>
);
}
完整示例
将所有内容整合在一起,这是一个包含队列管理的完整聊天组件:
function QueueChat() {
const stream = useStream<typeof myAgent>({
apiUrl: "http://localhost:2024",
assistantId: "message_queue",
});
const [input, setInput] = useState("");
const handleSubmit = () => {
if (!input.trim()) return;
stream.submit({
messages: [{ type: "human", content: input.trim() }],
});
setInput("");
};
return (
<div className="chat-container">
<header>
<h2>队列聊天</h2>
<button onClick={() => stream.switchThread(null)}>新对话</button>
</header>
<div className="messages">
{stream.messages.map((msg, i) => (
<MessageBubble key={i} message={msg} />
))}
{stream.isLoading && <TypingIndicator />}
</div>
{stream.queue.size > 0 && (
<div className="queue-panel">
<strong>排队中 ({stream.queue.size})</strong>
<button onClick={() => stream.queue.clear()}>全部清除</button>
{stream.queue.entries.map((entry) => (
<div key={entry.id} className="queue-item">
<span>{entry.values?.messages?.[0]?.content}</span>
<button onClick={() => stream.queue.cancel(entry.id)}>×</button>
</div>
))}
</div>
)}
<form onSubmit={(e) => { e.preventDefault(); handleSubmit(); }}>
<input
value={input}
onChange={(e) => setInput(e.target.value)}
placeholder="输入消息(您可以发送多条!)"
/>
<button type="submit">发送</button>
</form>
</div>
);
}
最佳实践
- 限制队列大小:虽然客户端没有硬性的队列大小限制,但请注意非常大的队列可能会降低用户体验。当队列超过合理阈值(例如 10 个项目)时,考虑显示警告。
- 显示队列位置:为每个排队项目编号,以便用户了解处理顺序。
- 保持输入焦点:提交后保持输入字段聚焦,以便用户可以立即输入下一条消息。
- 动画过渡:当项目开始处理时,平滑地将其从队列面板移动到消息列表中。
- 优雅处理错误:如果排队消息失败,在不阻塞后续队列条目的情况下显示错误。
- 对快速提交进行防抖:对于自动化或编程式提交,在消息之间添加小延迟,以避免服务器过载。