跳到主要内容

06. Emitter 与流式输出

回到 README | 上一章:05-hooks-and-lifecycle.md | 下一章:07-liteforge.md

Emitter 是专注模式与外部世界(前端 UI、CLI、日志)通信的唯一通道。本章讲清楚:

  • Emitter 的结构与可组合性
  • 主要发送方法分类
  • 流式机制(AIResponse / chanx.UnlimitedChan / 节点 ID)
  • reactloops 中实际发出的事件清单
  • UX 最佳实践

6.1 Emitter 结构

源码 common/ai/aid/aicommon/emitter.go:25-58

type BaseEmitter func(e *schema.AiOutputEvent) (*schema.AiOutputEvent, error)
type EventProcesser func(e *schema.AiOutputEvent) *schema.AiOutputEvent

type Emitter struct {
streamWG *sync.WaitGroup
id string // CoordinatorId(一般是 ReAct runtime UUID)
baseEmitter BaseEmitter // 真正写到外部
eventProcesserStack *utils.Stack[EventProcesser] // 中间件栈
interactiveEventSaver func(string, *schema.AiOutputEvent)
streamNodeIdI18nProvider func(nodeId string) *schema.I18n
}

Emitter 不是 interface 而是 struct。这意味着:

  • 你拿到的是同一个对象的不同包装(通过 PushEventProcesser 派生新实例)
  • 派生时浅拷贝 + 复制 processer 栈,不共享栈
  • 每个 emit 经过 processer 栈处理后再调 baseEmitter

EventProcesser 栈用法

EventProcesser 给事件加 metadata。典型场景:

// 给所有事件追加 ProcessId(关联 AI 流程)
emitter.AssociativeAIProcess(process)

// 给所有事件附 model name(异步获取)
emitter.WithAIInfoProvider(func() AIEventMeta {
return AIEventMeta{Service: "...", ModelName: "..."}
})

派生出的 emitter 不会污染原 emitter。

6.2 主要发送方法分类

Emitter 有 60+ 个 emit 方法,按用途分类如下。

A. 结构化 JSON 事件(EmitJSON 系列)

