diff --git a/docs/configuration/index-config.md b/docs/configuration/index-config.md index c8f26ded709..5cfb5e203c0 100644 --- a/docs/configuration/index-config.md +++ b/docs/configuration/index-config.md @@ -28,6 +28,9 @@ index_id: "hdfs" index_uri: "s3://my-bucket/hdfs" +extra_index_uris: + - "s3://my-second-bucket/hdfs" + doc_mapping: mode: lenient field_mappings: @@ -82,6 +85,32 @@ By default, the `index-uri` will be computed by concatenating the `index-id` wit The file storage will not work when running quickwit in distributed mode. Instead, AWS S3, Azure Blob Storage, Google Cloud Storage (in s3 interoperability mode) or other S3-compatible storage systems including Scaleway Object Storage and Garage should be used as storage when running several searcher nodes. ::: +## Extra index URIs + +The `extra_index_uris` parameter is an optional list of additional [storage URIs](storage-config#storage-uris) where splits can be stored. When configured, new splits are distributed across all storage URIs (the primary `index_uri` plus the extra URIs) using a round-robin strategy. + +This enables **multi-bucket split sharding** — spreading an index's data across multiple storage buckets for better throughput. + + +**Example:** + +```yaml +version: 0.8 +index_id: "hdfs" +index_uri: "s3://bucket-a/hdfs" +extra_index_uris: + - "s3://bucket-b/hdfs" + - "s3://bucket-c/hdfs" +doc_mapping: + field_mappings: + - name: body + type: text +``` + +:::caution +Once an index uses `extra_index_uris`, it cannot be read by older Quickwit versions that don't support this feature. +::: + ## Doc mapping The doc mapping defines how a document and the fields it contains are stored and indexed for a given index. A document is a collection of named fields, each having its own data type (text, bytes, datetime, bool, i64, u64, f64, ip, json). 
diff --git a/docs/configuration/storage-config.md b/docs/configuration/storage-config.md index 916480dad63..c7d0494a4e3 100644 --- a/docs/configuration/storage-config.md +++ b/docs/configuration/storage-config.md @@ -20,7 +20,7 @@ Storage URIs refer to different storage providers identified by a URI "protocol" - `gs://` for Google Cloud Storage In general, you can use a storage URI or a file path anywhere you would intuitively expect a file path. For instance: -- when setting the `index_uri` of an index to specify the storage provider and location; +- when setting the `index_uri` or `extra_index_uris` of an index to specify the storage provider(s) and location(s); - when setting the `metastore_uri` in a node config to set up a file-backed metastore; - when passing a file path as a command line argument. diff --git a/docs/reference/rest-api.md b/docs/reference/rest-api.md index 54ac05c040b..2730b6d79e2 100644 --- a/docs/reference/rest-api.md +++ b/docs/reference/rest-api.md @@ -190,6 +190,7 @@ Create an index by posting an `IndexConfig` payload. The API accepts JSON with ` | `version` | `String` | Config format version, use the same as your Quickwit version. | _required_ | | `index_id` | `String` | Index ID, see its [validation rules](../configuration/index-config.md#index-id) on identifiers. | _required_ | | `index_uri` | `String` | Defines where the index files are stored. This parameter expects a [storage URI](../configuration/storage-config.md#storage-uris). | `{default_index_root_uri}/{index_id}` | +| `extra_index_uris` | `Array` | Additional storage URIs for [multi-bucket split sharding](../configuration/index-config.md#extra-index-uris). Must not contain `index_uri`. | `[]` | | `doc_mapping` | `DocMapping` | Doc mapping object as specified in the [index config docs](../configuration/index-config.md#doc-mapping). 
| _required_ | | `indexing_settings` | `IndexingSettings` | Indexing settings object as specified in the [index config docs](../configuration/index-config.md#indexing-settings). | | | `search_settings` | `SearchSettings` | Search settings object as specified in the [index config docs](../configuration/index-config.md#search-settings). | | @@ -306,6 +307,7 @@ Updating the doc mapping doesn't reindex existing data. Queries and results are | `version` | `String` | Config format version, use the same as your Quickwit version. | _required_ | | `index_id` | `String` | Index ID, must be the same index as in the request URI. | _required_ | | `index_uri` | `String` | Defines where the index files are stored. Cannot be updated. | `{default_index_root_uri}/{index_id}` | +| `extra_index_uris` | `Array` | Additional storage URIs for [multi-bucket split sharding](../configuration/index-config.md#extra-index-uris). New URIs can be added but existing ones cannot be removed. | `[]` | | `doc_mapping` | `DocMapping` | Doc mapping object as specified in the [index config docs](../configuration/index-config.md#doc-mapping). | _required_ | | `indexing_settings` | `IndexingSettings` | Indexing settings object as specified in the [index config docs](../configuration/index-config.md#indexing-settings). | | | `search_settings` | `SearchSettings` | Search settings object as specified in the [index config docs](../configuration/index-config.md#search-settings). 
| | diff --git a/quickwit/quickwit-cli/src/lib.rs b/quickwit/quickwit-cli/src/lib.rs index aaeb4da7e9d..e91c3875aa1 100644 --- a/quickwit/quickwit-cli/src/lib.rs +++ b/quickwit/quickwit-cli/src/lib.rs @@ -292,6 +292,13 @@ pub async fn run_index_checklist( .deserialize_index_metadata()?; let index_storage = storage_resolver.resolve(index_metadata.index_uri()).await?; checks.push(("index storage", index_storage.check_connectivity().await)); + for extra_uri in &index_metadata.index_config.extra_index_uris { + let extra_storage = storage_resolver.resolve(extra_uri).await?; + checks.push(( + "extra index storage", + extra_storage.check_connectivity().await, + )); + } if let Some(source_config) = source_config_opt { checks.push(( diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index 26bed948189..3601ab60d22 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -897,11 +897,34 @@ async fn extract_split_cli(args: ExtractSplitArgs) -> anyhow::Result<()> { .index_metadata(IndexMetadataRequest::for_index_id(args.index_id)) .await? .deserialize_index_metadata()?; - let index_storage = storage_resolver.resolve(index_metadata.index_uri()).await?; + // Resolve the storage URI for this specific split. + // The split may live in a different bucket than the primary index_uri. + let index_uri = index_metadata.index_uri().clone(); + let split_metadata = { + use quickwit_metastore::{ + ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, + }; + use quickwit_proto::metastore::ListSplitsRequest; + let query = ListSplitsQuery::for_index(index_metadata.index_uid.clone()) + .with_split_ids(vec![args.split_id.clone()]); + let list_splits_request = ListSplitsRequest::try_from_list_splits_query(&query)?; + let splits = metastore + .list_splits(list_splits_request) + .await? 
+ .collect_splits_metadata() + .await?; + splits.into_iter().next() + }; + let effective_uri = match &split_metadata { + Some(meta) => meta.effective_storage_uri(&index_uri), + None => &index_uri, + }; + let split_storage = storage_resolver.resolve(effective_uri).await?; + let split_file = PathBuf::from(format!("{}.split", args.split_id)); - let split_data = index_storage.get_all(split_file.as_path()).await?; + let split_data = split_storage.get_all(split_file.as_path()).await?; let (_hotcache_bytes, bundle_storage) = BundleStorage::open_from_split_data_with_owned_bytes( - index_storage, + split_storage, split_file, split_data, )?; diff --git a/quickwit/quickwit-common/src/uri.rs b/quickwit/quickwit-common/src/uri.rs index a094ae34ee8..7e1f8a1a4ac 100644 --- a/quickwit/quickwit-common/src/uri.rs +++ b/quickwit/quickwit-common/src/uri.rs @@ -25,7 +25,7 @@ use regex::Regex; use serde::de::Error; use serde::{Deserialize, Serialize, Serializer}; -#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] #[repr(u8)] pub enum Protocol { @@ -116,6 +116,18 @@ pub struct Uri { protocol: Protocol, } +impl Ord for Uri { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.uri.cmp(&other.uri) + } +} + +impl PartialOrd for Uri { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + impl Uri { /// This is only used for test. We artificially restrict the lifetime to 'static /// to avoid misuses. 
diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs index 1f8af60aa57..50ad73e4bbe 100644 --- a/quickwit/quickwit-config/src/index_config/mod.rs +++ b/quickwit/quickwit-config/src/index_config/mod.rs @@ -311,6 +311,9 @@ pub struct IndexConfig { pub ingest_settings: IngestSettings, pub search_settings: SearchSettings, pub retention_policy_opt: Option, + /// Additional storage URIs for multi-bucket split sharding. + /// The effective list of all storage URIs is `[index_uri] + extra_index_uris`. + pub extra_index_uris: Vec, } impl IndexConfig { @@ -323,6 +326,7 @@ impl IndexConfig { let mut hasher = SipHasher::new(); self.doc_mapping.doc_mapping_uid.hash(&mut hasher); self.indexing_settings.hash(&mut hasher); + self.extra_index_uris.hash(&mut hasher); hasher.finish() } @@ -336,6 +340,16 @@ impl IndexConfig { self.indexing_params_fingerprint() == other.indexing_params_fingerprint() } + /// Returns the full list of storage URIs: the primary `index_uri` followed + /// by any additional URIs from `extra_index_uris`. + /// + /// The primary `index_uri` is always the first element. 
+ pub fn all_index_uris(&self) -> Vec<&Uri> { + let mut uris = vec![&self.index_uri]; + uris.extend(self.extra_index_uris.iter()); + uris + } + #[cfg(any(test, feature = "testsuite"))] pub fn for_test(index_id: &str, index_uri: &str) -> Self { let index_uri = Uri::from_str(index_uri).unwrap(); @@ -420,6 +434,7 @@ impl IndexConfig { ingest_settings: IngestSettings::default(), search_settings, retention_policy_opt: None, + extra_index_uris: Vec::new(), } } } @@ -533,6 +548,7 @@ impl crate::TestableForRegression for IndexConfig { ingest_settings, search_settings, retention_policy_opt, + extra_index_uris: Vec::new(), } } @@ -544,6 +560,7 @@ impl crate::TestableForRegression for IndexConfig { assert_eq!(self.ingest_settings, other.ingest_settings); assert_eq!(self.search_settings, other.search_settings); assert_eq!(self.retention_policy_opt, other.retention_policy_opt); + assert_eq!(self.extra_index_uris, other.extra_index_uris); } } @@ -1181,4 +1198,85 @@ mod tests { assert_eq!(updated_doc_mapping.doc_mapping_uid, new_doc_mapping_uid); assert_eq!(updated_doc_mapping.mode, Mode::Strict); } + + #[test] + fn test_extra_index_uris_parsed_from_yaml() { + let config_yaml = r#" + version: 0.8 + index_id: test-index + index_uri: s3://bucket-a/test-index + extra_index_uris: + - s3://bucket-b/test-index + - s3://bucket-c/test-index + doc_mapping: {} + "#; + let index_config = load_index_config_from_user_config( + ConfigFormat::Yaml, + config_yaml.as_bytes(), + &Uri::for_test("s3://bucket-a"), + ) + .unwrap(); + assert_eq!(index_config.extra_index_uris.len(), 2); + assert_eq!( + index_config.extra_index_uris[0].as_str(), + "s3://bucket-b/test-index" + ); + assert_eq!( + index_config.extra_index_uris[1].as_str(), + "s3://bucket-c/test-index" + ); + } + + #[test] + fn test_extra_index_uris_defaults_to_empty() { + let config_yaml = r#" + version: 0.8 + index_id: test-index + index_uri: s3://bucket-a/test-index + doc_mapping: {} + "#; + let index_config = 
load_index_config_from_user_config( + ConfigFormat::Yaml, + config_yaml.as_bytes(), + &Uri::for_test("s3://bucket-a"), + ) + .unwrap(); + assert!(index_config.extra_index_uris.is_empty()); + } + + #[test] + fn test_extra_index_uris_rejects_duplicate_of_index_uri() { + let config_yaml = r#" + version: 0.8 + index_id: test-index + index_uri: s3://bucket-a/test-index + extra_index_uris: + - s3://bucket-a/test-index + doc_mapping: {} + "#; + let result = load_index_config_from_user_config( + ConfigFormat::Yaml, + config_yaml.as_bytes(), + &Uri::for_test("s3://bucket-a"), + ); + assert!(result.is_err()); + let error_msg = format!("{:?}", result.unwrap_err()); + assert!(error_msg.contains("extra_index_uris")); + } + + #[test] + fn test_all_index_uris() { + let mut index_config = IndexConfig::for_test("test-index", "s3://bucket-a/test-index"); + assert_eq!(index_config.all_index_uris().len(), 1); + assert_eq!( + index_config.all_index_uris()[0].as_str(), + "s3://bucket-a/test-index" + ); + + index_config.extra_index_uris = vec![Uri::for_test("s3://bucket-b/test-index")]; + let uris = index_config.all_index_uris(); + assert_eq!(uris.len(), 2); + assert_eq!(uris[0].as_str(), "s3://bucket-a/test-index"); + assert_eq!(uris[1].as_str(), "s3://bucket-b/test-index"); + } } diff --git a/quickwit/quickwit-config/src/index_config/serialize.rs b/quickwit/quickwit-config/src/index_config/serialize.rs index 01b3692a85d..7d9b9b46e13 100644 --- a/quickwit/quickwit-config/src/index_config/serialize.rs +++ b/quickwit/quickwit-config/src/index_config/serialize.rs @@ -89,6 +89,13 @@ pub fn load_index_config_update( current_index_config.index_uri, new_index_config.index_uri ); + for current_uri in ¤t_index_config.extra_index_uris { + ensure!( + new_index_config.extra_index_uris.contains(current_uri), + "`extra_index_uris` cannot have URIs removed during an update, missing URI: {}", + current_uri, + ); + } let (updated_doc_mapping, _mutation_occurred) = prepare_doc_mapping_update( 
new_index_config.doc_mapping, ¤t_index_config.doc_mapping, @@ -134,6 +141,7 @@ impl IndexConfigForSerialization { ingest_settings: self.ingest_settings, search_settings: self.search_settings, retention_policy_opt: self.retention_policy_opt, + extra_index_uris: self.extra_index_uris, }; validate_index_config( &index_config.doc_mapping, @@ -141,6 +149,23 @@ impl IndexConfigForSerialization { &index_config.search_settings, &index_config.retention_policy_opt, )?; + // Validate that extra_index_uris don't contain the primary index_uri. + for extra_uri in &index_config.extra_index_uris { + ensure!( + *extra_uri != index_config.index_uri, + "`extra_index_uris` must not contain the primary `index_uri` ({})", + index_config.index_uri, + ); + } + // Validate that extra_index_uris doesn't contain duplicates. + let mut seen = std::collections::HashSet::new(); + for extra_uri in &index_config.extra_index_uris { + ensure!( + seen.insert(extra_uri), + "`extra_index_uris` contains duplicate URI ({})", + extra_uri, + ); + } Ok(index_config) } } @@ -184,6 +209,11 @@ pub struct IndexConfigV0_8 { #[serde(rename = "retention")] #[serde(default)] pub retention_policy_opt: Option, + /// Additional storage URIs where splits can be sharded across multiple buckets. + /// The primary `index_uri` is always included implicitly. 
+ #[schema(value_type = Vec)] + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub extra_index_uris: Vec, } impl From for IndexConfigV0_8 { @@ -196,6 +226,7 @@ impl From for IndexConfigV0_8 { ingest_settings: index_config.ingest_settings, search_settings: index_config.search_settings, retention_policy_opt: index_config.retention_policy_opt, + extra_index_uris: index_config.extra_index_uris, } } } @@ -419,6 +450,176 @@ mod test { assert_eq!(updated_config.retention_policy_opt, None); } + #[test] + fn test_update_extra_index_uris() { + let original_config_yaml = r#" + version: 0.8 + index_id: hdfs-logs + doc_mapping: {} + extra_index_uris: + - s3://bucket-a/hdfs-logs + - s3://bucket-b/hdfs-logs + "#; + let default_root = Uri::for_test("s3://mybucket"); + let original_config: IndexConfig = load_index_config_from_user_config( + ConfigFormat::Yaml, + original_config_yaml.as_bytes(), + &default_root, + ) + .unwrap(); + + // Adding a URI is allowed. + let updated_config_yaml = r#" + version: 0.8 + index_id: hdfs-logs + doc_mapping: {} + extra_index_uris: + - s3://bucket-a/hdfs-logs + - s3://bucket-b/hdfs-logs + - s3://bucket-c/hdfs-logs + "#; + let updated_config = load_index_config_update( + ConfigFormat::Yaml, + updated_config_yaml.as_bytes(), + &default_root, + &original_config, + ) + .unwrap(); + assert_eq!(updated_config.extra_index_uris.len(), 3); + + // Keeping the same URIs is allowed. + let updated_config_yaml = r#" + version: 0.8 + index_id: hdfs-logs + doc_mapping: {} + extra_index_uris: + - s3://bucket-a/hdfs-logs + - s3://bucket-b/hdfs-logs + "#; + load_index_config_update( + ConfigFormat::Yaml, + updated_config_yaml.as_bytes(), + &default_root, + &original_config, + ) + .expect("keeping same extra_index_uris should succeed"); + + // Removing a URI is not allowed. 
+ let updated_config_yaml = r#" + version: 0.8 + index_id: hdfs-logs + doc_mapping: {} + extra_index_uris: + - s3://bucket-a/hdfs-logs + "#; + let err = load_index_config_update( + ConfigFormat::Yaml, + updated_config_yaml.as_bytes(), + &default_root, + &original_config, + ) + .unwrap_err(); + assert!( + format!("{err:?}").contains("`extra_index_uris` cannot have URIs removed"), + "unexpected error: {err:?}" + ); + + // Removing all extra URIs is not allowed. + let updated_config_yaml = r#" + version: 0.8 + index_id: hdfs-logs + doc_mapping: {} + "#; + let err = load_index_config_update( + ConfigFormat::Yaml, + updated_config_yaml.as_bytes(), + &default_root, + &original_config, + ) + .unwrap_err(); + assert!( + format!("{err:?}").contains("`extra_index_uris` cannot have URIs removed"), + "unexpected error: {err:?}" + ); + } + + #[test] + fn test_extra_index_uris_no_duplicates() { + let default_root = Uri::for_test("s3://mybucket"); + + // Duplicate URIs should be rejected. + let config_yaml = r#" + version: 0.8 + index_id: hdfs-logs + doc_mapping: {} + extra_index_uris: + - s3://bucket-a/hdfs-logs + - s3://bucket-a/hdfs-logs + "#; + let err = load_index_config_from_user_config( + ConfigFormat::Yaml, + config_yaml.as_bytes(), + &default_root, + ) + .unwrap_err(); + assert!( + format!("{err:?}").contains("`extra_index_uris` contains duplicate URI"), + "unexpected error: {err:?}" + ); + + // Duplicate URIs should also be rejected on update. 
+ let original_config_yaml = r#" + version: 0.8 + index_id: hdfs-logs + doc_mapping: {} + extra_index_uris: + - s3://bucket-a/hdfs-logs + "#; + let original_config: IndexConfig = load_index_config_from_user_config( + ConfigFormat::Yaml, + original_config_yaml.as_bytes(), + &default_root, + ) + .unwrap(); + + let updated_config_yaml = r#" + version: 0.8 + index_id: hdfs-logs + doc_mapping: {} + extra_index_uris: + - s3://bucket-a/hdfs-logs + - s3://bucket-b/hdfs-logs + - s3://bucket-b/hdfs-logs + "#; + let err = load_index_config_update( + ConfigFormat::Yaml, + updated_config_yaml.as_bytes(), + &default_root, + &original_config, + ) + .unwrap_err(); + assert!( + format!("{err:?}").contains("`extra_index_uris` contains duplicate URI"), + "unexpected error: {err:?}" + ); + + // Distinct URIs should be accepted. + let config_yaml = r#" + version: 0.8 + index_id: hdfs-logs + doc_mapping: {} + extra_index_uris: + - s3://bucket-a/hdfs-logs + - s3://bucket-b/hdfs-logs + "#; + load_index_config_from_user_config( + ConfigFormat::Yaml, + config_yaml.as_bytes(), + &default_root, + ) + .expect("distinct extra_index_uris should be valid"); + } + #[test] fn test_update_doc_mappings() { let original_config_yaml = r#" diff --git a/quickwit/quickwit-config/src/index_template/mod.rs b/quickwit/quickwit-config/src/index_template/mod.rs index 32fcad37e8d..56a639f2014 100644 --- a/quickwit/quickwit-config/src/index_template/mod.rs +++ b/quickwit/quickwit-config/src/index_template/mod.rs @@ -51,6 +51,8 @@ pub struct IndexTemplate { #[serde(rename = "retention")] #[serde(default)] pub retention_policy_opt: Option, + #[serde(default)] + pub extra_index_uris: Vec, } impl IndexTemplate { @@ -77,6 +79,7 @@ impl IndexTemplate { ingest_settings: self.ingest_settings.clone(), search_settings: self.search_settings.clone(), retention_policy_opt: self.retention_policy_opt.clone(), + extra_index_uris: self.extra_index_uris.clone(), }; Ok(index_config) } @@ -97,6 +100,15 @@ impl IndexTemplate { 
&self.search_settings, &self.retention_policy_opt, )?; + // Validate that extra_index_uris has no duplicates. + let mut seen_uris = std::collections::HashSet::new(); + for extra_uri in &self.extra_index_uris { + ensure!( + seen_uris.insert(extra_uri), + "`extra_index_uris` contains duplicate URI: {}", + extra_uri, + ); + } Ok(()) } @@ -134,6 +146,7 @@ impl IndexTemplate { ingest_settings: IngestSettings::default(), search_settings: SearchSettings::default(), retention_policy_opt: None, + extra_index_uris: Vec::new(), } } } @@ -179,6 +192,7 @@ impl crate::TestableForRegression for IndexTemplate { evaluation_schedule: "daily".to_string(), timestamp_type: Default::default(), }), + extra_index_uris: Vec::new(), } } @@ -275,6 +289,87 @@ mod tests { ); } + #[test] + fn test_index_template_serde_with_extra_index_uris() { + let index_template_yaml = r#" + version: 0.8 + + template_id: test-template + index_id_patterns: + - test-index-* + doc_mapping: + field_mappings: + - name: ts + type: datetime + fast: true + timestamp_field: ts + extra_index_uris: + - s3://bucket-b/indexes + - s3://bucket-c/indexes + "#; + let index_template: IndexTemplate = serde_yaml::from_str(index_template_yaml).unwrap(); + assert_eq!(index_template.extra_index_uris.len(), 2); + assert_eq!( + index_template.extra_index_uris[0].as_str(), + "s3://bucket-b/indexes" + ); + assert_eq!( + index_template.extra_index_uris[1].as_str(), + "s3://bucket-c/indexes" + ); + + // Round-trip through serde. 
+ let serialized = serde_json::to_string(&index_template).unwrap(); + let deserialized: IndexTemplate = serde_json::from_str(&serialized).unwrap(); + assert_eq!( + deserialized.extra_index_uris, + index_template.extra_index_uris + ); + } + + #[test] + fn test_index_template_serde_without_extra_index_uris() { + let index_template_yaml = r#" + version: 0.8 + + template_id: test-template + index_id_patterns: + - test-index-* + doc_mapping: + field_mappings: + - name: ts + type: datetime + fast: true + timestamp_field: ts + "#; + let index_template: IndexTemplate = serde_yaml::from_str(index_template_yaml).unwrap(); + assert!(index_template.extra_index_uris.is_empty()); + } + + #[test] + fn test_index_template_apply_with_extra_index_uris() { + let mut index_template = IndexTemplate::for_test("test-template", &["test-index-*"], 0); + index_template.extra_index_uris = vec![ + Uri::for_test("s3://bucket-b/indexes"), + Uri::for_test("s3://bucket-c/indexes"), + ]; + + let default_index_root_uri = Uri::for_test("s3://bucket-a/indexes"); + let index_config = index_template + .apply_template("test-index-foo".to_string(), &default_index_root_uri) + .unwrap(); + + assert_eq!(index_config.extra_index_uris.len(), 2); + assert_eq!( + index_config.extra_index_uris[0].as_str(), + "s3://bucket-b/indexes" + ); + assert_eq!( + index_config.extra_index_uris[1].as_str(), + "s3://bucket-c/indexes" + ); + } + #[test] fn test_index_template_validate() { let index_template = IndexTemplate::for_test("", &[], 0); diff --git a/quickwit/quickwit-config/src/index_template/serialize.rs b/quickwit/quickwit-config/src/index_template/serialize.rs index 087d47b7856..0b2970e492f 100644 --- a/quickwit/quickwit-config/src/index_template/serialize.rs +++ b/quickwit/quickwit-config/src/index_template/serialize.rs @@ -55,6 +55,9 @@ pub struct IndexTemplateV0_8 { pub search_settings: SearchSettings, #[serde(default)] pub retention: Option, + #[schema(value_type = Vec)] + #[serde(default, skip_serializing_if = 
"Vec::is_empty")] + pub extra_index_uris: Vec, } impl From for IndexTemplate { @@ -84,6 +87,7 @@ impl From for IndexTemplate { ingest_settings: index_template_v0_8.ingest_settings, search_settings: index_template_v0_8.search_settings, retention_policy_opt: index_template_v0_8.retention, + extra_index_uris: index_template_v0_8.extra_index_uris, } } } @@ -101,6 +105,7 @@ impl From for IndexTemplateV0_8 { ingest_settings: index_template.ingest_settings, search_settings: index_template.search_settings, retention: index_template.retention_policy_opt, + extra_index_uris: index_template.extra_index_uris, } } } diff --git a/quickwit/quickwit-index-management/src/garbage_collection.rs b/quickwit/quickwit-index-management/src/garbage_collection.rs index 2ca485026ee..81da6b7e8b6 100644 --- a/quickwit/quickwit-index-management/src/garbage_collection.rs +++ b/quickwit/quickwit-index-management/src/garbage_collection.rs @@ -32,7 +32,7 @@ use quickwit_proto::metastore::{ MetastoreService, MetastoreServiceClient, }; use quickwit_proto::types::{IndexUid, SplitId}; -use quickwit_storage::{BulkDeleteError, Storage}; +use quickwit_storage::{BulkDeleteError, Storage, StorageResolver}; use thiserror::Error; use time::OffsetDateTime; use tracing::{error, instrument}; @@ -66,7 +66,7 @@ impl RecordGcMetrics for Option { #[error("failed to delete splits from storage and/or metastore")] pub struct DeleteSplitsError { successes: Vec, - storage_error: Option, + storage_errors: Vec, storage_failures: Vec, metastore_error: Option, metastore_failures: Vec, @@ -103,6 +103,7 @@ pub struct SplitRemovalInfo { /// safely deleted. /// * `dry_run` - Should this only return a list of affected files without performing deletion. /// * `progress` - For reporting progress (useful when called from within a quickwit actor). 
+#[allow(clippy::too_many_arguments)] pub async fn run_garbage_collect( indexes: HashMap>, metastore: MetastoreServiceClient, @@ -111,6 +112,7 @@ pub async fn run_garbage_collect( dry_run: bool, progress_opt: Option<&Progress>, metrics: Option, + storage_resolver: &StorageResolver, ) -> anyhow::Result { let grace_period_timestamp = OffsetDateTime::now_utc().unix_timestamp() - staged_grace_period.as_secs() as i64; @@ -187,6 +189,7 @@ pub async fn run_garbage_collect( indexes, progress_opt, metrics, + storage_resolver, ) .await) } @@ -198,6 +201,7 @@ async fn delete_splits( progress_opt: Option<&Progress>, metrics: &Option, split_removal_info: &mut SplitRemovalInfo, + storage_resolver: &StorageResolver, ) -> Result<(), ()> { let mut delete_split_from_index_res_stream = futures::stream::iter(splits_metadata_to_delete_per_index) @@ -212,6 +216,7 @@ async fn delete_splits( metastore, splits_metadata_to_delete, progress_opt, + storage_resolver, ) .await } else { @@ -310,13 +315,14 @@ fn get_index_gc_concurrency() -> Option { /// /// The aim of this is to spread the load out across a longer period /// rather than short, heavy bursts on the metastore and storage system itself. 
-#[instrument(skip(storages, metastore, progress_opt, metrics), fields(num_indexes=%storages.len()))] +#[instrument(skip(storages, metastore, progress_opt, metrics, storage_resolver), fields(num_indexes=%storages.len()))] async fn delete_splits_marked_for_deletion_several_indexes( updated_before_timestamp: i64, metastore: MetastoreServiceClient, storages: HashMap>, progress_opt: Option<&Progress>, metrics: Option, + storage_resolver: &StorageResolver, ) -> SplitRemovalInfo { let mut split_removal_info = SplitRemovalInfo::default(); @@ -393,6 +399,7 @@ async fn delete_splits_marked_for_deletion_several_indexes( progress_opt, &metrics, &mut split_removal_info, + storage_resolver, ) .await; @@ -422,53 +429,97 @@ pub async fn delete_splits_from_storage_and_metastore( metastore: MetastoreServiceClient, splits: Vec, progress_opt: Option<&Progress>, + storage_resolver: &StorageResolver, ) -> Result, DeleteSplitsError> { - let mut split_infos: HashMap = HashMap::with_capacity(splits.len()); + let index_uri = storage.uri().clone(); + // Group splits by their effective storage URI so we can bulk-delete per bucket. 
+ let mut splits_by_uri: HashMap> = HashMap::new(); for split in splits { - let split_info = split.as_split_info(); - split_infos.insert(split_info.file_name.clone(), split_info); + let effective_uri = split.effective_storage_uri(&index_uri).clone(); + splits_by_uri.entry(effective_uri).or_default().push(split); } - let split_paths = split_infos - .keys() - .map(|split_path_buf| split_path_buf.as_path()) - .collect::>(); - let delete_result = protect_future(progress_opt, storage.bulk_delete(&split_paths)).await; - - if let Some(progress) = progress_opt { - progress.record_progress(); - } - let mut successes = Vec::with_capacity(split_infos.len()); - let mut storage_error: Option = None; - let mut storage_failures = Vec::new(); - - match delete_result { - Ok(_) => successes.extend(split_infos.into_values()), - Err(bulk_delete_error) => { - let success_split_paths: HashSet<&PathBuf> = - bulk_delete_error.successes.iter().collect(); - for (split_path, split_info) in split_infos { - if success_split_paths.contains(&split_path) { - successes.push(split_info); - } else { - storage_failures.push(split_info); + + let total_split_count: usize = splits_by_uri.values().map(|v| v.len()).sum(); + let mut all_successes: Vec = Vec::with_capacity(total_split_count); + let mut all_storage_failures: Vec = Vec::new(); + let mut combined_storage_errors: Vec = Vec::new(); + + for (uri, group_splits) in splits_by_uri { + // Resolve the storage for this group of splits. 
+ let group_storage = if uri == index_uri { + storage.clone() + } else { + match storage_resolver.resolve(&uri).await { + Ok(resolved) => resolved, + Err(resolve_err) => { + error!( + storage_uri=%uri, + index_id=%index_uid.index_id, + error=?resolve_err, + "failed to resolve storage URI for split group, marking splits as failed" + ); + let failed: Vec = group_splits + .into_iter() + .map(|s| s.as_split_info()) + .collect(); + all_storage_failures.extend(failed); + continue; } } - let failed_split_paths = storage_failures - .iter() - .map(|split_info| split_info.file_name.as_path()) - .collect::>(); - error!( - error=?bulk_delete_error.error, - index_id=index_uid.index_id, - "failed to delete split file(s) {:?} from storage", - PrettySample::new(&failed_split_paths, 5), - ); - storage_error = Some(bulk_delete_error); + }; + + let mut split_infos: HashMap = + HashMap::with_capacity(group_splits.len()); + for split in group_splits { + let split_info = split.as_split_info(); + split_infos.insert(split_info.file_name.clone(), split_info); } - }; - if !successes.is_empty() { - let split_ids: Vec = successes + let split_paths: Vec<&Path> = split_infos + .keys() + .map(|split_path_buf| split_path_buf.as_path()) + .collect(); + + let delete_result = + protect_future(progress_opt, group_storage.bulk_delete(&split_paths)).await; + + if let Some(progress) = progress_opt { + progress.record_progress(); + } + + match delete_result { + Ok(_) => all_successes.extend(split_infos.into_values()), + Err(bulk_delete_error) => { + let success_split_paths: HashSet<&PathBuf> = + bulk_delete_error.successes.iter().collect(); + let mut current_uri_failures: Vec = Vec::new(); + for (split_path, split_info) in split_infos { + if success_split_paths.contains(&split_path) { + all_successes.push(split_info); + } else { + current_uri_failures.push(split_info); + } + } + let failed_split_paths = current_uri_failures + .iter() + .map(|split_info| split_info.file_name.as_path()) + .collect::>(); + 
error!( + error=?bulk_delete_error.error, + index_id=index_uid.index_id, + storage_uri=%uri, + "failed to delete split file(s) {:?} from storage", + PrettySample::new(&failed_split_paths, 5), + ); + all_storage_failures.extend(current_uri_failures); + combined_storage_errors.push(bulk_delete_error); + } + } + } + + // Delete successful splits from the metastore. + if !all_successes.is_empty() { + let split_ids: Vec = all_successes .iter() .map(|split_info| split_info.split_id.to_string()) .collect(); @@ -488,25 +539,25 @@ pub async fn delete_splits_from_storage_and_metastore( ); let delete_splits_error = DeleteSplitsError { successes: Vec::new(), - storage_error, - storage_failures, + storage_errors: combined_storage_errors, + storage_failures: all_storage_failures, metastore_error: Some(metastore_error), - metastore_failures: successes, + metastore_failures: all_successes, }; return Err(delete_splits_error); } } - if !storage_failures.is_empty() { + if !all_storage_failures.is_empty() { let delete_splits_error = DeleteSplitsError { - successes, - storage_error, - storage_failures, + successes: all_successes, + storage_errors: combined_storage_errors, + storage_failures: all_storage_failures, metastore_error: None, metastore_failures: Vec::new(), }; return Err(delete_splits_error); } - Ok(successes) + Ok(all_successes) } #[cfg(test)] @@ -589,6 +640,7 @@ mod tests { false, None, None, + &StorageResolver::unconfigured(), ) .await .unwrap(); @@ -617,6 +669,7 @@ mod tests { false, None, None, + &StorageResolver::unconfigured(), ) .await .unwrap(); @@ -695,6 +748,7 @@ mod tests { false, None, None, + &StorageResolver::unconfigured(), ) .await .unwrap(); @@ -723,6 +777,7 @@ mod tests { false, None, None, + &StorageResolver::unconfigured(), ) .await .unwrap(); @@ -762,6 +817,7 @@ mod tests { false, None, None, + &StorageResolver::unconfigured(), ) .await .unwrap(); @@ -822,6 +878,7 @@ mod tests { metastore.clone(), vec![split_metadata], None, + 
&StorageResolver::unconfigured(), ) .await .unwrap(); @@ -848,6 +905,11 @@ mod tests { #[tokio::test] async fn test_delete_splits_from_storage_and_metastore_storage_error() { let mut mock_storage = MockStorage::new(); + mock_storage + .expect_uri() + .return_const(quickwit_common::uri::Uri::for_test( + "ram:///indexes/test-delete-splits-storage-error--index", + )); mock_storage .expect_bulk_delete() .return_once(|split_paths| { @@ -922,6 +984,7 @@ mod tests { metastore.clone(), vec![split_metadata_0, split_metadata_1], None, + &StorageResolver::unconfigured(), ) .await .unwrap_err(); @@ -944,6 +1007,11 @@ mod tests { #[tokio::test] async fn test_delete_splits_from_storage_and_metastore_metastore_error() { let mut mock_storage = MockStorage::new(); + mock_storage + .expect_uri() + .return_const(quickwit_common::uri::Uri::for_test( + "ram:///indexes/test-delete-splits-storage-error--index", + )); mock_storage .expect_bulk_delete() .return_once(|split_paths| { @@ -995,6 +1063,7 @@ mod tests { MetastoreServiceClient::from_mock(mock_metastore), vec![split_metadata_0, split_metadata_1], None, + &StorageResolver::unconfigured(), ) .await .unwrap_err(); diff --git a/quickwit/quickwit-index-management/src/index.rs b/quickwit/quickwit-index-management/src/index.rs index d5d493d0f18..9a8b2870767 100644 --- a/quickwit/quickwit-index-management/src/index.rs +++ b/quickwit/quickwit-index-management/src/index.rs @@ -178,6 +178,9 @@ impl IndexService { index_uid: IndexUid, index_config: IndexConfig, ) -> Result { + validate_storage_uri(&self.storage_resolver, &index_config) + .await + .map_err(IndexServiceError::InvalidConfig)?; let update_index_request = UpdateIndexRequest::try_from_updates( index_uid, &index_config.doc_mapping, @@ -185,6 +188,7 @@ impl IndexService { &index_config.ingest_settings, &index_config.search_settings, &index_config.retention_policy_opt, + &index_config.extra_index_uris, )?; let update_index_response = 
self.metastore.update_index(update_index_request).await?; let index_metadata = update_index_response.deserialize_index_metadata()?; @@ -258,6 +262,7 @@ impl IndexService { self.metastore.clone(), splits_metadata_to_delete, None, + &self.storage_resolver, ) .await?; let delete_index_request = DeleteIndexRequest { @@ -399,6 +404,7 @@ impl IndexService { dry_run, None, None, + &self.storage_resolver, ) .await?; @@ -449,6 +455,7 @@ impl IndexService { self.metastore.clone(), splits_metadata, None, + &self.storage_resolver, ) .await { @@ -582,6 +589,9 @@ pub async fn validate_storage_uri( index_config: &IndexConfig, ) -> anyhow::Result<()> { storage_resolver.resolve(&index_config.index_uri).await?; + for extra_uri in &index_config.extra_index_uris { + storage_resolver.resolve(extra_uri).await?; + } Ok(()) } diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 65bd824b1b9..84c3ba7c4f7 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -32,7 +32,7 @@ use quickwit_ingest::IngesterPool; use quickwit_proto::indexing::IndexingPipelineId; use quickwit_proto::metastore::{MetastoreError, MetastoreServiceClient}; use quickwit_proto::types::ShardId; -use quickwit_storage::{Storage, StorageResolver}; +use quickwit_storage::StorageResolver; use tokio::sync::Semaphore; use tracing::{debug, error, info, instrument}; @@ -578,7 +578,6 @@ impl Handler for IndexingPipeline { pub struct IndexingPipelineParams { pub pipeline_id: IndexingPipelineId, pub metastore: MetastoreServiceClient, - pub storage: Arc, // Indexing-related parameters pub doc_mapper: Arc, @@ -710,7 +709,7 @@ mod tests { let universe = Universe::new(); let (merge_planner_mailbox, _) = universe.create_test_mailbox(); let storage = Arc::new(RamStorage::default()); - let split_store = 
IndexingSplitStore::create_without_local_store_for_test(storage.clone()); + let split_store = IndexingSplitStore::create_without_local_store_for_test(storage); let pipeline_params = IndexingPipelineParams { pipeline_id, doc_mapper: Arc::new(default_doc_mapper_for_test()), @@ -720,7 +719,6 @@ mod tests { indexing_settings: IndexingSettings::for_test(), ingester_pool: IngesterPool::default(), metastore: MetastoreServiceClient::from_mock(mock_metastore), - storage, split_store, merge_policy: default_merge_policy(), retention_policy: None, @@ -833,7 +831,7 @@ mod tests { let universe = Universe::new(); let storage = Arc::new(RamStorage::default()); - let split_store = IndexingSplitStore::create_without_local_store_for_test(storage.clone()); + let split_store = IndexingSplitStore::create_without_local_store_for_test(storage); let (merge_planner_mailbox, _) = universe.create_test_mailbox(); let pipeline_params = IndexingPipelineParams { pipeline_id, @@ -845,7 +843,6 @@ mod tests { ingester_pool: IngesterPool::default(), metastore: MetastoreServiceClient::from_mock(mock_metastore), queues_dir_path: PathBuf::from("./queues"), - storage, split_store, merge_policy: default_merge_policy(), retention_policy: None, @@ -918,7 +915,7 @@ mod tests { let universe = Universe::with_accelerated_time(); let doc_mapper = Arc::new(default_doc_mapper_for_test()); let storage = Arc::new(RamStorage::default()); - let split_store = IndexingSplitStore::create_without_local_store_for_test(storage.clone()); + let split_store = IndexingSplitStore::create_without_local_store_for_test(storage); let merge_pipeline_params = MergePipelineParams { pipeline_id: pipeline_id.merge_pipeline_id(), doc_mapper: doc_mapper.clone(), @@ -946,7 +943,6 @@ mod tests { ingester_pool: IngesterPool::default(), metastore, queues_dir_path: PathBuf::from("./queues"), - storage, split_store, merge_policy: default_merge_policy(), retention_policy: None, @@ -1044,7 +1040,7 @@ mod tests { .returning(|_| Ok(EmptyResponse 
{})); let universe = Universe::new(); let storage = Arc::new(RamStorage::default()); - let split_store = IndexingSplitStore::create_without_local_store_for_test(storage.clone()); + let split_store = IndexingSplitStore::create_without_local_store_for_test(storage); let (merge_planner_mailbox, _) = universe.create_test_mailbox(); // Create a minimal mapper with wrong date format to ensure that all documents will fail let broken_mapper = serde_json::from_str::( @@ -1074,7 +1070,6 @@ mod tests { ingester_pool: IngesterPool::default(), metastore: MetastoreServiceClient::from_mock(mock_metastore), queues_dir_path: PathBuf::from("./queues"), - storage, split_store, merge_policy: default_merge_policy(), retention_policy: None, diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 363c9891f0c..0fc21cd5b5d 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -282,15 +282,6 @@ impl IndexingService { let message = format!("failed to create indexing directory: {error}"); IndexingError::Internal(message) })?; - let storage = self - .storage_resolver - .resolve(&index_config.index_uri) - .await - .map_err(|error| { - let message = format!("failed to spawn indexing pipeline: {error}"); - IndexingError::Internal(message) - })?; - let mut indexing_settings = index_config.indexing_settings.clone(); if let SourceParams::Kafka(kafka_params) = &source_config.source_params && let Some(indexing_settings_value) = @@ -308,7 +299,25 @@ impl IndexingService { } let merge_policy = crate::merge_policy::merge_policy_from_settings(&indexing_settings); let retention_policy = index_config.retention_policy_opt.clone(); - let split_store = IndexingSplitStore::new(storage.clone(), self.local_split_store.clone()); + let primary_uri = index_config.index_uri.clone(); + let all_uris = index_config.all_index_uris(); + let mut storages = 
HashMap::new(); + let mut selector_uris = Vec::with_capacity(all_uris.len()); + for uri in &all_uris { + let resolved_storage = self.storage_resolver.resolve(uri).await.map_err(|error| { + let message = format!("failed to resolve storage for URI {uri}: {error}"); + IndexingError::Internal(message) + })?; + storages.insert((*uri).clone(), resolved_storage); + selector_uris.push((*uri).clone()); + } + let bucket_selector = crate::split_store::default_bucket_selector(selector_uris); + let split_store = IndexingSplitStore::new( + storages, + bucket_selector, + self.local_split_store.clone(), + primary_uri, + ); let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings) .map_err(|error| IndexingError::Internal(error.to_string()))?; @@ -356,7 +365,6 @@ impl IndexingService { let pipeline_params = IndexingPipelineParams { pipeline_id: indexing_pipeline_id.clone(), metastore: self.metastore.clone(), - storage, // Indexing-related parameters doc_mapper, @@ -1061,7 +1069,7 @@ mod tests { use quickwit_common::rand::append_random_suffix; use quickwit_config::{ IngestApiConfig, KafkaSourceParams, SourceConfig, SourceInputFormat, SourceParams, - VecSourceParams, + VecSourceParams, indexing_pipeline_params_fingerprint, }; use quickwit_ingest::{CreateQueueIfNotExistsRequest, init_ingest_api}; use quickwit_metastore::{ @@ -1272,10 +1280,6 @@ mod tests { #[tokio::test] async fn test_indexing_service_apply_plan() { - const PARAMS_FINGERPRINT_INGEST_API: u64 = 1637744865450232394; - const PARAMS_FINGERPRINT_SOURCE_1: u64 = 1705211905504908791; - const PARAMS_FINGERPRINT_SOURCE_2: u64 = 8706667372658059428; - quickwit_common::setup_logging_for_tests(); let transport = ChannelTransport::default(); let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) @@ -1325,6 +1329,12 @@ mod tests { transform_config: None, input_format: SourceInputFormat::Json, }; + let params_fingerprint_ingest_api = indexing_pipeline_params_fingerprint( + 
&index_config, + &SourceConfig::ingest_api_default(), + ); + let params_fingerprint_source_1 = + indexing_pipeline_params_fingerprint(&index_config, &source_config_1); { // Assign 2 indexing tasks // -> total: 1 source * 2 pipelines @@ -1338,14 +1348,14 @@ mod tests { source_id: source_config_1.source_id.clone(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(0u128)), - params_fingerprint: PARAMS_FINGERPRINT_SOURCE_1, + params_fingerprint: params_fingerprint_source_1, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: source_config_1.source_id.clone(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(1u128)), - params_fingerprint: PARAMS_FINGERPRINT_SOURCE_1, + params_fingerprint: params_fingerprint_source_1, }, ]; indexing_service @@ -1374,6 +1384,8 @@ mod tests { transform_config: None, input_format: SourceInputFormat::Json, }; + let params_fingerprint_source_2 = + indexing_pipeline_params_fingerprint(&index_config, &source_config_2); { // Assign 2 more indexing tasks (1 new source + activate ingest API source) // -> total: 2 source * 1 pipeline + 1 source * 2 pipelines @@ -1388,28 +1400,28 @@ mod tests { source_id: INGEST_API_SOURCE_ID.to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(3u128)), - params_fingerprint: PARAMS_FINGERPRINT_INGEST_API, + params_fingerprint: params_fingerprint_ingest_api, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: source_config_1.source_id.clone(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(1u128)), - params_fingerprint: PARAMS_FINGERPRINT_SOURCE_1, + params_fingerprint: params_fingerprint_source_1, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: source_config_1.source_id.clone(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(2u128)), - params_fingerprint: PARAMS_FINGERPRINT_SOURCE_1, + params_fingerprint: params_fingerprint_source_1, }, IndexingTask { 
index_uid: Some(metadata.index_uid.clone()), source_id: source_config_2.source_id.clone(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(4u128)), - params_fingerprint: PARAMS_FINGERPRINT_SOURCE_2, + params_fingerprint: params_fingerprint_source_2, }, ]; indexing_service @@ -1451,21 +1463,21 @@ mod tests { source_id: INGEST_API_SOURCE_ID.to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(3u128)), - params_fingerprint: PARAMS_FINGERPRINT_INGEST_API, + params_fingerprint: params_fingerprint_ingest_api, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: source_config_1.source_id.clone(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(1u128)), - params_fingerprint: PARAMS_FINGERPRINT_SOURCE_1, + params_fingerprint: params_fingerprint_source_1, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), source_id: source_config_2.source_id.clone(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(4u128)), - params_fingerprint: PARAMS_FINGERPRINT_SOURCE_2, + params_fingerprint: params_fingerprint_source_2, }, ]; indexing_service @@ -1510,7 +1522,7 @@ mod tests { source_id: INGEST_API_SOURCE_ID.to_string(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(3u128)), - params_fingerprint: PARAMS_FINGERPRINT_INGEST_API, + params_fingerprint: params_fingerprint_ingest_api, }, IndexingTask { index_uid: Some(metadata.index_uid.clone()), @@ -1524,7 +1536,7 @@ mod tests { source_id: source_config_2.source_id.clone(), shard_ids: Vec::new(), pipeline_uid: Some(PipelineUid::for_test(4u128)), - params_fingerprint: PARAMS_FINGERPRINT_SOURCE_2, + params_fingerprint: params_fingerprint_source_2, }, ]; indexing_service diff --git a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs index 7d124288288..31622aa6cda 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs +++ 
b/quickwit/quickwit-indexing/src/actors/merge_split_downloader.rs @@ -113,7 +113,7 @@ impl MergeSplitDownloader { let _protect_guard = ctx.protect_zone(); let tantivy_dir = self .split_store - .fetch_and_open_split(split.split_id(), download_directory, &io_controls) + .fetch_and_open_split(split, download_directory, &io_controls) .await .map_err(|error| { let split_id = split.split_id(); diff --git a/quickwit/quickwit-indexing/src/actors/uploader.rs b/quickwit/quickwit-indexing/src/actors/uploader.rs index 1d9e71d87ba..03af06def2c 100644 --- a/quickwit/quickwit-indexing/src/actors/uploader.rs +++ b/quickwit/quickwit-indexing/src/actors/uploader.rs @@ -327,7 +327,7 @@ impl Handler for Uploader { return; } }; - let split_metadata = create_split_metadata( + let mut split_metadata = create_split_metadata( &merge_policy, retention_policy.as_ref(), &packaged_split.split_attrs, @@ -335,8 +335,12 @@ impl Handler for Uploader { split_streamer.footer_range.start..split_streamer.footer_range.end, ); + // Select the target bucket and record it in the split metadata. 
+ let target_uri = split_store.select_bucket().clone(); + split_metadata.storage_uri = Some(target_uri.clone()); + report_splits.push(ReportSplit { - storage_uri: split_store.remote_uri().to_string(), + storage_uri: target_uri.to_string(), split_id: packaged_split.split_id().to_string(), }); diff --git a/quickwit/quickwit-indexing/src/lib.rs b/quickwit/quickwit-indexing/src/lib.rs index 9183fda3890..b299023386e 100644 --- a/quickwit/quickwit-indexing/src/lib.rs +++ b/quickwit/quickwit-indexing/src/lib.rs @@ -31,7 +31,10 @@ pub use crate::actors::{ }; pub use crate::controlled_directory::ControlledDirectory; use crate::models::IndexingStatistics; -pub use crate::split_store::{IndexingSplitStore, get_tantivy_directory_from_split_bundle}; +pub use crate::split_store::{ + IndexingSplitCache, IndexingSplitStore, default_bucket_selector, + get_tantivy_directory_from_split_bundle, +}; pub mod actors; mod controlled_directory; diff --git a/quickwit/quickwit-indexing/src/mature_merge.rs b/quickwit/quickwit-indexing/src/mature_merge.rs index de04b9c4ae2..2209aef27e0 100644 --- a/quickwit/quickwit-indexing/src/mature_merge.rs +++ b/quickwit/quickwit-indexing/src/mature_merge.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::HashMap; use std::sync::Arc; use anyhow::{Context, bail}; @@ -331,13 +332,25 @@ async fn merge_mature_single_index( }); } - let index_uri = index_metadata.index_uri(); - let remote_storage = storage_resolver - .resolve(index_uri) - .await - .context("failed to resolve index storage")?; - let split_store = - IndexingSplitStore::new(remote_storage, Arc::new(IndexingSplitCache::no_caching())); + let primary_uri = index_metadata.index_uri().clone(); + let all_uris = index_metadata.index_config.all_index_uris(); + let mut storages = HashMap::new(); + let mut selector_uris = Vec::with_capacity(all_uris.len()); + for uri in &all_uris { + let resolved = storage_resolver + .resolve(uri) + .await + .with_context(|| format!("failed to resolve storage for URI {uri}"))?; + storages.insert((*uri).clone(), resolved); + selector_uris.push((*uri).clone()); + } + let bucket_selector = crate::split_store::default_bucket_selector(selector_uris); + let split_store = IndexingSplitStore::new( + storages, + bucket_selector, + Arc::new(IndexingSplitCache::no_caching()), + primary_uri, + ); let outcome = run_mature_merges_for_index( &index_metadata, @@ -730,6 +743,7 @@ mod tests { &index_metadata_v1.index_config.ingest_settings, &index_metadata_v1.index_config.search_settings, &index_metadata_v1.index_config.retention_policy_opt, + &index_metadata_v1.index_config.extra_index_uris, )?; metastore.update_index(update_request).await?; diff --git a/quickwit/quickwit-indexing/src/models/split_attrs.rs b/quickwit/quickwit-indexing/src/models/split_attrs.rs index 4a8076c4ed6..163e6c4f9f3 100644 --- a/quickwit/quickwit-indexing/src/models/split_attrs.rs +++ b/quickwit/quickwit-indexing/src/models/split_attrs.rs @@ -153,6 +153,7 @@ pub fn create_split_metadata( delete_opstamp: split_attrs.delete_opstamp, num_merge_ops: split_attrs.num_merge_ops, soft_deleted_doc_ids: BTreeSet::new(), + storage_uri: None, } } diff --git 
a/quickwit/quickwit-indexing/src/split_store/bucket_selector.rs b/quickwit/quickwit-indexing/src/split_store/bucket_selector.rs new file mode 100644 index 00000000000..a33f0900078 --- /dev/null +++ b/quickwit/quickwit-indexing/src/split_store/bucket_selector.rs @@ -0,0 +1,104 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + +use quickwit_common::uri::Uri; + +/// Selects which storage bucket to write the next split to. +/// +/// Implementations own the list of available URIs and encapsulate the +/// selection algorithm. This keeps the `IndexingSplitStore` simple — it +/// just calls `select()` and writes to the returned URI. +pub trait BucketSelector: Send + Sync + 'static { + /// Returns the storage URI to use for writing the next split. + fn select(&self) -> &Uri; +} + +/// Round-robin bucket selector that cycles through the configured URIs. +pub struct RoundRobinBucketSelector { + uris: Vec, + counter: AtomicU64, +} + +impl RoundRobinBucketSelector { + /// Creates a new round-robin selector. + /// + /// # Panics + /// + /// Panics if `uris` is empty. 
+ pub fn new(uris: Vec) -> Self { + assert!(!uris.is_empty(), "BucketSelector requires at least one URI"); + Self { + uris, + counter: AtomicU64::new(0), + } + } +} + +impl BucketSelector for RoundRobinBucketSelector { + fn select(&self) -> &Uri { + let idx = self.counter.fetch_add(1, Ordering::Relaxed) as usize % self.uris.len(); + &self.uris[idx] + } +} + +/// Creates the default bucket selector (round-robin) from a list of URIs. +pub fn default_bucket_selector(uris: Vec) -> Arc { + Arc::new(RoundRobinBucketSelector::new(uris)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_round_robin_single_uri() { + let uri = Uri::for_test("s3://bucket-a/index"); + let selector = RoundRobinBucketSelector::new(vec![uri.clone()]); + for _ in 0..10 { + assert_eq!(selector.select(), &uri); + } + } + + #[test] + fn test_round_robin_multiple_uris() { + let uri_a = Uri::for_test("s3://bucket-a/index"); + let uri_b = Uri::for_test("s3://bucket-b/index"); + let uri_c = Uri::for_test("s3://bucket-c/index"); + let selector = + RoundRobinBucketSelector::new(vec![uri_a.clone(), uri_b.clone(), uri_c.clone()]); + + assert_eq!(selector.select(), &uri_a); + assert_eq!(selector.select(), &uri_b); + assert_eq!(selector.select(), &uri_c); + assert_eq!(selector.select(), &uri_a); + assert_eq!(selector.select(), &uri_b); + assert_eq!(selector.select(), &uri_c); + } + + #[test] + #[should_panic(expected = "BucketSelector requires at least one URI")] + fn test_round_robin_empty_uris_panics() { + RoundRobinBucketSelector::new(Vec::new()); + } + + #[test] + fn test_default_bucket_selector() { + let uri = Uri::for_test("s3://bucket-a/index"); + let selector = default_bucket_selector(vec![uri.clone()]); + assert_eq!(selector.select(), &uri); + } +} diff --git a/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs b/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs index 88904cbd6dd..668355ee9a4 100644 --- 
a/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs +++ b/quickwit/quickwit-indexing/src/split_store/indexing_split_store.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#[cfg(any(test, feature = "testsuite"))] use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -24,7 +23,7 @@ use bytesize::ByteSize; use quickwit_common::io::{IoControls, IoControlsAccess}; use quickwit_common::uri::Uri; use quickwit_metastore::SplitMetadata; -use quickwit_storage::{PutPayload, Storage, StorageResult}; +use quickwit_storage::{PutPayload, Storage, StorageErrorKind, StorageResult}; use tantivy::Directory; use tantivy::directory::{Advice, MmapDirectory}; use time::OffsetDateTime; @@ -58,19 +57,34 @@ pub struct IndexingSplitStore { } struct InnerIndexingSplitStore { - /// The remote storage. - remote_storage: Arc, + /// Pre-resolved storages keyed by URI. + storages: HashMap>, + /// Selects which bucket to write the next split to. + bucket_selector: Arc, + /// Local disk cache for immature splits (shared, URI-agnostic since split IDs are ULIDs). split_cache: Arc, + /// The primary storage URI (the index's `index_uri`). Used as fallback for + /// splits that don't have a `storage_uri` set. + primary_uri: Uri, } impl IndexingSplitStore { - /// Creates an instance of [`IndexingSplitStore`] + /// Creates an instance of [`IndexingSplitStore`]. /// - /// It needs the remote storage to work with. - pub fn new(remote_storage: Arc, split_cache: Arc) -> Self { + /// `storages` maps each URI to its resolved storage. `bucket_selector` picks + /// which URI to use for each new split. `primary_uri` is the fallback URI + /// for reading splits that don't have a `storage_uri`. 
+ pub fn new( + storages: HashMap>, + bucket_selector: Arc, + split_cache: Arc, + primary_uri: Uri, + ) -> Self { let inner = InnerIndexingSplitStore { - remote_storage, + storages, + bucket_selector, split_cache, + primary_uri, }; Self { inner: Arc::new(inner), @@ -80,17 +94,24 @@ impl IndexingSplitStore { /// Helper function to create a indexing split store for tests. /// The resulting store does not have any local cache. pub fn create_without_local_store_for_test(remote_storage: Arc) -> Self { + let primary_uri = remote_storage.uri().clone(); + let mut storages = HashMap::new(); + storages.insert(primary_uri.clone(), remote_storage); + let bucket_selector = super::default_bucket_selector(vec![primary_uri.clone()]); let inner = InnerIndexingSplitStore { - remote_storage, + storages, + bucket_selector, split_cache: Arc::new(IndexingSplitCache::no_caching()), + primary_uri, }; IndexingSplitStore { inner: Arc::new(inner), } } - pub fn remote_uri(&self) -> &Uri { - self.inner.remote_storage.uri() + /// Selects the next bucket URI for writing a split. + pub fn select_bucket(&self) -> &Uri { + self.inner.bucket_selector.select() } fn split_path(&self, split_id: &str) -> PathBuf { @@ -113,21 +134,29 @@ impl IndexingSplitStore { split_folder_path: &Path, put_payload: Box, ) -> anyhow::Result<()> { + let target_uri = split.storage_uri.as_ref().with_context(|| { + format!("split {} doesn't have a storage_uri set", split.split_id()) + })?; + let storage = self + .inner + .storages + .get(target_uri) + .with_context(|| format!("no storage configured for URI {target_uri}"))? 
+ .clone(); let start = Instant::now(); let split_num_bytes = put_payload.len(); let key = self.split_path(split.split_id()); let is_mature = split.is_mature(OffsetDateTime::now_utc()); - self.inner - .remote_storage + storage .put(&key, put_payload) - .instrument(info_span!("store_split_in_remote_storage", split=?split.split_id(), is_mature=is_mature, num_bytes=split_num_bytes)) + .instrument(info_span!("store_split_in_remote_storage", split=?split.split_id(), is_mature=is_mature, num_bytes=split_num_bytes, %target_uri)) .await .with_context(|| { format!( "failed uploading key {} in bucket {}", key.display(), - self.inner.remote_storage.uri() + target_uri ) })?; @@ -178,10 +207,11 @@ impl IndexingSplitStore { #[instrument(skip(self, output_dir_path, io_controls), fields(cache_hit))] pub async fn fetch_and_open_split( &self, - split_id: &str, + split: &SplitMetadata, output_dir_path: &Path, io_controls: &IoControls, ) -> StorageResult> { + let split_id = split.split_id(); let path = PathBuf::from(quickwit_common::split_file(split_id)); if let Some(split_path) = self .inner @@ -198,13 +228,24 @@ impl IndexingSplitStore { } else { tracing::Span::current().record("cache_hit", false); } + let effective_uri = split.effective_storage_uri(&self.inner.primary_uri); + let storage = self + .inner + .storages + .get(effective_uri) + .cloned() + .ok_or_else(|| { + StorageErrorKind::Internal.with_error(anyhow::anyhow!( + "no storage configured for URI {effective_uri} (split {})", + split.split_id() + )) + })?; let dest_filepath = output_dir_path.join(&path); let dest_file = tokio::fs::File::create(&dest_filepath).await?; let mut dest_file_with_write_limit = io_controls.clone().wrap_write(dest_file); - self.inner - .remote_storage + storage .copy_to(&path, &mut dest_file_with_write_limit) - .instrument(info_span!("fetch_split_from_remote_storage", path=?path)) + .instrument(info_span!("fetch_split_from_remote_storage", path=?path, %effective_uri)) .await?; 
get_tantivy_directory_from_split_bundle(&dest_filepath) } @@ -218,13 +259,14 @@ impl IndexingSplitStore { #[cfg(test)] mod tests { + use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use bytesize::ByteSize; use quickwit_common::io::IoControls; use quickwit_metastore::{SplitMaturity, SplitMetadata}; - use quickwit_storage::{PutPayload, RamStorage, SplitPayloadBuilder}; + use quickwit_storage::{PutPayload, RamStorage, SplitPayloadBuilder, Storage}; use tempfile::tempdir; use time::OffsetDateTime; use tokio::fs; @@ -254,8 +296,17 @@ mod tests { SplitStoreQuota::default(), ) .await?; - let remote_storage = Arc::new(RamStorage::default()); - let split_store = IndexingSplitStore::new(remote_storage, Arc::new(split_cache)); + let remote_storage: Arc = Arc::new(RamStorage::default()); + let primary_uri = remote_storage.uri().clone(); + let mut storages = HashMap::new(); + storages.insert(primary_uri.clone(), remote_storage); + let bucket_selector = super::super::default_bucket_selector(vec![primary_uri.clone()]); + let split_store = IndexingSplitStore::new( + storages, + bucket_selector, + Arc::new(split_cache), + primary_uri, + ); let split_id1 = Ulid::new().to_string(); let split_id2 = Ulid::new().to_string(); @@ -263,7 +314,8 @@ mod tests { { let split1_dir = temp_dir.path().join(&split_id1); fs::create_dir_all(&split1_dir).await?; - let split_metadata1 = create_test_split_metadata(&split_id1); + let mut split_metadata1 = create_test_split_metadata(&split_id1); + split_metadata1.storage_uri = Some(split_store.select_bucket().clone()); fs::write(split1_dir.join("splitfile"), b"1234").await?; split_store .store_split(&split_metadata1, &split1_dir, Box::new(b"1234".to_vec())) @@ -286,7 +338,8 @@ mod tests { let split2_dir = temp_dir.path().join(&split_id2); fs::create_dir_all(&split2_dir).await?; fs::write(split2_dir.join("splitfile"), b"567").await?; - let split_metadata2 = create_test_split_metadata(&split_id2); + let mut split_metadata2 = 
create_test_split_metadata(&split_id2); + split_metadata2.storage_uri = Some(split_store.select_bucket().clone()); split_store .store_split(&split_metadata2, &split2_dir, Box::new(b"567".to_vec())) .await?; @@ -313,8 +366,9 @@ mod tests { let io_controls = IoControls::default(); { let output = tempfile::tempdir()?; + let split_metadata1 = create_test_split_metadata(&split_id1); let split1 = split_store - .fetch_and_open_split(&split_id1, output.path(), &io_controls) + .fetch_and_open_split(&split_metadata1, output.path(), &io_controls) .await?; let local_store_stats = split_store.inspect_split_cache().await; assert_eq!(local_store_stats.len(), 1); @@ -322,8 +376,9 @@ mod tests { } { let output = tempfile::tempdir()?; + let split_metadata2 = create_test_split_metadata(&split_id2); let split2 = split_store - .fetch_and_open_split(&split_id2, output.path(), &io_controls) + .fetch_and_open_split(&split_metadata2, output.path(), &io_controls) .await?; let local_store_stats = split_store.inspect_split_cache().await; assert_eq!(local_store_stats.len(), 0); @@ -344,8 +399,17 @@ mod tests { ) .await?; - let remote_storage = Arc::new(RamStorage::default()); - let split_store = IndexingSplitStore::new(remote_storage, Arc::new(split_cache)); + let remote_storage: Arc = Arc::new(RamStorage::default()); + let primary_uri = remote_storage.uri().clone(); + let mut storages = HashMap::new(); + storages.insert(primary_uri.clone(), remote_storage); + let bucket_selector = super::super::default_bucket_selector(vec![primary_uri.clone()]); + let split_store = IndexingSplitStore::new( + storages, + bucket_selector, + Arc::new(split_cache), + primary_uri, + ); let split_id1 = Ulid::new().to_string(); let split_payload1 = SplitPayloadBuilder::get_split_payload(&[], &[], &[5, 5, 5])?; @@ -356,7 +420,8 @@ mod tests { let split_path = temp_dir.path().join(&split_id1); fs::create_dir_all(&split_path).await?; fs::write(split_path.join("splitdatafile"), b"hello-world").await?; - let 
split_metadata1 = create_test_split_metadata(&split_id1); + let mut split_metadata1 = create_test_split_metadata(&split_id1); + split_metadata1.storage_uri = Some(split_store.select_bucket().clone()); split_store .store_split( &split_metadata1, @@ -382,8 +447,8 @@ mod tests { let split_path = temp_dir.path().join(&split_id2); fs::create_dir_all(&split_path).await?; fs::write(split_path.join("splitdatafile2"), b"hello-world2").await?; - let split_metadata2 = create_test_split_metadata(&split_id2); - + let mut split_metadata2 = create_test_split_metadata(&split_id2); + split_metadata2.storage_uri = Some(split_store.select_bucket().clone()); split_store .store_split( &split_metadata2, @@ -410,7 +475,11 @@ mod tests { // get from remote storage because split_id1 was evicted by split_id2 let output = tempfile::tempdir()?; let _split1 = split_store - .fetch_and_open_split(&split_id1, output.path(), &io_controls) + .fetch_and_open_split( + &create_test_split_metadata(&split_id1), + output.path(), + &io_controls, + ) .await?; assert_eq!(io_controls.num_bytes(), split_payload1.len()); } @@ -418,7 +487,11 @@ mod tests { // get from cache let output = tempfile::tempdir()?; let _split2 = split_store - .fetch_and_open_split(&split_id2, output.path(), &io_controls) + .fetch_and_open_split( + &create_test_split_metadata(&split_id2), + output.path(), + &io_controls, + ) .await?; // the number of downloaded by didn't change (still the size of split_payload1) assert_eq!(io_controls.num_bytes(), split_payload1.len()); @@ -427,7 +500,11 @@ mod tests { // get from remote because getting from cache removes the split from the cache let output = tempfile::tempdir()?; let _split2 = split_store - .fetch_and_open_split(&split_id2, output.path(), &io_controls) + .fetch_and_open_split( + &create_test_split_metadata(&split_id2), + output.path(), + &io_controls, + ) .await?; assert_eq!( io_controls.num_bytes(), diff --git a/quickwit/quickwit-indexing/src/split_store/mod.rs 
b/quickwit/quickwit-indexing/src/split_store/mod.rs index 9bef928b984..33043333dec 100644 --- a/quickwit/quickwit-indexing/src/split_store/mod.rs +++ b/quickwit/quickwit-indexing/src/split_store/mod.rs @@ -12,10 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod bucket_selector; mod indexing_split_cache; mod indexing_split_store; mod split_store_quota; +pub use bucket_selector::{BucketSelector, default_bucket_selector}; pub use indexing_split_cache::{IndexingSplitCache, get_tantivy_directory_from_split_bundle}; pub use indexing_split_store::IndexingSplitStore; pub use split_store_quota::SplitStoreQuota; diff --git a/quickwit/quickwit-integration-tests/src/tests/mod.rs b/quickwit/quickwit-integration-tests/src/tests/mod.rs index 34cefdc46a7..1b08c41c218 100644 --- a/quickwit/quickwit-integration-tests/src/tests/mod.rs +++ b/quickwit/quickwit-integration-tests/src/tests/mod.rs @@ -17,6 +17,7 @@ mod ingest_v1_tests; mod ingest_v2_tests; #[cfg(feature = "kafka-broker-tests")] mod kafka_tests; +mod multi_bucket_tests; mod no_cp_tests; mod otlp_tests; mod secondary_timestamp; diff --git a/quickwit/quickwit-integration-tests/src/tests/multi_bucket_tests.rs b/quickwit/quickwit-integration-tests/src/tests/multi_bucket_tests.rs new file mode 100644 index 00000000000..bddb8002339 --- /dev/null +++ b/quickwit/quickwit-integration-tests/src/tests/multi_bucket_tests.rs @@ -0,0 +1,177 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use quickwit_config::ConfigFormat; +use quickwit_config::service::QuickwitService; +use quickwit_metastore::SplitState; +use quickwit_rest_client::rest_client::CommitType; +use quickwit_serve::ListSplitsQueryParams; +use serde_json::json; + +use crate::ingest_json; +use crate::test_utils::{ClusterSandboxBuilder, ingest}; + +/// Tests the main multi-bucket split sharding use case end-to-end: +/// +/// 1. Creates an index with `index_uri` + two `extra_index_uris` (three ram:// buckets). +/// 2. Ingests enough documents to produce multiple splits. +/// 3. Verifies that published splits have `storage_uri` set and that at least two distinct URIs +/// were used (round-robin distributes across buckets). +/// 4. Verifies that search returns correct results across all buckets. +#[tokio::test] +async fn test_multi_bucket_ingest_and_search() { + quickwit_common::setup_logging_for_tests(); + let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; + + let index_id = "test-multi-bucket"; + let index_config = format!( + r#" + version: 0.8 + index_id: {index_id} + index_uri: ram:///multi-bucket-test/bucket-a/{index_id} + extra_index_uris: + - ram:///multi-bucket-test/bucket-b/{index_id} + - ram:///multi-bucket-test/bucket-c/{index_id} + doc_mapping: + field_mappings: + - name: body + type: text + - name: ts + type: datetime + fast: true + timestamp_field: ts + indexing_settings: + commit_timeout_secs: 1 + "# + ); + + // Create the index with three storage URIs. + sandbox + .rest_client(QuickwitService::Indexer) + .indexes() + .create(index_config, ConfigFormat::Yaml, false) + .await + .unwrap(); + + // Ingest a first batch and let it commit into a split. 
+ ingest( + &sandbox.rest_client(QuickwitService::Indexer), + index_id, + ingest_json!({"body": "first record", "ts": 1735689600}), + CommitType::Auto, + ) + .await + .unwrap(); + + sandbox + .wait_for_splits(index_id, Some(vec![SplitState::Published]), 1) + .await + .unwrap(); + + // Ingest a second batch so we get a second split. + ingest( + &sandbox.rest_client(QuickwitService::Indexer), + index_id, + ingest_json!({"body": "second record", "ts": 1735689601}), + CommitType::Auto, + ) + .await + .unwrap(); + + sandbox + .wait_for_splits(index_id, Some(vec![SplitState::Published]), 2) + .await + .unwrap(); + + // Ingest a third batch for a third split. + ingest( + &sandbox.rest_client(QuickwitService::Indexer), + index_id, + ingest_json!({"body": "third record", "ts": 1735689602}), + CommitType::Auto, + ) + .await + .unwrap(); + + sandbox + .wait_for_splits(index_id, Some(vec![SplitState::Published]), 3) + .await + .unwrap(); + + // Verify that splits were distributed across multiple buckets. + let splits = sandbox + .rest_client(QuickwitService::Metastore) + .splits(index_id) + .list(ListSplitsQueryParams { + split_states: Some(vec![SplitState::Published]), + ..Default::default() + }) + .await + .unwrap(); + + assert_eq!(splits.len(), 3, "expected exactly 3 published splits"); + + let storage_uris: Vec = splits + .iter() + .map(|split| { + split + .split_metadata + .storage_uri + .as_ref() + .expect("every new split must have storage_uri set") + .to_string() + }) + .collect(); + + // With round-robin over 3 buckets and 3 splits, each bucket should get + // exactly one split. Verify at least 2 distinct URIs were used (in case + // pipeline restarts reset the counter). + let distinct_uris: std::collections::HashSet<&str> = + storage_uris.iter().map(|s| s.as_str()).collect(); + assert!( + distinct_uris.len() >= 2, + "expected splits across at least 2 distinct storage URIs, got: {storage_uris:?}" + ); + + // All URIs must be one of the three configured buckets. 
+ let expected_uris = [ + format!("ram:///multi-bucket-test/bucket-a/{index_id}"), + format!("ram:///multi-bucket-test/bucket-b/{index_id}"), + format!("ram:///multi-bucket-test/bucket-c/{index_id}"), + ]; + for uri in &storage_uris { + assert!( + expected_uris.contains(uri), + "unexpected storage_uri {uri}, expected one of {expected_uris:?}" + ); + } + + // Search must return all 3 documents across all buckets. + sandbox.assert_hit_count(index_id, "body:record", 3).await; + + // Targeted searches should also work. + sandbox.assert_hit_count(index_id, "body:first", 1).await; + sandbox.assert_hit_count(index_id, "body:second", 1).await; + sandbox.assert_hit_count(index_id, "body:third", 1).await; + + // Cleanup. + sandbox + .rest_client(QuickwitService::Indexer) + .indexes() + .delete(index_id, false) + .await + .unwrap(); + + sandbox.shutdown().await.unwrap(); +} diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs index 9506a587bd4..ae0fa330621 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs @@ -13,7 +13,6 @@ // limitations under the License. 
use std::path::PathBuf; -use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; @@ -37,7 +36,6 @@ use quickwit_proto::indexing::MergePipelineId; use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient}; use quickwit_proto::types::{IndexUid, NodeId}; use quickwit_search::SearchJobPlacer; -use quickwit_storage::Storage; use serde::Serialize; use tokio::join; use tracing::info; @@ -77,7 +75,7 @@ pub struct DeleteTaskPipeline { index_uid: IndexUid, metastore: MetastoreServiceClient, search_job_placer: SearchJobPlacer, - index_storage: Arc, + split_store: IndexingSplitStore, delete_service_task_dir: PathBuf, handles: Option, max_concurrent_split_uploads: usize, @@ -129,7 +127,7 @@ impl DeleteTaskPipeline { index_uid: IndexUid, metastore: MetastoreServiceClient, search_job_placer: SearchJobPlacer, - index_storage: Arc, + split_store: IndexingSplitStore, delete_service_task_dir: PathBuf, max_concurrent_split_uploads: usize, merge_scheduler_service: Mailbox, @@ -139,7 +137,7 @@ impl DeleteTaskPipeline { index_uid, metastore, search_job_placer, - index_storage, + split_store, delete_service_task_dir, handles: Default::default(), max_concurrent_split_uploads, @@ -169,8 +167,7 @@ impl DeleteTaskPipeline { ); let (publisher_mailbox, publisher_supervisor_handler) = ctx.spawn_actor().supervise(publisher); - let split_store = - IndexingSplitStore::create_without_local_store_for_test(self.index_storage.clone()); + let split_store = self.split_store.clone(); let merge_policy = merge_policy_from_settings(&index_config.indexing_settings); let uploader = Uploader::new( UploaderType::DeleteUploader, @@ -283,8 +280,8 @@ mod tests { use quickwit_actors::{Handler, Universe}; use quickwit_common::pubsub::EventBroker; use quickwit_common::temp_dir::TempDirectory; - use quickwit_indexing::TestSandbox; use quickwit_indexing::actors::MergeSchedulerService; + use quickwit_indexing::{IndexingSplitStore, TestSandbox}; use 
quickwit_metastore::{ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitState}; use quickwit_proto::metastore::{DeleteQuery, ListSplitsRequest, MetastoreService}; use quickwit_proto::search::{LeafSearchRequest, LeafSearchResponse}; @@ -387,7 +384,7 @@ mod tests { test_sandbox.index_uid(), metastore.clone(), search_job_placer, - test_sandbox.storage(), + IndexingSplitStore::create_without_local_store_for_test(test_sandbox.storage()), delete_service_task_dir.path().into(), 4, merge_scheduler_service, @@ -468,7 +465,7 @@ mod tests { test_sandbox.index_uid(), metastore.clone(), search_job_placer, - test_sandbox.storage(), + IndexingSplitStore::create_without_local_store_for_test(test_sandbox.storage()), delete_service_task_dir.path().into(), 4, merge_scheduler_mailbox, diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs index 5e08b7773e6..7aa1189ceeb 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs @@ -295,7 +295,9 @@ impl DeleteTaskPlanner { index_uri: &str, ctx: &ActorContext, ) -> anyhow::Result { - let search_job = SearchJob::from(&stale_split.split_metadata); + let index_uri_parsed = Uri::from_str(index_uri).context("invalid index URI")?; + let search_job = + SearchJob::from_split_metadata(&stale_split.split_metadata, &index_uri_parsed); let mut search_client = self .search_job_placer .assign_job(search_job.clone(), &HashSet::new()) diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_service.rs b/quickwit/quickwit-janitor/src/actors/delete_task_service.rs index 5fc2954e1d0..ff12bf6d789 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_service.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_service.rs @@ -14,6 +14,7 @@ use std::collections::{HashMap, HashSet}; use std::path::PathBuf; +use std::sync::Arc; use std::time::Duration; use 
async_trait::async_trait; @@ -22,6 +23,7 @@ use quickwit_common::pubsub::EventBroker; use quickwit_common::temp_dir::{self}; use quickwit_config::IndexConfig; use quickwit_indexing::actors::MergeSchedulerService; +use quickwit_indexing::{IndexingSplitCache, IndexingSplitStore, default_bucket_selector}; use quickwit_metastore::{IndexMetadataResponseExt, ListIndexesMetadataResponseExt}; use quickwit_proto::metastore::{ IndexMetadataRequest, ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient, @@ -161,8 +163,22 @@ impl DeleteTaskService { index_config: IndexConfig, ctx: &ActorContext, ) -> anyhow::Result<()> { - let index_uri = index_config.index_uri.clone(); - let index_storage = self.storage_resolver.resolve(&index_uri).await?; + let primary_uri = index_config.index_uri.clone(); + let all_uris = index_config.all_index_uris(); + let mut storages = HashMap::new(); + let mut selector_uris = Vec::with_capacity(all_uris.len()); + for uri in &all_uris { + let resolved = self.storage_resolver.resolve(uri).await?; + storages.insert((*uri).clone(), resolved); + selector_uris.push((*uri).clone()); + } + let bucket_selector = default_bucket_selector(selector_uris); + let split_store = IndexingSplitStore::new( + storages, + bucket_selector, + Arc::new(IndexingSplitCache::no_caching()), + primary_uri, + ); let index_metadata_request = IndexMetadataRequest::for_index_id(index_config.index_id.to_string()); let index_metadata = self @@ -174,7 +190,7 @@ impl DeleteTaskService { index_metadata.index_uid.clone(), self.metastore.clone(), self.search_job_placer.clone(), - index_storage, + split_store, self.delete_service_task_dir.clone(), self.max_concurrent_split_uploads, self.merge_scheduler_service.clone(), diff --git a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs index 854346155e3..3dfc6fe1de7 100644 --- a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs +++ 
b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs @@ -147,6 +147,7 @@ impl GarbageCollector { .with_label_values(["error"]) .clone(), }), + &self.storage_resolver, ) .await; @@ -274,6 +275,11 @@ mod tests { async fn test_run_garbage_collect_calls_dependencies_appropriately() { let index_uid = IndexUid::for_test("test-index", 0); let mut mock_storage = MockStorage::default(); + mock_storage + .expect_uri() + .return_const(quickwit_common::uri::Uri::for_test( + "ram://indexes/test-index", + )); mock_storage .expect_bulk_delete() .times(1) @@ -368,6 +374,7 @@ mod tests { false, None, None, + &StorageResolver::unconfigured(), ) .await; assert!(result.is_ok()); diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs index bd1677e89fd..41241e8a6a7 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/file_backed_index/mod.rs @@ -25,6 +25,7 @@ use std::ops::Bound; use itertools::Itertools; use quickwit_common::pretty::PrettySample; +use quickwit_common::uri::Uri; use quickwit_config::{ DocMapping, IndexingSettings, IngestSettings, RetentionPolicy, SearchSettings, SourceConfig, }; @@ -222,6 +223,7 @@ impl FileBackedIndex { ingest_settings: IngestSettings, search_settings: SearchSettings, retention_policy_opt: Option, + extra_index_uris: Vec, ) -> MetastoreResult { self.metadata.update_index_config( doc_mapping, @@ -229,6 +231,7 @@ impl FileBackedIndex { ingest_settings, search_settings, retention_policy_opt, + extra_index_uris, ) } diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index af3df2a363d..71e126cfac3 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -567,6 +567,7 
@@ impl MetastoreService for FileBackedMetastore { let ingest_settings = request.deserialize_ingest_settings()?; let search_settings = request.deserialize_search_settings()?; let retention_policy_opt = request.deserialize_retention_policy()?; + let extra_index_uris = request.deserialize_extra_index_uris()?; let index_metadata = self .mutate(index_uid, |index| { @@ -576,6 +577,7 @@ impl MetastoreService for FileBackedMetastore { ingest_settings, search_settings, retention_policy_opt, + extra_index_uris, )?; let index_metadata = index.metadata().clone(); diff --git a/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs b/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs index e7a5677099e..86fef844e21 100644 --- a/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/index_metadata/mod.rs @@ -106,6 +106,7 @@ impl IndexMetadata { ingest_settings: IngestSettings, search_settings: SearchSettings, retention_policy_opt: Option, + extra_index_uris: Vec, ) -> MetastoreResult { let (updated_doc_mapping, mut mutation_occurred) = prepare_doc_mapping_update( doc_mapping, @@ -132,6 +133,10 @@ impl IndexMetadata { self.index_config.retention_policy_opt = retention_policy_opt; mutation_occurred = true; } + if extra_index_uris != self.index_config.extra_index_uris { + self.index_config.extra_index_uris = extra_index_uris; + mutation_occurred = true; + } Ok(mutation_occurred) } @@ -257,6 +262,7 @@ mod tests { current_index_config.ingest_settings.clone(), current_index_config.search_settings.clone(), current_index_config.retention_policy_opt.clone(), + Vec::new(), ) .unwrap(); assert!(!mutation_occurred); @@ -271,6 +277,7 @@ mod tests { current_index_config.ingest_settings.clone(), new_search_settings, current_index_config.retention_policy_opt.clone(), + Vec::new(), ) .unwrap(); assert!(mutation_occurred); @@ -299,6 +306,7 @@ mod tests { current_index_config.ingest_settings.clone(), 
current_index_config.search_settings.clone(), current_index_config.retention_policy_opt.clone(), + Vec::new(), ) .unwrap_err(); @@ -314,6 +322,7 @@ mod tests { current_index_config.ingest_settings, current_index_config.search_settings, current_index_config.retention_policy_opt, + Vec::new(), ) .unwrap(); assert!(mutation_occurred); @@ -329,4 +338,75 @@ mod tests { Mode::Strict ); } + + #[test] + fn test_update_extra_index_uris() { + let current_index_config = IndexConfig::for_test("test-index", "s3://test-index"); + let mut current_index_metadata = IndexMetadata::new(current_index_config.clone()); + + // No mutation when extra_index_uris stays empty. + let mutation_occurred = current_index_metadata + .update_index_config( + current_index_config.doc_mapping.clone(), + current_index_config.indexing_settings.clone(), + current_index_config.ingest_settings.clone(), + current_index_config.search_settings.clone(), + current_index_config.retention_policy_opt.clone(), + Vec::new(), + ) + .unwrap(); + assert!(!mutation_occurred); + + // Adding extra URIs triggers a mutation. + let extra_uris = vec![Uri::for_test("s3://bucket-b/test-index")]; + let mutation_occurred = current_index_metadata + .update_index_config( + current_index_config.doc_mapping.clone(), + current_index_config.indexing_settings.clone(), + current_index_config.ingest_settings.clone(), + current_index_config.search_settings.clone(), + current_index_config.retention_policy_opt.clone(), + extra_uris.clone(), + ) + .unwrap(); + assert!(mutation_occurred); + assert_eq!( + current_index_metadata.index_config().extra_index_uris, + extra_uris + ); + + // Same value again does not trigger a mutation. 
+ let mutation_occurred = current_index_metadata + .update_index_config( + current_index_config.doc_mapping.clone(), + current_index_config.indexing_settings.clone(), + current_index_config.ingest_settings.clone(), + current_index_config.search_settings.clone(), + current_index_config.retention_policy_opt.clone(), + extra_uris.clone(), + ) + .unwrap(); + assert!(!mutation_occurred); + + // Adding a second URI triggers a mutation. + let extra_uris_expanded = vec![ + Uri::for_test("s3://bucket-b/test-index"), + Uri::for_test("s3://bucket-c/test-index"), + ]; + let mutation_occurred = current_index_metadata + .update_index_config( + current_index_config.doc_mapping.clone(), + current_index_config.indexing_settings.clone(), + current_index_config.ingest_settings.clone(), + current_index_config.search_settings.clone(), + current_index_config.retention_policy_opt.clone(), + extra_uris_expanded.clone(), + ) + .unwrap(); + assert!(mutation_occurred); + assert_eq!( + current_index_metadata.index_config().extra_index_uris, + extra_uris_expanded + ); + } } diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 187ad1676d9..dc17f03f940 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -21,6 +21,7 @@ pub mod control_plane_metastore; use std::cmp::Ordering; use std::ops::{Bound, RangeInclusive}; +use std::str::FromStr; use async_trait::async_trait; use bytes::Bytes; @@ -28,6 +29,7 @@ use futures::TryStreamExt; pub use index_metadata::IndexMetadata; use itertools::Itertools; use quickwit_common::thread_pool::run_cpu_intensive; +use quickwit_common::uri::Uri; use quickwit_config::{ DocMapping, FileSourceParams, IndexConfig, IndexingSettings, IngestSettings, RetentionPolicy, SearchSettings, SourceConfig, SourceParams, @@ -194,6 +196,7 @@ pub trait UpdateIndexRequestExt { ingest_settings: &IngestSettings, search_settings: &SearchSettings, 
retention_policy_opt: &Option, + extra_index_uris: &[Uri], ) -> MetastoreResult; /// Deserializes the `doc_mapping_json` field of an `[UpdateIndexRequest]` into a @@ -215,6 +218,10 @@ pub trait UpdateIndexRequestExt { /// Deserializes the `retention_policy_json` field of an [`UpdateIndexRequest`] into a /// [`RetentionPolicy`] object. fn deserialize_retention_policy(&self) -> MetastoreResult>; + + /// Deserializes the `extra_index_uris` field of an [`UpdateIndexRequest`] into a + /// `Vec`. + fn deserialize_extra_index_uris(&self) -> MetastoreResult>; } impl UpdateIndexRequestExt for UpdateIndexRequest { @@ -225,6 +232,7 @@ impl UpdateIndexRequestExt for UpdateIndexRequest { ingest_settings: &IngestSettings, search_settings: &SearchSettings, retention_policy_opt: &Option, + extra_index_uris: &[Uri], ) -> MetastoreResult { let doc_mapping_json = serde_utils::to_json_str(doc_mapping)?; let indexing_settings_json = serde_utils::to_json_str(indexing_settings)?; @@ -235,6 +243,8 @@ impl UpdateIndexRequestExt for UpdateIndexRequest { .map(serde_utils::to_json_str) .transpose()?; + let extra_index_uris = extra_index_uris.iter().map(|uri| uri.to_string()).collect(); + let update_request = UpdateIndexRequest { index_uid: Some(index_uid.into()), doc_mapping_json, @@ -242,6 +252,7 @@ impl UpdateIndexRequestExt for UpdateIndexRequest { ingest_settings_json, search_settings_json, retention_policy_json_opt, + extra_index_uris, }; Ok(update_request) } @@ -267,6 +278,18 @@ impl UpdateIndexRequestExt for UpdateIndexRequest { .map(|policy_json| serde_utils::from_json_str(policy_json)) .transpose() } + + fn deserialize_extra_index_uris(&self) -> MetastoreResult> { + self.extra_index_uris + .iter() + .map(|uri_str| { + Uri::from_str(uri_str).map_err(|error| MetastoreError::Internal { + message: format!("failed to parse extra index URI `{uri_str}`: {error}"), + cause: error.to_string(), + }) + }) + .collect() + } } /// Helper trait to build a [`IndexMetadataResponse`] and deserialize 
its payload. diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 08c6e378254..35dbf239ef6 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -428,6 +428,7 @@ impl MetastoreService for PostgresqlMetastore { let ingest_settings = request.deserialize_ingest_settings()?; let search_settings = request.deserialize_search_settings()?; let retention_policy_opt = request.deserialize_retention_policy()?; + let extra_index_uris = request.deserialize_extra_index_uris()?; let index_uid: IndexUid = request.index_uid().clone(); let updated_index_metadata = run_with_tx!(self.connection_pool, tx, "update index", { @@ -438,6 +439,7 @@ impl MetastoreService for PostgresqlMetastore { ingest_settings, search_settings, retention_policy_opt, + extra_index_uris, )?; Ok(MutationOccurred::from(mutation_occurred)) }) diff --git a/quickwit/quickwit-metastore/src/split_metadata.rs b/quickwit/quickwit-metastore/src/split_metadata.rs index 3de6f9122f4..19c715099d2 100644 --- a/quickwit/quickwit-metastore/src/split_metadata.rs +++ b/quickwit/quickwit-metastore/src/split_metadata.rs @@ -20,6 +20,7 @@ use std::str::FromStr; use std::time::Duration; use bytesize::ByteSize; +use quickwit_common::uri::Uri; use quickwit_proto::types::{DocMappingUid, IndexUid, SourceId, SplitId}; use serde::{Deserialize, Serialize}; use serde_with::{DurationMilliSeconds, serde_as}; @@ -138,6 +139,10 @@ pub struct SplitMetadata { /// Set of tantivy doc_ids that have been soft-deleted from this split. pub soft_deleted_doc_ids: BTreeSet, + + /// The storage URI where this split is stored. When `None`, the split is stored + /// under the index-level `index_uri`. 
+ pub storage_uri: Option, } impl fmt::Debug for SplitMetadata { @@ -183,6 +188,9 @@ impl fmt::Debug for SplitMetadata { debug_struct.field("footer_offsets", &self.footer_offsets); debug_struct.field("delete_opstamp", &self.delete_opstamp); debug_struct.field("num_merge_ops", &self.num_merge_ops); + if let Some(ref storage_uri) = self.storage_uri { + debug_struct.field("storage_uri", storage_uri); + } if !self.soft_deleted_doc_ids.is_empty() { debug_struct.field("soft_deleted_doc_ids", &self.soft_deleted_doc_ids); } @@ -249,6 +257,12 @@ impl SplitMetadata { num_docs: self.num_docs, } } + + /// Returns the storage URI for this split, falling back to the given + /// index-level URI when no per-split URI is set. + pub fn effective_storage_uri<'a>(&'a self, default_index_uri: &'a Uri) -> &'a Uri { + self.storage_uri.as_ref().unwrap_or(default_index_uri) + } } /// A summarized version of the split metadata for display purposes. @@ -293,6 +307,7 @@ impl quickwit_config::TestableForRegression for SplitMetadata { num_merge_ops: 3, doc_mapping_uid: DocMappingUid::default(), soft_deleted_doc_ids: BTreeSet::new(), + storage_uri: None, } } @@ -435,6 +450,7 @@ mod tests { num_merge_ops: 0, doc_mapping_uid: DocMappingUid::default(), soft_deleted_doc_ids: BTreeSet::new(), + storage_uri: None, }; let expected_output = diff --git a/quickwit/quickwit-metastore/src/split_metadata_version.rs b/quickwit/quickwit-metastore/src/split_metadata_version.rs index 43b38542133..1da151b61fe 100644 --- a/quickwit/quickwit-metastore/src/split_metadata_version.rs +++ b/quickwit/quickwit-metastore/src/split_metadata_version.rs @@ -15,6 +15,7 @@ use std::collections::BTreeSet; use std::ops::{Range, RangeInclusive}; +use quickwit_common::uri::Uri; use quickwit_proto::types::{DocMappingUid, IndexUid, SplitId}; use serde::{Deserialize, Serialize}; @@ -101,6 +102,12 @@ pub(crate) struct SplitMetadataV0_8 { /// Set of tantivy doc_ids that have been soft-deleted from this split. 
#[serde(default)] pub soft_deleted_doc_ids: BTreeSet, + + /// The storage URI where this split is stored. When `None`, the split is stored + /// under the index-level `index_uri`. + #[schema(value_type = Option)] + #[serde(default, skip_serializing_if = "Option::is_none")] + pub storage_uri: Option, } impl From for SplitMetadata { @@ -139,6 +146,7 @@ impl From for SplitMetadata { num_merge_ops: v8.num_merge_ops, doc_mapping_uid: v8.doc_mapping_uid, soft_deleted_doc_ids: v8.soft_deleted_doc_ids, + storage_uri: v8.storage_uri, } } } @@ -163,6 +171,7 @@ impl From for SplitMetadataV0_8 { num_merge_ops: split.num_merge_ops, doc_mapping_uid: split.doc_mapping_uid, soft_deleted_doc_ids: split.soft_deleted_doc_ids, + storage_uri: split.storage_uri, } } } diff --git a/quickwit/quickwit-metastore/src/tests/index.rs b/quickwit/quickwit-metastore/src/tests/index.rs index 03a37c61aa0..cf486708249 100644 --- a/quickwit/quickwit-metastore/src/tests/index.rs +++ b/quickwit/quickwit-metastore/src/tests/index.rs @@ -127,6 +127,7 @@ pub async fn test_metastore_update_retention_policy< &index_config.ingest_settings, &index_config.search_settings, &loop_retention_policy_opt, + &index_config.extra_index_uris, ) .unwrap(); let response_metadata = metastore @@ -170,6 +171,7 @@ pub async fn test_metastore_update_ingest_settings< &ingest_settings, &index_config.search_settings, &index_config.retention_policy_opt, + &index_config.extra_index_uris, ) .unwrap(); @@ -225,6 +227,7 @@ pub async fn test_metastore_update_search_settings< &index_config.ingest_settings, &search_settings, &index_config.retention_policy_opt, + &index_config.extra_index_uris, ) .unwrap(); let response_metadata = metastore @@ -284,6 +287,7 @@ pub async fn test_metastore_update_indexing_settings< &index_config.ingest_settings, &index_config.search_settings, &index_config.retention_policy_opt, + &index_config.extra_index_uris, ) .unwrap(); let resp_metadata = metastore @@ -354,6 +358,7 @@ pub async fn 
test_metastore_update_doc_mapping< &index_config.ingest_settings, &index_config.search_settings, &index_config.retention_policy_opt, + &index_config.extra_index_uris, ) .unwrap(); let resp_metadata = metastore diff --git a/quickwit/quickwit-proto/protos/quickwit/metastore.proto b/quickwit/quickwit-proto/protos/quickwit/metastore.proto index 712538f193a..5f8af231b78 100644 --- a/quickwit/quickwit-proto/protos/quickwit/metastore.proto +++ b/quickwit/quickwit-proto/protos/quickwit/metastore.proto @@ -229,6 +229,7 @@ message UpdateIndexRequest { string ingest_settings_json = 6; string search_settings_json = 2; optional string retention_policy_json_opt = 3; + repeated string extra_index_uris = 7; } message ListIndexesMetadataRequest { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs index d503a940f44..34b84a3f624 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.metastore.rs @@ -35,6 +35,8 @@ pub struct UpdateIndexRequest { pub retention_policy_json_opt: ::core::option::Option< ::prost::alloc::string::String, >, + #[prost(string, repeated, tag = "7")] + pub extra_index_uris: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] diff --git a/quickwit/quickwit-search/src/list_fields.rs b/quickwit/quickwit-search/src/list_fields.rs index b9eec1696b5..7f3cebd9b42 100644 --- a/quickwit/quickwit-search/src/list_fields.rs +++ b/quickwit/quickwit-search/src/list_fields.rs @@ -34,7 +34,7 @@ use quickwit_proto::types::{IndexId, IndexUid}; use quickwit_storage::Storage; use crate::leaf::open_split_bundle; -use crate::search_job_placer::group_jobs_by_index_id; +use crate::search_job_placer::group_jobs_by_index_and_storage; use crate::service::SearcherContext; use 
crate::{
    ClusterClient, SearchError, SearchJob, list_relevant_splits, resolve_index_patterns,
@@ -432,7 +432,21 @@ pub async fn root_list_fields(
         .await?;

     // Build requests for each index id
-    let jobs: Vec<SearchJob> = split_metadatas.iter().map(SearchJob::from).collect();
+    let jobs: Vec<SearchJob> = split_metadatas
+        .iter()
+        .map(|split_metadata| {
+            let index_uri = &index_uid_to_index_meta
+                .get(&split_metadata.index_uid)
+                .ok_or_else(|| {
+                    SearchError::Internal(format!(
+                        "index {} not found in metadata map",
+                        split_metadata.index_uid
+                    ))
+                })?
+                .index_uri;
+            Ok(SearchJob::from_split_metadata(split_metadata, index_uri))
+        })
+        .collect::<crate::Result<Vec<_>>>()?;
     let assigned_leaf_search_jobs = cluster_client
         .search_job_placer
         .assign_jobs(jobs, &HashSet::default())
@@ -461,7 +475,8 @@ pub async fn root_list_fields(
     Ok(ListFieldsResponse { fields })
 }

-/// Builds a list of [`LeafListFieldsRequest`], one per index, from a list of [`SearchJob`].
+/// Builds a list of [`LeafListFieldsRequest`], one per `(index, storage_uri)` group,
+/// from a list of [`SearchJob`].
 pub fn jobs_to_leaf_requests(
     request: &ListFieldsRequest,
     index_uid_to_id: &HashMap<IndexUid, IndexMetasForLeafSearch>,
@@ -469,9 +484,10 @@ pub fn jobs_to_leaf_requests(
 ) -> crate::Result<Vec<LeafListFieldsRequest>> {
     let search_request_for_leaf = request.clone();
     let mut leaf_search_requests = Vec::new();
-    // Group jobs by index uid.
-    group_jobs_by_index_id(jobs, |job_group| {
+    // Group by (index_uid, storage_uri) so splits in different buckets get separate requests.
+    group_jobs_by_index_and_storage(jobs, |job_group| {
         let index_uid = &job_group[0].index_uid;
+        let storage_uri = &job_group[0].storage_uri;
         let index_meta = index_uid_to_id.get(index_uid).ok_or_else(|| {
             SearchError::Internal(format!(
                 "received list fields job for an unknown index {index_uid}. it should never happen"
@@ -480,7 +496,7 @@ pub fn jobs_to_leaf_requests(
         let leaf_search_request = LeafListFieldsRequest {
             index_id: index_meta.index_id.to_string(),
-            index_uri: index_meta.index_uri.to_string(),
+            index_uri: storage_uri.to_string(),
             fields: search_request_for_leaf.fields.clone(),
             split_offsets: job_group.into_iter().map(|job| job.offsets).collect(),
         };
diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs
index 026408737a2..ac1013b967e 100644
--- a/quickwit/quickwit-search/src/list_terms.rs
+++ b/quickwit/quickwit-search/src/list_terms.rs
@@ -20,6 +20,7 @@ use anyhow::Context;
 use futures::future::try_join_all;
 use itertools::{Either, Itertools};
 use quickwit_common::pretty::PrettySample;
+use quickwit_common::uri::Uri;
 use quickwit_config::build_doc_mapper;
 use quickwit_metastore::{ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitMetadata};
 use quickwit_proto::metastore::{ListSplitsRequest, MetastoreService, MetastoreServiceClient};
@@ -35,7 +36,7 @@ use tracing::{debug, error, info, instrument};
 use crate::leaf::open_index_with_caches;
 use crate::metrics::queue_label;
-use crate::search_job_placer::group_jobs_by_index_id;
+use crate::search_job_placer::group_jobs_by_index_and_storage;
 use crate::search_permit_provider::compute_initial_memory_allocation;
 use crate::{ClusterClient, SearchError, SearchJob, SearcherContext, resolve_index_patterns};
@@ -101,12 +102,12 @@ pub async fn root_list_terms(
     if let Some(end_ts) = list_terms_request.end_timestamp {
         query = query.with_time_range_end_lt(end_ts);
     }
-    let index_uid_to_index_uri: HashMap<IndexUid, String> = indexes_metadata
+    let index_uid_to_index_uri: HashMap<IndexUid, Uri> = indexes_metadata
         .iter()
         .map(|index_metadata| {
             (
                 index_metadata.index_uid.clone(),
-                index_metadata.index_uri().to_string(),
+                index_metadata.index_uri().clone(),
             )
         })
         .collect();
@@ -118,7 +119,20 @@ pub async fn root_list_terms(
         .collect_splits_metadata()
         .await?;

-    let jobs: Vec<SearchJob> = split_metadatas.iter().map(SearchJob::from).collect();
+    let jobs: Vec<SearchJob> = split_metadatas
+        .iter()
+        .map(|split_metadata| {
+            let index_uri = index_uid_to_index_uri
+                .get(&split_metadata.index_uid)
+                .ok_or_else(|| {
+                    SearchError::Internal(format!(
+                        "index {} not found in metadata map",
+                        split_metadata.index_uid
+                    ))
+                })?;
+            Ok(SearchJob::from_split_metadata(split_metadata, index_uri))
+        })
+        .collect::<crate::Result<Vec<_>>>()?;
     let assigned_leaf_search_jobs = cluster_client
         .search_job_placer
         .assign_jobs(jobs, &HashSet::default())
@@ -126,8 +140,7 @@ pub async fn root_list_terms(
     let mut leaf_request_tasks = Vec::new();
     // For each node, forward to a node with an affinity for that index id.
     for (client, client_jobs) in assigned_leaf_search_jobs {
-        let leaf_requests =
-            jobs_to_leaf_requests(list_terms_request, &index_uid_to_index_uri, client_jobs)?;
+        let leaf_requests = jobs_to_leaf_requests(list_terms_request, client_jobs)?;
         for leaf_request in leaf_requests {
             leaf_request_tasks.push(cluster_client.leaf_list_terms(leaf_request, client.clone()));
         }
@@ -178,25 +191,21 @@ pub async fn root_list_terms(
     })
 }

-/// Builds a list of [`LeafListTermsRequest`], one per index, from a list of [`SearchJob`].
+/// Builds a list of [`LeafListTermsRequest`], one per `(index, storage_uri)` group,
+/// from a list of [`SearchJob`].
 pub fn jobs_to_leaf_requests(
     request: &ListTermsRequest,
-    index_uid_to_uri: &HashMap<IndexUid, String>,
     jobs: Vec<SearchJob>,
 ) -> crate::Result<Vec<LeafListTermsRequest>> {
     let search_request_for_leaf = request.clone();
     let mut leaf_search_requests = Vec::new();
-    group_jobs_by_index_id(jobs, |job_group| {
-        let index_uid = &job_group[0].index_uid;
-        let index_uri = index_uid_to_uri.get(index_uid).ok_or_else(|| {
-            SearchError::Internal(format!(
-                "received list fields job for an unknown index {index_uid}. it should never happen"
-            ))
-        })?;
+    // Group by (index_uid, storage_uri) so splits in different buckets get separate requests.
+ group_jobs_by_index_and_storage(jobs, |job_group| { + let storage_uri = &job_group[0].storage_uri; let leaf_search_request = LeafListTermsRequest { list_terms_request: Some(search_request_for_leaf.clone()), - index_uri: index_uri.to_string(), + index_uri: storage_uri.to_string(), split_offsets: job_group.into_iter().map(|job| job.offsets).collect(), }; leaf_search_requests.push(leaf_search_request); diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index ef0da33b0d3..dd7f862f20a 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -51,7 +51,7 @@ use crate::cluster_client::ClusterClient; use crate::collector::{QuickwitAggregations, make_merge_collector}; use crate::metrics_trackers::{RootSearchMetricsFuture, SearchPlanMetricsFuture}; use crate::scroll_context::{ScrollContext, ScrollKeyAndStartOffset}; -use crate::search_job_placer::{Job, group_by, group_jobs_by_index_id}; +use crate::search_job_placer::{Job, group_by, group_jobs_by_index_and_storage}; use crate::search_response_rest::StorageRequestCount; use crate::service::SearcherContext; use crate::{ @@ -81,6 +81,8 @@ const SORT_DOC_FIELD_NAMES: &[&str] = &["_shard_doc", "_doc"]; pub struct SearchJob { /// The index UID. pub index_uid: IndexUid, + /// The effective storage URI for this split. + pub storage_uri: Uri, cost: usize, /// The split ID and footer offsets of the split. 
     pub offsets: SplitIdAndFooterOffsets,
@@ -93,6 +95,7 @@ impl SearchJob {
         use std::str::FromStr;
         SearchJob {
             index_uid: IndexUid::from_str("test-index:00000000000000000000000000").unwrap(),
+            storage_uri: Uri::for_test("ram:///test"),
             cost,
             offsets: SplitIdAndFooterOffsets {
                 split_id: split_id.to_string(),
@@ -100,24 +103,24 @@ impl SearchJob {
             },
         }
     }
-}

-impl From<SearchJob> for SplitIdAndFooterOffsets {
-    fn from(search_job: SearchJob) -> Self {
-        search_job.offsets
-    }
-}
-
-impl<'a> From<&'a SplitMetadata> for SearchJob {
-    fn from(split_metadata: &'a SplitMetadata) -> Self {
+    /// Creates a [`SearchJob`] from split metadata, using the effective storage URI.
+    pub fn from_split_metadata(split_metadata: &SplitMetadata, index_uri: &Uri) -> Self {
         SearchJob {
             index_uid: split_metadata.index_uid.clone(),
+            storage_uri: split_metadata.effective_storage_uri(index_uri).clone(),
             cost: compute_split_cost(split_metadata),
             offsets: extract_split_and_footer_offsets(split_metadata),
         }
     }
 }

+impl From<SearchJob> for SplitIdAndFooterOffsets {
+    fn from(search_job: SearchJob) -> Self {
+        search_job.offsets
+    }
+}
+
 impl Job for SearchJob {
     fn split_id(&self) -> &str {
         &self.offsets.split_id
@@ -130,6 +133,7 @@

 pub struct FetchDocsJob {
     index_uid: IndexUid,
+    storage_uri: Uri,
     offsets: SplitIdAndFooterOffsets,
     pub partial_hits: Vec<PartialHit>,
 }
@@ -766,7 +770,21 @@ pub(crate) async fn search_partial_hits_phase(
     if is_metadata_count_request(search_request) {
         get_count_from_metadata(split_metadatas)
     } else {
-        let jobs: Vec<SearchJob> = split_metadatas.iter().map(SearchJob::from).collect();
+        let jobs: Vec<SearchJob> = split_metadatas
+            .iter()
+            .map(|split_metadata| {
+                let index_uri = &indexes_metas_for_leaf_search
+                    .get(&split_metadata.index_uid)
+                    .ok_or_else(|| {
+                        SearchError::Internal(format!(
+                            "index {} not found in metadata map",
+                            split_metadata.index_uid
+                        ))
+                    })?
+                    .index_uri;
+                Ok(SearchJob::from_split_metadata(split_metadata, index_uri))
+            })
+            .collect::<crate::Result<Vec<_>>>()?;
         let assigned_leaf_search_jobs = cluster_client
             .search_job_placer
             .assign_jobs(jobs, &HashSet::default())
@@ -870,6 +888,7 @@ pub(crate) async fn fetch_docs_phase(
     let assigned_fetch_docs_jobs = assign_client_fetch_docs_jobs(
         partial_hits,
         split_metadatas,
+        indexes_metas_for_leaf_search,
         &cluster_client.search_job_placer,
     )
     .await?;
@@ -1673,21 +1692,31 @@ impl<'b> QueryAstVisitor<'b> for ExtractTimestampRange<'_> {
 async fn assign_client_fetch_docs_jobs(
     partial_hits: &[PartialHit],
     split_metadatas: &[SplitMetadata],
+    indexes_metas_for_leaf_search: &IndexesMetasForLeafSearch,
     client_pool: &SearchJobPlacer,
 ) -> crate::Result<Vec<(SearchServiceClient, Vec<FetchDocsJob>)>> {
-    let index_uids_and_split_offsets_map: HashMap<String, (IndexUid, SplitIdAndFooterOffsets)> =
-        split_metadatas
-            .iter()
-            .map(|metadata| {
+    let split_info_map: HashMap<String, (IndexUid, Uri, SplitIdAndFooterOffsets)> = split_metadatas
+        .iter()
+        .map(|metadata| {
+            let index_uri = &indexes_metas_for_leaf_search
+                .get(&metadata.index_uid)
+                .ok_or_else(|| {
+                    SearchError::Internal(format!(
+                        "index {} not found in metadata map",
+                        metadata.index_uid
+                    ))
+                })?
+                .index_uri;
+            Ok((
+                metadata.split_id().to_string(),
                 (
-                    metadata.split_id().to_string(),
-                    (
-                        metadata.index_uid.clone(),
-                        extract_split_and_footer_offsets(metadata),
-                    ),
-                )
-            })
-            .collect();
+                    metadata.index_uid.clone(),
+                    metadata.effective_storage_uri(index_uri).clone(),
+                    extract_split_and_footer_offsets(metadata),
+                ),
+            ))
+        })
+        .collect::<crate::Result<HashMap<_, _>>>()?;

     // Group the partial hits per split
     let mut partial_hits_map: HashMap<String, Vec<PartialHit>> = HashMap::new();
@@ -1700,7 +1729,7 @@ async fn assign_client_fetch_docs_jobs(
     let mut fetch_docs_req_jobs: Vec<FetchDocsJob> = Vec::new();
     for (split_id, partial_hits) in partial_hits_map {
-        let (index_uid, offsets) = split_info_map
+        let (index_uid, storage_uri, offsets) = split_info_map
             .get(&split_id)
             .ok_or_else(|| {
                 crate::SearchError::Internal(format!(
@@ -1710,6 +1739,7 @@ async fn assign_client_fetch_docs_jobs(
             .clone();
         let fetch_docs_job = FetchDocsJob {
             index_uid: index_uid.clone(),
+            storage_uri,
             offsets,
             partial_hits,
         };
@@ -1749,15 +1779,23 @@ pub fn jobs_to_leaf_request(
     };
     let mut added_doc_mappers: HashMap<&str, u32> = HashMap::new();
-    // Group jobs by index uid, as the split offsets are relative to the index.
-    group_jobs_by_index_id(jobs, |job_group| {
+    let mut added_index_ids: HashSet<String> = HashSet::new();
+
+    // Group jobs by (index_uid, storage_uri) to support splits from the same
+    // index living in different storage buckets.
+    group_jobs_by_index_and_storage(jobs, |job_group| {
         let index_uid = &job_group[0].index_uid;
-        leaf_search_request
-            .search_request
-            .as_mut()
-            .unwrap()
-            .index_id_patterns
-            .push(index_uid.index_id.to_string());
+        let storage_uri = job_group[0].storage_uri.clone();
+
+        if added_index_ids.insert(index_uid.index_id.to_string()) {
+            leaf_search_request
+                .search_request
+                .as_mut()
+                .unwrap()
+                .index_id_patterns
+                .push(index_uid.index_id.to_string());
+        }
+
         let search_index_meta = search_indexes_metadatas.get(index_uid).ok_or_else(|| {
             SearchError::Internal(format!(
                 "received job for an unknown index {index_uid}. it should never happen"
@@ -1773,24 +1811,20 @@ pub fn jobs_to_leaf_request(
             ord as u32
         });
         let index_uri_ord = leaf_search_request.index_uris.len() as u32;
-        leaf_search_request
-            .index_uris
-            .push(search_index_meta.index_uri.to_string());
+        leaf_search_request.index_uris.push(storage_uri.to_string());

-        let leaf_search_request_ref = LeafRequestRef {
+        leaf_search_request.leaf_requests.push(LeafRequestRef {
             split_offsets: job_group.into_iter().map(|job| job.offsets).collect(),
             doc_mapper_ord,
             index_uri_ord,
-        };
-        leaf_search_request
-            .leaf_requests
-            .push(leaf_search_request_ref);
+        });
         Ok(())
     })?;
     Ok(leaf_search_request)
 }

-/// Builds a list of [`FetchDocsRequest`], one per index, from a list of [`FetchDocsJob`].
+/// Builds a list of [`FetchDocsRequest`], one per (index, storage_uri) group,
+/// from a list of [`FetchDocsJob`].
 pub fn jobs_to_fetch_docs_requests(
     snippet_request_opt: Option<SnippetRequest>,
     indexes_metas_for_leaf_search: &IndexesMetasForLeafSearch,
@@ -1800,9 +1834,14 @@ pub fn jobs_to_fetch_docs_requests(
     // Group jobs by index uid.
     group_by(
         jobs,
-        |job| &job.index_uid,
+        |a, b| {
+            a.index_uid
+                .cmp(&b.index_uid)
+                .then_with(|| a.storage_uri.cmp(&b.storage_uri))
+        },
         |fetch_docs_jobs| {
             let index_uid = &fetch_docs_jobs[0].index_uid;
+            let storage_uri_string = fetch_docs_jobs[0].storage_uri.to_string();

             let index_meta = indexes_metas_for_leaf_search
                 .get(index_uid)
@@ -1822,7 +1861,7 @@ pub fn jobs_to_fetch_docs_requests(
             let fetch_docs_req = FetchDocsRequest {
                 partial_hits,
                 split_offsets,
-                index_uri: index_meta.index_uri.to_string(),
+                index_uri: storage_uri_string,
                 snippet_request: snippet_request_opt.clone(),
                 doc_mapper: index_meta.doc_mapper_str.clone(),
             };
@@ -1949,6 +1988,7 @@ mod tests {
             ingest_settings,
             search_settings,
             retention_policy_opt: None,
+            extra_index_uris: Vec::new(),
         })
     }
@@ -2123,6 +2163,7 @@ mod tests {
             indexing_settings,
             search_settings,
             retention_policy_opt: None,
+            extra_index_uris: Vec::new(),
         })
     }
diff --git a/quickwit/quickwit-search/src/search_job_placer.rs b/quickwit/quickwit-search/src/search_job_placer.rs
index 8be9cced66b..c97c2e033bb 100644
--- a/quickwit/quickwit-search/src/search_job_placer.rs
+++ b/quickwit/quickwit-search/src/search_job_placer.rs
@@ -269,34 +269,40 @@ impl PartialEq for CandidateNode {

 impl Eq for CandidateNode {}

-/// Groups jobs by index id and returns a list of `SearchJob` per index
-pub fn group_jobs_by_index_id(
+/// Groups jobs by `(index_uid, storage_uri)`.
+pub fn group_jobs_by_index_and_storage(
     jobs: Vec<SearchJob>,
     cb: impl FnMut(Vec<SearchJob>) -> crate::Result<()>,
 ) -> crate::Result<()> {
-    // Group jobs by index uid.
-    group_by(jobs, |job| &job.index_uid, cb)?;
-    Ok(())
+    group_by(
+        jobs,
+        |a, b| {
+            a.index_uid
+                .cmp(&b.index_uid)
+                .then_with(|| a.storage_uri.cmp(&b.storage_uri))
+        },
+        cb,
+    )
 }

 /// Note: The data will be sorted.
 ///
-/// Returns slices of the input data grouped by passed closure.
-pub fn group_by<T, K: Ord + ?Sized, F>(
+/// Returns slices of the input data grouped by the comparator closure.
+pub fn group_by<T, F>(
     mut data: Vec<T>,
-    compare_by: impl Fn(&T) -> &K,
+    cmp: impl Fn(&T, &T) -> Ordering,
     mut callback: F,
 ) -> crate::Result<()>
 where
     F: FnMut(Vec<T>) -> crate::Result<()>,
 {
-    data.sort_by(|job1, job2| compare_by(job2).cmp(compare_by(job1)));
+    data.sort_by(|a, b| cmp(b, a));
     while !data.is_empty() {
         let last_element = data.last().unwrap();
         let count = data
             .iter()
             .rev()
-            .take_while(|&x| compare_by(x) == compare_by(last_element))
+            .take_while(|&x| cmp(x, last_element) == Ordering::Equal)
             .count();

         let group = data.split_off(data.len() - count);
@@ -317,7 +323,7 @@ mod tests {
         let mut outputs: Vec<Vec<i32>> = Vec::new();
         group_by(
             data,
-            |el| el,
+            |a, b| a.cmp(b),
             |group| {
                 outputs.push(group);
                 Ok(())
@@ -337,7 +343,7 @@ mod tests {
         let mut outputs: Vec<Vec<i32>> = Vec::new();
         group_by(
             data,
-            |el| el,
+            |a, b| a.cmp(b),
             |group| {
                 outputs.push(group);
                 Ok(())
@@ -353,7 +359,7 @@ mod tests {
         let mut outputs: Vec<Vec<i32>> = Vec::new();
         group_by(
             data,
-            |el| el,
+            |a: &i32, b| a.cmp(b),
             |group| {
                 outputs.push(group);
                 Ok(())
diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs
index 71fc69b0ad9..e0d266207c3 100644
--- a/quickwit/quickwit-serve/src/lib.rs
+++ b/quickwit/quickwit-serve/src/lib.rs
@@ -1331,7 +1331,14 @@ async fn check_cluster_configuration(
         .deserialize_indexes_metadata()
         .await?
         .into_iter()
-        .filter(|index_metadata| index_metadata.index_uri().protocol().is_file_storage())
+        .filter(|index_metadata| {
+            index_metadata.index_uri().protocol().is_file_storage()
+                || index_metadata
+                    .index_config
+                    .extra_index_uris
+                    .iter()
+                    .any(|uri| uri.protocol().is_file_storage())
+        })
         .collect::<Vec<_>>();
     if !file_backed_indexes.is_empty() {
         let index_ids = file_backed_indexes