第 03 章
流式处理
流式处理 — 实时看 AI 在干什么
这章学会用 runStreamed() 实时接收 AI 的每一步动作,而不是傻等到最后。
为什么需要流式?
上一章用的 run() 有个问题:AI 可能要干 30 秒甚至更久,这段时间你的程序就卡在那里等,用户看到的是一片空白。
打个比方:
run()像点外卖 — 下完单就等着,菜全做完了才一起送到runStreamed()像看后厨直播 — 厨师切菜你看得到,炒菜你也看得到,边做边上
在做用户界面或 CLI 工具时,流式处理是必须的。谁也不想盯着一个空白屏幕干等。
run() vs runStreamed() 对比
| 特性 | run() |
runStreamed() |
|---|---|---|
| 返回时机 | 全部完成后一次返回 | 立即返回,事件一个个来 |
| 返回类型 | Promise<RunResult> |
Promise<{ events: AsyncGenerator<ThreadEvent> }> |
| 适合场景 | 后台任务、不需要实时反馈 | CLI 工具、UI 界面、需要进度展示 |
| 复杂度 | 简单,3 行代码 | 稍复杂,需要处理事件循环 |
第一个流式程序
typescript// streaming-basic.ts
import { Codex } from "@openai/codex-sdk";
import type { ThreadEvent } from "@openai/codex-sdk";
async function main() {
const codex = new Codex();
const thread = codex.startThread({ skipGitRepoCheck: true });
console.log("🚀 开始流式请求...\n");
const { events } = await thread.runStreamed("用三句话解释什么是递归");
for await (const event of events) {
console.log(`📨 [${event.type}]`);
if (event.type === "item.completed" && event.item.type === "agent_message") {
console.log(`\n🤖 AI 说: ${event.item.text}`);
}
if (event.type === "turn.completed") {
console.log(`\n📊 Token 消耗: 输入 ${event.usage.input_tokens}, 输出 ${event.usage.output_tokens}`);
}
}
console.log("\n✅ 完成!");
}
main().catch(console.error);运行:npx tsx streaming-basic.ts
你会看到事件一个一个蹦出来:
🚀 开始流式请求...
📨 [thread.started]
📨 [turn.started]
📨 [item.started]
📨 [item.completed]
🤖 AI 说: 递归就是一个函数自己调用自己...
📨 [turn.completed]
📊 Token 消耗: 输入 128, 输出 67
✅ 完成!事件类型全览
事件按时间顺序出现,形成一个清晰的生命周期:
thread.started → turn.started → [item.started → item.updated* → item.completed]* → turn.completed每种事件的含义:
| 事件类型 | 什么时候触发 | 携带数据 | 你通常要做什么 |
|---|---|---|---|
thread.started |
线程创建成功 | thread_id |
保存 ID(用于后续恢复) |
turn.started |
一轮对话开始 | 无 | 显示"思考中..." |
item.started |
AI 开始做一件事 | ThreadItem(状态 in_progress) |
显示"正在执行..." |
item.updated |
进度更新 | ThreadItem(中间状态) |
更新进度条/待办列表 |
item.completed |
一件事做完了 | ThreadItem(最终状态) |
显示结果 |
turn.completed |
这轮对话结束 | usage(token 统计) |
统计消耗、标记完成 |
turn.failed |
这轮对话失败 | error: { message } |
显示错误信息 |
error |
致命错误 | message |
报错退出 |
关键理解:一个 turn 里可能有多个 item。比如 AI 决定先执行一个命令,再根据命令结果给你回复,那就是两个 item(一个 command_execution + 一个 agent_message)。
实战:做一个实时进度显示器
这个例子处理所有常见事件,用不同的图标和颜色展示 AI 的每一步动作:
typescript// progress.ts
import { Codex } from "@openai/codex-sdk";
import type { ThreadEvent, ThreadItem } from "@openai/codex-sdk";
function handleItemCompleted(item: ThreadItem): void {
switch (item.type) {
case "agent_message":
console.log(`\n💬 AI 回复:\n${item.text}`);
break;
case "reasoning":
console.log(`\n🧠 思考过程: ${item.text}`);
break;
case "command_execution": {
const exitInfo = item.exit_code !== undefined ? ` (退出码: ${item.exit_code})` : "";
const icon = item.status === "completed" ? "✅" : "❌";
console.log(`\n${icon} 执行命令: ${item.command}${exitInfo}`);
if (item.aggregated_output) {
console.log(` 输出: ${item.aggregated_output.slice(0, 200)}${item.aggregated_output.length > 200 ? "..." : ""}`);
}
break;
}
case "file_change":
for (const change of item.changes) {
const icon = { add: "📄", update: "✏️", delete: "🗑️" }[change.kind];
console.log(`${icon} 文件${change.kind === "add" ? "新建" : change.kind === "update" ? "修改" : "删除"}: ${change.path}`);
}
break;
case "mcp_tool_call":
console.log(`🔧 MCP 工具调用: ${item.server}/${item.tool} → ${item.status}`);
break;
case "web_search":
console.log(`🔍 Web 搜索: ${item.query}`);
break;
case "error":
console.log(`⚠️ 非致命错误: ${item.message}`);
break;
}
}
function handleItemUpdated(item: ThreadItem): void {
if (item.type === "todo_list") {
console.log("\n📋 AI 的工作计划:");
for (const todo of item.items) {
console.log(` ${todo.completed ? "✅" : "⬜"} ${todo.text}`);
}
}
}
function handleEvent(event: ThreadEvent): void {
switch (event.type) {
case "thread.started":
console.log(`🆔 线程 ID: ${event.thread_id}`);
break;
case "turn.started":
console.log("\n⏳ AI 开始工作...");
break;
case "item.started":
// item 开始时不打印太多,等 completed 再显示详情
break;
case "item.updated":
handleItemUpdated(event.item);
break;
case "item.completed":
handleItemCompleted(event.item);
break;
case "turn.completed":
console.log(`\n📊 消耗: 输入 ${event.usage.input_tokens} | 缓存 ${event.usage.cached_input_tokens} | 输出 ${event.usage.output_tokens} tokens`);
break;
case "turn.failed":
console.error(`\n❌ 失败: ${event.error.message}`);
break;
}
}
async function main() {
const codex = new Codex();
const thread = codex.startThread();
const prompt = process.argv[2] || "看看当前目录有什么文件,简单介绍一下这个项目";
console.log(`📝 提问: ${prompt}\n`);
const { events } = await thread.runStreamed(prompt);
for await (const event of events) {
handleEvent(event);
}
console.log("\n✅ 全部完成!");
}
main().catch(console.error);把它保存为 progress.ts,在任意 Git 项目里运行:
bashnpx tsx progress.ts "帮我看看这个项目的 package.json 有什么依赖"你会看到 AI 的每一步操作实时展示:先执行 cat package.json 命令,然后给你分析结果。
async generator 小知识
runStreamed() 返回的 events 是一个 async generator(异步生成器)。如果你没用过这个语法,这里快速过一下:
基本用法
typescriptfor await (const event of events) {
// 每来一个事件,这里执行一次
}
// 所有事件处理完,循环结束提前退出
typescriptfor await (const event of events) {
if (event.type === "turn.failed") {
console.error("出错了,提前退出");
break; // 跳出循环,不再处理后续事件
}
}错误处理
typescripttry {
for await (const event of events) {
handleEvent(event);
}
} catch (err) {
console.error("事件流出错:", err);
}收集所有事件
typescript// 如果你既想流式处理,又想最后拿到所有事件
const allEvents: ThreadEvent[] = [];
for await (const event of events) {
allEvents.push(event);
handleEvent(event); // 实时处理
}
// 这里 allEvents 包含了所有事件小结
run()等全部完成再返回,runStreamed()实时推送事件- 事件生命周期:
thread.started → turn.started → item.* → turn.completed - 一个 turn 里可能有多个 item(命令执行、文件修改、最终回复等)
- 用
for await...of遍历事件流 - 流式处理是做 CLI 工具和 UI 的基础
下一章:ThreadItem 详解 — 把 AI 产出的 7 种"东西"全部讲透。