方法EventType用途
EmitJSON(typeName, id, data)自定义通用基础方法
EmitStructured(nodeId, data)STRUCTUREDUI 结构化数据节点
EmitStatus(key, value)STRUCTURED (id="status")状态键值对(如 loadingStatus
EmitYakitRisk(id, title, ...)YAKIT_RISK漏洞风险记录
EmitYakitHTTPFlow(...)YAKIT_HTTPFLOWHTTP 流量记录
EmitYakitExecResult(exec)YAKIT_EXEC_RESULT执行结果
EmitFocusOn / LoseFocusFOCUS_ON / LOSE_FOCUS子 loop 切换通知
EmitReActFail / SuccessFAIL_REACT / SUCCESS_REACT整个 ReAct 任务结束

B. ReAct 推理三元组

EmitThought(nodeId, thought)      // 思考
EmitAction(nodeId, action, type, args) // 行动
EmitObservation(nodeId, obs, source) // 观察
EmitIteration(nodeId, current, max, msg) // 迭代信息
EmitResult(nodeId, result, success, ...) // 结果

虽然 reactloops 内部不主动发这些(结构化语义事件),但语义上对应:

三元组reactloops 实际事件
Thought流式 re-act-loop-thought 节点(来自 LLM reasoning)
ActionEVENT_TYPE_STRUCTURED (action 信息) + EmitFileArtifact
Observationtimeline + Feedback 注入下一轮 prompt

C. 流式事件(核心!)

方法典型 NodeId用途
EmitStreamEvent(nodeId, time, reader, taskIdx, ...)任意流式输出(默认 markdown)
EmitStreamEventEx(...)同上 + disableMarkdown 控制
EmitStreamEventWithContentType(nodeId, reader, taskIdx, contentType, ...)同上指定 ContentType(http_flow / yak_code / text_markdown 等)
EmitTextMarkdownStreamEventre-act-loop-answer-payload便捷的 markdown 流
EmitDefaultStreamEvent任意不做 markdown 处理的纯流
EmitYaklangCodeStreamEventyaklang_code_editoryak 代码流
EmitHTTPRequestStreamEventhttp_flowHTTP 请求流

EmitStreamEvent 内部会把整个 io.Reader 拆成 chunk 一边读一边推。reader 关闭时自动 emit STREAM 完成事件。

D. 思考流(thought)

EmitThoughtStream(taskId, fmtTpl, item...)         // 字符串
EmitThoughtStreamReader(taskId, reader, finished) // 流(内部调 EmitStreamEvent("re-act-loop-thought", ...))

NodeId 固定 re-act-loop-thought。reactloops 主循环在 callAITransaction 内部把 LLM reasoning 流自动接到这里。

E. 文件 / 引用资料

方法EventType
EmitFileArtifactWithExt(name, ext, content)落盘 + emit
EmitPinFilename(path)FILESYSTEM_PIN_FILENAME
EmitPinDirectory(path)FILESYSTEM_PIN_DIRECTORY
EmitTextReferenceMaterial(eventId, content)REFERENCE_MATERIAL
EmitTextReferenceMaterialWithFile(...)同上 + 落盘

F. 工具调用相关

EmitToolCallStart / Status / Done / Error / UserCancel
EmitToolCallSummary / Decision / Result / Param / LogDir

工具调用走完整生命周期事件。

G. 任务/流程

EmitJSON(EVENT_TYPE_AI_TASK_SWITCHED_TO_ASYNC, ...)  // 异步切换
EmitJSON(EVENT_TYPE_FOCUS_ON_LOOP, ...) // 切到子 loop
EmitJSON(EVENT_TYPE_LOSE_FOCUS_LOOP, ...) // 离开子 loop
EmitPromptProfile(profileData) // prompt 段落 token 占比

H. 资源 / 风险 / 知识

EmitYakitRisk / EmitYakitHTTPFlow / EmitYakitExecResult
EmitJSON(EVENT_TYPE_KNOWLEDGE, ...)

I. 终态结果

EmitResultAfterStream(content)

特别:这是 Emitter 上没有,但 invoker(AIInvokeRuntime)上有 的方法。在 directly_answer action handler 里典型用法:

invoker.EmitFileArtifactWithExt("directly_answer", ".md", payload)
invoker.EmitResultAfterStream(payload)

EmitResultAfterStream 在所有当前的流式事件结束后再发结果,避免乱序。

6.3 AIResponse 流(reactloops 主循环依赖的底层)

主循环每轮调 callAITransaction(streamWg, prompt, nonce),内部用 aicommon.AIResponse 包装 LLM 流式输出。结构:

type AIResponse struct {
output *chanx.UnlimitedChan[*OutputChunk] // 主输出 chan
reasoning *chanx.UnlimitedChan[*OutputChunk] // reasoning 流(可选)
// ...
}

关键设计:用 chanx.UnlimitedChan 而不是有界 channel,避免 LLM 输出 burst 时阻塞 transport。

GetOutputStreamReader(nodeId, isReasoning, emitter) 把 chan 包成 io.Reader

// emitter 在收到流时会自动 EmitStreamEvent(nodeId, ...) 推送
output := response.GetOutputStreamReader("answer", false, emitter)
reasoning := response.GetOutputStreamReader("re-act-loop-thought", true, emitter)

源码 aicommon/response.go:397-440

Fan-out 给字段提取器

reactloops 的 callAITransaction 内部还会把同一份 reader fan-out 给:

  1. JSON 字段流提取器:扫描 streamFields 配置,遇到 JSON 字段开流时实时调用 callback。
  2. AITag 流提取器:扫描 aiTagFields,遇到 <|TAG_<nonce>|> 开始时切到 tag 内容流。
  3. Action 解析器:完整 JSON 解析,提取 @action 等字段。

所以 LLM 输出只读一遍流,多个消费者并行处理。

6.4 reactloops 中实际发出的事件清单

下面是从代码 grep 出来的事件清单,按主循环阶段排列。

任务开始

事件NodeId / Key时机来源
STRUCTURED (key=status)re-act-loading-status-keyr.loadingStatus("初始化...")exec.go:382-397
自定义 thought 流quick_planInitTask 阶段 LiteForge 流式各 loop init

每轮迭代

事件NodeId时机来源
STRUCTUREDreact_task_mode_changed切到异步模式exec.go 异步路径
STREAM (reasoning)re-act-loop-thoughtLLM 思考流callAITransaction
STREAM (answer)各 action 注册的 NodeIdLLM 输出 stream fieldStreamField 配置
STREAM (AITag)同上AITag 内容流AITagField 配置
STRUCTUREDprompt_profile每轮 prompt 段落统计emitPromptObservationStatus
STRUCTUREDhttp_fuzz_request_changeHTTP fuzz 改包loop_http_fuzztest
STREAMyaklang_code_editoryak 代码生成loop_yaklangcode
STREAMdirectly_call_tool_params工具参数流loopinfra

子 loop 切换

事件EventType
FOCUS_ON_LOOP进入子 loop
LOSE_FOCUS_LOOP离开子 loop

源码 action_from_loop.go:51-58

工具调用

完整 8 个事件:tool_call_starttool_call_statustool_call_paramtool_call_log_dirtool_call_done / tool_call_errortool_call_summarytool_call_resulttool_call_decision

任务结束

事件EventType
RESULTEmitResultAfterStream
FILESYSTEM_PIN_FILENAMEEmitFileArtifact 落盘
REFERENCE_MATERIALfinalize fallback 时
STRUCTURED (status="end")loadingStatus("end")
SUCCESS_REACT / FAIL_REACT全局 hook 决定

中断 / 异常

事件EventType
AI_TASK_SWITCHED_TO_ASYNC异步切换
REQUIRE_USER_INTERACTIVEask_for_clarification

6.5 流式输出的两种模式

模式 A:JSON 字段流

LLM 输出标准 JSON,里面某字段开始流时实时推送:

// 注册
reactloops.WithRegisterLoopActionWithStreamField(
"my_action",
"my action description",
options,
[]*reactloops.LoopStreamField{
{
FieldName: "long_explanation",
AINodeId: "my-action-node",
ContentType: aicommon.TypeTextMarkdown,
},
},
verifier, handler,
)

LLM 还在写 "long_explanation": "..." 时,前端就能看到 my-action-node 节点流。

模式 B:AITag 流

LLM 输出 JSON 之外的 <|FINAL_ANSWER_<nonce>|>...<|FINAL_ANSWER_END_<nonce>|>

// 注册
reactloops.WithAITagFieldWithAINodeId(
"FINAL_ANSWER",
"tag_final_answer",
"re-act-loop-answer-payload",
aicommon.TypeTextMarkdown,
)

LLM 输出(伪示例):

{"@action": "directly_answer", "human_readable_thought": "give final answer"}

<|FINAL_ANSWER_aB3x|>
## 分析报告

发现以下问题:
...
<|FINAL_ANSWER_END_aB3x|>

提取的 markdown 内容存到 loop.Get("tag_final_answer"),并实时流到 re-act-loop-answer-payload

选哪个?

场景推荐
字段是简单字符串、json escape 没问题StreamField
字段是长 markdown / 代码 / HTTP packetAITag
一个内容字段两种都给(互斥)两个都注册
多段独立内容多个 AITagField

详细对比见 04-actions.md

6.6 NodeId 命名约定

NodeId 决定前端把流挂在哪个 UI 元素上。reactloops 用了一些固定的 NodeId:

NodeId含义
re-act-loop-thought思考流(reasoning)
re-act-loop-answer-payload最终答案 markdown 流
re-act-loading-status-key加载状态 key
directly_call_tool_params工具参数流
prompt_profileprompt 段落开销
http_flowHTTP 请求/响应
yaklang_code_editoryak 代码
quick_plan快速规划思路
reference_material引用资料

ContentType 决定渲染方式:

ContentType含义
text_markdownMarkdown 渲染
text_plain纯文本
code_yaklangyak 代码高亮
code_http_requestHTTP 包格式化
default默认

6.7 UX 最佳实践

1. 总是把"思考"流出来

LLM 思考阶段沉默几秒钟用户会觉得卡住。reactloops 自动把 reasoning 流到 re-act-loop-thought,但你也可以主动 EmitThoughtStream:

emitter.EmitThoughtStream(task.GetIndex(), "Initialized fuzz request from extracted HTTP packet.")

loop_http_fuzztest/init.go:144 在 init 阶段先 emit 一句"我已经从输入提取到 packet",让用户立刻有反馈。

2. 关键 action 后立刻 EmitFileArtifact

不要等到结束才一次性 emit。比如代码生成 action:

// 在 ActionHandler 里
yakCode := generate(...)
invoker.EmitFileArtifactWithExt("generated_code", ".yak", yakCode)
loop.Set("last_generated_code", yakCode)
operator.Continue()

3. 长输出用 AITag + StreamField 两路

directly_answer action 同时注册 answer_payload(StreamField)和 FINAL_ANSWER(AITag)。LLM 自由选择短答案或长答案。前端不需要关心,因为 NodeId 一致,事件流落到同一个 UI 元素。

4. 不要在静默路径里偷偷消耗模型

很多 hook(OnPostIteraction 之类)调 LiteForge 没有流式输出,用户不知道在跑什么。所以:

  • 调 LiteForge 时带 NodeId 流回调,让用户看到生成过程
  • 或者 EmitThoughtStream 一句话告诉用户"系统正在生成总结"

参考 loop_http_flow_analyze/finalize.go:174-191

aicommon.WithGeneralConfigStreamableFieldEmitterCallback([]string{"summary"}, func(key string, r io.Reader, emitter *aicommon.Emitter) {
if event, _ := emitter.EmitStreamEventWithContentType(
"re-act-loop-answer-payload",
utils.JSONStringReader(r),
taskID,
aicommon.TypeTextMarkdown,
func() {},
); event != nil {
streamId := event.GetStreamEventWriterId()
emitter.EmitTextReferenceMaterial(streamId, contextMaterials)
}
}),

不仅流出,还把 context materials 作为 reference material 关联——用户可以查看"这个总结基于哪些原始数据"。

5. loadingStatus 的语义

主循环每个阶段都会调 r.loadingStatus(...)

  • "初始化 / initializing..."
  • "执行初始化函数 / execute init handler..."
  • "记忆快速装载中 / waiting for fast memories to load..."
  • "执行中... / executing..."
  • "[action_name]执行中 / executing action..."
  • "end"(除非 noEndLoadingStatus=true

UI 一般在底部状态栏显示。WithNoEndLoadingStatus(true) 让子 loop 不发 "end",避免错误结束信号。

6. EmitPinFilename 给可下载的产物

落盘后调 EmitPinFilename(path),前端会显示一个"打开文件"链接。EmitFileArtifactWithExt 内部已经做了。

6.8 实战:自定义流式 action

reactloops.WithRegisterLoopActionWithStreamField(
"generate_report",
"生成 markdown 报告",
[]aitool.ToolOption{
aitool.WithStringParam("report_content", aitool.WithParam_Required(true)),
},
[]*reactloops.LoopStreamField{
{
FieldName: "report_content",
AINodeId: "report-stream",
ContentType: aicommon.TypeTextMarkdown,
},
},
verifier,
func(loop *reactloops.ReActLoop, action *aicommon.Action, op *reactloops.LoopActionHandlerOperator) {
report := action.GetString("report_content")
invoker := loop.GetInvoker()
invoker.EmitFileArtifactWithExt("report", ".md", report)
invoker.EmitResultAfterStream(report)
op.Exit()
},
)

LLM 输出 report_content 字段时,前端 report-stream 节点就开始接收。结束后落盘 + EmitResultAfterStream 发最终结果事件。

6.9 进一步阅读

  • 14-streaming-ux.md:流式输出与 UX 实战 —— 本章是手册,14 章是「专注模式作者写流式怎么做对」的实战指南(yak / Go 双侧、ContentType / NodeId 命名规范、终局三连、八条踩坑)
  • 03-prompt-system.md:prompt 模板里的 nonce / TAG 防注入
  • 04-actions.md:StreamField vs AITagField 的取舍
  • 12-debugging-and-observability.mdEmitPromptProfile 调试
  • 13-yak-focus-mode.md:用 yak 脚本写专注模式(包含 __AI_TAG_FIELDS__ / stream_fields 在 yak 侧的精确字段名)