From 9017ac0d811c0a42ba8ac45720bddf06c8f17e63 Mon Sep 17 00:00:00 2001
From: Adam Binford
Date: Thu, 13 Oct 2022 01:40:49 -0700
Subject: [PATCH] Allow schema pruning for delete first pass

Resolves https://github.com/delta-io/delta/issues/1411

Re-orders the DELETE find-files-to-rewrite pass so that the empty project
(`input_file_name`) comes before the non-deterministic filter UDF. This allows
for top-level schema pruning, and in Spark 3.4 it should allow for nested
pruning as well. Also updates the formatting (and `Column(InputFileName())` ->
`input_file_name()`) to match Update.

New UTs.

Performance improvement for DELETE with a data condition.

Closes delta-io/delta#1412

Signed-off-by: Shixiong Zhu
GitOrigin-RevId: abfee4cf9f8d8ffaef9e397ad9c237c576b8b807
---
 .../sql/delta/commands/DeleteCommand.scala | 12 +++---
 .../spark/sql/delta/DeleteSuiteBase.scala  | 43 +++++++++++++++++++
 2 files changed, 50 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
index 1ef4b22a9c3..badfbfe9664 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
@@ -26,13 +26,14 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, EqualNullSafe, Expression, If, InputFileName, Literal, Not}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, EqualNullSafe, Expression, If, Literal, Not}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{DeltaDelete, LogicalPlan}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.metric.SQLMetrics.{createMetric, createTimingMetric}
+import org.apache.spark.sql.functions.input_file_name
 import org.apache.spark.sql.types.LongType
 
 trait DeleteCommandMetrics { self: LeafRunnableCommand =>
@@ -215,11 +216,12 @@ case class DeleteCommand(
         if (candidateFiles.isEmpty) {
           Array.empty[String]
         } else {
-          data
-            .filter(new Column(cond))
+          data.filter(new Column(cond))
+            .select(input_file_name())
             .filter(deletedRowUdf())
-            .select(new Column(InputFileName())).distinct()
-            .as[String].collect()
+            .distinct()
+            .as[String]
+            .collect()
         }
       }
 
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeleteSuiteBase.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeleteSuiteBase.scala
index c191b40fe3d..b080a3f661b 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeleteSuiteBase.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeleteSuiteBase.scala
@@ -24,7 +24,10 @@ import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.functions.struct
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils
 
 abstract class DeleteSuiteBase extends QueryTest
@@ -356,6 +359,46 @@ abstract class DeleteSuiteBase extends QueryTest
     assert(e4.contains("Subqueries are not supported"))
   }
 
+  test("schema pruning on data condition") {
+    val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
+    append(input, Nil)
+
+    val executedPlans = DeltaTestUtils.withPhysicalPlansCaptured(spark) {
+      checkDelete(Some("key = 2"),
+        Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil)
+    }
+
+    val scans = executedPlans.flatMap(_.collect {
+      case f: FileSourceScanExec => f
+    })
+
+    // The first scan is for finding files to delete. We are only matching against the key,
+    // so that should be the only field in the scanned schema.
+    assert(scans.head.schema.findNestedField(Seq("key")).nonEmpty)
+    assert(scans.head.schema.findNestedField(Seq("value")).isEmpty)
+  }
+
+
+  test("nested schema pruning on data condition") {
+    val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
+      .select(struct("key", "value").alias("nested"))
+    append(input, Nil)
+
+    val executedPlans = DeltaTestUtils.withPhysicalPlansCaptured(spark) {
+      checkDelete(Some("nested.key = 2"),
+        Row(Row(1, 4)) :: Row(Row(1, 1)) :: Row(Row(0, 3)) :: Nil)
+    }
+
+    val scans = executedPlans.flatMap(_.collect {
+      case f: FileSourceScanExec => f
+    })
+
+    // Currently nested schemas can't be pruned, but Spark 3.4 loosens some of the restrictions
+    // on non-deterministic expressions, and this should be pruned to just
+    // "nested STRUCT<key: int>" after upgrading.
+    assert(scans.head.schema == StructType.fromDDL("nested STRUCT<key: int, value: int>"))
+  }
+
   /**
    * @param function the unsupported function.
    * @param functionType The type of the unsupported expression to be tested.
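
The heart of the change is the operator order in DELETE's first pass: projecting
input_file_name() before applying the non-deterministic "touched file" UDF lets Spark
prune the scan down to just the columns referenced by the delete condition. Below is a
minimal standalone sketch of that idea in plain Spark code, not DeleteCommand's actual
internals: the table path is a hypothetical placeholder and a trivial always-true UDF
stands in for Delta's row-counting UDF.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, input_file_name, udf}

object FindTouchedFilesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("delete-prune-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical table location; any wide table with a "key" column works.
    val tablePath = "/tmp/delta_table_data"

    // Stand-in for Delta's non-deterministic row-tracking UDF.
    val touchedFileUdf = udf { () => true }.asNondeterministic()

    // Projecting input_file_name() *before* the non-deterministic filter means the scan
    // only needs the columns referenced by the condition ("key"), so the remaining
    // top-level columns can be pruned.
    val filesToRewrite = spark.read.format("delta").load(tablePath)
      .filter(col("key") === 2)
      .select(input_file_name())
      .filter(touchedFileUdf())
      .distinct()
      .as[String]
      .collect()

    filesToRewrite.foreach(println)
    spark.stop()
  }
}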