Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,20 +155,32 @@ vector ssl nudge <env_id> [--retry]
```bash
# Import session for large files
vector db import-session create <site_id> [--filename <name>] [--content-length <bytes>] [--drop-tables] [--disable-foreign-keys] [--search-replace-from <from>] [--search-replace-to <to>]
vector db import-session run <site_id> <import_id>
vector db import-session run <site_id> <import_id> [--parts '<json>']
vector db import-session status <site_id> <import_id>

# Export
vector db export create <site_id>
vector db export status <site_id> <export_id>
```

When `--content-length` exceeds 5GB, the API returns multipart upload details
instead of a single upload URL. Use `--json` to see all part URLs. After
uploading each part, pass the ETags to the run command:

```bash
vector db import-session run <site_id> <import_id> --parts '[{"part_number":1,"etag":"\"abc...\""},...]'
```

### Archives

```bash
vector archive import <site_id> <file.tar.gz> [--drop-tables] [--disable-foreign-keys] [--search-replace-from <from>] [--search-replace-to <to>] [--wait] [--poll-interval <seconds>]
```

Files larger than 5GB are automatically uploaded using S3 multipart upload.
The CLI handles splitting the file into parts, uploading each one, and
finalizing the upload — no additional flags are needed.

### Backups

```bash
Expand Down
14 changes: 11 additions & 3 deletions man/man1/vector.1
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,15 @@ Create an import session. Options:
.B \-\-search\-replace\-to
.IR TO .
.TP
.B db import\-session run \fISITE_ID\fR \fIIMPORT_ID\fR
Run an import session.
.B db import\-session run \fISITE_ID\fR \fIIMPORT_ID\fR \fR[\fB\-\-parts\fR \fIJSON\fR]
Run an import session. For multipart uploads, provide the completed parts as a
JSON array with
.BR \-\-parts .
Each element must contain
.B part_number
(integer) and
.B etag
(string, as returned by S3).
.TP
.B db import\-session status \fISITE_ID\fR \fIIMPORT_ID\fR
Check the status of an import session.
Expand All @@ -229,7 +236,8 @@ Check the status of a database export.
.TP
.B archive import \fISITE_ID\fR \fIFILE\fR \fR[\fIOPTIONS\fR]
Import an archive (.tar.gz) to a site. The file is uploaded and processed
on the server. Options:
on the server. Files larger than 5GB are automatically uploaded using S3
multipart upload (streamed in parts, no additional flags needed). Options:
.B \-\-drop\-tables\fR,
.B \-\-disable\-foreign\-keys\fR,
.B \-\-search\-replace\-from
Expand Down
46 changes: 43 additions & 3 deletions src/api/client.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
use reqwest::blocking::{Client, Response};
use std::io::Read;

use reqwest::blocking::{Body, Client, Response};
use reqwest::header::{
ACCEPT, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE, HeaderMap, HeaderValue,
};
use serde::Serialize;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};

use super::error::ApiError;

/// A finished part of an S3 multipart upload, passed back to the API when
/// the import session is run.
///
/// Serialized with serde's default snake_case field names, matching the
/// wire format shown in the CLI docs: `{"part_number": 1, "etag": "\"...\""}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompletedPart {
    /// Part index as assigned by the API's upload part list.
    pub part_number: u64,
    /// ETag returned by S3 for the uploaded part (typically still wrapped
    /// in double quotes, exactly as S3 sent it).
    pub etag: String,
}

const DEFAULT_BASE_URL: &str = "https://api.builtfast.com";
const USER_AGENT: &str = concat!("vector-cli/", env!("CARGO_PKG_VERSION"));

Expand Down Expand Up @@ -161,7 +169,7 @@ impl ApiClient {
.put(url)
.header(CONTENT_TYPE, "application/gzip")
.header(CONTENT_LENGTH, content_length)
.body(reqwest::blocking::Body::from(file))
.body(Body::from(file))
.send()
.map_err(ApiError::NetworkError)?;

Expand All @@ -177,6 +185,38 @@ impl ApiClient {
}
}

