From 9f548158a019bc682865b015ac596189b5012145 Mon Sep 17 00:00:00 2001
From: Josh Priddle
Date: Thu, 26 Mar 2026 10:34:11 -0400
Subject: [PATCH] Use multipart uploads for files > 5gb

---
 README.md               |  14 ++++-
 man/man1/vector.1       |  14 ++++-
 src/api/client.rs       |  46 ++++++++++++++-
 src/api/mod.rs          |   2 +-
 src/cli.rs              |   3 +
 src/commands/archive.rs | 123 +++++++++++++++++++++++++++++++++++-----
 src/commands/db.rs      |  82 +++++++++++++++++++--------
 src/main.rs             |   8 ++-
 8 files changed, 244 insertions(+), 48 deletions(-)

diff --git a/README.md b/README.md
index 6e91b4b..eea91b3 100644
--- a/README.md
+++ b/README.md
@@ -155,7 +155,7 @@ vector ssl nudge <site-id> [--retry]
 ```bash
 # Import session for large files
 vector db import-session create <site-id> [--filename <name>] [--content-length <bytes>] [--drop-tables] [--disable-foreign-keys] [--search-replace-from <from>] [--search-replace-to <to>]
-vector db import-session run <site-id> <import-id>
+vector db import-session run <site-id> <import-id> [--parts '<json>']
 vector db import-session status <site-id> <import-id>
 
 # Export
@@ -163,12 +163,24 @@ vector db export create <site-id>
 vector db export status <site-id> <export-id>
 ```
 
+When `--content-length` exceeds 5GB, the API returns multipart upload details
+instead of a single upload URL. Use `--json` to see all part URLs. After
+uploading each part, pass the ETags to the run command:
+
+```bash
+vector db import-session run <site-id> <import-id> --parts '[{"part_number":1,"etag":"\"abc...\""},...]'
+```
+
 ### Archives
 
 ```bash
 vector archive import <site-id> <file> [--drop-tables] [--disable-foreign-keys] [--search-replace-from <from>] [--search-replace-to <to>] [--wait] [--poll-interval <seconds>]
 ```
 
