From 6d41866e631403d994e8ad256c5323abc3548532 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 28 Feb 2024 13:48:18 +0800
Subject: [PATCH] [SPARK-47191][SQL] Avoid unnecessary relation lookup when uncaching table/view

### What changes were proposed in this pull request?

Some commands like ALTER TABLE and REPLACE VIEW need to uncache the table/view. Today the implementation simply does `sparkSession.catalog.uncacheTable(name)`, which looks up the table/view. This PR improves it by using the existing `CacheManager.uncacheTableOrView` function, which does not need to look up the table/view.

### Why are the changes needed?

Small performance improvement (fewer metastore RPC calls).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45289 from cloud-fan/view.

Authored-by: Wenchen Fan
Signed-off-by: Wenchen Fan
---
 .../catalog/CatalogV2Implicits.scala          |  4 ++++
 .../spark/sql/execution/CacheManager.scala    | 13 +++++--------
 .../sql/execution/command/CommandUtils.scala  | 19 ++++++++++++-------
 .../spark/sql/execution/command/tables.scala  |  4 ++--
 .../spark/sql/execution/command/views.scala   |  6 +++---
 .../datasources/v2/DataSourceV2Strategy.scala |  6 ++----
 .../hive/execution/InsertIntoHiveTable.scala  |  2 +-
 7 files changed, 29 insertions(+), 25 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
index 2b712241633be..bf4cd2eedc83c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
@@ -169,6 +169,10 @@ private[sql] object CatalogV2Implicits {
       case Array(dbName) => FunctionIdentifier(ident.name(), Some(dbName))
       case _ => throw QueryCompilationErrors.identifierTooManyNamePartsError(original)
     }
+
+    def toQualifiedNameParts(catalog: CatalogPlugin): Seq[String] = {
+      (catalog.name() +: ident.namespace() :+ ident.name()).toImmutableArraySeq
+    }
   }
 
   implicit class MultipartIdentifierHelper(parts: Seq[String]) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index db6266fe1756c..6c5639ef99d49 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -36,7 +36,6 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, File
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
-import org.apache.spark.util.ArrayImplicits._
 
 /** Holds a cached logical plan and its data */
 case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
@@ -170,22 +169,20 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     plan match {
       case SubqueryAlias(ident, LogicalRelation(_, _, Some(catalogTable), _)) =>
         val v1Ident = catalogTable.identifier
-        isSameName(ident.qualifier :+ ident.name) &&
-          isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+ v1Ident.table)
+        isSameName(ident.qualifier :+ ident.name) && isSameName(v1Ident.nameParts)
       case SubqueryAlias(ident, DataSourceV2Relation(_, _, Some(catalog), Some(v2Ident), _)) =>
+        import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
         isSameName(ident.qualifier :+ ident.name) &&
-          isSameName((catalog.name() +: v2Ident.namespace() :+ v2Ident.name()).toImmutableArraySeq)
+          isSameName(v2Ident.toQualifiedNameParts(catalog))
       case SubqueryAlias(ident, View(catalogTable, _, _)) =>
         val v1Ident = catalogTable.identifier
-        isSameName(ident.qualifier :+ ident.name) &&
-          isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+ v1Ident.table)
+        isSameName(ident.qualifier :+ ident.name) && isSameName(v1Ident.nameParts)
       case SubqueryAlias(ident, HiveTableRelation(catalogTable, _, _, _, _)) =>
         val v1Ident = catalogTable.identifier
-        isSameName(ident.qualifier :+ ident.name) &&
-          isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+ v1Ident.table)
+        isSameName(ident.qualifier :+ ident.name) && isSameName(v1Ident.nameParts)
       case _ => false
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index 73478272a6841..eccf16ecea13f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Column, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, CatalogTableType, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
@@ -462,12 +462,17 @@ object CommandUtils extends Logging {
     !path.getName.startsWith(stagingDir) && DataSourceUtils.isDataPath(path)
   }
 
-  def uncacheTableOrView(sparkSession: SparkSession, name: String): Unit = {
-    try {
-      sparkSession.catalog.uncacheTable(name)
-    } catch {
-      case NonFatal(e) => logWarning(s"Exception when attempting to uncache $name", e)
-    }
+  def uncacheTableOrView(sparkSession: SparkSession, ident: ResolvedIdentifier): Unit = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+    uncacheTableOrView(sparkSession, ident.identifier.toQualifiedNameParts(ident.catalog))
+  }
+
+  def uncacheTableOrView(sparkSession: SparkSession, ident: TableIdentifier): Unit = {
+    uncacheTableOrView(sparkSession, ident.nameParts)
+  }
+
+  private def uncacheTableOrView(sparkSession: SparkSession, name: Seq[String]): Unit = {
+    sparkSession.sharedState.cacheManager.uncacheTableOrView(sparkSession, name, cascade = true)
   }
 
   def calculateRowCountsPerPartition(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index fa288fd94ea9b..1a97b965da2bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -203,7 +203,7 @@ case class AlterTableRenameCommand(
         sparkSession.table(oldName.unquotedString))
       val optStorageLevel = optCachedData.map(_.cachedRepresentation.cacheBuilder.storageLevel)
       if (optStorageLevel.isDefined) {
-        CommandUtils.uncacheTableOrView(sparkSession, oldName.unquotedString)
+        CommandUtils.uncacheTableOrView(sparkSession, oldName)
       }
       // Invalidate the table last, otherwise uncaching the table would load the logical plan
       // back into the hive metastore cache
@@ -235,7 +235,7 @@ case class AlterTableAddColumnsCommand(
     val colsWithProcessedDefaults =
       constantFoldCurrentDefaultsToExistDefaults(sparkSession, catalogTable.provider)
 
-    CommandUtils.uncacheTableOrView(sparkSession, table.quotedString)
+    CommandUtils.uncacheTableOrView(sparkSession, table)
     catalog.refreshTable(table)
 
     SchemaUtils.checkColumnNameDuplication(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 3a761541a00e2..d71d0d43683cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -155,7 +155,7 @@ case class CreateViewCommand(
 
       // uncache the cached data before replacing an exists view
       logDebug(s"Try to uncache ${viewIdent.quotedString} before replacing.")
-      CommandUtils.uncacheTableOrView(sparkSession, viewIdent.quotedString)
+      CommandUtils.uncacheTableOrView(sparkSession, viewIdent)
 
       // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
       // Nothing we need to retain from the old view, so just drop and create a new one
@@ -298,7 +298,7 @@ case class AlterViewAsCommand(
     checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent)
 
     logDebug(s"Try to uncache ${viewIdent.quotedString} before replacing.")
-    CommandUtils.uncacheTableOrView(session, viewIdent.quotedString)
+    CommandUtils.uncacheTableOrView(session, viewIdent)
 
     val newProperties = generateViewProperties(
       viewMeta.properties, session, analyzedPlan, analyzedPlan.schema.fieldNames)
@@ -667,7 +667,7 @@ object ViewHelper extends SQLConfHelper with Logging {
         // view is already converted to the underlying tables. So no cyclic views.
         checkCyclicViewReference(analyzedPlan, Seq(name), name)
       }
-      CommandUtils.uncacheTableOrView(session, name.quotedString)
+      CommandUtils.uncacheTableOrView(session, name)
     }
     if (!storeAnalyzedPlanForView) {
       TemporaryViewRelation(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 30d05350aa72a..d33ecea6e0db7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBat
 import org.apache.spark.sql.connector.write.V1Write
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.{FilterExec, InSubqueryExec, LeafExecNode, LocalTableScanExec, ProjectExec, RowDataSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelation, PushableColumnAndNestedColumn}
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
@@ -348,10 +349,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       }
 
     case DropTable(r: ResolvedIdentifier, ifExists, purge) =>
-      val invalidateFunc = () => session.sharedState.cacheManager.uncacheTableOrView(
-        session,
-        (r.catalog.name() +: r.identifier.namespace() :+ r.identifier.name()).toImmutableArraySeq,
-        cascade = true)
+      val invalidateFunc = () => CommandUtils.uncacheTableOrView(session, r)
       DropTableExec(r.catalog.asTableCatalog, r.identifier, ifExists, purge, invalidateFunc) :: Nil
 
     case _: NoopCommand =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 74d131d6664fd..4a92bfd840405 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -108,7 +108,7 @@ case class InsertIntoHiveTable(
     }
 
     // un-cache this table.
-    CommandUtils.uncacheTableOrView(sparkSession, table.identifier.quotedString)
+    CommandUtils.uncacheTableOrView(sparkSession, table.identifier)
    sparkSession.sessionState.catalog.refreshTable(table.identifier)

    CommandUtils.updateTableStats(sparkSession, table)
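
For illustration only (not part of the patch), here is a minimal sketch of how a caller changes with the new `CommandUtils.uncacheTableOrView` overloads. `CommandUtils` is an internal helper, and the local session setup and the `ns.tbl` table name are hypothetical; the point is that the new path hands qualified name parts straight to the shared `CacheManager` instead of resolving the relation again through the catalog API.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.CommandUtils

object UncacheSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical local session; a real command would use the session it runs in.
    val spark = SparkSession.builder().master("local[*]").appName("uncache-sketch").getOrCreate()

    // Before this patch: commands went through the public catalog API, which resolves
    // "ns.tbl" back to a relation (potentially hitting the metastore) just to drop a cache entry.
    // spark.catalog.uncacheTable("ns.tbl")

    // After this patch: commands pass the identifier itself; CommandUtils turns it into
    // qualified name parts and asks the CacheManager to uncache by name, with no relation lookup.
    CommandUtils.uncacheTableOrView(spark, TableIdentifier("tbl", Some("ns")))

    spark.stop()
  }
}
```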