130 changes: 78 additions & 52 deletions s3_bucket_downloader/src/main.rs
@@ -59,6 +59,10 @@ struct Args {
/// File containing list of files to download (one per line)
#[arg(short, long)]
file_list: Option<String>,

/// Redownload mode: check existing files and only download missing ones
#[arg(short = 'd', long, default_value_t = false)]
redownload: bool,
}

#[tokio::main]
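A minimal usage sketch of the new flag, assuming the other option names follow clap's default derivation from the Args field names (the binary name and bucket argument shape are assumptions; only --redownload / -d is introduced by this change):

    # hypothetical invocation; skips any keys that already exist under --output
    s3_bucket_downloader my-bucket --output ./downloads --workers 8 --redownload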
@@ -69,6 +73,7 @@ async fn main() {
let local_dir = PathBuf::from(args.output);
let num_workers = args.workers;
let max_retries = args.retries;
let redownload = args.redownload;

if !local_dir.exists() {
fs::create_dir_all(&local_dir).expect("Failed to create output directory");
@@ -111,11 +116,34 @@ async fn main() {
keys
};

println!(
"Found {} files. Starting downloads with {} threads...",
keys.len(),
num_workers
);
// Filter out existing files in redownload mode
let keys_to_download = if redownload {
let mut missing_keys = Vec::new();
let mut existing_count = 0;
for key in &keys {
let local_path = local_dir.join(key);
if local_path.exists() {
existing_count += 1;
} else {
missing_keys.push(key.clone());
}
}
println!(
"Found {} existing files, {} files to download",
existing_count,
missing_keys.len()
);
missing_keys
} else {
keys
};

if keys_to_download.is_empty() {
println!("No files to download.");
return;
}

println!("Starting downloads with {} threads...", num_workers);
rayon::ThreadPoolBuilder::new()
.num_threads(num_workers)
.build_global()
@@ -127,7 +155,7 @@ async fn main() {
let downloaded_size = Arc::new(AtomicUsize::new(0));

// Create overall progress bar
let total_pb = m.add(ProgressBar::new(keys.len() as u64));
let total_pb = m.add(ProgressBar::new(keys_to_download.len() as u64));
total_pb.set_style(
ProgressStyle::with_template(
"{spinner} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({percent}%) {msg}",
@@ -136,7 +164,7 @@ async fn main() {
);

// Calculate files per thread
let files_per_thread = keys.len().div_ceil(num_workers);
let files_per_thread = keys_to_download.len().div_ceil(num_workers);

// Create fixed progress bars for each thread
let thread_pbs: Vec<_> = (0..num_workers)
@@ -151,54 +179,52 @@ async fn main() {
})
.collect();

keys.par_iter().enumerate().for_each(|(i, key)| {
let client = Arc::clone(&client);
let bucket = bucket_name.clone();
let dir = local_dir.clone();
let key = key.clone();
let downloaded = Arc::clone(&downloaded);
let failed = Arc::clone(&failed);
let downloaded_size = Arc::clone(&downloaded_size);
let total_pb = total_pb.clone();
let thread_num = i % num_workers;
let thread_pb = thread_pbs[thread_num].clone();

let rt = Runtime::new().unwrap();
rt.block_on(async move {
let local_path = dir.join(&key);
if let Some(parent) = local_path.parent() {
fs::create_dir_all(parent).expect("Failed to create parent directory");
}

match download_object_with_retry(&client, &bucket, &key, max_retries).await {
Ok(bytes) => {
let mut file = File::create(&local_path).expect("Failed to create file");
file.write_all(&bytes).expect("Failed to write file");
downloaded.fetch_add(1, Ordering::SeqCst);
downloaded_size.fetch_add(bytes.len(), Ordering::SeqCst);
total_pb.inc(1);
thread_pb.inc(1);
thread_pb.set_message(format!(
"Thread {}: Downloaded {}/{} files",
thread_num + 1,
downloaded.load(Ordering::SeqCst),
files_per_thread
));
keys_to_download
.par_iter()
.enumerate()
.for_each(|(i, key)| {
let client = Arc::clone(&client);
let bucket = bucket_name.clone();
let dir = local_dir.clone();
let key = key.clone();
let downloaded = Arc::clone(&downloaded);
let failed = Arc::clone(&failed);
let downloaded_size = Arc::clone(&downloaded_size);
let total_pb = total_pb.clone();
let thread_num = i % num_workers;
let thread_pb = thread_pbs[thread_num].clone();

let rt = Runtime::new().unwrap();
rt.block_on(async move {
let local_path = dir.join(&key);
if let Some(parent) = local_path.parent() {
fs::create_dir_all(parent).expect("Failed to create parent directory");
}
Err(_e) => {
failed.fetch_add(1, Ordering::SeqCst);
total_pb.inc(1);
thread_pb.inc(1);
thread_pb.set_message(format!(
"Thread {}: Failed {}/{} files",
thread_num + 1,
failed.load(Ordering::SeqCst),
files_per_thread
));

match download_object_with_retry(&client, &bucket, &key, max_retries).await {
Ok(bytes) => {
let mut file = File::create(&local_path).expect("Failed to create file");
file.write_all(&bytes).expect("Failed to write file");
downloaded.fetch_add(1, Ordering::SeqCst);
downloaded_size.fetch_add(bytes.len(), Ordering::SeqCst);
total_pb.inc(1);
thread_pb.inc(1);
thread_pb.set_message(format!("by Thread {}", thread_num + 1));
}
Err(_e) => {
failed.fetch_add(1, Ordering::SeqCst);
total_pb.inc(1);
thread_pb.inc(1);
thread_pb.set_message(format!(
"Thread {}: Failed {}/{} files",
thread_num + 1,
failed.load(Ordering::SeqCst),
files_per_thread
));
}
}
}
});
});
});

// Clean up all progress bars
for pb in thread_pbs {