S
Codex SDK 教程 TypeScript
第 03 章

流式处理

流式处理 — 实时看 AI 在干什么

这章学会用 runStreamed() 实时接收 AI 的每一步动作,而不是傻等到最后。

为什么需要流式?

上一章用的 run() 有个问题:AI 可能要干 30 秒甚至更久,这段时间你的程序就卡在那里等,用户看到的是一片空白。

打个比方:

  • run() 像点外卖 — 下完单就等着,菜全做完了才一起送到
  • runStreamed() 像看后厨直播 — 厨师切菜你看得到,炒菜你也看得到,边做边上

在做用户界面或 CLI 工具时,流式处理是必须的。谁也不想盯着一个空白屏幕干等。

run() vs runStreamed() 对比

特性 run() runStreamed()
返回时机 全部完成后一次返回 立即返回,事件一个个来
返回类型 Promise<RunResult> Promise<{ events: AsyncGenerator<ThreadEvent> }>
适合场景 后台任务、不需要实时反馈 CLI 工具、UI 界面、需要进度展示
复杂度 简单,3 行代码 稍复杂,需要处理事件循环

第一个流式程序

typescript// streaming-basic.ts
import { Codex } from "@openai/codex-sdk";
import type { ThreadEvent } from "@openai/codex-sdk";

async function main() {
  const codex = new Codex();
  const thread = codex.startThread({ skipGitRepoCheck: true });

  console.log("🚀 开始流式请求...\n");

  const { events } = await thread.runStreamed("用三句话解释什么是递归");

  for await (const event of events) {
    console.log(`📨 [${event.type}]`);

    if (event.type === "item.completed" && event.item.type === "agent_message") {
      console.log(`\n🤖 AI 说: ${event.item.text}`);
    }

    if (event.type === "turn.completed") {
      console.log(`\n📊 Token 消耗: 输入 ${event.usage.input_tokens}, 输出 ${event.usage.output_tokens}`);
    }
  }

  console.log("\n✅ 完成!");
}

main().catch(console.error);

运行:npx tsx streaming-basic.ts

你会看到事件一个一个蹦出来:

🚀 开始流式请求...

📨 [thread.started]
📨 [turn.started]
📨 [item.started]
📨 [item.completed]

🤖 AI 说: 递归就是一个函数自己调用自己...

📨 [turn.completed]

📊 Token 消耗: 输入 128, 输出 67

✅ 完成!

事件类型全览

事件按时间顺序出现,形成一个清晰的生命周期:

thread.started → turn.started → [item.started → item.updated* → item.completed]* → turn.completed

每种事件的含义:

事件类型 什么时候触发 携带数据 你通常要做什么
thread.started 线程创建成功 thread_id 保存 ID(用于后续恢复)
turn.started 一轮对话开始 显示"思考中..."
item.started AI 开始做一件事 ThreadItem(状态 in_progress) 显示"正在执行..."
item.updated 进度更新 ThreadItem(中间状态) 更新进度条/待办列表
item.completed 一件事做完了 ThreadItem(最终状态) 显示结果
turn.completed 这轮对话结束 usage(token 统计) 统计消耗、标记完成
turn.failed 这轮对话失败 error: { message } 显示错误信息
error 致命错误 message 报错退出

关键理解:一个 turn 里可能有多个 item。比如 AI 决定先执行一个命令,再根据命令结果给你回复,那就是两个 item(一个 command_execution + 一个 agent_message)。

实战:做一个实时进度显示器

这个例子处理所有常见事件,用不同的图标和颜色展示 AI 的每一步动作:

typescript// progress.ts
import { Codex } from "@openai/codex-sdk";
import type { ThreadEvent, ThreadItem } from "@openai/codex-sdk";

