Use OptimizeTableCommand in Scala API and unify tests
Kimahriman committed Feb 27, 2022
1 parent a4c315d commit b9e5c99
Showing 4 changed files with 84 additions and 124 deletions.
14 changes: 9 additions & 5 deletions core/src/main/scala/io/delta/tables/DeltaTable.scala
@@ -183,25 +183,28 @@ class DeltaTable private[tables](
}

/**
* Optimize data in the table that match the given `condition`.
* Optimize data in the table that match the given `condition`. The condition must only
* contain filters on partition columns, otherwise an `AnalysisException` is thrown.
*
* @param condition Boolean SQL expression
*
* @since 1.2.0
*/
def optimize(condition: String): Unit = {
optimize(functions.expr(condition))
val tableId = table.tableIdentifier.getOrElse(s"delta.`${deltaLog.dataPath.toString}`")
executeOptimize(tableId, Some(condition))
}

/**
* Optimize data in the table that match the given `condition`.
* Optimize data in the table that match the given `condition`. The condition must only
* contain filters on partition columns, otherwise an `AnalysisException` is thrown.
*
* @param condition Boolean SQL expression
*
* @since 1.2.0
*/
def optimize(condition: Column): Unit = {
executeOptimize(Some(condition.expr))
optimize(condition.expr.sql)
}

/**
@@ -210,7 +213,8 @@ class DeltaTable private[tables](
* @since 1.2.0
*/
def optimize(): Unit = {
executeOptimize(None)
val tableId = table.tableIdentifier.getOrElse(s"delta.`${deltaLog.dataPath.toString}`")
executeOptimize(tableId, None)
}
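For context, a minimal usage sketch of the reworked Scala API above (not part of the diff), assuming an active `spark` session and a hypothetical partitioned table path:

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.expr

// Hypothetical path; any Delta table partitioned by `date` would behave the same way.
val table = DeltaTable.forPath(spark, "/tmp/delta/events")

table.optimize()                             // compact every partition of the table
table.optimize("date = '2017-10-10'")        // compact only the matching partitions
table.optimize(expr("date = '2017-10-10'"))  // same, expressed as a Column predicate
```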


@@ -19,7 +19,7 @@ package io.delta.tables.execution
import scala.collection.Map

import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.commands.{DeltaGenerateCommand, OptimizeExecutor, RestoreTableCommand, VacuumCommand}
import org.apache.spark.sql.delta.commands.{DeltaGenerateCommand, OptimizeExecutor, OptimizeTableCommand, RestoreTableCommand, VacuumCommand}
import org.apache.spark.sql.delta.util.AnalysisHelper
import io.delta.tables.DeltaTable

@@ -83,9 +83,15 @@ trait DeltaTableOperations extends AnalysisHelper { self: DeltaTable =>
sparkSession.emptyDataFrame
}

protected def executeOptimize(condition: Option[Expression]): DataFrame = {
val optimize = new OptimizeExecutor(sparkSession, deltaLog, condition.toSeq).optimize()
sparkSession.emptyDataFrame
protected def executeOptimize(
tblIdentifier: String,
condition: Option[String]): DataFrame = {
val tableId: TableIdentifier = sparkSession
.sessionState
.sqlParser
.parseTableIdentifier(tblIdentifier)
val optimize = OptimizeTableCommand(None, Some(tableId), condition)
toDataset(sparkSession, optimize)
}
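For reference, a small standalone sketch (not part of this change) of how the session SQL parser treats the two identifier forms passed into `executeOptimize` above, a qualified table name and the path-based fallback:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val parser = spark.sessionState.sqlParser

// A qualified name splits into table and database parts.
val byName: TableIdentifier = parser.parseTableIdentifier("delta_db.tbl")
// byName == TableIdentifier("tbl", Some("delta_db"))

// The delta.`<path>` fallback used above parses with "delta" as the database part.
val byPath: TableIdentifier = parser.parseTableIdentifier("delta.`/tmp/delta/events`")
// byPath == TableIdentifier("/tmp/delta/events", Some("delta"))
```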

protected def toStrColumnMap(map: Map[String, String]): Map[String, Column] = {
@@ -63,7 +63,12 @@ case class OptimizeTableCommand(
// Parse the predicate expression into Catalyst expression and verify only simple filters
// on partition columns are present
val partitionPredicates = partitionPredicate.map(predicate => {
parsePredicates(sparkSession, predicate)
val predicates = parsePredicates(sparkSession, predicate)
verifyPartitionPredicates(
sparkSession,
deltaLog.snapshot.metadata.partitionColumns,
predicates)
predicates
}).getOrElse(Seq(Literal.TrueLiteral))

new OptimizeExecutor(sparkSession, deltaLog, partitionPredicates)
@@ -85,11 +90,6 @@ class OptimizeExecutor(
partitionPredicate: Seq[Expression])
extends DeltaCommand with SQLMetricsReporting with Serializable {

verifyPartitionPredicates(
sparkSession,
deltaLog.snapshot.metadata.partitionColumns,
partitionPredicate)

/** Timestamp to use in [[FileAction]] */
private val operationTimestamp = new SystemClock().getTimeMillis()

@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.optimize
import java.io.File

// scalastyle:off import.ordering.noEmptyLine
import io.delta.tables.DeltaTable
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.hadoop.fs.Path
@@ -28,7 +29,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.functions.{expr, lit}
import org.apache.spark.sql.test.SharedSparkSession

/**
@@ -39,6 +40,9 @@ trait OptimizeCompactionSuiteBase extends QueryTest

import testImplicits._

def executeOptimizeTable(table: String, condition: Option[String] = None)
def executeOptimizePath(path: String, condition: Option[String] = None)

test("optimize command: with database and table name") {
withTempDir { tempDir =>
val dbName = "delta_db"
@@ -53,7 +57,8 @@ trait OptimizeCompactionSuiteBase extends QueryTest

val deltaLog = DeltaLog.forTable(spark, tempDir)
val versionBeforeOptimize = deltaLog.snapshot.version
spark.sql(s"OPTIMIZE $tableName")
executeOptimizeTable(tableName)

deltaLog.update()
assert(deltaLog.snapshot.version === versionBeforeOptimize + 1)
checkDatasetUnorderly(spark.table(tableName).as[Int], 1, 2, 3, 4, 5, 6)
@@ -71,24 +76,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest

val deltaLog = DeltaLog.forTable(spark, tempDir)
val versionBeforeOptimize = deltaLog.snapshot.version
spark.sql(s"OPTIMIZE '${tempDir.getCanonicalPath}'")
deltaLog.update()
assert(deltaLog.snapshot.version === versionBeforeOptimize + 1)
checkDatasetUnorderly(data.toDF().as[Int], 1, 2, 3, 4, 5, 6)
}
}

test("optimize command via DeltaTable") {
withTempDir { tempDir =>
appendToDeltaTable(Seq(1, 2, 3).toDF(), tempDir.toString, partitionColumns = None)
appendToDeltaTable(Seq(4, 5, 6).toDF(), tempDir.toString, partitionColumns = None)

def data: DataFrame = spark.read.format("delta").load(tempDir.toString)

val deltaTable = io.delta.tables.DeltaTable.forPath(spark, tempDir.getPath)
val deltaLog = DeltaLog.forTable(spark, tempDir)
val versionBeforeOptimize = deltaLog.snapshot.version
deltaTable.optimize()
executeOptimizePath(tempDir.getCanonicalPath)
deltaLog.update()
assert(deltaLog.snapshot.version === versionBeforeOptimize + 1)
checkDatasetUnorderly(data.toDF().as[Int], 1, 2, 3, 4, 5, 6)
@@ -106,25 +94,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest

val e = intercept[AnalysisException] {
// Should fail when predicate is on a non-partition column
spark.sql(s"OPTIMIZE '$path' WHERE value < 4")
}
assert(e.getMessage.contains("Predicate references non-partition column 'value'. " +
"Only the partition columns may be referenced: [id]"))
}
}

test("optimize command via DeltaTable: predicate on non-partition column") {
withTempDir { tempDir =>
val path = new File(tempDir, "testTable").getCanonicalPath
val partitionColumns = Some(Seq("id"))
appendToDeltaTable(
Seq(1, 2, 3).toDF("value").withColumn("id", 'value % 2),
path,
partitionColumns)

val e = intercept[AnalysisException] {
// Should fail when predicate is on a non-partition column
io.delta.tables.DeltaTable.forPath(spark, path).optimize("value < 4")
executeOptimizePath(path, Some("value < 4"))
}
assert(e.getMessage.contains("Predicate references non-partition column 'value'. " +
"Only the partition columns may be referenced: [id]"))
@@ -154,7 +124,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest
(0 to 1).foreach(partId =>
assert(fileListBefore.count(_.partitionValues === Map("id" -> partId.toString)) > 1))

spark.sql(s"OPTIMIZE '$path'")
executeOptimizePath(path)

val deltaLogAfter = DeltaLog.forTable(spark, path)
val txnAfter = deltaLogAfter.startTransaction();
@@ -195,54 +165,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest
assert(fileListBefore.count(_.partitionValues === Map("id" -> "0")) > 1)

val versionBefore = deltaLogBefore.snapshot.version
spark.sql(s"OPTIMIZE '$path' WHERE id = 0")

val deltaLogAfter = DeltaLog.forTable(spark, path)
val txnAfter = deltaLogBefore.startTransaction();
val fileListAfter = txnAfter.filterFiles()

assert(fileListBefore.length > fileListAfter.length)
// Optimized partition should contain only one file
assert(fileListAfter.count(_.partitionValues === Map("id" -> "0")) === 1)

// File counts in partitions that are not part of the OPTIMIZE should remain the same
assert(fileListAfter.count(_.partitionValues === Map("id" -> "1")) ===
fileListAfter.count(_.partitionValues === Map("id" -> "1")))

// version is incremented
assert(deltaLogAfter.snapshot.version === versionBefore + 1)

// data should remain the same after the OPTIMIZE
checkDatasetUnorderly(
spark.read.format("delta").load(path).select("value").as[Long],
(1L to 6L): _*)
}
}


test("optimize command via DeltaTable: on partitioned table - selected partitions") {
withTempDir { tempDir =>
val path = new File(tempDir, "testTable").getCanonicalPath
val partitionColumns = Some(Seq("id"))
appendToDeltaTable(
Seq(1, 2, 3).toDF("value").withColumn("id", 'value % 2),
path,
partitionColumns)

appendToDeltaTable(
Seq(4, 5, 6).toDF("value").withColumn("id", 'value % 2),
path,
partitionColumns)

val deltaLogBefore = DeltaLog.forTable(spark, path)
val txnBefore = deltaLogBefore.startTransaction();
val fileListBefore = txnBefore.filterFiles()

assert(fileListBefore.length >= 3)
assert(fileListBefore.count(_.partitionValues === Map("id" -> "0")) > 1)

val versionBefore = deltaLogBefore.snapshot.version
io.delta.tables.DeltaTable.forPath(spark, path).optimize("id = 0")
executeOptimizePath(path, Some("id = 0"))

val deltaLogAfter = DeltaLog.forTable(spark, path)
val txnAfter = deltaLogBefore.startTransaction();
@@ -294,7 +217,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest
assert(filesInEachPartitionBefore.keys.exists(
_ === s"part=${ExternalCatalogUtils.DEFAULT_PARTITION_NAME}"))

spark.sql(s"OPTIMIZE '$path'")
executeOptimizePath(path)

val deltaLogAfter = DeltaLog.forTable(spark, path)
val txnAfter = deltaLogBefore.startTransaction();
@@ -337,7 +260,7 @@ trait OptimizeCompactionSuiteBase extends QueryTest
val fileCountInTestPartitionBefore = fileListBefore
.count(_.partitionValues === Map[String, String]("date" -> "2017-10-10", "part" -> "3"))

spark.sql(s"OPTIMIZE '$path' WHERE date = '2017-10-10' and part = 3")
executeOptimizePath(path, Some("date = '2017-10-10' and part = 3"))

val deltaLogAfter = DeltaLog.forTable(spark, path)
val txnAfter = deltaLogBefore.startTransaction();
@@ -386,13 +309,41 @@ trait OptimizeCompactionSuiteBase extends QueryTest
// block the first write until the second batch can attempt to write.
BlockWritesLocalFileSystem.blockUntilConcurrentWrites(numPartitions)
failAfter(60.seconds) {
sql(s"OPTIMIZE '$path'")
executeOptimizePath(path)
}
assert(deltaLog.snapshot.numOfFiles === numPartitions) // 1 file per partition
}
}
}

/**
* Utility method to append the given data to the Delta table located at the given path.
* Optionally partitions the data.
*/
protected def appendToDeltaTable[T](
data: Dataset[T], tablePath: String, partitionColumns: Option[Seq[String]] = None): Unit = {
var df = data.repartition(1).write;
partitionColumns.map(columns => {
df = df.partitionBy(columns: _*)
})
df.format("delta").mode("append").save(tablePath)
}
}

/**
* Runs optimize compaction tests.
*/
class OptimizeCompactionSQLSuite extends OptimizeCompactionSuiteBase
with DeltaSQLCommandTest {
def executeOptimizeTable(table: String, condition: Option[String] = None): Unit = {
val conditionClause = condition.map(c => s"WHERE $c").getOrElse("")
spark.sql(s"OPTIMIZE $table $conditionClause")
}

def executeOptimizePath(path: String, condition: Option[String] = None): Unit = {
executeOptimizeTable(s"'$path'", condition)
}

test("optimize command: missing path") {
val e = intercept[ParseException] {
spark.sql(s"OPTIMIZE")
Expand All @@ -413,24 +364,23 @@ trait OptimizeCompactionSuiteBase extends QueryTest
}
assert(e.getMessage.contains("OPTIMIZE"))
}

/**
* Utility method to append the given data to the Delta table located at the given path.
* Optionally partitions the data.
*/
protected def appendToDeltaTable[T](
data: Dataset[T], tablePath: String, partitionColumns: Option[Seq[String]] = None): Unit = {
var df = data.repartition(1).write;
partitionColumns.map(columns => {
df = df.partitionBy(columns: _*)
})
df.format("delta").mode("append").save(tablePath)
}
}

/**
* Runs optimize compaction tests.
*/
class OptimizeCompactionSuite extends OptimizeCompactionSuiteBase
with DeltaSQLCommandTest
class OptimizeCompactionScalaSuite extends OptimizeCompactionSuiteBase
with DeltaSQLCommandTest {
def executeOptimizeTable(table: String, condition: Option[String] = None): Unit = {
if (condition.isDefined) {
DeltaTable.forName(table).optimize(expr(condition.get))
} else {
DeltaTable.forName(table).optimize()
}
}

def executeOptimizePath(path: String, condition: Option[String] = None): Unit = {
if (condition.isDefined) {
DeltaTable.forPath(path).optimize(expr(condition.get))
} else {
DeltaTable.forPath(path).optimize()
}
}
}
