Bin packing optimization #607

Merged: 29 commits, merged into delta-io:main on Jun 2, 2022

Conversation

@Blajda (Collaborator) commented May 16, 2022

Description

One optimization implemented by Databricks is bin packing, which coalesces small files into larger ones. This reduces the number of calls to the underlying storage and results in faster table reads.

At a high level, the process works as follows. A user can provide a filter for which partitions they want to optimize. Active add actions for those partitions are obtained and placed into bins keyed by partition. Once actions are in their respective partition bin, additional bins are built, each consisting of a list of files to be merged together. Files larger than delta.targetFileSize and bins containing only a single file are not optimized.

Each bin is then processed: the smaller files are rewritten into a single larger file, and corresponding Add and Remove actions are created. Metrics on how many files were considered and the total file size are also captured.
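To illustrate the binning step described above, here is a minimal, self-contained sketch. The FileMeta type, the first-fit strategy, and the deterministic sort are assumptions for this example, not the exact code in this PR.

```rust
/// Minimal stand-in for a file entry taken from an Add action (illustrative only).
struct FileMeta {
    path: String,
    size: u64,
}

/// Group files smaller than `target_size` into bins whose combined size stays
/// at or below `target_size` (first-fit). Bins with a single file are dropped,
/// since rewriting one file alone brings no benefit.
fn bin_pack(mut files: Vec<FileMeta>, target_size: u64) -> Vec<Vec<FileMeta>> {
    // Sorting keeps the result deterministic regardless of log order.
    files.sort_by_key(|f| f.size);
    let mut bins: Vec<(u64, Vec<FileMeta>)> = Vec::new();
    for file in files {
        // Files already at or above the target size are left untouched.
        if file.size >= target_size {
            continue;
        }
        match bins
            .iter_mut()
            .find(|(used, _)| used + file.size <= target_size)
        {
            Some((used, bin)) => {
                *used += file.size;
                bin.push(file);
            }
            None => bins.push((file.size, vec![file])),
        }
    }
    bins.into_iter()
        .map(|(_, bin)| bin)
        .filter(|bin| bin.len() > 1)
        .collect()
}
```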

Related Issue(s)

Issues

  1. Currently the writers provided by delta-rs write partition information to the parquet file. This differs from the Databricks implementation, which does not. This causes a schema mismatch when packing files from different writers. It is currently handled by dropping partition columns when rewriting.

  2. Similar to (1), schema evolution allows adding new columns, converting NullType to any other type, and upcasting. Currently this is not handled and will cause a schema mismatch error. Looking for some guidance on how these scenarios should be handled.

Todo

  • Add file statistics using new writers
  • Implement partition column drop on partition write
  • Determine criteria for when optimize should fail
  • Idempotency tests

@Blajda (Collaborator, Author) commented May 16, 2022

Hi @houqp, let me know if there's anything additional you would like to see in this PR; I would appreciate any feedback on the items listed under Issues. I'll work on finishing this up by adding statistics on writes.

@roeap (Collaborator) commented May 16, 2022

Thanks @Blajda - this is a great update to delta-rs. Personally I think it would be good to re-use the existing writers and fix/extend them where they fall short for the optimize scenario. At least the RecordBatchWriter should handle dropping columns that are represented as partition folders within the divide_by_partitions function. There are some tests for that, but if it doesn't, that's a bug we should definitely fix. The writers also handle file naming and computing file statistics, so we don't duplicate that logic.

Right now the writer does not handle multi-part files or desired file sizes, which might be a use case here. I.e. we could extend the writer to either eagerly flush files when the size of the in-memory writer reaches the desired size, or create a new underlying arrow writer and flush only when flush is called. In case of multiple files, these could be the multi-part files.

A downside of the current implementation is that we always try to partition the data written into the writer, which might be a fairly expensive operation when it's not needed. I'm not sure, however, how it will behave if the partition columns aren't present, since there is nothing to partition by :).

We could try to use the PartitionWriter directly, but the logic for add actions / statistics is currently part of the higher-level writer, I think.

@houqp (Member) commented May 16, 2022

Very cool @Blajda! I agree with @roeap's code-reuse suggestion.

> Currently the writers provided by delta-rs write partition information to the parquet file. This differs from the Databricks implementation, which does not. This causes a schema mismatch when packing files from different writers. It is currently handled by dropping partition columns when rewriting.

Ideally, we should also not write out partition columns into the actual parquet file. I think this is an oversight on our end. Users of delta-rs should be responsible for populating the partition columns. We could track it as a follow-up cleanup task.

> Similar to (1), schema evolution allows adding new columns, converting NullType to any other type, and upcasting. Currently this is not handled and will cause a schema mismatch error. Looking for some guidance on how these scenarios should be handled.

As far as I know, there is no easy way around this other than manually comparing the table schema with the schema we get after reading from the parquet file and back-filling new columns in the arrow record batches before writing them out to the new parquet file.
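For illustration, a minimal sketch of what such back-filling could look like with arrow record batches; the function name and error handling are assumptions, and type upcasts are not covered here.

```rust
// Sketch: project a batch onto the table schema, filling missing columns with nulls.
use arrow::array::{new_null_array, ArrayRef};
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Reuse columns present in `batch`; back-fill columns missing from it
/// (added later via schema evolution) with all-null arrays of the target type.
fn backfill_missing_columns(
    batch: &RecordBatch,
    table_schema: SchemaRef,
) -> Result<RecordBatch, ArrowError> {
    let columns: Vec<ArrayRef> = table_schema
        .fields()
        .iter()
        .map(|field| match batch.schema().column_with_name(field.name()) {
            // Column exists in the file being rewritten: keep it as-is.
            Some((idx, _)) => batch.column(idx).clone(),
            // Column was added later: back-fill with nulls.
            None => new_null_array(field.data_type(), batch.num_rows()),
        })
        .collect();
    RecordBatch::try_new(table_schema, columns)
}
```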

@roeap (Collaborator) commented May 16, 2022

Just had a quick look into the RecordBatchWriter. To me it seems we do make sure the partition columns are not part of the data written out to storage.

```rust
/// Returns the arrow schema representation of the partitioned files written to table
pub fn partition_arrow_schema(&self) -> ArrowSchemaRef {
    Arc::new(ArrowSchema::new(
        self.arrow_schema_ref
            .fields()
            .iter()
            .filter(|f| !self.partition_columns.contains(f.name()))
            .map(|f| f.to_owned())
            .collect::<Vec<_>>(),
    ))
}
```

We use that partitioned schema to initialize the PartitionWriter and to validate the schema of data written into it. Then again, I might have missed something, or there could be a bug...

While we currently cannot fully handle schema evolution, in the case of added columns it is permissible to omit these from the written data rather than filling them. Of course this only makes sense if the data is missing from all files being binned together. In other cases, back-filling with null seems like the only way to go...

https://github.com/delta-io/delta/blob/7103115962ab795272d9a259b0c069c277777939/PROTOCOL.md?plain=1#L477-L480

@Blajda (Collaborator, Author) commented May 17, 2022

Hi @roeap,
I originally wrote this at the start of April with the original JSON writer and I haven't tested that scenario with the new writers. Glad to hear it was resolved, and sorry about the confusion. The protocol link was helpful; I was trying to determine whether default values are supported when adding columns, and this confirms that back-filling with null is acceptable.

I want to avoid using divide_by_partition_values since the sort is not required in this case, but I do agree there are utilities such as next_data_path that can be reused.

My next steps are factoring out the drop-partition-columns functionality and creating some utility for back-filling.

@roeap (Collaborator) commented May 17, 2022

@Blajda - what do you think about adding a write_partition method to the Writer trait and essentially carving out the inner loop over partitions in the current write method? It would then be great to have the writer aware of the desired file size as well, since right now it is not. In the case of optimize you do have more prior knowledge about how to bin the files, which the optimize command can already leverage, but the general logic would be a great addition to the writer.

That way you get the creation of the Add action (foremost the stats) for free. More importantly though, as we move to support higher writer versions we will need to handle things like column aliases/renames, column invariants, identity and calculated columns, etc., which I suspect will be non-trivial logic. I feel our lives might be a lot easier if we leverage a single writer struct with a single code path that writes out data.

I know @wjones127 has been thinking a lot about our writer designs as well.

@Blajda (Collaborator, Author) commented May 18, 2022

> what do you think about adding a write_partition method to the Writer trait and essentially carving out the inner loop over partitions in the current write method?

Yes, that sounds like the right approach to ensure the writers are unified. I assume the signature would look something like this:

```rust
async fn write_partition(&mut self, values: RecordBatch, partition: &?) -> Result<(), DeltaWriterError>
```

Now it's mostly a decision on how to represent the full partition path. We can reuse get_partition_key and have ? be a String. My only concern is that it might be a bit confusing for new users. Maybe we can make a new type called PartitionPath which can be created in the same way as get_partition_key and simply encapsulates a String.
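A rough sketch of what such a PartitionPath newtype could look like; the field layout, method names, and the null-value encoding are placeholder assumptions, not a final API.

```rust
/// Hive-style partition path, e.g. `date=2022-05-22/country=US`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PartitionPath(String);

impl PartitionPath {
    /// Build the path from `(column, value)` pairs ordered by the table's
    /// partition columns, the same way get_partition_key builds its String today.
    pub fn from_partition_values(values: &[(String, Option<String>)]) -> Self {
        let segments: Vec<String> = values
            .iter()
            .map(|(col, val)| match val {
                Some(v) => format!("{}={}", col, v),
                // Placeholder encoding for null partition values.
                None => format!("{}=__HIVE_DEFAULT_PARTITION__", col),
            })
            .collect();
        PartitionPath(segments.join("/"))
    }

    pub fn as_str(&self) -> &str {
        &self.0
    }
}
```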

I think that abstraction would be very helpful and would also help cleanup the bin packing implementation.

In terms of tracking the desired file size, my understanding is that the in-memory size doesn't exactly match the on-disk size due to parquet compression and RLE. But if that calculation is trivial then I'll go for it.

@wjones127 (Collaborator) left a comment

It would be nice if write_partition() took a stream of record batches instead of a single one. Then the optimize command would bin-pack to collect a set of record batch streams and pass those streams on to write_partition() (one at a time, or maybe even in parallel). The function would then be responsible for merging the schemas. We also need to think about how the optimize metrics are passed back; maybe write_partition() returns metrics? One possible shape is sketched below.

Left some other general comments as well.
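One possible shape for a stream-based write_partition, sketched only to illustrate the suggestion above; the metrics struct and error handling are assumptions, and no actual writing is performed.

```rust
use arrow::record_batch::RecordBatch;
use futures::stream::{Stream, StreamExt};

/// Hypothetical per-partition metrics returned to the optimize command.
pub struct PartitionWriteMetrics {
    pub num_batches: usize,
    pub num_rows: usize,
}

/// Consume a stream of batches destined for a single partition.
pub async fn write_partition<S, E>(
    mut batches: S,
    partition_path: &str,
) -> Result<PartitionWriteMetrics, E>
where
    S: Stream<Item = Result<RecordBatch, E>> + Unpin,
{
    let _ = partition_path;
    let mut metrics = PartitionWriteMetrics { num_batches: 0, num_rows: 0 };
    while let Some(batch) = batches.next().await {
        let batch = batch?;
        // A real implementation would reconcile the batch schema with the table
        // schema and hand the data to the underlying parquet writer for the
        // partition; here we only count what flowed through.
        metrics.num_batches += 1;
        metrics.num_rows += batch.num_rows();
    }
    Ok(metrics)
}
```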

rust/src/optimize.rs (review thread resolved)
```
@@ -0,0 +1,91 @@
#[cfg(feature = "datafusion-ext")]
mod optiize {
```
@wjones127 (Collaborator) commented May 18, 2022

I think you may want a few more tests. Here are some properties we likely want to test for (a rough sketch of the first two follows the list):

  1. Optimize is idempotent. If you run twice, the second time it won't write any new files.
  2. Optimize bin packs. For example, with a max file size of 100MB, files of size 70MB, 100MB, 30MB will turn into two files of 100MB, regardless of the order they show up in the log. (I think this is handled, but might be nice to test.)
  3. Optimize fails if a concurrent writer overwrites the table. It might be able to succeed if a concurrent writer appends to the table, at the very least when the append happens in a different partition.
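For illustration, properties (1) and (2) could be expressed roughly like this against the hypothetical bin_pack helper sketched earlier in the description; these are not the tests added in this PR.

```rust
// Assumes the hypothetical `FileMeta` / `bin_pack` sketch from earlier in this thread.
#[test]
fn bin_pack_packs_to_target_and_is_idempotent() {
    let target: u64 = 100 * 1024 * 1024; // 100 MB target file size
    let files = |sizes: &[u64]| -> Vec<FileMeta> {
        sizes
            .iter()
            .enumerate()
            .map(|(i, &size)| FileMeta {
                path: format!("part-{}.parquet", i),
                size,
            })
            .collect()
    };

    // Property 2: 70 MB + 30 MB are packed into one bin; the 100 MB file is
    // already at the target size and is not rewritten.
    let bins = bin_pack(files(&[70 << 20, 100 << 20, 30 << 20]), target);
    assert_eq!(bins.len(), 1);
    assert_eq!(bins[0].iter().map(|f| f.size).sum::<u64>(), 100 << 20);

    // Property 1: a second pass over already-compacted files produces no bins,
    // i.e. running optimize again writes no new files.
    let bins = bin_pack(files(&[100 << 20, 100 << 20]), target);
    assert!(bins.is_empty());
}
```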

@Blajda (Collaborator, Author) replied:

I changed the implementation to sort the files to be optimized, which ensures that it is idempotent. Added a couple of tests to validate that behavior too. For item 3, I think it will have to wait until we have a generalized pattern for non-append writers.

```rust
//Check for remove actions on the optimized partitions
let mut dtx = table.create_transaction(None);
dtx.add_actions(actions);
dtx.commit(None, None).await?;
```
Reviewer (Collaborator):

Should we add an Optimize operation to the DeltaOperation enum, and pass it here so it generates the respective data in the commit info?

@Blajda (Collaborator, Author) replied:

Added a new struct and associated tests for this.

rust/tests/optimize_test.rs (review thread resolved)
@roeap (Collaborator) commented May 31, 2022

Looking good!

Left some minor questions. Reading this I realized that we should probably treat other operations, like vacuum, the same way as here, i.e. as a separate struct that can be executed... But this is something for a follow-up.

@Blajda (Collaborator, Author) commented Jun 2, 2022

Hi @roeap @wjones127
I've implemented the suggested changes and added additional tests. Let me know what you think! 😃

@wjones127 (Collaborator) left a comment

Thanks for adding those concurrency tests!

I just have one question on that commit info test.

last_commit["operationParameters"]["targetSize"],
json!(2_000_000)
);
assert_eq!(last_commit["operationParameters"]["predicate"], Value::Null);
Reviewer (Collaborator):

Should predicate equal the filter used above?

@Blajda (Collaborator, Author) replied:

Yes, it should. I added an additional TODO here. There isn't a function that obtains the String representation of PartitionFilters yet; it should be fairly simple to implement.

Reviewer (Collaborator) replied:

Okay. That seems sufficient for now.

```
@@ -165,3 +193,33 @@ pub(crate) fn stringified_partition_value(

    Ok(Some(s))
}

/// Remove any partition related fields from the schema
pub(crate) fn schema_without_partitions(
```
Reviewer (Collaborator):

Also, do you still need this function?

@Blajda (Collaborator, Author) replied:

Nope. Reverted and restored the original function from where it was sourced.

@wjones127 (Collaborator) left a comment

Looks good to me. I'll wait a few days before merging to give others a chance to provide any final feedback.

Great job on this!


```rust
let partition_value = partition_value
    .as_deref()
    .unwrap_or(NULL_PARTITION_VALUE_DATA_PATH);
```
Reviewer (Collaborator):

Just realized that this crate traditionally uses the constant, but the protocol specifies a different behaviour when it comes to nulls. I opened a ticket, #619, to track this question, since it was also not introduced here...

https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization

@roeap (Collaborator) left a comment

LGTM - great work @Blajda!

@wjones127 merged commit 92c35d8 into delta-io:main on Jun 2, 2022
@Blajda (Collaborator, Author) commented Jun 2, 2022

Thanks @wjones127 and @roeap for helping carry this to completion.

@Blajda deleted the bin-optimize branch June 2, 2022 23:32
@houqp (Member) commented Jun 5, 2022

Amazing work on this @Blajda !

ion-elgreco pushed a commit that referenced this pull request Feb 14, 2024
# Description
Implement a string representation for PartitionFilter and add it to the
optimize command's commit info, resolving a TODO.

The following JSON representations of CommitInfo from the Delta Scala project
were found:
```
"operationParameters":{"predicate":"[\"(id#480 < 50)\"]"}
"operationParameters":{"predicate":"[\"(col1#1939 = 2)\"]"}
"operationParameters":{"predicate":"[\"`col2` LIKE 'data-2-%'\"]"}
"operationParameters":{"predicate":"[\"(id#378L < 2)\"]"}
"operationParameters":{
  "predicate":"[\"(spark_catalog.delta.`/table-uuid`.id IN (0, 180, 308, 225, 756, 1007, 1503))\"]"
}
```

So I think representing the predicate as a JSON array of filter strings would
be logical, for example:
```
// single PartitionFilter
"operationParameters":{"predicate":"[\"date = '2022-05-22'\"]"}
// two filters
"operationParameters":{"predicate":"[\"date = '2022-05-22'\", \"country = 'US'\"]"}
```
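An assumed helper (not necessarily the merged implementation) showing how a list of partition-filter strings could be encoded into that predicate format:

```rust
use serde_json::Value;

fn predicate_from_filters(filters: &[String]) -> Option<Value> {
    if filters.is_empty() {
        // No filter provided: the predicate stays null in the commit info.
        return None;
    }
    // The predicate value is itself a string containing a JSON array of filter
    // expressions, matching the Scala examples above.
    let encoded = serde_json::to_string(filters).ok()?;
    Some(Value::String(encoded))
}

// predicate_from_filters(&["date = '2022-05-22'".to_string()])
// => Some(Value::String(r#"["date = '2022-05-22'"]"#.to_string()))
```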

# Related Issue(s)
Implements the TODO in #607.