diff --git a/core/src/main/scala/io/delta/tables/DeltaTable.scala b/core/src/main/scala/io/delta/tables/DeltaTable.scala
index 9455873d775..89ec3c52ecc 100644
--- a/core/src/main/scala/io/delta/tables/DeltaTable.scala
+++ b/core/src/main/scala/io/delta/tables/DeltaTable.scala
@@ -183,25 +183,28 @@ class DeltaTable private[tables](
   }
 
   /**
-   * Optimize data in the table that match the given `condition`.
+   * Optimize data in the table that match the given `condition`. The condition must only
+   * contain filters on partition columns, otherwise an `AnalysisException` is thrown.
    *
    * @param condition Boolean SQL expression
    *
    * @since 1.2.0
    */
   def optimize(condition: String): Unit = {
-    optimize(functions.expr(condition))
+    val tableId = table.tableIdentifier.getOrElse(s"delta.`${deltaLog.dataPath.toString}`")
+    executeOptimize(tableId, Some(condition))
   }
 
   /**
-   * Optimize data in the table that match the given `condition`.
+   * Optimize data in the table that match the given `condition`. The condition must only
+   * contain filters on partition columns, otherwise an `AnalysisException` is thrown.
    *
    * @param condition Boolean SQL expression
    *
    * @since 1.2.0
    */
   def optimize(condition: Column): Unit = {
-    executeOptimize(Some(condition.expr))
+    optimize(condition.expr.sql)
   }
 
   /**
@@ -210,7 +213,8 @@ class DeltaTable private[tables](
    * @since 1.2.0
    */
   def optimize(): Unit = {
-    executeOptimize(None)
+    val tableId = table.tableIdentifier.getOrElse(s"delta.`${deltaLog.dataPath.toString}`")
+    executeOptimize(tableId, None)
   }
diff --git a/core/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala b/core/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala
index edcb552fb18..011ec30b1f9 100644
--- a/core/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala
+++ b/core/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala
@@ -19,7 +19,7 @@ package io.delta.tables.execution
 import scala.collection.Map
 
 import org.apache.spark.sql.delta.DeltaLog
-import org.apache.spark.sql.delta.commands.{DeltaGenerateCommand, OptimizeExecutor, RestoreTableCommand, VacuumCommand}
+import org.apache.spark.sql.delta.commands.{DeltaGenerateCommand, OptimizeExecutor, OptimizeTableCommand, RestoreTableCommand, VacuumCommand}
 import org.apache.spark.sql.delta.util.AnalysisHelper
 import io.delta.tables.DeltaTable
 
@@ -83,9 +83,15 @@ trait DeltaTableOperations extends AnalysisHelper { self: DeltaTable =>
     sparkSession.emptyDataFrame
   }
 
-  protected def executeOptimize(condition: Option[Expression]): DataFrame = {
-    val optimize = new OptimizeExecutor(sparkSession, deltaLog, condition.toSeq).optimize()
-    sparkSession.emptyDataFrame
+  protected def executeOptimize(
+      tblIdentifier: String,
+      condition: Option[String]): DataFrame = {
+    val tableId: TableIdentifier = sparkSession
+      .sessionState
+      .sqlParser
+      .parseTableIdentifier(tblIdentifier)
+    val optimize = OptimizeTableCommand(None, Some(tableId), condition)
+    toDataset(sparkSession, optimize)
   }
 
   protected def toStrColumnMap(map: Map[String, String]): Map[String, Column] = {
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
index 96e61f02169..425ea0b198a 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
@@ -63,7 +63,12 @@ case class OptimizeTableCommand(
     // Parse the predicate expression into Catalyst expression and verify only simple filters
     // on partition columns are present
     val partitionPredicates = partitionPredicate.map(predicate => {
-      parsePredicates(sparkSession, predicate)
+      val predicates = parsePredicates(sparkSession, predicate)
+      verifyPartitionPredicates(
+        sparkSession,
+        deltaLog.snapshot.metadata.partitionColumns,
+        predicates)
+      predicates
     }).getOrElse(Seq(Literal.TrueLiteral))
 
     new OptimizeExecutor(sparkSession, deltaLog, partitionPredicates)
@@ -85,11 +90,6 @@ class OptimizeExecutor(
     partitionPredicate: Seq[Expression])
   extends DeltaCommand with SQLMetricsReporting with Serializable {
 
-  verifyPartitionPredicates(
-    sparkSession,
-    deltaLog.snapshot.metadata.partitionColumns,
-    partitionPredicate)
-
   /** Timestamp to use in [[FileAction]] */
   private val operationTimestamp = new SystemClock().getTimeMillis()
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeCompactionSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeCompactionSuite.scala
index 3b090cfdc22..6d87dbd0a40 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeCompactionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeCompactionSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.optimize
 import java.io.File
 
 // scalastyle:off import.ordering.noEmptyLine
+import io.delta.tables.DeltaTable
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.hadoop.fs.Path
@@ -28,7 +29,7 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.functions.{expr, lit}
 import org.apache.spark.sql.test.SharedSparkSession
 
 /**
@@ -39,6 +40,9 @@ trait OptimizeCompactionSuiteBase extends QueryTest
 
   import testImplicits._
 
+  def executeOptimizeTable(table: String, condition: Option[String] = None)
+  def executeOptimizePath(path: String, condition: Option[String] = None)
+
   test("optimize command: with database and table name") {
     withTempDir { tempDir =>
       val dbName = "delta_db"
@@ -53,7 +57,8 @@ trait OptimizeCompactionSuiteBase extends QueryTest
 
         val deltaLog = DeltaLog.forTable(spark, tempDir)
         val versionBeforeOptimize = deltaLog.snapshot.version
-        spark.sql(s"OPTIMIZE $tableName")
+        executeOptimizeTable(tableName)
+
        deltaLog.update()
         assert(deltaLog.snapshot.version === versionBeforeOptimize + 1)
         checkDatasetUnorderly(spark.table(tableName).as[Int], 1, 2, 3, 4, 5, 6)
@@ -71,24 +76,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest
       val deltaLog = DeltaLog.forTable(spark, tempDir)
       val versionBeforeOptimize = deltaLog.snapshot.version
-      spark.sql(s"OPTIMIZE '${tempDir.getCanonicalPath}'")
-      deltaLog.update()
-      assert(deltaLog.snapshot.version === versionBeforeOptimize + 1)
-      checkDatasetUnorderly(data.toDF().as[Int], 1, 2, 3, 4, 5, 6)
-    }
-  }
-
-  test("optimize command via DeltaTable") {
-    withTempDir { tempDir =>
-      appendToDeltaTable(Seq(1, 2, 3).toDF(), tempDir.toString, partitionColumns = None)
-      appendToDeltaTable(Seq(4, 5, 6).toDF(), tempDir.toString, partitionColumns = None)
-
-      def data: DataFrame = spark.read.format("delta").load(tempDir.toString)
-
-      val deltaTable = io.delta.tables.DeltaTable.forPath(spark, tempDir.getPath)
-      val deltaLog = DeltaLog.forTable(spark, tempDir)
-      val versionBeforeOptimize = deltaLog.snapshot.version
-      deltaTable.optimize()
+      executeOptimizePath(tempDir.getCanonicalPath)
       deltaLog.update()
       assert(deltaLog.snapshot.version === versionBeforeOptimize + 1)
       checkDatasetUnorderly(data.toDF().as[Int], 1, 2, 3, 4, 5, 6)
     }
   }
@@ -106,25 +94,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest
 
       val e = intercept[AnalysisException] {
         // Should fail when predicate is on a non-partition column
-        spark.sql(s"OPTIMIZE '$path' WHERE value < 4")
-      }
-      assert(e.getMessage.contains("Predicate references non-partition column 'value'. " +
-        "Only the partition columns may be referenced: [id]"))
-    }
-  }
-
-  test("optimize command via DeltaTable: predicate on non-partition column") {
-    withTempDir { tempDir =>
-      val path = new File(tempDir, "testTable").getCanonicalPath
-      val partitionColumns = Some(Seq("id"))
-      appendToDeltaTable(
-        Seq(1, 2, 3).toDF("value").withColumn("id", 'value % 2),
-        path,
-        partitionColumns)
-
-      val e = intercept[AnalysisException] {
-        // Should fail when predicate is on a non-partition column
-        io.delta.tables.DeltaTable.forPath(spark, path).optimize("value < 4")
+        executeOptimizePath(path, Some("value < 4"))
       }
       assert(e.getMessage.contains("Predicate references non-partition column 'value'. " +
         "Only the partition columns may be referenced: [id]"))
@@ -154,7 +124,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest
       (0 to 1).foreach(partId =>
         assert(fileListBefore.count(_.partitionValues === Map("id" -> partId.toString)) > 1))
 
-      spark.sql(s"OPTIMIZE '$path'")
+      executeOptimizePath(path)
 
       val deltaLogAfter = DeltaLog.forTable(spark, path)
       val txnAfter = deltaLogAfter.startTransaction();
@@ -195,54 +165,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest
       assert(fileListBefore.count(_.partitionValues === Map("id" -> "0")) > 1)
 
       val versionBefore = deltaLogBefore.snapshot.version
-      spark.sql(s"OPTIMIZE '$path' WHERE id = 0")
-
-      val deltaLogAfter = DeltaLog.forTable(spark, path)
-      val txnAfter = deltaLogBefore.startTransaction();
-      val fileListAfter = txnAfter.filterFiles()
-
-      assert(fileListBefore.length > fileListAfter.length)
-      // Optimized partition should contain only one file
-      assert(fileListAfter.count(_.partitionValues === Map("id" -> "0")) === 1)
-
-      // File counts in partitions that are not part of the OPTIMIZE should remain the same
-      assert(fileListAfter.count(_.partitionValues === Map("id" -> "1")) ===
-        fileListAfter.count(_.partitionValues === Map("id" -> "1")))
-
-      // version is incremented
-      assert(deltaLogAfter.snapshot.version === versionBefore + 1)
-
-      // data should remain the same after the OPTIMIZE
-      checkDatasetUnorderly(
-        spark.read.format("delta").load(path).select("value").as[Long],
-        (1L to 6L): _*)
-    }
-  }
-
-
-  test("optimize command via DeltaTable: on partitioned table - selected partitions") {
-    withTempDir { tempDir =>
-      val path = new File(tempDir, "testTable").getCanonicalPath
-      val partitionColumns = Some(Seq("id"))
-      appendToDeltaTable(
-        Seq(1, 2, 3).toDF("value").withColumn("id", 'value % 2),
-        path,
-        partitionColumns)
-
-      appendToDeltaTable(
-        Seq(4, 5, 6).toDF("value").withColumn("id", 'value % 2),
-        path,
-        partitionColumns)
-
-      val deltaLogBefore = DeltaLog.forTable(spark, path)
-      val txnBefore = deltaLogBefore.startTransaction();
-      val fileListBefore = txnBefore.filterFiles()
-
-      assert(fileListBefore.length >= 3)
-      assert(fileListBefore.count(_.partitionValues === Map("id" -> "0")) > 1)
-
-      val versionBefore = deltaLogBefore.snapshot.version
-      io.delta.tables.DeltaTable.forPath(spark, path).optimize("id = 0")
+      executeOptimizePath(path, Some("id = 0"))
 
       val deltaLogAfter = DeltaLog.forTable(spark, path)
       val txnAfter = deltaLogBefore.startTransaction();
@@ -294,7 +217,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest
       assert(filesInEachPartitionBefore.keys.exists(
         _ === s"part=${ExternalCatalogUtils.DEFAULT_PARTITION_NAME}"))
 
-      spark.sql(s"OPTIMIZE '$path'")
+      executeOptimizePath(path)
 
       val deltaLogAfter = DeltaLog.forTable(spark, path)
       val txnAfter = deltaLogBefore.startTransaction();
@@ -337,7 +260,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest
       val fileCountInTestPartitionBefore = fileListBefore
         .count(_.partitionValues === Map[String, String]("date" -> "2017-10-10", "part" -> "3"))
 
-      spark.sql(s"OPTIMIZE '$path' WHERE date = '2017-10-10' and part = 3")
+      executeOptimizePath(path, Some("date = '2017-10-10' and part = 3"))
 
       val deltaLogAfter = DeltaLog.forTable(spark, path)
       val txnAfter = deltaLogBefore.startTransaction();
@@ -386,13 +309,41 @@ trait OptimizeCompactionSuiteBase extends QueryTest
         // block the first write until the second batch can attempt to write.
         BlockWritesLocalFileSystem.blockUntilConcurrentWrites(numPartitions)
         failAfter(60.seconds) {
-          sql(s"OPTIMIZE '$path'")
+          executeOptimizePath(path)
         }
         assert(deltaLog.snapshot.numOfFiles === numPartitions) // 1 file per partition
       }
     }
   }
 
+  /**
+   * Utility method to append the given data to the Delta table located at the given path.
+   * Optionally partitions the data.
+   */
+  protected def appendToDeltaTable[T](
+      data: Dataset[T], tablePath: String, partitionColumns: Option[Seq[String]] = None): Unit = {
+    var df = data.repartition(1).write;
+    partitionColumns.map(columns => {
+      df = df.partitionBy(columns: _*)
+    })
+    df.format("delta").mode("append").save(tablePath)
+  }
+}
+
+/**
+ * Runs optimize compaction tests.
+ */
+class OptimizeCompactionSQLSuite extends OptimizeCompactionSuiteBase
+  with DeltaSQLCommandTest {
+  def executeOptimizeTable(table: String, condition: Option[String] = None): Unit = {
+    val conditionClause = condition.map(c => s"WHERE $c").getOrElse("")
+    spark.sql(s"OPTIMIZE $table $conditionClause")
+  }
+
+  def executeOptimizePath(path: String, condition: Option[String] = None): Unit = {
+    executeOptimizeTable(s"'$path'", condition)
+  }
+
   test("optimize command: missing path") {
     val e = intercept[ParseException] {
       spark.sql(s"OPTIMIZE")
@@ -413,24 +364,23 @@ trait OptimizeCompactionSuiteBase extends QueryTest
     }
     assert(e.getMessage.contains("OPTIMIZE"))
   }
-
-  /**
-   * Utility method to append the given data to the Delta table located at the given path.
-   * Optionally partitions the data.
-   */
-  protected def appendToDeltaTable[T](
-      data: Dataset[T], tablePath: String, partitionColumns: Option[Seq[String]] = None): Unit = {
-    var df = data.repartition(1).write;
-    partitionColumns.map(columns => {
-      df = df.partitionBy(columns: _*)
-    })
-    df.format("delta").mode("append").save(tablePath)
-  }
 }
 
-/**
- * Runs optimize compaction tests.
- */
-class OptimizeCompactionSuite extends OptimizeCompactionSuiteBase
-  with DeltaSQLCommandTest
+class OptimizeCompactionScalaSuite extends OptimizeCompactionSuiteBase
+  with DeltaSQLCommandTest {
+  def executeOptimizeTable(table: String, condition: Option[String] = None): Unit = {
+    if (condition.isDefined) {
+      DeltaTable.forName(table).optimize(expr(condition.get))
+    } else {
+      DeltaTable.forName(table).optimize()
+    }
+  }
+
+  def executeOptimizePath(path: String, condition: Option[String] = None): Unit = {
+    if (condition.isDefined) {
+      DeltaTable.forPath(path).optimize(expr(condition.get))
+    } else {
+      DeltaTable.forPath(path).optimize()
+    }
+  }
+}
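A minimal usage sketch of the Scala API surface the patch above wires up, not part of the diff itself. The table name `events`, the path `/tmp/delta/events`, and the partition column `date` are illustrative assumptions; only the three `DeltaTable.optimize` overloads and their routing through `OptimizeTableCommand` come from the change. Because the Column overload is rewritten to SQL text and the String overload is handed to `OptimizeTableCommand`, all three paths receive the same partition-predicate validation as the SQL `OPTIMIZE ... WHERE ...` statement.

import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.expr

// Assumes an active SparkSession with the Delta Lake extensions configured,
// and a Delta table registered as `events` (hypothetical name) partitioned by `date`.

// Compact the whole table: executeOptimize is invoked with condition = None.
DeltaTable.forName("events").optimize()

// Compact only selected partitions; the predicate may reference partition columns only,
// otherwise verifyPartitionPredicates throws an AnalysisException.
DeltaTable.forPath("/tmp/delta/events").optimize("date = '2017-10-10'")

// Column overload: converted back to SQL text via condition.expr.sql and re-parsed
// by OptimizeTableCommand, so it behaves the same as the String form above.
DeltaTable.forName("events").optimize(expr("date = '2017-10-10'"))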