Add metadata tables for data_files and delete_files #1066
Conversation
pyiceberg/table/__init__.py (Outdated)
```
@@ -4365,6 +4365,10 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
for manifest_list in snapshot.manifests(io):
    for manifest_entry in manifest_list.fetch_manifest_entry(io):
        data_file = manifest_entry.data_file
        if file_content_type == "data" and data_file.content != DataFileContent.DATA:
            continue
        if file_content_type == "delete" and data_file.content == DataFileContent.DATA:
```
What do you think about moving the file_content_type options to constants?
Updated the signature of `_files()` to use the `DataFileContent` enum; had to use a list, as there are two content types in the case of delete files.
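For illustration, a minimal sketch of the enum-based filtering (the helper name and the two filter constants are hypothetical; the real change lives in `_files()` in pyiceberg/table/__init__.py):

```python
from typing import Iterable, List, Optional

from pyiceberg.manifest import DataFile, DataFileContent


def filter_by_content(
    files: Iterable[DataFile],
    data_file_filter: Optional[List[DataFileContent]] = None,
) -> List[DataFile]:
    # No filter: pass everything through, like the existing `files` table.
    if data_file_filter is None:
        return list(files)
    return [f for f in files if f.content in data_file_filter]


# `data_files` needs a single content type; `delete_files` needs a list,
# because there are two delete content types.
DATA_FILTER = [DataFileContent.DATA]
DELETE_FILTER = [DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES]
```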
pyiceberg/table/__init__.py (Outdated)
```
"nan_value_counts": dict(data_file.nan_value_counts),
"lower_bounds": dict(data_file.lower_bounds),
"upper_bounds": dict(data_file.upper_bounds),
"column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None,
```
I believe that using `dict(data_file.column_sizes or {})` will achieve the same behavior without the need for an if statement. What do you think?
nit: push this logic to the line above:
`column_sizes = data_file.column_sizes or {}`
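As a self-contained illustration (values hypothetical), the two spellings agree for populated maps but differ for a missing one: the conditional keeps `None`, while `or {}` normalizes it to an empty dict, which is what lets the if statement go away:

```python
column_sizes = None

# Conditional form: a missing map stays None.
a = dict(column_sizes) if column_sizes is not None else None
# `or {}` shorthand: a missing map becomes an empty dict.
b = dict(column_sizes or {})
assert a is None and b == {}

# For a populated map, both forms produce the same dict.
column_sizes = {1: 100, 2: 200}
assert (dict(column_sizes) if column_sizes is not None else None) == dict(column_sizes or {})
```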
Also, I wonder if the null/None behavior is due to the `.toPandas()` conversion when queried through Spark.
added a few comments
```
# configure table properties
if format_version == 2:
    with tbl.transaction() as txn:
        txn.set_properties({"write.delete.mode": "merge-on-read"})
spark.sql(f"DELETE FROM {identifier} WHERE int = 1")
```
is this to produce delete files?
Yes
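For illustration, a hedged check that the DELETE above actually produced delete files, using this PR's `delete_files` table (assumes the `tbl` from the test snippet; Spark's merge-on-read DELETE is expected to write positional deletes):

```python
from pyiceberg.manifest import DataFileContent

delete_files_df = tbl.inspect.delete_files().to_pandas()
assert len(delete_files_df) > 0
# All delete files written by Spark here should be positional.
assert (delete_files_df["content"] == DataFileContent.POSITION_DELETES.value).all()
```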
About this: #1066 (comment). I have checked in the Spark shell as well; the values are null.
do you know if this produces positional deletes, equality deletes, or both?
I think Spark only produces positional delete files. Flink might produce equality delete files.
Hm, good point. I can't find any reference to Spark producing equality deletes.
Spark streaming might produce equality deletes. I tried with Flink: I created an Iceberg table, upserted some data into it, and observed both positional and equality delete files, which looked weird to me.
This was a nit comment btw, not blocking. We don't necessarily have to test for equality delete files here.
Alright, other than this, all other comments have been resolved.
> Spark only produces positional delete files. Flink might produce equality delete files.

That's my understanding as well. I think just testing for positional deletes is okay for now, and maybe we can think of ways to add equality deletes to our test suite in the future.
Force-pushed from b54687e to 0a7db61
@kevinjqliu could you execute the workflow for this? I've merged this PR into the refactored init.py. Also wanted to check if integration tests are failing here.
Thanks again @soumya-ghosh. I'll wait for another approval before merging.
Thanks for making this contribution @soumya-ghosh! And thank you @kevinjqliu and @ndrluis for the detailed reviews!
* Add metadata tables for data_files and delete_files
* Update API docs for `data_files` and `delete_files`
* Update method signature of `_files()`
* Migrate implementation of files() table from __init__.py
Implements metadata tables for `data_files` and `delete_files` - #1053.

Have reused the logic of `files` to derive `data_files` and `delete_files`. Also reused the test cases for `files` (`test_inspect_files`), as the schema is the same as `files`.
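For context, a usage sketch of the new tables (catalog name and table identifier are hypothetical; all three calls return pyarrow tables with the same schema):

```python
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")          # assumes a configured catalog
tbl = catalog.load_table("db.events")      # hypothetical table identifier

all_files = tbl.inspect.files()            # data + delete files
data_files = tbl.inspect.data_files()      # only DataFileContent.DATA entries
delete_files = tbl.inspect.delete_files()  # positional + equality delete entries

# Same schema as `files`, so existing tooling keeps working:
print(data_files.column_names)
```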