diff --git a/s3_bucket_downloader/src/main.rs b/s3_bucket_downloader/src/main.rs
index 402542b..f5e1328 100644
--- a/s3_bucket_downloader/src/main.rs
+++ b/s3_bucket_downloader/src/main.rs
@@ -59,6 +59,10 @@ struct Args {
     /// File containing list of files to download (one per line)
     #[arg(short, long)]
     file_list: Option<String>,
+
+    /// Redownload mode: check existing files and only download missing ones
+    #[arg(short = 'd', long, default_value_t = false)]
+    redownload: bool,
 }
 
 #[tokio::main]
@@ -69,6 +73,7 @@ async fn main() {
     let local_dir = PathBuf::from(args.output);
     let num_workers = args.workers;
     let max_retries = args.retries;
+    let redownload = args.redownload;
 
     if !local_dir.exists() {
         fs::create_dir_all(&local_dir).expect("Failed to create output directory");
@@ -111,11 +116,34 @@ async fn main() {
         keys
     };
 
-    println!(
-        "Found {} files. Starting downloads with {} threads...",
-        keys.len(),
-        num_workers
-    );
+    // Filter out existing files in redownload mode
+    let keys_to_download = if redownload {
+        let mut missing_keys = Vec::new();
+        let mut existing_count = 0;
+        for key in &keys {
+            let local_path = local_dir.join(key);
+            if local_path.exists() {
+                existing_count += 1;
+            } else {
+                missing_keys.push(key.clone());
+            }
+        }
+        println!(
+            "Found {} existing files, {} files to download",
+            existing_count,
+            missing_keys.len()
+        );
+        missing_keys
+    } else {
+        keys
+    };
+
+    if keys_to_download.is_empty() {
+        println!("No files to download.");
+        return;
+    }
+
+    println!("Starting downloads with {} threads...", num_workers);
     rayon::ThreadPoolBuilder::new()
         .num_threads(num_workers)
         .build_global()
@@ -127,7 +155,7 @@ async fn main() {
     let downloaded_size = Arc::new(AtomicUsize::new(0));
 
     // Create overall progress bar
-    let total_pb = m.add(ProgressBar::new(keys.len() as u64));
+    let total_pb = m.add(ProgressBar::new(keys_to_download.len() as u64));
     total_pb.set_style(
         ProgressStyle::with_template(
             "{spinner} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({percent}%) {msg}",
@@ -136,7 +164,7 @@ async fn main() {
     );
 
     // Calculate files per thread
-    let files_per_thread = keys.len().div_ceil(num_workers);
+    let files_per_thread = keys_to_download.len().div_ceil(num_workers);
 
     // Create fixed progress bars for each thread
     let thread_pbs: Vec<_> = (0..num_workers)
@@ -151,54 +179,52 @@ async fn main() {
         })
         .collect();
 
-    keys.par_iter().enumerate().for_each(|(i, key)| {
-        let client = Arc::clone(&client);
-        let bucket = bucket_name.clone();
-        let dir = local_dir.clone();
-        let key = key.clone();
-        let downloaded = Arc::clone(&downloaded);
-        let failed = Arc::clone(&failed);
-        let downloaded_size = Arc::clone(&downloaded_size);
-        let total_pb = total_pb.clone();
-        let thread_num = i % num_workers;
-        let thread_pb = thread_pbs[thread_num].clone();
-
-        let rt = Runtime::new().unwrap();
-        rt.block_on(async move {
-            let local_path = dir.join(&key);
-            if let Some(parent) = local_path.parent() {
-                fs::create_dir_all(parent).expect("Failed to create parent directory");
-            }
-
-            match download_object_with_retry(&client, &bucket, &key, max_retries).await {
-                Ok(bytes) => {
-                    let mut file = File::create(&local_path).expect("Failed to create file");
-                    file.write_all(&bytes).expect("Failed to write file");
-                    downloaded.fetch_add(1, Ordering::SeqCst);
-                    downloaded_size.fetch_add(bytes.len(), Ordering::SeqCst);
-                    total_pb.inc(1);
-                    thread_pb.inc(1);
-                    thread_pb.set_message(format!(
-                        "Thread {}: Downloaded {}/{} files",
-                        thread_num + 1,
-                        downloaded.load(Ordering::SeqCst),
-                        files_per_thread
-                    ));
+    keys_to_download
+        .par_iter()
+        .enumerate()
+        .for_each(|(i, key)| {
+            let client = Arc::clone(&client);
+            let bucket = bucket_name.clone();
+            let dir = local_dir.clone();
+            let key = key.clone();
+            let downloaded = Arc::clone(&downloaded);
+            let failed = Arc::clone(&failed);
+            let downloaded_size = Arc::clone(&downloaded_size);
+            let total_pb = total_pb.clone();
+            let thread_num = i % num_workers;
+            let thread_pb = thread_pbs[thread_num].clone();
+
+            let rt = Runtime::new().unwrap();
+            rt.block_on(async move {
+                let local_path = dir.join(&key);
+                if let Some(parent) = local_path.parent() {
+                    fs::create_dir_all(parent).expect("Failed to create parent directory");
                 }
-                Err(_e) => {
-                    failed.fetch_add(1, Ordering::SeqCst);
-                    total_pb.inc(1);
-                    thread_pb.inc(1);
-                    thread_pb.set_message(format!(
-                        "Thread {}: Failed {}/{} files",
-                        thread_num + 1,
-                        failed.load(Ordering::SeqCst),
-                        files_per_thread
-                    ));
+
+                match download_object_with_retry(&client, &bucket, &key, max_retries).await {
+                    Ok(bytes) => {
+                        let mut file = File::create(&local_path).expect("Failed to create file");
+                        file.write_all(&bytes).expect("Failed to write file");
+                        downloaded.fetch_add(1, Ordering::SeqCst);
+                        downloaded_size.fetch_add(bytes.len(), Ordering::SeqCst);
+                        total_pb.inc(1);
+                        thread_pb.inc(1);
+                        thread_pb.set_message(format!("by Thread {}", thread_num + 1));
+                    }
+                    Err(_e) => {
+                        failed.fetch_add(1, Ordering::SeqCst);
+                        total_pb.inc(1);
+                        thread_pb.inc(1);
+                        thread_pb.set_message(format!(
+                            "Thread {}: Failed {}/{} files",
+                            thread_num + 1,
+                            failed.load(Ordering::SeqCst),
+                            files_per_thread
+                        ));
+                    }
                 }
-            }
+            });
         });
-    });
 
     // Clean up all progress bars
     for pb in thread_pbs {