diff --git a/core/src/main/scala/io/delta/tables/DeltaOptimizeBuilder.scala b/core/src/main/scala/io/delta/tables/DeltaOptimizeBuilder.scala
new file mode 100644
index 00000000000..642164cb243
--- /dev/null
+++ b/core/src/main/scala/io/delta/tables/DeltaOptimizeBuilder.scala
@@ -0,0 +1,71 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.tables
+
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.delta.commands.OptimizeTableCommand
+import org.apache.spark.sql.delta.util.AnalysisHelper
+
+
+/**
+ * Builder class for constructing and executing an OPTIMIZE command.
+ *
+ * @param sparkSession SparkSession to use for execution
+ * @param tableIdentifier Identifier of the table on which to
+ *        execute the optimize command
+ * @since 1.3.0
+ */
+class DeltaOptimizeBuilder(
+    sparkSession: SparkSession,
+    tableIdentifier: String) extends AnalysisHelper {
+  private var partitionFilter: Option[String] = None
+
+  /**
+   * Apply a partition filter to this optimize command builder to limit
+   * the operation to the selected partitions.
+   * @param partitionFilter The partition filter to apply
+   * @return [[DeltaOptimizeBuilder]] with the partition filter applied
+   */
+  def partitionFilter(partitionFilter: String): DeltaOptimizeBuilder = {
+    this.partitionFilter = Some(partitionFilter)
+    this
+  }
+
+  /**
+   * Z-Order the data in the table using the given columns.
+   * @param columns Zero or more columns to order the data
+   *                using Z-Order curves
+   * @return DataFrame containing the OPTIMIZE execution metrics
+   */
+  def executeZOrderBy(columns: String *): DataFrame = {
+    throw new UnsupportedOperationException("Z ordering is not yet supported")
+  }
+
+  /**
+   * Compact the small files in the selected partitions.
+   * @return DataFrame containing the OPTIMIZE execution metrics
+   */
+  def executeCompaction(): DataFrame = {
+    val tableId: TableIdentifier = sparkSession
+      .sessionState
+      .sqlParser
+      .parseTableIdentifier(tableIdentifier)
+    val optimize = OptimizeTableCommand(None, Some(tableId), partitionFilter)
+    toDataset(sparkSession, optimize)
+  }
+}
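
The new builder is intended to be driven from a DeltaTable handle (see the optimize() method added to DeltaTable.scala below). A minimal usage sketch, assuming an existing Delta table at the hypothetical path /tmp/delta/events:

    import io.delta.tables.DeltaTable

    // Obtain a table handle for the (hypothetical) table location.
    val deltaTable = DeltaTable.forPath("/tmp/delta/events")

    // Restrict the operation to one partition, then compact its small files.
    // executeCompaction() returns a DataFrame of OPTIMIZE execution metrics.
    val metrics = deltaTable
      .optimize()
      .partitionFilter("date='2021-11-18'")
      .executeCompaction()
    metrics.show(truncate = false)

Note that executeZOrderBy(...) currently throws UnsupportedOperationException, so compaction is the only technique executable through this builder for now.
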
diff --git a/core/src/main/scala/io/delta/tables/DeltaTable.scala b/core/src/main/scala/io/delta/tables/DeltaTable.scala
index 8aa82a12a34..fb94ce12cb7 100644
--- a/core/src/main/scala/io/delta/tables/DeltaTable.scala
+++ b/core/src/main/scala/io/delta/tables/DeltaTable.scala
@@ -182,6 +182,40 @@ class DeltaTable private[tables](
     executeDelete(None)
   }
 
+  /**
+   * Optimize the data layout of the table. This returns
+   * a [[DeltaOptimizeBuilder]] object that can be used to specify
+   * a partition filter to limit the scope of the optimize, and
+   * also to execute different optimization techniques such as file
+   * compaction or ordering data using Z-Order curves.
+   *
+   * See the [[DeltaOptimizeBuilder]] for a full description
+   * of this operation.
+   *
+   * Scala example to run file compaction on a subset of
+   * partitions in the table:
+   * {{{
+   *  deltaTable
+   *   .optimize()
+   *   .partitionFilter("date='2021-11-18'")
+   *   .executeCompaction()
+   * }}}
+   *
+   * Scala example to Z-Order data using the given columns on a
+   * subset of partitions in the table:
+   * {{{
+   *  deltaTable
+   *   .optimize()
+   *   .partitionFilter("date='2021-11-18'")
+   *   .executeZOrderBy("city", "state")
+   * }}}
+   *
+   * @since 1.3.0
+   */
+  def optimize(): DeltaOptimizeBuilder = {
+    new DeltaOptimizeBuilder(sparkSession,
+      table.tableIdentifier.getOrElse(s"delta.`${deltaLog.dataPath.toString}`"))
+  }
 
   /**
    * Update rows in the table based on the rules defined by `set`.
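
For context, executeCompaction() constructs an OptimizeTableCommand directly, so the Scala call above should behave like the SQL form that the tests below exercise. A sketch of the equivalent SQL, using the same hypothetical path:

    // SQL equivalent of optimize().partitionFilter(...).executeCompaction()
    spark.sql("OPTIMIZE '/tmp/delta/events' WHERE date='2021-11-18'")
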
" + "Only the partition columns may be referenced: [id]")) @@ -128,7 +134,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest (0 to 1).foreach(partId => assert(fileListBefore.count(_.partitionValues === Map(id -> partId.toString)) > 1)) - spark.sql(s"OPTIMIZE '$path'") + executeOptimizePath(path) val deltaLogAfter = DeltaLog.forTable(spark, path) val txnAfter = deltaLogAfter.startTransaction(); @@ -171,14 +177,13 @@ trait OptimizeCompactionSuiteBase extends QueryTest assert(fileListBefore.count(_.partitionValues === Map(id -> "0")) > 1) val versionBefore = deltaLogBefore.snapshot.version - spark.sql(s"OPTIMIZE '$path' WHERE id = 0") + executeOptimizePath(path, Some("id = 0")) val deltaLogAfter = DeltaLog.forTable(spark, path) val txnAfter = deltaLogBefore.startTransaction(); val fileListAfter = txnAfter.filterFiles() assert(fileListBefore.length > fileListAfter.length) - // Optimized partition should contain only one file assert(fileListAfter.count(_.partitionValues === Map(id -> "0")) === 1) @@ -227,7 +232,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest assert(filesInEachPartitionBefore.keys.exists( _ === (partitionColumnPhysicalName, nullPartitionValue))) - spark.sql(s"OPTIMIZE '$path'") + executeOptimizePath(path) val deltaLogAfter = DeltaLog.forTable(spark, path) val txnAfter = deltaLogBefore.startTransaction(); @@ -274,7 +279,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest val fileCountInTestPartitionBefore = fileListBefore .count(_.partitionValues === Map[String, String](date -> "2017-10-10", part -> "3")) - spark.sql(s"OPTIMIZE '$path' WHERE date = '2017-10-10' and part = 3") + executeOptimizePath(path, Some("date = '2017-10-10' and part = 3")) val deltaLogAfter = DeltaLog.forTable(spark, path) val txnAfter = deltaLogBefore.startTransaction(); @@ -323,13 +328,41 @@ trait OptimizeCompactionSuiteBase extends QueryTest // block the first write until the second batch can attempt to write. BlockWritesLocalFileSystem.blockUntilConcurrentWrites(numPartitions) failAfter(60.seconds) { - sql(s"OPTIMIZE '$path'") + executeOptimizePath(path) } assert(deltaLog.snapshot.numOfFiles === numPartitions) // 1 file per partition } } } + /** + * Utility method to append the given data to the Delta table located at the given path. + * Optionally partitions the data. + */ + protected def appendToDeltaTable[T]( + data: Dataset[T], tablePath: String, partitionColumns: Option[Seq[String]] = None): Unit = { + var df = data.repartition(1).write; + partitionColumns.map(columns => { + df = df.partitionBy(columns: _*) + }) + df.format("delta").mode("append").save(tablePath) + } +} + +/** + * Runs optimize compaction tests. + */ +class OptimizeCompactionSQLSuite extends OptimizeCompactionSuiteBase + with DeltaSQLCommandTest { + def executeOptimizeTable(table: String, condition: Option[String] = None): Unit = { + val conditionClause = condition.map(c => s"WHERE $c").getOrElse("") + spark.sql(s"OPTIMIZE $table $conditionClause") + } + + def executeOptimizePath(path: String, condition: Option[String] = None): Unit = { + executeOptimizeTable(s"'$path'", condition) + } + test("optimize command: missing path") { val e = intercept[ParseException] { spark.sql(s"OPTIMIZE") @@ -350,29 +383,28 @@ trait OptimizeCompactionSuiteBase extends QueryTest } assert(e.getMessage.contains("OPTIMIZE")) } - - /** - * Utility method to append the given data to the Delta table located at the given path. - * Optionally partitions the data. 
@@ -350,29 +383,28 @@
     }
     assert(e.getMessage.contains("OPTIMIZE"))
   }
-
-  /**
-   * Utility method to append the given data to the Delta table located at the given path.
-   * Optionally partitions the data.
-   */
-  protected def appendToDeltaTable[T](
-      data: Dataset[T], tablePath: String, partitionColumns: Option[Seq[String]] = None): Unit = {
-    var df = data.repartition(1).write;
-    partitionColumns.map(columns => {
-      df = df.partitionBy(columns: _*)
-    })
-    df.format("delta").mode("append").save(tablePath)
-  }
 }
 
-/**
- * Runs optimize compaction tests.
- */
-class OptimizeCompactionSuite extends OptimizeCompactionSuiteBase
-  with DeltaSQLCommandTest
+class OptimizeCompactionScalaSuite extends OptimizeCompactionSuiteBase
+  with DeltaSQLCommandTest {
+  def executeOptimizeTable(table: String, condition: Option[String] = None): Unit = {
+    if (condition.isDefined) {
+      DeltaTable.forName(table).optimize().partitionFilter(condition.get).executeCompaction()
+    } else {
+      DeltaTable.forName(table).optimize().executeCompaction()
+    }
+  }
+
+  def executeOptimizePath(path: String, condition: Option[String] = None): Unit = {
+    if (condition.isDefined) {
+      DeltaTable.forPath(path).optimize().partitionFilter(condition.get).executeCompaction()
+    } else {
+      DeltaTable.forPath(path).optimize().executeCompaction()
+    }
+  }
+}
 
-class OptimizeCompactionNameColumnMappingSuite extends OptimizeCompactionSuite
+class OptimizeCompactionNameColumnMappingSuite extends OptimizeCompactionSQLSuite
   with DeltaColumnMappingEnableNameMode {
   override protected def runOnlyTests = Seq(
     "optimize command: on table with multiple partition columns",
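
As an end-to-end illustration of what the two suites now verify through a shared test body, the following standalone sketch (hypothetical path and data, not part of the patch) builds a small partitioned table and compacts a single partition through the new Scala API:

    import io.delta.tables.DeltaTable
    import org.apache.spark.sql.functions.col

    val path = "/tmp/delta/optimize-demo" // hypothetical location
    // Append twice so each partition is backed by more than one small file.
    Seq(1, 2).foreach { _ =>
      spark.range(0, 10)
        .withColumn("part", col("id") % 2)
        .repartition(1)
        .write.format("delta").partitionBy("part").mode("append").save(path)
    }

    // Compact only partition part = 0, mirroring OptimizeCompactionScalaSuite;
    // OptimizeCompactionSQLSuite covers the same behavior via OPTIMIZE ... WHERE.
    val metrics = DeltaTable.forPath(path)
      .optimize()
      .partitionFilter("part = 0")
      .executeCompaction()
    metrics.show(truncate = false)
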