[Spark] Implement optimized write. #2145

Closed
wants to merge 7 commits

Conversation

weiluo-db
Contributor

@weiluo-db weiluo-db commented Oct 6, 2023

[Spark] Implement optimized write.

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

Optimized write is an optimization that repartitions and rebalances data before writing them out to a Delta table. Optimized writes improve file size as data is written and benefit subsequent reads on the table.

This PR introduces a new DeltaOptimizedWriterExec exec node. It's responsible for executing the shuffle (HashPartitioning based on the table's partition columns) and rebalancing afterwards. More specifically, the number of shuffle partitions is controlled by two new knobs:

  • spark.databricks.delta.optimizeWrite.numShuffleBlocks (default=50,000,000), which controls "maximum number of shuffle blocks to target";
  • spark.databricks.delta.optimizeWrite.maxShufflePartitions (default=2,000), which controls "max number of output buckets (reducers) that can be used by optimized writes".

After repartitioning, the blocks are then sorted in ascending order by size and bin-packed into appropriately-sized bins for output tasks. The bin size is controlled by the following new knob:

  • spark.databricks.delta.optimizeWrite.binSize (default=512MiB).

Note that this knob is based on the in-memory size of row-based shuffle blocks. So the final output Parquet size is usually smaller than the bin size due to column-based encoding and compression.
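For intuition, the bin-packing step can be sketched roughly as follows. This is a simplified illustration only; the names and signature here do not match the PR's actual BinPackingUtils API.

```scala
// Simplified sketch of the size-based bin packing described above: sort items
// ascending by size, then greedily fill bins up to binSize.
def binPackBySize[T](items: Seq[T], sizeOf: T => Long, binSize: Long): Seq[Seq[T]] = {
  import scala.collection.mutable.ArrayBuffer
  val bins = ArrayBuffer.empty[Seq[T]]
  var currentBin = ArrayBuffer.empty[T]
  var currentSize = 0L
  for (item <- items.sortBy(sizeOf)) {
    val size = sizeOf(item)
    // Close the current bin once adding the next item would exceed the target size.
    if (currentBin.nonEmpty && currentSize + size > binSize) {
      bins += currentBin.toSeq
      currentBin = ArrayBuffer.empty[T]
      currentSize = 0L
    }
    currentBin += item
    currentSize += size
  }
  if (currentBin.nonEmpty) bins += currentBin.toSeq
  bins.toSeq
}

// e.g. binPackBySize(Seq(1, 2, 3, 4, 5), (x: Int) => x.toLong, binSize = 6L)
//      returns Seq(Seq(1, 2, 3), Seq(4), Seq(5))
```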

The whole optimized write feature can be controlled in the following ways, in precedence order from high to low (i.e. each option takes precedence over any successive ones):

  1. The optimizeWrite Delta option in DataFrameWriter (default=None), e.g. spark.range(0, 100).toDF().write.format("delta").option("optimizedWrite", "true").save(...);
  2. The spark.databricks.delta.optimizeWrite.enabled Spark session setting (default=None);
  3. The delta.autoOptimize.optimizeWrite table property (default=None).

Optimized write is DISABLED by default.
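For illustration only, the three control surfaces might be exercised as below. The option, config, and property names follow this description, the path is hypothetical, and the writer-option spelling mirrors the example in item 1.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// 1. Per-write DataFrameWriter option (highest precedence), spelled as in the example above.
spark.range(0, 100).toDF()
  .write.format("delta")
  .option("optimizedWrite", "true")
  .save("/tmp/delta/events")          // hypothetical path

// 2. Session-wide setting (overridden by the per-write option).
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")

// 3. Table property (lowest precedence).
spark.sql(
  "ALTER TABLE delta.`/tmp/delta/events` " +
  "SET TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true')")
```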

Fixes #1158

How was this patch tested?

Unit tests: OptimizedWritesSuite and BinPackingUtilsSuite.

Does this PR introduce any user-facing changes?

Yes. Please see the Description for details.

@felipepessoto
Contributor

@weiluo-db there is a PR open waiting for review: #1198

@weiluo-db
Contributor Author

@weiluo-db there is a PR open waiting for review: #1198

@felipepessoto This PR's approach to Optimized Write has been more thoroughly tested in production environments.

@@ -228,7 +228,9 @@ abstract class MergeIntoCommandBase extends LeafRunnableCommand
txn: OptimisticTransaction,
outputDF: DataFrame): Seq[FileAction] = {
val partitionColumns = txn.metadata.partitionColumns
if (partitionColumns.nonEmpty && spark.conf.get(DeltaSQLConf.MERGE_REPARTITION_BEFORE_WRITE)) {
// If the write will be optimized write, which shuffles the data anyway, then don't repartition.
Collaborator

can you mention in the comment that Optimized write will handle both cases: splitting a task when it's very large and combining tasks when they are very small.

Contributor Author

Updated the comment.

@@ -348,6 +354,7 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl
def writeFiles(
inputData: Dataset[_],
writeOptions: Option[DeltaOptions],
isOptimize: Boolean,
Collaborator

can you clarify why we need this new isOptimize flag now?

Contributor Author

Added some comments.

@@ -449,4 +462,27 @@ trait TransactionalWrite extends DeltaLogging { self: OptimisticTransactionImpl

resultFiles.toSeq ++ committer.changeFiles
}

/**
* Optimized writes can be enabled/disabled through the following order:
Collaborator

this makes sense; I guess the intention is to have an explicit way to turn it on/off per write/table/session, in that order

Collaborator

thanks for changing the precedence

* @param partitionColumns The partition columns of the table. Used for hash partitioning the write
* @param deltaLog The DeltaLog for the table. Used for logging only
*/
case class DeltaOptimizedWriterExec(
Collaborator

When I inspect the plan, is this what will show up in the write node?

.doc("Maximum number of shuffle blocks to target for the adaptive shuffle " +
"in optimized writes.")
.intConf
.createWithDefault(50000000)
Collaborator

is there a reason for choosing these defaults?

Contributor Author

As explained in the config doc, optimizeWrite.maxShufflePartitions must not be larger than spark.shuffle.minNumPartitionsToHighlyCompress, which is 2000 by default. optimizeWrite.numShuffleBlocks is then set high enough to produce a sufficient number of partitions (while still being capped by optimizeWrite.maxShufflePartitions).
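(For illustration, the interplay of the two knobs and their defaults can be approximated as below; this is a back-of-the-envelope sketch based on the discussion in this thread, not the PR's actual code.)

```scala
// Back-of-the-envelope sketch (not the PR's exact code): each reducer receives one
// shuffle block per mapper, so capping total blocks at numShuffleBlocks bounds the
// reducer count, which is further capped by maxShufflePartitions.
def approxShufflePartitions(
    numMappers: Long,
    numShuffleBlocks: Long = 50000000L, // optimizeWrite.numShuffleBlocks default
    maxShufflePartitions: Long = 2000L  // optimizeWrite.maxShufflePartitions default
  ): Long = {
  val byBlockBudget = numShuffleBlocks / math.max(numMappers, 1L)
  math.min(math.max(byBlockBudget, 1L), maxShufflePartitions)
}

// e.g. with 4,500 mappers and the defaults: min(50,000,000 / 4,500, 2,000) = 2,000 reducers.
```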

Collaborator

got it

Collaborator

@rahulsmahadev left a comment

Thanks for this contribution, left a few questions

@weiluo-db
Contributor Author

Thanks for this contribution, left a few questions

Thanks for the review. PTAL.

@weiluo-db changed the title from [SPARK] Implement optimized write. to [Spark] Implement optimized write. Oct 10, 2023
@rasidhan

Why is this effort being duplicated and released in such a hurry when there is an existing PR (#1198) that has been pending review for 16 months and has been on the roadmap (#1307) too?

@rasidhan mentioned this pull request Oct 10, 2023
// We want table properties to take precedence over the session/default conf.
DeltaConfigs.OPTIMIZE_WRITE
.fromMetaData(metadata)
.orElse(sessionConf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_ENABLED))
Contributor

When session config is set, shouldn't it have precedence over table properties?

Contributor

I didn’t check Databricks; do you know what the behavior is?

Collaborator

makes sense

@tdas
Contributor

tdas commented Oct 16, 2023

Hello @rasidhan, apologies for the delay in responding to you. First and foremost, I would like to sincerely thank @sezruby for their contributions and dedication to the project. I absolutely recognize the time and energy that goes into making such contributions.

In evaluating #1198, we did review the logic and we did consider various technical factors to contrast it against our implementation. We have been evaluating our implementation of optimized write in various production workloads for many months. So while the proposed implementation in this PR may seem sudden, this implementation has been battle-tested under a wide variety of data scale and cluster configurations.

I can assure you this does not diminish the value of the work you've done. The Delta project is genuinely grateful for your understanding and we hope to better our communication and collaboration processes in the future to prevent such instances.

assert(BinPackingUtils.binPackBySize(input, (x: Int) => x, (x: Int) => x, binSize) == expect)
}
}

Collaborator

nit: remove line

writeOptions.flatMap(_.optimizeWrite)
.getOrElse(TransactionalWrite.shouldOptimizeWrite(metadata, sessionConf))
}

Collaborator

nit: remove line

private def getShuffleRDD: ShuffledRowRDD = {
if (cachedShuffleRDD == null) {
val resolver = org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
val saltedPartitioning = HashPartitioning(
Collaborator

Is there a reason for choosing HashPartitioning over other schemes?

Contributor Author

This matches table partition. For unpartitioned tables, it shouldn't really matter that much. But HashPartitioning should be more robust to certain failure cases, e.g. https://issues.apache.org/jira/browse/SPARK-38388.
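(For context, hash partitioning by the table's partition columns is roughly what a DataFrame-level repartition by those columns would produce; a loose analogy only, with the salting used by the exec node omitted.)

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Loose DataFrame-level analogue of the exec node's HashPartitioning shuffle:
// repartition the data to be written by the table's partition columns.
// The PR additionally salts the partitioning; that is omitted here.
def hashPartitionForWrite(df: DataFrame, partitionColumns: Seq[String]): DataFrame =
  if (partitionColumns.isEmpty) df else df.repartition(partitionColumns.map(col): _*)
```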

Collaborator

@rahulsmahadev left a comment

Left a few more comments

@rahulsmahadev
Collaborator

can you retrigger the CI? Looks like a flaky test runner

@felipepessoto
Contributor

@rahulsmahadev do you know how to retrigger CI? I'm asking because I have another PR open with the same test issues. The errors don't seem flaky though; they seem to be failing consistently.

@rahulsmahadev
Collaborator

@felipepessoto I usually just create an empty commit: `git commit --allow-empty`. cc: @scottsand-db if you are aware of a better way to retrigger CI.

Collaborator

@rahulsmahadev left a comment

LGTM!

@tdas added this to the 3.1.0 milestone Nov 1, 2023
@tdas closed this in fcfd440 Nov 14, 2023
@Kimahriman
Contributor

How is this supposed to work for large writes (large in this case being several TiB or more)? I'm seeing very skewed partitions writing data testing this out. First, my reducer tasks seem to be capped at maxShufflePartitions, even though it looked like that config is just used to determine how many reducers to pretend will exist during the map phase, and then the computeBins combines individual map outputs from there (which could end up being more or less than the initial pretend reducer count). Is that not how this works?

When I increase the maxShufflePartitions (and highly compressed shuffle config) large enough, it seems to combine some map outputs, but I still get incredibly skewed reducing tasks, reading anywhere from < 100 MiB of data up to several GiB of the shuffle data.

When I do the exact same job with #1198 (which I've been using in production for over a year now), I get much more evenly distributed reducer tasks (reading ~750 MiB of shuffle data; 750 because of the parquet compression ratio it assumes).

@weiluo-db
Contributor Author

@Kimahriman The number of reducers is controlled by both numShuffleBlocks and maxShufflePartitions, as you can see here: https://github.com/delta-io/delta/pull/2145/files#diff-90671ab8774fd49ade989674814a12d06081192e33e559c955f4c26e6733a221R73-R78.

How many mappers do you have in your test? When you run the same job with #1198, what bin size do you use, and how many reducers do you get?

It'll be super helpful if you could also paste the full metrics from the DeltaOptimizedWriterExec node here. Thanks!

@Kimahriman
Contributor

The number of reducers is controlled by both numShuffleBlocks and maxShufflePartitions

My understanding is that the numShuffleBlocks is like the total mappers * total reducers. So if you have say 1k mappers, you'll get min(numShuffleBlocks / 1k, maxShufflePartitions) reducers, which with default settings will also use maxShufflePartitions until you have more than 25k mappers. And the total mappers * total reducers is effectively the chunks that are then combined into "bins", so if you have 1k mappers and 2k reducers, you have 2 million shuffle blocks to work with to combine into similarly sized bins. Is that all correct?

How many mappers do you have in your test? When you run the same job with #1198, what bin size do you use, and how many reducers do you get?

There were about 4,500 mappers writing roughly 2.3 TiB of shuffle data. With the default settings in this PR, I ended up with exactly 2000 reducing tasks. I tried increasing maxShufflePartitions to 10k and ended up with ~9870 reducers, some of which were reading up to 3+ GiB of shuffle data. The resulting data is pretty skewed as far as output partitions go.

For the other PR, we also have binSize set to 512 MiB. The same job resulted in ~20k reducing tasks. The one difference here is that we actually have the spark.sql.shuffle.partitions set to 20k, which is what is used as the equivalent maxShufflePartitions. I didn't try setting maxShufflePartitions to 20k to match to see how they compared. But with 4500 mappers I still would assume it could be split pretty evenly (4500 * 2000 = 9 million shuffle blocks to work with right?). We did end up with a lot of small partitions, but the largest reducers didn't read more than 1 GiB of shuffle data.

It'll be super helpful if you could also paste the full metrics from the DeltaOptimizedWriterExec node here. Thanks!

Don't have those right now unfortunately (and probably can't get them until next week). Was going to try adding some logging to understand what was going on too.

@weiluo-db
Contributor Author

With the default settings in this PR, I ended up with exactly 2000 reducing tasks.

BTW, did you ever try running with the default 2k reducers? If so, how skewed was it compared to the other runs? I can only speculate that we somehow didn't get accurate stats about the shuffle blocks (e.g. HighlyCompressedMapStatus). Perhaps try raising the reducers to 20k, which could allow a more leveled comparison.

Any additional metrics and logs would definitely help. Thanks!

@Kimahriman
Contributor

BTW, did you ever try running with the default 2k reducers? If so, how skewed was it compared to the other runs? I can only speculate that we somehow didn't get accurate stats about the shuffle blocks (e.g. HighlyCompressedMapStatus). Perhaps try raising the reducers to 20k, which could allow a more leveled comparison.

Yeah the first attempt was with the default 2k reducers and I got 2k reducing tasks exactly, which is odd because I expected the partitions to get split due to the large size, but that didn't seem to happen at all. I have the HighlyCompressedMapStatus config set to 100k to make sure it doesn't become an issue (realized that when initially testing out the other PR way back when).

Any additional metrics and logs would definitely help. Thanks!

I should get a chance to do some more testing tomorrow (and added a few more log statements as well).

Kimahriman pushed a commit to Kimahriman/delta that referenced this pull request Nov 21, 2023
@Kimahriman
Contributor

Figured out what the issue was. The original PR treats binSize as a byte value, likely based on the Databricks docs that treat "targetFileSize" as a byte value: https://docs.databricks.com/en/delta/tune-file-size.html#set-a-target-file-size. So I had my bin size set to 512 * 1024 * 1024, which is being interpreted as that many megabytes, not bytes. I think most Spark settings are interpreted as bytes by default; only a few things aren't. Removing that setting fixed things for me.
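(In other words, with this PR the value is read as a number of megabytes, so a 512 MiB target would be configured roughly as below; a sketch only, assuming an active SparkSession.)

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: per the comment above, this PR reads optimizeWrite.binSize as a
// number of megabytes, so the default 512 MiB bin target corresponds to "512".
val spark = SparkSession.builder().getOrCreate()
spark.conf.set("spark.databricks.delta.optimizeWrite.binSize", "512")

// Setting it to 512 * 1024 * 1024 (a byte-style value) would instead be read as
// roughly 512 TiB per bin, as described in the comment above.
```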

@felipepessoto
Contributor

felipepessoto commented Nov 21, 2023

Should we change the config to use bytes as the unit instead of MB?

@weiluo-db mentioned this pull request Jan 9, 2024
vkorukanti added a commit that referenced this pull request Jan 16, 2024
Optimized write feature was added by #2145. This PR adds the corresponding documentation for the feature.

Co-authored-by: Venki Korukanti <[email protected]>
vkorukanti added a commit to vkorukanti/delta that referenced this pull request Jan 18, 2024
(Cherry-pick of 494f2b2 to branch-3.1)

Optimized write feature was added by delta-io#2145. This PR adds the corresponding documentation for the feature.

Co-authored-by: Venki Korukanti <[email protected]>
vkorukanti added a commit that referenced this pull request Jan 18, 2024
(Cherry-pick of 494f2b2 to branch-3.1)

Optimized write feature was added by #2145. This PR adds the corresponding documentation for the feature.

Co-authored-by: Venki Korukanti <[email protected]>