Recursive Dual-Engine Architecture

Abstract
The Recursive Dual-Engine represents Memfit AI's foundational architectural innovation, addressing the fundamental tension between strategic planning and tactical execution in autonomous AI agents. Drawing from cognitive science's dual-process theory (Kahneman, 2011) and hierarchical task network planning (Erol et al., 1994), this architecture enables fractal task decomposition through a nested execution model where strategic planning can be invoked recursively as an atomic action within tactical execution loops.
This document provides a comprehensive, code-backed analysis of the recursive dual-engine architecture, examining its theoretical foundations, implementation details, and the emergent properties that arise from the interaction between its two constituent engines.
Theoretical Foundation
The Dual-Process Paradigm
The recursive dual-engine architecture instantiates a computational dual-process model, drawing direct inspiration from cognitive psychology:
| Cognitive System | Engine | Characteristics | Response Time | Cognitive Load |
|---|---|---|---|---|
| System 2 (Deliberative) | Plan Engine | Goal decomposition, dependency analysis, strategic reasoning | Seconds to minutes | High |
| System 1 (Reactive) | ReAct Engine | Tool execution, error recovery, dynamic adaptation | Milliseconds to seconds | Low |
This design is informed by foundational research in cognitive science and artificial intelligence:
- ReAct: Synergizing Reasoning and Acting (Yao et al., 2022) — Provides the theoretical basis for the tactical layer, demonstrating that interleaved reasoning and acting outperforms pure reasoning or pure acting approaches.
- Plan-and-Solve Prompting (Wang et al., 2023) — Establishes the importance of explicit planning before execution for complex tasks, forming the basis for the strategic layer.
- Hierarchical Task Network (HTN) Planning (Erol et al., 1994) — Provides the formal framework for recursive decomposition of complex tasks into primitive operations.
- Dual Process Theory (Kahneman, 2011) — The cognitive science foundation explaining why separating fast/slow thinking improves decision quality.
The Recursion Principle: Planning as Action
The critical architectural insight is that planning itself constitutes a valid action. When a ReAct Loop encounters a task beyond its immediate capability, it can invoke the Plan Engine to generate a sub-plan, which spawns its own set of ReAct Loops. This creates a fractal execution structure where tasks can be infinitely subdivided until they become atomic operations solvable by available tools.
This principle is formally captured in the action registration:
// From loopinfra/action_request_plan_and_execution.go
var loopAction_RequestPlanAndExecution = &reactloops.LoopAction{
AsyncMode: true,
ActionType: schema.AI_REACT_LOOP_ACTION_REQUEST_PLAN_EXECUTION,
Description: `Request a detailed plan and execute it step-by-step
to achieve the user's goal.`,
ActionHandler: func(loop *reactloops.ReActLoop, action *aicommon.Action,
operator *reactloops.LoopActionHandlerOperator) {
// The ReAct Loop can invoke the Plan Engine as an action
invoker := loop.GetInvoker()
invoker.AsyncPlanAndExecute(task.GetContext(), rewriteQuery, func(err error) {
loop.FinishAsyncTask(task, err)
})
},
}
Emergence Through Composition
The dual-engine architecture exhibits emergent properties that neither engine could achieve independently:
| Property | Plan Engine Alone | ReAct Engine Alone | Combined Architecture |
|---|---|---|---|
| Complexity Handling | Rigid (fails on unforeseen subtasks) | Limited (loses global context) | Adaptive fractal decomposition |
| Error Recovery | Poor (no runtime adaptation) | Good (per-action) | Multi-level recovery |
| Context Preservation | Excellent (global view) | Poor (local view) | Hierarchical context flow |
| Execution Efficiency | Low (over-planning) | High (reactive) | Optimal (plan what's needed) |
1. Architectural Overview
1.1 System Layering
The system is organized into three distinct layers, each with clear responsibilities and well-defined interfaces:
┌─────────────────────────────────────────────────────────────────────────────┐
│ Layer 1: Coordinator │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Lifecycle │ │ Context │ │ Event Bus │ │ Memory │ │
│ │ Manager │ │ Container │ │ Router │ │ Triage │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────────────────────┤
│ Layer 2: Plan Engine │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Task Tree │ │ Dependency │ │ Scheduler │ │ Review │ │
│ │ Generator │ │ Analyzer │ │ (DFS) │ │ Gateway │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────────────────────┤
│ Layer 3: ReAct Runtime │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ OODA Loop │ │ Tool │ │ Error │ │ Self │ │
│ │ Executor │ │ Invocation │ │ Recovery │ │ Reflection │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
│ ↓ │
│ [Can invoke Layer 2 recursively via RequestPlanExecution] │
└─────────────────────────────────────────────────────────────────────────────┘
1.2 Layer Responsibilities
| Layer | Primary Role | Key Components | State Ownership |
|---|---|---|---|
| Coordinator | Orchestration | Config, Timeline, EventManager | Global session state |
| Plan Engine | Strategic planning | PlanRequest, TaskTree, Scheduler | Task decomposition state |
| ReAct Runtime | Tactical execution | ReActLoop, ActionHandlers | Action execution state |
2. Core Data Structures
2.1 The AiTask Tree
The fundamental data structure is the AiTask, which supports recursive task definitions with full parent-child relationship tracking:
// From task.go - The recursive task structure
type AiTask struct {
*Coordinator // Reference to orchestrating coordinator
*aicommon.AIStatefulTaskBase // State management mixin
Index string `json:"index"` // Hierarchical index (e.g., "1-2-3")
Name string `json:"name"` // Human-readable task name
Goal string `json:"goal"` // Task objective specification
ParentTask *AiTask `json:"parent_task"` // Parent reference (nil for root)
Subtasks []*AiTask `json:"subtasks"` // Recursive children
// Execution state tracking
StatusSummary string `json:"status_summary"` // Real-time status
TaskSummary string `json:"task_summary"` // Completion summary
ShortSummary string `json:"short_summary"` // Brief summary
LongSummary string `json:"long_summary"` // Detailed summary
// Tool call tracking
toolCallResultIds *omap.OrderedMap[int64, *aitool.ToolResult]
}
2.2 Hierarchical Index System
The hierarchical index system enables precise task identification across arbitrarily deep nesting levels. The indexing follows a path-based notation where each level is separated by hyphens:
// From task.go - Recursive index assignment
func _assignHierarchicalIndicesRecursive(currentTask *AiTask, currentIndex string) {
if currentTask == nil {
return
}
currentTask.Index = currentIndex
currentTask.SetID(currentIndex)
for i, subTask := range currentTask.Subtasks {
// Child index = parent index + child ordinal (1-based)
// Example: parent "1-2" → children "1-2-1", "1-2-2", "1-2-3"
subTaskIndex := fmt.Sprintf("%s-%d", currentIndex, i+1)
_assignHierarchicalIndicesRecursive(subTask, subTaskIndex)
}
}
// GenerateIndex ensures consistent indexing from any node
func (t *AiTask) GenerateIndex() {
if t == nil {
return
}
root := t
// Traverse up to find the actual root (with safety limit)
for i := 0; i < 1000 && root.ParentTask != nil; i++ {
root = root.ParentTask
}
// Start indexing from root with "1"
_assignHierarchicalIndicesRecursive(root, "1")
}
The indexing scheme provides several critical capabilities:
| Feature | Implementation | Example |
|---|---|---|
| Unique Identification | Path-based index | Task "1-2-3" is uniquely addressable |
| Hierarchy Encoding | Hyphen-separated levels | "1-2-3" encodes 3-level nesting |
| Sibling Ordering | Ordinal suffix | "1-2-1", "1-2-2" preserve order |
| Fast Lookup | DFS traversal with index matching | O(n) where n = total tasks |
2.3 Task State Machine
Tasks follow a well-defined state machine with explicit transitions and event emission:
// From aicommon/taskif.go - Task state definitions
type AITaskState string
const (
AITaskState_Created AITaskState = "created" // Initial state after instantiation
AITaskState_Queueing AITaskState = "queueing" // Waiting in execution queue
AITaskState_Processing AITaskState = "processing" // Currently executing
AITaskState_Completed AITaskState = "completed" // Successfully finished
AITaskState_Aborted AITaskState = "aborted" // Failed or terminated abnormally
AITaskState_Skipped AITaskState = "skipped" // User-initiated skip
)
State transitions are tracked with full observability:
// From aicommon/taskif.go - State transition with event emission
func (s *AIStatefulTaskBase) SetStatus(status AITaskState) {
old := s.status
s.status = status
defer func() {
if s.IsFinished() {
s.Cancel() // Clean up context on terminal states
}
}()
if old != status {
log.Debugf("Task %s status changed: %s -> %s", s.GetId(), old, status)
if s.Emitter != nil {
// Emit structured event for UI updates and logging
s.Emitter.EmitStructured("react_task_status_changed", map[string]any{
"react_task_id": s.GetId(),
"react_task_old_status": old,
"react_task_now_status": status,
})
}
}
}
State Transition Diagram:
┌─────────────────────────────────────────┐
│ │
┌────────┐ ┌─▼───────┐ ┌───────────┐ ┌────────▼───┐
│Created │────►│Queueing │────►│Processing │────►│ Completed │
└────────┘ └─────────┘ └─────┬─────┘ └────────────┘
│
┌────────────────┼────────────────┐
▼ ▼
┌──────────┐ ┌──────────┐
│ Aborted │ │ Skipped │
└──────────┘ └──────────┘
3. The Coordinator (Layer 1)
3.1 Role and Responsibilities
The Coordinator serves as the system bus, lifecycle manager, and context container:
// From coordinator.go - Core Coordinator structure
type Coordinator struct {
*aicommon.Config // Inherited configuration
userInput string // Original user request
runtime *runtime // Task scheduler reference
PlanMocker func(config *Coordinator) *PlanResponse // Testing hook
ContextProvider *PromptContextProvider // Dynamic context injection
ResultHandler func(cod *Coordinator) // Completion callback
rootTask *AiTask // Root of task tree
}
The Coordinator's responsibilities span multiple concerns:
| Responsibility | Implementation | Impact |
|---|---|---|
| Session Initialization | NewCoordinatorContext() | Creates execution environment |
| Shared Memory Management | Timeline object | Preserves context across layers |
| User Interaction Handling | Epm (Endpoint Manager) | Processes interrupts, reviews |
| Event Routing | InputEventManager | Distributes events to handlers |
| Task Tree Management | FindSubtaskByIndex() | Enables dynamic task manipulation |
3.2 Execution Lifecycle
The Coordinator.Run() method orchestrates the complete execution lifecycle through five distinct phases:
// From coordinator.go - Complete execution orchestration
func (c *Coordinator) Run() error {
c.registerPEModeInputEventCallback()
c.EmitCurrentConfigInfo()
// ═══════════════════════════════════════════════════════════
// Phase 1: Plan Request Creation
// ═══════════════════════════════════════════════════════════
c.EmitInfo("start to create plan request")
planReq, err := c.createPlanRequest(c.userInput)
if err != nil {
c.EmitError("create planRequest failed: %v", err)
return utils.Errorf("coordinator: create planRequest failed: %v", err)
}
// ═══════════════════════════════════════════════════════════
// Phase 2: Plan Generation (AI-driven decomposition)
// ═══════════════════════════════════════════════════════════
c.EmitInfo("start to invoke plan request")
rsp, err := planReq.Invoke()
if err != nil {
c.EmitError("invoke planRequest failed(first): %v", err)
return utils.Errorf("coordinator: invoke planRequest failed: %v", err)
}
// ═══════════════════════════════════════════════════════════
// Phase 3: Human-in-the-Loop Review
// ═══════════════════════════════════════════════════════════
ep := c.Epm.CreateEndpointWithEventType(schema.EVENT_TYPE_PLAN_REVIEW_REQUIRE)
ep.SetDefaultSuggestionContinue()
c.EmitRequireReviewForPlan(rsp, ep.GetId())
c.DoWaitAgree(c.GetContext(), ep) // Blocking wait for user response
params := ep.GetParams()
c.ReleaseInteractiveEvent(ep.GetId(), params)
if params == nil {
c.EmitError("user review params is nil, plan failed")
return utils.Errorf("coordinator: user review params is nil")
}
// ═══════════════════════════════════════════════════════════
// Phase 4: Plan Refinement Based on Review
// ═══════════════════════════════════════════════════════════
c.EmitInfo("start to handle review plan response")
rsp, err = planReq.handleReviewPlanResponse(rsp, params)
if err != nil {
c.EmitError("handle review plan response failed: %v", err)
return utils.Errorf("coordinator: handle review plan response failed: %v", err)
}
// ═══════════════════════════════════════════════════════════
// Phase 5: Runtime Creation and Task Execution
// ═══════════════════════════════════════════════════════════
if rsp.RootTask == nil {
c.EmitError("root aiTask is nil, plan failed")
return utils.Errorf("coordinator: root aiTask is nil")
}
root := rsp.RootTask
c.rootTask = root
c.ContextProvider.StoreRootTask(root)
// Validate task tree
if len(root.Subtasks) <= 0 {
c.EmitError("no subtasks found, this task is not a valid task")
return utils.Errorf("coordinator: no subtasks found")
}
// Log the execution plan
log.Infof("create aiTask pipeline: %v", root.Name)
for stepIdx, taskIns := range root.Subtasks {
log.Infof("step %d: %v", stepIdx, taskIns.Name)
}
// Create and invoke the runtime scheduler
c.EmitInfo("start to create runtime")
rt := c.createRuntime()
c.runtime = rt
return rt.Invoke(root)
}
3.3 Task Manipulation: Skip and Redo
The Coordinator supports dynamic task manipulation through user-initiated events, enabling human oversight:
// From coordinator.go - Skip subtask handling
func (c *Coordinator) HandleSkipSubtaskInPlan(event *ypb.AIInputEvent) error {
// Parse parameters from event
subtaskIndex := utils.InterfaceToString(params["subtask_index"])
userReason := utils.InterfaceToString(params["reason"])
// Locate the target task via DFS
task := c.FindSubtaskByIndex(subtaskIndex)
if task == nil {
return utils.Errorf("subtask with index %s not found", subtaskIndex)
}
// Update state and cancel context
task.SetStatus(aicommon.AITaskState_Skipped)
task.Cancel()
// Record in timeline for context preservation
timelineMessage := fmt.Sprintf("用户主动跳过了当前子任务,原因: %s", userReason)
c.Timeline.PushText(c.AcquireId(),
"[user-skiped-subtask] 任务 %s (%s) 被用户主动跳过: %s",
task.Index, task.Name, timelineMessage)
return nil
}
// From coordinator.go - Find subtask by hierarchical index
func (c *Coordinator) FindSubtaskByIndex(index string) *AiTask {
if c.rootTask == nil {
return nil
}
// Use DFS traversal to find matching task
taskLink := DFSOrderAiTask(c.rootTask)
for i := 0; i < taskLink.Len(); i++ {
task, ok := taskLink.Get(i)
if !ok {
continue
}
if task.Index == index {
return task
}
}
return nil
}
4. The Plan Engine (Layer 2)
4.1 Role and Responsibilities
The Plan Engine functions as the strategic layer, responsible for decomposing complex goals into executable subtask trees:
| Capability | Description | Implementation |
|---|---|---|
| Goal Decomposition | Breaking complex objectives into subtasks | planRequest.Invoke() |
| Dependency Analysis | Determining execution order | Implicit in subtask ordering |
| Task Tree Construction | Building hierarchical AiTask structure | generateAITask() |
| Review Integration | Allowing human modification | handleReviewPlanResponse() |
4.2 Plan Generation Process
The planning process is implemented as a specialized ReAct Loop that produces structured task definitions:
// From plan.go - Plan invocation with task extraction
func (pr *planRequest) Invoke() (*PlanResponse, error) {
// Check for mock/template plans (testing and domain-specific shortcuts)
if pr.cod.PlanMocker != nil {
pr.cod.EmitThoughtStream("mock task", "使用模版预设任务")
planRes := pr.cod.PlanMocker(pr.cod)
if utils.IsNil(planRes) {
return nil, utils.Error("planMocker returns nil, unknown error")
}
return planRes, nil
}
var rootTask = pr.cod.generateAITaskWithName("root-default", "root-default")
// Ensure configuration propagation after plan extraction
defer func() {
var propagateConfig func(task *AiTask)
propagateConfig = func(task *AiTask) {
if task == nil {
return
}
task.Coordinator = pr.cod
for _, sub := range task.Subtasks {
sub.ParentTask = task // Establish parent-child link
propagateConfig(sub)
}
}
propagateConfig(rootTask)
rootTask.GenerateIndex() // Assign hierarchical indices
}()
// Create a dedicated task for the planning phase
planTask := aicommon.NewStatefulTaskBase(
"plan-task",
pr.rawInput,
pr.cod.Ctx,
pr.cod.Emitter,
true,
)
// Execute the plan-generation ReAct Loop
err := pr.cod.ExecuteLoopTask(
schema.AI_REACT_LOOP_NAME_PLAN,
planTask,
reactloops.WithOnPostIteraction(func(loop *reactloops.ReActLoop,
iteration int, task aicommon.AIStatefulTask, isDone bool, reason any) {
if isDone {
// Extract structured plan from AI response
planData := loop.Get(loop_plan.PLAN_DATA_KEY)
action, err := aicommon.ExtractAction(planData, "plan", "plan")
if err != nil {
log.Errorf("extract action from plan data failed: %v", err)
return
}
// Construct root task from extracted data
rootTask = pr.cod.generateAITaskWithName(
action.GetAnyToString("main_task"),
action.GetAnyToString("main_task_goal"))
// Construct subtasks
for _, subtask := range action.GetInvokeParamsArray("tasks") {
if subtask.GetAnyToString("subtask_name") == "" {
continue
}
rootTask.Subtasks = append(rootTask.Subtasks,
pr.cod.generateAITask(subtask))
}
}
}))
if err != nil {
return nil, err
}
return pr.cod.newPlanResponse(rootTask), nil
}
4.3 Plan Schema Definition
The AI generates plans according to a strict JSON schema that ensures structured, parseable output:
// From jsonschema/plan/plan.json - Plan schema definition
{
"type": "object",
"required": ["main_task", "main_task_goal", "tasks"],
"properties": {
"main_task": {
"type": "string",
"description": "The refined, comprehensive task description that captures the full scope of the user's request."
},
"main_task_goal": {
"type": "string",
"description": "The specific, measurable objective that defines successful completion of the main task."
},
"tasks": {
"type": "array",
"description": "Ordered list of subtasks that decompose the main_task into executable steps.",
"items": {
"type": "object",
"required": ["subtask_name", "subtask_goal"],
"properties": {
"subtask_name": {
"type": "string",
"description": "Concise name using 'verb+noun' format (e.g., 'Analyze Requirements', 'Deploy Service')."
},
"subtask_goal": {
"type": "string",
"description": "Specific objective with clear completion criteria for this subtask."
}
}
}
}
}
}
4.4 Task Generation and Context Preservation
Each task preserves the original user context while adding structured metadata:
// From plan.go - Task generation with context injection
func (c *Coordinator) generateAITaskWithName(name, goal string) *AiTask {
task := &AiTask{
Coordinator: c,
Name: name,
Goal: goal,
}
// Create stateful task base with formatted input
taskBase := aicommon.NewStatefulTaskBase(
"plan-task"+uuid.NewString(),
fmt.Sprintf("任务名称: %s\n任务目标: %s", task.Name, task.Goal),
c.Ctx,
c.Emitter,
true,
)
task.AIStatefulTaskBase = taskBase
// Inject original user input for context preservation
nonce := utils.RandStringBytes(4)
taskInput := task.GetUserInput()
enhancedInput := utils.MustRenderTemplate(`
<|用户原始需求_{{.nonce}}|>
{{ .RawUserInput }}
<|用户原始需求_END_{{.nonce}}|>
---
{{ .Origin }}
`,
map[string]any{
"nonce": nonce,
"RawUserInput": c.userInput,
"Origin": taskInput,
})
task.SetUserInput(enhancedInput)
return task
}
5. The ReAct Runtime (Layer 3)
5.1 Role and Responsibilities
The ReAct Runtime serves as the tactical layer, executing atomic tasks through an iterative OODA (Observe-Orient-Decide-Act) Loop:
// From reactloops/reactloop.go - ReActLoop structure
type ReActLoop struct {
invoker aicommon.AIInvokeRuntime
config aicommon.AICallerConfigIf
emitter *aicommon.Emitter
maxIterations int // Safety limit for iterations
loopName string // Loop type identifier
// Capability gates (dynamic feature toggles)
allowAIForge func() bool // AI Blueprint invocation
allowPlanAndExec func() bool // Recursive planning
allowRAG func() bool // Retrieval-augmented generation
allowToolCall func() bool // External tool calls
allowUserInteract func() bool // User interaction requests
// Action management
actions *omap.OrderedMap[string, *LoopAction]
loopActions *omap.OrderedMap[string, LoopActionFactory]
streamFields *omap.OrderedMap[string, *LoopStreamField]
aiTagFields *omap.OrderedMap[string, *LoopAITagField]
// Memory management
memorySizeLimit int
currentMemories *omap.OrderedMap[string, *aicommon.MemoryEntity]
memoryTriage aicommon.MemoryTriage
// Self-reflection support
enableSelfReflection bool
// Spin detection thresholds
sameActionTypeSpinThreshold int
sameLogicSpinThreshold int
// Action history for pattern detection
actionHistory []*ActionRecord
actionHistoryMutex *sync.Mutex
currentIterationIndex int
}
5.2 The OODA Execution Loop
Each ReAct iteration follows the OODA (Observe-Orient-Decide-Act) cycle, enhanced with self-reflection:
// From reactloops/exec.go - Main execution loop
LOOP:
for {
iterationCount++
// ═══════════════════════════════════════════════════════════
// Safety Bound: Maximum Iteration Check
// ═══════════════════════════════════════════════════════════
if iterationCount > maxIterations {
r.finishIterationLoopWithError(iterationCount, task,
utils.Errorf("reached max iterations (%d), stopping code generation loop", maxIterations))
log.Warnf("Reached max iterations (%d), stopping code generation loop", maxIterations)
needSummary.SetTo(true)
break LOOP
}
// ═══════════════════════════════════════════════════════════
// OBSERVE Phase: Load context and memories
// ═══════════════════════════════════════════════════════════
waitMem := make(chan struct{})
go func() {
defer close(waitMem)
r.fastLoadSearchMemoryWithoutAI(task.GetUserInput())
}()
r.loadingStatus("记忆快速装载中 / waiting for fast memories to load...")
select {
case <-task.GetContext().Done():
return utils.Errorf("task context done before execute ReActLoop")
case <-waitMem:
r.loadingStatus("记忆已装载 / memories loaded")
case <-time.After(200 * time.Millisecond):
r.loadingStatus("跳过快速记忆装载,原因:超时")
}
// ═══════════════════════════════════════════════════════════
// ORIENT Phase: Generate prompt with full context
// ═══════════════════════════════════════════════════════════
r.loadingStatus("执行中... / executing...")
prompt, finalError = r.generateLoopPrompt(
nonce,
task.GetUserInput(),
r.GetCurrentMemoriesContent(),
operator,
)
if finalError != nil {
r.finishIterationLoopWithError(iterationCount, task, finalError)
return finalError
}
// ═══════════════════════════════════════════════════════════
// DECIDE Phase: AI determines next action
// ═══════════════════════════════════════════════════════════
streamWg := new(sync.WaitGroup)
actionParams, handler, transactionErr := r.callAITransaction(streamWg, prompt, nonce)
if transactionErr != nil {
r.finishIterationLoopWithError(iterationCount, task, transactionErr)
break LOOP
}
if utils.IsNil(actionParams) {
r.finishIterationLoopWithError(iterationCount, task,
utils.Error("action is nil in ReActLoop"))
break LOOP
}
// ═══════════════════════════════════════════════════════════
// ACT Phase: Execute the selected action
// ═══════════════════════════════════════════════════════════
actionName := actionParams.Name()
r.loadingStatus(fmt.Sprintf("[%v]执行中 / [%v] executing action...", actionName, actionName))
// Record action for history tracking and spin detection
r.actionHistoryMutex.Lock()
r.currentIterationIndex = iterationCount
actionRecord := &ActionRecord{
ActionType: actionParams.ActionType(),
ActionName: actionName,
ActionParams: make(map[string]interface{}),
IterationIndex: iterationCount,
}
for k, v := range actionParams.GetParams() {
actionRecord.ActionParams[k] = v
}
r.actionHistory = append(r.actionHistory, actionRecord)
r.actionHistoryMutex.Unlock()
// Add iteration info to timeline
msg := fmt.Sprintf("[%v]======== ReAct iteration %d ========", loopName, iterationCount)
if reason := actionParams.GetString("human_readable_thought"); reason != "" {
msg += "\nReason/Next-Step: " + reason
}
r.GetInvoker().AddToTimeline("iteration", msg)
// Execute the action handler
actionStartTime := time.Now()
handler.ActionHandler(r, actionParams, operator)
actionExecutionDuration := time.Since(actionStartTime)
// ═══════════════════════════════════════════════════════════
// REFLECT Phase: Self-assessment (if enabled)
// ═══════════════════════════════════════════════════════════
reflectionLevel := r.shouldTriggerReflection(handler, operator, iterationCount)
if reflectionLevel != ReflectionLevel_None {
r.loadingStatus(fmt.Sprintf("[%v]反思中 / [%v] self-reflecting...", actionName, actionName))
r.executeReflection(handler, actionParams, operator, reflectionLevel,
iterationCount, actionExecutionDuration)
}
// Check termination conditions
if isTerminated, err := operator.IsTerminated(); isTerminated {
log.Infof("ReactLoop[%v] terminated", r.loopName)
if err != nil {
finalError = err
r.finishIterationLoopWithError(iterationCount, task, finalError)
return finalError
}
r.finishIterationLoopWithError(iterationCount, task, nil)
return nil
}
// Handle async mode transition
if handler.AsyncMode {
r.loadingStatus("当前任务进入异步模式 / Async mode, ending loop")
r.finishIterationLoopWithError(iterationCount, task, nil)
return nil
}
}
5.3 Action History Tracking
Every action is recorded for spin detection, debugging, and pattern analysis:
// From reactloops/reactloop.go - Action record structure
type ActionRecord struct {
ActionType string `json:"action_type"`
ActionName string `json:"action_name"`
ActionParams map[string]interface{} `json:"action_params"`
IterationIndex int `json:"iteration_index"`
}
The action history enables sophisticated pattern detection:
| Detection Type | Implementation | Purpose |
|---|---|---|
| Same Action Type Spin | IsInSameActionTypeSpin() | Detect repetitive actions |
| Logic Spin | IsInSameLogicSpinWithAI() | AI-driven semantic analysis |
| Iteration Tracking | currentIterationIndex | Progress monitoring |
6. The Recursive Mechanism
6.1 Planning as an Action
The critical innovation is registering the Plan Engine as a standard action within the ReAct Loop, enabling seamless recursive invocation:
// From loopinfra/action_request_plan_and_execution.go
var loopAction_RequestPlanAndExecution = &reactloops.LoopAction{
AsyncMode: true, // Does not block the current loop
ActionType: schema.AI_REACT_LOOP_ACTION_REQUEST_PLAN_EXECUTION,
Description: `Request a detailed plan and execute it step-by-step
to achieve the user's goal.`,
Options: []aitool.ToolOption{
aitool.WithStringParam(
"plan_request_payload",
aitool.WithParam_Description("Provide a one-sentence summary of
the complex task that needs a multi-step plan."),
),
},
StreamFields: []*reactloops.LoopStreamField{
{FieldName: `plan_request_payload`},
},
ActionVerifier: func(loop *reactloops.ReActLoop, action *aicommon.Action) error {
// Prevent nested plan execution conflicts
invoker := loop.GetInvoker()
if reactInvoker, ok := invoker.(interface {
GetCurrentPlanExecutionTask() aicommon.AIStatefulTask
}); ok {
if reactInvoker.GetCurrentPlanExecutionTask() != nil {
return utils.Errorf("another plan execution task is already running, " +
"please wait for it to complete or use directly_answer")
}
}
improveQuery := action.GetString("plan_request_payload")
if improveQuery == "" {
improveQuery = action.GetInvokeParams("next_action").GetString("plan_request_payload")
}
if improveQuery == "" {
return utils.Errorf("request_plan_and_execution action must have 'plan_request_payload' field")
}
loop.Set("plan_request_payload", improveQuery)
return nil
},
ActionHandler: func(loop *reactloops.ReActLoop, action *aicommon.Action,
operator *reactloops.LoopActionHandlerOperator) {
task := operator.GetTask()
rewriteQuery := loop.Get("plan_request_payload")
invoker := loop.GetInvoker()
// Asynchronously invoke plan-and-execute with completion callback
invoker.AsyncPlanAndExecute(task.GetContext(), rewriteQuery, func(err error) {
loop.FinishAsyncTask(task, err)
})
},
}
6.2 Recursive Invocation Implementation
When the ReAct Engine triggers a sub-plan, it creates a new Coordinator with shared context through event mirroring:
// From aireact/invoke_plan_and_execute.go
func (r *ReAct) invokePlanAndExecute(doneChannel chan struct{}, ctx context.Context,
planPayload string, forgeName string, forgeParams any) (finalErr error) {
doneOnce := new(sync.Once)
done := func() {
doneOnce.Do(func() { close(doneChannel) })
}
defer func() {
done()
if err := recover(); err != nil {
log.Errorf("invokePlanAndExecute panic: %v", err)
utils.PrintCurrentGoroutineRuntimeStack()
}
}()
uid := uuid.New().String()
params := map[string]any{
"re-act_id": r.config.Id,
"re-act_task": r.GetCurrentTask().GetId(),
"coordinator_id": uid,
}
r.EmitJSON(schema.EVENT_TYPE_START_PLAN_AND_EXECUTION, r.config.Id, params)
defer func() {
if finalErr != nil {
r.EmitPlanExecFail(finalErr.Error())
}
r.EmitJSON(schema.EVENT_TYPE_END_PLAN_AND_EXECUTION, r.config.Id, params)
}()
if ctx == nil {
ctx = r.config.Ctx
}
planCtx, cancel := context.WithCancel(ctx)
defer cancel()
// ═══════════════════════════════════════════════════════════
// Event Mirroring: Bridge parent's event stream to child
// ═══════════════════════════════════════════════════════════
inputChannel := chanx.NewUnlimitedChan[*ypb.AIInputEvent](r.config.Ctx, 10)
r.config.InputEventManager.RegisterMirrorOfAIInputEvent(uid, func(event *ypb.AIInputEvent) {
go func() {
switch event.SyncType {
case SYNC_TYPE_QUEUE_INFO:
// Queue info handled separately
default:
log.Infof("InvokePlanAndExecute: Received AI input event: %v", event)
}
inputChannel.SafeFeed(event)
}()
})
defer func() {
r.config.InputEventManager.UnregisterMirrorOfAIInputEvent(uid)
}()
// ═══════════════════════════════════════════════════════════
// Create child Coordinator with inherited context
// ═══════════════════════════════════════════════════════════
baseOpts := aicommon.ConvertConfigToOptions(r.config)
baseOpts = append(baseOpts,
aicommon.WithID(uid),
aicommon.WithTimeline(r.config.Timeline), // Share Timeline for context continuity!
aicommon.WithAICallback(r.config.OriginalAICallback),
aicommon.WithEventInputChanx(inputChannel),
aicommon.WithContext(planCtx),
aicommon.WithPersistentSessionId(r.config.PersistentSessionId),
)
cod, err := aid.NewCoordinatorContext(planCtx, planPayload, baseOpts...)
if err != nil {
return utils.Errorf("failed to create coordinator: %v", err)
}
return cod.Run()
}
6.3 Context Nesting and Inheritance
The child Coordinator inherits the parent's Timeline, enabling seamless context propagation across recursion levels:
┌─────────────────────────────────────────────────────────────────────────────┐
│ Parent ReAct Loop (SessionID: A) │
│ ├── Timeline: Shared │
│ ├── Context: ctx-parent │
│ └── Triggers: RequestPlanExecution │
│ ↓ │
├─────────────────────────────────────────────────────────────────────────────┤
│ Child Coordinator (SessionID: B, inherits from A) │
│ ├── Timeline: Inherited from parent (writes visible to parent) │
│ ├── Context: ctx-child (derived from ctx-parent) │
│ ├── EventMirror: Receives events from parent's InputEventManager │
│ └── Creates: New task tree with own hierarchical indices │
│ ↓ │
├─────────────────────────────────────────────────────────────────────────────┤
│ Child ReAct Loops (For each leaf task) │
│ ├── Timeline: Same shared instance │
│ └── Can recursively trigger more RequestPlanExecution actions │
└─────────────────────────────────────────────────────────────────────────────┘
This architecture ensures:
| Property | Mechanism | Benefit |
|---|---|---|
| Context Continuity | Shared Timeline | Child actions are visible to parent |
| Event Propagation | Mirror callbacks | User interrupts reach all levels |
| Resource Isolation | Derived contexts | Cancellation propagates correctly |
| Index Independence | Separate task trees | Indices don't conflict |
7. Task Scheduling and Execution
7.1 The Runtime Scheduler
The runtime struct manages task scheduling using Depth-First Search traversal:
// From runtime.go - Runtime scheduler structure
type runtime struct {
RootTask *AiTask // Root of task tree
config *Coordinator // Reference to coordinator
currentIndex int // Current position in traversal
TaskLink *linktable.LinkedList[*AiTask] // Linearized task sequence
statusMutex sync.Mutex // Synchronization for status updates
}
func (c *Coordinator) createRuntime() *runtime {
r := &runtime{
config: c,
TaskLink: linktable.New[*AiTask](),
}
return r
}
7.2 DFS Traversal Algorithm
Tasks are processed in Depth-First order, ensuring parent context is established before children:
// From task_dfs.go - DFS traversal implementation
func DFSOrderAiTask(root *AiTask) *linktable.LinkedList[*AiTask] {
result := linktable.New[*AiTask]()
treeStack := []*AiTask{root}
for len(treeStack) > 0 {
// Pop a node from the stack
lastIndex := len(treeStack) - 1
currentTask := treeStack[lastIndex]
treeStack = treeStack[:lastIndex]
result.PushBack(currentTask)
// Push children in reverse order (rightmost first)
// This ensures leftmost children are processed first
children := currentTask.Subtasks
for i := len(children) - 1; i >= 0; i-- {
treeStack = append(treeStack, children[i])
}
}
return result
}
// Post-order variant for cleanup operations
func DFSOrderAiTaskPostOrder(root *AiTask) *linktable.LinkedList[*AiTask] {
result := linktable.New[*AiTask]()
if root == nil {
return result
}
treeStack := make([]*AiTask, 0)
var lastVisited *AiTask
treeStack = append(treeStack, root)
for len(treeStack) > 0 {
peekNode := treeStack[len(treeStack)-1]
// Visit conditions:
// 1. Leaf node (no children)
// 2. All children have been visited
isLeaf := len(peekNode.Subtasks) == 0
allChildrenVisited := !isLeaf && lastVisited == peekNode.Subtasks[len(peekNode.Subtasks)-1]
if isLeaf || allChildrenVisited {
result.PushBack(peekNode)
treeStack = treeStack[:len(treeStack)-1]
lastVisited = peekNode
} else {
// Push children right-to-left for left-to-right processing
for i := len(peekNode.Subtasks) - 1; i >= 0; i-- {
treeStack = append(treeStack, peekNode.Subtasks[i])
}
}
}
return result
}
7.3 Task Execution with State Management
The runtime invokes tasks with comprehensive state management:
// From runtime.go - Task invocation with state handling
func (r *runtime) Invoke(task *AiTask) error {
if r.RootTask == nil {
r.RootTask = task
}
r.updateTaskLink() // Build DFS traversal order
r.currentIndex = 0
invokeTask := func(current *AiTask) error {
// Check for user-initiated skip (Skipped ≠ Aborted)
if current.GetStatus() == aicommon.AITaskState_Skipped {
r.config.EmitInfo("subtask %s was skipped by user, moving to next task", current.Name)
return nil
}
// Check global context cancellation
if r.config.IsCtxDone() {
return utils.Errorf("coordinator context is done")
}
// Check task-local context (may be reset by skip/redo)
if current.IsCtxDone() {
if current.GetStatus() == aicommon.AITaskState_Skipped {
r.config.EmitInfo("subtask %s context cancelled (skipped), moving to next task", current.Name)
return nil
}
return utils.Errorf("task context is done")
}
r.config.EmitInfo("invoke subtask: %v", current.Name)
// Only leaf nodes (no subtasks) are executed
if len(current.Subtasks) == 0 {
current.SetStatus(aicommon.AITaskState_Processing)
}
r.config.EmitPushTask(current)
defer func() {
r.config.EmitUpdateTaskStatus(current)
r.config.EmitPopTask(current)
}()
// Execute only leaf nodes
if len(current.Subtasks) == 0 {
return current.executeTaskPushTaskIndex()
}
return nil // Non-leaf nodes are organizational only
}
// Main execution loop
for {
currentTask, ok := r.NextStep()
if !ok {
return nil // All tasks completed
}
if err := invokeTask(currentTask); err != nil {
// Handle user-initiated skip vs. actual failure
isSkipped := currentTask.GetStatus() == aicommon.AITaskState_Skipped
isContextCanceled := strings.Contains(err.Error(), "context canceled") ||
strings.Contains(err.Error(), "context done")
if isSkipped || (isContextCanceled && currentTask.GetStatus() == aicommon.AITaskState_Skipped) {
r.config.EmitInfo("task %s was skipped by user, continuing to next task", currentTask.Name)
continue
}
// Check global cancellation
if r.config.IsCtxDone() {
r.config.EmitInfo("coordinator context cancelled, stopping execution")
return err
}
r.config.EmitPlanExecFail("invoke task[%s] failed: %v", currentTask.Name, err)
r.config.EmitError("invoke subtask failed: %v", err)
log.Errorf("invoke subtask failed: %v", err)
return err
}
}
}
func (r *runtime) updateTaskLink() {
if r.RootTask == nil {
return
}
r.TaskLink = DFSOrderAiTask(r.RootTask)
}
7.4 Leaf Node Execution with Context Propagation
Task execution includes sophisticated context propagation:
// From task_execute.go - Task execution with parent context
func (t *AiTask) GetUserInput() string {
if utils.IsNil(t.ParentTask) {
return t.AIStatefulTaskBase.GetUserInput()
}
var buf bytes.Buffer
// Collect parent task chain inputs (recursive up to 20 levels)
var collectParentInputs func(task *AiTask, depth int) []string
collectParentInputs = func(task *AiTask, depth int) []string {
if task == nil || task.ParentTask == nil || depth >= 20 {
return nil
}
var inputs []string
// First collect higher-level parent inputs
if task.ParentTask.ParentTask != nil {
inputs = collectParentInputs(task.ParentTask, depth+1)
}
// Then add current parent's input
if task.ParentTask.AIStatefulTaskBase != nil {
input := task.ParentTask.AIStatefulTaskBase.GetUserInput()
if input != "" {
inputs = append(inputs, input)
}
}
return inputs
}
var parentInputs []string
if !utils.IsNil(t.ParentTask) {
parentInputs = collectParentInputs(t, 0)
}
var currentInput string
if t.AIStatefulTaskBase != nil {
currentInput = t.AIStatefulTaskBase.GetUserInput()
}
// Format output with clear delineation
if len(parentInputs) > 0 {
buf.WriteString("<|PARENT_TASK|>\n")
for i, input := range parentInputs {
if i > 0 {
buf.WriteString("\n")
}
buf.WriteString(input)
}
buf.WriteString("\n<|PARENT_TASK_END|>\n\n")
}
buf.WriteString("<|CURRENT_TASK|>\n")
if currentInput != "" {
buf.WriteString(currentInput)
}
buf.WriteString("\n<|CURRENT_TASK_END|>\n\n")
// Add execution guidelines
buf.WriteString("<|INSTRUCTION|>\n")
buf.WriteString("## 任务执行原则\n\n")
buf.WriteString("**核心要求**:请专注于完成 `<|CURRENT_TASK|>` 中定义的任务目标。\n\n")
buf.WriteString("**父任务的作用**:`<|PARENT_TASK|>` 中的信息仅作为上下文参考...\n")
return buf.String()
}
8. Progress Tracking and Visualization
8.1 Hierarchical Progress Display
The system generates human-readable progress indicators for both AI context and user visualization:
// From runtime.go - Progress rendering
func (t *AiTask) dumpProgressEx(i int, w io.Writer, details bool) {
prefix := strings.Repeat(" ", i)
executing := false
finished := false
// Determine aggregate state for non-leaf nodes
if len(t.Subtasks) > 0 {
allFinished := true
haveExecutedTask := false
for _, subtask := range t.Subtasks {
if !subtask.executed() {
allFinished = false
} else if !haveExecutedTask && subtask.executed() {
haveExecutedTask = true
}
}
if haveExecutedTask && !allFinished {
executing = true
} else if allFinished {
finished = true
}
} else {
finished = t.executed()
}
var fill = " "
var note string
if finished {
fill = "x"
if t.TaskSummary != "" {
note = fmt.Sprintf(" (Finished:%s)", t.TaskSummary)
}
} else if executing {
fill = "~"
note = " (部分完成)"
}
if t.executing() {
fill = "-"
note = " (执行中)"
if ret := t.SingleLineStatusSummary(); ret != "" {
note += fmt.Sprintf(" (status:%s)", ret)
}
}
taskNameShow := strconv.Quote(t.Name)
if details {
if t.Goal != "" {
taskNameShow = taskNameShow + "(目标:" + strconv.Quote(t.Goal) + ")"
}
if t.Index != "" {
taskNameShow = t.Index + ". " + taskNameShow
}
}
if strings.TrimSpace(note) == "" {
note = "(未开始)"
}
fmt.Fprintf(w, "%s -[%v] %s %v\n", prefix, fill, taskNameShow, note)
// Recursively render children
if len(t.Subtasks) > 0 {
for _, subtask := range t.Subtasks {
subtask.dumpProgressEx(i+1, w, details)
}
}
}
Sample output visualization:
-[x] 1. "Analyze Security Requirements" (Finished: Identified 5 critical areas)
-[x] 1-1. "Review Authentication" (Finished: OAuth2 implementation verified)
-[-] 1-2. "Audit Authorization" (执行中) (status: Checking role permissions)
-[ ] 1-3. "Check Encryption" (未开始)
-[~] 2. "Implement Fixes" (部分完成)
-[x] 2-1. "Update Auth Module" (Finished: Added MFA support)
-[ ] 2-2. "Apply Security Patches" (未开始)
8.2 Context Injection for Progress Awareness
Each ReAct iteration receives current progress context through reactive data building:
// From task_execute.go - Progress context injection
reactloops.WithReactiveDataBuilder(func(loop *reactloops.ReActLoop,
feedback *bytes.Buffer, nonce string) (string, error) {
reactiveData := utils.MustRenderTemplate(`
当前 Plan-Execution 模式进度信息:
<|PROGRESS_TASK_{{.Nonce}}|>
{{ .Progress }}
--- CURRENT_TASK ---
{{ .CurrentProgress }}
--- CURRENT_TASK_END ---
<|PROGRESS_TASK_END_{{ .Nonce }}|>
- 进度信息语义约定:
1) 任务树状态约定
- 标记含义:
- [-] 表示该节点任务"执行中"
- [ ] 表示该节点任务"未开始"
- [x] 表示该节点任务"已完成"
- 层级缩进表示父子任务关系
2) 当前任务边界
- "当前任务(CURRENT_TASK)"指明你唯一允许推进的任务节点
3) 行为准则(必须遵守)
- 不要假设或回填未在进度信息中出现的状态
- 不要"预完成"尚未执行的步骤
4) 只读规则(重要)
- 进度信息对 AI 是只读的
- 框架会根据实际执行进度自动更新任务清单与状态
`, map[string]interface{}{
"Progress": t.rootTask.Progress(),
"CurrentProgress": t.Progress(),
"Nonce": nonce,
})
return reactiveData, nil
})
9. AI Blueprints (Forges): Pre-compiled Plan Templates
9.1 Concept
AI Blueprints (internally called "Forges") are pre-defined plan templates that can be invoked as specialized recursive Plan Executions. They enable domain-specific automation with pre-configured parameters:
// From loopinfra/action_require_ai_blueprint_forge.go
var loopAction_RequireAIBlueprintForge = &reactloops.LoopAction{
AsyncMode: true,
ActionType: schema.AI_REACT_LOOP_ACTION_REQUIRE_AI_BLUEPRINT,
Description: `Require an AI Blueprint to accomplish complex tasks
that need specialized AI capabilities.`,
Options: []aitool.ToolOption{
aitool.WithStringParam(
"blueprint_payload",
aitool.WithParam_Description("Provide the name of the AI Blueprint to use."),
),
},
ActionHandler: func(loop *reactloops.ReActLoop, action *aicommon.Action,
operator *reactloops.LoopActionHandlerOperator) {
forgeName := action.GetString("blueprint_payload")
invoker := loop.GetInvoker()
task := operator.GetTask()
invoker.RequireAIForgeAndAsyncExecute(task.GetContext(), forgeName, func(err error) {
loop.FinishAsyncTask(task, err)
})
},
}
9.2 Blueprint Invocation Flow
// From aireact/invoke_plan_and_execute.go - Blueprint invocation
func (r *ReAct) RequireAIForgeAndAsyncExecute(
ctx context.Context, forgeName string, onFinished func(error)) {
// Validate blueprint name
if forgeName == "" {
errMsg := "AI Blueprint name is empty, cannot execute"
r.AddToTimeline("[BLUEPRINT_EMPTY_NAME]", errMsg)
r.Emitter.EmitError(errMsg)
onFinished(utils.Error(errMsg))
return
}
// Record invocation attempt
r.AddToTimeline("[BLUEPRINT_INVOKE_START]",
fmt.Sprintf("Invoking AI Blueprint: %s", forgeName))
// Look up and validate blueprint
ins, forgeParams, err := r.invokeBlueprint(forgeName)
if err != nil {
r.AddToTimeline("[BLUEPRINT_INVOKE_FAILED]",
fmt.Sprintf("Failed to invoke '%s': %v", forgeName, err))
r.Emitter.EmitError(fmt.Sprintf("Failed to invoke AI Blueprint '%s'", forgeName))
onFinished(fmt.Errorf("failed to invoke ai-blueprint[%v]: %w", forgeName, err))
return
}
// Invoke the plan with blueprint parameters
r.invokePlanAndExecute(taskDone, ctx, "", forgeName, forgeParams)
}
9.3 Blueprint Lookup and Parameter Extraction
// From aireact/invoke_blueprint.go - Blueprint resolution
func (r *ReAct) invokeBlueprint(forgeName string) (*schema.AIForge, aitool.InvokeParams, error) {
manager := r.config.AiForgeManager
// Look up blueprint in registry
ins, err := manager.GetAIForge(forgeName)
if err != nil {
r.AddToTimeline("[BLUEPRINT_NOT_FOUND]",
fmt.Sprintf("AI Blueprint '%s' does not exist", forgeName))
return nil, nil, utils.Errorf("AI Blueprint '%s' not found: %v", forgeName, err)
}
if ins == nil {
r.AddToTimeline("[BLUEPRINT_NULL_INSTANCE]",
fmt.Sprintf("AI Blueprint '%s' returned nil instance", forgeName))
return nil, nil, utils.Errorf("AI Blueprint '%s' instance is nil", forgeName)
}
// Record successful lookup
r.AddToTimeline("[BLUEPRINT_FOUND]",
fmt.Sprintf("AI Blueprint: %s (%s)", ins.ForgeName, ins.ForgeVerboseName))
// Generate parameter schema
forgeSchema, err := manager.GenerateAIJSONSchemaFromSchemaAIForge(ins)
if err != nil {
return nil, nil, utils.Errorf("generate schema failed: %v", err)
}
// Use AI to extract parameters from current context
prompt, err := r.promptManager.GenerateAIBlueprintForgeParamsPrompt(ins, forgeSchema)
if err != nil {
return nil, nil, utils.Errorf("generate prompt failed: %v", err)
}
var forgeParams = make(aitool.InvokeParams)
err = aicommon.CallAITransaction(
r.config, prompt, r.config.CallAI,
func(rsp *aicommon.AIResponse) error {
action, err := aicommon.ExtractActionFromStream(
r.config.GetContext(),
rsp.GetOutputStreamReader("call-forge", false, r.config.GetEmitter()),
"call-ai-blueprint",
)
if err != nil {
return utils.Errorf("extract action failed: %v", err)
}
forgeParams = action.GetInvokeParams("params")
if len(forgeParams) <= 0 {
return utils.Error("forge params is empty")
}
return nil
},
)
if err != nil {
return nil, nil, err
}
return ins, forgeParams, nil
}
9.4 Use Cases
| Blueprint Type | Domain | Example Workflow |
|---|---|---|
| Java Audit Forge | Security | Code review → Vulnerability scan → Report generation |
| Penetration Testing Forge | Security | Reconnaissance → Scanning → Exploitation → Reporting |
| Documentation Forge | DevOps | Code analysis → API extraction → Doc generation |
| Migration Forge | DevOps | Dependency analysis → Code transformation → Testing |
10. Problems Solved by the Dual-Engine Architecture
10.1 The "Lost in the Weeds" Problem
Symptom: Pure ReAct agents often forget their high-level goal after 10+ steps of execution, becoming myopically focused on immediate sub-problems.
Root Cause: Lack of hierarchical goal representation; all goals flattened into immediate prompts.
Solution: The Plan Engine maintains the Global Task State. Even if a sub-task takes 50+ iterations, the parent Coordinator knows exactly where it fits in the overall roadmap. Progress is continuously fed back to each ReAct iteration through the reactive data builder.
// Each iteration receives full progress context
reactiveData := fmt.Sprintf(`
<|PROGRESS_TASK|>
%s
--- CURRENT_TASK ---
%s
<|PROGRESS_TASK_END|>
`, t.rootTask.Progress(), t.Progress())
10.2 The "Rigidity" Problem
Symptom: Pure Plan-Execute agents fail when the initial plan is flawed (e.g., "Scan server A" fails because server A is down).
Root Cause: No runtime adaptation capability; plans assumed to be perfect at generation time.
Solution: The ReAct Engine handles Runtime Adaptation. It can:
- Retry with different parameters
- Skip unavailable resources
- Trigger re-planning via
RequestPlanExecution - Request user guidance through interaction
10.3 The "Complexity Horizon" Problem
Symptom: No single prompt can handle a task requiring 100+ tools within LLM context limits.
Root Cause: LLM context window constraints; cognitive load limits on reasoning depth.
Solution: Fractal Decomposition. Complex tasks are broken down recursively until each leaf task fits within the context window and reasoning capability of the LLM.
Original Task (100 tools needed)
├── Subtask A (30 tools) → Still too complex
│ ├── Subtask A.1 (10 tools) ✓ Executable
│ ├── Subtask A.2 (10 tools) ✓ Executable
│ └── Subtask A.3 (10 tools) ✓ Executable
├── Subtask B (40 tools) → Still too complex
│ └── ... (recursively decomposed)
└── Subtask C (30 tools) → Still too complex
└── ... (recursively decomposed)
10.4 Comparative Analysis
| Problem | Traditional Agent | Memfit Dual-Engine |
|---|---|---|
| Lost in Weeds | Fails after ~10 steps | Maintains global context indefinitely |
| Rigid Planning | Single plan, no adaptation | Dynamic re-planning at any level |
| Complexity Horizon | ~20 tool limit | Unlimited through recursion |
| Error Recovery | Per-action retry only | Multi-level recovery strategies |
| Human Oversight | Difficult to intervene | Skip/Redo at any task level |
11. Performance Characteristics
11.1 Computational Complexity
| Operation | Time Complexity | Space Complexity |
|---|---|---|
| Task Tree Construction | O(n) | O(n) |
| DFS Traversal | O(n) | O(d) where d = max depth |
| Index Lookup | O(n) | O(1) |
| Event Mirroring | O(m) where m = registered mirrors | O(m) |
11.2 Memory Management
The system implements intelligent memory management through Timeline compression:
- Per-item Limits: Individual timeline items have content size limits
- Batch Compression: Multiple items can be compressed together when space is constrained
- Selective Retention: Important items (errors, user interactions) are preserved preferentially
11.3 Concurrency Model
Main Thread Async Operations
│ │
├── Coordinator.Run() │
│ │ │
│ ├── planRequest.Invoke() ───────► AI Call (goroutine)
│ │ │
│ ├── User Review (blocking) │
│ │ │
│ └── runtime.Invoke() │
│ │ │
│ ├── Task 1 ───────────► ReAct Loop (goroutine)
│ │ │ │
│ │ │ └── AsyncPlanAndExecute
│ │ │ │
│ │ │ └── Child Coordinator
│ │ │
│ └── Task 2 (after Task 1 completes)
│ │
└──────────────────────────────
12. Conclusion
The Recursive Dual-Engine Architecture represents a principled approach to building autonomous AI agents that can handle tasks of arbitrary complexity. By combining strategic planning with tactical execution and enabling recursive invocation between the two layers, Memfit AI achieves:
- Scalability: Tasks of any complexity can be decomposed until manageable through fractal recursion
- Robustness: Failures at any level trigger appropriate recovery mechanisms through self-reflection
- Transparency: Hierarchical progress tracking enables human oversight at every level
- Flexibility: Dynamic re-planning adapts to runtime conditions through action-as-planning
- Context Preservation: Shared Timeline ensures no information is lost across recursion levels
The architecture's grounding in both cognitive science (dual-process theory) and classical AI (hierarchical task networks) provides a solid theoretical foundation, while the code-backed implementation ensures practical deployability in real-world scenarios.
References
Academic Literature
- Yao, S. et al. (2022). ReAct: Synergizing Reasoning and Acting in Language Models. ICLR 2023.
- Wang, L. et al. (2023). Plan-and-Solve Prompting: Improving Zero-Shot Chain-of-Thought Reasoning. ACL 2023.
- Erol, K. et al. (1994). HTN Planning: Complexity and Expressivity. AAAI 1994.
- Kahneman, D. (2011). Thinking, Fast and Slow. Farrar, Straus and Giroux.
- Boyd, J. (1987). A Discourse on Winning and Losing. (OODA Loop conceptualization)
Codebase References
common/ai/aid/task.go— AiTask structure and hierarchical indexingcommon/ai/aid/task_dfs.go— DFS traversal algorithmscommon/ai/aid/coordinator.go— Coordinator orchestrationcommon/ai/aid/runtime.go— Task scheduling and executioncommon/ai/aid/plan.go— Plan generation and task extractioncommon/ai/aid/task_execute.go— Task execution with context propagationcommon/ai/aid/aireact/reactloops/reactloop.go— ReAct Loop implementationcommon/ai/aid/aireact/reactloops/exec.go— OODA execution loopcommon/ai/aid/aireact/invoke_plan_and_execute.go— Recursive invocationcommon/ai/aid/aireact/invoke_blueprint.go— AI Blueprint handlingcommon/ai/aid/aireact/reactloops/loopinfra/action_request_plan_and_execution.go— Planning as actioncommon/ai/aid/aireact/reactloops/loopinfra/action_require_ai_blueprint_forge.go— Blueprint actioncommon/ai/aid/aicommon/taskif.go— Task state machine