Skip to content

Commit

Permalink
[SPARK-33305][SQL] DSv2: DROP TABLE command should also invalidate cache
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This changes `DropTableExec` to also invalidate caches referencing the table to be dropped, in a cascading manner.

### Why are the changes needed?

In DSv1, `DROP TABLE` command also invalidate caches as described in [SPARK-19765](https://issues.apache.org/jira/browse/SPARK-19765). However in DSv2 the same command only drops the table but doesn't handle the caches. This could lead to correctness issue.

### Does this PR introduce _any_ user-facing change?

Yes. Now DSv2 `DROP TABLE` command also invalidates cache.

### How was this patch tested?

Added a new UT

Closes #30211 from sunchao/SPARK-33305.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
sunchao authored and cloud-fan committed Nov 10, 2020
1 parent 27bb40b commit 4934da5
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
throw new AnalysisException("Describing columns is not supported for v2 tables.")

case DropTable(r: ResolvedTable, ifExists, purge) =>
DropTableExec(r.catalog, r.identifier, ifExists, purge) :: Nil
DropTableExec(session, r.catalog, r.table, r.identifier, ifExists, purge) :: Nil

case _: NoopDropTable =>
LocalTableScanExec(Nil, Nil) :: Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,27 @@

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}

/**
* Physical plan node for dropping a table.
*/
case class DropTableExec(
session: SparkSession,
catalog: TableCatalog,
table: Table,
ident: Identifier,
ifExists: Boolean,
purge: Boolean) extends V2CommandExec {

override def run(): Seq[InternalRow] = {
if (catalog.tableExists(ident)) {
val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
catalog.dropTable(ident, purge)
} else if (!ifExists) {
throw new NoSuchTableException(ident)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,22 @@ class DataSourceV2SQLSuite
}
}

test("SPARK-33305: DROP TABLE should also invalidate cache") {
val t = "testcat.ns.t"
val view = "view"
withTable(t) {
withTempView(view) {
sql(s"CREATE TABLE $t USING foo AS SELECT id, data FROM source")
sql(s"CACHE TABLE $view AS SELECT id FROM $t")
checkAnswer(sql(s"SELECT * FROM $t"), spark.table("source"))
checkAnswer(sql(s"SELECT * FROM $view"), spark.table("source").select("id"))

sql(s"DROP TABLE $t")
assert(spark.sharedState.cacheManager.lookupCachedData(spark.table(view)).isEmpty)
}
}
}

test("Relation: basic") {
val t1 = "testcat.ns1.ns2.tbl"
withTable(t1) {
Expand Down

0 comments on commit 4934da5

Please sign in to comment.