
Add row-wise filtering step to read_parquet #13334

Merged (21 commits) May 17, 2023

Conversation

@rjzamora (Member) commented May 10, 2023

Closes #13324

Description

This PR adds a post_filters argument to cudf.read_parquet, which is set equal to the filters argument by default. When this argument is set, the specified DNF (disjunctive normal form) filter expression will be applied to the in-memory cudf.DataFrame object after IO is performed.

The overall result of this PR is that the behavior of cudf.read_parquet becomes more consistent with that of pd.read_parquet, in the sense that the default result will now enforce filters at row-wise granularity in both libraries.
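
As a quick illustration of the intended semantics (a minimal sketch; the file path, column names, and data here are made up, not taken from the PR):

import cudf
import pandas as pd

# DNF filters: the outer list ORs its elements together, and each inner
# list ANDs its predicates. This keeps rows where (x > 2 AND y == "a").
pd.DataFrame({"x": [1, 3, 5], "y": ["a", "a", "b"]}).to_parquet("data.parquet")
df = cudf.read_parquet(
    "data.parquet",
    filters=[[("x", ">", 2), ("y", "==", "a")]],
)
# With row-wise filtering, only the row (x=3, y="a") survives.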

Note on the "need" for distinct filters and post_filters arguments

My hope is that post_filters will eventually go away. However, I added a distinct argument for two general reasons:

  1. PyArrow does not yet support "is" and "is not" operands in filters. Therefore, we cannot pass all filters from dask/dask_cudf down to cudf.read_parquet through the existing filters argument, because we rely on pyarrow to filter out row-groups (note that dask implements its own filter-conversion utility to avoid this problem). I'm hoping pyarrow will eventually adopt these comparison types (xref: [Python] Add support for "is" and "is not" to pyarrow.parquet.filters_to_expression apache/arrow#34504)
  2. When cudf.read_parquet is called from dask_cudf.read_parquet, row-group filtering will have already been applied. Therefore, it is convenient to specify that you only need cudf to provide the post-IO row-wise filtering step. Otherwise, we are effectively duplicating some metadata processing.

My primary concern with adding post_filters is the idea that row-wise filtering could be added at the cuio/libcudf level in the future. In that (hypothetical) case, post_filters wouldn't really be providing any value, but we would probably be able to deprecate it without much pain (if any).

Additional Context

This feature is ultimately needed to support general predicate-pushdown optimizations in Dask Expressions (dask-expr). This is because the high-level optimization logic becomes much simpler when a filter operation on a ReadParquet expression can be iteratively "absorbed" into the root ReadParquet expression.

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.

@rjzamora added labels on May 10, 2023: 2 - In Progress (Currently a work in progress), Python (Affects Python cuDF API), dask (Dask issue), improvement (Improvement / enhancement to an existing function), non-breaking (Non-breaking change)
@rjzamora self-assigned this on May 10, 2023
@rjzamora (Member Author):

cc @charlesbluca (for both dask-expr and dask-sql visibility)

@rjzamora marked this pull request as ready for review May 11, 2023 13:25
@rjzamora requested review from a team as code owners May 11, 2023 13:25
@rjzamora added the 3 - Ready for Review label and removed the 2 - In Progress label on May 11, 2023
@wence- (Contributor) left a comment:

I think some of the core logic can be tidied up a bit, and am not sure this works correctly for more than two ANDed or ORed filters.

Comment on lines 574 to 576
and df.index.name is None
and df.index.start == 0
and df.index.step == 1
Contributor:

Is the thought that if the index is not a default rangeindex then it must be intentional and so we must not reset it? How are we to know that the "default" rangeindex is not intentional?

Member Author:

I don't have strong feelings about how we handle the default index. I just know that pandas/pyarrow avoids carrying around a filtered index if/when the original DataFrame would have had a default index anyway.
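
For reference, a small pandas sketch of the behavior being mirrored (made-up data):

import pandas as pd

df = pd.DataFrame({"x": [1, 2, 3, 4]})   # default RangeIndex: 0..3
filtered = df[df["x"] > 2]               # keeps the original labels: [2, 3]
# Since the unfiltered frame had a default RangeIndex anyway, we can drop
# the gappy labels rather than carry them around:
filtered = filtered.reset_index(drop=True)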

conjunctions.append(_comparisons[op](df[column], value))

disjunctions.append(
    operator.and_(*conjunctions)
Contributor:

and_ has type bool -> bool -> bool so this only works if you have exactly two conjunctions.

You want (handles length-1 conjunctions as well):

disjunctions.append(functools.reduce(operator.and_, conjunctions))
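
For example (a standalone sketch with made-up boolean masks):

import functools
import operator

import pandas as pd

m1 = pd.Series([True, True, False])
m2 = pd.Series([True, False, True])
m3 = pd.Series([True, True, True])

# reduce folds the binary operator.and_ across any number of masks;
# a length-1 sequence is simply returned unchanged.
combined = functools.reduce(operator.and_, [m1, m2, m3])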

Member Author:

Ah right - I did completely forget that operator.and_/or_ are binops

Comment on lines 667 to 669
operator.or_(*disjunctions)
if len(disjunctions) > 1
else disjunctions[0]
Contributor:

Again here, wants to be functools.reduce(operator.or_, disjunctions)

return df.reset_index(drop=True) if reset_index else df


def _apply_post_filters(df, filters):
Contributor:

Can I propose a rewrite like this, which I think separates the handler logic from the conjunction/disjunction a little more clearly? WDYT?

(Probably needs some minor modifications for py 3.9 compat (with the type-annotations)).

import operator
import warnings
from functools import partial, reduce

import numpy as np


def handle_in(column, value):
    if not isinstance(value, (list, set, tuple)):
        raise TypeError("Value of 'in' filter must be a " "list, set, or tuple.")
    return reduce(operator.or_, (operator.eq(column, value) for v in value))


def handle_is(column, value, *, negate):
    if value not in {np.nan, None}:
        raise TypeError("Value of 'is' or 'is not' filter " "must be np.nan or None.")
    return ~column.isna() if negate else column.isna()


def _apply_post_filters(df, filters: list[tuple | list[tuple]]):
    # Apply DNF filters to an in-memory DataFrame
    #
    # Disjunctive normal form (DNF) means that the inner-most
    # tuple describes a single column predicate. These inner
    # predicates are combined with an AND conjunction into a
    # larger predicate. The outer-most list then combines all
    # of the combined filters with an OR disjunction.

    if not filters:
        # No filters to apply
        return df

    handlers = {
        "==": operator.eq,
        "!=": operator.ne,
        "<": operator.lt,
        "<=": operator.le,
        ">": operator.gt,
        ">=": operator.ge,
        "in": handle_in,
        "is": partial(handle_is, negate=False),
        "is not": partial(handle_is, negate=True),
    }

    try:
        # Disjunction loop
        #
        # All elements of `disjunctions` shall be combined with
        # an `OR` disjunction (operator.or_)
        disjunctions = []
        expressions = [f if isinstance(f, list) else [f] for f in filters]
        for expr in expressions:
            conjunction = reduce(
                operator.and_,
                (handlers[op](df[column], value) for (column, op, value) in expr),
            )
            disjunctions.append(conjunction)

        return reduce(operator.or_, disjunctions)
    except (KeyError, TypeError):
        warnings.warn(...)
        return df

Contributor:

The reduction over the disjunctions could be merged in (so no looping) but I think it's a little less readable (would be something like):

        return reduce(
            operator.or_,
            (
                reduce(
                    operator.and_,
                    (handlers[op](df[column], value) for (column, op, value) in expr),
                )
                for expr in expressions
            ),
        )

Member Author:

Thanks for the suggestion. I revised the code to look more like your example (there were some minor bugs, and I couldn't quite get the type annotations right yet - so left them out). Wouldn't mind going with the loop-free code, but didn't get a chance to try it yet.

([[("y", "==", "c")], [("x", "<", 3)]], 6),
([("y", "==", "c"), ("x", ">=", 5)], 1),
([[("y", "==", "c")], [("x", "<", 3)]], 5),
([[("y", "==", "c")], [("x", "in", (0, 9))]], 4),
Contributor:

Can you add some tests with more than two predicates being anded together (and being ored together)?
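
For example, a self-contained sketch of what such cases could verify (the data and expected counts here are made up, not taken from the actual test suite):

import pandas as pd

df = pd.DataFrame({"x": range(10), "y": list("aabbccddee")})

# Three predicates ANDed: x > 1 AND x < 8 AND y != "c"
assert len(df[(df.x > 1) & (df.x < 8) & (df.y != "c")]) == 4
# Three predicates ORed: x < 1 OR x > 8 OR y == "c"
assert len(df[(df.x < 1) | (df.x > 8) | (df.y == "c")]) == 4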

@GregoryKimball (Contributor):

(Linking the discussion in #12512)

@rjzamora changed the title from "Add row-wise post_filters step to read_parquet" to "Add row-wise filtering step to read_parquet" on May 12, 2023
@rjzamora (Member Author):

Small update: Removed the post_filters argument in favor of calling _apply_post_filters manually in dask_cudf.

# All elements of `disjunctions` shall be combined with
# an `OR` disjunction (operator.or_)
disjunctions = []
for expr in filters if isinstance(filters[0], list) else [filters]:
Contributor:

I am not fully sure this is correct handling (my suggestion might also have been wrong).

AIUI, these are possible inputs as filters:

  • (A, op, B) => (A op B)
  • [(A, op, B)] => (A op B)
  • [(A, op, B), (C, op, D)] => (A op B) v (C op D)
  • [[(A, op, B), (C, op, D)], (E, op, F)] => ((A op B) ^ (C op D)) v (E op F)
  • [[(A, op, B), (C, op, D)], [(E, op, F), (G, op, H)]] => ((A op B) ^ (C op D)) v ((E op F) ^ (G op H))

So the input type is tuple | list[tuple | list[tuple]]

But this code only handles tuple | list[list[tuple]].

TBF, my code only handled list[tuple | list[tuple]].

To rephrase, who should do the sanitisation of the filters argument to this function? It would be much easier if, by the time we got here, we always just had list[list[tuple]]. That sanitisation could either be pushed up to read_parquet or else here but a little bit earlier, so we would say something like:

def _apply_filters(df, filters: tuple | list[tuple | list[tuple]]):
    if isinstance(filters, tuple): # singleton (A op B)
        filters = [filters]
    filters = [f if isinstance(f, list) else [f] for f in filters]
    ...
    for expr in filters: # Now everything is pre-processed correctly.

@wence- (Contributor) left a comment:

Mostly commentary, but a few minor cleanups.

)
return reduce(operator.or_, (operator.eq(column, v) for v in value))
Contributor:

Oops, sorry!

f"filters must be None, or non-empty List[Tuple] "
f"or List[List[Tuple]]. Got {filters}"
)
if not filters or not isinstance(filters, list):
Contributor:

Is it easier to accept empty containers as "no filters" and normalise them to None (rather than requiring specifically None for empty filters)?

So:

if filters:
   ... validate
else:
   return None

f"got {predicate}."
)

filters = filters if isinstance(filters[0], list) else [filters]
Contributor:

OK, so now we definitively have a list-of-lists.


filters = filters if isinstance(filters[0], list) else [filters]
for conjunction in filters:
if not conjunction or not isinstance(conjunction, list):
Contributor:

OK, so each entry must be a non-empty list.

if not conjunction or not isinstance(conjunction, list):
raise TypeError(msg)
for predicate in conjunction:
_validate_predicate(predicate)
Contributor:

And each entry in that non-empty list must be a 3-tuple of appropriate type.

Comment on lines 596 to 597
for predicate in conjunction:
_validate_predicate(predicate)
Contributor:

tl;dr: nothing to do here.

Could write:

if not all(isinstance(item, tuple) and len(item) == 3 for item in conjunction):
   raise TypeError("Every predicate must be ...")

But I guess then it's hard to point at the bad one, unless one abuses the walrus operator like so:

if not all(isinstance(item := pred, tuple) and len(pred) == 3 for pred in conjunctions):
   raise TypeError(... item)

Which is kind of ugly.

Member Author:

Right - I don't expect the number of predicates to get very large here. It seems reasonable to just call _validate_predicate in a loop.
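
A minimal sketch of that shape (the function names mirror the PR, but the bodies here are illustrative):

def _validate_predicate(predicate):
    if not isinstance(predicate, tuple) or len(predicate) != 3:
        raise TypeError(f"Expected a (column, op, value) 3-tuple, got {predicate}.")


def _validate_filters(filters):
    # Each outer element is a conjunction (a list of predicates);
    # each predicate must be a 3-tuple.
    for conjunction in filters:
        for predicate in conjunction:
            _validate_predicate(predicate)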

@rjzamora (Member Author):

Thanks for the reviews @wence- @bdice! This should be good for a final look when you have time.

@bdice (Contributor) left a comment:

Thanks @rjzamora. I learned a thing or two from this review! Appreciate your effort here.

@wence- (Contributor) left a comment:

Thanks @rjzamora !

@rjzamora added the 5 - Ready to Merge label and removed the 3 - Ready for Review label on May 17, 2023
@rjzamora (Member Author):

/merge

@shwina (Contributor) commented May 18, 2023:

@rjzamora do you expect the work done here to be roughly the same for orc?

@rjzamora (Member Author):

do you expect the work done here to be roughly the same for orc?

Yes, the _apply_post_filters code added here should be independent of the IO mechanism, since it is a post-IO filtering operation. We could easily move it out of the parquet.py file and use it for ORC (and even CSV).
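
As a rough sketch of that reuse (read_orc_filtered is a hypothetical wrapper, and _apply_post_filters is assumed to be importable from wherever the shared utility ends up):

import cudf

def read_orc_filtered(path, filters=None, **kwargs):
    # Perform the IO as usual, then apply the same row-wise DNF
    # filtering step that read_parquet now performs.
    df = cudf.read_orc(path, **kwargs)
    return _apply_post_filters(df, filters)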

@shwina (Contributor) commented May 19, 2023:

Thanks, I'll go ahead and open a PR to do the same for ORC then!

rapids-bot bot pushed a commit that referenced this pull request Jul 11, 2023
Follow-up to #13334 for the special case that the `filters` argument includes column names that are **not** included in the current column projection (i.e. the `columns` argument). Although this pattern is not a common case at the moment, it is perfectly valid, and will become more common when cudf is used as a dask-expr backend (since the predicate-pushdown optimizations in dask-expr are significantly more advanced than those in dask-dataframe).

**Note**:
Prior to #13334, the special case targeted by this PR would not have produced any run-time errors, but it also wouldn't have produced proper filtering in many cases. Now that `cudf.read_parquet` **does** produce proper row-wise filtering, it turns out that we now need to sacrifice a bit of IO in cases like this. Although this is unfortunate, I personally feel that it is still worthwhile to enforce row-wise filtering.
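
A rough sketch of the pattern described above (the helper name is illustrative, not the actual implementation; `filters` is assumed to be already normalized to a list of conjunctions, and _apply_post_filters is the helper added in #13334):

import cudf

def _read_with_projection_and_filters(paths, columns, filters):
    # Read the union of the projected columns and the columns referenced
    # by the filters (this is the extra IO mentioned above) ...
    filter_columns = {col for conjunction in filters for (col, _, _) in conjunction}
    read_columns = sorted(set(columns) | filter_columns)
    df = cudf.read_parquet(paths, columns=read_columns)
    df = _apply_post_filters(df, filters)
    # ... then drop the filter-only columns to honor the projection.
    return df[columns]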

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #13666
Labels
5 - Ready to Merge (Testing and reviews complete, ready to merge), dask (Dask issue), improvement (Improvement / enhancement to an existing function), non-breaking (Non-breaking change), Python (Affects Python cuDF API)
Development

Successfully merging this pull request may close these issues.

[FEA] Apply row-wise filtering for filters in cudf/dask_cudf.read_parquet