diff --git a/.github/workflows/api-binary-compatibility.yml b/.github/workflows/api-binary-compatibility.yml index 15cd705b0db2..39e4f8370318 100644 --- a/.github/workflows/api-binary-compatibility.yml +++ b/.github/workflows/api-binary-compatibility.yml @@ -47,7 +47,7 @@ jobs: revapi: runs-on: ubuntu-22.04 env: - GITHUB_ACTOR: ${{ github.actor }} + GITHUB_USERNAME: ${{ github.actor }} GITHUB_TOKEN: ${{ github.token }} steps: - uses: actions/checkout@v4 diff --git a/build.gradle b/build.gradle index 15a5c0651b34..e6e6808580d6 100644 --- a/build.gradle +++ b/build.gradle @@ -119,6 +119,7 @@ allprojects { version = projectVersion repositories { maven { + name = 'arenadata' url = 'https://maven.pkg.github.com/arenadata/*' credentials { username = project.findProperty("gpr.user") ?: System.getenv("GITHUB_USERNAME") @@ -126,14 +127,6 @@ allprojects { } } mavenCentral() - maven { - name = 'arenadata' - url = uri('https://maven.pkg.github.com/arenadata/*') - credentials { - username = System.getenv('GITHUB_ACTOR') ?: '' - password = System.getenv('GITHUB_TOKEN') ?: '' - } - } mavenLocal() } } diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java index c17ed59b2674..9dbb63131138 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogProperties.java +++ b/core/src/main/java/org/apache/iceberg/CatalogProperties.java @@ -162,4 +162,19 @@ private CatalogProperties() {} public static final String ENCRYPTION_KMS_TYPE = "encryption.kms-type"; public static final String ENCRYPTION_KMS_IMPL = "encryption.kms-impl"; + + /** + * Controls whether a table's metadata {@code location} is updated to the new default warehouse + * path when a managed table is renamed. When disabled (default), rename only repoints the + * metastore entry, matching the Iceberg specification. When enabled, a default-located table is + * relocated to the new name's default path so that re-creating a table with the old name does not + * share a directory with the renamed table. + * + *

Limit. Only writes made after the rename land under the new directory. Data + * files written before the rename keep their absolute paths under the old directory, so an + * external directory-level purge of the old directory will still destroy those pre-rename files. + */ + public static final String RENAME_UPDATE_METADATA_LOCATION = "rename.metadata.location.update"; + + public static final boolean RENAME_UPDATE_LOCATION_DEFAULT = false; } diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java index ab08f2a748a4..b02f012b2819 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogUtil.java +++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java @@ -24,6 +24,7 @@ import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT; import java.io.IOException; +import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; @@ -37,6 +38,7 @@ import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.Configurable; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.FileInfo; import org.apache.iceberg.io.StorageCredential; import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.io.SupportsPrefixOperations; @@ -101,15 +103,26 @@ private CatalogUtil() {} * @param metadata the last valid TableMetadata instance for a dropped table. */ public static void dropTableData(FileIO io, TableMetadata metadata) { - if (supportsDirectoryDelete(io, metadata)) { - deleteTableDirectory((SupportsPrefixOperations) io, metadata); - } else { - deleteTableFiles(io, metadata); - } + deleteTableFiles(io, metadata); + deleteTableDirectoryIfEmpty(io, metadata); } - public static void deleteTableDirectory(SupportsPrefixOperations io, TableMetadata metadata) { - io.deletePrefix(metadata.location()); + /** Remove the table base directory if it's empty. */ + public static void deleteTableDirectoryIfEmpty(FileIO io, TableMetadata metadata) { + if (!supportsDirectoryDelete(io, metadata)) { + return; + } + + SupportsPrefixOperations prefixIo = (SupportsPrefixOperations) io; + Iterator remaining = prefixIo.listPrefix(metadata.location()).iterator(); + if (!remaining.hasNext()) { + prefixIo.deletePrefix(metadata.location()); + } else { + LOG.info( + "Skipping base-directory purge for {}: file {} is not referenced by this table", + metadata.location(), + remaining.next().location()); + } } private static void deleteTableFiles(FileIO io, TableMetadata metadata) { diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index 402264f760cd..4fdecbdaaa02 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -18,8 +18,10 @@ */ package org.apache.iceberg; +import java.util.List; import java.util.Set; import org.apache.iceberg.deletes.DeleteGranularity; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; public class TableProperties { @@ -278,6 +280,15 @@ private TableProperties() {} public static final String WRITE_PARTITION_SUMMARY_LIMIT = "write.summary.partition-limit"; public static final int WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT = 0; + public static final List CUSTOM_WRITE_PATH_PROPERTIES = + ImmutableList.of( + TableProperties.WRITE_DATA_LOCATION, + TableProperties.WRITE_METADATA_LOCATION, + TableProperties.WRITE_LOCATION_PROVIDER_IMPL, + // legacy aliases still honored by LocationProviders + TableProperties.OBJECT_STORE_PATH, + TableProperties.WRITE_FOLDER_STORAGE_LOCATION); + /** * @deprecated will be removed in 2.0.0, writing manifest lists is always enabled */ diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java index c154e43304c6..2a262a264b05 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.hive; +import static org.apache.iceberg.TableProperties.CUSTOM_WRITE_PATH_PROPERTIES; + import java.io.IOException; import java.util.List; import java.util.Map; @@ -65,6 +67,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.LocationUtil; +import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.view.BaseMetastoreViewCatalog; import org.apache.iceberg.view.View; import org.apache.iceberg.view.ViewBuilder; @@ -366,6 +369,11 @@ private void renameTableOrView( String fromName = from.name(); try { + if (contentType == HiveOperationsBase.ContentType.TABLE && tryRelocateRename(from, to)) { + LOG.info("Updated metadata and renamed Table from {}, to {}", from, to); + return; + } + Table table = clients.run(client -> client.getTable(fromDatabase, fromName)); validateTableIsIcebergTableOrView(contentType, table, CatalogUtil.fullTableName(name, from)); @@ -406,6 +414,74 @@ private void renameTableOrView( } } + /** + * Renames a table by committing a metadata location update through {@link HiveTableOperations} so + * that the rename of {@code dbName} / {@code tableName} and the update of {@code + * metadata_location} happen in a single HMS {@code alter_table} RPC. + * + *

Controlled by {@link CatalogProperties#RENAME_UPDATE_METADATA_LOCATION}. Only tables sitting + * at the default warehouse location of the source identifier are relocated; tables created with + * an explicit {@code LOCATION} return {@code false} so the caller falls back to the plain rename + * path. + * + * @return {@code true} if the rename was performed by the relocate path; {@code false} if the + * caller should fall back to the standard {@code alter_table}-only rename. + */ + private boolean tryRelocateRename(TableIdentifier from, TableIdentifier to) { + if (!PropertyUtil.propertyAsBoolean( + catalogProperties, + CatalogProperties.RENAME_UPDATE_METADATA_LOCATION, + CatalogProperties.RENAME_UPDATE_LOCATION_DEFAULT)) { + return false; + } + + String oldTableDefaultLocation = + LocationUtil.stripTrailingSlash(defaultWarehouseLocation(from)); + String newTableDefaultLocation = LocationUtil.stripTrailingSlash(defaultWarehouseLocation(to)); + if (oldTableDefaultLocation.equals(newTableDefaultLocation)) { + return false; + } + + HiveTableOperations ops = newTableOps(from, hmsTable -> updateHmsTableName(hmsTable, to)); + + TableMetadata current; + try { + current = ops.current(); + } catch (NoSuchTableException e) { + // let base path handle this exception + return false; + } + + if (current == null) { + return false; + } + + if (!LocationUtil.stripTrailingSlash(current.location()).equals(oldTableDefaultLocation)) { + LOG.info( + "Not updating location of renamed table {} -> {}: table has an explicit location {}", + from, + to, + current.location()); + return false; + } + + List customWriteProperties = customWritePathProperties(current.properties()); + if (!customWriteProperties.isEmpty()) { + LOG.info( + "Not updating location of renamed table {} -> {}: table sets custom write-path " + + "properties {} that override metadata.location for new writes", + from, + to, + customWriteProperties); + return false; + } + + // update metadata location + table name via HmsTablePreCommitHandler + ops.commit(current, current.updateLocation(newTableDefaultLocation)); + LOG.info("Updated location of renamed table {} -> {} to {}", from, to, newTableDefaultLocation); + return true; + } + private void validateTableIsIcebergTableOrView( HiveOperationsBase.ContentType contentType, Table table, String fullName) { switch (contentType) { @@ -417,6 +493,17 @@ private void validateTableIsIcebergTableOrView( } } + private void updateHmsTableName(Table hmsTable, TableIdentifier to) { + hmsTable.setDbName(to.namespace().level(0)); + hmsTable.setTableName(to.name()); + } + + private static List customWritePathProperties(Map properties) { + return CUSTOM_WRITE_PATH_PROPERTIES.stream() + .filter(properties::containsKey) + .collect(Collectors.toList()); + } + /** * Check whether table or metadata table exists. * @@ -692,10 +779,22 @@ private boolean isValidateNamespace(Namespace namespace) { @Override public TableOperations newTableOps(TableIdentifier tableIdentifier) { + return newTableOps(tableIdentifier, HmsTablePreCommitHandler.NO_OP_HANDLER); + } + + protected HiveTableOperations newTableOps( + TableIdentifier tableIdentifier, HmsTablePreCommitHandler hmsTablePreCommitHandler) { String dbName = tableIdentifier.namespace().level(0); String tableName = tableIdentifier.name(); return new HiveTableOperations( - conf, clients, fileIO, keyManagementClient, name, dbName, tableName); + conf, + clients, + fileIO, + keyManagementClient, + hmsTablePreCommitHandler, + name, + dbName, + tableName); } @Override diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index 1f2bdab7cffb..336e6e6b99db 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -81,6 +81,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations private final FileIO fileIO; private final KeyManagementClient keyManagementClient; private final ClientPool metaClients; + private final HmsTablePreCommitHandler hmsTablePreCommitHandler; private EncryptionManager encryptionManager; private EncryptingFileIO encryptingFileIO; @@ -96,10 +97,31 @@ protected HiveTableOperations( String catalogName, String database, String table) { + this( + conf, + metaClients, + fileIO, + keyManagementClient, + HmsTablePreCommitHandler.NO_OP_HANDLER, + catalogName, + database, + table); + } + + protected HiveTableOperations( + Configuration conf, + ClientPool metaClients, + FileIO fileIO, + KeyManagementClient keyManagementClient, + HmsTablePreCommitHandler hmsTablePreCommitHandler, + String catalogName, + String database, + String table) { this.conf = conf; this.metaClients = metaClients; this.fileIO = fileIO; this.keyManagementClient = keyManagementClient; + this.hmsTablePreCommitHandler = hmsTablePreCommitHandler; this.fullName = catalogName + "." + database + "." + table; this.catalogName = catalogName; this.database = database; @@ -301,6 +323,8 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { maxHiveTablePropertySize, currentMetadataLocation()); + hmsTablePreCommitHandler.handle(tbl); + if (!keepHiveStats) { tbl.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE); tbl.getParameters().put(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE); diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HmsTablePreCommitHandler.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HmsTablePreCommitHandler.java new file mode 100644 index 000000000000..d9ad8215320a --- /dev/null +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HmsTablePreCommitHandler.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.hive; + +import org.apache.hadoop.hive.metastore.api.Table; + +public interface HmsTablePreCommitHandler { + HmsTablePreCommitHandler NO_OP_HANDLER = hmsTable -> {}; + + void handle(Table hmsTable); +} diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java index 5c3907670c52..e036754e487b 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java @@ -87,6 +87,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.FieldSource; import org.junit.jupiter.params.provider.ValueSource; /** @@ -190,6 +191,108 @@ public void testInvalidIdentifiersWithRename() { .hasMessageContaining("Invalid identifier: " + invalidTo); } + @Test + public void testRenameUpdatesLocationWhenEnabled() { + HiveCatalog renameCatalog = + initCatalog( + "hive", ImmutableMap.of(CatalogProperties.RENAME_UPDATE_METADATA_LOCATION, "true")); + TableIdentifier from = TableIdentifier.of(DB_NAME, "rename_src"); + TableIdentifier to = TableIdentifier.of(DB_NAME, "rename_dst"); + try { + Table original = renameCatalog.createTable(from, getTestSchema()); + String originalLocation = original.location(); + assertThat(originalLocation).endsWith("/rename_src"); + + renameCatalog.renameTable(from, to); + + Table renamed = renameCatalog.loadTable(to); + assertThat(renamed.location()).endsWith("/rename_dst").isNotEqualTo(originalLocation); + + // Re-creating a table with the old name reuses the old directory without sharing it with the + // renamed table. + Table recreated = renameCatalog.createTable(from, getTestSchema()); + assertThat(recreated.location()).isEqualTo(originalLocation); + assertThat(recreated.location()).isNotEqualTo(renamed.location()); + } finally { + renameCatalog.dropTable(from, false); + renameCatalog.dropTable(to, false); + } + } + + @Test + public void testRenameKeepsExplicitLocationWhenEnabled() { + HiveCatalog renameCatalog = + initCatalog( + "hive", ImmutableMap.of(CatalogProperties.RENAME_UPDATE_METADATA_LOCATION, "true")); + TableIdentifier from = TableIdentifier.of(DB_NAME, "rename_explicit_src"); + TableIdentifier to = TableIdentifier.of(DB_NAME, "rename_explicit_dst"); + String explicitLocation = temp.resolve("explicit-loc").toString(); + try { + Table original = + renameCatalog.createTable( + from, getTestSchema(), PartitionSpec.unpartitioned(), explicitLocation, null); + assertThat(original.location()).isEqualTo(explicitLocation); + + renameCatalog.renameTable(from, to); + + // A table created with an explicit LOCATION is left untouched by rename. + assertThat(renameCatalog.loadTable(to).location()).isEqualTo(explicitLocation); + } finally { + renameCatalog.dropTable(to, false); + } + } + + @ParameterizedTest + @FieldSource("org.apache.iceberg.TableProperties#CUSTOM_WRITE_PATH_PROPERTIES") + public void testRenameKeepsLocationWhenCustomWritePathSet(String writePathProperty) { + HiveCatalog renameCatalog = + initCatalog( + "hive", ImmutableMap.of(CatalogProperties.RENAME_UPDATE_METADATA_LOCATION, "true")); + String suffix = writePathProperty.replaceAll("[^a-z0-9]", "_"); + TableIdentifier from = TableIdentifier.of(DB_NAME, "rename_custom_src_" + suffix); + TableIdentifier to = TableIdentifier.of(DB_NAME, "rename_custom_dst_" + suffix); + String customValue = + writePathProperty.equals(TableProperties.WRITE_LOCATION_PROVIDER_IMPL) + ? "org.apache.iceberg.LocationProviders$DefaultLocationProvider" + : temp.resolve("custom-write-path").toString(); + try { + Table original = + renameCatalog.createTable( + from, + getTestSchema(), + PartitionSpec.unpartitioned(), + null, + ImmutableMap.of(writePathProperty, customValue)); + String originalLocation = original.location(); + assertThat(originalLocation).endsWith("/" + from.name()); + + renameCatalog.renameTable(from, to); + + Table renamed = renameCatalog.loadTable(to); + assertThat(renamed.location()).isEqualTo(originalLocation); + assertThat(renamed.properties()).containsEntry(writePathProperty, customValue); + } finally { + renameCatalog.dropTable(to, false); + } + } + + @Test + public void testRenameKeepsLocationWhenDisabled() { + TableIdentifier from = TableIdentifier.of(DB_NAME, "rename_default_src"); + TableIdentifier to = TableIdentifier.of(DB_NAME, "rename_default_dst"); + try { + Table original = catalog.createTable(from, getTestSchema()); + String originalLocation = original.location(); + + catalog.renameTable(from, to); + + // Default (flag off) behavior keeps the original location, matching the Iceberg spec. + assertThat(catalog.loadTable(to).location()).isEqualTo(originalLocation); + } finally { + catalog.dropTable(to, false); + } + } + @Test public void testCreateTableBuilder() throws Exception { Schema schema = getTestSchema(); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index 82d663d3a617..b6f3c3a08715 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -53,7 +53,6 @@ import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.SupportsPrefixOperations; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; @@ -388,12 +387,8 @@ public boolean purgeTable(Identifier ident) { } private void deleteTableFiles(FileIO io, TableMetadata tableMetadata) { - if (CatalogUtil.supportsDirectoryDelete(io, tableMetadata)) { - CatalogUtil.deleteTableDirectory((SupportsPrefixOperations) io, tableMetadata); - return; - } - SparkActions.get().deleteReachableFiles(tableMetadata.metadataFileLocation()).io(io).execute(); + CatalogUtil.deleteTableDirectoryIfEmpty(io, tableMetadata); } private boolean dropTableWithoutPurging(Identifier ident) { diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAlterTable.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAlterTable.java index 32fd4d80824e..e4018e0e5173 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAlterTable.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAlterTable.java @@ -19,15 +19,19 @@ package org.apache.iceberg.spark.sql; import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE; +import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE; import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_REST; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assumptions.assumeThat; +import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.spark.CatalogTestBase; +import org.apache.iceberg.spark.SparkCatalog; import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.NestedField; import org.apache.spark.SparkException; @@ -308,6 +312,70 @@ public void testTableRename() { assertThat(validationCatalog.tableExists(renamedIdent)).as("New name should exist").isTrue(); } + @TestTemplate + public void testRenameProtectsAgainstBaseDirectoryDropEnabled() { + runRenameProtectsAgainstBaseDirectoryDrop(true); + } + + @TestTemplate + public void testRenameProtectsAgainstBaseDirectoryDropDisabled() { + runRenameProtectsAgainstBaseDirectoryDrop(false); + } + + private void runRenameProtectsAgainstBaseDirectoryDrop(boolean dropBaseDirEnabled) { + assumeThat(catalogConfig.get(ICEBERG_CATALOG_TYPE)) + .as("rename.metadata.location.update is only implemented for the Hive catalog") + .isEqualTo(ICEBERG_CATALOG_TYPE_HIVE); + + String renameCatalog = "rename_loc_cat"; + spark.conf().set("spark.sql.catalog." + renameCatalog, SparkCatalog.class.getName()); + spark.conf().set("spark.sql.catalog." + renameCatalog + ".type", ICEBERG_CATALOG_TYPE_HIVE); + spark + .conf() + .set( + "spark.sql.catalog." + + renameCatalog + + "." + + CatalogProperties.RENAME_UPDATE_METADATA_LOCATION, + "true"); + + String suffix = dropBaseDirEnabled ? "enabled" : "disabled"; + String src = renameCatalog + ".default.rename_drop_src_" + suffix; + String dst = renameCatalog + ".default.rename_drop_dst_" + suffix; + String tblProperties = + "TBLPROPERTIES ('drop.base-directory.enabled' = '" + dropBaseDirEnabled + "')"; + + try { + sql("CREATE NAMESPACE IF NOT EXISTS %s.default", renameCatalog); + sql("CREATE TABLE %s (id INT, name STRING) USING iceberg %s", src, tblProperties); + + // Pre-rename write: data file lands under the old src directory. + sql("INSERT INTO %s VALUES (0, 'old-data')", src); + + sql("ALTER TABLE %s RENAME TO %s", src, dst); + + // Post-rename write: data file lands under the new dst directory. + sql("INSERT INTO %s VALUES (1, 'post-rename')", dst); + + // Re-create the old name. It takes the now-vacated default directory. + sql("CREATE TABLE %s (id INT, name STRING) USING iceberg %s", src, tblProperties); + sql("INSERT INTO %s VALUES (2, 'new-src-data')", src); + + // DROP PURGE runs the per-file purge for the new src table. When the table-property is + // true, the opportunistic directory delete also runs and skips because dst's pre-rename + // files remain under the prefix. Either way, dst's data must be untouched. + sql("DROP TABLE %s PURGE", src); + + assertEquals( + "Renamed table data must survive purge of the old name", + ImmutableList.of(row(0, "old-data"), row(1, "post-rename")), + sql("SELECT * FROM %s ORDER BY id", dst)); + } finally { + sql("DROP TABLE IF EXISTS %s", src); + sql("DROP TABLE IF EXISTS %s", dst); + } + } + @TestTemplate public void testSetTableProperties() { sql("ALTER TABLE %s SET TBLPROPERTIES ('prop'='value')", tableName); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestDropTable.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestDropTable.java index 7a10e89c7f18..751d230be2a5 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestDropTable.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestDropTable.java @@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.IOException; import java.util.List; @@ -28,6 +29,7 @@ import org.apache.hadoop.fs.Path; import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Streams; @@ -68,7 +70,7 @@ private void dropTableInternal() throws IOException { ImmutableList.of(row(1, "test")), sql("SELECT * FROM %s", tableName)); - List manifestAndFiles = manifestsAndFiles(); + List manifestAndFiles = manifestsAndFiles(tableName); assertThat(manifestAndFiles).as("There should be 2 files for manifests and files").hasSize(2); assertThat(checkFilesExist(manifestAndFiles, true)).as("All files should exist").isTrue(); @@ -89,32 +91,63 @@ private void dropTableInternal() throws IOException { @TestTemplate public void testPurgeTable() throws IOException { - assertEquals( - "Should have expected rows", - ImmutableList.of(row(1, "test")), - sql("SELECT * FROM %s", tableName)); - - List manifestAndFiles = manifestsAndFiles(); - assertThat(manifestAndFiles).as("There should be 2 files for manifests and files").hasSize(2); - assertThat(checkFilesExist(manifestAndFiles, true)).as("All files should exist").isTrue(); - - sql("DROP TABLE %s PURGE", tableName); - assertThat(validationCatalog.tableExists(tableIdent)).as("Table should not exist").isFalse(); - assertThat(checkFilesExist(manifestAndFiles, false)).as("All files should be deleted").isTrue(); + testPurgeTable(tableIdent); } @TestTemplate public void testPurgeTableWithDeleteDirectoryEnabled() throws IOException { - String tableBaseDir = validationCatalog.loadTable(tableIdent).location(); - sql("ALTER TABLE %s SET TBLPROPERTIES ('drop.base-directory.enabled' = 'true')", tableName); + String table = "testPurgeData"; + TableIdentifier tableIdent = TableIdentifier.of("default", table); - testPurgeTable(); + sql( + "CREATE TABLE %s (id INT, name STRING) USING iceberg TBLPROPERTIES " + + "('drop.base-directory.enabled' = 'true')", + tableName(table)); + sql("INSERT INTO %s VALUES (1, 'test')", tableName(table)); + + String tableBaseDir = validationCatalog.loadTable(tableIdent).location(); + testPurgeTable(tableIdent); assertThat(checkFilesExist(ImmutableList.of(tableBaseDir), false)) .as("Base table directory should be deleted") .isTrue(); } + @TestTemplate + public void testPurgeTableWithDeleteDirectoryEnabledSkipsDirWhenForeignFilesPresent() + throws IOException { + // HadoopCatalog deletes the base directory itself on drop, independent of this code path. + assumeThat(validationCatalog) + .as("HadoopCatalog drops the warehouse directory directly") + .isNotInstanceOf(org.apache.iceberg.hadoop.HadoopCatalog.class); + + String tableBaseDir = validationCatalog.loadTable(tableIdent).location(); + sql("ALTER TABLE %s SET TBLPROPERTIES ('drop.base-directory.enabled' = 'true')", tableName); + + // A file under the table prefix that the dropped table does not reference. This stands in for + // a co-located table whose files share the same base directory after RENAME + re-create. + Path foreignFile = new Path(tableBaseDir, "foreign-dir/foreign-file"); + FileSystem fs = foreignFile.getFileSystem(hiveConf); + fs.mkdirs(foreignFile.getParent()); + fs.create(foreignFile).close(); + + try { + sql("DROP TABLE %s PURGE", tableName); + + assertThat(validationCatalog.tableExists(tableIdent)).as("Table should not exist").isFalse(); + assertThat(fs.exists(foreignFile)) + .as("Foreign file should survive opportunistic directory delete") + .isTrue(); + assertThat(checkFilesExist(ImmutableList.of(tableBaseDir), true)) + .as("Base table directory should be preserved when foreign files remain") + .isTrue(); + } finally { + // The next test in this JVM re-creates a table at the same default warehouse path; + // clear the surviving foreign file so it doesn't pollute that test's directory delete. + fs.delete(new Path(tableBaseDir), true); + } + } + @TestTemplate public void testPurgeTableWithDeleteDirectoryEnabledAndGcDisabled() throws IOException { String tableBaseDir = validationCatalog.loadTable(tableIdent).location(); @@ -136,7 +169,7 @@ public void testPurgeTableGCDisabled() throws IOException { ImmutableList.of(row(1, "test")), sql("SELECT * FROM %s", tableName)); - List manifestAndFiles = manifestsAndFiles(); + List manifestAndFiles = manifestsAndFiles(tableName); assertThat(manifestAndFiles).as("There should be 2 files for manifests and files").hasSize(2); assertThat(checkFilesExist(manifestAndFiles, true)).as("All files should exist").isTrue(); @@ -153,10 +186,25 @@ public void testPurgeTableGCDisabled() throws IOException { .isTrue(); } - private List manifestsAndFiles() { - List files = sql("SELECT file_path FROM %s.%s", tableName, MetadataTableType.FILES); - List manifests = - sql("SELECT path FROM %s.%s", tableName, MetadataTableType.MANIFESTS); + private void testPurgeTable(TableIdentifier tableId) throws IOException { + String table = tableName(tableId.name()); + assertEquals( + "Should have expected rows", + ImmutableList.of(row(1, "test")), + sql("SELECT * FROM %s", table)); + + List manifestAndFiles = manifestsAndFiles(table); + assertThat(manifestAndFiles).as("There should be 2 files for manifests and files").hasSize(2); + assertThat(checkFilesExist(manifestAndFiles, true)).as("All files should exist").isTrue(); + + sql("DROP TABLE %s PURGE", table); + assertThat(validationCatalog.tableExists(tableId)).as("Table should not exist").isFalse(); + assertThat(checkFilesExist(manifestAndFiles, false)).as("All files should be deleted").isTrue(); + } + + private List manifestsAndFiles(String table) { + List files = sql("SELECT file_path FROM %s.%s", table, MetadataTableType.FILES); + List manifests = sql("SELECT path FROM %s.%s", table, MetadataTableType.MANIFESTS); return Streams.concat(files.stream(), manifests.stream()) .map(row -> (String) row[0]) .collect(Collectors.toList()); diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index 305b93b58f9b..bc1f5847653b 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -53,7 +53,6 @@ import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.SupportsPrefixOperations; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Splitter; @@ -389,12 +388,8 @@ public boolean purgeTable(Identifier ident) { } private void deleteTableFiles(FileIO io, TableMetadata tableMetadata) { - if (CatalogUtil.supportsDirectoryDelete(io, tableMetadata)) { - CatalogUtil.deleteTableDirectory((SupportsPrefixOperations) io, tableMetadata); - return; - } - SparkActions.get().deleteReachableFiles(tableMetadata.metadataFileLocation()).io(io).execute(); + CatalogUtil.deleteTableDirectoryIfEmpty(io, tableMetadata); } private boolean dropTableWithoutPurging(Identifier ident) { diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestAlterTable.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestAlterTable.java index 6ba7e01c402d..fe2b53e0afe6 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestAlterTable.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestAlterTable.java @@ -19,15 +19,19 @@ package org.apache.iceberg.spark.sql; import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE; +import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE; import static org.apache.iceberg.CatalogUtil.ICEBERG_CATALOG_TYPE_REST; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assumptions.assumeThat; +import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.spark.CatalogTestBase; +import org.apache.iceberg.spark.SparkCatalog; import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.NestedField; import org.apache.spark.SparkException; @@ -307,6 +311,70 @@ public void testTableRename() { assertThat(validationCatalog.tableExists(renamedIdent)).as("New name should exist").isTrue(); } + @TestTemplate + public void testRenameProtectsAgainstBaseDirectoryDropEnabled() { + runRenameProtectsAgainstBaseDirectoryDrop(true); + } + + @TestTemplate + public void testRenameProtectsAgainstBaseDirectoryDropDisabled() { + runRenameProtectsAgainstBaseDirectoryDrop(false); + } + + private void runRenameProtectsAgainstBaseDirectoryDrop(boolean dropBaseDirEnabled) { + assumeThat(catalogConfig.get(ICEBERG_CATALOG_TYPE)) + .as("rename.metadata.location.update is only implemented for the Hive catalog") + .isEqualTo(ICEBERG_CATALOG_TYPE_HIVE); + + String renameCatalog = "rename_loc_cat"; + spark.conf().set("spark.sql.catalog." + renameCatalog, SparkCatalog.class.getName()); + spark.conf().set("spark.sql.catalog." + renameCatalog + ".type", ICEBERG_CATALOG_TYPE_HIVE); + spark + .conf() + .set( + "spark.sql.catalog." + + renameCatalog + + "." + + CatalogProperties.RENAME_UPDATE_METADATA_LOCATION, + "true"); + + String suffix = dropBaseDirEnabled ? "enabled" : "disabled"; + String src = renameCatalog + ".default.rename_drop_src_" + suffix; + String dst = renameCatalog + ".default.rename_drop_dst_" + suffix; + String tblProperties = + "TBLPROPERTIES ('drop.base-directory.enabled' = '" + dropBaseDirEnabled + "')"; + + try { + sql("CREATE NAMESPACE IF NOT EXISTS %s.default", renameCatalog); + sql("CREATE TABLE %s (id INT, name STRING) USING iceberg %s", src, tblProperties); + + // Pre-rename write: data file lands under the old src directory. + sql("INSERT INTO %s VALUES (0, 'old-data')", src); + + sql("ALTER TABLE %s RENAME TO %s", src, dst); + + // Post-rename write: data file lands under the new dst directory. + sql("INSERT INTO %s VALUES (1, 'post-rename')", dst); + + // Re-create the old name. It takes the now-vacated default directory. + sql("CREATE TABLE %s (id INT, name STRING) USING iceberg %s", src, tblProperties); + sql("INSERT INTO %s VALUES (2, 'new-src-data')", src); + + // DROP PURGE runs the per-file purge for the new src table. When the table-property is + // true, the opportunistic directory delete also runs and skips because dst's pre-rename + // files remain under the prefix. Either way, dst's data must be untouched. + sql("DROP TABLE %s PURGE", src); + + assertEquals( + "Renamed table data must survive purge of the old name", + ImmutableList.of(row(0, "old-data"), row(1, "post-rename")), + sql("SELECT * FROM %s ORDER BY id", dst)); + } finally { + sql("DROP TABLE IF EXISTS %s", src); + sql("DROP TABLE IF EXISTS %s", dst); + } + } + @TestTemplate public void testSetTableProperties() { sql("ALTER TABLE %s SET TBLPROPERTIES ('prop'='value')", tableName); diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestDropTable.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestDropTable.java index 7a10e89c7f18..751d230be2a5 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestDropTable.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestDropTable.java @@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.IOException; import java.util.List; @@ -28,6 +29,7 @@ import org.apache.hadoop.fs.Path; import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Streams; @@ -68,7 +70,7 @@ private void dropTableInternal() throws IOException { ImmutableList.of(row(1, "test")), sql("SELECT * FROM %s", tableName)); - List manifestAndFiles = manifestsAndFiles(); + List manifestAndFiles = manifestsAndFiles(tableName); assertThat(manifestAndFiles).as("There should be 2 files for manifests and files").hasSize(2); assertThat(checkFilesExist(manifestAndFiles, true)).as("All files should exist").isTrue(); @@ -89,32 +91,63 @@ private void dropTableInternal() throws IOException { @TestTemplate public void testPurgeTable() throws IOException { - assertEquals( - "Should have expected rows", - ImmutableList.of(row(1, "test")), - sql("SELECT * FROM %s", tableName)); - - List manifestAndFiles = manifestsAndFiles(); - assertThat(manifestAndFiles).as("There should be 2 files for manifests and files").hasSize(2); - assertThat(checkFilesExist(manifestAndFiles, true)).as("All files should exist").isTrue(); - - sql("DROP TABLE %s PURGE", tableName); - assertThat(validationCatalog.tableExists(tableIdent)).as("Table should not exist").isFalse(); - assertThat(checkFilesExist(manifestAndFiles, false)).as("All files should be deleted").isTrue(); + testPurgeTable(tableIdent); } @TestTemplate public void testPurgeTableWithDeleteDirectoryEnabled() throws IOException { - String tableBaseDir = validationCatalog.loadTable(tableIdent).location(); - sql("ALTER TABLE %s SET TBLPROPERTIES ('drop.base-directory.enabled' = 'true')", tableName); + String table = "testPurgeData"; + TableIdentifier tableIdent = TableIdentifier.of("default", table); - testPurgeTable(); + sql( + "CREATE TABLE %s (id INT, name STRING) USING iceberg TBLPROPERTIES " + + "('drop.base-directory.enabled' = 'true')", + tableName(table)); + sql("INSERT INTO %s VALUES (1, 'test')", tableName(table)); + + String tableBaseDir = validationCatalog.loadTable(tableIdent).location(); + testPurgeTable(tableIdent); assertThat(checkFilesExist(ImmutableList.of(tableBaseDir), false)) .as("Base table directory should be deleted") .isTrue(); } + @TestTemplate + public void testPurgeTableWithDeleteDirectoryEnabledSkipsDirWhenForeignFilesPresent() + throws IOException { + // HadoopCatalog deletes the base directory itself on drop, independent of this code path. + assumeThat(validationCatalog) + .as("HadoopCatalog drops the warehouse directory directly") + .isNotInstanceOf(org.apache.iceberg.hadoop.HadoopCatalog.class); + + String tableBaseDir = validationCatalog.loadTable(tableIdent).location(); + sql("ALTER TABLE %s SET TBLPROPERTIES ('drop.base-directory.enabled' = 'true')", tableName); + + // A file under the table prefix that the dropped table does not reference. This stands in for + // a co-located table whose files share the same base directory after RENAME + re-create. + Path foreignFile = new Path(tableBaseDir, "foreign-dir/foreign-file"); + FileSystem fs = foreignFile.getFileSystem(hiveConf); + fs.mkdirs(foreignFile.getParent()); + fs.create(foreignFile).close(); + + try { + sql("DROP TABLE %s PURGE", tableName); + + assertThat(validationCatalog.tableExists(tableIdent)).as("Table should not exist").isFalse(); + assertThat(fs.exists(foreignFile)) + .as("Foreign file should survive opportunistic directory delete") + .isTrue(); + assertThat(checkFilesExist(ImmutableList.of(tableBaseDir), true)) + .as("Base table directory should be preserved when foreign files remain") + .isTrue(); + } finally { + // The next test in this JVM re-creates a table at the same default warehouse path; + // clear the surviving foreign file so it doesn't pollute that test's directory delete. + fs.delete(new Path(tableBaseDir), true); + } + } + @TestTemplate public void testPurgeTableWithDeleteDirectoryEnabledAndGcDisabled() throws IOException { String tableBaseDir = validationCatalog.loadTable(tableIdent).location(); @@ -136,7 +169,7 @@ public void testPurgeTableGCDisabled() throws IOException { ImmutableList.of(row(1, "test")), sql("SELECT * FROM %s", tableName)); - List manifestAndFiles = manifestsAndFiles(); + List manifestAndFiles = manifestsAndFiles(tableName); assertThat(manifestAndFiles).as("There should be 2 files for manifests and files").hasSize(2); assertThat(checkFilesExist(manifestAndFiles, true)).as("All files should exist").isTrue(); @@ -153,10 +186,25 @@ public void testPurgeTableGCDisabled() throws IOException { .isTrue(); } - private List manifestsAndFiles() { - List files = sql("SELECT file_path FROM %s.%s", tableName, MetadataTableType.FILES); - List manifests = - sql("SELECT path FROM %s.%s", tableName, MetadataTableType.MANIFESTS); + private void testPurgeTable(TableIdentifier tableId) throws IOException { + String table = tableName(tableId.name()); + assertEquals( + "Should have expected rows", + ImmutableList.of(row(1, "test")), + sql("SELECT * FROM %s", table)); + + List manifestAndFiles = manifestsAndFiles(table); + assertThat(manifestAndFiles).as("There should be 2 files for manifests and files").hasSize(2); + assertThat(checkFilesExist(manifestAndFiles, true)).as("All files should exist").isTrue(); + + sql("DROP TABLE %s PURGE", table); + assertThat(validationCatalog.tableExists(tableId)).as("Table should not exist").isFalse(); + assertThat(checkFilesExist(manifestAndFiles, false)).as("All files should be deleted").isTrue(); + } + + private List manifestsAndFiles(String table) { + List files = sql("SELECT file_path FROM %s.%s", table, MetadataTableType.FILES); + List manifests = sql("SELECT path FROM %s.%s", table, MetadataTableType.MANIFESTS); return Streams.concat(files.stream(), manifests.stream()) .map(row -> (String) row[0]) .collect(Collectors.toList());