Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add row-wise filtering step to read_parquet #13334

Merged
merged 21 commits into branch-23.06 from row-wise-filter-parquet
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
6488c5e
support row-wise filtering - with test coverage
rjzamora May 10, 2023
6932869
add post_filters
rjzamora May 10, 2023
4b6c52c
Merge branch 'branch-23.06' into row-wise-filter-parquet
rjzamora May 10, 2023
dcddab0
revise for code review
rjzamora May 12, 2023
a569982
Merge branch 'branch-23.06' into row-wise-filter-parquet
rjzamora May 12, 2023
3e8bb8f
Merge remote-tracking branch 'upstream/branch-23.06' into row-wise-filter-parquet
rjzamora May 12, 2023
f833445
add not in support
rjzamora May 12, 2023
b76bc93
basic type hints
rjzamora May 12, 2023
81fc31f
better type hints
rjzamora May 12, 2023
5c33b86
move filter normalization
rjzamora May 12, 2023
2f9ea8f
drop operator
rjzamora May 15, 2023
e4ff979
Merge branch 'branch-23.06' into row-wise-filter-parquet
rjzamora May 15, 2023
d8f0d15
Merge branch 'branch-23.06' into row-wise-filter-parquet
rjzamora May 16, 2023
adc4358
add operator back
rjzamora May 16, 2023
14af140
Merge branch 'row-wise-filter-parquet' of https://github.com/rjzamora…
rjzamora May 16, 2023
9d747c7
Merge branch 'branch-23.06' into row-wise-filter-parquet
rjzamora May 16, 2023
2193bac
move early return
rjzamora May 16, 2023
fc8b5dd
Merge branch 'row-wise-filter-parquet' of https://github.com/rjzamora…
rjzamora May 16, 2023
fef329e
Merge remote-tracking branch 'upstream/branch-23.06' into row-wise-filter-parquet
rjzamora May 16, 2023
1e8e811
Merge branch 'branch-23.06' into row-wise-filter-parquet
rjzamora May 16, 2023
ac21f1b
Merge branch 'branch-23.06' into row-wise-filter-parquet
rjzamora May 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions python/cudf/cudf/io/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,18 +569,21 @@ def read_parquet(
return _apply_post_filters(df, filters)


def _handle_in(column, value):
def _handle_in(column, value, *, negate):
if not isinstance(value, (list, set, tuple)):
bdice marked this conversation as resolved.
Show resolved Hide resolved
raise TypeError(
"Value of 'in' filter must be a " "list, set, or tuple."
"Value of 'in' or 'not in' filter must be a list, set, or tuple."
)
return reduce(operator.or_, (operator.eq(column, v) for v in value))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, sorry!

if negate:
return reduce(operator.and_, (operator.ne(column, v) for v in value))
else:
return reduce(operator.or_, (operator.eq(column, v) for v in value))
rjzamora marked this conversation as resolved.
Show resolved Hide resolved


def _handle_is(column, value, *, negate):
if value not in {np.nan, None}:
raise TypeError(
"Value of 'is' or 'is not' filter " "must be np.nan or None."
"Value of 'is' or 'is not' filter must be np.nan or None."
)
return ~column.isna() if negate else column.isna()

Expand All @@ -605,7 +608,8 @@ def _apply_post_filters(df, filters):
"<=": operator.le,
">": operator.gt,
">=": operator.ge,
"in": _handle_in,
"in": partial(_handle_in, negate=False),
"not in": partial(_handle_in, negate=True),
"is": partial(_handle_is, negate=False),
"is not": partial(_handle_is, negate=True),
}
Expand Down
1 change: 1 addition & 0 deletions python/cudf/cudf/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@ def test_parquet_read_filtered_multiple_files(tmpdir):
([("y", "==", "c"), ("x", ">", 8)], 0),
([("y", "==", "c"), ("x", ">=", 5)], 1),
([[("y", "==", "c")], [("x", "<", 3)]], 5),
([[("x", "not in", (0, 9)), ("z", "not in", (4, 5))]], 6),
([[("y", "==", "c")], [("x", "in", (0, 9)), ("z", "in", (0, 9))]], 4),
([[("x", "==", 0)], [("x", "==", 1)], [("x", "==", 2)]], 3),
([[("x", "==", 0), ("z", "==", 9), ("y", "==", "a")]], 1),
Expand Down
35 changes: 35 additions & 0 deletions python/dask_cudf/dask_cudf/io/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,41 @@ def test_filters(tmpdir):
assert not len(c)


@pytest.mark.parametrize("numeric", [True, False])
@pytest.mark.parametrize("null", [np.nan, None])
def test_isna_filters(tmpdir, null, numeric):
    """Check that ``is`` / ``is not`` filters keep exactly the expected rows.

    A ten-row frame is written to parquet in five partitions. Rows 4 and 5
    are the only ones holding a null, both in the numeric column ``"i"``
    and in the string column ``"j"``. The dataset is then read back with
    each operator, and the surviving ``x`` values are compared against the
    expected row labels.
    """
    tmp_path = str(tmpdir)

    frame = pd.DataFrame(
        {
            "x": range(10),
            "y": list("aabbccddee"),
            "i": [0] * 4 + [np.nan] * 2 + [0] * 4,
            "j": [""] * 4 + [None] * 2 + [""] * 4,
        }
    )
    ddf = dd.from_pandas(frame, npartitions=5)
    assert ddf.npartitions == 5
    ddf.to_parquet(tmp_path, engine="pyarrow")

    # Pick the numeric or the string column depending on the parametrization.
    col = "i" if numeric else "j"

    # Operator -> the ``x`` values expected to survive the filter.
    expected_by_op = {
        "is": [4, 5],
        "is not": [0, 1, 2, 3, 6, 7, 8, 9],
    }
    for op, expected_x in expected_by_op.items():
        out = dask_cudf.read_parquet(
            tmp_path, filters=[(col, op, null)], split_row_groups=True
        )
        assert len(out) == len(expected_x)
        assert list(out.x.compute().values) == expected_x


def test_filters_at_row_group_level(tmpdir):

tmp_path = str(tmpdir)
Expand Down