9. 流式输出——当 Agent 一边想一边说
LLM 流式输出不难,Agent 级别的流式输出才是分水岭。这篇我们给 Agent 加上 streaming 能力——让用户在字符逐行出现的时候就看见 Agent 在"思考",在调用工具的间隙看到状态变化,而不是对着一个空转的 spinner 发呆。
前八篇文章走过来,我们的 Agent 能跑完整的 Tool Calling 循环了。你给它一个问题,它在后台调 LLM、执行工具、再调 LLM、再执行工具,直到给出最终答案。整个流程是可靠的,测试全绿的。
但有一个体验问题:用户一直在等。
你问 Agent "请帮我查一下最近三个月的销售数据,然后和去年同期对比,再生成一份报告",Agent 可能要和 LLM 来回交互三四轮。在这段时间里,用户看到的是什么呢?
是一个旋转的 loading 动画。或者是一个冰冷的"正在思考..."。
用户不知道 Agent 是在调工具还是卡住了,不知道它已经查完了销售数据正在对比,更不知道最终报告长什么样。信息真空会放大焦虑。
流式输出解决的不是技术问题,是信任问题。当用户看到字符逐行出现,看到"正在查询数据库...查询完成,结果是 12345 条记录...正在生成报告...",他就有安全感。
流式的两个层次
很多初学 Agent 的人以为流式输出就是"调用 LLM 的 stream API"。这是对的,但只对了一半。
完整的 Agent 流式输出包含两个层次:
LLM 层流式。LLM 不是一次性吐出整段回答,而是逐 token 输出。OpenAI 的 SSE(Server-Sent Events)流、Anthropic 的流式 API,都是这个层次。你在 ChatGPT 网页上看到字符逐行出现的效果,就是 LLM 层流式。
Agent 层流式。Agent 不只是把 LLM 的输出逐字转发。它还包含了工具调用的开始和结束事件、工具执行的结果、迭代轮次的变化。比如 Agent 说"让我查一下数据库",这个"说"的过程是 LLM 流式输出的,但"查完数据库了,结果是 X"这个事件是在工具执行完成后才发生的,它和 LLM 流式输出是两回事。
我们的 Provider 层在第 6 篇文章就已经支持了 LLM 层流式——OpenAIProvider.completeStream() 已经完整实现了 SSE 解析和逐块回调。但 Agent 层一直用的是 complete(),没有把流式能力暴露给调用方。
这一篇要做的,就是把这两个层次打通。
事件模型的设计
先想清楚一个问题:Agent 在流式执行过程中,调用方(前端、CLI、其他程序)需要知道哪些信息?
逐条梳理:
LLM 的文本输出。这是最直观的。用户要看到字符逐行出现。对应 contentDelta 事件。
工具调用。当 LLM 决定调工具时,用户应该知道——而不是突然看到一个 loading。对应 toolCall 事件,携带工具名称和参数。
工具执行结果。工具执行完了,结果是什么。对应 toolResult 事件。
迭代轮次。Agent 在一轮 Tool Calling 循环中完成了哪些步骤。对应 iteration 事件。
全部完成。最终结果。对应 done 事件,携带完整的 AgentResult。
把这些放进一个联合类型:
export interface AgentStreamEvent {
contentDelta?: string;
toolCall?: {
id: string;
name: string;
args: Record<string, unknown>;
};
toolResult?: {
toolCallId: string;
toolName: string;
content: string;
isError: boolean;
};
iteration?: number;
done?: AgentResult;
error?: string;
}
所有字段都是可选的。一个事件就是一个 判别式合取(discriminatory conjunction)——检查哪个字段非空,就知道是什么类型的事件。
你可能注意到,没有 toolCallArgsDelta。也就是说,LLM 在流式输出过程中参数也是逐 token 拼接的,但我没有在 Agent 层暴露这个细节。
这是一个有意的简化。LLM 流式输出 tool_calls 时,参数是分块到达的——先来 {",再来 "expr,再来 ession……如果把这些增量暴露给调用方,调用方要自己维护一个按 index 分组的参数缓冲区。这个复杂度不应该推给上层。
我的选择是:在 Provider 层做好参数拼接,等到参数完整了再以一个 toolCall 事件通知上层。调用方只需要展示"Agent 正在调 XXX 工具,参数是 YYY"就够了。
在 Agent 接口上暴露 stream()
接口设计很直接——和 run() 并排的一个新方法:
export interface Agent {
readonly config: AgentConfig;
hooks?: AgentHooks;
run(input: string | Message[]): Promise<AgentResult>;
stream(
input: string | Message[],
onEvent: (event: AgentStreamEvent) => void
): Promise<AgentResult>;
reset(): void;
}
stream() 返回的还是 AgentResult。这是故意的——onEvent 负责中间事件,返回值负责最终结果。调用方可以只处理 done 事件拿到结果,也可以从返回值拿。双保险。
与 run() 相比,stream() 只是多了两个变化:
1. 调用 completeStream 而不是 complete
2. 在关键节点触发 onEvent
这两个变化被封装在同一套执行循环里:
// DefaultAgent 内部
private async executeLoop(
input: string | Message[],
streaming: boolean,
onEvent?: (event: AgentStreamEvent) => void
): Promise<AgentResult> {
// 消息组装:从 Session 和 ShortTermMemory 加载已有消息,
// 在最前面插入系统提示词,再追加用户输入。
// 这套逻辑和 run() 完全一致。
while (iterations < maxIterations) {
// ...
if (streaming && onEvent) {
response = await provider.completeStream(
{ messages, model, tools, ... },
(chunk) => {
if (chunk.contentDelta) {
onEvent({ contentDelta: chunk.contentDelta });
}
}
);
} else {
response = await provider.complete(...);
}
// ... 同样的工具执行逻辑 ...
// 推送 iteration 事件(在第几轮迭代结束时触发)
if (streaming && onEvent) {
onEvent({ iteration: iterations });
}
if (response.toolCalls && response.toolCalls.length > 0) {
// 推送 toolCall 事件
if (streaming && onEvent) {
for (const tc of response.toolCalls) {
onEvent({ toolCall: { id: tc.id, name: tc.name, args: tc.args } });
}
}
// 执行工具(串行或并行)
await this.executeToolCalls(toolCalls, messages, streaming, onEvent);
}
}
// 推送 done 事件
if (streaming && onEvent) {
onEvent({ done: result });
}
return result;
}
注意,工具执行的 executeToolCallsSequential 和 executeToolCallsParallel 都接受了可选的 streaming 和 onEvent 参数。在工具执行完成后,它们会推送 toolResult 事件:
// 在 executeToolCallsSequential 内部
const result = await tool.execute(args);
messages.push({
role: 'tool',
content: result.output,
toolCallId: tc.id,
toolName: tc.name,
});
if (streaming && onEvent) {
onEvent({
toolResult: {
toolCallId: tc.id,
toolName: tc.name,
content: result.output,
isError: !result.success,
},
});
}
这样,调用方看到的完整事件流就是原生的、按时间顺序排列的:
toolCall → toolResult → iteration → contentDelta → iteration → done
第一轮调工具、等结果、完成一轮;第二轮 LLM 直接输出文本、完成一轮;全部完成。
事件流在真实场景中长什么样
单独看事件定义不过瘾。我们拿一个实际场景跑一遍。
用户说:"帮我查一下今年第一季度的销售额,然后和去年同期的数据对比。"
Agent 的事件流可能是这样的:
事件 1: toolCall → { name: "query_sales", args: { quarter: "Q1", year: 2026 } }
事件 2: toolResult → { content: "2026 Q1 销售额:1,250,000 元", isError: false }
事件 3: iteration → 1
事件 4: toolCall → { name: "query_sales", args: { quarter: "Q1", year: 2025 } }
事件 5: toolResult → { content: "2025 Q1 销售额:1,100,000 元", isError: false }
事件 6: iteration → 2
事件 7: contentDelta → "2026 年第一季度销售额为 125 万元,"
事件 8: contentDelta → "较去年同期的 110 万元增长了"
事件 9: contentDelta → " 13.6%。整体趋势向好。"
事件 10: iteration → 3
事件 11: done → { output: "2026 年第一季度销售额为..." }
前端接收到这些事件后,可以做很多事情:
第 1-2 事件之间:显示"正在查询 2026 年 Q1 数据..."
第 3-4 事件之间:显示"查询完成,正在对比去年数据..."
第 7-9 事件之间:逐字展示 LLM 生成的报告
第 11 事件:隐藏所有中间状态,显示最终回复
用户看到的不是空转的 spinner,而是 Agent 的工作进度。这是 Agent 与普通 API 最大的体验差异。
Provider 层已经就绪
你可能注意到,整个实现过程中我几乎没有碰 OpenAIProvider。
那是因为第 6 篇文章实现 Provider 时已经预留了 completeStream 方法。当时怎么看都是超前设计——谁会在一开始就用流式输出呢?
现在看,这个决策的价值体现出来了。OpenAIProvider 里的流式实现使用浏览器原生 fetch API 的 ReadableStream,逐行解析 SSE 数据流:
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() ?? '';
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed || !trimmed.startsWith('data: ')) continue;
const data = trimmed.slice(6);
if (data === '[DONE]') continue;
const chunk = JSON.parse(data);
const delta = chunk.choices?.[0]?.delta;
// ... 提取 contentDelta 和 tool_calls ...
}
}
注意这里的一个细节:buffer 的跨行处理。SSE 流的数据是按事件边界分割的,但网络传输可能在一个 chunk 中间切分。lines 数组的最后一项是不完整的行,留在 buffer 中与下一个 chunk 拼接。这是 SSE 解析的标准做法,漏掉这一步在弱网环境下会有 parse error。
completeStream 的参数 onChunk 是一个回调函数。每解析出一个合法的 SSE chunk,就调用一次回调,把当前的增量推给 Agent 层。
这样,Agent 层不需要关心 SSE 解析、不需要关心 [DONE] 标记、不需要关心 buffer 拼接。Provider 层把这些复杂性全部都消化了。
为什么不把 streaming 放到 hooks 里
你可能会有疑问:为什么不用现有的 hooks 机制来实现流式输出?比如加一个 onStreamChunk hook。
我之前也考虑过。但很快否定了。原因是:hooks 是生命周期事件,streaming 是数据流。它们是两回事。
hooks 的设计目的是"在某个点做额外的事情"——记录日志、发送指标、修改消息。调用方注册 hook 时是在说"顺便帮我做一下这个"。
streaming 不同。onEvent 是 Agent 的主要输出通道。调用方依赖事件流来渲染 UI。如果把事件流放到 hooks 里,语义就变味了——明明是核心功能,看起来却像是个可选的插件。更糟糕的是,调用方看 AgentHooks 接口时,无法区分哪些是必须实现的(streaming)、哪些是可选的(logging/metrics)。清晰的 API 设计应该让两者在接口层面就分开。
所以我选择的方法很朴素:stream() 是一个独立方法,和 run() 平级。两者共享同一套执行循环,只是 LLM 调用方式和事件推送方式不同。streaming 是 Agent 接口的原生能力,不是挂在边上的插件。
测试覆盖
六个流式测试:
it('直接回答时应该推送 contentDelta 事件和 done 事件', ...);
it('工具调用时应该推送 toolCall 和 toolResult 事件', ...);
it('工具不存在时应该推送错误 toolResult', ...);
it('多次迭代时事件应该按自然顺序排列', ...);
it('maxIterations 限制在流式模式下同样生效', ...);
it('stream() 返回的 AgentResult 字段齐全', ...);
第二个测试验证了事件类型齐全:toolCall、toolResult、iteration、done 全部出现,且顺序正确。
第四个测试用工具多次迭代验证事件顺序:先 toolCall、再 toolResult、再迭代完成,最后 done。
第五个测试验证了 maxIterations 在流式模式下和 run() 行为一致。
最后一个测试验证了 done 事件携带的 AgentResult 和 stream() 返回值一致——这个测试曾经暴露过一个 bug:done 事件在返回值组装之前就发出了,导致 done.result 缺少部分字段。后来我修了。
不出现在这里的:什么是(还)不做的
这一篇有意没有做两件事。
不做客户端 SSE 推送。我不会在 Agent 内部实现一个 HTTP SSE 端点。因为这不是 Agent 运行时的职责——它属于 API 网关或应用框架层。Agent 只负责产生事件流,怎么传输给前端是上层的事。
不做 tool_calls 的流式参数展示。前面解释过了——Provider 层已经拼接好了完整的参数,Agent 层不再拆分。如果你需要展示"LLM 正在构造工具调用的参数",那应该在 Provider 层加一个 onToolCallArgDelta 回调,而不是在 Agent 层。
下一篇预告
九篇文章,从零搭出来的 Agent 运行时核心已经具备了骨架、血肉和灵魂。项目能不能交付给别人用、能不能作为 npm 包发布出去,取决于最后一篇。
下一篇,我们做测试和打包发布。单元测试的组织、集成测试的边界、npm 包的构建和发布流程——让这个项目成为真正可交付的产品。
评论
发表评论