Add row-wise filtering step to read_parquet #13334

Merged
merged 21 commits into branch-23.06 on May 17, 2023
Changes from 3 commits
Commits (21)
6488c5e
support row-wise filtering - with test coverage
rjzamora May 10, 2023
6932869
add post_filters
rjzamora May 10, 2023
4b6c52c
Merge branch 'branch-23.06' into row-wise-filter-parquet
rjzamora May 10, 2023
dcddab0
revise for code review
rjzamora May 12, 2023
a569982
Merge branch 'branch-23.06' into row-wise-filter-parquet
rjzamora May 12, 2023
3e8bb8f
Merge remote-tracking branch 'upstream/branch-23.06' into row-wise-fi…
rjzamora May 12, 2023
f833445
add not in support
rjzamora May 12, 2023
b76bc93
basic type hints
rjzamora May 12, 2023
81fc31f
better type hints
rjzamora May 12, 2023
5c33b86
move filter normalization
rjzamora May 12, 2023
2f9ea8f
drop operator
rjzamora May 15, 2023
e4ff979
Merge branch 'branch-23.06' into row-wise-filter-parquet
rjzamora May 15, 2023
d8f0d15
Merge branch 'branch-23.06' into row-wise-filter-parquet
rjzamora May 16, 2023
adc4358
add operator back
rjzamora May 16, 2023
14af140
Merge branch 'row-wise-filter-parquet' of https://github.com/rjzamora…
rjzamora May 16, 2023
9d747c7
Merge branch 'branch-23.06' into row-wise-filter-parquet
rjzamora May 16, 2023
2193bac
move early return
rjzamora May 16, 2023
fc8b5dd
Merge branch 'row-wise-filter-parquet' of https://github.com/rjzamora…
rjzamora May 16, 2023
fef329e
Merge remote-tracking branch 'upstream/branch-23.06' into row-wise-fi…
rjzamora May 16, 2023
1e8e811
Merge branch 'branch-23.06' into row-wise-filter-parquet
rjzamora May 16, 2023
ac21f1b
Merge branch 'branch-23.06' into row-wise-filter-parquet
rjzamora May 17, 2023
118 changes: 117 additions & 1 deletion python/cudf/cudf/io/parquet.py
@@ -1,6 +1,7 @@
# Copyright (c) 2019-2023, NVIDIA CORPORATION.

import math
import operator
import shutil
import tempfile
import warnings
@@ -9,6 +10,7 @@
from typing import Dict, List, Optional, Tuple
from uuid import uuid4

import numpy as np
import pandas as pd
from pyarrow import dataset as ds, parquet as pq

@@ -439,6 +441,7 @@ def read_parquet(
    open_file_options=None,
    bytes_per_thread=None,
    dataset_kwargs=None,
    post_filters=None,
    *args,
    **kwargs,
):
@@ -547,7 +550,8 @@
"for full CPU-based filtering functionality."
)

    return _parquet_to_frame(
    # Convert parquet data to a cudf.DataFrame
    df = _parquet_to_frame(
        filepaths_or_buffers,
        engine,
        *args,
@@ -561,6 +565,118 @@
        **kwargs,
    )

    # Can re-set the index before returning if we filter
    # out rows from a DataFrame with a default RangeIndex
    # (to reduce memory usage)
    post_filters = post_filters or filters
wence- marked this conversation as resolved.
    reset_index = post_filters and (
        isinstance(df.index, cudf.RangeIndex)
        and df.index.name is None
        and df.index.start == 0
        and df.index.step == 1
Contributor:

Is the thought that if the index is not a default RangeIndex then it must be intentional, and so we must not reset it? How are we to know that the "default" RangeIndex is not intentional?

Member Author (rjzamora):

I don't have strong feelings about how we handle the default index. I just know that pandas/pyarrow avoids carrying around a filtered index if/when the original DataFrame would have had a default index anyway.
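For illustration (not from this PR), a minimal pandas sketch of the behavior being described - after a row-wise filter, the default RangeIndex becomes non-contiguous unless it is reset:

import pandas as pd

df = pd.DataFrame({"x": [1, 2, 3, 4]})        # default RangeIndex 0..3
filtered = df[df["x"] > 2]                     # index labels become [2, 3]
print(filtered.reset_index(drop=True).index)   # RangeIndex(start=0, stop=2, step=1)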

    )

    # Apply post_filters (if any are defined)
    df = _apply_post_filters(df, post_filters)

    # Return final cudf.DataFrame
    return df.reset_index(drop=True) if reset_index else df


def _apply_post_filters(df, filters):
Contributor:

Can I propose a rewrite like this, which I think separates the handler logic from the conjunction/disjunction a little more clearly? WDYT?

(Probably needs some minor modifications for py 3.9 compat (with the type-annotations)).

from functools import reduce, partial
import operator
import warnings

import numpy as np


def handle_in(column, value):
    if not isinstance(value, (list, set, tuple)):
        raise TypeError("Value of 'in' filter must be a " "list, set, or tuple.")
    return reduce(operator.or_, (operator.eq(column, v) for v in value))


def handle_is(column, value, *, negate):
    if value not in {np.nan, None}:
        raise TypeError("Value of 'is' or 'is not' filter " "must be np.nan or None.")
    return ~column.isna() if negate else column.isna()


def _apply_post_filters(df, filters: list[tuple | list[tuple]]):
    # Apply DNF filters to an in-memory DataFrame
    #
    # Disjunctive normal form (DNF) means that the inner-most
    # tuple describes a single column predicate. These inner
    # predicates are combined with an AND conjunction into a
    # larger predicate. The outer-most list then combines all
    # of the combined filters with an OR disjunction.

    if not filters:
        # No filters to apply
        return df

    handlers = {
        "==": operator.eq,
        "!=": operator.ne,
        "<": operator.lt,
        "<=": operator.le,
        ">": operator.gt,
        ">=": operator.ge,
        "in": handle_in,
        "is": partial(handle_is, negate=False),
        "is not": partial(handle_is, negate=True),
    }

    try:
        # Disjunction loop
        #
        # All elements of `disjunctions` shall be combined with
        # an `OR` disjunction (operator.or_)
        disjunctions = []
        expressions = [f if isinstance(f, list) else [f] for f in filters]
        for expr in expressions:
            conjunction = reduce(
                operator.and_,
                (handlers[op](df[column], value) for (column, op, value) in expr),
            )
            disjunctions.append(conjunction)

        return df[reduce(operator.or_, disjunctions)]
    except (KeyError, TypeError):
        warnings.warn(...)
        return df

Contributor:

The reduction over the disjunctions could be merged in (so no looping) but I think it's a little less readable (would be something like):

        return reduce(
            operator.or_,
            (
                reduce(
                    operator.and_,
                    (handlers[op](df[column], value) for (column, op, value) in expr),
                )
                for expr in expressions
            ),
        )
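For intuition, the same nested-reduce shape over plain Python booleans (illustrative stand-ins for the boolean Series produced by the handlers):

from functools import reduce
import operator

# DNF over plain booleans: (True AND False) OR (True AND True)
expressions = [[True, False], [True, True]]
result = reduce(
    operator.or_,
    (reduce(operator.and_, expr) for expr in expressions),
)
print(result)  # True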

Member Author (rjzamora):

Thanks for the suggestion. I revised the code to look more like your example (there were some minor bugs, and I couldn't quite get the type annotations right yet, so I left them out). Wouldn't mind going with the loop-free code, but didn't get a chance to try it yet.

    # Apply DNF filters to an in-memory DataFrame
    #
    # Disjunctive normal form (DNF) means that the inner-most
    # tuple describes a single column predicate. These inner
    # predicates are combined with an AND conjunction into a
    # larger predicate. The outer-most list then combines all
    # of the combined filters with an OR disjunction.
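    #
    # For example (illustration only):
    #   [[("x", ">", 2), ("y", "==", "a")], [("z", "in", (0, 1))]]
    # is read as (x > 2 AND y == "a") OR (z in (0, 1)).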
rjzamora marked this conversation as resolved.

    if not filters:
        # No filters to apply
        return df

    _comparisons = {
        "==": operator.eq,
        "!=": operator.ne,
        "<": operator.lt,
        "<=": operator.le,
        ">": operator.gt,
        ">=": operator.ge,
    }

    try:
        # Disjunction loop
        #
        # All elements of `disjunctions` shall be combined with
        # an `OR` disjunction (operator.or_)
        disjunctions = []
        other = filters.copy()
        while other:

            # Conjunction loop
            #
            # All elements of `conjunctions` shall be combined with
            # an `AND` conjunction (operator.and_)
            conjunctions = []
            comparisons, *other = (
                other if isinstance(other[0], list) else [other]
            )
            for (column, op, value) in comparisons:

                # Inner comparison loop
                #
                # `op` is expected to be the string representation
                # of a comparison operator (e.g. "==")
                if op == "in":
                    # Special case: "in"
                    if not isinstance(value, (list, set, tuple)):
                        raise TypeError(
                            "Value of 'in' filter must be a "
                            "list, set, or tuple."
                        )
                    if len(value) == 1:
                        conjunctions.append(operator.eq(df[column], value[0]))
                    else:
                        conjunctions.append(
                            operator.or_(
                                *[operator.eq(df[column], v) for v in value]
                            )
                        )
                elif op in ("is", "is not"):
                    # Special case: "is" or "is not"
                    if value not in (np.nan, None):
                        raise TypeError(
                            "Value of 'is' or 'is not' filter "
                            "must be np.nan or None."
                        )
                    conjunctions.append(
                        df[column].isna() if op == "is" else ~df[column].isna()
                    )
                else:
                    # Conventional comparison operator
                    conjunctions.append(_comparisons[op](df[column], value))

            disjunctions.append(
                operator.and_(*conjunctions)
Contributor:

and_ has type bool -> bool -> bool so this only works if you have exactly two conjunctions.

You want (handles length-1 conjunctions as well):

disjunctions.append(functools.reduce(operator.and_, conjunctions))
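A minimal illustration (plain booleans standing in for the column masks) of why the reduce form is needed:

import operator
from functools import reduce

masks = [True, False, True]            # stand-ins for boolean Series
# operator.and_(*masks)                # TypeError: and_ takes exactly two arguments
print(reduce(operator.and_, masks))    # False
print(reduce(operator.and_, [True]))   # a single mask passes through unchanged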

Member Author (rjzamora):

Ah right - I completely forgot that operator.and_/or_ are binops.

                if len(conjunctions) > 1
                else conjunctions[0]
            )

        return df[
            operator.or_(*disjunctions)
            if len(disjunctions) > 1
            else disjunctions[0]
Contributor:

Again here, wants to be functools.reduce(operator.or_, disjunctions)

        ]

    except (KeyError, TypeError):

        # Unsupported op or value
        warnings.warn(
            f"Row-wise filtering failed in read_parquet for {filters}"
        )
        return df


@_cudf_nvtx_annotate
def _parquet_to_frame(
68 changes: 43 additions & 25 deletions python/cudf/cudf/tests/test_parquet.py
@@ -528,9 +528,7 @@ def test_parquet_read_filtered_multiple_files(tmpdir):
)
assert_eq(
filtered_df,
cudf.DataFrame(
{"x": [2, 3, 2, 3], "y": list("bbcc")}, index=[2, 3, 2, 3]
),
cudf.DataFrame({"x": [2, 2], "y": list("bc")}, index=[2, 2]),
)


@@ -541,13 +539,13 @@ def test_parquet_read_filtered_multiple_files(tmpdir):
@pytest.mark.parametrize(
"predicate,expected_len",
[
([[("x", "==", 0)], [("z", "==", 0)]], 4),
([("x", "==", 0), ("z", "==", 0)], 0),
([("x", "==", 0), ("z", "!=", 0)], 2),
([[("x", "==", 0)], [("z", "==", 0)]], 2),
([("x", "==", 0), ("z", "==", 0)], 0),
([("x", "==", 0), ("z", "!=", 0)], 1),
([("y", "==", "c"), ("x", ">", 8)], 0),
([("y", "==", "c"), ("x", ">=", 5)], 2),
([[("y", "==", "c")], [("x", "<", 3)]], 6),
([("y", "==", "c"), ("x", ">=", 5)], 1),
([[("y", "==", "c")], [("x", "<", 3)]], 5),
([[("y", "==", "c")], [("x", "in", (0, 9))]], 4),
Contributor:

Can you add some tests with more than two predicates being anded together (and being ored together)?
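For example, something along these lines (the expected counts assume the DataFrame constructed in this test and are only a sketch):

([("x", ">", 1), ("x", "<", 8), ("z", "!=", 5)], 5),            # three predicates ANDed
([[("x", "==", 0)], [("x", "==", 9)], [("y", "==", "c")]], 4),  # three groups ORed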

],
)
def test_parquet_read_filtered_complex_predicate(
@@ -556,7 +554,13 @@ def test_parquet_read_filtered_complex_predicate(
# Generate data
fname = tmpdir.join("filtered_complex_predicate.parquet")
df = pd.DataFrame(
{"x": range(10), "y": list("aabbccddee"), "z": reversed(range(10))}
{
"x": range(10),
"y": list("aabbccddee"),
"z": reversed(range(10)),
"j": [0] * 4 + [np.nan] * 2 + [0] * 4,
"k": [""] * 4 + [None] * 2 + [""] * 4,
}
)
df.to_parquet(fname, row_group_size=2)

@@ -566,6 +570,30 @@ def test_parquet_read_filtered_complex_predicate(
assert_eq(len(df_filtered), expected_len)


@pytest.mark.parametrize(
"predicate,expected_len",
[
([[("j", "is not", np.nan)], [("i", "<", 3)]], 8),
([("k", "is", None)], 2),
],
)
def test_parquet_post_filters(tmpdir, predicate, expected_len):
# Check that "is" and "is not" are supported
# as `post_filters` (even though they are not
# supported by pyarrow)
fname = tmpdir.join("post_filters.parquet")
df = pd.DataFrame(
{
"i": range(10),
"j": [0] * 4 + [np.nan] * 2 + [0] * 4,
"k": [""] * 4 + [None] * 2 + [""] * 4,
}
)
df.to_parquet(fname, row_group_size=2)
df_filtered = cudf.read_parquet(fname, post_filters=predicate)
assert_eq(len(df_filtered), expected_len)


@pytest.mark.parametrize("row_group_size", [1, 5, 100])
def test_parquet_read_row_groups(tmpdir, pdf, row_group_size):
if len(pdf) > 100:
@@ -1954,26 +1982,16 @@ def test_read_parquet_partitioned_filtered(
assert got.dtypes["c"] == "int"
assert_eq(expect, got)

# Filter on non-partitioned column.
# Cannot compare to pandas, since the pyarrow
# backend will filter by row (and cudf can
# only filter by column, for now)
# Filter on non-partitioned column
filters = [("a", "==", 10)]
got = cudf.read_parquet(
read_path,
filters=filters,
row_groups=row_groups,
)
assert len(got) < len(df) and 10 in got["a"]
got = cudf.read_parquet(read_path, filters=filters)
expect = pd.read_parquet(read_path, filters=filters)

# Filter on both kinds of columns
filters = [[("a", "==", 10)], [("c", "==", 1)]]
got = cudf.read_parquet(
read_path,
filters=filters,
row_groups=row_groups,
)
assert len(got) < len(df) and (1 in got["c"] and 10 in got["a"])
got = cudf.read_parquet(read_path, filters=filters)
expect = pd.read_parquet(read_path, filters=filters)
assert_eq(expect, got)


def test_parquet_writer_chunked_metadata(tmpdir, simple_pdf, simple_gdf):
6 changes: 5 additions & 1 deletion python/cudf/cudf/utils/ioutils.py
@@ -147,7 +147,7 @@
For other URLs (e.g. starting with "s3://", and "gcs://") the key-value
pairs are forwarded to ``fsspec.open``. Please see ``fsspec`` and
``urllib`` for more details.
filters : list of tuple, list of lists of tuples default None
filters : list of tuple, list of lists of tuples, default None
If not None, specifies a filter predicate used to filter out row groups
using statistics stored for each row group as Parquet metadata. Row groups
that do not match the given filter predicate are not read. The
@@ -161,6 +161,10 @@
as a list of tuples. This form is interpreted as a single conjunction.
To express OR in predicates, one must use the (preferred) notation of
list of lists of tuples.
post_filters : list of tuple, list of lists of tuples, default None
Row-wise filters to be applied to the in-memory `DataFrame` after IO
is performed. If `None` (the default), `post_filters` will be set equal
to the value of `filters`.
row_groups : int, or list, or a list of lists default None
If not None, specifies, for each input file, which row groups to read.
If reading multiple inputs, a list of lists should be passed, one list
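Taken together, an illustrative usage sketch of the two parameters documented above (the file path and column names are hypothetical):

import numpy as np
import cudf

# Row-group-level filter; the same predicate is re-applied row-wise,
# because post_filters defaults to the value of filters.
df = cudf.read_parquet("data.parquet", filters=[("x", ">", 3)])

# "is" / "is not" predicates are only supported row-wise, so they are
# passed through post_filters rather than filters.
df2 = cudf.read_parquet("data.parquet", post_filters=[("j", "is not", np.nan)])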
6 changes: 6 additions & 0 deletions python/dask_cudf/dask_cudf/io/parquet.py
@@ -66,6 +66,7 @@ def _read_paths(
fs,
columns=None,
row_groups=None,
filters=None,
strings_to_categorical=None,
partitions=None,
partitioning=None,
@@ -102,6 +103,7 @@
engine="cudf",
columns=columns,
row_groups=row_groups if row_groups else None,
post_filters=filters,
strings_to_categorical=strings_to_categorical,
dataset_kwargs=dataset_kwargs,
categorical_partitions=False,
@@ -120,6 +122,7 @@
row_groups=row_groups[i]
if row_groups
else None,
post_filters=filters,
strings_to_categorical=strings_to_categorical,
dataset_kwargs=dataset_kwargs,
categorical_partitions=False,
Expand Down Expand Up @@ -180,6 +183,7 @@ def read_partition(
index,
categories=(),
partitions=(),
filters=None,
partitioning=None,
schema=None,
open_file_options=None,
@@ -252,6 +256,7 @@
fs,
columns=read_columns,
row_groups=rgs if rgs else None,
filters=filters,
strings_to_categorical=strings_to_cats,
partitions=partitions,
partitioning=partitioning,
@@ -278,6 +283,7 @@
fs,
columns=read_columns,
row_groups=rgs if rgs else None,
filters=filters,
strings_to_categorical=strings_to_cats,
partitions=partitions,
partitioning=partitioning,
2 changes: 1 addition & 1 deletion python/dask_cudf/dask_cudf/io/tests/test_parquet.py
@@ -267,7 +267,7 @@ def test_filters_at_row_group_level(tmpdir):
tmp_path, filters=[("x", "==", 1)], split_row_groups=True
)
assert a.npartitions == 1
assert (a.shape[0] == 2).compute()
assert (a.shape[0] == 1).compute()

ddf.to_parquet(tmp_path, engine="pyarrow", row_group_size=1)
