Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ failsafe = "1.3.0"
futures-core = "0.3.31"
futures-util = "0.3.31"
indexmap = { version = "2.1.0", features = ["serde"] }
kameo = "0.20"
miette = { workspace = true, features = ["fancy"] }
mockall = "0.13.1"
multitool-sdk = { git = "https://github.com/wack/multitool-rust-sdk.git", branch = "trunk" }
Expand Down
75 changes: 74 additions & 1 deletion src/checks/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,18 @@
mod parse;
mod walk;

use std::path::Path;
use std::path::{Path, PathBuf};

use kameo::Actor;
use kameo::actor::ActorRef;
use kameo::message::{Context, Message};
use miette::{IntoDiagnostic, Result};
use multi_core::ManyError;

use crate::checks::execution::ExecutionActor;
use crate::checks::messages::{BeginDiscovery, DiscoveryFailed};
use crate::checks::model::Requirement;
use crate::checks::reporting::ReportingActor;

/// Run the full discovery phase rooted at `root`.
///
Expand Down Expand Up @@ -47,6 +53,73 @@ pub async fn discover(root: &Path) -> Result<Vec<Requirement>> {
Ok(requirements)
}

/// The discovery actor: on a [`BeginDiscovery`] kick it runs the whole-suite
/// parse + validation gate ([`discover`]) and then **streams** the validated
/// checks downstream — assigning each a run-unique [`CheckId`] and `tell`ing one
/// [`CheckDiscovered`] per check, followed by a [`DiscoveryComplete`] sentinel.
///
/// If the suite is invalid it streams **nothing** and instead tells reporting to
/// abort the whole run (strict whole-run abort, decision #3), so no agents are
/// ever spawned for a suite that will not run.
///
/// [`CheckId`]: crate::checks::model::CheckId
/// [`CheckDiscovered`]: crate::checks::messages::CheckDiscovered
/// [`DiscoveryComplete`]: crate::checks::messages::DiscoveryComplete
pub(crate) struct DiscoveryActor {
root: PathBuf,
execution: ActorRef<ExecutionActor>,
reporting: ActorRef<ReportingActor>,
}

impl Actor for DiscoveryActor {
type Args = Self;
type Error = std::convert::Infallible;

async fn on_start(
args: Self::Args,
_actor_ref: ActorRef<Self>,
) -> std::result::Result<Self, Self::Error> {
Ok(args)
}
}

impl DiscoveryActor {
pub(crate) fn new(
root: PathBuf,
execution: ActorRef<ExecutionActor>,
reporting: ActorRef<ReportingActor>,
) -> Self {
Self {
root,
execution,
reporting,
}
}
}