+Files larger than 5GB are automatically uploaded using S3 multipart upload.
+The CLI handles splitting the file into parts, uploading each one, and
+finalizing the upload — no additional flags are needed.
+
 ### Backups
 
 ```bash
diff --git a/man/man1/vector.1 b/man/man1/vector.1
index 727dc15..fa22077 100644
--- a/man/man1/vector.1
+++ b/man/man1/vector.1
@@ -211,8 +211,15 @@ Create an import session. Options:
 .B \-\-search\-replace\-to
 .IR TO .
 .TP
-.B db import\-session run \fISITE_ID\fR \fIIMPORT_ID\fR
-Run an import session.
+.B db import\-session run \fISITE_ID\fR \fIIMPORT_ID\fR \fR[\fB\-\-parts\fR \fIJSON\fR]
+Run an import session. For multipart uploads, provide the completed parts as a
+JSON array with
+.BR \-\-parts .
+Each element must contain
+.B part_number
+(integer) and
+.B etag
+(string, as returned by S3).
 .TP
 .B db import\-session status \fISITE_ID\fR \fIIMPORT_ID\fR
 Check the status of an import session.
@@ -229,7 +236,8 @@ Check the status of a database export.
 .TP
 .B archive import \fISITE_ID\fR \fIFILE\fR \fR[\fIOPTIONS\fR]
 Import an archive (.tar.gz) to a site. The file is uploaded and processed
-on the server. Options:
+on the server. Files larger than 5GB are automatically uploaded using S3
+multipart upload (streamed in parts, no additional flags needed). Options:
 .B \-\-drop\-tables\fR,
 .B \-\-disable\-foreign\-keys\fR,
 .B \-\-search\-replace\-from
diff --git a/src/api/client.rs b/src/api/client.rs
index e327968..347df0d 100644
--- a/src/api/client.rs
+++ b/src/api/client.rs
@@ -1,12 +1,20 @@
-use reqwest::blocking::{Client, Response};
+use std::io::Read;
+
+use reqwest::blocking::{Body, Client, Response};
 use reqwest::header::{
     ACCEPT, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE, HeaderMap, HeaderValue,
 };
-use serde::Serialize;
 use serde::de::DeserializeOwned;
+use serde::{Deserialize, Serialize};
 
 use super::error::ApiError;
 
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CompletedPart {
+    pub part_number: u64,
+    pub etag: String,
+}
+
 const DEFAULT_BASE_URL: &str = "https://api.builtfast.com";
 const USER_AGENT: &str = concat!("vector-cli/", env!("CARGO_PKG_VERSION"));
@@ -161,7 +169,7 @@ impl ApiClient {
             .put(url)
             .header(CONTENT_TYPE, "application/gzip")
             .header(CONTENT_LENGTH, content_length)
-            .body(reqwest::blocking::Body::from(file))
+            .body(Body::from(file))
             .send()
             .map_err(ApiError::NetworkError)?;
@@ -177,6 +185,38 @@
         }
     }
 
+    pub fn put_file_part<R: Read + Send + 'static>(
+        &self,
+        url: &str,
+        reader: R,
+        content_length: u64,
+    ) -> Result<String, ApiError> {
+        let response = self
+            .client
+            .put(url)
+            .header(CONTENT_TYPE, "application/gzip")
+            .body(Body::sized(reader, content_length))
+            .send()
+            .map_err(ApiError::NetworkError)?;
+
+        if response.status().is_success() {
+            let etag = response
+                .headers()
+                .get("etag")
+                .and_then(|v| v.to_str().ok())
+                .map(|s| s.to_string())
+                .ok_or_else(|| ApiError::Other("S3 response missing ETag header".to_string()))?;
+            Ok(etag)
+        } else {
+            let status = response.status();
+            let body = response.text().map_err(ApiError::NetworkError)?;
+            Err(ApiError::Other(format!(
+                "Part upload failed ({}): {}",
+                status, body
+            )))
+        }
+    }
+
     pub fn delete(&self, path: &str) -> Result<Response, ApiError> {
         let url = format!("{}{}", self.base_url, path);
         let response = self
diff --git a/src/api/mod.rs b/src/api/mod.rs
index 55e9172..a19a8ad 100644
--- a/src/api/mod.rs
+++ b/src/api/mod.rs
@@ -1,5 +1,5 @@
 pub mod client;
 pub mod error;
 
-pub use client::ApiClient;
+pub use client::{ApiClient, CompletedPart};
 pub use error::{ApiError, EXIT_SUCCESS};
diff --git a/src/cli.rs b/src/cli.rs
index 2254cff..b2fa2ca 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -571,6 +571,9 @@ pub enum DbImportSessionCommands {
         site_id: String,
         /// Import ID
         import_id: String,
+        /// Completed parts JSON for multipart uploads (e.g., '[{"part_number":1,"etag":"\"...\""}]')
+        #[arg(long)]
+        parts: Option<String>,
     },
     /// Check archive import session status
     Status {
diff --git a/src/commands/archive.rs b/src/commands/archive.rs
index 9cde764..1533f3d 100644
--- a/src/commands/archive.rs
+++ b/src/commands/archive.rs
@@ -1,3 +1,4 @@
+use std::io::{Read, Seek, SeekFrom};
 use std::path::Path;
 use std::thread;
 use std::time::Duration;
@@ -5,9 +6,14 @@
 use serde::Serialize;
 use serde_json::Value;
 
-use crate::api::{ApiClient, ApiError};
+use crate::api::{ApiClient, ApiError, CompletedPart};
 use crate::output::{OutputFormat, format_option, print_json, print_key_value, print_message};
 
+struct UploadPart {
+    part_number: u64,
+    url: String,
+}
+
 #[derive(Debug, Serialize)]
 struct CreateImportSessionRequest {
     #[serde(skip_serializing_if = "Option::is_none")]
@@ -97,9 +103,7 @@ pub fn import(
     let import_id = data["id"]
         .as_str()
         .ok_or_else(|| ApiError::Other("Missing import ID in response".to_string()))?;
-    let upload_url = data["upload_url"]
-        .as_str()
-        .ok_or_else(|| ApiError::Other("Missing upload URL in response".to_string()))?;
+    let is_multipart = data["is_multipart"].as_bool().unwrap_or(false);
 
     if format == OutputFormat::Table {
         print_message(&format!("Import ID: {}", import_id));
@@ -107,14 +111,39 @@
 
     // Step 2: Upload file
     let size_mb = content_length as f64 / 1_048_576.0;
-    if format == OutputFormat::Table {
-        print_message(&format!("Uploading {} ({:.1} MB)...", filename, size_mb));
-    }
-
+    let completed_parts = if is_multipart {
+        let upload_parts = parse_upload_parts(data)?;
+        let part_count = upload_parts.len();
+
+        if format == OutputFormat::Table {
+            print_message(&format!(
+                "Uploading {} ({:.1} MB) in {} parts...",
+                filename, size_mb, part_count
+            ));
+        }
+
+        Some(upload_multipart(
+            client,
+            path,
+            content_length,
+            &upload_parts,
+            format,
+        )?)
+    } else {
+        let upload_url = data["upload_url"]
+            .as_str()
+            .ok_or_else(|| ApiError::Other("Missing upload URL in response".to_string()))?;
+
-    let file_handle = std::fs::File::open(path)
-        .map_err(|e| ApiError::Other(format!("Cannot open file: {}", e)))?;
-
+        if format == OutputFormat::Table {
+            print_message(&format!("Uploading {} ({:.1} MB)...", filename, size_mb));
+        }
-    client.put_file(upload_url, file_handle, content_length)?;
+
+        let file_handle = std::fs::File::open(path)
+            .map_err(|e| ApiError::Other(format!("Cannot open file: {}", e)))?;
+
+        client.put_file(upload_url, file_handle, content_length)?;
+        None
+    };
 
     if format == OutputFormat::Table {
         print_message("Upload complete.");
@@ -125,10 +154,14 @@
         print_message("Starting import...");
     }
 
-    let run_response: Value = client.post_empty(&format!(
-        "/api/v1/vector/sites/{}/imports/{}/run",
-        site_id, import_id
-    ))?;
+    let run_path = format!("/api/v1/vector/sites/{}/imports/{}/run", site_id, import_id);
+
+    let run_response: Value = if let Some(ref parts) = completed_parts {
+        let body = serde_json::json!({ "parts": parts });
+        client.post(&run_path, &body)?
+    } else {
+        client.post_empty(&run_path)?
+    };
 
     if format == OutputFormat::Table {
         print_message("Import started.");
@@ -204,3 +237,65 @@
 
     Ok(())
 }
+
+fn parse_upload_parts(data: &Value) -> Result<Vec<UploadPart>, ApiError> {
+    let parts_array = data["upload_parts"]
+        .as_array()
+        .ok_or_else(|| ApiError::Other("Missing upload_parts in response".to_string()))?;
+
+    parts_array
+        .iter()
+        .map(|p| {
+            let part_number = p["part_number"]
+                .as_u64()
+                .ok_or_else(|| ApiError::Other("Missing part_number".to_string()))?;
+            let url = p["url"]
+                .as_str()
+                .ok_or_else(|| ApiError::Other("Missing part url".to_string()))?
+                .to_string();
+            Ok(UploadPart { part_number, url })
+        })
+        .collect()
+}
+
+fn upload_multipart(
+    client: &ApiClient,
+    file_path: &Path,
+    content_length: u64,
+    parts: &[UploadPart],
+    format: OutputFormat,
+) -> Result<Vec<CompletedPart>, ApiError> {
+    let part_count = parts.len() as u64;
+    let base_size = content_length / part_count;
+    let last_size = content_length - base_size * (part_count - 1);
+
+    let mut completed = Vec::with_capacity(parts.len());
+
+    for (i, part) in parts.iter().enumerate() {
+        let chunk_size = if (i as u64) < part_count - 1 {
+            base_size
+        } else {
+            last_size
+        };
+        let offset = base_size * i as u64;
+
+        if format == OutputFormat::Table {
+            print_message(&format!("Uploading part {}/{}...", i + 1, part_count));
+        }
+
+        let mut file = std::fs::File::open(file_path)
+            .map_err(|e| ApiError::Other(format!("Cannot open file: {}", e)))?;
+        file.seek(SeekFrom::Start(offset))
+            .map_err(|e| ApiError::Other(format!("Cannot seek file: {}", e)))?;
+        let reader = file.take(chunk_size);
+
+        let etag = client.put_file_part(&part.url, reader, chunk_size)?;
+
+        completed.push(CompletedPart {
+            part_number: part.part_number,
+            etag,
+        });
+    }
+
+    Ok(completed)
+}
diff --git a/src/commands/db.rs b/src/commands/db.rs
index 0663eeb..94fcaa2 100644
--- a/src/commands/db.rs
+++ b/src/commands/db.rs
@@ -1,7 +1,7 @@
 use serde::Serialize;
 use serde_json::Value;
 
-use crate::api::{ApiClient, ApiError};
+use crate::api::{ApiClient, ApiError, CompletedPart};
 use crate::output::{OutputFormat, format_option, print_json, print_key_value, print_message};
 
 #[derive(Debug, Serialize)]
@@ -81,25 +81,54 @@ pub fn import_session_create(
     }
 
     let data = &response["data"];
-    print_key_value(vec![
-        ("Import ID", data["id"].as_str().unwrap_or("-").to_string()),
-        ("Status", data["status"].as_str().unwrap_or("-").to_string()),
-        (
-            "Upload URL",
-            format_option(&data["upload_url"].as_str().map(String::from)),
-        ),
-        (
-            "Expires",
-            format_option(&data["upload_expires_at"].as_str().map(String::from)),
-        ),
-    ]);
+    let is_multipart = data["is_multipart"].as_bool().unwrap_or(false);
+    let import_id = data["id"].as_str().unwrap_or("-");
 
-    print_message("\nUpload your SQL file to the URL above, then run:");
-    print_message(&format!(
-        "  vector db import-session run {} {}",
-        site_id,
-        data["id"].as_str().unwrap_or("IMPORT_ID")
-    ));
+    if is_multipart {
+        print_key_value(vec![
+            ("Import ID", import_id.to_string()),
+            ("Status", data["status"].as_str().unwrap_or("-").to_string()),
+            ("Multipart", "Yes".to_string()),
+            (
+                "Upload ID",
+                format_option(&data["upload_id"].as_str().map(String::from)),
+            ),
+            (
+                "Part Count",
+                format_option(&data["part_count"].as_u64().map(|v| v.to_string())),
+            ),
+            (
+                "Expires",
+                format_option(&data["upload_expires_at"].as_str().map(String::from)),
+            ),
+        ]);
+
+        print_message("\nUse --json to see all part URLs.");
+        print_message("After uploading each part, run with the ETags:");
+        print_message(&format!(
+            "  vector db import-session run {} {} --parts '[{{\"part_number\":1,\"etag\":\"...\"}},...]'",
+            site_id, import_id
+        ));
+    } else {
+        print_key_value(vec![
+            ("Import ID", import_id.to_string()),
+            ("Status", data["status"].as_str().unwrap_or("-").to_string()),
+            (
+                "Upload URL",
+                format_option(&data["upload_url"].as_str().map(String::from)),
+            ),
+            (
+                "Expires",
+                format_option(&data["upload_expires_at"].as_str().map(String::from)),
+            ),
+        ]);
+
+        print_message("\nUpload your SQL file to the URL above, then run:");
+        print_message(&format!(
+            "  vector db import-session run {} {}",
+            site_id, import_id
+        ));
+    };
 
     Ok(())
 }
@@ -108,12 +137,19 @@ pub fn import_session_run(
     client: &ApiClient,
     site_id: &str,
     import_id: &str,
+    parts: Option<String>,
     format: OutputFormat,
 ) -> Result<(), ApiError> {
-    let response: Value = client.post_empty(&format!(
-        "/api/v1/vector/sites/{}/imports/{}/run",
-        site_id, import_id
-    ))?;
+    let run_path = format!("/api/v1/vector/sites/{}/imports/{}/run", site_id, import_id);
+
+    let response: Value = if let Some(parts_json) = parts {
+        let completed_parts: Vec<CompletedPart> = serde_json::from_str(&parts_json)
+            .map_err(|e| ApiError::Other(format!("Invalid parts JSON: {}", e)))?;
+        let body = serde_json::json!({ "parts": completed_parts });
+        client.post(&run_path, &body)?
+    } else {
+        client.post_empty(&run_path)?
+    };
 
     if format == OutputFormat::Json {
         print_json(&response);
diff --git a/src/main.rs b/src/main.rs
index 1f95c52..fb64dca 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -334,9 +334,11 @@ fn run_db_import_session(
             search_replace_to,
             format,
         ),
-        DbImportSessionCommands::Run { site_id, import_id } => {
-            db::import_session_run(client, &site_id, &import_id, format)
-        }
+        DbImportSessionCommands::Run {
+            site_id,
+            import_id,
+            parts,
+        } => db::import_session_run(client, &site_id, &import_id, parts, format),
         DbImportSessionCommands::Status { site_id, import_id } => {
             db::import_session_status(client, &site_id, &import_id, format)
         }