From 541f3cc6d0f835b4eace44a3aa7f35ffd420a79d Mon Sep 17 00:00:00 2001 From: Robbie McKinstry Date: Wed, 24 Jun 2026 22:29:07 -0400 Subject: [PATCH] feat(check): streaming Kameo actor pipeline for `multi check` (MULTI-1368) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Re-architect the `multi check` pipeline from its barrier-based shape into a streaming, fire-and-forget actor pipeline using Kameo (v0.20). Each phase becomes its own actor wired with one-way `tell`: DiscoveryActor --CheckDiscovered--> ExecutionActor --CheckCompleted--> ReportingActor This removes the two synchronization barriers between phases: a check begins executing the moment Discovery has validated it, and a result is folded into Reporting the moment its agent returns — no waiting on an owned `Vec`. - messages.rs: the inter-actor message protocol (CheckJob carried end-to-end, plus DiscoveryComplete/ExecutionComplete end-of-stream sentinels). - DiscoveryActor: runs the whole-suite parse + validation gate (discover()), then streams one CheckDiscovered per check with a run-unique CheckId; any invalid CHECKS.md aborts the whole run via DiscoveryFailed with no agents spawned (strict whole-run abort). - ExecutionActor: offloads each agent run onto a spawned task bounded by a shared Semaphore(cfg.concurrency) acquired inside the task (the semaphore is the cap, not the mailbox); retries reframed as RetryCheck self-messages. - ReportingActor: folds verdicts incrementally, then buffers and sorts by (req_index, declaration order) before the final render — output ordering, AND-aggregation, and exit codes are byte-for-byte unchanged. - Coordinator in run(): spawns the actors, kicks discovery, awaits the terminal result over a oneshot, and maps a dead actor to a diagnostic rather than hanging the run. The CheckExecutor/Sandbox DI seams are reused unchanged. Tests drive the actor pipeline deterministically (FakeExecutor + NoopSandbox), with new coverage for concurrent (non-barriered) execution, the retry self-message path, and the strict abort-without-spawning-agents path. Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 39 ++++- Cargo.toml | 1 + src/checks/discovery/mod.rs | 75 ++++++++- src/checks/e2e.rs | 117 +++++++++++-- src/checks/execution.rs | 321 ++++++++++++++++++++---------------- src/checks/executor/fake.rs | 45 ++++- src/checks/messages.rs | 81 +++++++++ src/checks/mod.rs | 179 ++++++++++++++++++-- src/checks/reporting.rs | 143 +++++++++++++++- 9 files changed, 824 insertions(+), 177 deletions(-) create mode 100644 src/checks/messages.rs diff --git a/Cargo.lock b/Cargo.lock index 4338c75..743f2be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1661,6 +1661,12 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75b325c5dbd37f80359721ad39aca5a29fb04c89279657cffdda8736d0c0b9d2" +[[package]] +name = "downcast-rs" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "117240f60069e65410b3ae1bb213295bd828f707b5bec6596a1afc8793ce0cbc" + [[package]] name = "dunce" version = "1.0.5" @@ -2692,6 +2698,33 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kameo" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1dfd134d7a2c6ec05ee696dcbf3f7a034bdb97ecc623e981014652dcd124d77" +dependencies = [ + "downcast-rs 2.0.2", + "dyn-clone", + "futures", + "kameo_macros", + "serde", + "tokio", + "tracing", +] + +[[package]] +name = "kameo_macros" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16c9002c9ecd16e1636f566c0bf62db48e70d86ed0d0a91b398955e883217a23" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "syn 2.0.109", +] + [[package]] name = "kqueue" version = "1.2.0" @@ -3062,6 +3095,7 @@ dependencies = [ "futures-util", "ignore", "indexmap 2.12.0", + "kameo", "libc", "miette", "mockall", @@ -5252,7 +5286,7 @@ dependencies = [ "census", "crc32fast", "crossbeam-channel", - "downcast-rs", + "downcast-rs 1.2.1", "fastdivide", "fnv", "fs4", @@ -5304,7 +5338,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "12722224ffbe346c7fec3275c699e508fd0d4710e629e933d5736ec524a1f44e" dependencies = [ - "downcast-rs", + "downcast-rs 1.2.1", "fastdivide", "itertools 0.12.1", "serde", @@ -5569,6 +5603,7 @@ dependencies = [ "signal-hook-registry", "socket2 0.6.1", "tokio-macros", + "tracing", "windows-sys 0.61.2", ] diff --git a/Cargo.toml b/Cargo.toml index 33fd2fa..2b2cd8f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ failsafe = "1.3.0" futures-core = "0.3.31" futures-util = "0.3.31" indexmap = { version = "2.1.0", features = ["serde"] } +kameo = "0.20" miette = { workspace = true, features = ["fancy"] } mockall = "0.13.1" multitool-sdk = { git = "https://github.com/wack/multitool-rust-sdk.git", branch = "trunk" } diff --git a/src/checks/discovery/mod.rs b/src/checks/discovery/mod.rs index bdd5dba..fc100a0 100644 --- a/src/checks/discovery/mod.rs +++ b/src/checks/discovery/mod.rs @@ -10,12 +10,18 @@ mod parse; mod walk; -use std::path::Path; +use std::path::{Path, PathBuf}; +use kameo::Actor; +use kameo::actor::ActorRef; +use kameo::message::{Context, Message}; use miette::{IntoDiagnostic, Result}; use multi_core::ManyError; +use crate::checks::execution::ExecutionActor; +use crate::checks::messages::{BeginDiscovery, DiscoveryFailed}; use crate::checks::model::Requirement; +use crate::checks::reporting::ReportingActor; /// Run the full discovery phase rooted at `root`. /// @@ -47,6 +53,73 @@ pub async fn discover(root: &Path) -> Result> { Ok(requirements) } +/// The discovery actor: on a [`BeginDiscovery`] kick it runs the whole-suite +/// parse + validation gate ([`discover`]) and then **streams** the validated +/// checks downstream — assigning each a run-unique [`CheckId`] and `tell`ing one +/// [`CheckDiscovered`] per check, followed by a [`DiscoveryComplete`] sentinel. +/// +/// If the suite is invalid it streams **nothing** and instead tells reporting to +/// abort the whole run (strict whole-run abort, decision #3), so no agents are +/// ever spawned for a suite that will not run. +/// +/// [`CheckId`]: crate::checks::model::CheckId +/// [`CheckDiscovered`]: crate::checks::messages::CheckDiscovered +/// [`DiscoveryComplete`]: crate::checks::messages::DiscoveryComplete +pub(crate) struct DiscoveryActor { + root: PathBuf, + execution: ActorRef, + reporting: ActorRef, +} + +impl Actor for DiscoveryActor { + type Args = Self; + type Error = std::convert::Infallible; + + async fn on_start( + args: Self::Args, + _actor_ref: ActorRef, + ) -> std::result::Result { + Ok(args) + } +} + +impl DiscoveryActor { + pub(crate) fn new( + root: PathBuf, + execution: ActorRef, + reporting: ActorRef, + ) -> Self { + Self { + root, + execution, + reporting, + } + } +} + +impl Message for DiscoveryActor { + type Reply = (); + + async fn handle(&mut self, _msg: BeginDiscovery, _ctx: &mut Context) -> Self::Reply { + match discover(&self.root).await { + Ok(requirements) => { + if let Err(err) = + crate::checks::stream_requirements(&self.execution, &requirements).await + { + tracing::error!(?err, "failed to stream discovered checks to execution"); + } + } + Err(report) => { + // Strict whole-run abort: no `CheckDiscovered` is emitted, so no + // agents run; reporting turns this into the run's `Err`. + if let Err(err) = self.reporting.tell(DiscoveryFailed { report }).await { + tracing::error!(?err, "reporting actor unavailable for discovery failure"); + } + } + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/checks/e2e.rs b/src/checks/e2e.rs index c6120c7..13a55b7 100644 --- a/src/checks/e2e.rs +++ b/src/checks/e2e.rs @@ -1,22 +1,29 @@ -//! End-to-end pipeline test (MULTI-1355). +//! End-to-end pipeline test (MULTI-1355; actor pipeline in MULTI-1368). //! -//! Drives the real discovery → execution → reporting → exit-code pipeline over a -//! fixture tree, with the fake executor and a no-op sandbox injected so verdicts -//! are scripted. Runs deterministically in CI with no `claude`, network, or APFS -//! dependency, guarding the end-to-end contract against regressions. +//! Drives the real discovery → execution → reporting → exit-code actor pipeline +//! over a fixture tree, with the fake executor and a no-op sandbox injected so +//! verdicts are scripted. Runs deterministically in CI with no `claude`, +//! network, or APFS dependency, guarding the end-to-end contract against +//! regressions. use std::fs; use std::sync::Arc; +use std::time::Duration; +use async_trait::async_trait; use clap::Parser; +use miette::Result; use tempfile::TempDir; +use tokio::sync::Barrier; use crate::checks::config::configuration; use crate::checks::discovery::discover; -use crate::checks::execution::execute; -use crate::checks::executor::FakeExecutor; +use crate::checks::executor::{ + AgentOutcome, AgentRunRequest, CheckExecutor, CheckReport, FakeExecutor, +}; use crate::checks::reporting::report; use crate::checks::sandbox::NoopSandbox; +use crate::checks::{run_pipeline, run_to_outcomes}; use crate::{Cli, Terminal}; /// A terminal with color forced off, so reporting emits deterministic text. @@ -84,7 +91,7 @@ async fn pipeline_satisfied_failed_multi_and_anonymous() { let fake = Arc::new(fake); let cfg = configuration(); - let outcomes = execute(&cfg, fake.clone(), Arc::new(NoopSandbox), dir.path(), &reqs) + let outcomes = run_to_outcomes(&cfg, fake.clone(), Arc::new(NoopSandbox), dir.path(), &reqs) .await .unwrap(); @@ -120,7 +127,7 @@ async fn all_satisfied_exits_zero() { let fake = Arc::new(FakeExecutor::new().with_report(0, true, None)); let cfg = configuration(); - let outcomes = execute(&cfg, fake, Arc::new(NoopSandbox), dir.path(), &reqs) + let outcomes = run_to_outcomes(&cfg, fake, Arc::new(NoopSandbox), dir.path(), &reqs) .await .unwrap(); assert!(outcomes[0].satisfied); @@ -136,7 +143,7 @@ async fn empty_tree_exits_zero() { assert!(reqs.is_empty()); let cfg = configuration(); - let outcomes = execute( + let outcomes = run_to_outcomes( &cfg, Arc::new(FakeExecutor::new()), Arc::new(NoopSandbox), @@ -150,3 +157,93 @@ async fn empty_tree_exits_zero() { let code = report(&plain_terminal(), &outcomes).unwrap(); assert_eq!(code, 0); } + +/// A check executor that only reports a verdict once *every* check in the suite +/// is running simultaneously: each run blocks on a shared [`Barrier`] sized to +/// the whole suite. If the pipeline executed checks one-at-a-time (a barrier +/// between phases, or a serialized actor handler), the barrier would never trip +/// and every run would time out into a no-verdict (errored) outcome. That all +/// checks instead come back satisfied is a direct demonstration that they +/// execute concurrently — i.e. a check starts the moment discovery streams it, +/// without waiting for the others. +struct InterleavingExecutor { + barrier: Arc, +} + +#[async_trait] +impl CheckExecutor for InterleavingExecutor { + async fn run_check(&self, req: AgentRunRequest) -> Result { + // Proceed only once all checks have reached this point concurrently. + let interleaved = tokio::time::timeout(Duration::from_secs(5), self.barrier.wait()) + .await + .is_ok(); + Ok(AgentOutcome { + verdict: interleaved.then(|| CheckReport { + success: true, + evidence: Some(format!("check {} ran concurrently", req.check_id)), + }), + stop_reason: Some("interleaving probe".into()), + turns: 1, + error: None, + }) + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn checks_execute_concurrently_not_in_a_barrier() { + // Two independent requirements (two checks total). The default concurrency + // is 2, so both should run at once. + let dir = TempDir::new().unwrap(); + fs::write( + dir.path().join("CHECKS.md"), + "# Requirement One\ncheck one\n\n# Requirement Two\ncheck two\n", + ) + .unwrap(); + let reqs = discover(dir.path()).await.unwrap(); + assert_eq!(reqs.len(), 2); + + let executor = Arc::new(InterleavingExecutor { + barrier: Arc::new(Barrier::new(2)), + }); + let cfg = configuration(); + assert_eq!(cfg.concurrency, 2, "test assumes a concurrency of 2"); + + let outcomes = run_to_outcomes(&cfg, executor, Arc::new(NoopSandbox), dir.path(), &reqs) + .await + .unwrap(); + + // Both satisfied ⇒ both ran simultaneously (the barrier tripped). + assert!( + outcomes.iter().all(|o| o.satisfied), + "checks did not interleave: {outcomes:?}" + ); + let code = report(&plain_terminal(), &outcomes).unwrap(); + assert_eq!(code, 0); +} + +#[tokio::test] +async fn invalid_suite_aborts_run_without_spawning_agents() { + // An orphan `## Check` (no preceding requirement) makes the suite invalid. + let dir = TempDir::new().unwrap(); + fs::write( + dir.path().join("CHECKS.md"), + "## Check Orphan\nno requirement above me\n", + ) + .unwrap(); + + // Drive the *full* pipeline (with the discovery actor) so the strict + // whole-run abort path is exercised end-to-end. + let fake = Arc::new(FakeExecutor::new()); + let cfg = configuration(); + let result = run_pipeline(&cfg, fake.clone(), Arc::new(NoopSandbox), dir.path()).await; + + // The run aborts as an `Err` diagnostic (so CI distinguishes "tool errored" + // from "checks failed"), and — crucially — no agent was ever spawned. + let err = result.expect_err("an invalid suite must abort the run"); + assert!(format!("{err:?}").contains("orphan"), "got: {err:?}"); + assert!( + fake.seen().is_empty(), + "no agents should run for an invalid suite, saw: {:?}", + fake.seen() + ); +} diff --git a/src/checks/execution.rs b/src/checks/execution.rs index 4a75fa9..be31433 100644 --- a/src/checks/execution.rs +++ b/src/checks/execution.rs @@ -1,134 +1,178 @@ -//! The execution phase (M5; in-process rework in MULTI-1367). +//! The execution phase (M5; in-process rework in MULTI-1367; actor rework in +//! MULTI-1368). //! -//! Run every check **in parallel** (bounded), each in its own CoW sandbox, via -//! an in-process agent that reports its verdict through a per-check judge tool; -//! then reconcile each check's verdict (the reported `success` is authoritative) -//! and aggregate checks into per-requirement outcomes via logical AND. +//! [`ExecutionActor`] receives one [`CheckDiscovered`] per validated check and +//! **immediately** offloads the (long-running) agent run onto a spawned task — +//! never `await`-ing it inline, since a Kameo actor processes one message at a +//! time to completion and awaiting here would serialize the pipeline. Each task +//! creates a CoW sandbox, runs the check's agent via the injected +//! [`CheckExecutor`] (which returns the verdict inline), drops the sandbox, and +//! either forwards a [`CheckCompleted`] to reporting or — if the agent did not +//! report — asks the actor to [`RetryCheck`] it. +//! +//! Bounded concurrency is enforced by a shared [`Semaphore`] acquired *inside* +//! each task (the semaphore is the cap, not the mailbox depth — decision #5). +//! Retries (`cfg.max_attempts`) are reframed as self-messages. -use std::collections::HashMap; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::Arc; -use miette::{IntoDiagnostic, Result}; +use kameo::Actor; +use kameo::actor::ActorRef; +use kameo::message::{Context, Message}; +use miette::Result; use tokio::sync::Semaphore; -use tokio::task::JoinSet; -use crate::checks::config::Config; -use crate::checks::executor::{ - AgentOutcome, AgentRunRequest, BoxedExecutor, CheckExecutor, CheckReport, -}; -use crate::checks::model::{ - Check, CheckId, CheckOutcome, Requirement, RequirementOutcome, Verdict, +use crate::checks::executor::{AgentOutcome, CheckExecutor, CheckReport}; +use crate::checks::messages::{ + CheckCompleted, CheckDiscovered, CheckJob, DiscoveryComplete, ExecutionComplete, RetryCheck, }; +use crate::checks::model::{Check, CheckId, CheckOutcome, Verdict}; +use crate::checks::reporting::ReportingActor; use crate::checks::sandbox::Sandbox; -/// A single check flattened out of the requirement set for execution, tagged -/// with its run-unique id and the requirement it rolls up into. -struct PlannedCheck { - id: CheckId, - req_index: usize, - check: Check, +/// The execution actor: turns a stream of discovered checks into a stream of +/// completed checks, fanning agent runs out onto bounded background tasks. +pub(crate) struct ExecutionActor { + executor: Arc, + sandbox: Arc, + working_dir: PathBuf, + /// The concurrency cap, shared into every spawned task. + semaphore: Arc, + /// How many times to (re)run a check whose agent fails to report (≥1). + max_attempts: usize, + /// The downstream reporting actor. + reporting: ActorRef, } -/// Convenience wrapper used by the pipeline orchestrator: build the sandbox from -/// configuration (DI) and run [`execute`] with the injected executor. -pub async fn execution_phase( - cfg: &Config, - executor: BoxedExecutor, - working_dir: &Path, - requirements: &[Requirement], -) -> Result> { - let executor: Arc = Arc::from(executor); - let sandbox: Arc = - Arc::from(crate::checks::sandbox::select_sandbox()); - execute(cfg, executor, sandbox, working_dir, requirements).await +impl Actor for ExecutionActor { + type Args = Self; + type Error = std::convert::Infallible; + + async fn on_start( + args: Self::Args, + _actor_ref: ActorRef, + ) -> std::result::Result { + Ok(args) + } } -/// Run all checks and produce per-requirement outcomes. -/// -/// `executor` and `sandbox` are injected so tests can substitute fakes. -pub async fn execute( - cfg: &Config, - executor: Arc, - sandbox: Arc, - working_dir: &Path, - requirements: &[Requirement], -) -> Result> { - // Flatten checks, assigning each a run-unique id. - let mut planned: Vec = Vec::new(); - for (req_index, req) in requirements.iter().enumerate() { - for check in &req.checks { - planned.push(PlannedCheck { - id: planned.len(), - req_index, - check: check.clone(), - }); +impl ExecutionActor { + /// Build the actor. `concurrency` and `max_attempts` are clamped to ≥1. + pub(crate) fn new( + executor: Arc, + sandbox: Arc, + working_dir: PathBuf, + concurrency: usize, + max_attempts: usize, + reporting: ActorRef, + ) -> Self { + Self { + executor, + sandbox, + working_dir, + semaphore: Arc::new(Semaphore::new(concurrency.max(1))), + max_attempts: max_attempts.max(1), + reporting, } } - // Nothing to run: every requirement trivially aggregates (empty AND = true, - // though validation guarantees ≥1 check in practice). - if planned.is_empty() { - return Ok(aggregate(requirements, HashMap::new())); + /// Offload one attempt of `job` onto a background task. Returns immediately, + /// keeping the actor mailbox responsive; the permit is acquired *inside* the + /// task so the semaphore — not the mailbox — is the concurrency cap. + fn dispatch(&self, ctx: &mut Context, job: CheckJob, attempt: usize) { + let executor = self.executor.clone(); + let sandbox = self.sandbox.clone(); + let semaphore = self.semaphore.clone(); + let working_dir = self.working_dir.clone(); + let reporting = self.reporting.clone(); + let me = ctx.actor_ref().clone(); + + tokio::spawn(async move { + // Acquire the permit inside the task. If the semaphore was closed the + // pipeline is tearing down, so just drop the work. + let Ok(_permit) = semaphore.acquire_owned().await else { + return; + }; + + let result = run_one(executor, sandbox, job.id, job.check.clone(), &working_dir).await; + + if has_verdict(Some(&result)) { + // The agent reported: reconcile and forward straight to reporting. + let outcome = reconcile(Some(&result), &job.check.title); + if let Err(err) = reporting.tell(CheckCompleted { job, outcome }).await { + tracing::debug!(?err, "reporting actor unavailable for completed check"); + } + } else if let Err(err) = me + .tell(RetryCheck { + job, + attempt, + last: result, + }) + .await + { + tracing::debug!(?err, "execution actor unavailable for retry"); + } + }); } - // The most recent agent outcome per check. The reported verdict lives in - // `AgentOutcome::verdict`. - let mut last_outcome: HashMap> = HashMap::new(); - - // Re-run any check whose agent fails to report a verdict, up to - // `max_attempts`. Agents are nondeterministic and occasionally hit the turn - // cap, error, or time out without reporting; a fresh attempt usually - // succeeds. - let mut pending: Vec<&PlannedCheck> = planned.iter().collect(); - let attempts = cfg.max_attempts.max(1); - for attempt in 1..=attempts { - if pending.is_empty() { - break; + /// Forward a terminal (attempts-exhausted, no-verdict) check to reporting as + /// an errored outcome. + async fn finish_errored(&self, job: CheckJob, last: Result) { + let outcome = reconcile(Some(&last), &job.check.title); + if let Err(err) = self.reporting.tell(CheckCompleted { job, outcome }).await { + tracing::debug!(?err, "reporting actor unavailable for errored check"); } - if attempt > 1 { + } +} + +impl Message for ExecutionActor { + type Reply = (); + + async fn handle(&mut self, msg: CheckDiscovered, ctx: &mut Context) -> Self::Reply { + self.dispatch(ctx, msg.job, 1); + } +} + +impl Message for ExecutionActor { + type Reply = (); + + async fn handle(&mut self, msg: RetryCheck, ctx: &mut Context) -> Self::Reply { + let RetryCheck { job, attempt, last } = msg; + if attempt < self.max_attempts { tracing::info!( - attempt, - checks = pending.len(), - "retrying checks whose agent did not report" + check = job.id, + attempt = attempt + 1, + "retrying check whose agent did not report" ); + self.dispatch(ctx, job, attempt + 1); + } else { + self.finish_errored(job, last).await; } - - // Dispatch the pending checks concurrently, bounded by the limit. - let permits = Arc::new(Semaphore::new(cfg.concurrency.max(1))); - let mut set: JoinSet<(CheckId, Result)> = JoinSet::new(); - for p in &pending { - let permit = permits.clone().acquire_owned().await.into_diagnostic()?; - let executor = executor.clone(); - let sandbox = sandbox.clone(); - let id = p.id; - let check = p.check.clone(); - let working_dir = working_dir.to_path_buf(); - set.spawn(async move { - let _permit = permit; - let outcome = run_one(executor, sandbox, id, check, &working_dir).await; - (id, outcome) - }); - } - while let Some(joined) = set.join_next().await { - let (id, outcome) = joined.into_diagnostic()?; - last_outcome.insert(id, outcome); - } - - // Whatever still has no reported verdict is retried in the next round. - pending = planned - .iter() - .filter(|p| !has_verdict(last_outcome.get(&p.id))) - .collect(); } +} + +impl Message for ExecutionActor { + type Reply = (); - // Reconcile each check, then aggregate per requirement. - let mut outcomes: HashMap = HashMap::new(); - for p in &planned { - let outcome = reconcile(last_outcome.get(&p.id), &p.check.title); - outcomes.insert(p.id, outcome); + async fn handle( + &mut self, + msg: DiscoveryComplete, + _ctx: &mut Context, + ) -> Self::Reply { + // Tell reporting how many completed checks to expect. Reporting gates + // finalization on its own count, so this is race-free regardless of + // whether agents have settled yet. + if let Err(err) = self + .reporting + .tell(ExecutionComplete { + total_checks: msg.total_checks, + }) + .await + { + tracing::debug!(?err, "reporting actor unavailable for execution-complete"); + } } - Ok(aggregate_planned(requirements, &planned, outcomes)) } /// Whether an agent outcome carries a reported verdict. @@ -149,7 +193,7 @@ async fn run_one( ) -> Result { let handle = sandbox.create(working_dir).await?; - let request = AgentRunRequest { + let request = crate::checks::executor::AgentRunRequest { check_id: id, check, working_dir: handle.path().to_path_buf(), @@ -219,47 +263,15 @@ fn turns_suffix(turns: u32) -> String { } } -/// Aggregate when there are reconciled per-check outcomes. -fn aggregate_planned( - requirements: &[Requirement], - planned: &[PlannedCheck], - mut outcomes: HashMap, -) -> Vec { - let mut buckets: Vec> = vec![Vec::new(); requirements.len()]; - for p in planned { - if let Some(outcome) = outcomes.remove(&p.id) { - buckets[p.req_index].push(outcome); - } - } - requirements - .iter() - .zip(buckets) - .map(|(req, checks)| { - RequirementOutcome::aggregate(req.title.clone(), req.filepath.clone(), checks) - }) - .collect() -} - -/// Aggregate when there are no checks to run (degenerate suites). -fn aggregate( - requirements: &[Requirement], - _outcomes: HashMap, -) -> Vec { - requirements - .iter() - .map(|req| { - RequirementOutcome::aggregate(req.title.clone(), req.filepath.clone(), Vec::new()) - }) - .collect() -} - #[cfg(test)] mod tests { - use super::*; use crate::checks::config::configuration; use crate::checks::executor::FakeExecutor; + use crate::checks::model::{Check, Requirement, Verdict}; + use crate::checks::run_to_outcomes; use crate::checks::sandbox::NoopSandbox; use std::path::PathBuf; + use std::sync::Arc; fn req(title: &str, checks: Vec<(&str, &str)>) -> Requirement { Requirement { @@ -290,7 +302,7 @@ mod tests { .with_report(2, false, Some("c failed")), ); let cfg = configuration(); - let out = execute( + let out = run_to_outcomes( &cfg, executor.clone(), Arc::new(NoopSandbox), @@ -318,7 +330,7 @@ mod tests { let executor = FakeExecutor::new().with_silent(0); let mut cfg = configuration(); cfg.max_attempts = 1; - let out = execute( + let out = run_to_outcomes( &cfg, Arc::new(executor), Arc::new(NoopSandbox), @@ -330,4 +342,25 @@ mod tests { assert!(!out[0].satisfied); assert_eq!(out[0].check_outcomes[0].verdict, Verdict::Errored); } + + #[tokio::test] + async fn check_is_retried_until_it_reports() { + // A check that is silent on its first attempt but reports on a later one + // must end up satisfied — exercising the retry self-message path. + let reqs = vec![req("Eventually", vec![("c", "prompt")])]; + let executor = Arc::new(FakeExecutor::new().with_silent_until(0, 2, true, Some("ok now"))); + let cfg = configuration(); // max_attempts = 3 + let out = run_to_outcomes( + &cfg, + executor.clone(), + Arc::new(NoopSandbox), + &PathBuf::from("."), + &reqs, + ) + .await + .unwrap(); + assert!(out[0].satisfied); + // Ran twice: one silent attempt, then one reporting attempt. + assert_eq!(executor.seen().len(), 2); + } } diff --git a/src/checks/executor/fake.rs b/src/checks/executor/fake.rs index e701551..7eaa68f 100644 --- a/src/checks/executor/fake.rs +++ b/src/checks/executor/fake.rs @@ -18,6 +18,10 @@ pub struct FakeExecutor { scripted: HashMap, /// Check ids that should simulate an agent finishing without reporting. silent: HashSet, + /// Check ids that stay silent until the Nth attempt, then report. Keyed by + /// id to `(report_on_attempt, report)`. Exercises the retry path: the same + /// `CheckId` is re-run, so the fake counts attempts per id. + silent_until: HashMap, seen: Mutex>, } @@ -44,6 +48,28 @@ impl FakeExecutor { self } + /// Make `id` stay silent until its `report_on_attempt`-th run (1-based), then + /// report `success`/`evidence`. Used to drive the retry path deterministically. + pub fn with_silent_until( + mut self, + id: CheckId, + report_on_attempt: usize, + success: bool, + evidence: Option<&str>, + ) -> Self { + self.silent_until.insert( + id, + ( + report_on_attempt, + CheckReport { + success, + evidence: evidence.map(str::to_string), + }, + ), + ); + self + } + /// The check ids the fake was asked to run, in call order. pub fn seen(&self) -> Vec { self.seen.lock().unwrap().clone() @@ -53,7 +79,12 @@ impl FakeExecutor { #[async_trait] impl CheckExecutor for FakeExecutor { async fn run_check(&self, req: AgentRunRequest) -> Result { - self.seen.lock().unwrap().push(req.check_id); + let attempt = { + let mut seen = self.seen.lock().unwrap(); + seen.push(req.check_id); + seen.iter().filter(|id| **id == req.check_id).count() + }; + if self.silent.contains(&req.check_id) { return Ok(AgentOutcome { verdict: None, @@ -62,6 +93,18 @@ impl CheckExecutor for FakeExecutor { error: None, }); } + + // Silent until the configured attempt, then report. + if let Some((report_on, report)) = self.silent_until.get(&req.check_id) { + let verdict = (attempt >= *report_on).then(|| report.clone()); + return Ok(AgentOutcome { + verdict, + stop_reason: Some("fake: silent-until".into()), + turns: 1, + error: None, + }); + } + Ok(AgentOutcome { verdict: self.scripted.get(&req.check_id).cloned(), stop_reason: Some("fake: reported".into()), diff --git a/src/checks/messages.rs b/src/checks/messages.rs new file mode 100644 index 0000000..14e1aca --- /dev/null +++ b/src/checks/messages.rs @@ -0,0 +1,81 @@ +//! The messages exchanged between the three pipeline actors (MULTI-1368). +//! +//! The pipeline is wired as a one-way, fire-and-forget chain — each edge is an +//! [`ActorRef::tell`](kameo::actor::ActorRef::tell), never `ask`: +//! +//! ```text +//! DiscoveryActor --CheckDiscovered--> ExecutionActor --CheckCompleted--> ReportingActor +//! ``` +//! +//! With the old `Vec`-collection barriers gone, "done" is no longer implicit, so +//! each stage carries an explicit end-of-stream sentinel ([`DiscoveryComplete`] / +//! [`ExecutionComplete`]) so the next stage knows how many items to expect. + +use std::path::PathBuf; + +use miette::Result; + +use crate::checks::executor::AgentOutcome; +use crate::checks::model::{Check, CheckId, CheckOutcome}; + +/// One validated check, ready to run, carried end-to-end through the pipeline so +/// reporting can group + order results without consulting the original suite. +#[derive(Clone)] +pub struct CheckJob { + /// Run-unique id (also the cersei `session_id` suffix `multi-check-{id}`). + pub id: CheckId, + /// Which requirement (in declaration order) this check rolls up into. + pub req_index: usize, + /// The requirement's title, for the final per-requirement render. + pub req_title: String, + /// The `CHECKS.md` that declared the requirement. + pub filepath: PathBuf, + /// The check itself (title + prompt). + pub check: Check, +} + +/// Coordinator -> Discovery: kick off the walk/parse/validate. +pub struct BeginDiscovery; + +/// Discovery -> Execution: a single validated check is ready to execute. +pub struct CheckDiscovered { + pub job: CheckJob, +} + +/// Discovery -> Execution: discovery finished; exactly `total_checks` were +/// streamed (the end-of-stream sentinel for the discovery→execution edge). +pub struct DiscoveryComplete { + pub total_checks: usize, +} + +/// Execution self-message: a spawned agent run finished without reporting a +/// verdict; re-enqueue the same check (reframing the old retry loop as messages). +pub struct RetryCheck { + pub job: CheckJob, + /// The 1-based attempt number that just finished without a verdict. + pub attempt: usize, + /// The outcome of that attempt, kept so the terminal (attempts-exhausted) + /// case can synthesize an accurate "errored" reason. + pub last: Result, +} + +/// Execution -> Reporting: a single check reached a terminal verdict. +pub struct CheckCompleted { + pub job: CheckJob, + /// The reconciled verdict + evidence for this check. + pub outcome: CheckOutcome, +} + +/// Execution -> Reporting: how many checks Reporting should expect in total (the +/// end-of-stream sentinel for the execution→reporting edge). Reporting finalizes +/// once it has folded exactly this many [`CheckCompleted`]s. +pub struct ExecutionComplete { + pub total_checks: usize, +} + +/// Discovery -> Reporting: the suite contained an invalid `CHECKS.md`; abort the +/// whole run (strict whole-run abort, decision #3). No checks were ever streamed, +/// so no agents are spawned. +pub struct DiscoveryFailed { + pub report: miette::Report, +} diff --git a/src/checks/mod.rs b/src/checks/mod.rs index 00ae9c3..bb6f77a 100644 --- a/src/checks/mod.rs +++ b/src/checks/mod.rs @@ -1,18 +1,34 @@ //! `multi check`: validate declared non-functional ("ility") requirements by -//! running AI-agent checks, reporting results through an in-process MCP server. +//! running AI-agent checks. //! -//! The feature is a four-phase pipeline, each phase living in its own submodule: +//! The feature is a **streaming actor pipeline** (MULTI-1368): three [Kameo] +//! actors wired one-way with fire-and-forget `tell`, so work flows through +//! incrementally rather than across `Vec`-collection barriers — a check begins +//! executing the moment discovery has validated it, and a result is folded into +//! reporting the moment its agent returns. +//! +//! ```text +//! DiscoveryActor --CheckDiscovered--> ExecutionActor --CheckCompleted--> ReportingActor +//! ``` +//! +//! The phases, each in its own submodule: //! //! 1. [`config`] — the (hardcoded, dependency-injected) configuration phase. -//! 2. [`discovery`] — find/parse `CHECKS.md` files into a `Vec`. +//! 2. [`discovery`] — find/parse/validate `CHECKS.md` files, then stream each +//! validated check downstream (strict whole-run abort on any invalid file). //! 3. [`execution`] — run each check in a CoW [`sandbox`] via a boxed -//! [`executor`], capturing verdicts through each agent's in-process judge tool. -//! 4. [`reporting`] — render verdicts and produce the process exit code. +//! [`executor`], offloaded onto bounded background tasks; capture verdicts +//! through each agent's in-process judge tool. +//! 4. [`reporting`] — fold verdicts incrementally, render, and produce the exit +//! code. +//! +//! [Kameo]: https://docs.rs/kameo pub mod config; mod discovery; mod execution; pub mod executor; +mod messages; pub mod model; mod reporting; pub mod sandbox; @@ -21,11 +37,21 @@ pub mod sandbox; mod e2e; use std::path::Path; +use std::sync::Arc; -use miette::Result; +use kameo::actor::{ActorRef, Spawn}; +use miette::{IntoDiagnostic, Result, miette}; +use tokio::sync::oneshot; use crate::Terminal; -use crate::checks::config::CliOverrides; +use crate::checks::config::{CliOverrides, Config}; +use crate::checks::discovery::DiscoveryActor; +use crate::checks::execution::ExecutionActor; +use crate::checks::executor::CheckExecutor; +use crate::checks::messages::{BeginDiscovery, CheckDiscovered, CheckJob, DiscoveryComplete}; +use crate::checks::model::{Requirement, RequirementOutcome}; +use crate::checks::reporting::{ReportingActor, RunResult}; +use crate::checks::sandbox::Sandbox; /// Run the full `multi check` pipeline rooted at `working_dir`. /// @@ -46,15 +72,136 @@ pub async fn run(terminal: &Terminal, working_dir: &Path, overrides: CliOverride "resolved checks configuration and provider registry", ); - // Phase 2: discovery. - let requirements = discovery::discover(working_dir).await?; + // Build the injected executor (default: in-process cersei) and sandbox. + let executor: Arc = Arc::from(resolved.build_executor()?); + let sandbox: Arc = Arc::from(sandbox::select_sandbox()); - // Phase 3: execution — the selected executor is built from config and - // injected (default: the in-process cersei agent). - let executor = resolved.build_executor()?; - let outcomes = - execution::execution_phase(&resolved.config, executor, working_dir, &requirements).await?; - - // Phase 4: reporting + exit code. + // Phases 2–4: drive the actor pipeline to its terminal result, then render. + let outcomes = run_pipeline(&resolved.config, executor, sandbox, working_dir).await?; reporting::report(terminal, &outcomes) } + +/// Spawn the reporting + execution actors and return their refs plus the channel +/// the terminal result arrives on. Refs must be kept alive by the caller until +/// the result is received (dropping the last ref stops the actor). +fn spawn_core( + cfg: &Config, + executor: Arc, + sandbox: Arc, + working_dir: &Path, +) -> ( + ActorRef, + ActorRef, + oneshot::Receiver, +) { + let (tx, rx) = oneshot::channel(); + let reporting = ReportingActor::spawn(ReportingActor::new(tx)); + let execution = ExecutionActor::spawn(ExecutionActor::new( + executor, + sandbox, + working_dir.to_path_buf(), + cfg.concurrency, + cfg.max_attempts, + reporting.clone(), + )); + (execution, reporting, rx) +} + +/// Stream a (validated) requirement set into the execution actor: one +/// [`CheckDiscovered`] per check — assigning each a run-unique [`CheckId`] from a +/// monotonic counter, in `(req_index, check)` declaration order — then a final +/// [`DiscoveryComplete`] sentinel. +/// +/// Shared by [`DiscoveryActor`] (after its parse-all gate) and the test harness. +/// +/// [`CheckId`]: crate::checks::model::CheckId +async fn stream_requirements( + execution: &ActorRef, + requirements: &[Requirement], +) -> Result<()> { + let mut id = 0; + let mut total = 0; + for (req_index, req) in requirements.iter().enumerate() { + for check in &req.checks { + let job = CheckJob { + id, + req_index, + req_title: req.title.clone(), + filepath: req.filepath.clone(), + check: check.clone(), + }; + execution + .tell(CheckDiscovered { job }) + .await + .map_err(|e| miette!("failed to enqueue discovered check: {e}"))?; + id += 1; + total += 1; + } + } + execution + .tell(DiscoveryComplete { + total_checks: total, + }) + .await + .map_err(|e| miette!("failed to signal discovery completion: {e}"))?; + Ok(()) +} + +/// Await the pipeline's terminal result, mapping a dropped channel (a dead +/// reporting actor) to a diagnostic so a crashed actor fails the run rather than +/// hanging it. +async fn await_result(rx: oneshot::Receiver) -> Result> { + match rx.await { + Ok(result) => result, + Err(_) => Err(miette!( + "the reporting actor terminated before producing a result" + )), + } +} + +/// Drive the full pipeline (all three actors) over `working_dir` and return the +/// ordered per-requirement outcomes (or the abort diagnostic from an invalid +/// suite). The actor refs are held alive until the terminal result arrives. +async fn run_pipeline( + cfg: &Config, + executor: Arc, + sandbox: Arc, + working_dir: &Path, +) -> Result> { + let (execution, reporting, rx) = spawn_core(cfg, executor, sandbox, working_dir); + let discovery = DiscoveryActor::spawn(DiscoveryActor::new( + working_dir.to_path_buf(), + execution.clone(), + reporting.clone(), + )); + discovery + .tell(BeginDiscovery) + .await + .into_diagnostic() + .map_err(|e| miette!("failed to start discovery: {e}"))?; + + let outcomes = await_result(rx).await; + // Keep refs alive across the await; dropping them earlier would stop the + // actors mid-run. + drop((discovery, execution, reporting)); + outcomes +} + +/// Test-only entrypoint: drive **execution + reporting** over an in-memory, +/// already-validated requirement set (bypassing discovery), returning the +/// ordered outcomes. Lets execution-phase tests inject fakes without writing +/// `CHECKS.md` files. +#[cfg(test)] +async fn run_to_outcomes( + cfg: &Config, + executor: Arc, + sandbox: Arc, + working_dir: &Path, + requirements: &[Requirement], +) -> Result> { + let (execution, reporting, rx) = spawn_core(cfg, executor, sandbox, working_dir); + stream_requirements(&execution, requirements).await?; + let outcomes = await_result(rx).await; + drop((execution, reporting)); + outcomes +} diff --git a/src/checks/reporting.rs b/src/checks/reporting.rs index d2c4fdf..558cce6 100644 --- a/src/checks/reporting.rs +++ b/src/checks/reporting.rs @@ -1,6 +1,14 @@ -//! The reporting phase + exit code (M6). +//! The reporting phase + exit code (M6; actor rework in MULTI-1368). //! -//! Renders each requirement's verdict and returns the process exit code: +//! [`ReportingActor`] folds each [`CheckCompleted`] into per-requirement +//! accumulators **incrementally** as agents return (no barrier). Because +//! streaming arrival is nondeterministic, it **buffers and sorts** by +//! `(req_index, check declaration order)` before producing the final +//! `Vec`, which it hands back to the coordinator over a +//! `oneshot`. The coordinator renders it through [`report`]. +//! +//! [`report`] renders each requirement's verdict and returns the process exit +//! code: //! //! * requirement title in **green** (satisfied) / **red** (not); //! * failing checks printed in **red** with their evidence; @@ -10,10 +18,139 @@ //! Output is routed through the existing [`Terminal`] and honors the global //! `--enable-colors` setting; with color disabled it degrades to plain text. +use std::collections::HashMap; +use std::path::PathBuf; + +use kameo::Actor; +use kameo::message::{Context, Message}; use miette::Result; +use tokio::sync::oneshot; use crate::Terminal; -use crate::checks::model::{CheckOutcome, RequirementOutcome}; +use crate::checks::messages::{CheckCompleted, DiscoveryFailed, ExecutionComplete}; +use crate::checks::model::{CheckId, CheckOutcome, RequirementOutcome}; + +/// The terminal result of a run: the ordered per-requirement outcomes, or an +/// abort diagnostic (an invalid suite from discovery). +pub(crate) type RunResult = Result>; + +/// Per-requirement accumulator. Checks arrive out of order (streaming), so each +/// is tagged with its [`CheckId`] and sorted back into declaration order at the +/// end (ids are assigned monotonically in `(req_index, check)` order). +struct ReqAccum { + title: String, + filepath: PathBuf, + checks: Vec<(CheckId, CheckOutcome)>, +} + +/// The terminal actor: folds streamed check outcomes into per-requirement +/// verdicts and, once it has seen every expected outcome, fires the ordered +/// result back to the coordinator. +pub(crate) struct ReportingActor { + /// Fires exactly once with the terminal result; `None` after it has fired. + result: Option>, + /// The total number of checks to expect (`None` until [`ExecutionComplete`]). + expected: Option, + /// How many [`CheckCompleted`]s have been folded so far. + received: usize, + /// Per-requirement accumulators, keyed by `req_index`. + accum: HashMap, +} + +impl Actor for ReportingActor { + type Args = Self; + type Error = std::convert::Infallible; + + async fn on_start( + args: Self::Args, + _actor_ref: kameo::actor::ActorRef, + ) -> std::result::Result { + Ok(args) + } +} + +impl ReportingActor { + /// Build the actor over the channel it will fire the terminal result on. + pub(crate) fn new(result: oneshot::Sender) -> Self { + Self { + result: Some(result), + expected: None, + received: 0, + accum: HashMap::new(), + } + } + + /// If the expected count is known and every outcome has arrived, build the + /// ordered outcomes and fire the result. Idempotent: only fires once. + fn try_finalize(&mut self) { + let done = matches!(self.expected, Some(total) if self.received >= total); + if !done { + return; + } + let Some(tx) = self.result.take() else { + return; + }; + let _ = tx.send(Ok(self.build_outcomes())); + } + + /// Drain the accumulators into a deterministic `Vec`: + /// requirements in `req_index` order, checks within each in declaration + /// (id) order. Matches the old `aggregate_planned` ordering byte-for-byte. + fn build_outcomes(&mut self) -> Vec { + let mut indices: Vec = self.accum.keys().copied().collect(); + indices.sort_unstable(); + indices + .into_iter() + .map(|i| { + let mut acc = self.accum.remove(&i).expect("index came from the map"); + acc.checks.sort_by_key(|(id, _)| *id); + let check_outcomes = acc.checks.into_iter().map(|(_, o)| o).collect(); + RequirementOutcome::aggregate(acc.title, acc.filepath, check_outcomes) + }) + .collect() + } +} + +impl Message for ReportingActor { + type Reply = (); + + async fn handle(&mut self, msg: CheckCompleted, _ctx: &mut Context) -> Self::Reply { + let CheckCompleted { job, outcome } = msg; + let acc = self.accum.entry(job.req_index).or_insert_with(|| ReqAccum { + title: job.req_title, + filepath: job.filepath, + checks: Vec::new(), + }); + acc.checks.push((job.id, outcome)); + self.received += 1; + self.try_finalize(); + } +} + +impl Message for ReportingActor { + type Reply = (); + + async fn handle( + &mut self, + msg: ExecutionComplete, + _ctx: &mut Context, + ) -> Self::Reply { + self.expected = Some(msg.total_checks); + self.try_finalize(); + } +} + +impl Message for ReportingActor { + type Reply = (); + + async fn handle(&mut self, msg: DiscoveryFailed, _ctx: &mut Context) -> Self::Reply { + // Strict whole-run abort: surface the suite's validation diagnostic and + // never produce a partial report. + if let Some(tx) = self.result.take() { + let _ = tx.send(Err(msg.report)); + } + } +} /// Render `outcomes` and return the exit code (0 = all satisfied, 1 = any not). pub fn report(terminal: &Terminal, outcomes: &[RequirementOutcome]) -> Result {