diff --git a/src/audit/scanner.rs b/src/audit/scanner.rs index bd61690..17769d0 100644 --- a/src/audit/scanner.rs +++ b/src/audit/scanner.rs @@ -190,29 +190,22 @@ pub async fn run_truffle_scan( json: bool, target_repos: Option>, ) -> Result<(TruffleStatistics, HygieneStatistics)> { - let (start_time, repos) = init_command(SCANNING_MESSAGE).await; + // Pass target_repos to init_command for optimized filtering during discovery + let (start_time, repos) = init_command(SCANNING_MESSAGE, target_repos.clone()).await; if repos.is_empty() { + if target_repos.is_some() { + println!("\r❌ No matching repositories found"); + set_terminal_title_and_flush("✅ repos"); + return Ok((TruffleStatistics::new(), HygieneStatistics::new())); + } + println!("\r{NO_REPOS_MESSAGE}"); set_terminal_title_and_flush("✅ repos"); return Ok((TruffleStatistics::new(), HygieneStatistics::new())); } - // Filter repositories if specific targets are specified - let repos_to_scan = if let Some(targets) = target_repos { - repos - .into_iter() - .filter(|(name, _)| targets.contains(name)) - .collect() - } else { - repos - }; - - if repos_to_scan.is_empty() { - println!("\r❌ No matching repositories found"); - set_terminal_title_and_flush("✅ repos"); - return Ok((TruffleStatistics::new(), HygieneStatistics::new())); - } + let repos_to_scan = repos; let total_repos = repos_to_scan.len(); let repo_word = if total_repos == 1 { diff --git a/src/commands/config.rs b/src/commands/config.rs index 3cc0624..88203e5 100644 --- a/src/commands/config.rs +++ b/src/commands/config.rs @@ -227,7 +227,7 @@ pub async fn handle_config_command(args: ConfigArgs) -> Result<()> { args }; - let (start_time, repos) = init_command(SCANNING_MESSAGE).await; + let (start_time, repos) = init_command(SCANNING_MESSAGE, None).await; if repos.is_empty() { println!("\r{NO_REPOS_MESSAGE}"); diff --git a/src/commands/publish/mod.rs b/src/commands/publish/mod.rs index 1ab557d..05d5f23 100644 --- a/src/commands/publish/mod.rs +++ b/src/commands/publish/mod.rs @@ -22,7 +22,7 @@ pub async fn handle_publish_command( ) -> Result<()> { set_terminal_title("📦 repos"); - let (start_time, repos) = init_command(SCANNING_MESSAGE).await; + let (start_time, repos) = init_command(SCANNING_MESSAGE, None).await; if repos.is_empty() { println!("\r{NO_REPOS_MESSAGE}"); diff --git a/src/commands/staging.rs b/src/commands/staging.rs index a24300b..6533598 100644 --- a/src/commands/staging.rs +++ b/src/commands/staging.rs @@ -27,7 +27,7 @@ pub async fn handle_stage_command(pattern: String) -> Result<()> { // Set terminal title to indicate repos is running set_terminal_title("🚀 repos stage"); - let (start_time, repos) = init_command(SCANNING_MESSAGE).await; + let (start_time, repos) = init_command(SCANNING_MESSAGE, None).await; if repos.is_empty() { println!("\r{NO_REPOS_MESSAGE}"); @@ -71,7 +71,7 @@ pub async fn handle_unstage_command(pattern: String) -> Result<()> { // Set terminal title to indicate repos is running set_terminal_title("🚀 repos unstage"); - let (start_time, repos) = init_command(SCANNING_MESSAGE).await; + let (start_time, repos) = init_command(SCANNING_MESSAGE, None).await; if repos.is_empty() { println!("\r{NO_REPOS_MESSAGE}"); @@ -115,7 +115,7 @@ pub async fn handle_staging_status_command() -> Result<()> { // Set terminal title to indicate repos is running set_terminal_title("🚀 repos status"); - let (start_time, repos) = init_command(SCANNING_MESSAGE).await; + let (start_time, repos) = init_command(SCANNING_MESSAGE, None).await; if repos.is_empty() { println!("\r{NO_REPOS_MESSAGE}"); @@ -364,7 +364,7 @@ pub async fn handle_commit_command(message: String, include_empty: bool) -> Resu // Set terminal title to indicate repos is running set_terminal_title("🚀 repos commit"); - let (start_time, repos) = init_command(SCANNING_MESSAGE).await; + let (start_time, repos) = init_command(SCANNING_MESSAGE, None).await; if repos.is_empty() { println!("\r{NO_REPOS_MESSAGE}"); diff --git a/src/commands/sync.rs b/src/commands/sync.rs index b6d612d..a497e66 100644 --- a/src/commands/sync.rs +++ b/src/commands/sync.rs @@ -27,7 +27,7 @@ pub async fn handle_push_command( // Set terminal title to indicate repos is running set_terminal_title("🚀 repos"); - let (start_time, repos) = init_command(SCANNING_MESSAGE).await; + let (start_time, repos) = init_command(SCANNING_MESSAGE, None).await; if repos.is_empty() { println!("\r{NO_REPOS_MESSAGE}"); @@ -70,7 +70,7 @@ pub async fn handle_push_command( // Check for subrepo drift unless explicitly skipped if !no_drift_check { - check_and_display_drift(); + check_and_display_drift().await; } // Set terminal title to green checkbox to indicate completion @@ -342,9 +342,9 @@ async fn process_push_repositories( } /// Check for subrepo drift and display concise summary -fn check_and_display_drift() { +async fn check_and_display_drift() { // Try to analyze subrepos - if it fails (e.g., no subrepos), silently skip - if let Ok(statuses) = crate::subrepo::status::analyze_subrepos() { + if let Ok(statuses) = crate::subrepo::status::analyze_subrepos().await { // Only display if there's drift to report if statuses.iter().any(|s| s.has_drift) { crate::subrepo::status::display_drift_summary(&statuses); @@ -366,7 +366,7 @@ pub async fn handle_pull_command( // Set terminal title to indicate repos is running set_terminal_title("🔽 repos"); - let (start_time, repos) = init_command(SCANNING_MESSAGE).await; + let (start_time, repos) = init_command(SCANNING_MESSAGE, None).await; if repos.is_empty() { println!("\r{NO_REPOS_MESSAGE}"); @@ -410,7 +410,7 @@ pub async fn handle_pull_command( // Check for subrepo drift unless explicitly skipped if !no_drift_check { - check_and_display_drift(); + check_and_display_drift().await; } // Set terminal title to green checkbox to indicate completion diff --git a/src/core/api.rs b/src/core/api.rs index 6eaa875..17db04c 100644 --- a/src/core/api.rs +++ b/src/core/api.rs @@ -12,7 +12,7 @@ //! //! ```rust,no_run //! use goobits_repos::core::find_repos_from_path; -//! let repos = find_repos_from_path("/path/to/search"); +//! let repos = find_repos_from_path("/path/to/search", None); //! ``` // Core types diff --git a/src/core/discovery.rs b/src/core/discovery.rs index 74606f8..d8b2e8f 100644 --- a/src/core/discovery.rs +++ b/src/core/discovery.rs @@ -36,7 +36,10 @@ fn is_git_file(path: &Path) -> bool { /// This function uses parallel directory walking for significantly better performance /// with large directory trees (5-10x faster than sequential walking). /// Uses `DashMap` for lock-free concurrent access, eliminating mutex contention. -pub fn find_repos_from_path(search_path: impl AsRef) -> Vec<(String, PathBuf)> { +pub fn find_repos_from_path( + search_path: impl AsRef, + filter: Option<&[String]>, +) -> Vec<(String, PathBuf)> { let search_path = search_path.as_ref(); // Use DashMap for lock-free concurrent access (20-40% faster than Mutex) @@ -47,6 +50,9 @@ pub fn find_repos_from_path(search_path: impl AsRef) -> Vec<(String, PathB let name_counts = Arc::new(DashMap::with_capacity(ESTIMATED_REPO_COUNT)); let search_path_buf = search_path.to_path_buf(); + // Convert filter to Arc for sharing across threads + let filter = filter.map(|f| Arc::new(f.to_vec())); + // Build parallel walker with optimizations let walker = WalkBuilder::new(search_path) .follow_links(true) // Follow symlinks to find symlinked repos @@ -75,6 +81,7 @@ pub fn find_repos_from_path(search_path: impl AsRef) -> Vec<(String, PathB let repos_map = Arc::clone(&repos_map); let name_counts = Arc::clone(&name_counts); let search_path_buf = search_path_buf.clone(); + let filter = filter.clone(); Box::new(move |result| { use ignore::WalkState; @@ -141,6 +148,12 @@ pub fn find_repos_from_path(search_path: impl AsRef) -> Vec<(String, PathB } }; + if let Some(ref f) = filter { + if !f.contains(&repo_name) { + return WalkState::Continue; + } + } + entry.insert(repo_name); } } @@ -172,20 +185,23 @@ pub fn find_repos_from_path(search_path: impl AsRef) -> Vec<(String, PathB /// /// This is a convenience wrapper around `find_repos_from_path()` that searches /// from the current working directory. -pub fn find_repos() -> Vec<(String, PathBuf)> { - find_repos_from_path(".") +pub fn find_repos(filter: Option<&[String]>) -> Vec<(String, PathBuf)> { + find_repos_from_path(".", filter) } /// Common initialization for commands that scan repositories #[must_use] -pub async fn init_command(scanning_msg: &str) -> (std::time::Instant, Vec<(String, PathBuf)>) { +pub async fn init_command( + scanning_msg: &str, + filter: Option>, +) -> (std::time::Instant, Vec<(String, PathBuf)>) { println!(); print!("{scanning_msg}"); // Flush stdout - ignore errors as this is non-critical let _ = std::io::stdout().flush(); let start_time = std::time::Instant::now(); - let repos = tokio::task::spawn_blocking(find_repos) + let repos = tokio::task::spawn_blocking(move || find_repos(filter.as_deref())) .await .unwrap_or_else(|e| { eprintln!("Error in repository discovery: {e}"); @@ -308,7 +324,7 @@ mod tests { } // Run discovery - let repos = find_repos_from_path(root); + let repos = find_repos_from_path(root, None); assert_eq!(repos.len(), 3); diff --git a/src/lib.rs b/src/lib.rs index 1b777b4..ef7977a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,7 +18,7 @@ //! //! #[tokio::main] //! async fn main() { -//! let repos = find_repos_from_path("."); +//! let repos = find_repos_from_path(".", None); //! for (name, path) in repos { //! println!("{}: {}", name, path.display()); //! } diff --git a/src/main.rs b/src/main.rs index 6411e8e..0e48240 100644 --- a/src/main.rs +++ b/src/main.rs @@ -224,15 +224,15 @@ struct Cli { } /// Handles subrepo subcommands -fn handle_subrepo_command(subcommand: SubrepoCommand) -> Result<()> { +async fn handle_subrepo_command(subcommand: SubrepoCommand) -> Result<()> { match subcommand { SubrepoCommand::Validate => { - let report = subrepo::validation::validate_subrepos()?; + let report = subrepo::validation::validate_subrepos().await?; subrepo::validation::display_report(&report); Ok(()) } SubrepoCommand::Status { all } => { - let statuses = subrepo::status::analyze_subrepos()?; + let statuses = subrepo::status::analyze_subrepos().await?; subrepo::status::display_status(&statuses, all); Ok(()) } @@ -241,8 +241,10 @@ fn handle_subrepo_command(subcommand: SubrepoCommand) -> Result<()> { to, stash, force, - } => subrepo::sync::sync_subrepo(&name, &to, stash, force), - SubrepoCommand::Update { name, force } => subrepo::sync::update_subrepo(&name, force), + } => subrepo::sync::sync_subrepo(&name, &to, stash, force).await, + SubrepoCommand::Update { name, force } => { + subrepo::sync::update_subrepo(&name, force).await + } } } @@ -362,7 +364,9 @@ async fn main() -> Result<()> { ) .await } - Some(Commands::Subrepo { subcommand }) => handle_subrepo_command(subcommand.clone()), + Some(Commands::Subrepo { subcommand }) => { + handle_subrepo_command(subcommand.clone()).await + } None => { // Default behavior - show help use clap::CommandFactory; diff --git a/src/subrepo/status.rs b/src/subrepo/status.rs index 16607a5..7868083 100644 --- a/src/subrepo/status.rs +++ b/src/subrepo/status.rs @@ -67,8 +67,8 @@ impl SubrepoStatus { } /// Analyze all subrepos and return status for shared ones -pub fn analyze_subrepos() -> Result> { - let report = super::validation::validate_subrepos()?; +pub async fn analyze_subrepos() -> Result> { + let report = super::validation::validate_subrepos().await?; let mut statuses = Vec::new(); for (remote_url, instances) in report.by_remote { diff --git a/src/subrepo/sync.rs b/src/subrepo/sync.rs index 75c6ae7..10a78d1 100644 --- a/src/subrepo/sync.rs +++ b/src/subrepo/sync.rs @@ -131,8 +131,8 @@ fn fetch_latest_commit(path: &Path) -> Result { } /// Sync a subrepo to a specific commit across all parent repositories -pub fn sync_subrepo(name: &str, target_commit: &str, stash: bool, force: bool) -> Result<()> { - let report = super::validation::validate_subrepos()?; +pub async fn sync_subrepo(name: &str, target_commit: &str, stash: bool, force: bool) -> Result<()> { + let report = super::validation::validate_subrepos().await?; sync_subrepo_with_report(name, target_commit, stash, force, &report) } @@ -223,8 +223,8 @@ pub fn sync_subrepo_with_report( } /// Update a subrepo to the latest commit from remote -pub fn update_subrepo(name: &str, force: bool) -> Result<()> { - let report = super::validation::validate_subrepos()?; +pub async fn update_subrepo(name: &str, force: bool) -> Result<()> { + let report = super::validation::validate_subrepos().await?; update_subrepo_with_report(name, force, &report) } diff --git a/src/subrepo/validation.rs b/src/subrepo/validation.rs index 73d1d8c..f465bf4 100644 --- a/src/subrepo/validation.rs +++ b/src/subrepo/validation.rs @@ -11,39 +11,43 @@ use std::collections::HashMap; use std::path::Path; /// Discover all nested repositories and generate a validation report -pub fn validate_subrepos() -> Result { - let parent_repos = crate::core::discovery::find_repos(); - let mut all_nested = Vec::new(); +pub async fn validate_subrepos() -> Result { + tokio::task::spawn_blocking(move || { + let parent_repos = crate::core::discovery::find_repos(None); + let mut all_nested = Vec::new(); - println!( - "🔍 Scanning {} parent repositories for nested repos...\n", - parent_repos.len() - ); + println!( + "🔍 Scanning {} parent repositories for nested repos...\n", + parent_repos.len() + ); - for (parent_name, parent_path) in parent_repos { - let nested = find_nested_in_parent(&parent_name, &parent_path)?; - all_nested.extend(nested); - } + for (parent_name, parent_path) in parent_repos { + let nested = find_nested_in_parent(&parent_name, &parent_path)?; + all_nested.extend(nested); + } - // Group by remote URL - let mut by_remote: HashMap> = HashMap::new(); - let mut no_remote = Vec::new(); + // Group by remote URL + let mut by_remote: HashMap> = HashMap::new(); + let mut no_remote = Vec::new(); - for instance in all_nested { - if let Some(ref remote) = instance.remote_url { - by_remote.entry(remote.clone()).or_default().push(instance); - } else { - no_remote.push(instance); + for instance in all_nested { + if let Some(ref remote) = instance.remote_url { + by_remote.entry(remote.clone()).or_default().push(instance); + } else { + no_remote.push(instance); + } } - } - let total_nested = by_remote.values().map(std::vec::Vec::len).sum::() + no_remote.len(); + let total_nested = + by_remote.values().map(std::vec::Vec::len).sum::() + no_remote.len(); - Ok(ValidationReport { - total_nested, - by_remote, - no_remote, + Ok(ValidationReport { + total_nested, + by_remote, + no_remote, + }) }) + .await? } /// Find nested repositories within a parent repository diff --git a/tests/blocking_check.rs b/tests/blocking_check.rs index 55f3e0e..c793985 100644 --- a/tests/blocking_check.rs +++ b/tests/blocking_check.rs @@ -54,7 +54,7 @@ async fn test_blocking_discovery() { // Run the command under test println!("Running init_command..."); // This is synchronous and will block the single thread - let _ = init_command("Scanning...").await; + let _ = init_command("Scanning...", None).await; let duration = start.elapsed(); println!("init_command took: {:?}", duration); @@ -125,7 +125,7 @@ fn test_blocking_discovery_measure() { println!("Running init_command..."); // This will block the single thread // std::thread::sleep(Duration::from_millis(100)); - let _ = init_command("Scanning...").await; + let _ = init_command("Scanning...", None).await; let duration = start.elapsed(); // Yield to let the heartbeat task process (it should process now) diff --git a/tests/subrepo_blocking_check.rs b/tests/subrepo_blocking_check.rs new file mode 100644 index 0000000..a468a76 --- /dev/null +++ b/tests/subrepo_blocking_check.rs @@ -0,0 +1,85 @@ +use goobits_repos::subrepo::validation::validate_subrepos; +use std::fs; +use std::time::{Duration, Instant}; +use tempfile::TempDir; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +#[test] +fn test_subrepo_validation_blocking() { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + rt.block_on(async { + println!("Main thread: {:?}", std::thread::current().id()); + // Setup - Create many repos to make discovery slow + let temp_dir = TempDir::new().unwrap(); + let root = temp_dir.path(); + + println!("Setting up 1000 repositories..."); + for i in 0..1000 { + let repo_path = root.join(format!("repo-{}", i)); + fs::create_dir(&repo_path).unwrap(); + // Initialize as git repo so find_repos detects it + // We can just make a .git directory + fs::create_dir(repo_path.join(".git")).unwrap(); + } + + let _orig_dir = std::env::current_dir().unwrap(); + std::env::set_current_dir(root).unwrap(); + + let start = Instant::now(); + + // Use an Atomic to track max delay in the heartbeat + let max_delay_ms = Arc::new(AtomicU64::new(0)); + let max_delay_clone = max_delay_ms.clone(); + + let heartbeat_handle = tokio::spawn(async move { + println!("Heartbeat thread: {:?}", std::thread::current().id()); + let mut last_tick = Instant::now(); + loop { + tokio::time::sleep(Duration::from_millis(10)).await; + let now = Instant::now(); + let delay = now.duration_since(last_tick); + let delay_ms = delay.as_millis() as u64; + + // Expected is ~10ms. If it's > 20ms, we have delay. + // We track max delay to catch the blocking period. + let current_max = max_delay_clone.load(Ordering::Relaxed); + if delay_ms > current_max { + max_delay_clone.store(delay_ms, Ordering::Relaxed); + } + last_tick = now; + } + }); + + // Yield to let heartbeat start + tokio::time::sleep(Duration::from_millis(5)).await; + + println!("Running validate_subrepos (async)..."); + + // This should NOT BLOCK the single thread because it uses spawn_blocking + let _ = validate_subrepos().await; + + let duration = start.elapsed(); + println!("validate_subrepos took: {:?}", duration); + + // Yield to let the heartbeat task process + tokio::time::sleep(Duration::from_millis(50)).await; + + heartbeat_handle.abort(); + std::env::set_current_dir(_orig_dir).unwrap(); + + let delay = max_delay_ms.load(Ordering::Relaxed); + println!("Max heartbeat delay: {} ms", delay); + + if delay > 100 { + println!("Confirmed: BLOCKING behavior detected (Failed)."); + panic!("Expected non-blocking behavior but saw blocking"); + } else { + println!("Observed: NON-BLOCKING behavior (Success)."); + } + }); +} diff --git a/tests/test_discovery.rs b/tests/test_discovery.rs index 58d3e77..be1e449 100644 --- a/tests/test_discovery.rs +++ b/tests/test_discovery.rs @@ -26,7 +26,7 @@ fn test_find_single_repo() { fs::rename(repo.path(), &repo_path).expect("Failed to move repo"); // Find repositories from the repos directory - let found_repos = find_repos_from_path(&repos_dir); + let found_repos = find_repos_from_path(&repos_dir, None); assert_eq!(found_repos.len(), 1, "Should find exactly one repository"); assert_eq!(found_repos[0].0, "my-repo"); @@ -45,7 +45,7 @@ fn test_find_multiple_repos() { create_multiple_repos(temp_dir.path(), 5).expect("Failed to create repos"); // Find repositories from the temp directory - let found_repos = find_repos_from_path(temp_dir.path()); + let found_repos = find_repos_from_path(temp_dir.path(), None); assert_eq!(found_repos.len(), 5, "Should find all 5 repositories"); @@ -82,7 +82,7 @@ fn test_find_repos_with_duplicate_names() { setup_git_repo(&repo2).expect("Failed to setup repo2"); // Find repositories from the temp directory - let found_repos = find_repos_from_path(temp_dir.path()); + let found_repos = find_repos_from_path(temp_dir.path(), None); assert_eq!(found_repos.len(), 2, "Should find both repositories"); @@ -115,7 +115,7 @@ fn test_skips_node_modules() { setup_git_repo(&nested_repo).expect("Failed to setup nested repo"); // Find repositories from the temp directory - let found_repos = find_repos_from_path(temp_dir.path()); + let found_repos = find_repos_from_path(temp_dir.path(), None); // Should only find the valid repo, not the one in node_modules assert_eq!(found_repos.len(), 1, "Should skip repo in node_modules"); @@ -147,7 +147,7 @@ fn test_max_depth_limit() { setup_git_repo(&shallow_repo).expect("Failed to setup shallow repo"); // Find repositories from the temp directory - let found_repos = find_repos_from_path(temp_dir.path()); + let found_repos = find_repos_from_path(temp_dir.path(), None); // Should only find the shallow repo, not the deep one assert_eq!( @@ -179,7 +179,7 @@ fn test_handles_symlinks() { let symlink_path = temp_dir.path().join("symlink-repo"); if symlink(&real_repo, &symlink_path).is_ok() { // Find repositories from the temp directory - let found_repos = find_repos_from_path(temp_dir.path()); + let found_repos = find_repos_from_path(temp_dir.path(), None); // Should find both the real repo and symlink (with deduplication) // Depending on implementation, might find 1 or 2 @@ -202,7 +202,7 @@ fn test_current_directory_as_repo() { setup_git_repo(temp_dir.path()).expect("Failed to setup repo"); // Find repositories - should find the directory itself as a repo - let found_repos = find_repos_from_path(temp_dir.path()); + let found_repos = find_repos_from_path(temp_dir.path(), None); // Should find current directory as a repo with appropriate name assert_eq!( @@ -235,7 +235,7 @@ fn test_alphabetical_sorting() { } // Find repositories from the temp directory - let found_repos = find_repos_from_path(temp_dir.path()); + let found_repos = find_repos_from_path(temp_dir.path(), None); assert_eq!(found_repos.len(), 5); diff --git a/tests/test_stress.rs b/tests/test_stress.rs index 06b5ecb..c8b9716 100644 --- a/tests/test_stress.rs +++ b/tests/test_stress.rs @@ -38,7 +38,7 @@ async fn test_stress_discovery_and_analysis() -> Result<()> { let path = temp_dir.path(); // 1. Discovery - let repos = find_repos_from_path(path); + let repos = find_repos_from_path(path, None); assert_eq!(repos.len(), count); // 2. Parallel analysis @@ -67,7 +67,7 @@ async fn test_stress_discovery_scaling_500() -> Result<()> { let path = temp_dir.path(); let start = std::time::Instant::now(); - let repos = find_repos_from_path(path); + let repos = find_repos_from_path(path, None); let duration = start.elapsed(); assert_eq!(repos.len(), count);