From 962ea23e7af1a01c40793aea86ac7114c719d599 Mon Sep 17 00:00:00 2001 From: Andrei Shitov Date: Thu, 18 Jun 2026 10:09:40 +0300 Subject: [PATCH] feat(sql): support v1IdentifierNoCatalog for convertible Hive table reads --- .../sql/catalyst/catalog/SessionCatalog.scala | 6 +++-- .../sql/errors/QueryCompilationErrors.scala | 3 ++- .../datasources/DataSourceStrategy.scala | 6 +++-- .../spark/sql/hive/HiveMetastoreCatalog.scala | 3 ++- .../sql/hive/HiveMetastoreCatalogSuite.scala | 25 +++++++++++++++++++ 5 files changed, 37 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 0de9673a5f968..ebdacee54eab2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -229,7 +229,8 @@ class SessionCatalog( def invalidateCachedTable(name: TableIdentifier): Unit = { val qualified = qualifyIdentifier(name) invalidateCachedTable(QualifiedTableName( - qualified.catalog.get, qualified.database.get, qualified.table)) + qualified.catalog.getOrElse(CatalogManager.SESSION_CATALOG_NAME), + qualified.database.get, qualified.table)) } /** This method provides a way to invalidate all the cached plans. */ @@ -1133,7 +1134,8 @@ class SessionCatalog( getLocalOrGlobalTempView(name).map(_.refresh).getOrElse { val qualifiedIdent = qualifyIdentifier(name) val qualifiedTableName = QualifiedTableName( - qualifiedIdent.catalog.get, qualifiedIdent.database.get, qualifiedIdent.table) + qualifiedIdent.catalog.getOrElse(CatalogManager.SESSION_CATALOG_NAME), + qualifiedIdent.database.get, qualifiedIdent.table) tableRelationCache.invalidate(qualifiedTableName) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index ec58298babdb7..e6b80803a2312 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -834,7 +834,8 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat ident: TableIdentifier, operation: String): Throwable = { unsupportedTableOperationError( - Seq(ident.catalog.get, ident.database.get, ident.table), operation) + Seq(ident.catalog.getOrElse(CatalogManager.SESSION_CATALOG_NAME), + ident.database.get, ident.table), operation) } private def unsupportedTableOperationError( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 431480bb2edf2..3e7e9042364a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.catalyst.util.{GeneratedColumn, ResolveDefaultColumns, V2ExpressionBuilder} -import org.apache.spark.sql.connector.catalog.{SupportsRead, V1Table} +import org.apache.spark.sql.connector.catalog.{CatalogManager, SupportsRead, V1Table} import org.apache.spark.sql.connector.catalog.TableCapability._ import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, NullOrdering, SortDirection, SortOrder => V2SortOrder, SortValue} import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Aggregation} @@ -242,7 +242,9 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] private def readDataSourceTable( table: CatalogTable, extraOptions: CaseInsensitiveStringMap): LogicalPlan = { val qualifiedTableName = - QualifiedTableName(table.identifier.catalog.get, table.database, table.identifier.table) + QualifiedTableName( + table.identifier.catalog.getOrElse(CatalogManager.SESSION_CATALOG_NAME), + table.database, table.identifier.table) val catalog = sparkSession.sessionState.catalog val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, table) catalog.getCachedPlan(qualifiedTableName, () => { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 54b9db967d2dd..2638cde57ec72 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -193,7 +193,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log fileType: String, isWrite: Boolean): LogicalRelation = { val metastoreSchema = relation.tableMeta.schema - val tableIdentifier = QualifiedTableName(relation.tableMeta.identifier.catalog.get, + val tableIdentifier = QualifiedTableName( + relation.tableMeta.identifier.catalog.getOrElse(CatalogManager.SESSION_CATALOG_NAME), relation.tableMeta.database, relation.tableMeta.identifier.table) val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index af3d4555bc5cf..60a9bdb5a0dd8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -129,6 +129,31 @@ class HiveMetastoreCatalogSuite extends TestHiveSingleton with SQLTestUtils { assert(schema == expectedSchema) } } + + test("SPARK-49211: read convertible Hive table with v1IdentifierNoCatalog enabled") { + withTable("t") { + sql("CREATE TABLE t (id INT) STORED AS PARQUET") + sql("INSERT INTO t VALUES (1)") + withSQLConf(SQLConf.LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME.key -> "true") { + assert(spark.table("t").collect() === Array(Row(1))) + sql("REFRESH TABLE t") + assert(spark.table("t").collect() === Array(Row(1))) + } + } + } + + test("SPARK-49211: unsupported v1 table operation reports a clean error " + + "with v1IdentifierNoCatalog enabled") { + withTable("t") { + sql("CREATE TABLE t (id INT) STORED AS PARQUET") + withSQLConf(SQLConf.LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME.key -> "true") { + val e = intercept[AnalysisException] { + sql("ALTER TABLE t REPLACE COLUMNS (id INT)") + } + assert(e.getErrorClass == "UNSUPPORTED_FEATURE.TABLE_OPERATION") + } + } + } } class DataSourceWithHiveMetastoreCatalogSuite