[Managed Iceberg] Support partitioning time types (year, month, day, hour) #32939
Conversation
@DanielMorales9 can you take a look at this one too?
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment
I think a more scalable approach here would be to encapsulate the writing logic within a Parquet writer class, similar to how Spark and Flink handle Parquet writes (e.g. SparkParquetWriter, FlinkParquetWriter). That would let you manage the type conversions and partitioning requirements specific to Iceberg in a centralized and reusable way.
We have a relatively thin RecordWriter wrapper that uses Parquet and Avro writers. A RecordWriter is blind to its data file's partition key and spec. There's one RecordWriter for each destination-partition pair, and RecordWriterManager takes care of routing records to the correct destination and partition. If it helps, we can certainly move the new partition logic in this PR to RecordWriterManager. I can see that it belongs there more than in utils.
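The routing described above can be sketched roughly as follows. This is a minimal standalone illustration, not Beam's actual API: the `Manager`/`Writer` names and the string-based partition function are hypothetical stand-ins for RecordWriterManager and its destination-partition keyed writers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: one writer per (destination, partition) pair,
// created lazily and looked up by a composite key on each write.
public class WriterRouting {
  static class Writer {
    int recordsWritten = 0;
    void write(String record) { recordsWritten++; }
  }

  static class Manager {
    final Map<String, Writer> writers = new HashMap<>();
    final Function<String, String> partitionFn;

    Manager(Function<String, String> partitionFn) {
      this.partitionFn = partitionFn;
    }

    void write(String destination, String record) {
      // Route the record: each distinct (destination, partition) pair
      // gets its own writer, mirroring the one-writer-per-pair design.
      String key = destination + "/" + partitionFn.apply(record);
      writers.computeIfAbsent(key, k -> new Writer()).write(record);
    }
  }

  public static void main(String[] args) {
    // Partition by the record's first character, as a toy partition key.
    Manager m = new Manager(r -> r.substring(0, 1));
    m.write("tableA", "x1");
    m.write("tableA", "x2");
    m.write("tableA", "y1");
    System.out.println("writers created: " + m.writers.size());
  }
}
```

The individual writers stay blind to partitioning; only the manager knows the spec, which is why moving the new partition logic into the manager fits the existing design.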
Hey @DanielMorales9, would you like to take a look? The next Beam release is getting cut next week if we wanna get this in before then.
Hey @ahmedabu98, I am a bit concerned with the additional overhead you are introducing by recreating all records just to fit the types expected by the Iceberg partitioning logic. Imo, it should be done in a single place rather than scattered everywhere (as with the timestamp issue we discussed some time ago).
Agreed it's not the most ideal. I expected all these conversions to be taken care of behind the scenes by Iceberg's partitioning logic, but it looks like for time types we need to do it on our side instead.
We're not really recreating the records (i.e. not doing a full copy). We're just creating an empty record and filling in the fields that we're partitioning on (ref). I found this to be the minimal implementation needed to make it work. In the average case, we should expect only a few fields to be populated, not the full record. I'm open to suggestions though! Hope I'm missing something.
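The "empty record, partition fields only" idea can be sketched like this. Plain `Map`s stand in for Iceberg `Record`s here, and `partitionProjection` is a hypothetical helper name; the point is only that the cost is proportional to the number of partition source fields, not the record width.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: build a sparse projection of the record that carries
// only the fields referenced by the partition spec.
public class PartialRecords {
  static Map<String, Object> partitionProjection(
      Map<String, Object> record, List<String> partitionSourceFields) {
    Map<String, Object> projected = new HashMap<>();
    for (String field : partitionSourceFields) {
      // In the real change, this is the single place where a time value
      // would also be converted to the representation the partition
      // logic expects.
      projected.put(field, record.get(field));
    }
    return projected;
  }

  public static void main(String[] args) {
    Map<String, Object> record = new HashMap<>();
    record.put("id", 1L);
    record.put("name", "a");
    record.put("ts", "2024-10-24T15:30:00");
    // Only the partition source field is copied; "id" and "name" are not.
    System.out.println(partitionProjection(record, List.of("ts")));
  }
}
```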
@DanielMorales9 others are beginning to ask for this feature as well. I'll try to find other reviewers in case you don't have time to look at this |
assign set of reviewers
Assigning reviewers. If you would like to opt out of this review, comment R: @kennknowles for label java. Available commands:
The PR bot will only process comments in the main thread (not review comments).
This more generally adds the partition fields functionality, right? It isn't just adding date/time support. Or am I misunderstanding?
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
Partitioning support was added in #32102 and already works with most fields (e.g. integer, float, string, etc.), but further handling is required for some time types. I decided to make most of this handling uniform for all types.
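For context on why time types need extra handling: per the Iceberg spec, the year/month/day/hour partition transforms produce integer ordinals relative to the Unix epoch (years since 1970, months since 1970-01, days since 1970-01-01, hours since 1970-01-01T00). A minimal standalone sketch of those ordinals using only `java.time` (the helper class and method names are illustrative, not Beam's or Iceberg's API):

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical sketch of Iceberg's time partition transform values:
// each transform is an offset from the Unix epoch.
public class TimeTransforms {
  static final LocalDateTime EPOCH = LocalDateTime.of(1970, 1, 1, 0, 0);

  static long year(LocalDateTime ts)  { return ChronoUnit.YEARS.between(EPOCH, ts); }
  static long month(LocalDateTime ts) { return ChronoUnit.MONTHS.between(EPOCH, ts); }
  static long day(LocalDateTime ts)   { return ChronoUnit.DAYS.between(EPOCH, ts); }
  static long hour(LocalDateTime ts)  { return ChronoUnit.HOURS.between(EPOCH, ts); }

  public static void main(String[] args) {
    LocalDateTime ts = LocalDateTime.of(2024, 10, 24, 15, 30);
    System.out.println("year=" + year(ts)
        + " month=" + month(ts)
        + " day=" + day(ts)
        + " hour=" + hour(ts));
  }
}
```

The conversion work in this PR is about getting incoming values into a representation from which these epoch-relative ordinals can be computed consistently.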
LGTM from my end
Fixes #32865