
feat: Wave-barrier execution model with convergence loops #379

@jafreck


Summary

Add a WaveBarrierExecutor to the engine that executes dependency-ordered waves of work items with inter-wave validation barriers and per-wave convergence loops.

Motivation

The framework provides ParallelExecutor (concurrent within a batch) and SerialExecutor (one at a time), but neither supports the common pattern of dependency-ordered waves with validation barriers:

  1. Topologically sort tasks into dependency waves (wave 0 = no dependencies, wave 1 = depends only on wave 0, and in general each task sits in the earliest wave that comes after all of its dependencies)
  2. Execute all tasks in a wave concurrently
  3. After each wave completes, run a validation function (e.g., build the project, run tests) against the accumulated output
  4. If validation fails, enter a convergence loop: re-execute the failed tasks with enriched error context, then re-validate, up to maxConvergenceIterations times
  5. Advance to the next wave only once the barrier passes or the convergence limit is reached (at which point the run either continues in best-effort mode or aborts)
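Step 1 can be sketched with Kahn's algorithm run level by level: each round emits every task whose remaining dependency count has dropped to zero, which yields exactly the wave numbering above. This is a minimal sketch, not the proposed implementation; the generic `layerByDependencies` function and its `getId`/`getDeps` accessors are hypothetical names chosen for illustration, and it assumes task IDs are unique.

```typescript
/**
 * Hypothetical helper: group items into dependency waves (Kahn's algorithm, level by level).
 * Wave 0 holds items with no dependencies; every other item lands in the earliest
 * wave after all of its dependencies. Assumes IDs are unique.
 * Throws on an unknown dependency or a dependency cycle.
 */
function layerByDependencies<T>(
  items: T[],
  getId: (item: T) => string,
  getDeps: (item: T) => string[],
): T[][] {
  const byId = new Map<string, T>();
  for (const item of items) byId.set(getId(item), item);

  // Unmet-dependency count per item, plus reverse edges (dependency -> dependents).
  const inDegree = new Map<string, number>();
  const dependents = new Map<string, string[]>();
  for (const item of items) {
    const id = getId(item);
    const deps = getDeps(item);
    inDegree.set(id, deps.length);
    for (const dep of deps) {
      if (!byId.has(dep)) throw new Error(`${id} depends on unknown item ${dep}`);
      const list = dependents.get(dep) ?? [];
      list.push(id);
      dependents.set(dep, list);
    }
  }

  const waves: T[][] = [];
  let current = items.filter((item) => inDegree.get(getId(item)) === 0).map((item) => getId(item));
  let placed = 0;
  while (current.length > 0) {
    waves.push(current.map((id) => byId.get(id)!));
    placed += current.length;
    // Releasing this wave may drop some dependents' counts to zero: they form the next wave.
    const next: string[] = [];
    for (const id of current) {
      for (const child of dependents.get(id) ?? []) {
        const remaining = inDegree.get(child)! - 1;
        inDegree.set(child, remaining);
        if (remaining === 0) next.push(child);
      }
    }
    current = next;
  }
  // Items stuck with a non-zero count are part of (or depend on) a cycle.
  if (placed !== items.length) throw new Error("dependency cycle detected");
  return waves;
}
```

For tasks `a`, `b` (depends on `a`), `c` (depends on `a`) and `d` (depends on `b` and `c`), this returns `[[a], [b, c], [d]]`. Throwing on a cycle, rather than silently dropping the tasks involved, keeps a malformed dependency graph from producing a partial run.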

This pattern is broadly applicable to any multi-agent pipeline where:

  • Later tasks depend on earlier tasks' output being correct
  • Correctness can only be verified by running commands against the aggregate state
  • Individual task failures are recoverable by re-running with error feedback

Examples beyond code migration: infrastructure-as-code rollouts (deploy wave → smoke test → next wave), document generation pipelines (draft → review gate → publish), multi-service integration testing.

Proposed API

WaveBarrierExecutor

interface WaveBarrierConfig<TTask, TValidationResult> {
  /** Maximum concurrent tasks within a single wave. */
  concurrency: number;

  /** Maximum convergence iterations per wave before moving on or aborting. */
  maxConvergenceIterations: number;

  /** Execute a single task. Returns success/failure. */
  executeTask: (task: TTask, context: WaveTaskContext) => Promise<WaveTaskResult>;

  /** Validate the accumulated state after a wave completes. Called at the barrier. */
  validate: (wave: number, completedTasks: TTask[]) => Promise<TValidationResult>;

  /** Determine if validation passed. */
  isValid: (result: TValidationResult) => boolean;

  /** Extract error context from a failed validation to feed back into task re-execution. */
  extractErrorContext?: (result: TValidationResult) => Record<string, unknown>;

  /** Called when a wave's convergence loop completes (converged or limit reached). */
  onWaveComplete?: (wave: number, converged: boolean, iterations: number) => void;

  /** Called when convergence limit is reached without passing validation. */
  onConvergenceExhausted?: (wave: number, lastResult: TValidationResult) => 'continue' | 'abort';
}

interface WaveTaskContext {
  wave: number;
  convergenceIteration: number;
  /** Error context from the previous failed validation (if in a convergence loop). */
  previousValidationError?: Record<string, unknown>;
}

interface WaveTaskResult {
  success: boolean;
  error?: string;
}

interface WaveBarrierResult<TTask> {
  completedWaves: number;
  totalWaves: number;
  completedTasks: TTask[];
  failedTasks: Array<{ task: TTask; error: string }>;
  /** Per-wave convergence stats. */
  waveStats: Array<{
    wave: number;
    converged: boolean;
    iterations: number;
    taskCount: number;
  }>;
  aborted: boolean;
  abortReason?: string;
}

declare class WaveBarrierExecutor<TTask, TValidationResult> {
  constructor(config: WaveBarrierConfig<TTask, TValidationResult>, logger: LoggerLike);

  /**
   * Execute all tasks in dependency-wave order with validation barriers.
   * @param waves - Pre-computed waves (arrays of tasks, ordered by dependency depth).
   */
  execute(waves: TTask[][]): Promise<WaveBarrierResult<TTask>>;
}
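To make the proposed control flow concrete, here is a minimal sketch of `execute` against the interfaces above. It is an illustration under several assumptions, not a definitive implementation: the `logger` constructor parameter and the event/checkpoint hooks are omitted, a small `mapWithLimit` helper stands in for delegating to ParallelExecutor, `iterations` counts convergence retries (the initial run is not counted), the retry set is the tasks whose `executeTask` reported failure (or the whole wave if every task reported success but validation still failed), and a missing `onConvergenceExhausted` callback is treated as `'abort'`. None of these choices are specified by the proposal.

```typescript
// Local copies of the proposed types so this sketch compiles on its own.
interface WaveTaskContext {
  wave: number;
  convergenceIteration: number;
  previousValidationError?: Record<string, unknown>;
}
interface WaveTaskResult { success: boolean; error?: string }
interface WaveBarrierConfig<TTask, TValidationResult> {
  concurrency: number;
  maxConvergenceIterations: number;
  executeTask: (task: TTask, context: WaveTaskContext) => Promise<WaveTaskResult>;
  validate: (wave: number, completedTasks: TTask[]) => Promise<TValidationResult>;
  isValid: (result: TValidationResult) => boolean;
  extractErrorContext?: (result: TValidationResult) => Record<string, unknown>;
  onWaveComplete?: (wave: number, converged: boolean, iterations: number) => void;
  onConvergenceExhausted?: (wave: number, lastResult: TValidationResult) => "continue" | "abort";
}
interface WaveBarrierResult<TTask> {
  completedWaves: number;
  totalWaves: number;
  completedTasks: TTask[];
  failedTasks: Array<{ task: TTask; error: string }>;
  waveStats: Array<{ wave: number; converged: boolean; iterations: number; taskCount: number }>;
  aborted: boolean;
  abortReason?: string;
}

// Hypothetical helper: run fn over items with at most `limit` calls in flight (stand-in for ParallelExecutor).
async function mapWithLimit<T, R>(items: T[], limit: number, fn: (item: T) => Promise<R>): Promise<R[]> {
  const results = new Array<R>(items.length);
  let next = 0;
  const worker = async (): Promise<void> => {
    while (next < items.length) {
      const i = next++;
      results[i] = await fn(items[i]);
    }
  };
  const workers = Math.max(1, Math.min(limit, items.length));
  await Promise.all(Array.from({ length: workers }, () => worker()));
  return results;
}

class WaveBarrierExecutor<TTask, TValidationResult> {
  private readonly config: WaveBarrierConfig<TTask, TValidationResult>;
  constructor(config: WaveBarrierConfig<TTask, TValidationResult>) {
    this.config = config;
  }

  async execute(waves: TTask[][]): Promise<WaveBarrierResult<TTask>> {
    const cfg = this.config;
    const out: WaveBarrierResult<TTask> = {
      completedWaves: 0, totalWaves: waves.length, completedTasks: [],
      failedTasks: [], waveStats: [], aborted: false,
    };
    for (let wave = 0; wave < waves.length; wave++) {
      const waveTasks = waves[wave];
      const errors = new Map<TTask, string>(); // tasks whose latest run failed
      let pending = waveTasks;
      let errorContext: Record<string, unknown> | undefined;
      let iterations = 0; // convergence retries used; the initial run is not counted
      let converged = false;
      let lastResult!: TValidationResult;

      while (true) {
        const ctx: WaveTaskContext = { wave, convergenceIteration: iterations, previousValidationError: errorContext };
        const results = await mapWithLimit(pending, cfg.concurrency, (t) => cfg.executeTask(t, ctx));
        pending.forEach((t, i) => {
          if (results[i].success) errors.delete(t);
          else errors.set(t, results[i].error ?? "task failed");
        });
        // Barrier: validate everything accepted so far, including this wave's successes.
        const waveDone = waveTasks.filter((t) => !errors.has(t));
        lastResult = await cfg.validate(wave, [...out.completedTasks, ...waveDone]);
        if (cfg.isValid(lastResult)) { converged = true; break; }
        if (iterations >= cfg.maxConvergenceIterations) break;
        iterations++;
        errorContext = cfg.extractErrorContext?.(lastResult);
        // Assumption: retry the tasks that reported failure; if none did, retry the whole wave.
        pending = errors.size > 0 ? waveTasks.filter((t) => errors.has(t)) : waveTasks;
      }

      out.completedTasks.push(...waveTasks.filter((t) => !errors.has(t)));
      errors.forEach((error, task) => out.failedTasks.push({ task, error }));
      out.waveStats.push({ wave, converged, iterations, taskCount: waveTasks.length });
      cfg.onWaveComplete?.(wave, converged, iterations);
      // Assumption: with no onConvergenceExhausted callback, an unconverged wave aborts the run.
      if (!converged && (cfg.onConvergenceExhausted?.(wave, lastResult) ?? "abort") === "abort") {
        out.aborted = true;
        out.abortReason = `wave ${wave} did not converge after ${iterations} convergence iterations`;
        return out;
      }
      out.completedWaves = wave + 1;
    }
    return out;
  }
}
```

Because validation sees the accumulated `completedTasks` from all earlier waves plus the current wave's successes, a later wave that breaks an earlier wave's output is still caught at the later wave's barrier.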

Integration with SessionQueue

Add a helper to compute waves from the existing SessionQueue:

class SessionQueue {
  // ... existing methods ...

  /** Compute topological waves: wave[0] = roots, wave[1] = depends only on wave[0], etc. */
  computeWaves(): AgentSession[][];
}

Events

interface WaveStartedEvent {
  type: 'wave-started';
  wave: number;
  taskCount: number;
}

interface WaveBarrierEnteredEvent {
  type: 'wave-barrier-entered';
  wave: number;
}

interface WaveBarrierReleasedEvent {
  type: 'wave-barrier-released';
  wave: number;
  converged: boolean;
  iterations: number;
  duration: number;
}

interface WaveConvergenceStatusEvent {
  type: 'wave-convergence-status';
  wave: number;
  iteration: number;
  converged: boolean;
  remainingFailures: number;
}
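If these four interfaces are combined into a discriminated union on `type`, consumers get compile-time narrowing and exhaustiveness checks. The `WaveEvent` union name and the `describeWaveEvent` formatter below are hypothetical, and the `ms` suffix assumes `duration` is measured in milliseconds, which the proposal does not specify.

```typescript
// Local copies of the proposed event shapes.
interface WaveStartedEvent { type: "wave-started"; wave: number; taskCount: number }
interface WaveBarrierEnteredEvent { type: "wave-barrier-entered"; wave: number }
interface WaveBarrierReleasedEvent {
  type: "wave-barrier-released"; wave: number; converged: boolean; iterations: number; duration: number;
}
interface WaveConvergenceStatusEvent {
  type: "wave-convergence-status"; wave: number; iteration: number; converged: boolean; remainingFailures: number;
}

// Hypothetical union name; the proposal does not define one.
type WaveEvent = WaveStartedEvent | WaveBarrierEnteredEvent | WaveBarrierReleasedEvent | WaveConvergenceStatusEvent;

// Switching on `type` narrows `e`; the `never` check turns an unhandled event type into a compile error.
function describeWaveEvent(e: WaveEvent): string {
  switch (e.type) {
    case "wave-started":
      return `wave ${e.wave}: started ${e.taskCount} task(s)`;
    case "wave-barrier-entered":
      return `wave ${e.wave}: validating`;
    case "wave-convergence-status":
      return `wave ${e.wave}: iteration ${e.iteration}, ${e.remainingFailures} failure(s) remaining`;
    case "wave-barrier-released":
      // Assumption: duration is in milliseconds.
      return `wave ${e.wave}: ${e.converged ? "converged" : "not converged"} after ${e.iterations} iteration(s) in ${e.duration}ms`;
    default: {
      const unreachable: never = e;
      return unreachable;
    }
  }
}
```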

Implementation Notes

  • WaveBarrierExecutor should delegate to ParallelExecutor for intra-wave concurrency — composition, not replacement
  • Checkpoint integration: after each wave completes, emit a checkpoint-safe event so consumers can persist progress
  • The onConvergenceExhausted callback returning 'continue' allows "best-effort" mode where validation failures are logged but execution continues to the next wave
  • Wave computation from SessionQueue should reuse the existing detectBatchCollisions / selectNonOverlappingBatch logic so that tasks in the same wave that touch overlapping files are split into non-overlapping batches
  • The executor is generic over TTask — it doesn't require AgentSession specifically, making it reusable for non-agent workflows
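The signatures of detectBatchCollisions and selectNonOverlappingBatch are not shown here, so the sketch below does not reproduce them; it only illustrates the idea behind file-overlap-aware batching within a wave, assuming a hypothetical `files` field on each task. Because tasks in the same wave have no dependencies on each other, splitting a wave into sub-batches never violates dependency order; the sub-batches could then run one after another before the wave's barrier.

```typescript
// Hypothetical task shape: `files` lists the paths a task is expected to modify.
interface FileScopedTask { id: string; files: string[] }

/**
 * Hypothetical helper: split one wave into sub-batches in which no two tasks touch the same file.
 * First-fit: each task joins the first sub-batch whose file set is disjoint from its own,
 * and a new sub-batch is opened when none fits.
 */
function splitNonOverlapping<T extends FileScopedTask>(wave: T[]): T[][] {
  const batches: Array<{ tasks: T[]; files: Set<string> }> = [];
  for (const task of wave) {
    let target = batches.find((b) => task.files.every((f) => !b.files.has(f)));
    if (!target) {
      target = { tasks: [], files: new Set<string>() };
      batches.push(target);
    }
    target.tasks.push(task);
    for (const f of task.files) target.files.add(f);
  }
  return batches.map((b) => b.tasks);
}
```

For tasks touching `x.ts`, `y.ts`, `x.ts` plus `z.ts`, and `z.ts` respectively, first-fit yields two sub-batches: the first, second, and fourth tasks together, and the third alone. First-fit is simple but does not guarantee the minimum number of sub-batches (that is a graph-coloring problem on the file-conflict graph).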

Metadata

Assignees: No one assigned
Labels: enhancement (New feature or request)
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests