diff --git a/docs/en/antalya/swarm.md b/docs/en/antalya/swarm.md
new file mode 100644
index 000000000000..a26f9de26e0a
--- /dev/null
+++ b/docs/en/antalya/swarm.md
@@ -0,0 +1,73 @@
+# Antalya branch
+
+## Swarm
+
+### Differences from the upstream version
+
+#### `storage_type` argument in object storage functions
+
+In upstream ClickHouse, reading Iceberg tables from different storage backends requires separate table functions (`icebergLocal`, `icebergS3`, `icebergAzure`, `icebergHDFS`, their cluster variants, and `iceberg` as a synonym for `icebergS3`) and separate table engines (`IcebergLocal`, `IcebergS3`, `IcebergAzure`, `IcebergHDFS`).
+
+In the Antalya branch, the `iceberg` table function and the `Iceberg` table engine unify all variants into one by using a new named argument, `storage_type`, which can be one of `local`, `s3`, `azure`, or `hdfs`.
+
+Old syntax examples:
+
+```sql
+SELECT * FROM icebergS3('http://minio1:9000/root/table_data', 'minio', 'minio123', 'Parquet');
+SELECT * FROM icebergAzureCluster('mycluster', 'http://azurite1:30000/devstoreaccount1', 'cont', '/table_data', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet');
+CREATE TABLE mytable ENGINE=IcebergHDFS('/table_data', 'Parquet');
+```
+
+New syntax examples:
+
+```sql
+SELECT * FROM iceberg(storage_type='s3', 'http://minio1:9000/root/table_data', 'minio', 'minio123', 'Parquet');
+SELECT * FROM icebergCluster('mycluster', storage_type='azure', 'http://azurite1:30000/devstoreaccount1', 'cont', '/table_data', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet');
+CREATE TABLE mytable ENGINE=Iceberg('/table_data', 'Parquet', storage_type='hdfs');
+```
+
+Also, if a named collection is used to store access parameters, the field `storage_type` can be included in the same named collection:
+
+```xml
+<named_collections>
+    <s3>
+        <url>http://minio1:9001/root/</url>
+        <access_key_id>minio</access_key_id>
+        <secret_access_key>minio123</secret_access_key>
+        <storage_type>s3</storage_type>
+    </s3>
+</named_collections>
+```
+
+```sql
+SELECT * FROM iceberg(s3, filename='table_data');
+```
+
+By default, `storage_type` is `s3` to maintain backward compatibility.
+
+
+#### `object_storage_cluster` setting
+
+The new `object_storage_cluster` setting controls whether the single-node or the cluster variant of a table function reading from object storage is used (e.g. `s3`, `azure`, `iceberg` versus `s3Cluster`, `azureCluster`, `icebergCluster`). When the setting is non-empty, it names the cluster across which the query is distributed.
+
+Old syntax examples:
+
+```sql
+SELECT * from s3Cluster('myCluster', 'http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
+ 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))');
+SELECT * FROM icebergAzureCluster('mycluster', 'http://azurite1:30000/devstoreaccount1', 'cont', '/table_data', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet');
+```
+
+New syntax examples:
+
+```sql
+SELECT * from s3('http://minio1:9001/root/data/{clickhouse,database}/*', 'minio', 'minio123', 'CSV',
+ 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
+ SETTINGS object_storage_cluster='myCluster';
+SELECT * FROM icebergAzure('http://azurite1:30000/devstoreaccount1', 'cont', '/table_data', 'devstoreaccount1', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'Parquet')
+ SETTINGS object_storage_cluster='myCluster';
+```
+
+This setting also applies to table engines and can be used with tables managed by Iceberg Catalog.
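+
+For example (a sketch reusing the hypothetical MinIO endpoint, credentials, and cluster name from the examples above; `my_iceberg_table` is a made-up name), the same setting can be applied to a table created with the `Iceberg` engine:
+
+```sql
+-- hypothetical table over the same Iceberg data as in the examples above
+CREATE TABLE my_iceberg_table
+    ENGINE = Iceberg(storage_type='s3', 'http://minio1:9000/root/table_data', 'minio', 'minio123', 'Parquet');
+
+SELECT * FROM my_iceberg_table SETTINGS object_storage_cluster='myCluster';
+```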
+
+Note: upstream ClickHouse has introduced analogous settings, such as `parallel_replicas_for_cluster_engines` and `cluster_for_parallel_replicas`; since version 25.10 they also work with table engines. The `object_storage_cluster` setting may be deprecated in the future.
diff --git a/docs/en/engines/table-engines/integrations/iceberg.md b/docs/en/engines/table-engines/integrations/iceberg.md
index 17e75dc0eb30..83c912f1a488 100644
--- a/docs/en/engines/table-engines/integrations/iceberg.md
+++ b/docs/en/engines/table-engines/integrations/iceberg.md
@@ -293,6 +293,62 @@ CREATE TABLE example_table ENGINE = Iceberg(
`Iceberg` table engine and table function support metadata cache storing the information of manifest files, manifest list and metadata json. The cache is stored in memory. This feature is controlled by setting `use_iceberg_metadata_files_cache`, which is enabled by default.
+## Altinity Antalya branch
+
+### Specify storage type in arguments
+
+Only in the Altinity Antalya branch does the `Iceberg` table engine support all storage types. The storage type can be specified using the named argument `storage_type`. Supported values are `s3`, `azure`, `hdfs`, and `local`.
+
+```sql
+CREATE TABLE iceberg_table_s3
+ ENGINE = Iceberg(storage_type='s3', url, [, NOSIGN | access_key_id, secret_access_key, [session_token]], format, [,compression])
+
+CREATE TABLE iceberg_table_azure
+ ENGINE = Iceberg(storage_type='azure', connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression])
+
+CREATE TABLE iceberg_table_hdfs
+ ENGINE = Iceberg(storage_type='hdfs', path_to_table, [,format] [,compression_method])
+
+CREATE TABLE iceberg_table_local
+ ENGINE = Iceberg(storage_type='local', path_to_table, [,format] [,compression_method])
+```
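+
+For example (a minimal sketch assuming a hypothetical MinIO endpoint and credentials):
+
+```sql
+CREATE TABLE iceberg_table_s3
+    ENGINE = Iceberg(storage_type='s3', 'http://minio1:9000/root/table_data', 'minio', 'minio123', 'Parquet');
+```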
+
+### Specify storage type in named collection
+
+Only in the Altinity Antalya branch can `storage_type` be included as part of a named collection. This allows for centralized configuration of storage settings.
+
+```xml
+<clickhouse>
+    <named_collections>
+        <iceberg_conf>
+            <url>http://test.s3.amazonaws.com/clickhouse-bucket/</url>
+            <access_key_id>test</access_key_id>
+            <secret_access_key>test</secret_access_key>
+            <format>auto</format>
+            <structure>auto</structure>
+            <storage_type>s3</storage_type>
+        </iceberg_conf>
+    </named_collections>
+</clickhouse>
+```
+
+```sql
+CREATE TABLE iceberg_table ENGINE=Iceberg(iceberg_conf, filename = 'test_table')
+```
+
+The default value for `storage_type` is `s3`.
+
+### `object_storage_cluster` setting
+
+Only in the Altinity Antalya branch is an alternative syntax for the `Iceberg` table engine available. This syntax allows execution on a cluster when the `object_storage_cluster` setting is non-empty and contains the cluster name.
+
+```sql
+CREATE TABLE iceberg_table_s3
+ ENGINE = Iceberg(storage_type='s3', url, [, NOSIGN | access_key_id, secret_access_key, [session_token]], format, [,compression]);
+
+SELECT * FROM iceberg_table_s3 SETTINGS object_storage_cluster='cluster_simple';
+```
+
## See also {#see-also}
- [iceberg table function](/sql-reference/table-functions/iceberg.md)
diff --git a/docs/en/sql-reference/distribution-on-cluster.md b/docs/en/sql-reference/distribution-on-cluster.md
new file mode 100644
index 000000000000..3a9835e23856
--- /dev/null
+++ b/docs/en/sql-reference/distribution-on-cluster.md
@@ -0,0 +1,23 @@
+# Task distribution in *Cluster-family functions
+
+## Task distribution algorithm
+
+Table functions such as `s3Cluster`, `azureBlobStorageCluster`, `hdfsCluster`, and `icebergCluster`, as well as table engines such as `S3`, `Azure`, `HDFS`, and `Iceberg` used with the `object_storage_cluster` setting, distribute tasks across all cluster nodes or across a subset limited by the `object_storage_max_nodes` setting. That setting limits the number of nodes involved in processing a distributed query; the nodes are selected randomly for each query.
+
+A single task corresponds to processing one source file.
+
+For each file, one cluster node is selected as the primary node using the Rendezvous Hashing algorithm (a form of consistent hashing). This algorithm guarantees that:
+ * The same node is consistently selected as primary for each file, as long as the cluster remains unchanged.
+ * When the cluster changes (nodes added or removed), only files assigned to those affected nodes change their primary node assignment.
+
+This improves cache efficiency by minimizing data movement among nodes.
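+
+For example (a sketch assuming a cluster named `cluster_simple` and hypothetical MinIO credentials), the following query is processed by at most two randomly selected nodes of that cluster:
+
+```sql
+SELECT count(*)
+FROM s3Cluster('cluster_simple', 'http://minio1:9001/root/data/*', 'minio', 'minio123', 'CSV',
+    'name String, value UInt32')
+SETTINGS object_storage_max_nodes = 2;
+```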
+
+## `lock_object_storage_task_distribution_ms` setting
+
+Each node begins by processing the files for which it is the primary node. After completing its assigned files, a node may take tasks from other nodes, either immediately or after waiting `lock_object_storage_task_distribution_ms` milliseconds if the primary node has not requested new files during that interval. The default value of `lock_object_storage_task_distribution_ms` is 500 milliseconds. The setting balances caching efficiency against workload redistribution when nodes are imbalanced.
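+
+For example (a sketch with the same hypothetical cluster and credentials as above), the lock interval can be raised for a single query:
+
+```sql
+SELECT count(*)
+FROM s3Cluster('cluster_simple', 'http://minio1:9001/root/data/*', 'minio', 'minio123', 'CSV',
+    'name String, value UInt32')
+SETTINGS lock_object_storage_task_distribution_ms = 1000;
+```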
+
+## `SYSTEM STOP SWARM MODE` command
+
+If a node needs to shut down gracefully, the `SYSTEM STOP SWARM MODE` command prevents it from receiving new tasks for *Cluster-family queries. The node finishes processing its already assigned files and can then shut down safely without errors.
+
+Receiving new tasks can be resumed with the command `SYSTEM START SWARM MODE`.
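+
+A typical graceful-shutdown sequence looks like this (sketch):
+
+```sql
+-- stop accepting new *Cluster-family tasks on this node
+SYSTEM STOP SWARM MODE;
+-- wait until already assigned files are processed, then shut the node down;
+-- after restart (or to cancel the drain) resume task processing:
+SYSTEM START SWARM MODE;
+```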
diff --git a/docs/en/sql-reference/table-functions/azureBlobStorageCluster.md b/docs/en/sql-reference/table-functions/azureBlobStorageCluster.md
index 4db1dbb594c6..b67ea0efe2e2 100644
--- a/docs/en/sql-reference/table-functions/azureBlobStorageCluster.md
+++ b/docs/en/sql-reference/table-functions/azureBlobStorageCluster.md
@@ -54,6 +54,20 @@ SELECT count(*) FROM azureBlobStorageCluster(
See [azureBlobStorage](/sql-reference/table-functions/azureBlobStorage#using-shared-access-signatures-sas-sas-tokens) for examples.
+## Altinity Antalya branch
+
+### `object_storage_cluster` setting
+
+Only in the Altinity Antalya branch is an alternative syntax for the `azureBlobStorageCluster` table function available. It allows the `azureBlobStorage` function to be used with a non-empty `object_storage_cluster` setting that specifies a cluster name, enabling distributed queries over Azure Blob Storage across a ClickHouse cluster.
+
+```sql
+SELECT count(*) FROM azureBlobStorage(
+ 'http://azurite1:10000/devstoreaccount1', 'testcontainer', 'test_cluster_count.csv', 'devstoreaccount1',
+ 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==', 'CSV',
+ 'auto', 'key UInt64')
+SETTINGS object_storage_cluster='cluster_simple'
+```
+
## Related {#related}
- [AzureBlobStorage engine](../../engines/table-engines/integrations/azureBlobStorage.md)
diff --git a/docs/en/sql-reference/table-functions/deltalakeCluster.md b/docs/en/sql-reference/table-functions/deltalakeCluster.md
index f01b40d5ce6f..865399172ff3 100644
--- a/docs/en/sql-reference/table-functions/deltalakeCluster.md
+++ b/docs/en/sql-reference/table-functions/deltalakeCluster.md
@@ -45,6 +45,17 @@ A table with the specified structure for reading data from cluster in the specif
- `_time` — Last modified time of the file. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
- `_etag` — The etag of the file. Type: `LowCardinality(String)`. If the etag is unknown, the value is `NULL`.
+## Altinity Antalya branch
+
+### `object_storage_cluster` setting
+
+Only in the Altinity Antalya branch is an alternative syntax for the `deltaLakeCluster` table function available. It allows the `deltaLake` function to be used with a non-empty `object_storage_cluster` setting that specifies a cluster name, enabling distributed queries over Delta Lake storage across a ClickHouse cluster.
+
+```sql
+SELECT count(*) FROM deltaLake(url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
+SETTINGS object_storage_cluster='cluster_simple'
+```
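+
+For example (a sketch assuming a Delta Lake table in an S3-compatible store with hypothetical MinIO-style credentials):
+
+```sql
+SELECT count(*)
+FROM deltaLake('http://minio1:9001/root/delta_table', 'minio', 'minio123')
+SETTINGS object_storage_cluster='cluster_simple'
+```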
+
## Related {#related}
- [deltaLake engine](engines/table-engines/integrations/deltalake.md)
diff --git a/docs/en/sql-reference/table-functions/hdfsCluster.md b/docs/en/sql-reference/table-functions/hdfsCluster.md
index 74a526a2de7b..bfc4ab30fd5b 100644
--- a/docs/en/sql-reference/table-functions/hdfsCluster.md
+++ b/docs/en/sql-reference/table-functions/hdfsCluster.md
@@ -60,6 +60,18 @@ FROM hdfsCluster('cluster_simple', 'hdfs://hdfs1:9000/{some,another}_dir/*', 'TS
If your listing of files contains number ranges with leading zeros, use the construction with braces for each digit separately or use `?`.
:::
+## Altinity Antalya branch
+
+### `object_storage_cluster` setting
+
+Only in the Altinity Antalya branch is an alternative syntax for the `hdfsCluster` table function available. It allows the `hdfs` function to be used with a non-empty `object_storage_cluster` setting that specifies a cluster name, enabling distributed queries over HDFS storage across a ClickHouse cluster.
+
+```sql
+SELECT count(*)
+FROM hdfs('hdfs://hdfs1:9000/{some,another}_dir/*', 'TSV', 'name String, value UInt32')
+SETTINGS object_storage_cluster='cluster_simple'
+```
+
## Related {#related}
- [HDFS engine](../../engines/table-engines/integrations/hdfs.md)
diff --git a/docs/en/sql-reference/table-functions/hudiCluster.md b/docs/en/sql-reference/table-functions/hudiCluster.md
index 3f44a369d062..1087ef51cb84 100644
--- a/docs/en/sql-reference/table-functions/hudiCluster.md
+++ b/docs/en/sql-reference/table-functions/hudiCluster.md
@@ -43,6 +43,18 @@ A table with the specified structure for reading data from cluster in the specif
- `_time` — Last modified time of the file. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
- `_etag` — The etag of the file. Type: `LowCardinality(String)`. If the etag is unknown, the value is `NULL`.
+## Altinity Antalya branch
+
+### `object_storage_cluster` setting
+
+Only in the Altinity Antalya branch is an alternative syntax for the `hudiCluster` table function available. It allows the `hudi` function to be used with a non-empty `object_storage_cluster` setting that specifies a cluster name, enabling distributed queries over Hudi storage across a ClickHouse cluster.
+
+```sql
+SELECT *
+FROM hudi(url [,aws_access_key_id, aws_secret_access_key] [,format] [,structure] [,compression])
+SETTINGS object_storage_cluster='cluster_simple'
+```
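+
+For example (a sketch assuming a Hudi table in an S3-compatible store with hypothetical MinIO-style credentials):
+
+```sql
+SELECT count(*)
+FROM hudi('http://minio1:9001/root/hudi_table', 'minio', 'minio123')
+SETTINGS object_storage_cluster='cluster_simple'
+```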
+
## Related {#related}
- [Hudi engine](engines/table-engines/integrations/hudi.md)
diff --git a/docs/en/sql-reference/table-functions/iceberg.md b/docs/en/sql-reference/table-functions/iceberg.md
index a1946d094caa..9fd78c5e17d7 100644
--- a/docs/en/sql-reference/table-functions/iceberg.md
+++ b/docs/en/sql-reference/table-functions/iceberg.md
@@ -502,6 +502,48 @@ x: Ivanov
y: 993
```
+
+## Altinity Antalya branch
+
+### Specify storage type in arguments
+
+Only in the Altinity Antalya branch does the `iceberg` table function support all storage types. The storage type can be specified using the named argument `storage_type`. Supported values are `s3`, `azure`, `hdfs`, and `local`.
+
+```sql
+iceberg(storage_type='s3', url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,compression_method])
+
+iceberg(storage_type='azure', connection_string|storage_account_url, container_name, blobpath, [,account_name], [,account_key] [,format] [,compression_method])
+
+iceberg(storage_type='hdfs', path_to_table, [,format] [,compression_method])
+
+iceberg(storage_type='local', path_to_table, [,format] [,compression_method])
+```
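+
+For example (a minimal sketch assuming a hypothetical MinIO endpoint and credentials):
+
+```sql
+SELECT * FROM iceberg(storage_type='s3', 'http://minio1:9000/root/table_data', 'minio', 'minio123', 'Parquet');
+```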
+
+### Specify storage type in named collection
+
+Only in the Altinity Antalya branch can `storage_type` be included as part of a named collection. This allows for centralized configuration of storage settings.
+
+```xml
+<clickhouse>
+    <named_collections>
+        <iceberg_conf>
+            <url>http://test.s3.amazonaws.com/clickhouse-bucket/</url>
+            <access_key_id>test</access_key_id>
+            <secret_access_key>test</secret_access_key>
+            <format>auto</format>
+            <structure>auto</structure>
+            <storage_type>s3</storage_type>
+        </iceberg_conf>
+    </named_collections>
+</clickhouse>
+```
+
+```sql
+iceberg(named_collection[, option=value [,..]])
+```
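+
+For example, assuming the `iceberg_conf` collection above and a hypothetical table path:
+
+```sql
+SELECT * FROM iceberg(iceberg_conf, filename = 'test_table');
+```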
+
+The default value for `storage_type` is `s3`.
+
## See Also {#see-also}
* [Iceberg engine](/engines/table-engines/integrations/iceberg.md)
diff --git a/docs/en/sql-reference/table-functions/icebergCluster.md b/docs/en/sql-reference/table-functions/icebergCluster.md
index d3ce33579d3e..f1db4c4f44ac 100644
--- a/docs/en/sql-reference/table-functions/icebergCluster.md
+++ b/docs/en/sql-reference/table-functions/icebergCluster.md
@@ -50,6 +50,81 @@ SELECT * FROM icebergS3Cluster('cluster_simple', 'http://test.s3.amazonaws.com/c
- `_time` — Last modified time of the file. Type: `Nullable(DateTime)`. If the time is unknown, the value is `NULL`.
- `_etag` — The etag of the file. Type: `LowCardinality(String)`. If the etag is unknown, the value is `NULL`.
+## Altinity Antalya branch
+
+### `icebergLocalCluster` table function
+
+Only in the Altinity Antalya branch, the `icebergLocalCluster` table function is available. It is designed for distributed cluster queries when Iceberg data is stored on shared network storage mounted at a local path. The path must be identical on all replicas.
+
+```sql
+icebergLocalCluster(cluster_name, path_to_table, [,format] [,compression_method])
+```
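+
+For example (a sketch assuming the Iceberg table data is mounted at the same hypothetical path on every replica):
+
+```sql
+SELECT * FROM icebergLocalCluster('cluster_simple', '/mnt/shared/iceberg/table_data', 'Parquet');
+```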
+
+### Specify storage type in function arguments
+
+Only in the Altinity Antalya branch, the `icebergCluster` table function supports all storage backends. The storage backend can be specified using the named argument `storage_type`. Valid values include `s3`, `azure`, `hdfs`, and `local`.
+
+```sql
+icebergCluster(storage_type='s3', cluster_name, url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,compression_method])
+
+icebergCluster(storage_type='azure', cluster_name, connection_string|storage_account_url, container_name, blobpath, [,account_name], [,account_key] [,format] [,compression_method])
+
+icebergCluster(storage_type='hdfs', cluster_name, path_to_table, [,format] [,compression_method])
+
+icebergCluster(storage_type='local', cluster_name, path_to_table, [,format] [,compression_method])
+```
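+
+For example (a sketch mirroring the `icebergS3Cluster` example earlier on this page, with `storage_type` placed first as in the signatures above):
+
+```sql
+SELECT * FROM icebergCluster(storage_type='s3', 'cluster_simple', 'http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test')
+```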
+
+### Specify storage type in a named collection
+
+Only in the Altinity Antalya branch, `storage_type` can be part of a named collection.
+
+```xml
+<clickhouse>
+    <named_collections>
+        <iceberg_conf>
+            <url>http://test.s3.amazonaws.com/clickhouse-bucket/</url>
+            <access_key_id>test</access_key_id>
+            <secret_access_key>test</secret_access_key>
+            <format>auto</format>
+            <structure>auto</structure>
+            <storage_type>s3</storage_type>
+        </iceberg_conf>
+    </named_collections>
+</clickhouse>
+```
+
+```sql
+icebergCluster(iceberg_conf[, option=value [,..]])
+```
+
+The default value for `storage_type` is `s3`.
+
+### `object_storage_cluster` setting
+
+Only in the Altinity Antalya branch is an alternative syntax for the `icebergCluster` table function available. It allows the `iceberg` function and its single-storage variants (`icebergS3`, `icebergAzure`, `icebergHDFS`, `icebergLocal`) to be used with a non-empty `object_storage_cluster` setting that specifies a cluster name, enabling distributed queries over an Iceberg table across a ClickHouse cluster.
+
+```sql
+icebergS3(url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,compression_method]) SETTINGS object_storage_cluster='cluster_name'
+
+icebergAzure(connection_string|storage_account_url, container_name, blobpath, [,account_name], [,account_key] [,format] [,compression_method]) SETTINGS object_storage_cluster='cluster_name'
+
+icebergHDFS(path_to_table, [,format] [,compression_method]) SETTINGS object_storage_cluster='cluster_name'
+
+icebergLocal(path_to_table, [,format] [,compression_method]) SETTINGS object_storage_cluster='cluster_name'
+
+icebergS3(option=value [,..]) SETTINGS object_storage_cluster='cluster_name'
+
+iceberg(storage_type='s3', url [, NOSIGN | access_key_id, secret_access_key, [session_token]] [,format] [,compression_method]) SETTINGS object_storage_cluster='cluster_name'
+
+iceberg(storage_type='azure', connection_string|storage_account_url, container_name, blobpath, [,account_name], [,account_key] [,format] [,compression_method]) SETTINGS object_storage_cluster='cluster_name'
+
+iceberg(storage_type='hdfs', path_to_table, [,format] [,compression_method]) SETTINGS object_storage_cluster='cluster_name'
+
+iceberg(storage_type='local', path_to_table, [,format] [,compression_method]) SETTINGS object_storage_cluster='cluster_name'
+
+iceberg(iceberg_conf[, option=value [,..]]) SETTINGS object_storage_cluster='cluster_name'
+```
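+
+For example (a sketch reusing the bucket and credentials from the `icebergS3Cluster` example earlier on this page):
+
+```sql
+SELECT * FROM icebergS3('http://test.s3.amazonaws.com/clickhouse-bucket/test_table', 'test', 'test')
+SETTINGS object_storage_cluster='cluster_simple'
+```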
+
**See Also**
- [Iceberg engine](/engines/table-engines/integrations/iceberg.md)
diff --git a/docs/en/sql-reference/table-functions/s3Cluster.md b/docs/en/sql-reference/table-functions/s3Cluster.md
index 2e6af0273ba0..f0cce77a0b49 100644
--- a/docs/en/sql-reference/table-functions/s3Cluster.md
+++ b/docs/en/sql-reference/table-functions/s3Cluster.md
@@ -91,6 +91,23 @@ Users can use the same approaches as document for the s3 function [here](/sql-re
For details on optimizing the performance of the s3 function see [our detailed guide](/integrations/s3/performance).
+## Altinity Antalya branch
+
+### `object_storage_cluster` setting
+
+Only in the Altinity Antalya branch is an alternative syntax for the `s3Cluster` table function available. It allows the `s3` function to be used with a non-empty `object_storage_cluster` setting that specifies a cluster name, enabling distributed queries over S3 storage across a ClickHouse cluster.
+
+```sql
+SELECT * FROM s3(
+ 'http://minio1:9001/root/data/{clickhouse,database}/*',
+ 'minio',
+ 'ClickHouse_Minio_P@ssw0rd',
+ 'CSV',
+ 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))'
+) ORDER BY (name, value, polygon)
+SETTINGS object_storage_cluster='cluster_simple'
+```
+
## Related {#related}
- [S3 engine](../../engines/table-engines/integrations/s3.md)
diff --git a/src/Analyzer/FunctionSecretArgumentsFinderTreeNode.h b/src/Analyzer/FunctionSecretArgumentsFinderTreeNode.h
index 8bcb6e147420..e4f63192c95b 100644
--- a/src/Analyzer/FunctionSecretArgumentsFinderTreeNode.h
+++ b/src/Analyzer/FunctionSecretArgumentsFinderTreeNode.h
@@ -71,8 +71,14 @@ class FunctionTreeNodeImpl : public AbstractFunction
{
public:
explicit ArgumentsTreeNode(const QueryTreeNodes * arguments_) : arguments(arguments_) {}
- size_t size() const override { return arguments ? arguments->size() : 0; }
- std::unique_ptr at(size_t n) const override { return std::make_unique(arguments->at(n).get()); }
+ size_t size() const override
+ { /// size without skipped indexes
+ return arguments ? arguments->size() - skippedSize() : 0;
+ }
+ std::unique_ptr at(size_t n) const override
+ { /// n is relative index, some can be skipped
+ return std::make_unique(arguments->at(getRealIndex(n)).get());
+ }
private:
const QueryTreeNodes * arguments = nullptr;
};
diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp
index 3f9bd174afeb..c25e609ee164 100644
--- a/src/Common/ProfileEvents.cpp
+++ b/src/Common/ProfileEvents.cpp
@@ -349,6 +349,11 @@
M(IcebergTrivialCountOptimizationApplied, "Trivial count optimization applied while reading from Iceberg", ValueType::Number) \
M(IcebergVersionHintUsed, "Number of times version-hint.text has been used.", ValueType::Number) \
M(IcebergMinMaxIndexPrunedFiles, "Number of skipped files by using MinMax index in Iceberg", ValueType::Number) \
+ M(IcebergAvroFileParsing, "Number of times Avro metadata files have been parsed.", ValueType::Number) \
+ M(IcebergAvroFileParsingMicroseconds, "Time spent parsing Avro metadata files for Iceberg tables.", ValueType::Microseconds) \
+ M(IcebergJsonFileParsing, "Number of times JSON metadata files have been parsed.", ValueType::Number) \
+ M(IcebergJsonFileParsingMicroseconds, "Time spent parsing JSON metadata files for Iceberg tables.", ValueType::Microseconds) \
+ \
M(JoinBuildTableRowCount, "Total number of rows in the build table for a JOIN operation.", ValueType::Number) \
M(JoinProbeTableRowCount, "Total number of rows in the probe table for a JOIN operation.", ValueType::Number) \
M(JoinResultRowCount, "Total number of rows in the result of a JOIN operation.", ValueType::Number) \
@@ -670,8 +675,10 @@ The server successfully detected this situation and will download merged part fr
M(S3DeleteObjects, "Number of S3 API DeleteObject(s) calls.", ValueType::Number) \
M(S3CopyObject, "Number of S3 API CopyObject calls.", ValueType::Number) \
M(S3ListObjects, "Number of S3 API ListObjects calls.", ValueType::Number) \
+ M(S3ListObjectsMicroseconds, "Time of S3 API ListObjects execution.", ValueType::Microseconds) \
M(S3HeadObject, "Number of S3 API HeadObject calls.", ValueType::Number) \
M(S3GetObjectTagging, "Number of S3 API GetObjectTagging calls.", ValueType::Number) \
+ M(S3HeadObjectMicroseconds, "Time of S3 API HeadObject execution.", ValueType::Microseconds) \
M(S3CreateMultipartUpload, "Number of S3 API CreateMultipartUpload calls.", ValueType::Number) \
M(S3UploadPartCopy, "Number of S3 API UploadPartCopy calls.", ValueType::Number) \
M(S3UploadPart, "Number of S3 API UploadPart calls.", ValueType::Number) \
@@ -726,6 +733,7 @@ The server successfully detected this situation and will download merged part fr
M(AzureCopyObject, "Number of Azure blob storage API CopyObject calls", ValueType::Number) \
M(AzureDeleteObjects, "Number of Azure blob storage API DeleteObject(s) calls.", ValueType::Number) \
M(AzureListObjects, "Number of Azure blob storage API ListObjects calls.", ValueType::Number) \
+ M(AzureListObjectsMicroseconds, "Time of Azure blob storage API ListObjects execution.", ValueType::Microseconds) \
M(AzureGetProperties, "Number of Azure blob storage API GetProperties calls.", ValueType::Number) \
M(AzureCreateContainer, "Number of Azure blob storage API CreateContainer calls.", ValueType::Number) \
\
diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp
index e72653747d54..8ebea6e3a130 100644
--- a/src/Core/Settings.cpp
+++ b/src/Core/Settings.cpp
@@ -7315,6 +7315,16 @@ Allows creation of [QBit](../../sql-reference/data-types/qbit.md) data type.
)", BETA, allow_experimental_qbit_type) \
DECLARE(UInt64, archive_adaptive_buffer_max_size_bytes, 8 * DBMS_DEFAULT_BUFFER_SIZE, R"(
Limits the maximum size of the adaptive buffer used when writing to archive files (for example, tar archives)", 0) \
+ DECLARE(Timezone, iceberg_timezone_for_timestamptz, "UTC", R"(
+Timezone for Iceberg timestamptz field.
+
+Possible values:
+
+- Any valid timezone, e.g. `Europe/Berlin`, `UTC` or `Zulu`
+- `` (empty value) - use session timezone
+
+Default value is `UTC`.
+)", 0) \
\
/* ####################################################### */ \
/* ########### START OF EXPERIMENTAL FEATURES ############ */ \
@@ -7441,6 +7451,15 @@ Enable PRQL - an alternative to SQL.
)", EXPERIMENTAL) \
DECLARE(Bool, enable_adaptive_memory_spill_scheduler, false, R"(
Trigger processor to spill data into external storage adpatively. grace join is supported at present.
+)", EXPERIMENTAL) \
+ DECLARE(String, object_storage_cluster, "", R"(
+Cluster used to make distributed requests to object storages with the alternative syntax.
+)", EXPERIMENTAL) \
+ DECLARE(UInt64, object_storage_max_nodes, 0, R"(
+Limit on the number of hosts used for requests in object storage cluster table functions (azureBlobStorageCluster, s3Cluster, hdfsCluster, etc.).
+Possible values:
+- Positive integer.
+- 0 — All hosts in cluster.
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_delta_kernel_rs, true, R"(
Allow experimental delta-kernel-rs implementation.
@@ -7519,6 +7538,9 @@ If the number of set bits in a runtime bloom filter exceeds this ratio the filte
)", EXPERIMENTAL) \
DECLARE(Bool, rewrite_in_to_join, false, R"(
Rewrite expressions like 'x IN subquery' to JOIN. This might be useful for optimizing the whole query with join reordering.
+)", EXPERIMENTAL) \
+ DECLARE(Bool, object_storage_remote_initiator, false, R"(
+Execute the request to object storage remotely on one of the object_storage_cluster nodes.
)", EXPERIMENTAL) \
\
/** Experimental timeSeries* aggregate functions. */ \
diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp
index e8686e2a43ea..1cac5f6580ad 100644
--- a/src/Core/SettingsChangesHistory.cpp
+++ b/src/Core/SettingsChangesHistory.cpp
@@ -39,6 +39,13 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
/// controls new feature and it's 'true' by default, use 'false' as previous_value).
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
/// Note: please check if the key already exists to prevent duplicate entries.
+ addSettingsChanges(settings_changes_history, "26.1.1.20001",
+ {
+ {"object_storage_cluster", "", "", "Antalya: New setting"},
+ {"object_storage_max_nodes", 0, 0, "Antalya: New setting"},
+ {"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."},
+ {"object_storage_remote_initiator", 0, 0, "Antalya: New setting"},
+ });
addSettingsChanges(settings_changes_history, "26.1",
{
{"parallel_replicas_filter_pushdown", false, false, "New setting"},
diff --git a/src/Databases/DataLake/Common.cpp b/src/Databases/DataLake/Common.cpp
index 681dd957b43f..8946d3412d70 100644
--- a/src/Databases/DataLake/Common.cpp
+++ b/src/Databases/DataLake/Common.cpp
@@ -61,14 +61,14 @@ std::vector splitTypeArguments(const String & type_str)
return args;
}
-DB::DataTypePtr getType(const String & type_name, bool nullable, const String & prefix)
+DB::DataTypePtr getType(const String & type_name, bool nullable, DB::ContextPtr context, const String & prefix)
{
String name = trim(type_name);
if (name.starts_with("array<") && name.ends_with(">"))
{
String inner = name.substr(6, name.size() - 7);
- return std::make_shared(getType(inner, nullable));
+ return std::make_shared(getType(inner, nullable, context));
}
if (name.starts_with("map<") && name.ends_with(">"))
@@ -79,7 +79,7 @@ DB::DataTypePtr getType(const String & type_name, bool nullable, const String &
if (args.size() != 2)
throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "Invalid data type {}", type_name);
- return std::make_shared(getType(args[0], false), getType(args[1], nullable));
+ return std::make_shared(getType(args[0], false, context), getType(args[1], nullable, context));
}
if (name.starts_with("struct<") && name.ends_with(">"))
@@ -101,13 +101,13 @@ DB::DataTypePtr getType(const String & type_name, bool nullable, const String &
String full_field_name = prefix.empty() ? field_name : prefix + "." + field_name;
field_names.push_back(full_field_name);
- field_types.push_back(getType(field_type, nullable, full_field_name));
+ field_types.push_back(getType(field_type, nullable, context, full_field_name));
}
return std::make_shared(field_types, field_names);
}
- return nullable ? DB::makeNullable(DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name))
- : DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name);
+ return nullable ? DB::makeNullable(DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name, context))
+ : DB::Iceberg::IcebergSchemaProcessor::getSimpleType(name, context);
}
std::pair parseTableName(const std::string & name)
diff --git a/src/Databases/DataLake/Common.h b/src/Databases/DataLake/Common.h
index cd4b6214e343..9b0dd7c626a6 100644
--- a/src/Databases/DataLake/Common.h
+++ b/src/Databases/DataLake/Common.h
@@ -2,6 +2,7 @@
#include
#include
+#include
namespace DataLake
{
@@ -10,7 +11,7 @@ String trim(const String & str);
std::vector splitTypeArguments(const String & type_str);
-DB::DataTypePtr getType(const String & type_name, bool nullable, const String & prefix = "");
+DB::DataTypePtr getType(const String & type_name, bool nullable, DB::ContextPtr context, const String & prefix = "");
/// Parse a string, containing at least one dot, into a two substrings:
/// A.B.C.D.E -> A.B.C.D and E, where
diff --git a/src/Databases/DataLake/DataLakeConstants.h b/src/Databases/DataLake/DataLakeConstants.h
index eaa8f5a276e6..02f6a7dcfcd7 100644
--- a/src/Databases/DataLake/DataLakeConstants.h
+++ b/src/Databases/DataLake/DataLakeConstants.h
@@ -8,6 +8,7 @@ namespace DataLake
{
static constexpr auto DATABASE_ENGINE_NAME = "DataLakeCatalog";
+static constexpr auto DATABASE_ALIAS_NAME = "Iceberg";
static constexpr std::string_view FILE_PATH_PREFIX = "file:/";
/// Some catalogs (Unity or Glue) may store not only Iceberg/DeltaLake tables but other kinds of "tables"
diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp
index d617751a3eb6..449e90341764 100644
--- a/src/Databases/DataLake/DatabaseDataLake.cpp
+++ b/src/Databases/DataLake/DatabaseDataLake.cpp
@@ -55,6 +55,7 @@ namespace DatabaseDataLakeSetting
extern const DatabaseDataLakeSettingsString oauth_server_uri;
extern const DatabaseDataLakeSettingsBool oauth_server_use_request_body;
extern const DatabaseDataLakeSettingsBool vended_credentials;
+ extern const DatabaseDataLakeSettingsString object_storage_cluster;
extern const DatabaseDataLakeSettingsString aws_access_key_id;
extern const DatabaseDataLakeSettingsString aws_secret_access_key;
extern const DatabaseDataLakeSettingsString region;
@@ -241,7 +242,7 @@ std::shared_ptr DatabaseDataLake::getCatalog() const
return catalog_impl;
}
-std::shared_ptr DatabaseDataLake::getConfiguration(
+StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration(
DatabaseDataLakeStorageType type,
DataLakeStorageSettingsPtr storage_settings) const
{
@@ -449,7 +450,7 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
auto [namespace_name, table_name] = DataLake::parseTableName(name);
- if (!catalog->tryGetTableMetadata(namespace_name, table_name, table_metadata))
+ if (!catalog->tryGetTableMetadata(namespace_name, table_name, context_, table_metadata))
return nullptr;
if (ignore_if_not_iceberg && !table_metadata.isDefaultReadableTable())
return nullptr;
@@ -567,7 +568,7 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
/// with_table_structure = false: because there will be
/// no table structure in table definition AST.
- StorageObjectStorageConfiguration::initialize(*configuration, args, context_copy, /* with_table_structure */false);
+ configuration->initialize(args, context_copy, /* with_table_structure */false);
const auto & query_settings = context_->getSettingsRef();
@@ -579,49 +580,34 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
const auto is_secondary_query = context_->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
- if (can_use_parallel_replicas && !is_secondary_query)
- {
- auto storage_cluster = std::make_shared(
- parallel_replicas_cluster_name,
- configuration,
- configuration->createObjectStorage(context_copy, /* is_readonly */ false),
- StorageID(getDatabaseName(), name),
- columns,
- ConstraintsDescription{},
- nullptr,
- context_,
- /// Use is_table_function = true,
- /// because this table is actually stateless like a table function.
- /* is_table_function */true);
-
- storage_cluster->startup();
- return storage_cluster;
- }
+ std::string cluster_name = configuration->isClusterSupported() ? settings[DatabaseDataLakeSetting::object_storage_cluster].value : "";
- bool can_use_distributed_iterator =
- context_->getClientInfo().collaborate_with_initiator &&
- can_use_parallel_replicas;
+ if (cluster_name.empty() && can_use_parallel_replicas && !is_secondary_query)
+ cluster_name = parallel_replicas_cluster_name;
- return std::make_shared(
+ auto storage_cluster = std::make_shared(
+ cluster_name,
configuration,
configuration->createObjectStorage(context_copy, /* is_readonly */ false),
- context_copy,
StorageID(getDatabaseName(), name),
/* columns */columns,
/* constraints */ConstraintsDescription{},
- /* comment */"",
+ /* partition_by */nullptr,
+ /* order_by */nullptr,
+ context_copy,
+ /* comment */ "",
getFormatSettings(context_copy),
LoadingStrictnessLevel::CREATE,
getCatalog(),
/* if_not_exists*/true,
/* is_datalake_query*/true,
- /* distributed_processing */can_use_distributed_iterator,
- /* partition_by */nullptr,
- /* order_by */nullptr,
/// Use is_table_function = true,
/// because this table is actually stateless like a table function.
/* is_table_function */true,
/* lazy_init */true);
+
+ storage_cluster->startup();
+ return storage_cluster;
}
void DatabaseDataLake::dropTable( /// NOLINT
@@ -821,7 +807,7 @@ ASTPtr DatabaseDataLake::getCreateDatabaseQueryImpl() const
ASTPtr DatabaseDataLake::getCreateTableQueryImpl(
const String & name,
- ContextPtr /* context_ */,
+ ContextPtr context_,
bool throw_on_error) const
{
auto catalog = getCatalog();
@@ -829,7 +815,7 @@ ASTPtr DatabaseDataLake::getCreateTableQueryImpl(
const auto [namespace_name, table_name] = DataLake::parseTableName(name);
- if (!catalog->tryGetTableMetadata(namespace_name, table_name, table_metadata))
+ if (!catalog->tryGetTableMetadata(namespace_name, table_name, context_, table_metadata))
{
if (throw_on_error)
throw Exception(ErrorCodes::CANNOT_GET_CREATE_TABLE_QUERY, "Table `{}` doesn't exist", name);
@@ -922,6 +908,11 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name);
}
+ if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST)
+ {
+ throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must have `rest` catalog type only");
+ }
+
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.context);
@@ -1017,6 +1008,7 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
args.uuid);
};
factory.registerDatabase("DataLakeCatalog", create_fn, { .supports_arguments = true, .supports_settings = true });
+ factory.registerDatabase("Iceberg", create_fn, { .supports_arguments = true, .supports_settings = true });
}
}
diff --git a/src/Databases/DataLake/DatabaseDataLake.h b/src/Databases/DataLake/DatabaseDataLake.h
index bed47c5ccae6..736b951b0b21 100644
--- a/src/Databases/DataLake/DatabaseDataLake.h
+++ b/src/Databases/DataLake/DatabaseDataLake.h
@@ -84,7 +84,7 @@ class DatabaseDataLake final : public IDatabase, WithContext
void validateSettings();
std::shared_ptr getCatalog() const;
- std::shared_ptr getConfiguration(
+ StorageObjectStorageConfigurationPtr getConfiguration(
DatabaseDataLakeStorageType type,
DataLakeStorageSettingsPtr storage_settings) const;
diff --git a/src/Databases/DataLake/GlueCatalog.cpp b/src/Databases/DataLake/GlueCatalog.cpp
index 0bb26afaafa1..ed21065e4ce0 100644
--- a/src/Databases/DataLake/GlueCatalog.cpp
+++ b/src/Databases/DataLake/GlueCatalog.cpp
@@ -292,6 +292,7 @@ bool GlueCatalog::existsTable(const std::string & database_name, const std::stri
bool GlueCatalog::tryGetTableMetadata(
const std::string & database_name,
const std::string & table_name,
+ DB::ContextPtr /* context_ */,
TableMetadata & result) const
{
Aws::Glue::Model::GetTableRequest request;
@@ -386,7 +387,7 @@ bool GlueCatalog::tryGetTableMetadata(
column_type = "timestamptz";
}
- schema.push_back({column.GetName(), getType(column_type, can_be_nullable)});
+ schema.push_back({column.GetName(), getType(column_type, can_be_nullable, getContext())});
}
result.setSchema(schema);
}
@@ -408,9 +409,10 @@ bool GlueCatalog::tryGetTableMetadata(
void GlueCatalog::getTableMetadata(
const std::string & database_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const
{
- if (!tryGetTableMetadata(database_name, table_name, result))
+ if (!tryGetTableMetadata(database_name, table_name, context_, result))
{
throw DB::Exception(
DB::ErrorCodes::DATALAKE_DATABASE_ERROR,
@@ -513,7 +515,7 @@ GlueCatalog::ObjectStorageWithPath GlueCatalog::createObjectStorageForEarlyTable
auto storage_settings = std::make_shared();
storage_settings->loadFromSettingsChanges(settings.allChanged());
auto configuration = std::make_shared(storage_settings);
- DB::StorageObjectStorageConfiguration::initialize(*configuration, args, getContext(), false);
+ configuration->initialize(args, getContext(), false);
auto object_storage = configuration->createObjectStorage(getContext(), true);
diff --git a/src/Databases/DataLake/GlueCatalog.h b/src/Databases/DataLake/GlueCatalog.h
index 7392cdfb4afd..4d1b5db12439 100644
--- a/src/Databases/DataLake/GlueCatalog.h
+++ b/src/Databases/DataLake/GlueCatalog.h
@@ -41,11 +41,13 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
void getTableMetadata(
const std::string & database_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const override;
bool tryGetTableMetadata(
const std::string & database_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const override;
std::optional getStorageType() const override
diff --git a/src/Databases/DataLake/HiveCatalog.cpp b/src/Databases/DataLake/HiveCatalog.cpp
index b86f70dfc4b5..bc6ff5244749 100644
--- a/src/Databases/DataLake/HiveCatalog.cpp
+++ b/src/Databases/DataLake/HiveCatalog.cpp
@@ -121,13 +121,21 @@ bool HiveCatalog::existsTable(const std::string & namespace_name, const std::str
return true;
}
-void HiveCatalog::getTableMetadata(const std::string & namespace_name, const std::string & table_name, TableMetadata & result) const
+void HiveCatalog::getTableMetadata(
+ const std::string & namespace_name,
+ const std::string & table_name,
+ DB::ContextPtr context_,
+ TableMetadata & result) const
{
- if (!tryGetTableMetadata(namespace_name, table_name, result))
+ if (!tryGetTableMetadata(namespace_name, table_name, context_, result))
throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "No response from iceberg catalog");
}
-bool HiveCatalog::tryGetTableMetadata(const std::string & namespace_name, const std::string & table_name, TableMetadata & result) const
+bool HiveCatalog::tryGetTableMetadata(
+ const std::string & namespace_name,
+ const std::string & table_name,
+ DB::ContextPtr context_,
+ TableMetadata & result) const
{
Apache::Hadoop::Hive::Table table;
@@ -155,7 +163,7 @@ bool HiveCatalog::tryGetTableMetadata(const std::string & namespace_name, const
auto columns = table.sd.cols;
for (const auto & column : columns)
{
- schema.push_back({column.name, getType(column.type, true)});
+ schema.push_back({column.name, getType(column.type, true, context_)});
}
result.setSchema(schema);
}
diff --git a/src/Databases/DataLake/HiveCatalog.h b/src/Databases/DataLake/HiveCatalog.h
index 29b4e6ce6c63..0fba0e132486 100644
--- a/src/Databases/DataLake/HiveCatalog.h
+++ b/src/Databases/DataLake/HiveCatalog.h
@@ -38,9 +38,17 @@ class HiveCatalog final : public ICatalog, private DB::WithContext
bool existsTable(const std::string & namespace_name, const std::string & table_name) const override;
- void getTableMetadata(const std::string & namespace_name, const std::string & table_name, TableMetadata & result) const override;
-
- bool tryGetTableMetadata(const std::string & namespace_name, const std::string & table_name, TableMetadata & result) const override;
+ void getTableMetadata(
+ const std::string & namespace_name,
+ const std::string & table_name,
+ DB::ContextPtr context_,
+ TableMetadata & result) const override;
+
+ bool tryGetTableMetadata(
+ const std::string & namespace_name,
+ const std::string & table_name,
+ DB::ContextPtr context_,
+ TableMetadata & result) const override;
std::optional getStorageType() const override;
diff --git a/src/Databases/DataLake/ICatalog.cpp b/src/Databases/DataLake/ICatalog.cpp
index fbc05711e7d1..38dcb13d7531 100644
--- a/src/Databases/DataLake/ICatalog.cpp
+++ b/src/Databases/DataLake/ICatalog.cpp
@@ -102,33 +102,39 @@ void TableMetadata::setLocation(const std::string & location_)
auto pos_to_path = location_.substr(pos_to_bucket).find('/');
if (pos_to_path == std::string::npos)
- throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);
-
- pos_to_path = pos_to_bucket + pos_to_path;
-
- location_without_path = location_.substr(0, pos_to_path);
- path = location_.substr(pos_to_path + 1);
-
- /// For Azure ABFSS format: abfss://container@account.dfs.core.windows.net/path
- /// The bucket (container) is the part before '@', not the whole string before '/'
- String bucket_part = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
- auto at_pos = bucket_part.find('@');
- if (at_pos != std::string::npos)
- {
- /// Azure ABFSS format: extract container (before @) and account (after @)
- bucket = bucket_part.substr(0, at_pos);
- azure_account_with_suffix = bucket_part.substr(at_pos + 1);
- LOG_TEST(getLogger("TableMetadata"),
- "Parsed Azure location - container: {}, account: {}, path: {}",
- bucket, azure_account_with_suffix, path);
+ { // empty path, AWS S3Table
+ location_without_path = location_;
+ path.clear();
+ bucket = location_.substr(pos_to_bucket);
}
else
{
- /// Standard format (S3, GCS, etc.)
- bucket = bucket_part;
- LOG_TEST(getLogger("TableMetadata"),
- "Parsed location without path: {}, path: {}",
- location_without_path, path);
+ pos_to_path = pos_to_bucket + pos_to_path;
+
+ location_without_path = location_.substr(0, pos_to_path);
+ path = location_.substr(pos_to_path + 1);
+
+ /// For Azure ABFSS format: abfss://container@account.dfs.core.windows.net/path
+ /// The bucket (container) is the part before '@', not the whole string before '/'
+ String bucket_part = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
+ auto at_pos = bucket_part.find('@');
+ if (at_pos != std::string::npos)
+ {
+ /// Azure ABFSS format: extract container (before @) and account (after @)
+ bucket = bucket_part.substr(0, at_pos);
+ azure_account_with_suffix = bucket_part.substr(at_pos + 1);
+ LOG_TEST(getLogger("TableMetadata"),
+ "Parsed Azure location - container: {}, account: {}, path: {}",
+ bucket, azure_account_with_suffix, path);
+ }
+ else
+ {
+ /// Standard format (S3, GCS, etc.)
+ bucket = bucket_part;
+ LOG_TEST(getLogger("TableMetadata"),
+ "Parsed location without path: {}, path: {}",
+ location_without_path, path);
+ }
}
}
diff --git a/src/Databases/DataLake/ICatalog.h b/src/Databases/DataLake/ICatalog.h
index 5b4e481f01e7..cdfaf2978946 100644
--- a/src/Databases/DataLake/ICatalog.h
+++ b/src/Databases/DataLake/ICatalog.h
@@ -8,6 +8,14 @@
#include
#include
+namespace DB
+{
+
+class Context;
+using ContextPtr = std::shared_ptr;
+
+}
+
namespace DataLake
{
@@ -153,6 +161,7 @@ class ICatalog
virtual void getTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
+ DB::ContextPtr context,
TableMetadata & result) const = 0;
/// Get table metadata in the given namespace.
@@ -160,6 +169,7 @@ class ICatalog
virtual bool tryGetTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
+ DB::ContextPtr context,
TableMetadata & result) const = 0;
/// Get storage type, where Iceberg tables' data is stored.
diff --git a/src/Databases/DataLake/PaimonRestCatalog.cpp b/src/Databases/DataLake/PaimonRestCatalog.cpp
index 609bfb8128fc..b6a0fff1c025 100644
--- a/src/Databases/DataLake/PaimonRestCatalog.cpp
+++ b/src/Databases/DataLake/PaimonRestCatalog.cpp
@@ -466,7 +466,7 @@ bool PaimonRestCatalog::existsTable(const String & database_name, const String &
return true;
}
-bool PaimonRestCatalog::tryGetTableMetadata(const String & database_name, const String & table_name, TableMetadata & result) const
+bool PaimonRestCatalog::tryGetTableMetadata(const String & database_name, const String & table_name, DB::ContextPtr /*context_*/, TableMetadata & result) const
{
try
{
@@ -592,9 +592,9 @@ Poco::JSON::Object::Ptr PaimonRestCatalog::requestRest(
return json.extract();
}
-void PaimonRestCatalog::getTableMetadata(const String & database_name, const String & table_name, TableMetadata & result) const
+void PaimonRestCatalog::getTableMetadata(const String & database_name, const String & table_name, DB::ContextPtr context_, TableMetadata & result) const
{
- if (!tryGetTableMetadata(database_name, table_name, result))
+ if (!tryGetTableMetadata(database_name, table_name, context_, result))
{
throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "No response from paimon rest catalog");
}
diff --git a/src/Databases/DataLake/PaimonRestCatalog.h b/src/Databases/DataLake/PaimonRestCatalog.h
index 78713832e288..c81722c63964 100644
--- a/src/Databases/DataLake/PaimonRestCatalog.h
+++ b/src/Databases/DataLake/PaimonRestCatalog.h
@@ -89,9 +89,9 @@ class PaimonRestCatalog final : public ICatalog, private DB::WithContext
bool existsTable(const String & database_name, const String & table_name) const override;
- void getTableMetadata(const String & database_name, const String & table_name, TableMetadata & result) const override;
+ void getTableMetadata(const String & database_name, const String & table_name, DB::ContextPtr context_, TableMetadata & result) const override;
- bool tryGetTableMetadata(const String & database_name, const String & table_name, TableMetadata & result) const override;
+ bool tryGetTableMetadata(const String & database_name, const String & table_name, DB::ContextPtr /*context_*/, TableMetadata & result) const override;
std::optional getStorageType() const override { return storage_type; }
diff --git a/src/Databases/DataLake/RestCatalog.cpp b/src/Databases/DataLake/RestCatalog.cpp
index 6229e84eca84..a9c7fba4bcb5 100644
--- a/src/Databases/DataLake/RestCatalog.cpp
+++ b/src/Databases/DataLake/RestCatalog.cpp
@@ -596,17 +596,18 @@ DB::Names RestCatalog::parseTables(DB::ReadBuffer & buf, const std::string & bas
bool RestCatalog::existsTable(const std::string & namespace_name, const std::string & table_name) const
{
TableMetadata table_metadata;
- return tryGetTableMetadata(namespace_name, table_name, table_metadata);
+ return tryGetTableMetadata(namespace_name, table_name, getContext(), table_metadata);
}
bool RestCatalog::tryGetTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const
{
try
{
- return getTableMetadataImpl(namespace_name, table_name, result);
+ return getTableMetadataImpl(namespace_name, table_name, context_, result);
}
catch (const DB::Exception & ex)
{
@@ -618,15 +619,17 @@ bool RestCatalog::tryGetTableMetadata(
void RestCatalog::getTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const
{
- if (!getTableMetadataImpl(namespace_name, table_name, result))
+ if (!getTableMetadataImpl(namespace_name, table_name, context_, result))
throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "No response from iceberg catalog");
}
bool RestCatalog::getTableMetadataImpl(
const std::string & namespace_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const
{
LOG_DEBUG(log, "Checking table {} in namespace {}", table_name, namespace_name);
@@ -688,8 +691,8 @@ bool RestCatalog::getTableMetadataImpl(
if (result.requiresSchema())
{
// int format_version = metadata_object->getValue("format-version");
- auto schema_processor = DB::Iceberg::IcebergSchemaProcessor();
- auto id = DB::IcebergMetadata::parseTableSchema(metadata_object, schema_processor, log);
+ auto schema_processor = DB::Iceberg::IcebergSchemaProcessor(context_);
+ auto id = DB::IcebergMetadata::parseTableSchema(metadata_object, schema_processor, context_, log);
auto schema = schema_processor.getClickhouseTableSchemaById(id);
result.setSchema(*schema);
}
diff --git a/src/Databases/DataLake/RestCatalog.h b/src/Databases/DataLake/RestCatalog.h
index f9ce53729ce4..d0e9678c876c 100644
--- a/src/Databases/DataLake/RestCatalog.h
+++ b/src/Databases/DataLake/RestCatalog.h
@@ -53,11 +53,13 @@ class RestCatalog final : public ICatalog, private DB::WithContext
void getTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const override;
bool tryGetTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const override;
std::optional getStorageType() const override;
@@ -144,6 +146,7 @@ class RestCatalog final : public ICatalog, private DB::WithContext
bool getTableMetadataImpl(
const std::string & namespace_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const;
Config loadConfig();
diff --git a/src/Databases/DataLake/UnityCatalog.cpp b/src/Databases/DataLake/UnityCatalog.cpp
index 1f59c16242fd..06de69c15a8a 100644
--- a/src/Databases/DataLake/UnityCatalog.cpp
+++ b/src/Databases/DataLake/UnityCatalog.cpp
@@ -91,9 +91,10 @@ DB::Names UnityCatalog::getTables() const
void UnityCatalog::getTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const
{
- if (!tryGetTableMetadata(namespace_name, table_name, result))
+ if (!tryGetTableMetadata(namespace_name, table_name, context_, result))
throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "No response from unity catalog");
}
@@ -159,6 +160,7 @@ void UnityCatalog::getCredentials(const std::string & table_id, TableMetadata &
bool UnityCatalog::tryGetTableMetadata(
const std::string & schema_name,
const std::string & table_name,
+ DB::ContextPtr /* context_ */,
TableMetadata & result) const
{
auto full_table_name = warehouse + "." + schema_name + "." + table_name;
diff --git a/src/Databases/DataLake/UnityCatalog.h b/src/Databases/DataLake/UnityCatalog.h
index 2e6262d6e5d7..9d4dc0a74877 100644
--- a/src/Databases/DataLake/UnityCatalog.h
+++ b/src/Databases/DataLake/UnityCatalog.h
@@ -34,11 +34,13 @@ class UnityCatalog final : public ICatalog, private DB::WithContext
void getTableMetadata(
const std::string & namespace_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const override;
bool tryGetTableMetadata(
const std::string & schema_name,
const std::string & table_name,
+ DB::ContextPtr context_,
TableMetadata & result) const override;
std::optional getStorageType() const override { return std::nullopt; }
diff --git a/src/Disks/DiskObjectStorage/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp b/src/Disks/DiskObjectStorage/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp
index 510e2829bb8a..53fafbb1e1d2 100644
--- a/src/Disks/DiskObjectStorage/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp
+++ b/src/Disks/DiskObjectStorage/ObjectStorages/AzureBlobStorage/AzureObjectStorage.cpp
@@ -21,6 +21,7 @@
#include
#include
#include
+#include
namespace CurrentMetrics
@@ -33,6 +34,7 @@ namespace CurrentMetrics
namespace ProfileEvents
{
extern const Event AzureListObjects;
+ extern const Event AzureListObjectsMicroseconds;
extern const Event DiskAzureListObjects;
extern const Event AzureDeleteObjects;
extern const Event DiskAzureDeleteObjects;
@@ -84,6 +86,7 @@ class AzureIteratorAsync final : public IObjectStorageIteratorAsync
ProfileEvents::increment(ProfileEvents::AzureListObjects);
if (client->IsClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskAzureListObjects);
+ ProfileEventTimeIncrement watch(ProfileEvents::AzureListObjectsMicroseconds);
chassert(batch.empty());
auto blob_list_response = client->ListBlobs(options);
@@ -185,7 +188,15 @@ void AzureObjectStorage::listObjects(const std::string & path, RelativePathsWith
else
options.PageSizeHint = settings.get()->list_object_keys_size;
- for (auto blob_list_response = client_ptr->ListBlobs(options); blob_list_response.HasPage(); blob_list_response.MoveToNextPage())
+ AzureBlobStorage::ListBlobsPagedResponse blob_list_response;
+
+ auto list_blobs = [&]()->void
+ {
+ ProfileEventTimeIncrement watch(ProfileEvents::AzureListObjectsMicroseconds);
+ blob_list_response = client_ptr->ListBlobs(options);
+ };
+
+ for (list_blobs(); blob_list_response.HasPage(); blob_list_response.MoveToNextPage())
{
ProfileEvents::increment(ProfileEvents::AzureListObjects);
if (client_ptr->IsClientForDisk())
diff --git a/src/Disks/DiskObjectStorage/ObjectStorages/S3/S3ObjectStorage.cpp b/src/Disks/DiskObjectStorage/ObjectStorages/S3/S3ObjectStorage.cpp
index 1e9ef01fb894..dbf9bfa2fe0f 100644
--- a/src/Disks/DiskObjectStorage/ObjectStorages/S3/S3ObjectStorage.cpp
+++ b/src/Disks/DiskObjectStorage/ObjectStorages/S3/S3ObjectStorage.cpp
@@ -32,6 +32,7 @@
#include
#include
#include
+#include
#include
#include
@@ -39,6 +40,7 @@
namespace ProfileEvents
{
extern const Event S3ListObjects;
+ extern const Event S3ListObjectsMicroseconds;
extern const Event DiskS3DeleteObjects;
extern const Event DiskS3ListObjects;
}
@@ -143,7 +145,12 @@ class S3IteratorAsync final : public IObjectStorageIteratorAsync
ProfileEvents::increment(ProfileEvents::S3ListObjects);
ProfileEvents::increment(ProfileEvents::DiskS3ListObjects);
- auto outcome = client->ListObjectsV2(*request);
+ Aws::S3::Model::ListObjectsV2Outcome outcome;
+
+ {
+ ProfileEventTimeIncrement watch(ProfileEvents::S3ListObjectsMicroseconds);
+ outcome = client->ListObjectsV2(*request);
+ }
/// Outcome failure will be handled on the caller side.
if (outcome.IsSuccess())
@@ -288,7 +295,11 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMet
ProfileEvents::increment(ProfileEvents::S3ListObjects);
ProfileEvents::increment(ProfileEvents::DiskS3ListObjects);
- outcome = client.get()->ListObjectsV2(request);
+ {
+ ProfileEventTimeIncrement watch(ProfileEvents::S3ListObjectsMicroseconds);
+ outcome = client.get()->ListObjectsV2(request);
+ }
+
throwIfError(outcome);
auto result = outcome.GetResult();
diff --git a/src/Disks/DiskType.cpp b/src/Disks/DiskType.cpp
index bf4506b4cbf6..ddc42cd07dc3 100644
--- a/src/Disks/DiskType.cpp
+++ b/src/Disks/DiskType.cpp
@@ -10,7 +10,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
-MetadataStorageType metadataTypeFromString(const String & type)
+MetadataStorageType metadataTypeFromString(const std::string & type)
{
auto check_type = Poco::toLower(type);
if (check_type == "local")
@@ -58,25 +58,7 @@ String DataSourceDescription::name() const
case DataSourceType::RAM:
return "memory";
case DataSourceType::ObjectStorage:
- {
- switch (object_storage_type)
- {
- case ObjectStorageType::S3:
- return "s3";
- case ObjectStorageType::HDFS:
- return "hdfs";
- case ObjectStorageType::Azure:
- return "azure_blob_storage";
- case ObjectStorageType::Local:
- return "local_blob_storage";
- case ObjectStorageType::Web:
- return "web";
- case ObjectStorageType::None:
- return "none";
- case ObjectStorageType::Max:
- throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected object storage type: Max");
- }
- }
+ return DB::toString(object_storage_type);
}
}
@@ -86,4 +68,45 @@ String DataSourceDescription::toString() const
name(), description, is_encrypted, is_cached, zookeeper_name);
}
+ObjectStorageType objectStorageTypeFromString(const std::string & type)
+{
+ auto check_type = Poco::toLower(type);
+ if (check_type == "s3")
+ return ObjectStorageType::S3;
+ if (check_type == "hdfs")
+ return ObjectStorageType::HDFS;
+ if (check_type == "azure_blob_storage" || check_type == "azure")
+ return ObjectStorageType::Azure;
+ if (check_type == "local_blob_storage" || check_type == "local")
+ return ObjectStorageType::Local;
+ if (check_type == "web")
+ return ObjectStorageType::Web;
+ if (check_type == "none")
+ return ObjectStorageType::None;
+
+ throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG,
+ "Unknown object storage type: {}", type);
+}
+
+std::string toString(ObjectStorageType type)
+{
+ switch (type)
+ {
+ case ObjectStorageType::S3:
+ return "s3";
+ case ObjectStorageType::HDFS:
+ return "hdfs";
+ case ObjectStorageType::Azure:
+ return "azure_blob_storage";
+ case ObjectStorageType::Local:
+ return "local_blob_storage";
+ case ObjectStorageType::Web:
+ return "web";
+ case ObjectStorageType::None:
+ return "none";
+ case ObjectStorageType::Max:
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected object storage type: Max");
+ }
+}
+
}
diff --git a/src/Disks/DiskType.h b/src/Disks/DiskType.h
index 9018cd605481..835c1341775b 100644
--- a/src/Disks/DiskType.h
+++ b/src/Disks/DiskType.h
@@ -36,7 +36,10 @@ enum class MetadataStorageType : uint8_t
Memory,
};
-MetadataStorageType metadataTypeFromString(const String & type);
+MetadataStorageType metadataTypeFromString(const std::string & type);
+
+ObjectStorageType objectStorageTypeFromString(const std::string & type);
+std::string toString(ObjectStorageType type);
struct DataSourceDescription
{
diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp
index 0fa0b9dff629..e73c35f59ed4 100644
--- a/src/IO/ReadBufferFromS3.cpp
+++ b/src/IO/ReadBufferFromS3.cpp
@@ -484,6 +484,12 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si
log, "Read S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}",
bucket, key, version_id.empty() ? "Latest" : version_id, range_begin);
}
+ else
+ {
+ LOG_TEST(
+ log, "Read S3 object. Bucket: {}, Key: {}, Version: {}",
+ bucket, key, version_id.empty() ? "Latest" : version_id);
+ }
ProfileEvents::increment(ProfileEvents::S3GetObject);
if (client_ptr->isClientForDisk())
diff --git a/src/IO/S3/Client.cpp b/src/IO/S3/Client.cpp
index 574a5d9d69df..af1fb5055ab3 100644
--- a/src/IO/S3/Client.cpp
+++ b/src/IO/S3/Client.cpp
@@ -456,7 +456,7 @@ Model::HeadObjectOutcome Client::HeadObject(HeadObjectRequest & request) const
auto bucket_uri = getURIForBucket(bucket);
if (!bucket_uri)
{
- if (auto maybe_error = updateURIForBucketForHead(bucket); maybe_error.has_value())
+ if (auto maybe_error = updateURIForBucketForHead(bucket, request.GetKey()); maybe_error.has_value())
return *maybe_error;
if (auto region = getRegionForBucket(bucket); !region.empty())
@@ -673,7 +673,6 @@ Client::doRequest(RequestType & request, RequestFn request_fn) const
if (auto uri = getURIForBucket(bucket); uri.has_value())
request.overrideURI(std::move(*uri));
-
bool found_new_endpoint = false;
// if we found correct endpoint after 301 responses, update the cache for future requests
SCOPE_EXIT(
@@ -1042,12 +1041,15 @@ std::optional Client::getURIFromError(const Aws::S3::S3Error & error) c
}
// Do a list request because head requests don't have body in response
-std::optional Client::updateURIForBucketForHead(const std::string & bucket) const
+// S3 Tables don't support ListObjects, so as a workaround we use a ranged GetObject request instead
+std::optional Client::updateURIForBucketForHead(const std::string & bucket, const std::string & key) const
{
- ListObjectsV2Request req;
+ GetObjectRequest req;
req.SetBucket(bucket);
- req.SetMaxKeys(1);
- auto result = ListObjectsV2(req);
+ req.SetKey(key);
+ req.SetRange("bytes=0-1");
+ auto result = GetObject(req);
+
if (result.IsSuccess())
return std::nullopt;
return result.GetError();
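
The change above replaces the one-key ListObjectsV2 probe with a two-byte ranged GetObject, because S3 Tables buckets reject ListObjects while still returning the redirect/permission errors the client wants to inspect. A hedged sketch of the same probe with the AWS C++ SDK; the bucket and key names are placeholders and error handling is reduced to a boolean:

```cpp
#include <aws/core/Aws.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <iostream>

// Probe a bucket/key with a ranged GetObject instead of ListObjectsV2:
// only the first two bytes are requested, so the call is cheap but still
// surfaces the error (e.g. a region redirect) we want to examine.
bool probeObject(const Aws::S3::S3Client & client, const Aws::String & bucket, const Aws::String & key)
{
    Aws::S3::Model::GetObjectRequest req;
    req.SetBucket(bucket);
    req.SetKey(key);
    req.SetRange("bytes=0-1");

    auto outcome = client.GetObject(req);
    if (!outcome.IsSuccess())
    {
        std::cerr << outcome.GetError().GetMessage() << "\n";
        return false;
    }
    return true;
}

int main()
{
    Aws::SDKOptions options;
    Aws::InitAPI(options);
    {
        Aws::S3::S3Client client; // default credentials/region chain
        probeObject(client, "my-bucket", "some/key"); // placeholder names
    }
    Aws::ShutdownAPI(options);
}
```
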
diff --git a/src/IO/S3/Client.h b/src/IO/S3/Client.h
index 61ee4ead3dc0..4a65239a8582 100644
--- a/src/IO/S3/Client.h
+++ b/src/IO/S3/Client.h
@@ -285,7 +285,7 @@ class Client : private Aws::S3::S3Client
void updateURIForBucket(const std::string & bucket, S3::URI new_uri) const;
std::optional getURIFromError(const Aws::S3::S3Error & error) const;
- std::optional updateURIForBucketForHead(const std::string & bucket) const;
+ std::optional updateURIForBucketForHead(const std::string & bucket, const std::string & key) const;
std::optional getURIForBucket(const std::string & bucket) const;
diff --git a/src/IO/S3/URI.cpp b/src/IO/S3/URI.cpp
index b8150e740144..5ab8e5cfd724 100644
--- a/src/IO/S3/URI.cpp
+++ b/src/IO/S3/URI.cpp
@@ -191,10 +191,72 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax, bool keep_pre
validateKey(key, uri);
}
+bool URI::isAWSRegion(std::string_view region)
+{
+ /// List from https://docs.aws.amazon.com/general/latest/gr/s3.html
+ static const std::unordered_set regions = {
+ "us-east-2",
+ "us-east-1",
+ "us-west-1",
+ "us-west-2",
+ "af-south-1",
+ "ap-east-1",
+ "ap-south-2",
+ "ap-southeast-3",
+ "ap-southeast-5",
+ "ap-southeast-4",
+ "ap-south-1",
+ "ap-northeast-3",
+ "ap-northeast-2",
+ "ap-southeast-1",
+ "ap-southeast-2",
+ "ap-east-2",
+ "ap-southeast-7",
+ "ap-northeast-1",
+ "ca-central-1",
+ "ca-west-1",
+ "eu-central-1",
+ "eu-west-1",
+ "eu-west-2",
+ "eu-south-1",
+ "eu-west-3",
+ "eu-south-2",
+ "eu-north-1",
+ "eu-central-2",
+ "il-central-1",
+ "mx-central-1",
+ "me-south-1",
+ "me-central-1",
+ "sa-east-1",
+ "us-gov-east-1",
+ "us-gov-west-1"
+ };
+
+    /// 's3-us-west-2' is a legacy region format for S3 storage, equivalent to 'us-west-2'
+ /// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#VirtualHostingBackwardsCompatibility
+ if (region.substr(0, 3) == "s3-")
+ region = region.substr(3);
+
+ return regions.contains(region);
+}
+
void URI::addRegionToURI(const std::string ®ion)
{
if (auto pos = endpoint.find(".amazonaws.com"); pos != std::string::npos)
+ {
+ if (pos > 0)
+        { /// Check whether the region is already in the endpoint to avoid adding it a second time
+ auto prev_pos = endpoint.find_last_of("/.", pos - 1);
+ if (prev_pos == std::string::npos)
+ prev_pos = 0;
+ else
+ ++prev_pos;
+ std::string_view endpoint_region = std::string_view(endpoint).substr(prev_pos, pos - prev_pos);
+ if (isAWSRegion(endpoint_region))
+ return;
+ }
endpoint = endpoint.substr(0, pos) + "." + region + endpoint.substr(pos);
+ }
}
void URI::validateBucket(const String & bucket, const Poco::URI & uri)
diff --git a/src/IO/S3/URI.h b/src/IO/S3/URI.h
index fa259b9de451..fd45baa39774 100644
--- a/src/IO/S3/URI.h
+++ b/src/IO/S3/URI.h
@@ -44,6 +44,10 @@ struct URI
static void validateBucket(const std::string & bucket, const Poco::URI & uri);
static void validateKey(const std::string & key, const Poco::URI & uri);
+
+ /// Returns true if 'region' string is an AWS S3 region
+ /// https://docs.aws.amazon.com/general/latest/gr/s3.html
+ static bool isAWSRegion(std::string_view region);
};
}
diff --git a/src/IO/S3/getObjectInfo.cpp b/src/IO/S3/getObjectInfo.cpp
index 8d44435e70c4..9f2cea7cd944 100644
--- a/src/IO/S3/getObjectInfo.cpp
+++ b/src/IO/S3/getObjectInfo.cpp
@@ -1,6 +1,7 @@
#include
#include
#include
+#include
#if USE_AWS_S3
@@ -15,6 +16,7 @@ namespace ProfileEvents
extern const Event S3GetObject;
extern const Event S3GetObjectTagging;
extern const Event S3HeadObject;
+ extern const Event S3HeadObjectMicroseconds;
extern const Event DiskS3GetObject;
extern const Event DiskS3GetObjectTagging;
extern const Event DiskS3HeadObject;
@@ -35,6 +37,7 @@ namespace
ProfileEvents::increment(ProfileEvents::S3HeadObject);
if (client.isClientForDisk())
ProfileEvents::increment(ProfileEvents::DiskS3HeadObject);
+ ProfileEventTimeIncrement watch(ProfileEvents::S3HeadObjectMicroseconds);
S3::HeadObjectRequest req;
req.SetBucket(bucket);
diff --git a/src/IO/S3Common.cpp b/src/IO/S3Common.cpp
index 4901596a7696..cc321ba7af8c 100644
--- a/src/IO/S3Common.cpp
+++ b/src/IO/S3Common.cpp
@@ -19,14 +19,6 @@
#include
-namespace ProfileEvents
-{
- extern const Event S3GetObjectMetadata;
- extern const Event S3HeadObject;
- extern const Event DiskS3GetObjectMetadata;
- extern const Event DiskS3HeadObject;
-}
-
namespace DB
{
diff --git a/src/Interpreters/Cluster.cpp b/src/Interpreters/Cluster.cpp
index fb6249c5e3e4..a3e7014469de 100644
--- a/src/Interpreters/Cluster.cpp
+++ b/src/Interpreters/Cluster.cpp
@@ -741,9 +741,9 @@ void Cluster::initMisc()
}
}
-std::unique_ptr Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard) const
+std::unique_ptr Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts) const
{
- return std::unique_ptr{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard)};
+ return std::unique_ptr{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard, max_hosts)};
}
std::unique_ptr Cluster::getClusterWithSingleShard(size_t index) const
@@ -792,7 +792,7 @@ void shuffleReplicas(std::vector & replicas, const Settings &
}
-Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard)
+Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts)
{
if (from.addresses_with_failover.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster is empty");
@@ -814,6 +814,7 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
if (address.is_local)
info.local_addresses.push_back(address);
+ addresses_with_failover.emplace_back(Addresses({address}));
auto pool = ConnectionPoolFactory::instance().get(
static_cast(settings[Setting::distributed_connections_pool_size]),
@@ -837,9 +838,6 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
info.per_replica_pools = {std::move(pool)};
info.default_database = address.default_database;
- addresses_with_failover.emplace_back(Addresses{address});
-
- slot_to_shard.insert(std::end(slot_to_shard), info.weight, shards_info.size());
shards_info.emplace_back(std::move(info));
}
};
@@ -861,10 +859,37 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
secret = from.secret;
name = from.name;
+ constrainShardInfoAndAddressesToMaxHosts(max_hosts);
+
+ for (size_t i = 0; i < shards_info.size(); ++i)
+ slot_to_shard.insert(std::end(slot_to_shard), shards_info[i].weight, i);
+
initMisc();
}
+void Cluster::constrainShardInfoAndAddressesToMaxHosts(size_t max_hosts)
+{
+ if (max_hosts == 0 || shards_info.size() <= max_hosts)
+ return;
+
+ pcg64_fast gen{randomSeed()};
+ std::shuffle(shards_info.begin(), shards_info.end(), gen);
+ shards_info.resize(max_hosts);
+
+ AddressesWithFailover addresses_with_failover_;
+
+ UInt32 shard_num = 0;
+ for (auto & shard_info : shards_info)
+ {
+ addresses_with_failover_.push_back(addresses_with_failover[shard_info.shard_num - 1]);
+ shard_info.shard_num = ++shard_num;
+ }
+
+ addresses_with_failover.swap(addresses_with_failover_);
+}
+
+
Cluster::Cluster(Cluster::SubclusterTag, const Cluster & from, const std::vector & indices)
{
for (size_t index : indices)
diff --git a/src/Interpreters/Cluster.h b/src/Interpreters/Cluster.h
index b5a4c51c11db..f9b581034ef7 100644
--- a/src/Interpreters/Cluster.h
+++ b/src/Interpreters/Cluster.h
@@ -270,7 +270,7 @@ class Cluster
std::unique_ptr getClusterWithMultipleShards(const std::vector & indices) const;
/// Get a new Cluster that contains all servers (all shards with all replicas) from existing cluster as independent shards.
- std::unique_ptr getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard = 0) const;
+ std::unique_ptr getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard = 0, size_t max_hosts = 0) const;
/// Returns false if cluster configuration doesn't allow to use it for cross-replication.
/// NOTE: true does not mean, that it's actually a cross-replication cluster.
@@ -296,7 +296,7 @@ class Cluster
/// For getClusterWithReplicasAsShards implementation
struct ReplicasAsShardsTag {};
- Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard);
+ Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts);
void addShard(
const Settings & settings,
@@ -308,6 +308,9 @@ class Cluster
ShardInfoInsertPathForInternalReplication insert_paths = {},
bool internal_replication = false);
+    /// Constrain the cluster to at most max_hosts hosts
+ void constrainShardInfoAndAddressesToMaxHosts(size_t max_hosts);
+
/// Inter-server secret
String secret;
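
`getClusterWithReplicasAsShards` now also takes a `max_hosts` limit: after every replica is turned into its own single-replica shard, the shard list is shuffled, truncated to `max_hosts`, and renumbered, and `slot_to_shard` is rebuilt afterwards. A toy standalone sketch of that constrain-and-renumber step (`ShardInfo` here is a reduced stand-in for the real struct):

```cpp
#include <algorithm>
#include <iostream>
#include <random>
#include <string>
#include <vector>

// Reduced stand-in for Cluster::ShardInfo: just a number and an address.
struct ShardInfo
{
    size_t shard_num = 0;
    std::string address;
};

// Shuffle the shards, keep at most max_hosts of them, and renumber
// the survivors starting from 1, mirroring the patch's behaviour.
void constrainToMaxHosts(std::vector<ShardInfo> & shards, size_t max_hosts)
{
    if (max_hosts == 0 || shards.size() <= max_hosts)
        return;

    std::mt19937_64 gen{std::random_device{}()};
    std::shuffle(shards.begin(), shards.end(), gen);
    shards.resize(max_hosts);

    size_t shard_num = 0;
    for (auto & shard : shards)
        shard.shard_num = ++shard_num;
}

int main()
{
    std::vector<ShardInfo> shards = {
        {1, "host1"}, {2, "host2"}, {3, "host3"}, {4, "host4"}, {5, "host5"}};

    constrainToMaxHosts(shards, 3);

    for (const auto & shard : shards)
        std::cout << shard.shard_num << " -> " << shard.address << "\n";
}
```
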
diff --git a/src/Interpreters/IcebergMetadataLog.cpp b/src/Interpreters/IcebergMetadataLog.cpp
index bdfea14b2a19..723af0a81b49 100644
--- a/src/Interpreters/IcebergMetadataLog.cpp
+++ b/src/Interpreters/IcebergMetadataLog.cpp
@@ -12,6 +12,7 @@
#include
#include
#include
+#include
#include
namespace DB
@@ -79,7 +80,7 @@ void IcebergMetadataLogElement::appendToBlock(MutableColumns & columns) const
void insertRowToLogTable(
const ContextPtr & local_context,
- String row,
+ std::function get_row,
IcebergMetadataLogLevel row_log_level,
const String & table_path,
const String & file_path,
@@ -107,7 +108,7 @@ void insertRowToLogTable(
.content_type = row_log_level,
.table_path = table_path,
.file_path = file_path,
- .metadata_content = row,
+ .metadata_content = get_row(),
.row_in_file = row_in_file,
.pruning_status = pruning_status});
}
diff --git a/src/Interpreters/IcebergMetadataLog.h b/src/Interpreters/IcebergMetadataLog.h
index b43e2cfa47b2..afa0c66023fb 100644
--- a/src/Interpreters/IcebergMetadataLog.h
+++ b/src/Interpreters/IcebergMetadataLog.h
@@ -25,9 +25,11 @@ struct IcebergMetadataLogElement
void appendToBlock(MutableColumns & columns) const;
};
+/// A `get_row` callback is used here instead of a `row` string so the string is built only when it is actually needed:
+/// `insertRowToLogTable` can return immediately after checking the `iceberg_metadata_log_level` setting.
void insertRowToLogTable(
const ContextPtr & local_context,
- String row,
+ std::function get_row,
IcebergMetadataLogLevel row_log_level,
const String & table_path,
const String & file_path,
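
Passing a `get_row` callback instead of a ready-made row string lets `insertRowToLogTable` skip the potentially expensive serialization entirely when the configured log level filters the row out. A minimal sketch of this deferred-evaluation pattern (the log levels and the "serialization" here are made up for illustration):

```cpp
#include <functional>
#include <iostream>
#include <string>

enum class LogLevel { None, Metadata, ManifestFileEntry };

LogLevel configured_level = LogLevel::None; // illustrative global setting

// The row content is produced lazily: if the level check fails,
// get_row() is never called and the string is never built.
void insertRowToLog(std::function<std::string()> get_row, LogLevel row_level)
{
    if (row_level > configured_level)
        return; // early exit, no serialization happened

    std::cout << "logged: " << get_row() << "\n";
}

std::string expensiveSerialization()
{
    std::cout << "(serializing...)\n"; // would be costly in real code
    return R"({"manifest": "entry"})";
}

int main()
{
    // Filtered out: expensiveSerialization() is not invoked.
    insertRowToLog(expensiveSerialization, LogLevel::ManifestFileEntry);

    // Enabled: now the row is actually built and logged.
    configured_level = LogLevel::ManifestFileEntry;
    insertRowToLog(expensiveSerialization, LogLevel::ManifestFileEntry);
}
```
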
diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp
index 435aa65af41d..05e60f09224c 100644
--- a/src/Interpreters/InterpreterCreateQuery.cpp
+++ b/src/Interpreters/InterpreterCreateQuery.cpp
@@ -1991,8 +1991,7 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
auto table_function_ast = create.getChild(*create.as_table_function);
auto table_function = TableFunctionFactory::instance().get(table_function_ast, getContext());
- if (!table_function->canBeUsedToCreateTable())
- throw Exception(ErrorCodes::BAD_ARGUMENTS, "Table function '{}' cannot be used to create a table", table_function->getName());
+ table_function->validateUseToCreateTable();
/// In case of CREATE AS table_function() query we should use global context
/// in storage creation because there will be no query context on server startup
diff --git a/src/Parsers/ASTSetQuery.cpp b/src/Parsers/ASTSetQuery.cpp
index f4fe077280ac..968e4f4ee569 100644
--- a/src/Parsers/ASTSetQuery.cpp
+++ b/src/Parsers/ASTSetQuery.cpp
@@ -131,7 +131,8 @@ void ASTSetQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & format,
return true;
}
- if (DataLake::DATABASE_ENGINE_NAME == state.create_engine_name)
+ if (DataLake::DATABASE_ENGINE_NAME == state.create_engine_name
+ || DataLake::DATABASE_ALIAS_NAME == state.create_engine_name)
{
if (DataLake::SETTINGS_TO_HIDE.contains(change.name))
{
diff --git a/src/Parsers/FunctionSecretArgumentsFinder.h b/src/Parsers/FunctionSecretArgumentsFinder.h
index 8a3ef97422e8..e2b9a957f833 100644
--- a/src/Parsers/FunctionSecretArgumentsFinder.h
+++ b/src/Parsers/FunctionSecretArgumentsFinder.h
@@ -3,9 +3,12 @@
#include
#include
#include
+#include
+#include
#include
#include
#include
+#include
namespace DB
@@ -29,6 +32,21 @@ class AbstractFunction
virtual ~Arguments() = default;
virtual size_t size() const = 0;
virtual std::unique_ptr at(size_t n) const = 0;
+ void skipArgument(size_t n) { skipped_indexes.insert(n); }
+ void unskipArguments() { skipped_indexes.clear(); }
+ size_t getRealIndex(size_t n) const
+ {
+ for (auto idx : skipped_indexes)
+ {
+ if (n < idx)
+ break;
+ ++n;
+ }
+ return n;
+ }
+ size_t skippedSize() const { return skipped_indexes.size(); }
+ private:
+ std::set skipped_indexes;
};
virtual ~AbstractFunction() = default;
@@ -77,14 +95,15 @@ class FunctionSecretArgumentsFinder
{
if (index >= function->arguments->size())
return;
+ auto real_index = function->arguments->getRealIndex(index);
if (!result.count)
{
- result.start = index;
+ result.start = real_index;
result.are_named = argument_is_named;
}
- chassert(index >= result.start); /// We always check arguments consecutively
+ chassert(real_index >= result.start); /// We always check arguments consecutively
chassert(result.replacement.empty()); /// We shouldn't use replacement with masking other arguments
- result.count = index + 1 - result.start;
+ result.count = real_index + 1 - result.start;
if (!argument_is_named)
result.are_named = false;
}
@@ -102,8 +121,16 @@ class FunctionSecretArgumentsFinder
{
findMongoDBSecretArguments();
}
+ else if (function->name() == "iceberg")
+ {
+ findIcebergFunctionSecretArguments(/* is_cluster_function= */ false);
+ }
+ else if (function ->name() == "icebergCluster")
+ {
+ findIcebergFunctionSecretArguments(/* is_cluster_function= */ true);
+ }
else if ((function->name() == "s3") || (function->name() == "cosn") || (function->name() == "oss") ||
- (function->name() == "deltaLake") || (function->name() == "hudi") || (function->name() == "iceberg") ||
+ (function->name() == "deltaLake") || (function->name() == "hudi") ||
(function->name() == "gcs") || (function->name() == "icebergS3") || (function->name() == "paimon") ||
(function->name() == "paimonS3"))
{
@@ -112,7 +139,7 @@ class FunctionSecretArgumentsFinder
}
else if ((function->name() == "s3Cluster") || (function ->name() == "hudiCluster") ||
(function ->name() == "deltaLakeCluster") || (function ->name() == "deltaLakeS3Cluster") ||
- (function ->name() == "icebergS3Cluster") || (function ->name() == "icebergCluster"))
+ (function ->name() == "icebergS3Cluster"))
{
/// s3Cluster('cluster_name', 'url', 'aws_access_key_id', 'aws_secret_access_key', ...)
findS3FunctionSecretArguments(/* is_cluster_function= */ true);
@@ -270,6 +297,12 @@ class FunctionSecretArgumentsFinder
findSecretNamedArgument("secret_access_key", 1);
return;
}
+ if (is_cluster_function && isNamedCollectionName(1))
+ {
+ /// s3Cluster(cluster, named_collection, ..., secret_access_key = 'secret_access_key', ...)
+ findSecretNamedArgument("secret_access_key", 2);
+ return;
+ }
findSecretNamedArgument("secret_access_key", url_arg_idx);
@@ -277,6 +310,7 @@ class FunctionSecretArgumentsFinder
/// s3('url', NOSIGN, 'format' [, 'compression'] [, extra_credentials(..)] [, headers(..)])
/// s3('url', 'format', 'structure' [, 'compression'] [, extra_credentials(..)] [, headers(..)])
size_t count = excludeS3OrURLNestedMaps();
+
if ((url_arg_idx + 3 <= count) && (count <= url_arg_idx + 4))
{
String second_arg;
@@ -341,6 +375,48 @@ class FunctionSecretArgumentsFinder
markSecretArgument(url_arg_idx + 4);
}
+ std::string findIcebergStorageType(bool is_cluster_function)
+ {
+ std::string storage_type = "s3";
+
+ size_t count = function->arguments->size();
+ if (!count)
+ return storage_type;
+
+ auto storage_type_idx = findNamedArgument(&storage_type, "storage_type");
+ if (storage_type_idx != -1)
+ {
+ storage_type = Poco::toLower(storage_type);
+ function->arguments->skipArgument(storage_type_idx);
+ }
+ else if (isNamedCollectionName(is_cluster_function ? 1 : 0))
+ {
+ std::string collection_name;
+ if (function->arguments->at(is_cluster_function ? 1 : 0)->tryGetString(&collection_name, true))
+ {
+ NamedCollectionPtr collection = NamedCollectionFactory::instance().tryGet(collection_name);
+ if (collection && collection->has("storage_type"))
+ {
+ storage_type = Poco::toLower(collection->get("storage_type"));
+ }
+ }
+ }
+
+ return storage_type;
+ }
+
+ void findIcebergFunctionSecretArguments(bool is_cluster_function)
+ {
+ auto storage_type = findIcebergStorageType(is_cluster_function);
+
+ if (storage_type == "s3")
+ findS3FunctionSecretArguments(is_cluster_function);
+ else if (storage_type == "azure")
+ findAzureBlobStorageFunctionSecretArguments(is_cluster_function);
+
+ function->arguments->unskipArguments();
+ }
+
bool maskAzureConnectionString(ssize_t url_arg_idx, bool argument_is_named = false, size_t start = 0)
{
String url_arg;
@@ -364,7 +440,7 @@ class FunctionSecretArgumentsFinder
if (RE2::Replace(&url_arg, account_key_pattern, "AccountKey=[HIDDEN]\\1"))
{
chassert(result.count == 0); /// We shouldn't use replacement with masking other arguments
- result.start = url_arg_idx;
+ result.start = function->arguments->getRealIndex(url_arg_idx);
result.are_named = argument_is_named;
result.count = 1;
result.replacement = url_arg;
@@ -375,7 +451,7 @@ class FunctionSecretArgumentsFinder
if (RE2::Replace(&url_arg, sas_signature_pattern, "SharedAccessSignature=[HIDDEN]\\1"))
{
chassert(result.count == 0); /// We shouldn't use replacement with masking other arguments
- result.start = url_arg_idx;
+ result.start = function->arguments->getRealIndex(url_arg_idx);
result.are_named = argument_is_named;
result.count = 1;
result.replacement = url_arg;
@@ -534,6 +610,7 @@ class FunctionSecretArgumentsFinder
void findTableEngineSecretArguments()
{
const String & engine_name = function->name();
+
if (engine_name == "ExternalDistributed")
{
/// ExternalDistributed('engine', 'host:port', 'database', 'table', 'user', 'password')
@@ -551,10 +628,13 @@ class FunctionSecretArgumentsFinder
{
findMongoDBSecretArguments();
}
+ else if (engine_name == "Iceberg")
+ {
+ findIcebergTableEngineSecretArguments();
+ }
else if ((engine_name == "S3") || (engine_name == "COSN") || (engine_name == "OSS")
|| (engine_name == "DeltaLake") || (engine_name == "Hudi")
- || (engine_name == "Iceberg") || (engine_name == "IcebergS3")
- || (engine_name == "S3Queue"))
+ || (engine_name == "IcebergS3") || (engine_name == "S3Queue"))
{
/// S3('url', ['aws_access_key_id', 'aws_secret_access_key',] ...)
findS3TableEngineSecretArguments();
@@ -563,7 +643,7 @@ class FunctionSecretArgumentsFinder
{
findURLSecretArguments();
}
- else if (engine_name == "AzureBlobStorage" || engine_name == "AzureQueue")
+ else if (engine_name == "AzureBlobStorage" || engine_name == "AzureQueue" || engine_name == "IcebergAzure")
{
findAzureBlobStorageTableEngineSecretArguments();
}
@@ -681,6 +761,18 @@ class FunctionSecretArgumentsFinder
markSecretArgument(2);
}
+ void findIcebergTableEngineSecretArguments()
+ {
+ auto storage_type = findIcebergStorageType(0);
+
+ if (storage_type == "s3")
+ findS3TableEngineSecretArguments();
+ else if (storage_type == "azure")
+ findAzureBlobStorageTableEngineSecretArguments();
+
+ function->arguments->unskipArguments();
+ }
+
void findDatabaseEngineSecretArguments()
{
const String & engine_name = function->name();
@@ -697,7 +789,7 @@ class FunctionSecretArgumentsFinder
/// S3('url', 'access_key_id', 'secret_access_key')
findS3DatabaseSecretArguments();
}
- else if (engine_name == "DataLakeCatalog")
+ else if (engine_name == "DataLakeCatalog" || engine_name == "Iceberg")
{
findDataLakeCatalogSecretArguments();
}
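
To keep the secret-masking offsets correct after the `storage_type` argument is skipped, the argument list maps "relative" indexes (with skipped positions removed) back to "real" positions in the AST. A small standalone sketch of that mapping (class and argument names are illustrative):

```cpp
#include <cassert>
#include <iostream>
#include <set>
#include <string>
#include <vector>

// Keeps a set of skipped positions and translates indexes that ignore
// the skipped elements back to positions in the full argument list.
class SkippableArguments
{
public:
    explicit SkippableArguments(std::vector<std::string> args_) : args(std::move(args_)) {}

    void skipArgument(size_t n) { skipped.insert(n); }

    size_t size() const { return args.size() - skipped.size(); }

    // Same walk as in the patch: every skipped index at or before the
    // current position shifts the relative index right by one.
    size_t getRealIndex(size_t n) const
    {
        for (auto idx : skipped)
        {
            if (n < idx)
                break;
            ++n;
        }
        return n;
    }

    const std::string & at(size_t n) const { return args.at(getRealIndex(n)); }

private:
    std::vector<std::string> args;
    std::set<size_t> skipped;
};

int main()
{
    // iceberg(storage_type='azure', url, container, path, account, key)
    SkippableArguments args({"storage_type", "url", "container", "path", "account", "key"});
    args.skipArgument(0); // hide storage_type from the S3/Azure-specific finders

    assert(args.size() == 5);
    std::cout << args.at(0) << "\n"; // prints "url", i.e. real index 1
    std::cout << args.at(4) << "\n"; // prints "key", i.e. real index 5
}
```
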
diff --git a/src/Parsers/FunctionSecretArgumentsFinderAST.h b/src/Parsers/FunctionSecretArgumentsFinderAST.h
index a260c0d58da6..3624d7a7e87b 100644
--- a/src/Parsers/FunctionSecretArgumentsFinderAST.h
+++ b/src/Parsers/FunctionSecretArgumentsFinderAST.h
@@ -54,10 +54,13 @@ class FunctionAST : public AbstractFunction
{
public:
explicit ArgumentsAST(const ASTs * arguments_) : arguments(arguments_) {}
- size_t size() const override { return arguments ? arguments->size() : 0; }
+ size_t size() const override
+    { /// size without the skipped indexes
+ return arguments ? arguments->size() - skippedSize() : 0;
+ }
std::unique_ptr at(size_t n) const override
- {
- return std::make_unique(arguments->at(n).get());
+    { /// n is a relative index; some arguments may be skipped
+ return std::make_unique(arguments->at(getRealIndex(n)).get());
}
private:
const ASTs * arguments = nullptr;
diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp
index 05fdfdd997b7..391a0f3e271e 100644
--- a/src/Server/TCPHandler.cpp
+++ b/src/Server/TCPHandler.cpp
@@ -23,6 +23,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -34,7 +35,6 @@
#include
#include
#include
-#include
#include
#include
#include
diff --git a/src/Storages/HivePartitioningUtils.cpp b/src/Storages/HivePartitioningUtils.cpp
index 86084717dd8e..060f04474e98 100644
--- a/src/Storages/HivePartitioningUtils.cpp
+++ b/src/Storages/HivePartitioningUtils.cpp
@@ -210,9 +210,9 @@ HivePartitionColumnsWithFileColumnsPair setupHivePartitioningForObjectStorage(
* Otherwise, in case `use_hive_partitioning=1`, we can keep the old behavior of extracting it from the sample path.
* And if the schema was inferred (not specified in the table definition), we need to enrich it with the path partition columns
*/
- if (configuration->partition_strategy && configuration->partition_strategy_type == PartitionStrategyFactory::StrategyType::HIVE)
+ if (configuration->getPartitionStrategy() && configuration->getPartitionStrategyType() == PartitionStrategyFactory::StrategyType::HIVE)
{
- hive_partition_columns_to_read_from_file_path = configuration->partition_strategy->getPartitionColumns();
+ hive_partition_columns_to_read_from_file_path = configuration->getPartitionStrategy()->getPartitionColumns();
sanityCheckSchemaAndHivePartitionColumns(hive_partition_columns_to_read_from_file_path, columns, /* check_contained_in_schema */true);
}
else if (context->getSettingsRef()[Setting::use_hive_partitioning])
@@ -226,7 +226,7 @@ HivePartitionColumnsWithFileColumnsPair setupHivePartitioningForObjectStorage(
sanityCheckSchemaAndHivePartitionColumns(hive_partition_columns_to_read_from_file_path, columns, /* check_contained_in_schema */false);
}
- if (configuration->partition_columns_in_data_file)
+ if (configuration->getPartitionColumnsInDataFile())
{
file_columns = columns.getAllPhysical();
}
diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h
index 96f5adbc2de5..d297a284855c 100644
--- a/src/Storages/IStorage.h
+++ b/src/Storages/IStorage.h
@@ -71,6 +71,9 @@ using ConditionSelectivityEstimatorPtr = std::shared_ptr;
+
class ActionsDAG;
/** Storage. Describes the table. Responsible for
@@ -432,6 +435,7 @@ class IStorage : public std::enable_shared_from_this, public TypePromo
size_t /*max_block_size*/,
size_t /*num_streams*/);
+public:
/// Should we process blocks of data returned by the storage in parallel
/// even when the storage returned only one stream of data for reading?
/// It is beneficial, for example, when you read from a file quickly,
@@ -442,7 +446,6 @@ class IStorage : public std::enable_shared_from_this, public TypePromo
/// useless).
virtual bool parallelizeOutputAfterReading(ContextPtr) const { return !isSystemStorage(); }
-public:
/// Other version of read which adds reading step to query plan.
/// Default implementation creates ReadFromStorageStep and uses usual read.
/// Can be called after `shutdown`, but not after `drop`.
diff --git a/src/Storages/IStorageCluster.cpp b/src/Storages/IStorageCluster.cpp
index c6c69c0f21bc..1acde9606493 100644
--- a/src/Storages/IStorageCluster.cpp
+++ b/src/Storages/IStorageCluster.cpp
@@ -1,5 +1,8 @@
#include
+#include
+#include
+
#include
#include
#include
@@ -12,6 +15,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -20,6 +24,9 @@
#include
#include
#include
+#include
+#include
+#include
#include
#include
@@ -37,6 +44,13 @@ namespace Setting
extern const SettingsBool parallel_replicas_local_plan;
extern const SettingsString cluster_for_parallel_replicas;
extern const SettingsNonZeroUInt64 max_parallel_replicas;
+ extern const SettingsUInt64 object_storage_max_nodes;
+ extern const SettingsBool object_storage_remote_initiator;
+}
+
+namespace ErrorCodes
+{
+ extern const int NOT_IMPLEMENTED;
}
namespace ErrorCodes
@@ -115,6 +129,8 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
if (extension)
return;
+ storage->updateExternalDynamicMetadataIfExists(context);
+
extension = storage->getTaskIteratorExtension(
predicate,
filter_actions_dag ? filter_actions_dag.get() : query_info.filter_actions_dag.get(),
@@ -131,22 +147,31 @@ void IStorageCluster::read(
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
- size_t /*max_block_size*/,
- size_t /*num_streams*/)
+ size_t max_block_size,
+ size_t num_streams)
{
+ auto cluster_name_from_settings = getClusterName(context);
+
+ if (!isClusterSupported() || cluster_name_from_settings.empty())
+ {
+ readFallBackToPure(query_plan, column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
+ return;
+ }
+
updateConfigurationIfNeeded(context);
storage_snapshot->check(column_names);
- updateBeforeRead(context);
- auto cluster = getCluster(context);
+ const auto & settings = context->getSettingsRef();
+
+ auto cluster = getClusterImpl(context, cluster_name_from_settings, isObjectStorage() ? settings[Setting::object_storage_max_nodes] : 0);
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
SharedHeader sample_block;
ASTPtr query_to_send = query_info.query;
- if (context->getSettingsRef()[Setting::allow_experimental_analyzer])
+ if (settings[Setting::allow_experimental_analyzer])
{
sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage));
}
@@ -159,6 +184,17 @@ void IStorageCluster::read(
updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context);
+ if (settings[Setting::object_storage_remote_initiator])
+ {
+ auto storage_and_context = convertToRemote(cluster, context, cluster_name_from_settings, query_to_send);
+ auto src_distributed = std::dynamic_pointer_cast(storage_and_context.storage);
+ auto modified_query_info = query_info;
+ modified_query_info.cluster = src_distributed->getCluster();
+ auto new_storage_snapshot = storage_and_context.storage->getStorageSnapshot(storage_snapshot->metadata, storage_and_context.context);
+ storage_and_context.storage->read(query_plan, column_names, new_storage_snapshot, modified_query_info, storage_and_context.context, processed_stage, max_block_size, num_streams);
+ return;
+ }
+
RestoreQualifiedNamesVisitor::Data data;
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as(), 0));
data.remote_table.database = context->getCurrentDatabase();
@@ -186,6 +222,76 @@ void IStorageCluster::read(
query_plan.addStep(std::move(reading));
}
+IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote(
+ ClusterPtr cluster,
+ ContextPtr context,
+ const std::string & cluster_name_from_settings,
+ ASTPtr query_to_send)
+{
+ auto host_addresses = cluster->getShardsAddresses();
+ if (host_addresses.empty())
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty cluster {}", cluster_name_from_settings);
+
+ static pcg64 rng(randomSeed());
+ size_t shard_num = rng() % host_addresses.size();
+ auto shard_addresses = host_addresses[shard_num];
+ /// After getClusterImpl each shard must have exactly 1 replica
+ if (shard_addresses.size() != 1)
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of shard {} in cluster {} is not equal 1", shard_num, cluster_name_from_settings);
+ auto host_name = shard_addresses[0].toString();
+
+    LOG_INFO(log, "Chose remote initiator '{}'", host_name);
+
+ bool secure = shard_addresses[0].secure == Protocol::Secure::Enable;
+ std::string remote_function_name = secure ? "remoteSecure" : "remote";
+
+    /// Clear the object_storage_remote_initiator setting to avoid an infinite chain of remote calls
+ auto new_context = Context::createCopy(context);
+ new_context->setSetting("object_storage_remote_initiator", false);
+
+ auto * select_query = query_to_send->as();
+ if (!select_query)
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query");
+
+ auto query_settings = select_query->settings();
+ if (query_settings)
+ {
+ auto & settings_ast = query_settings->as();
+ if (settings_ast.changes.removeSetting("object_storage_remote_initiator") && settings_ast.changes.empty())
+ {
+ select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, {});
+ }
+ }
+
+ ASTTableExpression * table_expression = extractTableExpressionASTPtrFromSelectQuery(query_to_send);
+ if (!table_expression)
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table expression");
+
+ auto remote_query = makeASTFunction(remote_function_name, make_intrusive(host_name), table_expression->table_function);
+
+ table_expression->table_function = remote_query;
+
+ auto remote_function = TableFunctionFactory::instance().get(remote_query, new_context);
+
+ auto storage = remote_function->execute(query_to_send, new_context, remote_function_name);
+
+ return RemoteCallVariables{storage, new_context};
+}
+
+SinkToStoragePtr IStorageCluster::write(
+ const ASTPtr & query,
+ const StorageMetadataPtr & metadata_snapshot,
+ ContextPtr context,
+ bool async_insert)
+{
+ auto cluster_name_from_settings = getClusterName(context);
+
+ if (cluster_name_from_settings.empty())
+ return writeFallBackToPure(query, metadata_snapshot, context, async_insert);
+
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method write is not supported by storage {}", getName());
+}
+
void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{};
@@ -278,9 +384,9 @@ ContextPtr ReadFromCluster::updateSettings(const Settings & settings)
return new_context;
}
-ClusterPtr IStorageCluster::getCluster(ContextPtr context) const
+ClusterPtr IStorageCluster::getClusterImpl(ContextPtr context, const String & cluster_name_, size_t max_hosts)
{
- return context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
+ return context->getCluster(cluster_name_)->getClusterWithReplicasAsShards(context->getSettingsRef(), /* max_replicas_from_shard */ 0, max_hosts);
}
}
diff --git a/src/Storages/IStorageCluster.h b/src/Storages/IStorageCluster.h
index 3248b26b8c5e..ac266bf82da7 100644
--- a/src/Storages/IStorageCluster.h
+++ b/src/Storages/IStorageCluster.h
@@ -30,10 +30,16 @@ class IStorageCluster : public IStorage
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
- size_t /*max_block_size*/,
- size_t /*num_streams*/) override;
+ size_t max_block_size,
+ size_t num_streams) override;
- ClusterPtr getCluster(ContextPtr context) const;
+ SinkToStoragePtr write(
+ const ASTPtr & query,
+ const StorageMetadataPtr & metadata_snapshot,
+ ContextPtr context,
+ bool async_insert) override;
+
+ ClusterPtr getCluster(ContextPtr context) const { return getClusterImpl(context, cluster_name); }
/// Query is needed for pruning by virtual columns (_file, _path)
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(
@@ -51,13 +57,53 @@ class IStorageCluster : public IStorage
bool supportsOptimizationToSubcolumns() const override { return false; }
bool supportsTrivialCountOptimization(const StorageSnapshotPtr &, ContextPtr) const override { return true; }
+ const String & getOriginalClusterName() const { return cluster_name; }
+ virtual String getClusterName(ContextPtr /* context */) const { return getOriginalClusterName(); }
+
protected:
- virtual void updateBeforeRead(const ContextPtr &) {}
virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {}
virtual void updateConfigurationIfNeeded(ContextPtr /* context */) {}
+ struct RemoteCallVariables
+ {
+ StoragePtr storage;
+ ContextPtr context;
+ };
+
+ RemoteCallVariables convertToRemote(
+ ClusterPtr cluster,
+ ContextPtr context,
+ const std::string & cluster_name_from_settings,
+ ASTPtr query_to_send);
+
+ virtual void readFallBackToPure(
+ QueryPlan & /* query_plan */,
+ const Names & /* column_names */,
+ const StorageSnapshotPtr & /* storage_snapshot */,
+ SelectQueryInfo & /* query_info */,
+ ContextPtr /* context */,
+ QueryProcessingStage::Enum /* processed_stage */,
+ size_t /* max_block_size */,
+ size_t /* num_streams */)
+ {
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method readFallBackToPure is not supported by storage {}", getName());
+ }
+
+ virtual SinkToStoragePtr writeFallBackToPure(
+ const ASTPtr & /*query*/,
+ const StorageMetadataPtr & /*metadata_snapshot*/,
+ ContextPtr /*context*/,
+ bool /*async_insert*/)
+ {
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method writeFallBackToPure is not supported by storage {}", getName());
+ }
+
private:
+ static ClusterPtr getClusterImpl(ContextPtr context, const String & cluster_name_, size_t max_hosts = 0);
+
+ virtual bool isClusterSupported() const { return true; }
+
LoggerPtr log;
String cluster_name;
};
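
When `object_storage_remote_initiator` is enabled, the initiator picks one random host from the already replica-as-shard, `max_hosts`-constrained cluster and rewrites the table function into `remote('host', ...)` (or `remoteSecure` for secure connections), clearing the setting in the forwarded query so the chosen host does not recurse. A toy standalone sketch of the host selection and query rewriting; the query manipulation is plain string formatting here rather than ClickHouse's AST machinery, and the host names are placeholders:

```cpp
#include <iostream>
#include <random>
#include <stdexcept>
#include <string>
#include <vector>

struct HostAddress
{
    std::string host_port;
    bool secure = false;
};

// Pick a random host and wrap the original table function into
// remote()/remoteSecure() so that host becomes the initiator.
std::string rewriteForRemoteInitiator(const std::vector<HostAddress> & hosts, const std::string & table_function)
{
    if (hosts.empty())
        throw std::runtime_error("Empty cluster");

    static std::mt19937_64 rng{std::random_device{}()};
    const auto & chosen = hosts[rng() % hosts.size()];

    const std::string remote_name = chosen.secure ? "remoteSecure" : "remote";
    // The real code also removes object_storage_remote_initiator from the
    // forwarded SETTINGS clause to avoid an infinite chain of remote calls.
    return remote_name + "('" + chosen.host_port + "', " + table_function + ")";
}

int main()
{
    std::vector<HostAddress> hosts = {{"swarm-1:9000", false}, {"swarm-2:9000", false}};

    std::cout << "SELECT count() FROM "
              << rewriteForRemoteInitiator(hosts, "iceberg(storage_type='s3', 'http://minio1:9000/root/table_data')")
              << "\n";
}
```
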
diff --git a/src/Storages/ObjectStorage/Azure/Configuration.cpp b/src/Storages/ObjectStorage/Azure/Configuration.cpp
index 2a612579445b..f2527c045b8c 100644
--- a/src/Storages/ObjectStorage/Azure/Configuration.cpp
+++ b/src/Storages/ObjectStorage/Azure/Configuration.cpp
@@ -65,6 +65,7 @@ const std::unordered_set optional_configuration_keys = {
"partition_columns_in_data_file",
"client_id",
"tenant_id",
+ "storage_type",
};
void StorageAzureConfiguration::check(ContextPtr context)
@@ -208,10 +209,6 @@ void AzureStorageParsedArguments::fromNamedCollection(const NamedCollection & co
String connection_url;
String container_name;
- std::optional account_name;
- std::optional account_key;
- std::optional client_id;
- std::optional tenant_id;
if (collection.has("connection_string"))
connection_url = collection.get("connection_string");
@@ -392,16 +389,10 @@ void AzureStorageParsedArguments::fromAST(ASTs & engine_args, ContextPtr context
std::unordered_map engine_args_to_idx;
-
String connection_url = checkAndGetLiteralArgument(engine_args[0], "connection_string/storage_account_url");
String container_name = checkAndGetLiteralArgument(engine_args[1], "container");
blob_path = checkAndGetLiteralArgument(engine_args[2], "blobpath");
- std::optional account_name;
- std::optional account_key;
- std::optional client_id;
- std::optional tenant_id;
-
collectCredentials(extra_credentials, client_id, tenant_id, context);
auto is_format_arg = [] (const std::string & s) -> bool
@@ -451,8 +442,7 @@ void AzureStorageParsedArguments::fromAST(ASTs & engine_args, ContextPtr context
auto sixth_arg = checkAndGetLiteralArgument(engine_args[5], "partition_strategy/structure");
if (magic_enum::enum_contains(sixth_arg, magic_enum::case_insensitive))
{
- partition_strategy_type
- = magic_enum::enum_cast(sixth_arg, magic_enum::case_insensitive).value();
+ partition_strategy_type = magic_enum::enum_cast(sixth_arg, magic_enum::case_insensitive).value();
}
else
{
@@ -572,8 +562,7 @@ void AzureStorageParsedArguments::fromAST(ASTs & engine_args, ContextPtr context
auto eighth_arg = checkAndGetLiteralArgument(engine_args[7], "partition_strategy/structure");
if (magic_enum::enum_contains(eighth_arg, magic_enum::case_insensitive))
{
- partition_strategy_type
- = magic_enum::enum_cast(eighth_arg, magic_enum::case_insensitive).value();
+ partition_strategy_type = magic_enum::enum_cast(eighth_arg, magic_enum::case_insensitive).value();
}
else
{
@@ -825,6 +814,26 @@ void StorageAzureConfiguration::initializeFromParsedArguments(const AzureStorage
StorageObjectStorageConfiguration::initializeFromParsedArguments(parsed_arguments);
blob_path = parsed_arguments.blob_path;
connection_params = parsed_arguments.connection_params;
+ account_name = parsed_arguments.account_name;
+ account_key = parsed_arguments.account_key;
+ client_id = parsed_arguments.client_id;
+ tenant_id = parsed_arguments.tenant_id;
+}
+
+ASTPtr StorageAzureConfiguration::createArgsWithAccessData() const
+{
+ auto arguments = make_intrusive();
+
+ arguments->children.push_back(make_intrusive(connection_params.endpoint.storage_account_url));
+ arguments->children.push_back(make_intrusive(connection_params.endpoint.container_name));
+ arguments->children.push_back(make_intrusive(blob_path.path));
+ if (account_name && account_key)
+ {
+ arguments->children.push_back(make_intrusive(*account_name));
+ arguments->children.push_back(make_intrusive(*account_key));
+ }
+
+ return arguments;
}
void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
@@ -832,13 +841,13 @@ void StorageAzureConfiguration::addStructureAndFormatToArgsIfNeeded(
{
if (disk)
{
- if (format == "auto")
+ if (getFormat() == "auto")
{
ASTs format_equal_func_args = {make_intrusive("format"), make_intrusive(format_)};
auto format_equal_func = makeASTFunction("equals", std::move(format_equal_func_args));
args.push_back(format_equal_func);
}
- if (structure == "auto")
+ if (getStructure() == "auto")
{
ASTs structure_equal_func_args = {make_intrusive("structure"), make_intrusive(structure_)};
auto structure_equal_func = makeASTFunction("equals", std::move(structure_equal_func_args));
diff --git a/src/Storages/ObjectStorage/Azure/Configuration.h b/src/Storages/ObjectStorage/Azure/Configuration.h
index 61618e3b8340..0bdef02a11f6 100644
--- a/src/Storages/ObjectStorage/Azure/Configuration.h
+++ b/src/Storages/ObjectStorage/Azure/Configuration.h
@@ -76,6 +76,11 @@ struct AzureStorageParsedArguments : private StorageParsedArguments
Path blob_path;
AzureBlobStorage::ConnectionParams connection_params;
+
+ std::optional account_name;
+ std::optional account_key;
+ std::optional client_id;
+ std::optional tenant_id;
};
class StorageAzureConfiguration : public StorageObjectStorageConfiguration
@@ -124,6 +129,7 @@ class StorageAzureConfiguration : public StorageObjectStorageConfiguration
onelake_client_secret = client_secret_;
onelake_tenant_id = tenant_id_;
}
+ ASTPtr createArgsWithAccessData() const override;
protected:
void fromDisk(const String & disk_name, ASTs & args, ContextPtr context, bool with_structure) override;
@@ -135,14 +141,21 @@ class StorageAzureConfiguration : public StorageObjectStorageConfiguration
Path blob_path;
Paths blobs_paths;
AzureBlobStorage::ConnectionParams connection_params;
- DiskPtr disk;
+
+ std::optional account_name;
+ std::optional account_key;
+ std::optional client_id;
+ std::optional tenant_id;
String onelake_client_id;
String onelake_client_secret;
String onelake_tenant_id;
+ DiskPtr disk;
+
void initializeFromParsedArguments(const AzureStorageParsedArguments & parsed_arguments);
};
+
}
#endif
diff --git a/src/Storages/ObjectStorage/DataLakes/Common/AvroForIcebergDeserializer.cpp b/src/Storages/ObjectStorage/DataLakes/Common/AvroForIcebergDeserializer.cpp
index 2225bc756634..54f5cadcd0e2 100644
--- a/src/Storages/ObjectStorage/DataLakes/Common/AvroForIcebergDeserializer.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Common/AvroForIcebergDeserializer.cpp
@@ -12,6 +12,7 @@
#include
#include
#include
+#include
namespace DB::ErrorCodes
{
@@ -19,6 +20,12 @@ namespace DB::ErrorCodes
extern const int INCORRECT_DATA;
}
+namespace ProfileEvents
+{
+ extern const Event IcebergAvroFileParsing;
+ extern const Event IcebergAvroFileParsingMicroseconds;
+}
+
namespace DB::Iceberg
{
@@ -30,6 +37,9 @@ try
: buffer(std::move(buffer_))
, manifest_file_path(manifest_file_path_)
{
+ ProfileEvents::increment(ProfileEvents::IcebergAvroFileParsing);
+ ProfileEventTimeIncrement watch(ProfileEvents::IcebergAvroFileParsingMicroseconds);
+
auto manifest_file_reader
= std::make_unique(std::make_unique(*buffer));
diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
index 0dc4e6b7653d..2669f70b5ce1 100644
--- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
+++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
@@ -2,6 +2,7 @@
#include "config.h"
#include
+#include
#include
#include
#include
@@ -12,11 +13,17 @@
#include
#include
#include
-#include
+#include
#include
#include
#include
#include
+#include
+#include
+#include
+#include
+#include
+
#include
#include
@@ -47,28 +54,31 @@ namespace ErrorCodes
namespace DataLakeStorageSetting
{
- extern DataLakeStorageSettingsDatabaseDataLakeCatalogType storage_catalog_type;
- extern DataLakeStorageSettingsString object_storage_endpoint;
- extern DataLakeStorageSettingsString storage_aws_access_key_id;
- extern DataLakeStorageSettingsString storage_aws_secret_access_key;
- extern DataLakeStorageSettingsString storage_region;
- extern DataLakeStorageSettingsString storage_catalog_url;
- extern DataLakeStorageSettingsString storage_warehouse;
- extern DataLakeStorageSettingsString storage_catalog_credential;
-
- extern DataLakeStorageSettingsString storage_auth_scope;
- extern DataLakeStorageSettingsString storage_auth_header;
- extern DataLakeStorageSettingsString storage_oauth_server_uri;
- extern DataLakeStorageSettingsBool storage_oauth_server_use_request_body;
+ extern const DataLakeStorageSettingsDatabaseDataLakeCatalogType storage_catalog_type;
+ extern const DataLakeStorageSettingsString object_storage_endpoint;
+ extern const DataLakeStorageSettingsString storage_aws_access_key_id;
+ extern const DataLakeStorageSettingsString storage_aws_secret_access_key;
+ extern const DataLakeStorageSettingsString storage_region;
+ extern const DataLakeStorageSettingsString storage_catalog_url;
+ extern const DataLakeStorageSettingsString storage_warehouse;
+ extern const DataLakeStorageSettingsString storage_catalog_credential;
+
+ extern const DataLakeStorageSettingsString storage_auth_scope;
+ extern const DataLakeStorageSettingsString storage_auth_header;
+ extern const DataLakeStorageSettingsString storage_oauth_server_uri;
+ extern const DataLakeStorageSettingsBool storage_oauth_server_use_request_body;
+ extern const DataLakeStorageSettingsString iceberg_metadata_file_path;
}
template
concept StorageConfiguration = std::derived_from;
-template
+template
class DataLakeConfiguration : public BaseStorageConfiguration, public std::enable_shared_from_this
{
public:
+ DataLakeConfiguration() {}
+
explicit DataLakeConfiguration(DataLakeStorageSettingsPtr settings_) : settings(settings_) {}
bool isDataLakeConfiguration() const override { return true; }
@@ -124,13 +134,13 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
bool supportsDelete() const override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->supportsDelete();
}
bool supportsParallelInsert() const override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->supportsParallelInsert();
}
@@ -141,25 +151,25 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
std::shared_ptr catalog,
const std::optional & format_settings) override
{
- assertInitialized();
+ assertInitializedDL();
current_metadata->mutate(commands, shared_from_this(), context, storage_id, metadata_snapshot, catalog, format_settings);
}
void checkMutationIsPossible(const MutationCommands & commands) override
{
- assertInitialized();
+ assertInitializedDL();
current_metadata->checkMutationIsPossible(commands);
}
void checkAlterIsPossible(const AlterCommands & commands) override
{
- assertInitialized();
+ assertInitializedDL();
current_metadata->checkAlterIsPossible(commands);
}
void alter(const AlterCommands & params, ContextPtr context) override
{
- assertInitialized();
+ assertInitializedDL();
current_metadata->alter(params, context);
}
@@ -173,7 +183,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
std::optional tryGetTableStructureFromMetadata(ContextPtr local_context) const override
{
- assertInitialized();
+ assertInitializedDL();
if (auto schema = current_metadata->getTableSchema(local_context); !schema.empty())
return ColumnsDescription(std::move(schema));
return std::nullopt;
@@ -181,37 +191,37 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
std::optional totalRows(ContextPtr local_context) override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->totalRows(local_context);
}
std::optional totalBytes(ContextPtr local_context) override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->totalBytes(local_context);
}
bool isDataSortedBySortingKey(StorageMetadataPtr metadata_snapshot, ContextPtr local_context) const override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->isDataSortedBySortingKey(metadata_snapshot, local_context);
}
std::shared_ptr getInitialSchemaByPath(ContextPtr local_context, ObjectInfoPtr object_info) const override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->getInitialSchemaByPath(local_context, object_info);
}
std::shared_ptr getSchemaTransformer(ContextPtr local_context, ObjectInfoPtr object_info) const override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->getSchemaTransformer(local_context, object_info);
}
StorageInMemoryMetadata getStorageSnapshotMetadata(ContextPtr context) const override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->getStorageSnapshotMetadata(context);
}
@@ -226,7 +236,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
IDataLakeMetadata * getExternalMetadata() override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata.get();
}
@@ -234,7 +244,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
bool supportsWrites() const override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->supportsWrites();
}
@@ -245,7 +255,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
StorageMetadataPtr storage_metadata,
ContextPtr context) override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->iterate(filter_dag, callback, list_batch_size, storage_metadata, context);
}
@@ -257,7 +267,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
/// because the code will be removed ASAP anyway)
DeltaLakePartitionColumns getDeltaLakePartitionColumns() const
{
- assertInitialized();
+ assertInitializedDL();
const auto * delta_lake_metadata = dynamic_cast(current_metadata.get());
if (delta_lake_metadata)
return delta_lake_metadata->getPartitionColumns();
@@ -267,18 +277,18 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
void modifyFormatSettings(FormatSettings & settings_, const Context & local_context) const override
{
- assertInitialized();
+ assertInitializedDL();
current_metadata->modifyFormatSettings(settings_, local_context);
}
ColumnMapperPtr getColumnMapperForObject(ObjectInfoPtr object_info) const override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->getColumnMapperForObject(object_info);
}
ColumnMapperPtr getColumnMapperForCurrentSchema(StorageMetadataPtr storage_metadata_snapshot, ContextPtr context) const override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->getColumnMapperForCurrentSchema(storage_metadata_snapshot, context);
}
@@ -347,7 +357,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
bool optimize(const StorageMetadataPtr & metadata_snapshot, ContextPtr context, const std::optional & format_settings) override
{
- assertInitialized();
+ assertInitializedDL();
return current_metadata->optimize(metadata_snapshot, context, format_settings);
}
@@ -366,14 +376,53 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
ready_object_storage = disk->getObjectStorage();
}
+ bool isClusterSupported() const override { return is_cluster_supported; }
+
+ ASTPtr createArgsWithAccessData() const override
+ {
+ auto res = BaseStorageConfiguration::createArgsWithAccessData();
+
+ auto iceberg_metadata_file_path = (*settings)[DataLakeStorageSetting::iceberg_metadata_file_path];
+
+ if (iceberg_metadata_file_path.changed)
+ {
+ auto * arguments = res->template as();
+ if (!arguments)
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not an expression list");
+
+ bool has_settings = false;
+
+ for (auto & arg : arguments->children)
+ {
+ if (auto * settings_ast = arg->template as())
+ {
+ has_settings = true;
+ settings_ast->changes.setSetting("iceberg_metadata_file_path", iceberg_metadata_file_path.value);
+ break;
+ }
+ }
+
+ if (!has_settings)
+ {
+ boost::intrusive_ptr settings_ast = make_intrusive();
+ settings_ast->is_standalone = false;
+ settings_ast->changes.setSetting("iceberg_metadata_file_path", iceberg_metadata_file_path.value);
+ arguments->children.push_back(settings_ast);
+ }
+ }
+
+ return res;
+ }
+
private:
DataLakeMetadataPtr current_metadata;
LoggerPtr log = getLogger("DataLakeConfiguration");
const DataLakeStorageSettingsPtr settings;
ObjectStoragePtr ready_object_storage;
- void assertInitialized() const
+ void assertInitializedDL() const
{
+ BaseStorageConfiguration::assertInitialized();
if (!current_metadata)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Metadata is not initialized");
}
@@ -406,18 +455,379 @@ using StorageS3IcebergConfiguration = DataLakeConfiguration;
#endif
-#if USE_AZURE_BLOB_STORAGE
+# if USE_AZURE_BLOB_STORAGE
using StorageAzureIcebergConfiguration = DataLakeConfiguration;
using StorageAzurePaimonConfiguration = DataLakeConfiguration;
#endif
-#if USE_HDFS
+# if USE_HDFS
using StorageHDFSIcebergConfiguration = DataLakeConfiguration;
using StorageHDFSPaimonConfiguration = DataLakeConfiguration;
#endif
using StorageLocalIcebergConfiguration = DataLakeConfiguration;
-using StorageLocalPaimonConfiguration = DataLakeConfiguration;
+using StorageLocalPaimonConfiguration = DataLakeConfiguration;
+
+/// This class detects the storage type from the `storage_type` parameter, if present,
+/// and delegates to the appropriate implementation - S3, Azure, HDFS or Local.
+class StorageIcebergConfiguration : public StorageObjectStorageConfiguration, public std::enable_shared_from_this
+{
+ friend class StorageObjectStorageConfiguration;
+
+public:
+ StorageIcebergConfiguration() {}
+
+ explicit StorageIcebergConfiguration(DataLakeStorageSettingsPtr settings_) : settings(settings_) {}
+
+ void initialize(
+ ASTs & engine_args,
+ ContextPtr local_context,
+ bool with_table_structure) override
+ {
+ createDynamicConfiguration(engine_args, local_context);
+ getImpl().initialize(engine_args, local_context, with_table_structure);
+ }
+
+ ObjectStorageType getType() const override { return getImpl().getType(); }
+
+ std::string getTypeName() const override { return getImpl().getTypeName(); }
+ std::string getEngineName() const override { return getImpl().getEngineName(); }
+ std::string getNamespaceType() const override { return getImpl().getNamespaceType(); }
+
+ Path getRawPath() const override { return getImpl().getRawPath(); }
+ const String & getRawURI() const override { return getImpl().getRawURI(); }
+ const Path & getPathForRead() const override { return getImpl().getPathForRead(); }
+ Path getPathForWrite(const std::string & partition_id) const override { return getImpl().getPathForWrite(partition_id); }
+
+ void setPathForRead(const Path & path) override { getImpl().setPathForRead(path); }
+
+ const Paths & getPaths() const override { return getImpl().getPaths(); }
+ void setPaths(const Paths & paths) override { getImpl().setPaths(paths); }
+
+ String getDataSourceDescription() const override { return getImpl().getDataSourceDescription(); }
+ String getNamespace() const override { return getImpl().getNamespace(); }
+
+ StorageObjectStorageQuerySettings getQuerySettings(const ContextPtr & context) const override
+ { return getImpl().getQuerySettings(context); }
+
+ void addStructureAndFormatToArgsIfNeeded(
+ ASTs & args, const String & structure_, const String & format_, ContextPtr context, bool with_structure) override
+ { getImpl().addStructureAndFormatToArgsIfNeeded(args, structure_, format_, context, with_structure); }
+
+ bool isNamespaceWithGlobs() const override { return getImpl().isNamespaceWithGlobs(); }
+
+ bool isArchive() const override { return getImpl().isArchive(); }
+ bool isPathInArchiveWithGlobs() const override { return getImpl().isPathInArchiveWithGlobs(); }
+ std::string getPathInArchive() const override { return getImpl().getPathInArchive(); }
+
+ void check(ContextPtr context) override { getImpl().check(context); }
+ void validateNamespace(const String & name) const override { getImpl().validateNamespace(name); }
+
+ ObjectStoragePtr createObjectStorage(ContextPtr context, bool is_readonly) override
+ { return getImpl().createObjectStorage(context, is_readonly); }
+ bool isStaticConfiguration() const override { return getImpl().isStaticConfiguration(); }
+
+ bool isDataLakeConfiguration() const override { return getImpl().isDataLakeConfiguration(); }
+
+ std::optional<size_t> totalRows(ContextPtr context) override { return getImpl().totalRows(context); }
+ std::optional<size_t> totalBytes(ContextPtr context) override { return getImpl().totalBytes(context); }
+ bool isDataSortedBySortingKey(StorageMetadataPtr storage_metadata, ContextPtr context) const override
+ { return getImpl().isDataSortedBySortingKey(storage_metadata, context); }
+
+ bool needsUpdateForSchemaConsistency() const override { return getImpl().needsUpdateForSchemaConsistency(); }
+
+ IDataLakeMetadata * getExternalMetadata() override { return getImpl().getExternalMetadata(); }
+
+ std::shared_ptr<NamesAndTypesList> getInitialSchemaByPath(ContextPtr context, ObjectInfoPtr object_info) const override
+ { return getImpl().getInitialSchemaByPath(context, object_info); }
+
+ std::shared_ptr<const ActionsDAG> getSchemaTransformer(ContextPtr context, ObjectInfoPtr object_info) const override
+ { return getImpl().getSchemaTransformer(context, object_info); }
+
+ void modifyFormatSettings(FormatSettings & settings_, const Context & context) const override
+ { getImpl().modifyFormatSettings(settings_, context); }
+
+ void addDeleteTransformers(
+ ObjectInfoPtr object_info,
+ QueryPipelineBuilder & builder,
+ const std::optional<FormatSettings> & format_settings,
+ FormatParserSharedResourcesPtr parser_shared_resources,
+ ContextPtr local_context) const override
+ { getImpl().addDeleteTransformers(object_info, builder, format_settings, parser_shared_resources, local_context); }
+
+ ReadFromFormatInfo prepareReadingFromFormat(
+ ObjectStoragePtr object_storage,
+ const Strings & requested_columns,
+ const StorageSnapshotPtr & storage_snapshot,
+ bool supports_subset_of_columns,
+ bool supports_tuple_elements,
+ ContextPtr local_context,
+ const PrepareReadingFromFormatHiveParams & hive_parameters) override
+ {
+ return getImpl().prepareReadingFromFormat(
+ object_storage,
+ requested_columns,
+ storage_snapshot,
+ supports_subset_of_columns,
+ supports_tuple_elements,
+ local_context,
+ hive_parameters);
+ }
+
+ void initPartitionStrategy(ASTPtr partition_by, const ColumnsDescription & columns, ContextPtr context) override
+ { getImpl().initPartitionStrategy(partition_by, columns, context); }
+
+ StorageInMemoryMetadata getStorageSnapshotMetadata(ContextPtr local_context) const override
+ { return getImpl().getStorageSnapshotMetadata(local_context); }
+ std::optional<ColumnsDescription> tryGetTableStructureFromMetadata(ContextPtr local_context) const override
+ { return getImpl().tryGetTableStructureFromMetadata(local_context); }
+
+ bool supportsFileIterator() const override { return getImpl().supportsFileIterator(); }
+ bool supportsParallelInsert() const override { return getImpl().supportsParallelInsert(); }
+ bool supportsWrites() const override { return getImpl().supportsWrites(); }
+
+ bool supportsPartialPathPrefix() const override { return getImpl().supportsPartialPathPrefix(); }
+
+ ObjectIterator iterate(
+ const ActionsDAG * filter_dag,
+ IDataLakeMetadata::FileProgressCallback callback,
+ size_t list_batch_size,
+ StorageMetadataPtr storage_metadata,
+ ContextPtr context) override
+ {
+ return getImpl().iterate(filter_dag, callback, list_batch_size, storage_metadata, context);
+ }
+
+ void update(
+ ObjectStoragePtr object_storage_ptr,
+ ContextPtr context,
+ bool if_not_updated_before) override
+ {
+ getImpl().update(object_storage_ptr, context, if_not_updated_before);
+ }
+
+ void create(
+ ObjectStoragePtr object_storage,
+ ContextPtr local_context,
+ const std::optional<ColumnsDescription> & columns,
+ ASTPtr partition_by,
+ ASTPtr order_by,
+ bool if_not_exists,
+ std::shared_ptr<DataLake::ICatalog> catalog,
+ const StorageID & table_id_) override
+ {
+ getImpl().create(object_storage, local_context, columns, partition_by, order_by, if_not_exists, catalog, table_id_);
+ }
+
+ SinkToStoragePtr write(
+ SharedHeader sample_block,
+ const StorageID & table_id,
+ ObjectStoragePtr object_storage,
+ const std::optional<FormatSettings> & format_settings,
+ ContextPtr context,
+ std::shared_ptr<DataLake::ICatalog> catalog) override
+ {
+ return getImpl().write(sample_block, table_id, object_storage, format_settings, context, catalog);
+ }
+
+ bool supportsDelete() const override { return getImpl().supportsDelete(); }
+ void mutate(const MutationCommands & commands,
+ ContextPtr context,
+ const StorageID & storage_id,
+ StorageMetadataPtr metadata_snapshot,
+ std::shared_ptr<DataLake::ICatalog> catalog,
+ const std::optional<FormatSettings> & format_settings) override
+ {
+ getImpl().mutate(commands, context, storage_id, metadata_snapshot, catalog, format_settings);
+ }
+ void checkMutationIsPossible(const MutationCommands & commands) override { getImpl().checkMutationIsPossible(commands); }
+
+ void checkAlterIsPossible(const AlterCommands & commands) override { getImpl().checkAlterIsPossible(commands); }
+
+ void alter(const AlterCommands & params, ContextPtr context) override { getImpl().alter(params, context); }
+
+ const DataLakeStorageSettings & getDataLakeSettings() const override { return getImpl().getDataLakeSettings(); }
+
+ ASTPtr createArgsWithAccessData() const override
+ {
+ return getImpl().createArgsWithAccessData();
+ }
+
+ void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override
+ { getImpl().fromNamedCollection(collection, context); }
+ void fromAST(ASTs & args, ContextPtr context, bool with_structure) override
+ { getImpl().fromAST(args, context, with_structure); }
+ void fromDisk(const String & disk_name, ASTs & args, ContextPtr context, bool with_structure) override
+ { getImpl().fromDisk(disk_name, args, context, with_structure); }
+
+ /// Find the `storage_type` argument and remove it from `args` if present.
+ /// Return the detected storage type.
+ ObjectStorageType extractDynamicStorageType(ASTs & args, ContextPtr context, ASTPtr * type_arg, bool cluster_name_first) const override
+ {
+ static const auto * const storage_type_name = "storage_type";
+
+ {
+ auto args_copy = args;
+ if (cluster_name_first)
+ {
+ // Remove the cluster name from args so it is not mistaken for a named collection name
+ args_copy.erase(args_copy.begin());
+ }
+
+ if (auto named_collection = tryGetNamedCollectionWithOverrides(args_copy, context))
+ {
+ if (named_collection->has(storage_type_name))
+ {
+ return objectStorageTypeFromString(named_collection->get<String>(storage_type_name));
+ }
+ }
+ }
+
+ auto type_it = args.end();
+
+ /// Default to S3 for backward compatibility:
+ /// `iceberg` without storage_type behaves like `icebergS3`.
+ ObjectStorageType type = ObjectStorageType::S3;
+
+ for (auto arg_it = args.begin(); arg_it != args.end(); ++arg_it)
+ {
+ const auto * type_ast_function = (*arg_it)->as<ASTFunction>();
+
+ if (type_ast_function && type_ast_function->name == "equals"
+ && type_ast_function->arguments && type_ast_function->arguments->children.size() == 2)
+ {
+ auto * name = type_ast_function->arguments->children[0]->as<ASTIdentifier>();
+
+ if (name && name->name() == storage_type_name)
+ {
+ if (type_it != args.end())
+ {
+ throw Exception(
+ ErrorCodes::BAD_ARGUMENTS,
+ "DataLake can have only one key-value argument: storage_type='type'.");
+ }
+
+ auto * value = type_ast_function->arguments->children[1]->as<ASTLiteral>();
+
+ if (!value)
+ {
+ throw Exception(
+ ErrorCodes::BAD_ARGUMENTS,
+ "DataLake parameter 'storage_type' has wrong type, string literal expected.");
+ }
+
+ if (value->value.getType() != Field::Types::String)
+ {
+ throw Exception(
+ ErrorCodes::BAD_ARGUMENTS,
+ "DataLake parameter 'storage_type' has wrong value type, string expected.");
+ }
+
+ type = objectStorageTypeFromString(value->value.safeGet<String>());
+
+ type_it = arg_it;
+ }
+ }
+ }
+
+ if (type_it != args.end())
+ {
+ if (type_arg)
+ *type_arg = *type_it;
+ args.erase(type_it);
+ }
+
+ return type;
+ }
+
+ const String & getFormat() const override { return getImpl().getFormat(); }
+ const String & getCompressionMethod() const override { return getImpl().getCompressionMethod(); }
+ const String & getStructure() const override { return getImpl().getStructure(); }
+
+ PartitionStrategyFactory::StrategyType getPartitionStrategyType() const override { return getImpl().getPartitionStrategyType(); }
+ bool getPartitionColumnsInDataFile() const override { return getImpl().getPartitionColumnsInDataFile(); }
+ std::shared_ptr<IPartitionStrategy> getPartitionStrategy() const override { return getImpl().getPartitionStrategy(); }
+
+ void setFormat(const String & format_) override { getImpl().setFormat(format_); }
+ void setCompressionMethod(const String & compression_method_) override { getImpl().setCompressionMethod(compression_method_); }
+ void setStructure(const String & structure_) override { getImpl().setStructure(structure_); }
+
+ void setPartitionStrategyType(PartitionStrategyFactory::StrategyType partition_strategy_type_) override
+ { getImpl().setPartitionStrategyType(partition_strategy_type_); }
+ void setPartitionColumnsInDataFile(bool partition_columns_in_data_file_) override
+ { getImpl().setPartitionColumnsInDataFile(partition_columns_in_data_file_); }
+ void setPartitionStrategy(const std::shared_ptr<IPartitionStrategy> & partition_strategy_) override
+ { getImpl().setPartitionStrategy(partition_strategy_); }
+
+ void assertInitialized() const override { getImpl().assertInitialized(); }
+
+ ColumnMapperPtr getColumnMapperForObject(ObjectInfoPtr obj) const override { return getImpl().getColumnMapperForObject(obj); }
+
+ ColumnMapperPtr getColumnMapperForCurrentSchema(StorageMetadataPtr storage_metadata_snapshot, ContextPtr context) const override
+ { return getImpl().getColumnMapperForCurrentSchema(storage_metadata_snapshot, context); }
+
+ std::shared_ptr<DataLake::ICatalog> getCatalog(ContextPtr context, bool is_attach) const override
+ { return getImpl().getCatalog(context, is_attach); }
+
+ bool optimize(const StorageMetadataPtr & metadata_snapshot, ContextPtr context, const std::optional<FormatSettings> & format_settings) override
+ { return getImpl().optimize(metadata_snapshot, context, format_settings); }
+
+ void drop(ContextPtr context) override { getImpl().drop(context); }
+
+protected:
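+ /// Extract `storage_type` from the arguments (removing it from the list)
+ /// and instantiate the matching implementation.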
+ void createDynamicConfiguration(ASTs & args, ContextPtr context)
+ {
+ ObjectStorageType type = extractDynamicStorageType(args, context, nullptr, false);
+ createDynamicStorage(type);
+ }
+
+private:
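+ /// Access the concrete implementation; throws if it has not been created yet.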
+ inline StorageObjectStorageConfiguration & getImpl() const
+ {
+ if (!impl)
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Dynamic DataLake storage not initialized");
+
+ return *impl;
+ }
+
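+ /// Create the implementation for the given storage type.
+ /// Once created, the storage type cannot be changed.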
+ void createDynamicStorage(ObjectStorageType type)
+ {
+ if (impl)
+ {
+ if (impl->getType() == type)
+ return;
+
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't change datalake engine storage");
+ }
+
+ switch (type)
+ {
+# if USE_AWS_S3
+ case ObjectStorageType::S3:
+ impl = std::make_unique<StorageS3IcebergConfiguration>(settings);
+ break;
+# endif
+# if USE_AZURE_BLOB_STORAGE
+ case ObjectStorageType::Azure:
+ impl = std::make_unique<StorageAzureIcebergConfiguration>(settings);
+ break;
+# endif
+# if USE_HDFS
+ case ObjectStorageType::HDFS:
+ impl = std::make_unique<StorageHDFSIcebergConfiguration>(settings);
+ break;
+# endif
+ case ObjectStorageType::Local:
+ impl = std::make_unique