Add Scala API for optimize #961
Conversation
@Kimahriman These APIs are useful, thank you for the PR. The PR looks good; I left a few comments.
 *
 * @since 1.2.0
 */
def optimize(condition: String): Unit = {
The SQL equivalent of OPTIMIZE returns a `DataFrame` containing the operation metrics. We can do the same here (look at vacuum for an example).
Do you know offhand the best way to convert the `Seq[Row]` to a DataFrame? I mostly only use the Python API. There's a `createDataFrame(rows: List[Row], schema: StructType)`, but I'm not sure if there's a way to use the output to create that schema or if I should just manually define it in the class as well.
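For reference, a minimal sketch of that conversion, assuming a hand-defined schema (the field names here are illustrative, not the real OPTIMIZE metrics schema):

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Illustrative schema only; the actual OPTIMIZE metrics schema differs.
val metricsSchema = StructType(Seq(
  StructField("numFilesAdded", LongType),
  StructField("numFilesRemoved", LongType)))

def metricsToDataFrame(spark: SparkSession, rows: Seq[Row]): DataFrame =
  // createDataFrame accepts a java.util.List[Row] plus an explicit schema
  spark.createDataFrame(rows.asJava, metricsSchema)
```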
And it looks like `executeVacuum` also just returns `sparkSession.emptyDataFrame`.
It can be done similar to `executeGenerate` here: instead of using the `OptimizeExecutor`, use the `DeltaCommand` implementation `OptimizeTableCommand`. This also removes the changes where the partition filter handling is moved from `OptimizeTableCommand` to `OptimizeExecutor`.

For the vacuum API, please create an issue.
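A hypothetical sketch of that wiring, modeled on `executeGenerate`. It is written as if inside `DeltaTableOperations` (which holds `sparkSession` and the `toDataset` helper from `AnalysisHelper`); the `OptimizeTableCommand` arguments are assumed and may not match the real constructor:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.TableIdentifier

protected def executeOptimize(
    tblIdentifier: String,
    partitionFilter: Option[String]): DataFrame = {
  // Build the TableIdentifier the same way executeGenerate does.
  val tableId: TableIdentifier =
    sparkSession.sessionState.sqlParser.parseTableIdentifier(tblIdentifier)
  // Hand off to the DeltaCommand instead of calling OptimizeExecutor directly.
  val optimize = OptimizeTableCommand(
    path = None, tableId = Some(tableId), partitionFilter = partitionFilter)
  toDataset(sparkSession, optimize)
}
```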
I didn't try to go that route initially because it seemed weird to call a function on `DeltaTable` that then has to generate the table ID to pass to another function, just so it can re-look up the `DeltaLog` again. Same for the expression: I wanted to add support for passing a `Column` expression, and it seemed weird to have to convert the `Column` back to SQL just so it can be parsed into an expression again.
Got that route working by creating the table identifier like the generate command does
}

/**
 * Optimize data in the table that match the given `condition`.
Specify that `condition` should only contain filters on partition columns; otherwise an [[AnalysisException]] is thrown. Same comment for the next API.
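For illustration, the requested behavior on a table partitioned by `date` (assumed here), using the `optimize(condition)` signature as reviewed at this point:

```scala
// Assumed: table partitioned by `date`; `id` is a regular data column.
deltaTable.optimize("date = '2021-11-18'") // OK: filter on a partition column
deltaTable.optimize("id > 100")            // should throw AnalysisException
```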
Added to the comment
assert(fileListBefore.count(_.partitionValues === Map("id" -> "0")) > 1)

val versionBefore = deltaLogBefore.snapshot.version
io.delta.tables.DeltaTable.forPath(spark, path).optimize("id = 0")
One suggestion to minimize the test code duplication (see the sketch after this list):
- Add an abstract method `executeOptimize(path, condition)` to `OptimizeCompactionSuiteBase` and modify all tests to call this method rather than invoking the SQL or Scala API directly.
- Rename `OptimizeCompactionSuite` to `OptimizeCompactionSQLSuite` and implement `executeOptimize(path, condition)` to issue an OPTIMIZE SQL command.
- Add `OptimizeCompactionScalaSuite` extending `OptimizeCompactionSuiteBase` and implement `executeOptimize(path, condition)` to issue the Optimize Scala API call.
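A hypothetical skeleton of that layout; table setup and assertions are elided, and the Scala suite shown here uses the builder API this PR converged on:

```scala
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession

abstract class OptimizeCompactionSuiteBase extends QueryTest with SharedSparkSession {
  // Each concrete suite decides how OPTIMIZE is invoked.
  def executeOptimize(path: String, condition: Option[String]): Unit

  test("optimize compacts small files in matching partitions") {
    withTempDir { dir =>
      val path = dir.getAbsolutePath
      // ...write a partitioned table with many small files at `path`...
      executeOptimize(path, Some("id = 0"))
      // ...assert on file counts and data correctness...
    }
  }
}

// Issues OPTIMIZE through SQL.
class OptimizeCompactionSQLSuite extends OptimizeCompactionSuiteBase {
  override def executeOptimize(path: String, condition: Option[String]): Unit = {
    val whereClause = condition.map(c => s" WHERE $c").getOrElse("")
    spark.sql(s"OPTIMIZE delta.`$path`$whereClause")
  }
}

// Issues OPTIMIZE through the Scala API.
class OptimizeCompactionScalaSuite extends OptimizeCompactionSuiteBase {
  override def executeOptimize(path: String, condition: Option[String]): Unit = {
    val builder = io.delta.tables.DeltaTable.forPath(spark, path).optimize()
    condition.map(builder.where).getOrElse(builder).executeCompaction()
  }
}
```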
I'll try that out
Got that working; made two functions to handle path or table.
Force-pushed from 6a9e456 to b9e5c99.
Hi @Kimahriman, the updated changes look good. I am just thinking through the updates to these APIs when we add Z-order (#920). Give me a couple of days to think this through.
Hi @Kimahriman, given that we plan to support two types of optimization (file compaction and Z-Order), and possibly more, here is an API proposal based on the builder pattern (similar to the Merge APIs). It makes constructing the optimize operation easier, requires fewer APIs, and is extensible. Here is the POC (built on top of this PR); sketched usage below. Let me know what you think.
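Sketched usage of that proposal (based on the POC; `partitionFilter` is the method name as proposed here, later renamed to `where`, and `executeZOrderBy` is part of the proposal, not yet implemented):

```scala
val deltaTable = io.delta.tables.DeltaTable.forPath(spark, "/tmp/delta-table")

// File compaction limited to one partition; returns a DataFrame of metrics.
val metrics = deltaTable
  .optimize()
  .partitionFilter("date = '2021-11-18'")
  .executeCompaction()

// The same builder leaves room for Z-Ordering later (#920), e.g.:
// deltaTable.optimize().partitionFilter("date = '2021-11-18'").executeZOrderBy("city")
```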
Works for me. If you want to keep working on it, that's fine. I won't be able to work on this for a couple more weeks.
Force-pushed from 28f9360 to d9f43ba.
Got it working, let me know how it looks.
Force-pushed from d9f43ba to 91f2e0c.
Not sure how the docs are supposed to work.
LGTM. Minor comments. Thanks for adding the APIs.
 *
 * @param sparkSession [[SparkSession]] to use for execution
 * @param tableIdentifier Id of the table on which to
 *                        execute the optimize
@since 1.3.0
deltaLog.update()
assert(deltaLog.snapshot.version === versionBeforeOptimize + 1)
checkDatasetUnorderly(data.toDF().as[Int], 1, 2, 3, 4, 5, 6)

// Make sure thread pool is shut down
Why remove this check?
Ah missed that in the rebase
 * deltaTable
 *   .optimize()
 *   .partitionFilter("date='2021-11-18')
 *   .executeZOrderBy("city", ");
nit: Extra "
.
Check the readme here. |
I'm not sure how I'm supposed to fix it; I see other places use the same kind of reference.
Looking at the docs, it looks like this never worked for the APIs for which we are publishing API docs. It must be because the referenced class is in another jar (Spark in this case) for which we are not generating any docs.
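One common workaround for this (an assumption on my part, not something settled in this thread) is pointing scaladoc at Spark's published docs via sbt's `apiMappings`:

```scala
// build.sbt sketch: map the spark-sql jar on the classpath to Spark's hosted
// scaladoc so cross-jar [[...]] references can resolve to external links.
apiMappings ++= (Compile / fullClasspath).value
  .filter(_.data.getName.startsWith("spark-sql"))
  .map(attr => attr.data -> url("https://spark.apache.org/docs/latest/api/scala/"))
  .toMap
```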
 * using Z-Order curves
 * @return DataFrame containing the OPTIMIZE execution metrics
 */
def executeZOrderBy(columns: String*): DataFrame = {
Hey @Kimahriman, given that this API is not yet supported, I think it is better to remove it. No need to make any changes; I will remove this before I put it into the merge queue.
Yeah that's fine, wasn't sure whether to include it or not
Force-pushed from f223755 to ff2c5e9.
 * @param partitionFilter The partition filter to apply
 * @return [[DeltaOptimizeBuilder]] with partition filter applied
 */
def partitionFilter(partitionFilter: String): DeltaOptimizeBuilder = {
Hi @Kimahriman, I had an offline conversation with @tdas. He mentioned one very good point about keeping the names the same as SQL. In SQL we have `where` to select partitions, so renaming this method to `def where(partitionFilter: String)` keeps it in sync with the SQL.
We follow this pattern in other APIs as well; for example in Merge, SQL has `WHEN MATCHED` and in Scala/Python we have the similarly named method `whenMatched`.
Let me know if there are any concerns with the rename. I can make the change locally and put it into the merge queue.
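In terms of the builder call, the rename is just:

```scala
// Before (as reviewed in this PR):
deltaTable.optimize().partitionFilter("date = '2021-11-18'").executeCompaction()

// After, matching SQL's WHERE clause:
deltaTable.optimize().where("date = '2021-11-18'").executeCompaction()
```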
I have no problem with that, just let me know if you want me to change anything or if you'll handle it
Thanks @Kimahriman, it's OK, I can make the change.
Add functions to `DeltaTable` to perform optimization. API documentation:

```
/**
 * Optimize the data layout of the table. This returns
 * a [[DeltaOptimizeBuilder]] object that can be used to specify
 * the partition filter to limit the scope of optimize and
 * also execute different optimization techniques such as file
 * compaction or order data using Z-Order curves.
 *
 * See the [[DeltaOptimizeBuilder]] for a full description
 * of this operation.
 *
 * Scala example to run file compaction on a subset of
 * partitions in the table:
 * {{{
 *   deltaTable
 *     .optimize()
 *     .where("date='2021-11-18'")
 *     .executeCompaction();
 * }}}
 *
 * @since 1.3.0
 */
def optimize(): DeltaOptimizeBuilder
```

```
/**
 * Builder class for constructing OPTIMIZE command and executing.
 *
 * @param sparkSession SparkSession to use for execution
 * @param tableIdentifier Id of the table on which to
 *                        execute the optimize
 * @since 1.3.0
 */
class DeltaOptimizeBuilder(
    sparkSession: SparkSession,
    tableIdentifier: String) extends AnalysisHelper {

  /**
   * Apply partition filter on this optimize command builder to limit
   * the operation on selected partitions.
   * @param partitionFilter The partition filter to apply
   * @return [[DeltaOptimizeBuilder]] with partition filter applied
   */
  def where(partitionFilter: String): DeltaOptimizeBuilder

  /**
   * Compact the small files in selected partitions.
   * @return DataFrame containing the OPTIMIZE execution metrics
   */
  def executeCompaction(): DataFrame
}
```

Closes delta-io#961
Fixes delta-io#960

Signed-off-by: Venki Korukanti <[email protected]>
GitOrigin-RevId: 615e215b96fb9e9b9223d3d2b429dc18dff102f4
Resolves #960

Add functions to `DeltaTable` to perform optimization. This is a lightweight wrapper that just calls the `OptimizeExecutor` directly. All the commands seem to be implemented slightly differently (some return an empty DataFrame, some return the actual result of something), so I wasn't sure which route to go. I just had to move the partition condition checking into the executor. Also added a few duplicate tests using the `DeltaTable` API instead of SQL.