Bug Fix: Position Deletes + row_filter yields less data when the DataFile is large #1141
Conversation
This is specifically about this piece of code?
LGTM! Thanks for taking the time to find this bug.
pyiceberg/io/pyarrow.py (Outdated)
```python
else:
    output_batches = iter([batch])
```
nit: redundant with `output_batches`, which is already initialized to the same value
It is unfortunately different because of line 1245: `batch = batch.take(indices)`
Ah, didn't see that. Do you mind adding a comment here to mention the possible `batch` mutation? I feel like this might be an easy footgun later.
Moved the variable assignment to L1246 for readability
pyiceberg/io/pyarrow.py (Outdated)
```python
if positional_deletes:
    # Create the mask of indices that we're interested in
    indices = _combine_positional_deletes(positional_deletes, current_index, current_index + len(batch))
    batch = batch.take(indices)

# Apply the user filter
if pyarrow_filter is not None:
```
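For context, here is a minimal standalone sketch of the pattern in this hunk. `combine_positional_deletes` below is a hypothetical stand-in, not pyiceberg's actual `_combine_positional_deletes`: it turns global delete positions that fall inside `[start, end)` into the batch-local indices to keep, which are then passed to `RecordBatch.take`.

```python
import pyarrow as pa
import pyarrow.compute as pc

def combine_positional_deletes(deletes: pa.ChunkedArray, start: int, end: int) -> pa.Array:
    # Hypothetical sketch, not pyiceberg's implementation.
    # `deletes` holds global file positions; this batch covers file rows [start, end).
    all_positions = pa.array(range(start, end), type=pa.int64())
    keep_mask = pc.invert(pc.is_in(all_positions, value_set=deletes))
    # Surviving global positions, shifted back to batch-local indices for take().
    return pc.subtract(all_positions.filter(keep_mask), start)

# Usage sketch: drop file positions 2 and 4 from a batch covering rows [0, 6).
batch = pa.RecordBatch.from_pydict({"v": [10, 11, 12, 13, 14, 15]})
deletes = pa.chunked_array([pa.array([2, 4], type=pa.int64())])
batch = batch.take(combine_positional_deletes(deletes, 0, 6))  # keeps positions 0, 1, 3, 5
```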
A little unrelated to this PR, but I wonder if there's ever a case where we need to filter (with `pyarrow_filter`) but `positional_deletes` is None/empty/falsey. I can see that as a potential issue.
Great question! That's already handled here:
iceberg-python/pyiceberg/io/pyarrow.py
Lines 1221 to 1234 in 0dc5408
```python
fragment_scanner = ds.Scanner.from_fragment(
    fragment=fragment,
    # With PyArrow 16.0.0 there is an issue with casting record-batches:
    # https://github.com/apache/arrow/issues/41884
    # https://github.com/apache/arrow/issues/43183
    # Would be good to remove this later on
    schema=_pyarrow_schema_ensure_large_types(physical_schema)
    if use_large_types
    else (_pyarrow_schema_ensure_small_types(physical_schema)),
    # This will push down the query to Arrow.
    # But in case there are positional deletes, we have to apply them first
    filter=pyarrow_filter if not positional_deletes else None,
    columns=[col.name for col in file_project_schema.columns],
)
```
Thanks! So if there are `positional_deletes`, we apply the filter manually to the PyArrow table; if there are no `positional_deletes`, we push the filter down to the scanner. Arrow 17 should make this a lot cleaner.
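Put together, the flow looks roughly like the sketch below. It is assembled from the snippets in this thread rather than being the exact pyiceberg code; `fragment`, `schema`, `columns`, `pyarrow_filter`, and `positional_deletes` stand in for the real variables, and the positional-delete handling is simplified.

```python
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds

def scan_task_batches(fragment, schema, columns, pyarrow_filter, positional_deletes):
    scanner = ds.Scanner.from_fragment(
        fragment=fragment,
        schema=schema,
        columns=columns,
        # No positional deletes: push the user filter down into the Arrow scan.
        # With positional deletes: scan unfiltered, since deletes must be applied first.
        filter=pyarrow_filter if not positional_deletes else None,
    )
    current_index = 0
    for batch in scanner.to_batches():
        batch_len = len(batch)
        if positional_deletes:
            # Drop the deleted rows (simplified: global positions -> keep mask -> take).
            positions = pa.array(range(current_index, current_index + batch_len), type=pa.int64())
            keep = pc.invert(pc.is_in(positions, value_set=positional_deletes))
            batch = batch.take(pc.indices_nonzero(keep))
            if pyarrow_filter is not None:
                # Apply the user filter manually via a Table round trip;
                # every resulting batch must be yielded (the subject of this PR).
                yield from pa.Table.from_batches([batch]).filter(pyarrow_filter).to_batches()
            else:
                yield batch
        else:
            yield batch
        current_index += batch_len
```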
```python
    output_batches = arrow_table.to_batches()
else:
    output_batches = iter([batch])
for output_batch in output_batches:
```
I'm a bit concerned about all the nested for-loops in this function. But correctness comes first and we can always refactor later.
You are not alone 😨
I think the long-term solution is to upgrade our minimum requirement for PyArrow to 17.0.0, but as you said, I feel we should still have a fix that works with the lower versions while we handle the longer process of discussing the version bump and actually enforcing it slowly, together with the community.
Yes, that's right - it was so hard to understand this behavior and test it until we found users with large enough data to actually run into the issue. In retrospect, I think assuming that it will always have a length of 1 was erroneous, but it wasn't easy to conceive that a RecordBatch would yield more RecordBatches in a zero-copy round-trip conversion to an Arrow Table and back.
```python
if positional_deletes:
    # Create the mask of indices that we're interested in
    indices = _combine_positional_deletes(positional_deletes, current_index, current_index + len(batch))
    batch = batch.take(indices)
    output_batches = iter([batch])
```
@kevinjqliu moved this assignment here for readability
makes sense, ty!
…File is large (apache#1141)
* add more tests for position deletes
* multiple append and deletes
* test
* fix
* adopt nit
Fixes: #1132
The culprit was the awkward zero-copy pass back and forth between RecordBatch and Table to filter by `pyarrow_filter`.
iceberg-python/pyiceberg/io/pyarrow.py
Lines 1246 to 1254 in 0dc5408
As noted, this is necessary because passing an expression filter to a RecordBatch hasn't been exposed yet (it is in 17.0.0, but requiring that would raise the lower bound of the PyArrow dependency that much higher).
In cases where a single RecordBatch becomes a Table and then multiple RecordBatches (because Arrow automatically chunks a Table into multiple RecordBatches), we would lose the remaining RecordBatches in the returned output. Essentially, there's no guarantee that a single RecordBatch will remain a single one in a round-trip conversion to an Arrow Table and back.
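To illustrate (a sketch, not the PR's exact code): the filtered Table's `to_batches()` has to be treated as potentially returning several RecordBatches, so the consumer must iterate over all of them rather than unwrapping a single one.

```python
import pyarrow as pa
import pyarrow.compute as pc

def filter_record_batch(batch: pa.RecordBatch, predicate: pc.Expression):
    """Apply an Expression filter to a RecordBatch via a Table round trip."""
    table = pa.Table.from_batches([batch])  # zero-copy wrap
    table = table.filter(predicate)         # Expression filters only work on Tables before 17.0.0
    # Buggy pattern: `return table.to_batches()[0]` silently drops any extra batches.
    # Fixed pattern: hand every resulting batch back to the caller.
    yield from table.to_batches()

batch = pa.RecordBatch.from_pydict({"id": pa.array(range(1_000_000))})
total = sum(b.num_rows for b in filter_record_batch(batch, pc.field("id") >= 10))
assert total == 1_000_000 - 10
```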
The new test covers a case where the amount of data within a DataFile is large enough that the Arrow Table yields multiple RecordBatches.