diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceIceberg.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceIceberg.scala index 93a50964..8c76c697 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceIceberg.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceIceberg.scala @@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory import za.co.absa.pramen.api.{CatalogTable, PartitionScheme} import za.co.absa.pramen.core.metastore.MetaTableStats import za.co.absa.pramen.core.metastore.model.HiveConfig +import za.co.absa.pramen.core.utils.CatalogUtils import za.co.absa.pramen.core.utils.hive.QueryExecutor import java.sql.Date @@ -57,7 +58,7 @@ class MetastorePersistenceIceberg(table: CatalogTable, case _ => (false, "Writing to") } - val tableExists = doesTableExist(table) + val tableExists = CatalogUtils.doesTableExist(table) if (tableExists) { log.info(s"$operationStr to table $fullTableName...") @@ -106,29 +107,6 @@ class MetastorePersistenceIceberg(table: CatalogTable, throw new UnsupportedOperationException("Iceberg only operates on tables in a catalog. Separate Hive options are not supported.") } - def doesTableExist(catalogTable: CatalogTable)(implicit spark: SparkSession): Boolean = { - getExistingTable(catalogTable).isDefined - } - - def getExistingTable(catalogTable: CatalogTable)(implicit spark: SparkSession): Option[DataFrame] = { - try { - val df = spark.table(catalogTable.getFullTableName) - // Force analysis to surface TABLE_OR_VIEW_NOT_FOUND at this point. - // Technically, not needed, but Spark can potentially skip analysis until the schema is requested. 
- val _ = df.schema - Some(df) - } catch { - // This is a common error - case ex: AnalysisException if ex.getMessage().contains("Table or view not found") || ex.getMessage().contains("TABLE_OR_VIEW_NOT_FOUND") => - None - // This is the exception, needs to be re-thrown. - case ex: AnalysisException if ex.getMessage().contains("TableType cannot be null for table:") => - throw new IllegalArgumentException("Attempt to use a catalog not supported by the file format. " + - "Ensure you are using the iceberg catalog and/or it is set as the default catalog with (spark.sql.defaultCatalog) " + - "or the catalog is specified explicitly as the table name.", ex) - } - } - def getFilter(infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): Column = { if (partitionScheme == PartitionScheme.Overwrite) { if (infoDateFrom.isDefined || infoDateTo.isDefined) { @@ -187,5 +165,4 @@ object MetastorePersistenceIceberg { throw new UnsupportedOperationException(s"Partition scheme $partitionScheme is not supported for adding generated columns.") } } - } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/CatalogUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/CatalogUtils.scala new file mode 100644 index 00000000..bbc5f258 --- /dev/null +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/CatalogUtils.scala @@ -0,0 +1,69 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package za.co.absa.pramen.core.utils

import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
import za.co.absa.pramen.api.CatalogTable

object CatalogUtils {
  /** Checks if a catalog table exists, given its fully qualified name. Supports Iceberg, Delta, and Parquet tables. */
  def doesTableExist(fullTableName: String)(implicit spark: SparkSession): Boolean = {
    getExistingTable(fullTableName).isDefined
  }

  /** Checks if a catalog table exists, given as a structured [[CatalogTable]]. Supports Iceberg, Delta, and Parquet tables. */
  def doesTableExist(catalogTable: CatalogTable)(implicit spark: SparkSession): Boolean = {
    getExistingTable(catalogTable).isDefined
  }

  /**
    * Fetches an existing table as a DataFrame based on its fully qualified name.
    * If the table is not found, returns `None`. If an unsupported catalog is used,
    * throws an `IllegalArgumentException`.
    *
    * Supports Iceberg, Delta, and Parquet tables.
    *
    * @param fullTableName The fully qualified table name (optionally prefixed with catalog and/or schema).
    * @param spark         An implicit SparkSession instance used to interact with the table.
    * @return An `Option[DataFrame]` containing the table as a DataFrame if it exists,
    *         or `None` if the table is not found.
    */
  def getExistingTable(fullTableName: String)(implicit spark: SparkSession): Option[DataFrame] = {
    try {
      val df = spark.table(fullTableName)
      // Force analysis to surface TABLE_OR_VIEW_NOT_FOUND at this point.
      // Technically not needed, but Spark can potentially defer analysis until the schema is requested.
      val _ = df.schema
      Some(df)
    } catch {
      // The common "table does not exist" case - report absence instead of failing.
      case ex: AnalysisException if messageContains(ex, "Table or view not found") || messageContains(ex, "TABLE_OR_VIEW_NOT_FOUND") =>
        None
      // This happens when attempting to read an Iceberg table while the Spark catalog is not configured
      // properly. Re-throw with an actionable explanation.
      case ex: AnalysisException if messageContains(ex, "TableType cannot be null for table:") =>
        throw new IllegalArgumentException("Attempt to use a catalog not supported by the file format. " +
          "Ensure you are using the iceberg catalog and/or it is set as the default catalog with (spark.sql.defaultCatalog) " +
          "or the catalog is specified explicitly as the table name.", ex)
    }
  }

  /** Same as the String overload, but accepts a structured [[CatalogTable]]. */
  def getExistingTable(catalogTable: CatalogTable)(implicit spark: SparkSession): Option[DataFrame] = {
    getExistingTable(catalogTable.getFullTableName)
  }

  /** Null-safe substring check on an exception message (`Throwable.getMessage` may return null). */
  private def messageContains(ex: Throwable, substring: String): Boolean = {
    Option(ex.getMessage).exists(_.contains(substring))
  }
}
package za.co.absa.pramen.core.tests.utils

import org.apache.spark.sql.Row
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.core.base.SparkTestBase
import za.co.absa.pramen.core.utils.CatalogUtils

import scala.collection.JavaConverters._

class CatalogUtilsSuite extends AnyWordSpec with SparkTestBase {
  "doesTableExist" should {
    "return false for non-existing table" in {
      val exists = CatalogUtils.doesTableExist("non_existing_table_xyz_123")

      assert(!exists)
    }

    "return true for existing temporary view" in {
      val tableName = "test_temp_view"

      createTempView(tableName, Seq(Row(1, "a"), Row(2, "b")))

      // Drop the view in 'finally' so a failed assertion does not leak the view
      // into the shared Spark session and affect other tests.
      try {
        assert(CatalogUtils.doesTableExist(tableName))
      } finally {
        spark.catalog.dropTempView(tableName)
      }
    }

    "return false after dropping temporary view" in {
      val tableName = "test_temp_view_to_drop"

      createTempView(tableName, Seq(Row(1, "a")))
      spark.catalog.dropTempView(tableName)

      val exists = CatalogUtils.doesTableExist(tableName)

      assert(!exists)
    }

    "return false for non-existing table with schema prefix" in {
      val exists = CatalogUtils.doesTableExist("default.non_existing_table_with_schema")

      assert(!exists)
    }
  }

  /** Registers a temporary view with schema (id INT, name STRING) containing the given rows. */
  private def createTempView(tableName: String, data: Seq[Row]): Unit = {
    val schema = spark.sessionState.sqlParser.parseTableSchema("id INT, name STRING")

    spark.createDataFrame(data.asJava, schema).createTempView(tableName)
  }
}