[Managed Iceberg] Allow updating partition specs during pipeline runtime #32879
Conversation
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment.
I've looked into the Iceberg codebase to check for any violations of snapshot consistency. Interestingly, it seems possible to commit data files produced with an old partitioning scheme (and therefore from a previous snapshot). Could this actually be a feature of partition evolution?! 🤔
Yep, this is what I found empirically as well. The docs mention that each partition spec's metadata is kept separate. This allows Iceberg to carry out a separate query plan for each partition spec and combine all matching files afterwards. Old data written with the old partition spec is not affected, and what I'm seeing is that new data written with the old spec is also unaffected. This is convenient for streaming writes: after the table's partition spec is updated, there may be some in-flight data that still uses the old spec. Iceberg will still accept that data when it gets committed and manage the metadata accordingly.
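For context, here is a minimal sketch of the scenario under discussion, evolving a live table's partition spec with the Iceberg Java API while writers keep running. The catalog location, table name, and column names are hypothetical:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class EvolvePartitionSpec {
  public static void main(String[] args) {
    // Hypothetical warehouse path and table identifier, for illustration only.
    HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "gs://<bucket>/warehouse");
    Table table = catalog.loadTable(TableIdentifier.of("db", "events"));

    // Evolve the spec while a write pipeline is running. Files already written
    // (or still in flight) with the old spec keep their original
    // partition_spec_id and are still accepted at commit time.
    table.updateSpec()
        .addField(Expressions.day("event_ts"))
        .commit();
  }
}
```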
Yep, makes sense! 👍 Example manifest list:

```json
{
  "manifest_path": "gs://<bucket>/<schema>/<table>/metadata/c4aa6dc6-5e0b-454c-b126-1d97d2f223f8-m1.avro",
  "manifest_length": 8363467,
  "partition_spec_id": 0,
  "content": 0,
  "sequence_number": 4814,
  "min_sequence_number": 4093,
  "added_snapshot_id": 481579842725173797,
  "added_data_files_count": 0,
  "existing_data_files_count": 24356,
  "deleted_data_files_count": 0,
  "added_rows_count": 0,
  "existing_rows_count": 785677,
  "deleted_rows_count": 0,
  "partitions": {
    "array": []
  }
}
{
  "manifest_path": "gs://<bucket>/<schema>/<table>//metadata/9753c692-b48e-49bb-9c27-713c8a41842e-m1.avro",
  "manifest_length": 8372685,
  "partition_spec_id": 0,
  "content": 0,
  "sequence_number": 4093,
  "min_sequence_number": 3557,
  "added_snapshot_id": 1992827298894205835,
  "added_data_files_count": 0,
  "existing_data_files_count": 19878,
  "deleted_data_files_count": 0,
  "added_rows_count": 0,
  "existing_rows_count": 1263244,
  "deleted_rows_count": 0,
  "partitions": {
    "array": []
  }
}
```
lgtm
@DanielMorales9 sorry, but I did some experimenting and found it needed some more work (a few things were missing). I fixed this up in the previous commit baba789; can you take another look?
In sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java:

```java
// To handle this, we create a manifest file for each partition spec, and group data files
// accordingly.
// Afterward, we append all manifests using a single commit operation.
private void appendManifestFiles(Table table, Iterable<FileWriteResult> fileWriteResults)
```
This is a long method 😲
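For readers following along, here is a minimal sketch (not the PR's actual code) of the approach the inline comment above describes, using the Iceberg Java API: group DataFiles by the spec ID they carry, write one manifest per spec, then append everything in a single atomic commit. The class name and manifest path are illustrative:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.OutputFile;

class PerSpecAppend {
  static void appendGroupedBySpec(Table table, Iterable<DataFile> dataFiles) throws IOException {
    // Group incoming files by the partition spec they were written with.
    Map<Integer, List<DataFile>> filesBySpec = new HashMap<>();
    for (DataFile file : dataFiles) {
      filesBySpec.computeIfAbsent(file.specId(), id -> new ArrayList<>()).add(file);
    }

    AppendFiles append = table.newAppend();
    for (Map.Entry<Integer, List<DataFile>> entry : filesBySpec.entrySet()) {
      PartitionSpec spec = table.specs().get(entry.getKey());
      // Hypothetical manifest location; a real implementation would follow the
      // table's metadata location conventions.
      OutputFile out =
          table.io().newOutputFile(table.location() + "/metadata/" + UUID.randomUUID() + "-m.avro");
      // Each manifest is written against the spec its files belong to.
      ManifestWriter<DataFile> writer = ManifestFiles.write(2, spec, out, null);
      try {
        writer.addAll(entry.getValue());
      } finally {
        writer.close();
      }
      append.appendManifest(writer.toManifestFile());
    }
    // One atomic commit covering manifests from every spec.
    append.commit();
  }
}
```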
@DanielMorales9 any more thoughts on this one?
Nope, LGTM 👍
Users may update the table's partition spec while a write pipeline is running (e.g. a streaming pipeline), and this update can land after DataFiles have already been serialized. The partition spec itself is not serializable, so we don't preserve it, but we do keep its ID. We can use that ID to fetch the correct partition spec and recreate the DataFile before appending it to the table.
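The idea in code, as a minimal sketch: it assumes a hypothetical `SerializableDataFile` holder (the real class in this PR may differ) that carries the DataFile's fields plus the ID of the spec it was written with. `Table.specs()` returns every spec the table has ever had, keyed by ID, so a file written before a mid-pipeline spec update can still be rehydrated:

```java
import java.io.Serializable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;

/** Hypothetical serializable holder for a DataFile's fields plus its spec ID. */
class SerializableDataFile implements Serializable {
  String path;
  String format;        // e.g. "PARQUET"
  String partitionPath; // e.g. "day=2024-10-21"
  long fileSizeBytes;
  long recordCount;
  int specId;           // ID of the partition spec the file was written with
}

class RecreateDataFile {
  static DataFile recreate(Table table, SerializableDataFile sdf) {
    // table.specs() maps spec ID -> spec for every spec in the table's history.
    PartitionSpec spec = table.specs().get(sdf.specId);
    if (spec == null) {
      throw new IllegalStateException("Unknown partition spec id: " + sdf.specId);
    }
    // Rebuild the DataFile against the spec it was originally written with.
    DataFiles.Builder builder = DataFiles.builder(spec)
        .withPath(sdf.path)
        .withFormat(sdf.format)
        .withFileSizeInBytes(sdf.fileSizeBytes)
        .withRecordCount(sdf.recordCount);
    if (spec.isPartitioned()) {
      builder.withPartitionPath(sdf.partitionPath);
    }
    return builder.build();
  }
}
```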
Fixes #32862