Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions docs/configuration/index-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ index_id: "hdfs"

index_uri: "s3://my-bucket/hdfs"

extra_index_uris:
- "s3://my-second-bucket/hdfs"

doc_mapping:
mode: lenient
field_mappings:
Expand Down Expand Up @@ -82,6 +85,32 @@ By default, the `index-uri` will be computed by concatenating the `index-id` wit
The file storage will not work when running quickwit in distributed mode. Instead, AWS S3, Azure Blob Storage, Google Cloud Storage (in s3 interoperability mode) or other S3-compatible storage systems including Scaleway Object Storage and Garage should be used as storage when running several searcher nodes.
:::

## Extra index URIs

The `extra_index_uris` parameter is an optional list of additional [storage URIs](storage-config#storage-uris) where splits can be stored. When configured, new splits are distributed across all storage URIs (the primary `index_uri` plus the extra URIs) using a round-robin strategy.

This enables **multi-bucket split sharding** — spreading an index's data across multiple storage buckets for better throughput.


**Example:**

```yaml
version: 0.8
index_id: "hdfs"
index_uri: "s3://bucket-a/hdfs"
extra_index_uris:
- "s3://bucket-b/hdfs"
- "s3://bucket-c/hdfs"
doc_mapping:
field_mappings:
- name: body
type: text
```

:::caution
Once an index uses `extra_index_uris`, it cannot be read by older Quickwit versions that don't support this feature.
:::

## Doc mapping

The doc mapping defines how a document and the fields it contains are stored and indexed for a given index. A document is a collection of named fields, each having its own data type (text, bytes, datetime, bool, i64, u64, f64, ip, json).
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/storage-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Storage URIs refer to different storage providers identified by a URI "protocol"
- `gs://` for Google Cloud Storage

In general, you can use a storage URI or a file path anywhere you would intuitively expect a file path. For instance:
- when setting the `index_uri` of an index to specify the storage provider and location;
- when setting the `index_uri` or `extra_index_uris` of an index to specify the storage provider(s) and location(s);
- when setting the `metastore_uri` in a node config to set up a file-backed metastore;
- when passing a file path as a command line argument.

Expand Down
2 changes: 2 additions & 0 deletions docs/reference/rest-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ Create an index by posting an `IndexConfig` payload. The API accepts JSON with `
| `version` | `String` | Config format version, use the same as your Quickwit version. | _required_ |
| `index_id` | `String` | Index ID, see its [validation rules](../configuration/index-config.md#index-id) on identifiers. | _required_ |
| `index_uri` | `String` | Defines where the index files are stored. This parameter expects a [storage URI](../configuration/storage-config.md#storage-uris). | `{default_index_root_uri}/{index_id}` |
| `extra_index_uris` | `Array<String>` | Additional storage URIs for [multi-bucket split sharding](../configuration/index-config.md#extra-index-uris). Must not contain `index_uri`. | `[]` |
| `doc_mapping` | `DocMapping` | Doc mapping object as specified in the [index config docs](../configuration/index-config.md#doc-mapping). | _required_ |
| `indexing_settings` | `IndexingSettings` | Indexing settings object as specified in the [index config docs](../configuration/index-config.md#indexing-settings). | |
| `search_settings` | `SearchSettings` | Search settings object as specified in the [index config docs](../configuration/index-config.md#search-settings). | |
Expand Down Expand Up @@ -306,6 +307,7 @@ Updating the doc mapping doesn't reindex existing data. Queries and results are
| `version` | `String` | Config format version, use the same as your Quickwit version. | _required_ |
| `index_id` | `String` | Index ID, must be the same index as in the request URI. | _required_ |
| `index_uri` | `String` | Defines where the index files are stored. Cannot be updated. | `{default_index_root_uri}/{index_id}` |
| `extra_index_uris` | `Array<String>` | Additional storage URIs for [multi-bucket split sharding](../configuration/index-config.md#extra-index-uris). New URIs can be added but existing ones cannot be removed. | `[]` |
| `doc_mapping` | `DocMapping` | Doc mapping object as specified in the [index config docs](../configuration/index-config.md#doc-mapping). | _required_ |
| `indexing_settings` | `IndexingSettings` | Indexing settings object as specified in the [index config docs](../configuration/index-config.md#indexing-settings). | |
| `search_settings` | `SearchSettings` | Search settings object as specified in the [index config docs](../configuration/index-config.md#search-settings). | |
Expand Down
7 changes: 7 additions & 0 deletions quickwit/quickwit-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,13 @@ pub async fn run_index_checklist(
.deserialize_index_metadata()?;
let index_storage = storage_resolver.resolve(index_metadata.index_uri()).await?;
checks.push(("index storage", index_storage.check_connectivity().await));
for extra_uri in &index_metadata.index_config.extra_index_uris {
let extra_storage = storage_resolver.resolve(extra_uri).await?;
checks.push((
"extra index storage",
extra_storage.check_connectivity().await,
));
}

if let Some(source_config) = source_config_opt {
checks.push((
Expand Down
29 changes: 26 additions & 3 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -897,11 +897,34 @@ async fn extract_split_cli(args: ExtractSplitArgs) -> anyhow::Result<()> {
.index_metadata(IndexMetadataRequest::for_index_id(args.index_id))
.await?
.deserialize_index_metadata()?;
let index_storage = storage_resolver.resolve(index_metadata.index_uri()).await?;
// Resolve the storage URI for this specific split.
// The split may live in a different bucket than the primary index_uri.
let index_uri = index_metadata.index_uri().clone();
let split_metadata = {
use quickwit_metastore::{
ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt,
};
use quickwit_proto::metastore::ListSplitsRequest;
let query = ListSplitsQuery::for_index(index_metadata.index_uid.clone())
.with_split_ids(vec![args.split_id.clone()]);
let list_splits_request = ListSplitsRequest::try_from_list_splits_query(&query)?;
let splits = metastore
.list_splits(list_splits_request)
.await?
.collect_splits_metadata()
.await?;
splits.into_iter().next()
};
let effective_uri = match &split_metadata {
Some(meta) => meta.effective_storage_uri(&index_uri),
None => &index_uri,
};
let split_storage = storage_resolver.resolve(effective_uri).await?;

let split_file = PathBuf::from(format!("{}.split", args.split_id));
let split_data = index_storage.get_all(split_file.as_path()).await?;
let split_data = split_storage.get_all(split_file.as_path()).await?;
let (_hotcache_bytes, bundle_storage) = BundleStorage::open_from_split_data_with_owned_bytes(
index_storage,
split_storage,
split_file,
split_data,
)?;
Expand Down
14 changes: 13 additions & 1 deletion quickwit/quickwit-common/src/uri.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use regex::Regex;
use serde::de::Error;
use serde::{Deserialize, Serialize, Serializer};

#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[repr(u8)]
pub enum Protocol {
Expand Down Expand Up @@ -116,6 +116,18 @@ pub struct Uri {
protocol: Protocol,
}

impl Ord for Uri {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.uri.cmp(&other.uri)
}
}

impl PartialOrd for Uri {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Uri {
/// This is only used for test. We artificially restrict the lifetime to 'static
/// to avoid misuses.
Expand Down
98 changes: 98 additions & 0 deletions quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,9 @@ pub struct IndexConfig {
pub ingest_settings: IngestSettings,
pub search_settings: SearchSettings,
pub retention_policy_opt: Option<RetentionPolicy>,
/// Additional storage URIs for multi-bucket split sharding.
/// The effective list of all storage URIs is `[index_uri] + extra_index_uris`.
pub extra_index_uris: Vec<Uri>,
}

impl IndexConfig {
Expand All @@ -323,6 +326,7 @@ impl IndexConfig {
let mut hasher = SipHasher::new();
self.doc_mapping.doc_mapping_uid.hash(&mut hasher);
self.indexing_settings.hash(&mut hasher);
self.extra_index_uris.hash(&mut hasher);
hasher.finish()
}

Expand All @@ -336,6 +340,16 @@ impl IndexConfig {
self.indexing_params_fingerprint() == other.indexing_params_fingerprint()
}

/// Returns the full list of storage URIs: the primary `index_uri` followed
/// by any additional URIs from `extra_index_uris`.
///
/// The primary `index_uri` is always the first element.
pub fn all_index_uris(&self) -> Vec<&Uri> {
let mut uris = vec![&self.index_uri];
uris.extend(self.extra_index_uris.iter());
uris
}

#[cfg(any(test, feature = "testsuite"))]
pub fn for_test(index_id: &str, index_uri: &str) -> Self {
let index_uri = Uri::from_str(index_uri).unwrap();
Expand Down Expand Up @@ -420,6 +434,7 @@ impl IndexConfig {
ingest_settings: IngestSettings::default(),
search_settings,
retention_policy_opt: None,
extra_index_uris: Vec::new(),
}
}
}
Expand Down Expand Up @@ -533,6 +548,7 @@ impl crate::TestableForRegression for IndexConfig {
ingest_settings,
search_settings,
retention_policy_opt,
extra_index_uris: Vec::new(),
}
}

Expand All @@ -544,6 +560,7 @@ impl crate::TestableForRegression for IndexConfig {
assert_eq!(self.ingest_settings, other.ingest_settings);
assert_eq!(self.search_settings, other.search_settings);
assert_eq!(self.retention_policy_opt, other.retention_policy_opt);
assert_eq!(self.extra_index_uris, other.extra_index_uris);
}
}

Expand Down Expand Up @@ -1181,4 +1198,85 @@ mod tests {
assert_eq!(updated_doc_mapping.doc_mapping_uid, new_doc_mapping_uid);
assert_eq!(updated_doc_mapping.mode, Mode::Strict);
}

#[test]
fn test_extra_index_uris_parsed_from_yaml() {
let config_yaml = r#"
version: 0.8
index_id: test-index
index_uri: s3://bucket-a/test-index
extra_index_uris:
- s3://bucket-b/test-index
- s3://bucket-c/test-index
doc_mapping: {}
"#;
let index_config = load_index_config_from_user_config(
ConfigFormat::Yaml,
config_yaml.as_bytes(),
&Uri::for_test("s3://bucket-a"),
)
.unwrap();
assert_eq!(index_config.extra_index_uris.len(), 2);
assert_eq!(
index_config.extra_index_uris[0].as_str(),
"s3://bucket-b/test-index"
);
assert_eq!(
index_config.extra_index_uris[1].as_str(),
"s3://bucket-c/test-index"
);
}

#[test]
fn test_extra_index_uris_defaults_to_empty() {
let config_yaml = r#"
version: 0.8
index_id: test-index
index_uri: s3://bucket-a/test-index
doc_mapping: {}
"#;
let index_config = load_index_config_from_user_config(
ConfigFormat::Yaml,
config_yaml.as_bytes(),
&Uri::for_test("s3://bucket-a"),
)
.unwrap();
assert!(index_config.extra_index_uris.is_empty());
}

#[test]
fn test_extra_index_uris_rejects_duplicate_of_index_uri() {
let config_yaml = r#"
version: 0.8
index_id: test-index
index_uri: s3://bucket-a/test-index
extra_index_uris:
- s3://bucket-a/test-index
doc_mapping: {}
"#;
let result = load_index_config_from_user_config(
ConfigFormat::Yaml,
config_yaml.as_bytes(),
&Uri::for_test("s3://bucket-a"),
);
assert!(result.is_err());
let error_msg = format!("{:?}", result.unwrap_err());
assert!(error_msg.contains("extra_index_uris"));
}

#[test]
fn test_all_index_uris() {
let mut index_config = IndexConfig::for_test("test-index", "s3://bucket-a/test-index");
assert_eq!(index_config.all_index_uris().len(), 1);
assert_eq!(
index_config.all_index_uris()[0].as_str(),
"s3://bucket-a/test-index"
);

index_config.extra_index_uris = vec![Uri::for_test("s3://bucket-b/test-index")];
let uris = index_config.all_index_uris();
assert_eq!(uris.len(), 2);
assert_eq!(uris[0].as_str(), "s3://bucket-a/test-index");
assert_eq!(uris[1].as_str(), "s3://bucket-b/test-index");
}
}
Loading
Loading