Add Scala API for optimize #961

Closed
wants to merge 5 commits
71 changes: 71 additions & 0 deletions core/src/main/scala/io/delta/tables/DeltaOptimizeBuilder.scala
@@ -0,0 +1,71 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.tables

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.delta.commands.OptimizeTableCommand
import org.apache.spark.sql.delta.util.AnalysisHelper


/**
* Builder class for constructing OPTIMIZE command and executing.
*
* @param sparkSession SparkSession to use for execution
* @param tableIdentifier Id of the table on which to
* execute the optimize
* @since 1.3.0
*/
class DeltaOptimizeBuilder(
sparkSession: SparkSession,
tableIdentifier: String) extends AnalysisHelper {
private var partitionFilter: Option[String] = None

/**
* Apply partition filter on this optimize command builder to limit
* the operation on selected partitions.
* @param partitionFilter The partition filter to apply
* @return [[DeltaOptimizeBuilder]] with partition filter applied
*/
def partitionFilter(partitionFilter: String): DeltaOptimizeBuilder = {
Collaborator:
Hi @Kimahriman, I had an offline conversation with @tdas. He mentioned one very good point about keeping the names the same as SQL. In SQL we have where to select partitions, so I'm renaming this method to def where(partitionFilter: String) to keep it in sync with the SQL.

We follow this pattern in other APIs as well. For example, in Merge: SQL has WHEN MATCHED and in Scala/Python we have the similarly named method whenMatched.

Let me know if there are any concerns with the rename. I can make the change locally and put it into the merge queue.
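A minimal sketch of what the renamed method might look like (hypothetical; the exact name and scaladoc would be settled when the rename lands):

```scala
/**
 * Apply a partition filter on this optimize command builder to limit
 * the operation to selected partitions, mirroring the SQL syntax
 * `OPTIMIZE table WHERE <predicate>`.
 * @param partitionFilter The partition filter to apply
 * @return [[DeltaOptimizeBuilder]] with partition filter applied
 */
def where(partitionFilter: String): DeltaOptimizeBuilder = {
  this.partitionFilter = Some(partitionFilter)
  this
}
```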

Contributor Author:

I have no problem with that, just let me know if you want me to change anything or if you'll handle it

Collaborator:

Thanks @Kimahriman, it's ok, I can make the change.

this.partitionFilter = Some(partitionFilter)
this
}

/**
* Z-Order the data in the table using the given columns.
* @param columns Zero or more columns to order the data
* using Z-Order curves
* @return DataFrame containing the OPTIMIZE execution metrics
*/
def executeZOrderBy(columns: String *): DataFrame = {
Collaborator:

Hey @Kimahriman, given that this API is not yet supported, I think it is better to remove it. No need to make any changes; I will remove this before I put it into the merge queue.

Contributor Author:

Yeah that's fine, wasn't sure whether to include it or not

throw new UnsupportedOperationException("Z ordering is not yet supported")
}

/**
* Compact the small files in selected partitions.
* @return DataFrame containing the OPTIMIZE execution metrics
*/
def executeCompaction(): DataFrame = {
val tableId: TableIdentifier = sparkSession
.sessionState
.sqlParser
.parseTableIdentifier(tableIdentifier)
val optimize = OptimizeTableCommand(None, Some(tableId), partitionFilter)
toDataset(sparkSession, optimize)
}
}
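For orientation, a rough usage sketch of the new builder next to its SQL equivalent, assuming a partitioned Delta table registered as `events` (table and column names are illustrative only):

```scala
import io.delta.tables.DeltaTable

// SQL equivalent: OPTIMIZE events WHERE date = '2021-11-18'
val metrics = DeltaTable.forName("events")
  .optimize()
  .partitionFilter("date = '2021-11-18'")
  .executeCompaction()

// executeCompaction() returns a DataFrame of OPTIMIZE execution metrics.
metrics.show(truncate = false)
```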
34 changes: 34 additions & 0 deletions core/src/main/scala/io/delta/tables/DeltaTable.scala
@@ -182,6 +182,40 @@ class DeltaTable private[tables](
executeDelete(None)
}

/**
* Optimize the data layout of the table. This returns
* a [[DeltaOptimizeBuilder]] object that can be used to specify
* a partition filter to limit the scope of the optimize, and to
* execute different optimization techniques such as file
* compaction or ordering data using Z-Order curves.
*
* See the [[DeltaOptimizeBuilder]] for a full description
* of this operation.
*
* Scala example to run file compaction on a subset of
* partitions in the table:
* {{{
* deltaTable
* .optimize()
* .partitionFilter("date='2021-11-18'")
* .executeCompaction();
* }}}
*
* Scala example to Z-Order data using given columns on a
* subset of partitions in the table:
* {{{
* deltaTable
* .optimize()
* .partitionFilter("date='2021-11-18'")
* .executeZOrderBy("city", "state");
* }}}
*
* @since 1.3.0
*/
def optimize(): DeltaOptimizeBuilder = {
new DeltaOptimizeBuilder(sparkSession,
table.tableIdentifier.getOrElse(s"delta.`${deltaLog.dataPath.toString}`"))
}

/**
* Update rows in the table based on the rules defined by `set`.
@@ -21,6 +21,8 @@ import java.io.File
import scala.collection.JavaConverters._

// scalastyle:off import.ordering.noEmptyLine
import io.delta.tables.DeltaTable

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.hadoop.fs.Path
@@ -42,6 +44,9 @@ trait OptimizeCompactionSuiteBase extends QueryTest

import testImplicits._

def executeOptimizeTable(table: String, condition: Option[String] = None): Unit
def executeOptimizePath(path: String, condition: Option[String] = None): Unit

test("optimize command: with database and table name") {
withTempDir { tempDir =>
val dbName = "delta_db"
@@ -56,7 +61,8 @@ trait OptimizeCompactionSuiteBase extends QueryTest

val deltaLog = DeltaLog.forTable(spark, tempDir)
val versionBeforeOptimize = deltaLog.snapshot.version
spark.sql(s"OPTIMIZE $tableName")
executeOptimizeTable(tableName)

deltaLog.update()
assert(deltaLog.snapshot.version === versionBeforeOptimize + 1)
checkDatasetUnorderly(spark.table(tableName).as[Int], 1, 2, 3, 4, 5, 6)
@@ -74,7 +80,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest

val deltaLog = DeltaLog.forTable(spark, tempDir)
val versionBeforeOptimize = deltaLog.snapshot.version
spark.sql(s"OPTIMIZE '${tempDir.getCanonicalPath}'")
executeOptimizePath(tempDir.getCanonicalPath)
deltaLog.update()
assert(deltaLog.snapshot.version === versionBeforeOptimize + 1)
checkDatasetUnorderly(data.toDF().as[Int], 1, 2, 3, 4, 5, 6)
@@ -96,7 +102,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest

val e = intercept[AnalysisException] {
// Should fail when predicate is on a non-partition column
spark.sql(s"OPTIMIZE '$path' WHERE value < 4")
executeOptimizePath(path, Some("value < 4"))
}
assert(e.getMessage.contains("Predicate references non-partition column 'value'. " +
"Only the partition columns may be referenced: [id]"))
@@ -128,7 +134,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest
(0 to 1).foreach(partId =>
assert(fileListBefore.count(_.partitionValues === Map(id -> partId.toString)) > 1))

spark.sql(s"OPTIMIZE '$path'")
executeOptimizePath(path)

val deltaLogAfter = DeltaLog.forTable(spark, path)
val txnAfter = deltaLogAfter.startTransaction();
@@ -171,14 +177,13 @@ trait OptimizeCompactionSuiteBase extends QueryTest
assert(fileListBefore.count(_.partitionValues === Map(id -> "0")) > 1)

val versionBefore = deltaLogBefore.snapshot.version
spark.sql(s"OPTIMIZE '$path' WHERE id = 0")
executeOptimizePath(path, Some("id = 0"))

val deltaLogAfter = DeltaLog.forTable(spark, path)
val txnAfter = deltaLogBefore.startTransaction();
val fileListAfter = txnAfter.filterFiles()

assert(fileListBefore.length > fileListAfter.length)

// Optimized partition should contain only one file
assert(fileListAfter.count(_.partitionValues === Map(id -> "0")) === 1)

@@ -227,7 +232,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest
assert(filesInEachPartitionBefore.keys.exists(
_ === (partitionColumnPhysicalName, nullPartitionValue)))

spark.sql(s"OPTIMIZE '$path'")
executeOptimizePath(path)

val deltaLogAfter = DeltaLog.forTable(spark, path)
val txnAfter = deltaLogBefore.startTransaction();
@@ -274,7 +279,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest
val fileCountInTestPartitionBefore = fileListBefore
.count(_.partitionValues === Map[String, String](date -> "2017-10-10", part -> "3"))

spark.sql(s"OPTIMIZE '$path' WHERE date = '2017-10-10' and part = 3")
executeOptimizePath(path, Some("date = '2017-10-10' and part = 3"))

val deltaLogAfter = DeltaLog.forTable(spark, path)
val txnAfter = deltaLogBefore.startTransaction();
@@ -323,13 +328,41 @@ trait OptimizeCompactionSuiteBase extends QueryTest
// block the first write until the second batch can attempt to write.
BlockWritesLocalFileSystem.blockUntilConcurrentWrites(numPartitions)
failAfter(60.seconds) {
sql(s"OPTIMIZE '$path'")
executeOptimizePath(path)
}
assert(deltaLog.snapshot.numOfFiles === numPartitions) // 1 file per partition
}
}
}

/**
* Utility method to append the given data to the Delta table located at the given path.
* Optionally partitions the data.
*/
protected def appendToDeltaTable[T](
data: Dataset[T], tablePath: String, partitionColumns: Option[Seq[String]] = None): Unit = {
var df = data.repartition(1).write;
partitionColumns.map(columns => {
df = df.partitionBy(columns: _*)
})
df.format("delta").mode("append").save(tablePath)
}
}
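A quick usage sketch of the appendToDeltaTable helper above (hypothetical values; assumes a SparkSession named `spark` and a writable `path`). Two appends leave more than one small file per partition, which gives the compaction tests something to optimize:

```scala
import org.apache.spark.sql.functions.col

val data = spark.range(0, 100).toDF("id").withColumn("part", col("id") % 2)
// Each call writes roughly one file per partition value, so two calls
// produce multiple files per partition for OPTIMIZE to compact.
appendToDeltaTable(data, path, partitionColumns = Some(Seq("part")))
appendToDeltaTable(data, path, partitionColumns = Some(Seq("part")))
```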

/**
* Runs optimize compaction tests.
*/
class OptimizeCompactionSQLSuite extends OptimizeCompactionSuiteBase
with DeltaSQLCommandTest {
def executeOptimizeTable(table: String, condition: Option[String] = None): Unit = {
val conditionClause = condition.map(c => s"WHERE $c").getOrElse("")
spark.sql(s"OPTIMIZE $table $conditionClause")
}

def executeOptimizePath(path: String, condition: Option[String] = None): Unit = {
executeOptimizeTable(s"'$path'", condition)
}

test("optimize command: missing path") {
val e = intercept[ParseException] {
spark.sql(s"OPTIMIZE")
@@ -350,29 +383,28 @@
}
assert(e.getMessage.contains("OPTIMIZE"))
}

/**
* Utility method to append the given data to the Delta table located at the given path.
* Optionally partitions the data.
*/
protected def appendToDeltaTable[T](
data: Dataset[T], tablePath: String, partitionColumns: Option[Seq[String]] = None): Unit = {
var df = data.repartition(1).write;
partitionColumns.map(columns => {
df = df.partitionBy(columns: _*)
})
df.format("delta").mode("append").save(tablePath)
}
}

/**
* Runs optimize compaction tests.
*/
class OptimizeCompactionSuite extends OptimizeCompactionSuiteBase
with DeltaSQLCommandTest
class OptimizeCompactionScalaSuite extends OptimizeCompactionSuiteBase
with DeltaSQLCommandTest {
def executeOptimizeTable(table: String, condition: Option[String] = None): Unit = {
if (condition.isDefined) {
DeltaTable.forName(table).optimize().partitionFilter(condition.get).executeCompaction()
} else {
DeltaTable.forName(table).optimize().executeCompaction()
}
}

def executeOptimizePath(path: String, condition: Option[String] = None): Unit = {
if (condition.isDefined) {
DeltaTable.forPath(path).optimize().partitionFilter(condition.get).executeCompaction()
} else {
DeltaTable.forPath(path).optimize().executeCompaction()
}
}
}

class OptimizeCompactionNameColumnMappingSuite extends OptimizeCompactionSuite
class OptimizeCompactionNameColumnMappingSuite extends OptimizeCompactionSQLSuite
with DeltaColumnMappingEnableNameMode {
override protected def runOnlyTests = Seq(
"optimize command: on table with multiple partition columns",