[SPARK-47191][SQL] Avoid unnecessary relation lookup when uncaching table/view

### What changes were proposed in this pull request?

Some commands, such as ALTER TABLE and REPLACE VIEW, need to uncache the table/view. Today the implementation simply calls `sparkSession.catalog.uncacheTable(name)`, which looks up the table/view first. This PR improves it by using the existing `CacheManager.uncacheTableOrView` function, which does not need to look up the table/view.
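
In other words, callers stop going through the session catalog (which resolves the name against the metastore) and go straight to the `CacheManager` with the name parts they already have. A rough sketch of the before/after call pattern, using the calls that appear in the diff below (`name` and `nameParts` stand in for whatever identifier the caller holds):

```scala
// Before: uncaching by name re-resolves the table/view through the
// session catalog, which can trigger metastore RPCs.
sparkSession.catalog.uncacheTable(name)

// After: drop the cache entry directly via the CacheManager using the
// already-resolved name parts; no relation lookup is performed.
sparkSession.sharedState.cacheManager.uncacheTableOrView(
  sparkSession, nameParts, cascade = true)
```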

### Why are the changes needed?

A small performance improvement: it reduces metastore RPC calls.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #45289 from cloud-fan/view.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan committed Feb 28, 2024
1 parent 537a2d9 commit 6d41866
Showing 7 changed files with 29 additions and 25 deletions.
@@ -169,6 +169,10 @@ private[sql] object CatalogV2Implicits {
       case Array(dbName) => FunctionIdentifier(ident.name(), Some(dbName))
       case _ => throw QueryCompilationErrors.identifierTooManyNamePartsError(original)
     }
+
+    def toQualifiedNameParts(catalog: CatalogPlugin): Seq[String] = {
+      (catalog.name() +: ident.namespace() :+ ident.name()).toImmutableArraySeq
+    }
   }

   implicit class MultipartIdentifierHelper(parts: Seq[String]) {

@@ -36,7 +36,6 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, File
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
-import org.apache.spark.util.ArrayImplicits._

 /** Holds a cached logical plan and its data */
 case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)

@@ -170,22 +169,20 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
      plan match {
        case SubqueryAlias(ident, LogicalRelation(_, _, Some(catalogTable), _)) =>
          val v1Ident = catalogTable.identifier
-         isSameName(ident.qualifier :+ ident.name) &&
-           isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+ v1Ident.table)
+         isSameName(ident.qualifier :+ ident.name) && isSameName(v1Ident.nameParts)

        case SubqueryAlias(ident, DataSourceV2Relation(_, _, Some(catalog), Some(v2Ident), _)) =>
+         import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
          isSameName(ident.qualifier :+ ident.name) &&
-           isSameName((catalog.name() +: v2Ident.namespace() :+ v2Ident.name()).toImmutableArraySeq)
+           isSameName(v2Ident.toQualifiedNameParts(catalog))

        case SubqueryAlias(ident, View(catalogTable, _, _)) =>
          val v1Ident = catalogTable.identifier
-         isSameName(ident.qualifier :+ ident.name) &&
-           isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+ v1Ident.table)
+         isSameName(ident.qualifier :+ ident.name) && isSameName(v1Ident.nameParts)

        case SubqueryAlias(ident, HiveTableRelation(catalogTable, _, _, _, _)) =>
          val v1Ident = catalogTable.identifier
-         isSameName(ident.qualifier :+ ident.name) &&
-           isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+ v1Ident.table)
+         isSameName(ident.qualifier :+ ident.name) && isSameName(v1Ident.nameParts)

        case _ => false
      }

@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Column, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, CatalogTableType, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._

@@ -462,12 +462,17 @@ object CommandUtils extends Logging {
       !path.getName.startsWith(stagingDir) && DataSourceUtils.isDataPath(path)
     }

-  def uncacheTableOrView(sparkSession: SparkSession, name: String): Unit = {
-    try {
-      sparkSession.catalog.uncacheTable(name)
-    } catch {
-      case NonFatal(e) => logWarning(s"Exception when attempting to uncache $name", e)
-    }
+  def uncacheTableOrView(sparkSession: SparkSession, ident: ResolvedIdentifier): Unit = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+    uncacheTableOrView(sparkSession, ident.identifier.toQualifiedNameParts(ident.catalog))
   }
+
+  def uncacheTableOrView(sparkSession: SparkSession, ident: TableIdentifier): Unit = {
+    uncacheTableOrView(sparkSession, ident.nameParts)
+  }
+
+  private def uncacheTableOrView(sparkSession: SparkSession, name: Seq[String]): Unit = {
+    sparkSession.sharedState.cacheManager.uncacheTableOrView(sparkSession, name, cascade = true)
+  }

   def calculateRowCountsPerPartition(

@@ -203,7 +203,7 @@ case class AlterTableRenameCommand(
         sparkSession.table(oldName.unquotedString))
       val optStorageLevel = optCachedData.map(_.cachedRepresentation.cacheBuilder.storageLevel)
       if (optStorageLevel.isDefined) {
-        CommandUtils.uncacheTableOrView(sparkSession, oldName.unquotedString)
+        CommandUtils.uncacheTableOrView(sparkSession, oldName)
       }
       // Invalidate the table last, otherwise uncaching the table would load the logical plan
       // back into the hive metastore cache

@@ -235,7 +235,7 @@ case class AlterTableAddColumnsCommand(
     val colsWithProcessedDefaults =
       constantFoldCurrentDefaultsToExistDefaults(sparkSession, catalogTable.provider)

-    CommandUtils.uncacheTableOrView(sparkSession, table.quotedString)
+    CommandUtils.uncacheTableOrView(sparkSession, table)
     catalog.refreshTable(table)

     SchemaUtils.checkColumnNameDuplication(

@@ -155,7 +155,7 @@ case class CreateViewCommand(

       // uncache the cached data before replacing an exists view
       logDebug(s"Try to uncache ${viewIdent.quotedString} before replacing.")
-      CommandUtils.uncacheTableOrView(sparkSession, viewIdent.quotedString)
+      CommandUtils.uncacheTableOrView(sparkSession, viewIdent)

       // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
       // Nothing we need to retain from the old view, so just drop and create a new one

@@ -298,7 +298,7 @@ case class AlterViewAsCommand(
     checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent)

     logDebug(s"Try to uncache ${viewIdent.quotedString} before replacing.")
-    CommandUtils.uncacheTableOrView(session, viewIdent.quotedString)
+    CommandUtils.uncacheTableOrView(session, viewIdent)

     val newProperties = generateViewProperties(
       viewMeta.properties, session, analyzedPlan, analyzedPlan.schema.fieldNames)

@@ -667,7 +667,7 @@ object ViewHelper extends SQLConfHelper with Logging {
       // view is already converted to the underlying tables. So no cyclic views.
       checkCyclicViewReference(analyzedPlan, Seq(name), name)
     }
-    CommandUtils.uncacheTableOrView(session, name.quotedString)
+    CommandUtils.uncacheTableOrView(session, name)
   }
   if (!storeAnalyzedPlanForView) {
     TemporaryViewRelation(

@@ -41,6 +41,7 @@ import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBat
 import org.apache.spark.sql.connector.write.V1Write
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.{FilterExec, InSubqueryExec, LeafExecNode, LocalTableScanExec, ProjectExec, RowDataSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelation, PushableColumnAndNestedColumn}
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH

@@ -348,10 +349,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     }

     case DropTable(r: ResolvedIdentifier, ifExists, purge) =>
-      val invalidateFunc = () => session.sharedState.cacheManager.uncacheTableOrView(
-        session,
-        (r.catalog.name() +: r.identifier.namespace() :+ r.identifier.name()).toImmutableArraySeq,
-        cascade = true)
+      val invalidateFunc = () => CommandUtils.uncacheTableOrView(session, r)
       DropTableExec(r.catalog.asTableCatalog, r.identifier, ifExists, purge, invalidateFunc) :: Nil

     case _: NoopCommand =>

@@ -108,7 +108,7 @@ case class InsertIntoHiveTable(
     }

     // un-cache this table.
-    CommandUtils.uncacheTableOrView(sparkSession, table.identifier.quotedString)
+    CommandUtils.uncacheTableOrView(sparkSession, table.identifier)
     sparkSession.sessionState.catalog.refreshTable(table.identifier)

     CommandUtils.updateTableStats(sparkSession, table)