/// PUTs one multipart-upload chunk to a presigned S3 URL and returns the
/// ETag S3 reports for it.
///
/// The chunk is streamed from `reader` with an explicit `content_length`
/// so the request carries a fixed size rather than chunked encoding.
///
/// # Errors
/// Returns `ApiError::NetworkError` on transport failure, and
/// `ApiError::Other` when S3 responds with a non-success status or the
/// response lacks an ETag header.
pub fn put_file_part<R: Read + Send + 'static>(
    &self,
    url: &str,
    reader: R,
    content_length: u64,
) -> Result<String, ApiError> {
    let response = self
        .client
        .put(url)
        .header(CONTENT_TYPE, "application/gzip")
        .body(Body::sized(reader, content_length))
        .send()
        .map_err(ApiError::NetworkError)?;

    let status = response.status();
    if !status.is_success() {
        // Surface S3's own error body; it usually explains the rejection.
        let body = response.text().map_err(ApiError::NetworkError)?;
        return Err(ApiError::Other(format!(
            "Part upload failed ({}): {}",
            status, body
        )));
    }

    // The ETag is required to complete the multipart upload later.
    // Header-name lookup in reqwest is case-insensitive.
    match response.headers().get("etag").and_then(|v| v.to_str().ok()) {
        Some(tag) => Ok(tag.to_string()),
        None => Err(ApiError::Other(
            "S3 response missing ETag header".to_string(),
        )),
    }
}

