diff --git a/contrib/curl-cmake/CMakeLists.txt b/contrib/curl-cmake/CMakeLists.txt
index 77dd32ea2a6c..47caa9292e9a 100644
--- a/contrib/curl-cmake/CMakeLists.txt
+++ b/contrib/curl-cmake/CMakeLists.txt
@@ -109,6 +109,7 @@ set (SRCS
     "${LIBRARY_DIR}/lib/sendf.c"
     "${LIBRARY_DIR}/lib/setopt.c"
     "${LIBRARY_DIR}/lib/sha256.c"
+    "${LIBRARY_DIR}/lib/curl_share.c"
     "${LIBRARY_DIR}/lib/slist.c"
     "${LIBRARY_DIR}/lib/smb.c"
     "${LIBRARY_DIR}/lib/smtp.c"
@@ -169,6 +170,7 @@ set (SRCS
     "${LIBRARY_DIR}/lib/curlx/dynbuf.c"
     "${LIBRARY_DIR}/lib/curlx/fopen.c"
     "${LIBRARY_DIR}/lib/curlx/inet_ntop.c"
+    "${LIBRARY_DIR}/lib/curlx/strcopy.c"
     "${LIBRARY_DIR}/lib/curlx/inet_pton.c"
     "${LIBRARY_DIR}/lib/curlx/multibyte.c"
     "${LIBRARY_DIR}/lib/curlx/nonblock.c"
diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp
index 5548ba6d916f..c163bdd6c93a 100644
--- a/src/Databases/DataLake/DatabaseDataLake.cpp
+++ b/src/Databases/DataLake/DatabaseDataLake.cpp
@@ -52,6 +52,8 @@ namespace DatabaseDataLakeSetting
     extern const DatabaseDataLakeSettingsString aws_access_key_id;
     extern const DatabaseDataLakeSettingsString aws_secret_access_key;
     extern const DatabaseDataLakeSettingsString region;
+    extern const DatabaseDataLakeSettingsString aws_role_arn;
+    extern const DatabaseDataLakeSettingsString aws_role_session_name;
 }
 
 namespace Setting
@@ -121,6 +123,8 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
         .aws_access_key_id = settings[DatabaseDataLakeSetting::aws_access_key_id].value,
         .aws_secret_access_key = settings[DatabaseDataLakeSetting::aws_secret_access_key].value,
         .region = settings[DatabaseDataLakeSetting::region].value,
+        .aws_role_arn = settings[DatabaseDataLakeSetting::aws_role_arn].value,
+        .aws_role_session_name = settings[DatabaseDataLakeSetting::aws_role_session_name].value,
     };
 
     switch (settings[DatabaseDataLakeSetting::catalog_type].value)
diff --git a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp
index 7a6b4c07b599..588d8e47ee8f 100644
--- a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp
+++ b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp
@@ -18,16 +18,18 @@ namespace ErrorCodes
 
 #define DATABASE_ICEBERG_RELATED_SETTINGS(DECLARE, ALIAS) \
     DECLARE(DatabaseDataLakeCatalogType, catalog_type, DatabaseDataLakeCatalogType::NONE, "Catalog type", 0) \
-    DECLARE(String, catalog_credential, "", "", 0) \
-    DECLARE(Bool, vended_credentials, true, "Use vended credentials (storage credentials) from catalog", 0) \
-    DECLARE(String, auth_scope, "PRINCIPAL_ROLE:ALL", "Authorization scope for client credentials or token exchange", 0) \
-    DECLARE(String, oauth_server_uri, "", "OAuth server uri", 0) \
-    DECLARE(Bool, oauth_server_use_request_body, true, "Put parameters into request body or query params", 0) \
-    DECLARE(String, warehouse, "", "Warehouse name inside the catalog", 0) \
-    DECLARE(String, auth_header, "", "Authorization header of format 'Authorization: <scheme> <auth_info>'", 0) \
-    DECLARE(String, aws_access_key_id, "", "Key for AWS connection for Glue catalog", 0) \
-    DECLARE(String, aws_secret_access_key, "", "Key for AWS connection for Glue Catalog'", 0) \
-    DECLARE(String, region, "", "Region for Glue catalog", 0) \
+    DECLARE(String, catalog_credential, "", "", 0) \
+    DECLARE(Bool, vended_credentials, true, "Use vended credentials (storage credentials) from catalog", 0) \
+    DECLARE(String, auth_scope, "PRINCIPAL_ROLE:ALL", "Authorization scope for client credentials or token exchange", 0) \
+    DECLARE(String, oauth_server_uri, "", "OAuth server URI", 0) \
+    DECLARE(Bool, oauth_server_use_request_body, true, "Put parameters into request body or query params", 0) \
+    DECLARE(String, warehouse, "", "Warehouse name inside the catalog", 0) \
+    DECLARE(String, auth_header, "", "Authorization header of format 'Authorization: <scheme> <auth_info>'", 0) \
+    DECLARE(String, aws_access_key_id, "", "Key for AWS connection for Glue catalog", 0) \
+    DECLARE(String, aws_secret_access_key, "", "Key for AWS connection for Glue catalog", 0) \
+    DECLARE(String, region, "", "Region for Glue catalog", 0) \
+    DECLARE(String, aws_role_arn, "", "Role ARN for AWS connection for Glue catalog", 0) \
+    DECLARE(String, aws_role_session_name, "", "Role session name for AWS connection for Glue catalog", 0) \
     DECLARE(String, storage_endpoint, "", "Object storage endpoint", 0) \
 
 #define LIST_OF_DATABASE_ICEBERG_SETTINGS(M, ALIAS) \
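For orientation, this is roughly the DDL the two new database-level settings enable — a sketch modelled on `create_clickhouse_glue_database` in the integration test further down; the endpoint, role ARN, and session name are placeholders, not values from this patch:

```python
# Sketch only: mirrors the CREATE DATABASE statement the test helper below
# generates. Endpoint, role ARN, and session name are illustrative.
node.query(
    """
    CREATE DATABASE glue_sts
    ENGINE = DataLakeCatalog('http://glue:3000')
    SETTINGS catalog_type = 'glue',
             region = 'us-east-1',
             aws_role_arn = 'arn:aws:iam::111122223333:role/example-role',
             aws_role_session_name = 'example-session'
    """
)
```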
diff --git a/src/Databases/DataLake/GlueCatalog.cpp b/src/Databases/DataLake/GlueCatalog.cpp
index 59a256d581a4..e4287641aa76 100644
--- a/src/Databases/DataLake/GlueCatalog.cpp
+++ b/src/Databases/DataLake/GlueCatalog.cpp
@@ -66,11 +66,6 @@ namespace DB::Setting
     extern const SettingsUInt64 s3_request_timeout_ms;
 }
 
-namespace DB::StorageObjectStorageSetting
-{
-    extern const StorageObjectStorageSettingsString iceberg_metadata_file_path;
-}
-
 namespace DB::DatabaseDataLakeSetting
 {
     extern const DatabaseDataLakeSettingsString storage_endpoint;
@@ -96,7 +91,6 @@ GlueCatalog::GlueCatalog(
     : ICatalog("")
     , DB::WithContext(context_)
     , log(getLogger("GlueCatalog(" + settings_.region + ")"))
-    , credentials(settings_.aws_access_key_id, settings_.aws_secret_access_key)
     , region(settings_.region)
     , settings(settings_)
     , table_engine_definition(table_engine_definition_)
@@ -104,6 +98,8 @@ GlueCatalog::GlueCatalog(
 {
     DB::S3::CredentialsConfiguration creds_config;
     creds_config.use_environment_credentials = true;
+    creds_config.role_arn = settings.aws_role_arn;
+    creds_config.role_session_name = settings.aws_role_session_name;
 
     const DB::Settings & global_settings = getContext()->getGlobalContext()->getSettingsRef();
 
@@ -126,6 +122,7 @@ GlueCatalog::GlueCatalog(
         /* get_request_throttler = */ nullptr,
         /* put_request_throttler = */ nullptr);
 
+
     Aws::Glue::GlueClientConfiguration client_configuration;
     client_configuration.maxConnections = static_cast<unsigned>(global_settings[DB::Setting::s3_max_connections]);
     client_configuration.connectTimeoutMs = static_cast<long>(global_settings[DB::Setting::s3_connect_timeout_ms]);
@@ -133,31 +130,33 @@ GlueCatalog::GlueCatalog(
     client_configuration.region = region;
 
     auto endpoint_provider = std::make_shared<Aws::Glue::Endpoint::GlueEndpointProvider>();
+    Aws::Auth::AWSCredentials credentials(settings_.aws_access_key_id, settings_.aws_secret_access_key);
 
     /// Only for testing when we are mocking glue
     if (!endpoint.empty())
     {
         client_configuration.endpointOverride = endpoint;
         endpoint_provider->OverrideEndpoint(endpoint);
 
-        Aws::Auth::AWSCredentials fake_credentials_for_fake_catalog;
         if (credentials.IsEmpty())
         {
             /// You can specify any key for fake moto glue, it's just important
             /// for it not to be empty.
-            fake_credentials_for_fake_catalog.SetAWSAccessKeyId("testing");
-            fake_credentials_for_fake_catalog.SetAWSSecretKey("testing");
+            credentials.SetAWSAccessKeyId("testing");
+            credentials.SetAWSSecretKey("testing");
         }
-        else
-            fake_credentials_for_fake_catalog = credentials;
-        glue_client = std::make_unique<Aws::Glue::GlueClient>(fake_credentials_for_fake_catalog, endpoint_provider, client_configuration);
+        Poco::URI uri(endpoint);
+        if (uri.getScheme() == "http")
+            poco_config.scheme = Aws::Http::Scheme::HTTP;
     }
     else
     {
         LOG_TRACE(log, "Creating AWS glue client with credentials empty {}, region '{}', endpoint '{}'", credentials.IsEmpty(), region, endpoint);
-        std::shared_ptr<Aws::Auth::AWSCredentialsProvider> chain = std::make_shared<DB::S3::S3CredentialsProviderChain>(poco_config, credentials, creds_config);
-        glue_client = std::make_unique<Aws::Glue::GlueClient>(chain, endpoint_provider, client_configuration);
     }
+
+    std::shared_ptr<Aws::Auth::AWSCredentialsProvider> chain = std::make_shared<DB::S3::S3CredentialsProviderChain>(poco_config, credentials, creds_config);
+    credentials_provider = chain;
+    glue_client = std::make_unique<Aws::Glue::GlueClient>(chain, endpoint_provider, client_configuration);
 }
 
 GlueCatalog::~GlueCatalog() = default;
@@ -282,7 +281,6 @@ bool GlueCatalog::tryGetTableMetadata(
     request.SetDatabaseName(database_name);
     request.SetName(table_name);
 
-
     auto outcome = glue_client->GetTable(request);
     if (outcome.IsSuccess())
     {
@@ -412,8 +410,9 @@ void GlueCatalog::setCredentials(TableMetadata & metadata) const
 
     if (storage_type == StorageType::S3)
     {
-        auto creds = std::make_shared<S3Credentials>(credentials.GetAWSAccessKeyId(), credentials.GetAWSSecretKey(), credentials.GetSessionToken());
-        metadata.setStorageCredentials(creds);
+        auto credentials = credentials_provider->GetAWSCredentials();
+        auto s3_creds = std::make_shared<S3Credentials>(credentials.GetAWSAccessKeyId(), credentials.GetAWSSecretKey(), credentials.GetSessionToken());
+        metadata.setStorageCredentials(s3_creds);
     }
     else
     {
@@ -459,7 +458,7 @@ bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMet
     DB::ASTs args = storage->engine->arguments->children;
 
     String storage_endpoint = !settings.storage_endpoint.empty() ? settings.storage_endpoint : metadata_uri;
-    
+
     if (args.empty())
         args.emplace_back(std::make_shared<DB::ASTLiteral>(storage_endpoint));
     else
@@ -469,8 +468,12 @@ bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMet
     {
         if (table_metadata.hasStorageCredentials())
             table_metadata.getStorageCredentials()->addCredentialsToEngineArgs(args);
-        else if (!credentials.IsExpiredOrEmpty())
-            DataLake::S3Credentials(credentials.GetAWSAccessKeyId(), credentials.GetAWSSecretKey(), credentials.GetSessionToken()).addCredentialsToEngineArgs(args);
+        else
+        {
+            auto credentials = credentials_provider->GetAWSCredentials();
+            if (!credentials.IsExpiredOrEmpty())
+                DataLake::S3Credentials(credentials.GetAWSAccessKeyId(), credentials.GetAWSSecretKey(), credentials.GetSessionToken()).addCredentialsToEngineArgs(args);
+        }
     }
 
     auto storage_settings = std::make_shared<DB::DataLakeStorageSettings>();
@@ -529,11 +532,17 @@ String GlueCatalog::resolveMetadataPathFromTableLocation(const String & table_lo
     else
         args[0] = std::make_shared<DB::ASTLiteral>(storage_endpoint);
 
-    if (args.size() == 1 && table_metadata.hasStorageCredentials())
+    if (args.size() == 1)
     {
-        auto storage_credentials = table_metadata.getStorageCredentials();
-        if (storage_credentials)
-            storage_credentials->addCredentialsToEngineArgs(args);
+        if (table_metadata.hasStorageCredentials())
+        {
+            table_metadata.getStorageCredentials()->addCredentialsToEngineArgs(args);
+        }
+        else
+        {
+            auto credentials = credentials_provider->GetAWSCredentials();
+            DataLake::S3Credentials(credentials.GetAWSAccessKeyId(), credentials.GetAWSSecretKey(), credentials.GetSessionToken()).addCredentialsToEngineArgs(args);
+        }
    }
 
    auto storage_settings = std::make_shared<DB::DataLakeStorageSettings>();
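The structural point of the GlueCatalog.cpp changes: the catalog now holds a credentials provider chain instead of a static Aws::Auth::AWSCredentials member, so every GetAWSCredentials() call in setCredentials and the engine-args paths can return freshly assumed STS credentials rather than whatever was captured at construction. A rough boto3 analogue of what the chain does when role_arn is configured (a sketch; the ARN and session name are placeholders):

```python
# Sketch: the boto3 equivalent of the AssumeRole flow the patched
# credentials provider chain performs when role_arn is configured.
import boto3

sts = boto3.client("sts")
assumed = sts.assume_role(
    RoleArn="arn:aws:iam::111122223333:role/example-role",  # placeholder
    RoleSessionName="example-session",                      # placeholder
)
creds = assumed["Credentials"]  # AccessKeyId, SecretAccessKey, SessionToken, Expiration

# The temporary credentials are then used for the actual storage requests.
s3 = boto3.client(
    "s3",
    aws_access_key_id=creds["AccessKeyId"],
    aws_secret_access_key=creds["SecretAccessKey"],
    aws_session_token=creds["SessionToken"],
)
```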
diff --git a/src/Databases/DataLake/GlueCatalog.h b/src/Databases/DataLake/GlueCatalog.h
index bcecfd2368ca..3d0616a8a8ab 100644
--- a/src/Databases/DataLake/GlueCatalog.h
+++ b/src/Databases/DataLake/GlueCatalog.h
@@ -17,6 +17,11 @@ namespace Aws::Glue
 {
     class GlueClient;
 }
 
+namespace Aws::Auth
+{
+    class AWSCredentialsProvider;
+}
+
 namespace DataLake
 {
@@ -70,7 +75,7 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
     std::unique_ptr<Aws::Glue::GlueClient> glue_client;
     const LoggerPtr log;
 
-    Aws::Auth::AWSCredentials credentials;
+    std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider;
     std::string region;
     CatalogSettings settings;
     DB::ASTPtr table_engine_definition;
diff --git a/src/Databases/DataLake/ICatalog.cpp b/src/Databases/DataLake/ICatalog.cpp
index eceaa73e55b6..0fd82e47951a 100644
--- a/src/Databases/DataLake/ICatalog.cpp
+++ b/src/Databases/DataLake/ICatalog.cpp
@@ -249,6 +249,8 @@ DB::SettingsChanges CatalogSettings::allChanged() const
     changes.emplace_back("aws_access_key_id", aws_access_key_id);
     changes.emplace_back("aws_secret_access_key", aws_secret_access_key);
     changes.emplace_back("region", region);
+    changes.emplace_back("aws_role_arn", aws_role_arn);
+    changes.emplace_back("aws_role_session_name", aws_role_session_name);
     return changes;
 }
 
diff --git a/src/Databases/DataLake/ICatalog.h b/src/Databases/DataLake/ICatalog.h
index 26964aa36433..a9f63b52a3f9 100644
--- a/src/Databases/DataLake/ICatalog.h
+++ b/src/Databases/DataLake/ICatalog.h
@@ -125,6 +125,8 @@ struct CatalogSettings
     String aws_access_key_id;
     String aws_secret_access_key;
     String region;
+    String aws_role_arn;
+    String aws_role_session_name;
 
     DB::SettingsChanges allChanged() const;
 };
diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
index 4042793f7301..5d521cb31ff2 100644
--- a/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
+++ b/src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
@@ -52,7 +52,8 @@ namespace DataLakeStorageSetting
     extern const DataLakeStorageSettingsString storage_catalog_url;
     extern const DataLakeStorageSettingsString storage_warehouse;
     extern const DataLakeStorageSettingsString storage_catalog_credential;
-
+    extern const DataLakeStorageSettingsString storage_aws_role_arn;
+    extern const DataLakeStorageSettingsString storage_aws_role_session_name;
     extern const DataLakeStorageSettingsString storage_auth_scope;
     extern const DataLakeStorageSettingsString storage_auth_header;
     extern const DataLakeStorageSettingsString storage_oauth_server_uri;
@@ -287,6 +288,8 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
             .aws_access_key_id = (*settings)[DataLakeStorageSetting::storage_aws_access_key_id].value,
             .aws_secret_access_key = (*settings)[DataLakeStorageSetting::storage_aws_secret_access_key].value,
             .region = (*settings)[DataLakeStorageSetting::storage_region].value,
+            .aws_role_arn = (*settings)[DataLakeStorageSetting::storage_aws_role_arn].value,
+            .aws_role_session_name = (*settings)[DataLakeStorageSetting::storage_aws_role_session_name].value,
         };
 
         return std::make_shared<DataLake::GlueCatalog>(
diff --git a/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h b/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h
index 5c9ef47f8f01..13563d1dc0f3 100644
--- a/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h
+++ b/src/Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h
@@ -77,6 +77,8 @@ Metadata format version.
     DECLARE(String, storage_aws_access_key_id, "", "Key for AWS connection for Glue catalog", 0) \
     DECLARE(String, storage_aws_secret_access_key, "", "Key for AWS connection for Glue Catalog'", 0) \
     DECLARE(String, storage_region, "", "Region for Glue catalog", 0) \
+    DECLARE(String, storage_aws_role_arn, "", "Role ARN for AWS connection for Glue catalog", 0) \
+    DECLARE(String, storage_aws_role_session_name, "", "Role session name for AWS connection for Glue catalog", 0) \
     DECLARE(String, object_storage_endpoint, "", "Object storage endpoint", 0) \
     DECLARE(String, storage_catalog_url, "", "Catalog url", 0) \
 
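The storage_-prefixed declarations above are the table-level mirror of the database settings. Assuming they behave like other DataLakeStorageSettings — set through the table engine's SETTINGS clause — usage would look roughly like this (a sketch; the syntax, path, and ARN are illustrative, not taken from this patch):

```python
# Sketch under the assumption that storage_aws_role_arn and
# storage_aws_role_session_name are set like other DataLakeStorageSettings,
# i.e. in the CREATE TABLE SETTINGS clause. Path and ARN are placeholders.
node.query(
    """
    CREATE TABLE iceberg_sts
    ENGINE = IcebergS3('http://minio:9000/warehouse-glue/data/example')
    SETTINGS storage_aws_role_arn = 'arn:aws:iam::111122223333:role/example-role',
             storage_aws_role_session_name = 'example-session'
    """
)
```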
diff --git a/tests/integration/compose/docker_compose_glue_catalog.yml b/tests/integration/compose/docker_compose_glue_catalog.yml
index f97f8aa27550..32b3bbda245a 100644
--- a/tests/integration/compose/docker_compose_glue_catalog.yml
+++ b/tests/integration/compose/docker_compose_glue_catalog.yml
@@ -44,6 +44,5 @@ services:
       until (/usr/bin/mc config host add minio http://minio:9000 minio ClickHouse_Minio_P@ssw0rd) do echo '...waiting...' && sleep 1; done;
       /usr/bin/mc rm -r --force minio/warehouse-glue;
       /usr/bin/mc mb minio/warehouse-glue --ignore-existing;
-      /usr/bin/mc policy set public minio/warehouse-glue;
       tail -f /dev/null
       "
diff --git a/tests/integration/test_database_glue/s3_mocks/mock_sts.py b/tests/integration/test_database_glue/s3_mocks/mock_sts.py
new file mode 100644
index 000000000000..df1ab486d4fd
--- /dev/null
+++ b/tests/integration/test_database_glue/s3_mocks/mock_sts.py
@@ -0,0 +1,42 @@
+import sys
+from datetime import datetime, timedelta, timezone
+
+from bottle import request, response, route, run
+
+if len(sys.argv) >= 3:
+    expected_role = sys.argv[2]
+else:
+    expected_role = "miniorole"
+
+@route("/")
+def ping():
+    response.content_type = "text/plain"
+    response.set_header("Content-Length", 2)
+    return "OK"
+
+
+@route("/", method="POST")
+def sts():
+    access_key = "minio"
+    secret_access_key = "wrong_key"
+
+    if f"RoleSessionName={expected_role}" in str(request.url):
+        secret_access_key = "ClickHouse_Minio_P@ssw0rd"
+
+    expiration = datetime.now(timezone.utc) + timedelta(hours=1)
+    expiration_str = expiration.strftime("%Y-%m-%dT%H:%M:%SZ")
+
+    response.content_type = "text/xml"
+    return f"""
+<AssumeRoleResponse xmlns="https://sts.amazonaws.com/doc/2011-06-15/">
+  <AssumeRoleResult>
+    <Credentials>
+      <AccessKeyId>{access_key}</AccessKeyId>
+      <SecretAccessKey>{secret_access_key}</SecretAccessKey>
+      <Expiration>{expiration_str}</Expiration>
+    </Credentials>
+  </AssumeRoleResult>
+</AssumeRoleResponse>
+"""
+
+run(host="0.0.0.0", port=int(sys.argv[1]))
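The mock keys everything off the query string: only an AssumeRole request whose RoleSessionName matches sys.argv[2] (default miniorole) receives the real MinIO secret; any other session name gets wrong_key, so the downstream S3 reads fail with 403. A manual probe would look like this (a sketch; it assumes the requests package is available, and the parameters go in the URL because the mock matches on request.url rather than the POST body):

```python
# Sketch: probing the STS mock by hand. Parameters are put in the query
# string because mock_sts.py inspects request.url, not the form body.
import requests  # assumed available wherever this runs

resp = requests.post(
    "http://sts.us-east-1.amazonaws.com/"
    "?Action=AssumeRole&Version=2011-06-15"
    "&RoleArn=arn::role&RoleSessionName=miniorole"
)
assert "<SecretAccessKey>ClickHouse_Minio_P@ssw0rd</SecretAccessKey>" in resp.text
```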
diff --git a/tests/integration/test_database_glue/test.py b/tests/integration/test_database_glue/test.py
index e5ffe67cc851..30a105536758 100644
--- a/tests/integration/test_database_glue/test.py
+++ b/tests/integration/test_database_glue/test.py
@@ -29,9 +29,20 @@
 )
 
 from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm
+from helpers.mock_servers import start_mock_servers
 
 import boto3
 
+
+def run_s3_mocks(started_cluster):
+    script_dir = os.path.join(os.path.dirname(__file__), "s3_mocks")
+    start_mock_servers(
+        started_cluster,
+        script_dir,
+        [("mock_sts.py", "sts.us-east-1.amazonaws.com", "80")],
+    )
+
+
 CATALOG_NAME = "test"
 
 BASE_URL = "http://glue:3000"
@@ -182,7 +193,7 @@ def generate_arrow_data(num_rows=5):
     return table
 
 def create_clickhouse_glue_database(
-    started_cluster, node, name, additional_settings={}
+    started_cluster, node, name, additional_settings={}, with_credentials=True
 ):
     settings = {
         "catalog_type": "glue",
@@ -193,12 +204,14 @@ def create_clickhouse_glue_database(
 
     settings.update(additional_settings)
 
+    credential_args = f", '{minio_access_key}', '{minio_secret_key}'" if with_credentials else ""
+
     node.query(
         f"""
 DROP DATABASE IF EXISTS {name};
 SET allow_database_glue_catalog=true;
 SET write_full_path_in_iceberg_metadata=true;
-CREATE DATABASE {name} ENGINE = DataLakeCatalog('{BASE_URL}', '{minio_access_key}', '{minio_secret_key}')
+CREATE DATABASE {name} ENGINE = DataLakeCatalog('{BASE_URL}'{credential_args})
 SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}
 """
     )
@@ -251,8 +264,20 @@ def started_cluster():
         with_glue_catalog=True,
     )
 
+    sts = cluster.add_instance(
+        name="sts.us-east-1.amazonaws.com",
+        hostname="sts.us-east-1.amazonaws.com",
+        image="clickhouse/python-bottle",
+        tag="latest",
+        stay_alive=True,
+    )
+    sts.stop_clickhouse(kill=True)
+
     logging.info("Starting cluster...")
     cluster.start()
+    logging.info("Cluster started")
+
+    run_s3_mocks(cluster)
 
     yield cluster
 
@@ -642,3 +667,75 @@ def test_system_tables(started_cluster):
     assert int(node.query(f"SELECT count() FROM system.completions WHERE startsWith(word, '{test_ref}') SETTINGS show_data_lake_catalogs_in_system_tables = true").strip()) != 0
     assert int(node.query(f"SELECT count() FROM system.completions WHERE startsWith(word, '{test_ref}')").strip()) != 0
     assert int(node.query(f"SELECT count() FROM system.completions WHERE startsWith(word, '{test_ref}') SETTINGS show_data_lake_catalogs_in_system_tables = false").strip()) == 0
+
+
+def test_sts_smoke(started_cluster):
+    """Test that STS authentication works with the Glue catalog using aws_role_arn and aws_role_session_name."""
+    node = started_cluster.instances["node1"]
+
+    test_ref = f"test_sts_smoke_{uuid.uuid4()}"
+    table_name = f"{test_ref}_table"
+    root_namespace = f"{test_ref}_namespace"
+
+    catalog = load_catalog_impl(started_cluster)
+    catalog.create_namespace(root_namespace)
+
+    schema = Schema(
+        NestedField(field_id=1, name="id", field_type=StringType(), required=False),
+        NestedField(field_id=2, name="value", field_type=DoubleType(), required=False),
+    )
+    table = create_table(catalog, root_namespace, table_name, schema, PartitionSpec(), DEFAULT_SORT_ORDER, dir=table_name)
+
+    data = [
+        {"id": "row1", "value": 10.0},
+        {"id": "row2", "value": 20.0},
+        {"id": "row3", "value": 30.0},
+    ]
+    df = pa.Table.from_pylist(data)
+    table.append(df)
+
+    # Wrong role_session_name: the STS mock hands out a bogus key, so reads must fail
+    db_name_fail = f"db_fail_{test_ref.replace('-', '_')}"
+    create_clickhouse_glue_database(
+        started_cluster,
+        node,
+        db_name_fail,
+        additional_settings={
+            "aws_role_arn": "arn::role",
+            "aws_role_session_name": "wrongsession",
+        },
+        with_credentials=False,
+    )
+
+    query_failed = False
+    try:
+        node.query(
+            f"SELECT sum(value) FROM {db_name_fail}.`{root_namespace}.{table_name}` "
+            f"SETTINGS s3_max_single_read_retries = 1, s3_retry_attempts = 1, s3_request_timeout_ms = 1000"
+        )
+    except Exception as e:
+        query_failed = True
+        error_str = str(e)
+        assert "403" in error_str or "Failed to get object info" in error_str, \
+            f"Expected 403 error but got: {error_str}"
+    assert query_failed, "Expected query to fail with the wrong role_session_name"
+
+    # Correct role_session_name: the mock returns the real MinIO secret, so reads succeed
+    db_name_success = f"db_success_{test_ref.replace('-', '_')}"
+    create_clickhouse_glue_database(
+        started_cluster,
+        node,
+        db_name_success,
+        additional_settings={
+            "aws_role_arn": "arn::role",
+            "aws_role_session_name": "miniorole",
+        },
+        with_credentials=False,
+    )
+
+    result = node.query(f"SELECT sum(value) FROM {db_name_success}.`{root_namespace}.{table_name}`")
+    assert result.strip() == "60", f"Expected sum to be 60 but got: {result}"
+
+    # Cleanup
+    node.query(f"DROP DATABASE IF EXISTS {db_name_fail} SYNC")
+    node.query(f"DROP DATABASE IF EXISTS {db_name_success} SYNC")