diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index fd700f1ee0130..e05a9bfcc66da 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -412,7 +412,8 @@ class SessionCatalog( def invalidateCachedTable(name: TableIdentifier): Unit = { val qualified = qualifyIdentifier(name) invalidateCachedTable(QualifiedTableName( - qualified.catalog.get, qualified.database.get, qualified.table)) + qualified.catalog.getOrElse(CatalogManager.SESSION_CATALOG_NAME), + qualified.database.get, qualified.table)) } /** This method provides a way to invalidate all the cached plans. */ @@ -439,7 +440,8 @@ class SessionCatalog( private def requireTableExists(name: TableIdentifier): Unit = { if (!tableExists(name)) { throw new NoSuchTableException( - Seq(name.catalog.get, name.database.get, name.table)) + Seq(name.catalog.getOrElse(CatalogManager.SESSION_CATALOG_NAME), + name.database.get, name.table)) } } @@ -1450,7 +1452,8 @@ class SessionCatalog( getLocalOrGlobalTempView(name).map(_.refresh()).getOrElse { val qualifiedIdent = qualifyIdentifier(name) val qualifiedTableName = QualifiedTableName( - qualifiedIdent.catalog.get, qualifiedIdent.database.get, qualifiedIdent.table) + qualifiedIdent.catalog.getOrElse(CatalogManager.SESSION_CATALOG_NAME), + qualifiedIdent.database.get, qualifiedIdent.table) tableRelationCache.invalidate(qualifiedTableName) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index d5a9cc723bc34..7b4b9334f02da 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -1090,7 +1090,8 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat ident: TableIdentifier, operation: String): Throwable = { unsupportedTableOperationError( - Seq(ident.catalog.get, ident.database.get, ident.table), operation) + Seq(ident.catalog.getOrElse(CatalogManager.SESSION_CATALOG_NAME), + ident.database.get, ident.table), operation) } private def unsupportedTableOperationError( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 7aff4ed1e3de5..f664f1b04118c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -45,7 +45,7 @@ import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, StreamingSo import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.catalyst.util.{GeneratedColumn, IdentityColumn, PushableExpression, ResolveDefaultColumns} import org.apache.spark.sql.classic.{SparkSession, Strategy} -import org.apache.spark.sql.connector.catalog.{SupportsRead, V1Table} +import org.apache.spark.sql.connector.catalog.{CatalogManager, SupportsRead, V1Table} import org.apache.spark.sql.connector.catalog.TableCapability._ import org.apache.spark.sql.connector.expressions.{Expression => V2Expression, NullOrdering, SortDirection, SortOrder => V2SortOrder, SortValue} import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Aggregation} @@ -254,7 +254,9 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] private def readDataSourceTable( table: CatalogTable, extraOptions: CaseInsensitiveStringMap): LogicalPlan = { val qualifiedTableName = - QualifiedTableName(table.identifier.catalog.get, table.database, table.identifier.table) + QualifiedTableName( + table.identifier.catalog.getOrElse(CatalogManager.SESSION_CATALOG_NAME), + table.database, table.identifier.table) val catalog = sparkSession.sessionState.catalog val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, table) val readFileSourceTableCacheIgnoreOptions = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index c268cd963b802..056d01279dbcc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -94,7 +94,8 @@ class V2SessionCatalog(catalog: SessionCatalog) // `V1Table` if the custom session catalog is present. if (table.provider.isDefined && !hasCustomSessionCatalog) { val qualifiedTableName = QualifiedTableName( - table.identifier.catalog.get, table.database, table.identifier.table) + table.identifier.catalog.getOrElse(CatalogManager.SESSION_CATALOG_NAME), + table.database, table.identifier.table) // Check if the table is in the v1 table cache to skip the v2 table lookup. if (catalog.getCachedTable(qualifiedTableName) != null) { return V1Table(table) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 766624a8c48b2..1c7c1736d02a3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -206,7 +206,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log fileType: String, isWrite: Boolean): LogicalRelation = { val metastoreSchema = relation.tableMeta.schema - val tableIdentifier = QualifiedTableName(relation.tableMeta.identifier.catalog.get, + val tableIdentifier = QualifiedTableName( + relation.tableMeta.identifier.catalog.getOrElse(CatalogManager.SESSION_CATALOG_NAME), relation.tableMeta.database, relation.tableMeta.identifier.table) val lazyPruningEnabled = sparkSession.sessionState.conf.manageFilesourcePartitions diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index ec18c97b640d9..ca18ef133c3bd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -129,6 +129,31 @@ class HiveMetastoreCatalogSuite extends TestHiveSingleton with QueryTest { assert(schema == expectedSchema) } } + + test("SPARK-49211: read convertible Hive table with v1IdentifierNoCatalog enabled") { + withTable("t") { + sql("CREATE TABLE t (id INT) STORED AS PARQUET") + sql("INSERT INTO t VALUES (1)") + withSQLConf(SQLConf.LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME.key -> "true") { + assert(spark.table("t").collect() === Array(Row(1))) + sql("REFRESH TABLE t") + assert(spark.table("t").collect() === Array(Row(1))) + } + } + } + + test("SPARK-49211: unsupported v1 table operation reports a clean error " + + "with v1IdentifierNoCatalog enabled") { + withTable("t") { + sql("CREATE TABLE t (id INT) STORED AS PARQUET") + withSQLConf(SQLConf.LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME.key -> "true") { + val e = intercept[AnalysisException] { + sql("ALTER TABLE t REPLACE COLUMNS (id INT)") + } + assert(e.getCondition == "UNSUPPORTED_FEATURE.TABLE_OPERATION") + } + } + } } class DataSourceWithHiveMetastoreCatalogSuite