impl Message<BeginDiscovery> for DiscoveryActor {
type Reply = ();

async fn handle(&mut self, _msg: BeginDiscovery, _ctx: &mut Context<Self, ()>) -> Self::Reply {
match discover(&self.root).await {
Ok(requirements) => {
if let Err(err) =
crate::checks::stream_requirements(&self.execution, &requirements).await
{
tracing::error!(?err, "failed to stream discovered checks to execution");
}
}
Err(report) => {
// Strict whole-run abort: no `CheckDiscovered` is emitted, so no
// agents run; reporting turns this into the run's `Err`.
if let Err(err) = self.reporting.tell(DiscoveryFailed { report }).await {
tracing::error!(?err, "reporting actor unavailable for discovery failure");
}
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
117 changes: 107 additions & 10 deletions src/checks/e2e.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
//! End-to-end pipeline test (MULTI-1355).
//! End-to-end pipeline test (MULTI-1355; actor pipeline in MULTI-1368).
//!
//! Drives the real discovery → execution → reporting → exit-code pipeline over a
//! fixture tree, with the fake executor and a no-op sandbox injected so verdicts
//! are scripted. Runs deterministically in CI with no `claude`, network, or APFS
//! dependency, guarding the end-to-end contract against regressions.
//! Drives the real discovery → execution → reporting → exit-code actor pipeline
//! over a fixture tree, with the fake executor and a no-op sandbox injected so
//! verdicts are scripted. Runs deterministically in CI with no `claude`,
//! network, or APFS dependency, guarding the end-to-end contract against
//! regressions.

use std::fs;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use clap::Parser;
use miette::Result;
use tempfile::TempDir;
use tokio::sync::Barrier;

use crate::checks::config::configuration;
use crate::checks::discovery::discover;
use crate::checks::execution::execute;
use crate::checks::executor::FakeExecutor;
use crate::checks::executor::{
AgentOutcome, AgentRunRequest, CheckExecutor, CheckReport, FakeExecutor,
};
use crate::checks::reporting::report;
use crate::checks::sandbox::NoopSandbox;
use crate::checks::{run_pipeline, run_to_outcomes};
use crate::{Cli, Terminal};

/// A terminal with color forced off, so reporting emits deterministic text.
Expand Down Expand Up @@ -84,7 +91,7 @@ async fn pipeline_satisfied_failed_multi_and_anonymous() {
let fake = Arc::new(fake);

let cfg = configuration();
let outcomes = execute(&cfg, fake.clone(), Arc::new(NoopSandbox), dir.path(), &reqs)
let outcomes = run_to_outcomes(&cfg, fake.clone(), Arc::new(NoopSandbox), dir.path(), &reqs)
.await
.unwrap();

Expand Down Expand Up @@ -120,7 +127,7 @@ async fn all_satisfied_exits_zero() {

let fake = Arc::new(FakeExecutor::new().with_report(0, true, None));
let cfg = configuration();
let outcomes = execute(&cfg, fake, Arc::new(NoopSandbox), dir.path(), &reqs)
let outcomes = run_to_outcomes(&cfg, fake, Arc::new(NoopSandbox), dir.path(), &reqs)
.await
.unwrap();
assert!(outcomes[0].satisfied);
Expand All @@ -136,7 +143,7 @@ async fn empty_tree_exits_zero() {
assert!(reqs.is_empty());

let cfg = configuration();
let outcomes = execute(
let outcomes = run_to_outcomes(
&cfg,
Arc::new(FakeExecutor::new()),
Arc::new(NoopSandbox),
Expand All @@ -150,3 +157,93 @@ async fn empty_tree_exits_zero() {
let code = report(&plain_terminal(), &outcomes).unwrap();
assert_eq!(code, 0);
}

/// A check executor that only reports a verdict once *every* check in the suite
/// is running simultaneously: each run blocks on a shared [`Barrier`] sized to
/// the whole suite. If the pipeline executed checks one-at-a-time (a barrier
/// between phases, or a serialized actor handler), the barrier would never trip
/// and every run would time out into a no-verdict (errored) outcome. That all
/// checks instead come back satisfied is a direct demonstration that they
/// execute concurrently — i.e. a check starts the moment discovery streams it,
/// without waiting for the others.
struct InterleavingExecutor {
barrier: Arc<Barrier>,
}

#[async_trait]
impl CheckExecutor for InterleavingExecutor {
async fn run_check(&self, req: AgentRunRequest) -> Result<AgentOutcome> {
// Proceed only once all checks have reached this point concurrently.
let interleaved = tokio::time::timeout(Duration::from_secs(5), self.barrier.wait())
.await
.is_ok();
Ok(AgentOutcome {
verdict: interleaved.then(|| CheckReport {
success: true,
evidence: Some(format!("check {} ran concurrently", req.check_id)),
}),
stop_reason: Some("interleaving probe".into()),
turns: 1,
error: None,
})
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn checks_execute_concurrently_not_in_a_barrier() {
// Two independent requirements (two checks total). The default concurrency
// is 2, so both should run at once.
let dir = TempDir::new().unwrap();
fs::write(
dir.path().join("CHECKS.md"),
"# Requirement One\ncheck one\n\n# Requirement Two\ncheck two\n",
)
.unwrap();
let reqs = discover(dir.path()).await.unwrap();
assert_eq!(reqs.len(), 2);

let executor = Arc::new(InterleavingExecutor {
barrier: Arc::new(Barrier::new(2)),
});
let cfg = configuration();
assert_eq!(cfg.concurrency, 2, "test assumes a concurrency of 2");

let outcomes = run_to_outcomes(&cfg, executor, Arc::new(NoopSandbox), dir.path(), &reqs)
.await
.unwrap();

// Both satisfied ⇒ both ran simultaneously (the barrier tripped).
assert!(
outcomes.iter().all(|o| o.satisfied),
"checks did not interleave: {outcomes:?}"
);
let code = report(&plain_terminal(), &outcomes).unwrap();
assert_eq!(code, 0);
}

#[tokio::test]
async fn invalid_suite_aborts_run_without_spawning_agents() {
// An orphan `## Check` (no preceding requirement) makes the suite invalid.
let dir = TempDir::new().unwrap();
fs::write(
dir.path().join("CHECKS.md"),
"## Check Orphan\nno requirement above me\n",
)
.unwrap();

// Drive the *full* pipeline (with the discovery actor) so the strict
// whole-run abort path is exercised end-to-end.
let fake = Arc::new(FakeExecutor::new());
let cfg = configuration();
let result = run_pipeline(&cfg, fake.clone(), Arc::new(NoopSandbox), dir.path()).await;

// The run aborts as an `Err` diagnostic (so CI distinguishes "tool errored"
// from "checks failed"), and — crucially — no agent was ever spawned.
let err = result.expect_err("an invalid suite must abort the run");
assert!(format!("{err:?}").contains("orphan"), "got: {err:?}");
assert!(
fake.seen().is_empty(),
"no agents should run for an invalid suite, saw: {:?}",
fake.seen()
);
}
Loading