pub fn delete<T: DeserializeOwned>(&self, path: &str) -> Result<T, ApiError> {
let url = format!("{}{}", self.base_url, path);
let response = self
Expand Down
2 changes: 1 addition & 1 deletion src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod client;
pub mod error;

pub use client::ApiClient;
pub use client::{ApiClient, CompletedPart};
pub use error::{ApiError, EXIT_SUCCESS};
3 changes: 3 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,9 @@ pub enum DbImportSessionCommands {
site_id: String,
/// Import ID
import_id: String,
/// Completed parts JSON for multipart uploads (e.g., '[{"part_number":1,"etag":"\"...\""}]')
#[arg(long)]
parts: Option<String>,
},
/// Check archive import session status
Status {
Expand Down
123 changes: 109 additions & 14 deletions src/commands/archive.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
use std::io::{Read, Seek, SeekFrom};
use std::path::Path;
use std::thread;
use std::time::Duration;

use serde::Serialize;
use serde_json::Value;

use crate::api::{ApiClient, ApiError};
use crate::api::{ApiClient, ApiError, CompletedPart};
use crate::output::{OutputFormat, format_option, print_json, print_key_value, print_message};

/// One presigned upload target parsed from the import session's
/// `upload_parts` response array.
struct UploadPart {
    /// Part index reported by the API for this chunk.
    part_number: u64,
    /// Presigned URL the chunk's bytes are PUT to.
    url: String,
}

#[derive(Debug, Serialize)]
struct CreateImportSessionRequest {
#[serde(skip_serializing_if = "Option::is_none")]
Expand Down Expand Up @@ -97,24 +103,47 @@ pub fn import(
let import_id = data["id"]
.as_str()
.ok_or_else(|| ApiError::Other("Missing import ID in response".to_string()))?;
let upload_url = data["upload_url"]
.as_str()
.ok_or_else(|| ApiError::Other("Missing upload URL in response".to_string()))?;
let is_multipart = data["is_multipart"].as_bool().unwrap_or(false);

if format == OutputFormat::Table {
print_message(&format!("Import ID: {}", import_id));
}

// Step 2: Upload file
let size_mb = content_length as f64 / 1_048_576.0;
if format == OutputFormat::Table {
print_message(&format!("Uploading {} ({:.1} MB)...", filename, size_mb));
}
let completed_parts = if is_multipart {
let upload_parts = parse_upload_parts(data)?;
let part_count = upload_parts.len();

if format == OutputFormat::Table {
print_message(&format!(
"Uploading {} ({:.1} MB) in {} parts...",
filename, size_mb, part_count
));
}

Some(upload_multipart(
client,
path,
content_length,
&upload_parts,
format,
)?)
} else {
let upload_url = data["upload_url"]
.as_str()
.ok_or_else(|| ApiError::Other("Missing upload URL in response".to_string()))?;

let file_handle = std::fs::File::open(path)
.map_err(|e| ApiError::Other(format!("Cannot open file: {}", e)))?;
if format == OutputFormat::Table {
print_message(&format!("Uploading {} ({:.1} MB)...", filename, size_mb));
}

client.put_file(upload_url, file_handle, content_length)?;
let file_handle = std::fs::File::open(path)
.map_err(|e| ApiError::Other(format!("Cannot open file: {}", e)))?;

client.put_file(upload_url, file_handle, content_length)?;
None
};

if format == OutputFormat::Table {
print_message("Upload complete.");
Expand All @@ -125,10 +154,14 @@ pub fn import(
print_message("Starting import...");
}

let run_response: Value = client.post_empty(&format!(
"/api/v1/vector/sites/{}/imports/{}/run",
site_id, import_id
))?;
let run_path = format!("/api/v1/vector/sites/{}/imports/{}/run", site_id, import_id);

let run_response: Value = if let Some(ref parts) = completed_parts {
let body = serde_json::json!({ "parts": parts });
client.post(&run_path, &body)?
} else {
client.post_empty(&run_path)?
};

if format == OutputFormat::Table {
print_message("Import started.");
Expand Down Expand Up @@ -204,3 +237,65 @@ pub fn import(

Ok(())
}

/// Extracts the presigned part list from the import-session creation
/// response.
///
/// Expects `data["upload_parts"]` to be an array of objects each carrying
/// an integer `part_number` and a string `url`.
///
/// # Errors
/// Returns `ApiError::Other` if the array or any required field is
/// missing or has the wrong type.
fn parse_upload_parts(data: &Value) -> Result<Vec<UploadPart>, ApiError> {
    let entries = data["upload_parts"]
        .as_array()
        .ok_or_else(|| ApiError::Other("Missing upload_parts in response".to_string()))?;

    let mut parts = Vec::with_capacity(entries.len());
    for entry in entries {
        let part_number = entry["part_number"]
            .as_u64()
            .ok_or_else(|| ApiError::Other("Missing part_number".to_string()))?;
        let url = entry["url"]
            .as_str()
            .ok_or_else(|| ApiError::Other("Missing part url".to_string()))?
            .to_string();
        parts.push(UploadPart { part_number, url });
    }
    Ok(parts)
}

/// Uploads the file at `file_path` in sequential chunks to the presigned
/// part URLs, returning the completed parts (number + ETag) needed to
/// finalize the S3 multipart upload.
///
/// The file is split evenly: each part receives `content_length / N`
/// bytes, except the last, which also absorbs the remainder so every byte
/// is covered. NOTE(review): this assumes the server sizes parts the same
/// way when presigning — confirm against the API's part-size contract.
///
/// # Errors
/// Returns `ApiError::Other` if `parts` is empty, the file cannot be
/// opened or seeked, or any part upload fails.
fn upload_multipart(
    client: &ApiClient,
    file_path: &Path,
    content_length: u64,
    parts: &[UploadPart],
    format: OutputFormat,
) -> Result<Vec<CompletedPart>, ApiError> {
    let part_count = parts.len() as u64;
    // Guard: an empty part list would otherwise panic below with a
    // divide-by-zero (`content_length / part_count`) and an underflow
    // (`part_count - 1`).
    if part_count == 0 {
        return Err(ApiError::Other(
            "Multipart upload requires at least one upload part".to_string(),
        ));
    }

    let base_size = content_length / part_count;
    // Last part gets base_size plus whatever division truncated away.
    let last_size = content_length - base_size * (part_count - 1);

    let mut completed = Vec::with_capacity(parts.len());

    for (i, part) in parts.iter().enumerate() {
        let chunk_size = if (i as u64) < part_count - 1 {
            base_size
        } else {
            last_size
        };
        let offset = base_size * i as u64;

        if format == OutputFormat::Table {
            print_message(&format!("Uploading part {}/{}...", i + 1, part_count));
        }

        // Re-open the file for every part: the reader is moved into the
        // request body, so a single handle cannot be reused across parts.
        let mut file = std::fs::File::open(file_path)
            .map_err(|e| ApiError::Other(format!("Cannot open file: {}", e)))?;
        file.seek(SeekFrom::Start(offset))
            .map_err(|e| ApiError::Other(format!("Cannot seek file: {}", e)))?;
        // `take` caps the stream at exactly this part's byte budget.
        let reader = file.take(chunk_size);

        let etag = client.put_file_part(&part.url, reader, chunk_size)?;

        completed.push(CompletedPart {
            part_number: part.part_number,
            etag,
        });
    }

    Ok(completed)
}
Loading