Add Scala API for optimize
Add functions to `DeltaTable` to perform optimization.

API documentation:
```
  /**
   * Optimize the data layout of the table. This returns
   * a [[DeltaOptimizeBuilder]] object that can be used to specify
   * a partition filter to limit the scope of the optimize and
   * to execute different optimization techniques such as file
   * compaction or ordering data using Z-Order curves.
   *
   * See [[DeltaOptimizeBuilder]] for a full description
   * of this operation.
   *
   * Scala example to run file compaction on a subset of
   * partitions in the table:
   * {{{
   *    deltaTable
   *     .optimize()
   *     .where("date='2021-11-18'")
   *     .executeCompaction();
   * }}}
   *
   * @since 1.3.0
   */
  def optimize(): DeltaOptimizeBuilder
```

```
/**
 * Builder class for constructing and executing an OPTIMIZE command.
 *
 * @param sparkSession SparkSession to use for execution
 * @param tableIdentifier Id of the table on which to
 *        execute the optimize
 * @since 1.3.0
 */
class DeltaOptimizeBuilder(
    sparkSession: SparkSession,
    tableIdentifier: String) extends AnalysisHelper {

  /**
   * Apply a partition filter on this optimize command builder to limit
   * the operation to selected partitions.
   * @param partitionFilter The partition filter to apply
   * @return [[DeltaOptimizeBuilder]] with partition filter applied
   */
  def where(partitionFilter: String): DeltaOptimizeBuilder

  /**
   * Compact the small files in selected partitions.
   * @return DataFrame containing the OPTIMIZE execution metrics
   */
  def executeCompaction(): DataFrame
}
```
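
Example usage of the new API (a minimal sketch; the table name `events`, the path `/tmp/delta/events`, and the partition column `date` are hypothetical, and `spark` is assumed to be an active SparkSession):

```
import io.delta.tables.DeltaTable

// Compact small files across the whole table, addressed by catalog name.
val metricsByName = DeltaTable.forName(spark, "events")
  .optimize()
  .executeCompaction()

// Compact only one partition of a path-based table partitioned by `date`.
val metricsByPath = DeltaTable.forPath(spark, "/tmp/delta/events")
  .optimize()
  .where("date = '2021-11-18'")
  .executeCompaction()

// executeCompaction() returns a DataFrame of OPTIMIZE execution metrics.
metricsByPath.show(false)
```

This mirrors the SQL form `OPTIMIZE <table> [WHERE <partition predicate>]` exercised by the updated test suites below.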

Closes delta-io#961
Fixes delta-io#960

Signed-off-by: Venki Korukanti <[email protected]>
GitOrigin-RevId: 615e215b96fb9e9b9223d3d2b429dc18dff102f4
Kimahriman authored and jbguerraz committed Jul 6, 2022
1 parent acfdde1 commit 324bcb8
Showing 3 changed files with 166 additions and 28 deletions.
core/src/main/scala/io/delta/tables/DeltaOptimizeBuilder.scala (77 additions, 0 deletions)
@@ -0,0 +1,77 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.tables

import org.apache.spark.sql.delta.commands.OptimizeTableCommand
import org.apache.spark.sql.delta.util.AnalysisHelper

import org.apache.spark.annotation.Unstable
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.ParseException

/**
 * Builder class for constructing and executing an OPTIMIZE command.
 *
 * @param sparkSession SparkSession to use for execution
 * @param tableIdentifier Id of the table on which to
 *        execute the optimize
 * @since 1.3.0
 */
class DeltaOptimizeBuilder private(
    sparkSession: SparkSession,
    tableIdentifier: String) extends AnalysisHelper {
  @volatile private var partitionFilter: Option[String] = None

  /**
   * Apply a partition filter on this optimize command builder to limit
   * the operation to selected partitions.
   * @param partitionFilter The partition filter to apply
   * @return [[DeltaOptimizeBuilder]] with partition filter applied
   */
  def where(partitionFilter: String): DeltaOptimizeBuilder = {
    this.partitionFilter = Some(partitionFilter)
    this
  }

  /**
   * Compact the small files in selected partitions.
   * @return DataFrame containing the OPTIMIZE execution metrics
   */
  def executeCompaction(): DataFrame = {
    val tableId: TableIdentifier = sparkSession
      .sessionState
      .sqlParser
      .parseTableIdentifier(tableIdentifier)
    val optimize = OptimizeTableCommand(None, Some(tableId), partitionFilter)
    toDataset(sparkSession, optimize)
  }
}

private[delta] object DeltaOptimizeBuilder {
  /**
   * :: Unstable ::
   *
   * Private method for internal usage only. Do not call this directly.
   */
  @Unstable
  private[delta] def apply(
      sparkSession: SparkSession,
      tableIdentifier: String): DeltaOptimizeBuilder = {
    new DeltaOptimizeBuilder(sparkSession, tableIdentifier)
  }
}
core/src/main/scala/io/delta/tables/DeltaTable.scala (25 additions, 0 deletions)
@@ -182,6 +182,31 @@ class DeltaTable private[tables](
    executeDelete(None)
  }

  /**
   * Optimize the data layout of the table. This returns
   * a [[DeltaOptimizeBuilder]] object that can be used to specify
   * a partition filter to limit the scope of the optimize and
   * to execute different optimization techniques such as file
   * compaction or ordering data using Z-Order curves.
   *
   * See [[DeltaOptimizeBuilder]] for a full description
   * of this operation.
   *
   * Scala example to run file compaction on a subset of
   * partitions in the table:
   * {{{
   *    deltaTable
   *     .optimize()
   *     .where("date='2021-11-18'")
   *     .executeCompaction();
   * }}}
   *
   * @since 1.3.0
   */
  def optimize(): DeltaOptimizeBuilder = {
    DeltaOptimizeBuilder(sparkSession,
      table.tableIdentifier.getOrElse(s"delta.`${deltaLog.dataPath.toString}`"))
  }

  /**
   * Update rows in the table based on the rules defined by `set`.
@@ -24,12 +24,12 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.hadoop.fs.Path
import io.delta.tables.DeltaTable

import org.scalatest.concurrent.TimeLimits.failAfter
import org.scalatest.time.SpanSugar._

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.test.SharedSparkSession
@@ -43,6 +43,9 @@ trait OptimizeCompactionSuiteBase extends QueryTest

import testImplicits._

def executeOptimizeTable(table: String, condition: Option[String] = None)
def executeOptimizePath(path: String, condition: Option[String] = None)

test("optimize command: with database and table name") {
withTempDir { tempDir =>
val dbName = "delta_db"
@@ -57,7 +60,8 @@ trait OptimizeCompactionSuiteBase extends QueryTest

val deltaLog = DeltaLog.forTable(spark, tempDir)
val versionBeforeOptimize = deltaLog.snapshot.version
spark.sql(s"OPTIMIZE $tableName")
executeOptimizeTable(tableName)

deltaLog.update()
assert(deltaLog.snapshot.version === versionBeforeOptimize + 1)
checkDatasetUnorderly(spark.table(tableName).as[Int], 1, 2, 3, 4, 5, 6)
@@ -75,7 +79,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest

val deltaLog = DeltaLog.forTable(spark, tempDir)
val versionBeforeOptimize = deltaLog.snapshot.version
spark.sql(s"OPTIMIZE '${tempDir.getCanonicalPath}'")
executeOptimizePath(tempDir.getCanonicalPath)
deltaLog.update()
assert(deltaLog.snapshot.version === versionBeforeOptimize + 1)
checkDatasetUnorderly(data.toDF().as[Int], 1, 2, 3, 4, 5, 6)
@@ -97,7 +101,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest

val e = intercept[AnalysisException] {
// Should fail when predicate is on a non-partition column
spark.sql(s"OPTIMIZE '$path' WHERE value < 4")
executeOptimizePath(path, Some("value < 4"))
}
assert(e.getMessage.contains("Predicate references non-partition column 'value'. " +
"Only the partition columns may be referenced: [id]"))
@@ -129,7 +133,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest
(0 to 1).foreach(partId =>
assert(fileListBefore.count(_.partitionValues === Map(id -> partId.toString)) > 1))

spark.sql(s"OPTIMIZE '$path'")
executeOptimizePath(path)

val deltaLogAfter = DeltaLog.forTable(spark, path)
val txnAfter = deltaLogAfter.startTransaction();
@@ -250,14 +254,13 @@ trait OptimizeCompactionSuiteBase extends QueryTest
assert(fileListBefore.count(_.partitionValues === Map(id -> "0")) > 1)

val versionBefore = deltaLogBefore.snapshot.version
spark.sql(s"OPTIMIZE '$path' WHERE id = 0")
executeOptimizePath(path, Some("id = 0"))

val deltaLogAfter = DeltaLog.forTable(spark, path)
val txnAfter = deltaLogBefore.startTransaction();
val fileListAfter = txnAfter.filterFiles()

assert(fileListBefore.length > fileListAfter.length)

// Optimized partition should contain only one file
assert(fileListAfter.count(_.partitionValues === Map(id -> "0")) === 1)

@@ -306,7 +309,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest
assert(filesInEachPartitionBefore.keys.exists(
_ === (partitionColumnPhysicalName, nullPartitionValue)))

spark.sql(s"OPTIMIZE '$path'")
executeOptimizePath(path)

val deltaLogAfter = DeltaLog.forTable(spark, path)
val txnAfter = deltaLogBefore.startTransaction();
@@ -353,7 +356,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest
val fileCountInTestPartitionBefore = fileListBefore
.count(_.partitionValues === Map[String, String](date -> "2017-10-10", part -> "3"))

spark.sql(s"OPTIMIZE '$path' WHERE date = '2017-10-10' and part = 3")
executeOptimizePath(path, Some("date = '2017-10-10' and part = 3"))

val deltaLogAfter = DeltaLog.forTable(spark, path)
val txnAfter = deltaLogBefore.startTransaction();
@@ -402,13 +405,42 @@ trait OptimizeCompactionSuiteBase extends QueryTest
// block the first write until the second batch can attempt to write.
BlockWritesLocalFileSystem.blockUntilConcurrentWrites(numPartitions)
failAfter(60.seconds) {
sql(s"OPTIMIZE '$path'")
executeOptimizePath(path)
}
assert(deltaLog.snapshot.numOfFiles === numPartitions) // 1 file per partition
}
}
}

  /**
   * Utility method to append the given data to the Delta table located at the given path.
   * Optionally partitions the data.
   */
  protected def appendToDeltaTable[T](
      data: Dataset[T], tablePath: String, partitionColumns: Option[Seq[String]] = None): Unit = {
    var df = data.repartition(1).write;
    partitionColumns.map(columns => {
      df = df.partitionBy(columns: _*)
    })
    df.format("delta").mode("append").save(tablePath)
  }
}

/**
 * Runs optimize compaction tests using OPTIMIZE SQL
 */
class OptimizeCompactionSQLSuite extends OptimizeCompactionSuiteBase
  with DeltaSQLCommandTest {

  def executeOptimizeTable(table: String, condition: Option[String] = None): Unit = {
    val conditionClause = condition.map(c => s"WHERE $c").getOrElse("")
    spark.sql(s"OPTIMIZE $table $conditionClause")
  }

  def executeOptimizePath(path: String, condition: Option[String] = None): Unit = {
    executeOptimizeTable(s"'$path'", condition)
  }

test("optimize command: missing path") {
val e = intercept[ParseException] {
spark.sql(s"OPTIMIZE")
@@ -429,29 +461,33 @@ trait OptimizeCompactionSuiteBase extends QueryTest
}
assert(e.getMessage.contains("OPTIMIZE"))
}

/**
* Utility method to append the given data to the Delta table located at the given path.
* Optionally partitions the data.
*/
protected def appendToDeltaTable[T](
data: Dataset[T], tablePath: String, partitionColumns: Option[Seq[String]] = None): Unit = {
var df = data.repartition(1).write;
partitionColumns.map(columns => {
df = df.partitionBy(columns: _*)
})
df.format("delta").mode("append").save(tablePath)
}
}

/**
* Runs optimize compaction tests.
* Runs optimize compaction tests using OPTIMIZE Scala APIs
*/
class OptimizeCompactionSuite extends OptimizeCompactionSuiteBase
with DeltaSQLCommandTest
class OptimizeCompactionScalaSuite extends OptimizeCompactionSuiteBase
  with DeltaSQLCommandTest {
  def executeOptimizeTable(table: String, condition: Option[String] = None): Unit = {
    if (condition.isDefined) {
      DeltaTable.forName(table).optimize().where(condition.get).executeCompaction()
    } else {
      DeltaTable.forName(table).optimize().executeCompaction()
    }
  }

  def executeOptimizePath(path: String, condition: Option[String] = None): Unit = {
    if (condition.isDefined) {
      DeltaTable.forPath(path).optimize().where(condition.get).executeCompaction()
    } else {
      DeltaTable.forPath(path).optimize().executeCompaction()
    }
  }
}


class OptimizeCompactionNameColumnMappingSuite extends OptimizeCompactionSuite
class OptimizeCompactionNameColumnMappingSuite extends OptimizeCompactionSQLSuite
with DeltaColumnMappingEnableNameMode {
override protected def runOnlyTests = Seq(
"optimize command: on table with multiple partition columns",