function handleItemCompleted(item: ThreadItem): void {
  switch (item.type) {
    case "agent_message":
      console.log(`\n💬 AI 回复:\n${item.text}`);
      break;

    case "reasoning":
      console.log(`\n🧠 思考过程: ${item.text}`);
      break;

    case "command_execution": {
      const exitInfo = item.exit_code !== undefined ? ` (退出码: ${item.exit_code})` : "";
      const icon = item.status === "completed" ? "✅" : "❌";
      console.log(`\n${icon} 执行命令: ${item.command}${exitInfo}`);
      if (item.aggregated_output) {
        console.log(`   输出: ${item.aggregated_output.slice(0, 200)}${item.aggregated_output.length > 200 ? "..." : ""}`);
      }
      break;
    }

    case "file_change":
      for (const change of item.changes) {
        const icon = { add: "📄", update: "✏️", delete: "🗑️" }[change.kind];
        console.log(`${icon} 文件${change.kind === "add" ? "新建" : change.kind === "update" ? "修改" : "删除"}: ${change.path}`);
      }
      break;

    case "mcp_tool_call":
      console.log(`🔧 MCP 工具调用: ${item.server}/${item.tool} → ${item.status}`);
      break;

    case "web_search":
      console.log(`🔍 Web 搜索: ${item.query}`);
      break;

    case "error":
      console.log(`⚠️ 非致命错误: ${item.message}`);
      break;
  }
}

function handleItemUpdated(item: ThreadItem): void {
  if (item.type === "todo_list") {
    console.log("\n📋 AI 的工作计划:");
    for (const todo of item.items) {
      console.log(`   ${todo.completed ? "✅" : "⬜"} ${todo.text}`);
    }
  }
}

function handleEvent(event: ThreadEvent): void {
  switch (event.type) {
    case "thread.started":
      console.log(`🆔 线程 ID: ${event.thread_id}`);
      break;
    case "turn.started":
      console.log("\n⏳ AI 开始工作...");
      break;
    case "item.started":
      // item 开始时不打印太多,等 completed 再显示详情
      break;
    case "item.updated":
      handleItemUpdated(event.item);
      break;
    case "item.completed":
      handleItemCompleted(event.item);
      break;
    case "turn.completed":
      console.log(`\n📊 消耗: 输入 ${event.usage.input_tokens} | 缓存 ${event.usage.cached_input_tokens} | 输出 ${event.usage.output_tokens} tokens`);
      break;
    case "turn.failed":
      console.error(`\n❌ 失败: ${event.error.message}`);
      break;
  }
}

async function main() {
  const codex = new Codex();
  const thread = codex.startThread();

  const prompt = process.argv[2] || "看看当前目录有什么文件,简单介绍一下这个项目";
  console.log(`📝 提问: ${prompt}\n`);

  const { events } = await thread.runStreamed(prompt);

  for await (const event of events) {
    handleEvent(event);
  }

  console.log("\n✅ 全部完成!");
}

main().catch(console.error);

把它保存为 progress.ts,在任意 Git 项目里运行:

bashnpx tsx progress.ts "帮我看看这个项目的 package.json 有什么依赖"

你会看到 AI 的每一步操作实时展示:先执行 cat package.json 命令,然后给你分析结果。

async generator 小知识

runStreamed() 返回的 events 是一个 async generator(异步生成器)。如果你没用过这个语法,这里快速过一下:

基本用法

typescriptfor await (const event of events) {
  // 每来一个事件,这里执行一次
}
// 所有事件处理完,循环结束

提前退出

typescriptfor await (const event of events) {
  if (event.type === "turn.failed") {
    console.error("出错了,提前退出");
    break; // 跳出循环,不再处理后续事件
  }
}

错误处理

typescripttry {
  for await (const event of events) {
    handleEvent(event);
  }
} catch (err) {
  console.error("事件流出错:", err);
}

收集所有事件

typescript// 如果你既想流式处理,又想最后拿到所有事件
const allEvents: ThreadEvent[] = [];
for await (const event of events) {
  allEvents.push(event);
  handleEvent(event); // 实时处理
}
// 这里 allEvents 包含了所有事件

小结

  • run() 等全部完成再返回,runStreamed() 实时推送事件
  • 事件生命周期:thread.started → turn.started → item.* → turn.completed
  • 一个 turn 里可能有多个 item(命令执行、文件修改、最终回复等)
  • for await...of 遍历事件流
  • 流式处理是做 CLI 工具和 UI 的基础

下一章ThreadItem 详解 — 把 AI 产出的 7 种"东西"全部讲透。