From 6313782104a5c1fd12968efb4ba9a388458d9d5f Mon Sep 17 00:00:00 2001 From: Yingyi Bu Date: Sat, 14 Jan 2023 16:06:21 -0800 Subject: [PATCH] Refactor tests in DataSkippingDeltaTests GitOrigin-RevId: 924cc50af29f7f7c8522d061b9c7024f02dc43c3 --- .../delta/stats/DataSkippingDeltaTests.scala | 821 +++++++++--------- 1 file changed, 412 insertions(+), 409 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/sql/delta/stats/DataSkippingDeltaTests.scala b/core/src/test/scala/org/apache/spark/sql/delta/stats/DataSkippingDeltaTests.scala index 4b2e5790c64..65fd6db84d2 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/stats/DataSkippingDeltaTests.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/stats/DataSkippingDeltaTests.scala @@ -629,61 +629,61 @@ trait DataSkippingDeltaTestsBase extends QueryTest } test("data skipping stats before and after optimize") { - val tempDir = Utils.createTempDir() - var r = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) + val tempDir = Utils.createTempDir() + var r = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) - val (numTuples, numFiles) = (10, 2) - val data = spark.range(0, numTuples, 1, 2).repartition(numFiles) - data.write.format("delta").save(r.dataPath.toString) - r = checkpointAndCreateNewLogIfNecessary(r) - def rStats: DataFrame = - getStatsDf(r, $"numRecords", $"minValues.id".as("id_min"), $"maxValues.id".as("id_max")) + val (numTuples, numFiles) = (10, 2) + val data = spark.range(0, numTuples, 1, 2).repartition(numFiles) + data.write.format("delta").save(r.dataPath.toString) + r = checkpointAndCreateNewLogIfNecessary(r) + def rStats: DataFrame = + getStatsDf(r, $"numRecords", $"minValues.id".as("id_min"), $"maxValues.id".as("id_max")) - checkAnswer(rStats, Seq(Row(4, 0, 8), Row(6, 1, 9))) - sql(s"OPTIMIZE '$tempDir'") - checkAnswer(rStats, Seq(Row(10, 0, 9))) + checkAnswer(rStats, Seq(Row(4, 0, 8), Row(6, 1, 9))) + sql(s"OPTIMIZE '$tempDir'") + checkAnswer(rStats, Seq(Row(10, 0, 9))) } test("number of indexed columns") { - val numTotalCols = defaultNumIndexedCols + 5 - val path = Utils.createTempDir().getCanonicalPath - var r = DeltaLog.forTable(spark, new Path(path)) - val data = spark.range(10).select(Seq.tabulate(numTotalCols)(i => lit(i) as s"col$i"): _*) - data.coalesce(1).write.format("delta").save(r.dataPath.toString) - - def checkNumIndexedCol(numIndexedCols: Int): Unit = { - if (defaultNumIndexedCols != numTotalCols) { - setNumIndexedColumns(r.dataPath.toString, numIndexedCols) - } - data.coalesce(1).write.format("delta").mode("overwrite").save(r.dataPath.toString) - r = checkpointAndCreateNewLogIfNecessary(r) - - if (numIndexedCols == 0) { - intercept[AnalysisException] { - getStatsDf(r, $"numRecords", $"minValues.col0").first() - } - } else if (numIndexedCols < numTotalCols) { - checkAnswer( - getStatsDf(r, $"numRecords", $"minValues.col${numIndexedCols - 1}"), - Seq(Row(10, numIndexedCols - 1))) - intercept[AnalysisException] { - getStatsDf(r, $"minValues.col$numIndexedCols").first() + val numTotalCols = defaultNumIndexedCols + 5 + val path = Utils.createTempDir().getCanonicalPath + var r = DeltaLog.forTable(spark, new Path(path)) + val data = spark.range(10).select(Seq.tabulate(numTotalCols)(i => lit(i) as s"col$i"): _*) + data.coalesce(1).write.format("delta").save(r.dataPath.toString) + + def checkNumIndexedCol(numIndexedCols: Int): Unit = { + if (defaultNumIndexedCols != numTotalCols) { + setNumIndexedColumns(r.dataPath.toString, numIndexedCols) } - } else { - checkAnswer( - 
getStatsDf(r, $"numRecords", $"minValues.col${numTotalCols - 1}"), - Seq(Row(10, numTotalCols - 1))) - intercept[AnalysisException] { - getStatsDf(r, $"minValues.col$numTotalCols").first() + data.coalesce(1).write.format("delta").mode("overwrite").save(r.dataPath.toString) + r = checkpointAndCreateNewLogIfNecessary(r) + + if (numIndexedCols == 0) { + intercept[AnalysisException] { + getStatsDf(r, $"numRecords", $"minValues.col0").first() + } + } else if (numIndexedCols < numTotalCols) { + checkAnswer( + getStatsDf(r, $"numRecords", $"minValues.col${numIndexedCols - 1}"), + Seq(Row(10, numIndexedCols - 1))) + intercept[AnalysisException] { + getStatsDf(r, $"minValues.col$numIndexedCols").first() + } + } else { + checkAnswer( + getStatsDf(r, $"numRecords", $"minValues.col${numTotalCols - 1}"), + Seq(Row(10, numTotalCols - 1))) + intercept[AnalysisException] { + getStatsDf(r, $"minValues.col$numTotalCols").first() + } } } - } - checkNumIndexedCol(defaultNumIndexedCols) - checkNumIndexedCol(numTotalCols - 1) - checkNumIndexedCol(numTotalCols) - checkNumIndexedCol(numTotalCols + 1) - checkNumIndexedCol(0) + checkNumIndexedCol(defaultNumIndexedCols) + checkNumIndexedCol(numTotalCols - 1) + checkNumIndexedCol(numTotalCols) + checkNumIndexedCol(numTotalCols + 1) + checkNumIndexedCol(0) } test("remove redundant stats column references in data skipping expression") { @@ -1030,178 +1030,179 @@ trait DataSkippingDeltaTestsBase extends QueryTest test("data skipping with missing columns in DataFrame", tableSchemaOnlyTag) { // case-1: dataframe schema has less columns than the dataSkippingNumIndexedCols - withTable("table") { - sql("CREATE TABLE table (a Int, b Int, c Int, d Int, e Int) USING delta PARTITIONED BY(b)") - val r = DeltaLog.forTable(spark, new TableIdentifier("table")) - // Only index the first three columns, excluding partition column b - setNumIndexedColumns(r.dataPath.toString, 3) - val dataSeq = Seq((1, 2, 3, 4, 5)) - - dataSeq.toDF("a", "b", "c", "d", "e") - .select("a", "b") // DataFrame schema order - .write.mode("append").format("delta") - .save(r.dataPath.toString) - - val hits = Seq( - // These values are in the table - "a = 1", - "b = 2", - "c <=> null", - "d is null", - // No stats for e - "e = 10" - ) - val misses = Seq( - "a = 10", - "b = 10", - "c = 10", - "c is not null", - "d = 10", - "isnotnull(d)" - ) - checkSkipping(r, hits, misses, dataSeq.toString(), false) - } - - // case-2: dataframe schema lacks columns that are supposed to be part of the stats schema, - // but has an additional column that should not collect stats on - withTable("table") { - sql("CREATE TABLE table (a Int, b Int, c Int, d Int, e Int) USING delta PARTITIONED BY(b)") - val r = DeltaLog.forTable(spark, new TableIdentifier("table")) - // Only index the first three columns, excluding partition column b - setNumIndexedColumns(r.dataPath.toString, 3) - val dataSeq = Seq((1, 2, 3, 4, 5)) - - dataSeq.toDF("a", "b", "c", "d", "e") - .select("a", "b", "d", "e") // DataFrame schema order - .write.mode("append").format("delta") - .save(r.dataPath.toString) + withTable("table") { + sql("CREATE TABLE table (a Int, b Int, c Int, d Int, e Int) USING delta PARTITIONED BY(b)") + val r = DeltaLog.forTable(spark, new TableIdentifier("table")) + // Only index the first three columns, excluding partition column b + setNumIndexedColumns(r.dataPath.toString, 3) + val dataSeq = Seq((1, 2, 3, 4, 5)) - val hits = Seq( - "a = 1", // In table - "isnull(c)", // In table - "e = 20" // No stats - ) - val misses = Seq( - "a = 20", - 
"b = 20", - "c = 20", - "d = 20", - "a = 20 and c = 20", - "a = 20 and e = 20" - ) - checkSkipping(r, hits, misses, dataSeq.toString(), false) - } + dataSeq.toDF("a", "b", "c", "d", "e") + .select("a", "b") // DataFrame schema order + .write.mode("append").format("delta") + .save(r.dataPath.toString) - // case-3: Structured data with some columns missing and some additional columns - withTempDir { dir => - val structureData = Seq( - Row(Row("James ", "", "Smith"), "36636", "M", 3100) - ) + val hits = Seq( + // These values are in the table + "a = 1", + "b = 2", + "c <=> null", + "d is null", + // No stats for e + "e = 10" + ) + val misses = Seq( + "a = 10", + "b = 10", + "c = 10", + "c is not null", + "d = 10", + "isnotnull(d)" + ) + checkSkipping(r, hits, misses, dataSeq.toString(), false) + } - val structureDataSchema = new StructType() - .add("name", new StructType() - .add("firstname", StringType) - .add("middlename", StringType) - .add("lastname", StringType)) - .add("id", StringType) - .add("gender", StringType) - .add("salary", IntegerType) + // case-2: dataframe schema lacks columns that are supposed to be part of the stats schema, + // but has an additional column that should not collect stats on + withTable("table") { + sql("CREATE TABLE table (a Int, b Int, c Int, d Int, e Int) USING delta PARTITIONED BY(b)") + val r = DeltaLog.forTable(spark, new TableIdentifier("table")) + // Only index the first three columns, excluding partition column b + setNumIndexedColumns(r.dataPath.toString, 3) + val dataSeq = Seq((1, 2, 3, 4, 5)) - val data = spark.createDataFrame( - spark.sparkContext.parallelize(structureData), structureDataSchema) + dataSeq.toDF("a", "b", "c", "d", "e") + .select("a", "b", "d", "e") // DataFrame schema order + .write.mode("append").format("delta") + .save(r.dataPath.toString) - data.write.partitionBy("id").format("delta").save(dir.getAbsolutePath) - // Only index the first three columns (unnested), excluding partition column id - val deltaLog = DeltaLog.forTable(spark, new Path(dir.getCanonicalPath)) - setNumIndexedColumns(deltaLog.dataPath.toString, 3) + val hits = Seq( + "a = 1", // In table + "isnull(c)", // In table + "e = 20" // No stats + ) + val misses = Seq( + "a = 20", + "b = 20", + "c = 20", + "d = 20", + "a = 20 and c = 20", + "a = 20 and e = 20" + ) + checkSkipping(r, hits, misses, dataSeq.toString(), false) + } - val structureDfData = Seq( - Row(2000, Row("Robert ", "Johnson"), "40000") - ) - val structureDfSchema = new StructType() - .add("salary", IntegerType) - .add("name", new StructType() - .add("firstname", StringType) - .add("lastname", StringType)) - .add("id", StringType) + // case-3: Structured data with some columns missing and some additional columns + withTempDir { dir => + val structureData = Seq( + Row(Row("James ", "", "Smith"), "36636", "M", 3100) + ) - // middlename is missing, but we collect NULL_COUNT for it - val df = spark.createDataFrame( - spark.sparkContext.parallelize(structureDfData), structureDfSchema) - df.write.mode("append").format("delta").save(dir.getAbsolutePath) + val structureDataSchema = new StructType() + .add("name", new StructType() + .add("firstname", StringType) + .add("middlename", StringType) + .add("lastname", StringType)) + .add("id", StringType) + .add("gender", StringType) + .add("salary", IntegerType) + + val data = spark.createDataFrame( + spark.sparkContext.parallelize(structureData), structureDataSchema) + + data.write.partitionBy("id").format("delta").save(dir.getAbsolutePath) + // Only index the 
first three columns (unnested), excluding partition column id + val deltaLog = DeltaLog.forTable(spark, new Path(dir.getCanonicalPath)) + setNumIndexedColumns(deltaLog.dataPath.toString, 3) + + val structureDfData = Seq( + Row(2000, Row("Robert ", "Johnson"), "40000") + ) + val structureDfSchema = new StructType() + .add("salary", IntegerType) + .add("name", new StructType() + .add("firstname", StringType) + .add("lastname", StringType)) + .add("id", StringType) + + // middlename is missing, but we collect NULL_COUNT for it + val df = spark.createDataFrame( + spark.sparkContext.parallelize(structureDfData), structureDfSchema) + df.write.mode("append").format("delta").save(dir.getAbsolutePath) - val hits = Seq( - "gender = 'M'", // No stats - "salary = 1000" // No stats - ) - val misses = Seq( - "name.firstname = 'Michael'", - "name.middlename = 'L'", - "name.lastname = 'Miller'", - "id = '10000'", - "name.firstname = 'Robert' and name.middlename = 'L'" - ) - checkSkipping(deltaLog, hits, misses, structureDfData.toString(), false) - } + val hits = Seq( + "gender = 'M'", // No stats + "salary = 1000" // No stats + ) + val misses = Seq( + "name.firstname = 'Michael'", + "name.middlename = 'L'", + "name.lastname = 'Miller'", + "id = '10000'", + "name.firstname = 'Robert' and name.middlename = 'L'" + ) + checkSkipping(deltaLog, hits, misses, structureDfData.toString(), false) + } - // case-4: dataframe schema does not have any columns within the first - // dataSkippingNumIndexedCols columns of the table schema - withTable("table") { - sql("CREATE TABLE table (a Int, b Int, c Int, d Int, e Int) USING delta") - val r = DeltaLog.forTable(spark, new TableIdentifier("table")) - // Only index the first three columns - setNumIndexedColumns(r.dataPath.toString, 3) - val dataSeq = Seq((1, 2, 3, 4, 5)) + // case-4: dataframe schema does not have any columns within the first + // dataSkippingNumIndexedCols columns of the table schema + withTable("table") { + sql("CREATE TABLE table (a Int, b Int, c Int, d Int, e Int) USING delta") + val r = DeltaLog.forTable(spark, new TableIdentifier("table")) + // Only index the first three columns + setNumIndexedColumns(r.dataPath.toString, 3) + val dataSeq = Seq((1, 2, 3, 4, 5)) - dataSeq.toDF("a", "b", "c", "d", "e") - .select("d", "e") // DataFrame schema order - .write.mode("append").format("delta") - .save(r.dataPath.toString) + dataSeq.toDF("a", "b", "c", "d", "e") + .select("d", "e") // DataFrame schema order + .write.mode("append").format("delta") + .save(r.dataPath.toString) - val hits = Seq( - "d = 40", // No stats - "e = 40" // No stats - ) - // We can still collect NULL_COUNT for a, b, and c - val misses = Seq( - "a = 40", - "b = 40", - "c = 40" - ) - checkSkipping(r, hits, misses, dataSeq.toString(), false) - } + val hits = Seq( + "d = 40", // No stats + "e = 40" // No stats + ) + // We can still collect NULL_COUNT for a, b, and c + val misses = Seq( + "a = 40", + "b = 40", + "c = 40" + ) + checkSkipping(r, hits, misses, dataSeq.toString(), false) + } - // case-5: The first dataSkippingNumIndexedCols columns of the table schema has map or array - // types, which we only collect NULL_COUNT - withTable("table") { - sql("CREATE TABLE table (a Int, b Map, c Array, d Int, e Int) USING delta") - val r = DeltaLog.forTable(spark, new TableIdentifier("table")) - // Only index the first three columns - setNumIndexedColumns(r.dataPath.toString, 3) - val dataSeq = Seq((1, Map("key" -> 2), Seq(3, 3, 3), 4, 5)) + // case-5: The first dataSkippingNumIndexedCols columns 
of the table schema has map or array + // types, which we only collect NULL_COUNT + withTable("table") { + sql("CREATE TABLE table (a Int, b Map, c Array, d Int, e Int)" + + " USING delta") + val r = DeltaLog.forTable(spark, new TableIdentifier("table")) + // Only index the first three columns + setNumIndexedColumns(r.dataPath.toString, 3) + val dataSeq = Seq((1, Map("key" -> 2), Seq(3, 3, 3), 4, 5)) - dataSeq.toDF("a", "b", "c", "d", "e") - .select("b", "c", "d") // DataFrame schema order - .write.mode("append").format("delta") - .save(r.dataPath.toString) + dataSeq.toDF("a", "b", "c", "d", "e") + .select("b", "c", "d") // DataFrame schema order + .write.mode("append").format("delta") + .save(r.dataPath.toString) - val hits = Seq( - "d = 50", // No stats - "e = 50", // No stats - // No min/max stats for c. We couldn't check = for b since EqualTo does not support - // ordering on type maP - "c = array(50, 50)", - // b and c should have NULL_COUNT stats, but currently they're not SkippingEligibleColumn - // (since they're not AtomicType), we couldn't skip for them - "isnull(b)", - "c is null" - ) - val misses = Seq( - // a has NULL_COUNT stats since it's missing from DataFrame schema - "a = 50" - ) - checkSkipping(r, hits, misses, dataSeq.toString(), false) - } + val hits = Seq( + "d = 50", // No stats + "e = 50", // No stats + // No min/max stats for c. We couldn't check = for b since EqualTo does not support + // ordering on type maP + "c = array(50, 50)", + // b and c should have NULL_COUNT stats, but currently they're not SkippingEligibleColumn + // (since they're not AtomicType), we couldn't skip for them + "isnull(b)", + "c is null" + ) + val misses = Seq( + // a has NULL_COUNT stats since it's missing from DataFrame schema + "a = 50" + ) + checkSkipping(r, hits, misses, dataSeq.toString(), false) + } } @@ -1248,150 +1249,150 @@ trait DataSkippingDeltaTestsBase extends QueryTest } test("data skipping by partitions and data values - nulls") { - val tableDir = Utils.createTempDir().getAbsolutePath - val dataSeqs = Seq( // each sequence produce a single file - Seq((null, null)), - Seq((null, "a")), - Seq((null, "b")), - Seq(("a", "a"), ("a", null)), - Seq(("b", null)) - ) - dataSeqs.foreach { seq => - seq.toDF("key", "value").coalesce(1) - .write.format("delta").partitionBy("key").mode("append").save(tableDir) - } - val allData = dataSeqs.flatten - - def checkResults( - predicate: String, - expResults: Seq[(String, String)], - expNumPartitions: Int, - expNumFiles: Long): Unit = - checkResultsWithPartitions(tableDir, predicate, expResults, expNumPartitions, expNumFiles) - - // Trivial base case - checkResults( - predicate = "True", - expResults = allData, - expNumPartitions = 3, - expNumFiles = 5) - - // Conditions on partition key - checkResults( - predicate = "key IS NULL", - expResults = allData.filter(_._1 == null), - expNumPartitions = 1, - expNumFiles = 3) // 3 files with key = null - - checkResults( - predicate = "key IS NOT NULL", - expResults = allData.filter(_._1 != null), - expNumPartitions = 2, - expNumFiles = 2) // 2 files with key = 'a', and 1 file with key = 'b' - - checkResults( - predicate = "key <=> NULL", - expResults = allData.filter(_._1 == null), - expNumPartitions = 1, - expNumFiles = 3) // 3 files with key = null - - checkResults( - predicate = "key = 'a'", - expResults = allData.filter(_._1 == "a"), - expNumPartitions = 1, - expNumFiles = 1) // 1 files with key = 'a' - - checkResults( - predicate = "key <=> 'a'", - expResults = allData.filter(_._1 == "a"), - 
expNumPartitions = 1,
- expNumFiles = 1) // 1 files with key <=> 'a'
-
- checkResults(
- predicate = "key = 'b'",
- expResults = allData.filter(_._1 == "b"),
- expNumPartitions = 1,
- expNumFiles = 1) // 1 files with key = 'b'
-
- checkResults(
- predicate = "key <=> 'b'",
- expResults = allData.filter(_._1 == "b"),
- expNumPartitions = 1,
- expNumFiles = 1) // 1 files with key <=> 'b'
-
- // Conditions on partitions keys and values
- checkResults(
- predicate = "value IS NULL",
- expResults = allData.filter(_._2 == null),
- expNumPartitions = 3,
- expNumFiles = 3) // files with all non-NULL values get skipped
-
- checkResults(
- predicate = "value IS NOT NULL",
- expResults = allData.filter(_._2 != null),
- expNumPartitions = 2, // one of the partitions has no files left after data skipping
- expNumFiles = 3) // files with all NULL values get skipped
-
- checkResults(
- predicate = "value <=> NULL",
- expResults = allData.filter(_._2 == null),
- expNumPartitions = 3,
- expNumFiles = 3) // same as IS NULL case above
-
- checkResults(
- predicate = "value = 'a'",
- expResults = allData.filter(_._2 == "a"),
- expNumPartitions = 2, // one partition has no files left after data skipping
- expNumFiles = 2) // only two files contain "a"
-
- checkResults(
- predicate = "value <=> 'a'",
- expResults = allData.filter(_._2 == "a"),
- expNumPartitions = 2, // one partition has no files left after data skipping
- expNumFiles = 2) // only two files contain "a"
-
- checkResults(
- predicate = "value <> 'a'",
- expResults = allData.filter(x => x._2 != "a" && x._2 != null), // i.e., only (null, b)
- expNumPartitions = 1,
- expNumFiles = 1) // only one file contains 'b'
-
- checkResults(
- predicate = "value = 'b'",
- expResults = allData.filter(_._2 == "b"),
- expNumPartitions = 1,
- expNumFiles = 1) // same as previous case
-
- checkResults(
- predicate = "value <=> 'b'",
- expResults = allData.filter(_._2 == "b"),
- expNumPartitions = 1,
- expNumFiles = 1) // same as previous case
-
- // Conditions on both, partition keys and values
- checkResults(
- predicate = "key IS NULL AND value = 'a'",
- expResults = Seq((null, "a")),
- expNumPartitions = 1,
- expNumFiles = 1) // only one file in the partition has (*, "a")
-
- checkResults(
- predicate = "key IS NOT NULL AND value IS NOT NULL",
- expResults = Seq(("a", "a")),
- expNumPartitions = 1,
- expNumFiles = 1) // 1 file with (*, a)
-
- checkResults(
- predicate = "key <=> NULL AND value <=> NULL",
- expResults = Seq((null, null)),
- expNumPartitions = 1,
- expNumFiles = 1) // 3 files with key = null, but only 1 with val = null.
-
- checkResults(
- predicate = "key <=> NULL OR value <=> NULL",
- expResults = allData.filter(_ != (("a", "a"))),
- expNumPartitions = 3,
- expNumFiles = 5) // all 5 files
+ val tableDir = Utils.createTempDir().getAbsolutePath
+ val dataSeqs = Seq( // each sequence produces a single file
+ Seq((null, null)),
+ Seq((null, "a")),
+ Seq((null, "b")),
+ Seq(("a", "a"), ("a", null)),
+ Seq(("b", null))
+ )
+ dataSeqs.foreach { seq =>
+ seq.toDF("key", "value").coalesce(1)
+ .write.format("delta").partitionBy("key").mode("append").save(tableDir)
+ }
+ val allData = dataSeqs.flatten
+
+ def checkResults(
+ predicate: String,
+ expResults: Seq[(String, String)],
+ expNumPartitions: Int,
+ expNumFiles: Long): Unit =
+ checkResultsWithPartitions(tableDir, predicate, expResults, expNumPartitions, expNumFiles)
+
+ // Trivial base case
+ checkResults(
+ predicate = "True",
+ expResults = allData,
+ expNumPartitions = 3,
+ expNumFiles = 5)
+
+ // Conditions on partition key
+ checkResults(
+ predicate = "key IS NULL",
+ expResults = allData.filter(_._1 == null),
+ expNumPartitions = 1,
+ expNumFiles = 3) // 3 files with key = null
+
+ checkResults(
+ predicate = "key IS NOT NULL",
+ expResults = allData.filter(_._1 != null),
+ expNumPartitions = 2,
+ expNumFiles = 2) // 1 file with key = 'a' and 1 file with key = 'b'
+
+ checkResults(
+ predicate = "key <=> NULL",
+ expResults = allData.filter(_._1 == null),
+ expNumPartitions = 1,
+ expNumFiles = 3) // 3 files with key = null
+
+ checkResults(
+ predicate = "key = 'a'",
+ expResults = allData.filter(_._1 == "a"),
+ expNumPartitions = 1,
+ expNumFiles = 1) // 1 file with key = 'a'
+
+ checkResults(
+ predicate = "key <=> 'a'",
+ expResults = allData.filter(_._1 == "a"),
+ expNumPartitions = 1,
+ expNumFiles = 1) // 1 file with key <=> 'a'
+
+ checkResults(
+ predicate = "key = 'b'",
+ expResults = allData.filter(_._1 == "b"),
+ expNumPartitions = 1,
+ expNumFiles = 1) // 1 file with key = 'b'
+
+ checkResults(
+ predicate = "key <=> 'b'",
+ expResults = allData.filter(_._1 == "b"),
+ expNumPartitions = 1,
+ expNumFiles = 1) // 1 file with key <=> 'b'
+
+ // Conditions on partition keys and values
+ checkResults(
+ predicate = "value IS NULL",
+ expResults = allData.filter(_._2 == null),
+ expNumPartitions = 3,
+ expNumFiles = 3) // files with all non-NULL values get skipped
+
+ checkResults(
+ predicate = "value IS NOT NULL",
+ expResults = allData.filter(_._2 != null),
+ expNumPartitions = 2, // one of the partitions has no files left after data skipping
+ expNumFiles = 3) // files with all NULL values get skipped
+
+ checkResults(
+ predicate = "value <=> NULL",
+ expResults = allData.filter(_._2 == null),
+ expNumPartitions = 3,
+ expNumFiles = 3) // same as IS NULL case above
+
+ checkResults(
+ predicate = "value = 'a'",
+ expResults = allData.filter(_._2 == "a"),
+ expNumPartitions = 2, // one partition has no files left after data skipping
+ expNumFiles = 2) // only two files contain "a"
+
+ checkResults(
+ predicate = "value <=> 'a'",
+ expResults = allData.filter(_._2 == "a"),
+ expNumPartitions = 2, // one partition has no files left after data skipping
+ expNumFiles = 2) // only two files contain "a"
+
+ checkResults(
+ predicate = "value <> 'a'",
+ expResults = allData.filter(x => x._2 != "a" && x._2 != null), // i.e., only (null, b)
+ expNumPartitions = 1,
+ expNumFiles = 1) // only one file contains 'b'
+
+ checkResults(
+ predicate = "value = 'b'",
+ expResults = allData.filter(_._2 == "b"),
+ expNumPartitions = 1,
+ expNumFiles = 1) // same as previous case
+
+ checkResults(
+ predicate = "value <=> 'b'",
+ expResults = allData.filter(_._2 == "b"),
+ expNumPartitions = 1,
+ expNumFiles = 1) // same as previous case
+
+ // Conditions on both partition keys and values
+ checkResults(
+ predicate = "key IS NULL AND value = 'a'",
+ expResults = Seq((null, "a")),
+ expNumPartitions = 1,
+ expNumFiles = 1) // only one file in the partition has (*, "a")
+
+ checkResults(
+ predicate = "key IS NOT NULL AND value IS NOT NULL",
+ expResults = Seq(("a", "a")),
+ expNumPartitions = 1,
+ expNumFiles = 1) // 1 file with (*, a)
+
+ checkResults(
+ predicate = "key <=> NULL AND value <=> NULL",
+ expResults = Seq((null, null)),
+ expNumPartitions = 1,
+ expNumFiles = 1) // 3 files with key = null, but only 1 with val = null.
+
+ checkResults(
+ predicate = "key <=> NULL OR value <=> NULL",
+ expResults = allData.filter(_ != (("a", "a"))),
+ expNumPartitions = 3,
+ expNumFiles = 5) // all 5 files
}

// Note that we cannot use testSkipping here, because the JSON parsing bug we're working around
@@ -1489,67 +1490,69 @@ trait DataSkippingDeltaTestsBase extends QueryTest
}

test("data skipping get specific files with Stats API") {
- withTempDir { tempDir =>
- val tableDirPath = tempDir.getCanonicalPath
-
- val fileCount = 5
- // Create 5 files each having 1 row - x=1/x=2/x=3/x=4/x=5
- val data = spark.range(1, fileCount).toDF("x").repartition(fileCount, col("x"))
- data.write.format("delta").save(tableDirPath)
-
- var deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
-
- // Get name of file corresponding to row x=1
- val file1 = getFilesRead(deltaLog, "x = 1").head.path
- // Get name of file corresponding to row x=2
- val file2 = getFilesRead(deltaLog, "x = 2").head.path
- // Get name of file corresponding to row x=3
- val file3 = getFilesRead(deltaLog, "x = 3").head.path
-
- deltaLog = checkpointAndCreateNewLogIfNecessary(deltaLog)
- // Delete rows/files for x >= 3 from snapshot
- sql(s"DELETE FROM delta.`$tableDirPath` WHERE x >= 3")
- // Add another file with just one row x=6 in snapshot
- sql(s"INSERT INTO delta.`$tableDirPath` VALUES (6)")
-
- // We want the file from the INSERT VALUES (6) stmt. However, this `getFilesRead` call might
- // also return the AddFile (due to data file re-writes) from the DELETE stmt above. Since they
- // were committed in different commits, we can select the addFile with the higher version
- val addPathToCommitVersion = deltaLog.getChanges(0).flatMap {
- case (version, actions) => actions
- .collect { case a: AddFile => a }
- .map(a => (a.path, version))
- }.toMap
-
- val file6 = getFilesRead(deltaLog, "x = 6")
- .map(_.path)
- .maxBy(path => addPathToCommitVersion(path))
-
- // At this point, our latest snapshot has only 3 rows: x=1, x=2, x=6 - all in different files
-
- // Case-1: all passes files to the API exists in the snapshot
- val result1 = deltaLog.snapshot.getSpecificFilesWithStats(Seq(file1, file2))
- .map(addFile => (addFile.path, addFile)).toMap
- assert(result1.size == 2)
- assert(result1.keySet == Set(file1, file2))
- assert(result1(file1).stats === expectedStatsForFile(1, "x", deltaLog))
- assert(result1(file2).stats === expectedStatsForFile(2, "x", deltaLog))
-
- // Case-2: few passes files exists in the snapshot and few don't exists
- val result2 = deltaLog.snapshot.getSpecificFilesWithStats(Seq(file1, file2, file3))
- .map(addFile => (addFile.path, addFile)).toMap
- assert(result1 == result2)
-
- // Case-3: all passed files don't exists in the snapshot
- val result3 = deltaLog.snapshot.getSpecificFilesWithStats(Seq(file3, "xyz"))
- assert(result3.isEmpty)
-
- // Case-4: file3 doesn't exist and file6 exists in the latest commit
- val result4 = deltaLog.snapshot.getSpecificFilesWithStats(Seq(file3, file6))
- .map(addFile => (addFile.path, addFile)).toMap
- assert(result4.size == 1)
- assert(result4(file6).stats == expectedStatsForFile(6, "x", deltaLog))
- }
+ withTempDir { tempDir =>
+ val tableDirPath = tempDir.getCanonicalPath
+
+ val fileCount = 5
+ // Create 5 files each having 1 row - x=1/x=2/x=3/x=4/x=5
+ val data = spark.range(1, fileCount).toDF("x").repartition(fileCount, col("x"))
+ data.write.format("delta").save(tableDirPath)
+
+ var deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
+
+ // Get name of file corresponding to row x=1
+ val file1 = getFilesRead(deltaLog, "x = 1").head.path
+ // Get name of file corresponding to row x=2
+ val file2 = getFilesRead(deltaLog, "x = 2").head.path
+ // Get name of file corresponding to row x=3
+ val file3 = getFilesRead(deltaLog, "x = 3").head.path
+
+ deltaLog = checkpointAndCreateNewLogIfNecessary(deltaLog)
+ // Delete rows/files for x >= 3 from snapshot
+ sql(s"DELETE FROM delta.`$tableDirPath` WHERE x >= 3")
+ // Add another file with just one row x=6 in snapshot
+ sql(s"INSERT INTO delta.`$tableDirPath` VALUES (6)")
+
+ // We want the file from the INSERT VALUES (6) stmt. However, this `getFilesRead` call might
+ // also return the AddFile (due to data file re-writes) from the DELETE stmt above. Since
+ // they were committed in different commits, we can select the addFile with the higher
+ // version
+ val addPathToCommitVersion = deltaLog.getChanges(0).flatMap {
+ case (version, actions) => actions
+ .collect { case a: AddFile => a }
+ .map(a => (a.path, version))
+ }.toMap
+
+ val file6 = getFilesRead(deltaLog, "x = 6")
+ .map(_.path)
+ .maxBy(path => addPathToCommitVersion(path))
+
+ // At this point, our latest snapshot has only 3 rows: x=1, x=2, x=6 - all in
+ // different files
+
+ // Case-1: all files passed to the API exist in the snapshot
+ val result1 = deltaLog.snapshot.getSpecificFilesWithStats(Seq(file1, file2))
+ .map(addFile => (addFile.path, addFile)).toMap
+ assert(result1.size == 2)
+ assert(result1.keySet == Set(file1, file2))
+ assert(result1(file1).stats === expectedStatsForFile(1, "x", deltaLog))
+ assert(result1(file2).stats === expectedStatsForFile(2, "x", deltaLog))
+
+ // Case-2: some of the passed files exist in the snapshot and some don't
+ val result2 = deltaLog.snapshot.getSpecificFilesWithStats(Seq(file1, file2, file3))
+ .map(addFile => (addFile.path, addFile)).toMap
+ assert(result1 == result2)
+
+ // Case-3: none of the passed files exist in the snapshot
+ val result3 = deltaLog.snapshot.getSpecificFilesWithStats(Seq(file3, "xyz"))
+ assert(result3.isEmpty)
+
+ // Case-4: file3 doesn't exist and file6 exists in the latest commit
+ val result4 = deltaLog.snapshot.getSpecificFilesWithStats(Seq(file3, file6))
+ .map(addFile => (addFile.path, addFile)).toMap
+ assert(result4.size == 1)
+ assert(result4(file6).stats == expectedStatsForFile(6, "x", deltaLog))
+ }
}

protected def parse(deltaLog: DeltaLog, predicate: String): Seq[Expression] = {