Fix combined filtering and column projection in read_parquet (#13666)
Follow-up to #13334 for the special case in which the `filters` argument references column names that are **not** part of the current column projection (i.e. the `columns` argument). Although this pattern is not common at the moment, it is perfectly valid, and it will become more common once cudf is used as a dask-expr backend (the predicate-pushdown optimizations in dask-expr are significantly more advanced than those in dask-dataframe).
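
For reference, the pattern in question looks like this (a minimal sketch; the file name is illustrative, and the column names match the new test below):

```python
import cudf

# Project only "b", but filter on "a" and "c", which are *not* part
# of the projection. This is the case targeted by this PR.
df = cudf.read_parquet(
    "data.parquet",
    columns=["b"],
    filters=[[("a", "==", 5), ("c", ">", 20)]],
)
```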

**Note**:
Prior to #13334, the special case targeted by this PR would not have produced a run-time error, but in many cases it also wouldn't have applied the filters correctly. Now that `cudf.read_parquet` **does** apply row-wise filtering properly, we need to sacrifice a bit of IO in cases like this: the columns referenced by `filters` must be read from disk even when they are not part of the projection, and they are dropped again immediately after filtering. Although this is unfortunate, I personally feel it is still worthwhile to enforce correct row-wise filtering.

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #13666
rjzamora authored Jul 11, 2023
1 parent 3c51c9e commit 091de4d
Showing 2 changed files with 49 additions and 1 deletion.
23 changes: 22 additions & 1 deletion python/cudf/cudf/io/parquet.py
@@ -1,6 +1,7 @@
 # Copyright (c) 2019-2023, NVIDIA CORPORATION.
 from __future__ import annotations
 
+import itertools
 import math
 import operator
 import shutil
@@ -558,6 +559,18 @@ def read_parquet(
             "for full CPU-based filtering functionality."
         )
 
+    # Make sure we read in the columns needed for row-wise
+    # filtering after IO. This means that one or more columns
+    # will be dropped almost immediately after IO. However,
+    # we do NEED these columns for accurate filtering.
+    projected_columns = None
+    if columns and filters:
+        projected_columns = columns
+        columns = sorted(
+            set(v[0] for v in itertools.chain.from_iterable(filters))
+            | set(columns)
+        )
+
     # Convert parquet data to a cudf.DataFrame
     df = _parquet_to_frame(
         filepaths_or_buffers,
@@ -574,7 +587,15 @@
         )
 
     # Apply filters row-wise (if any are defined), and return
-    return _apply_post_filters(df, filters)
+    df = _apply_post_filters(df, filters)
+    if projected_columns:
+        # Elements of `projected_columns` may now be in the index.
+        # We must filter these names from our projection
+        projected_columns = [
+            col for col in projected_columns if col in df._column_names
+        ]
+        return df[projected_columns]
+    return df
 
 
 def _normalize_filters(filters: list | None) -> List[List[tuple]] | None:
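
To see what the projection logic above does, here is the column-union computation in isolation (a standalone sketch with illustrative inputs matching the new test below):

```python
import itertools

columns = ["b"]
filters = [[("a", "==", 5), ("c", ">", 20)]]

# Union of the filter columns and the requested projection; this is
# what read_parquet will now read from disk before filtering.
read_columns = sorted(
    set(v[0] for v in itertools.chain.from_iterable(filters)) | set(columns)
)
print(read_columns)  # ['a', 'b', 'c']
```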
27 changes: 27 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
@@ -2793,3 +2793,30 @@ def test_parquet_writer_schema_nullability(data, force_nullable_schema):
     assert pa.parquet.read_schema(file_obj).field(0).nullable == (
         force_nullable_schema or df.isnull().any().any()
     )
+
+
+def test_parquet_read_filter_and_project():
+    # Filter on columns that are not included
+    # in the current column projection
+
+    with BytesIO() as buffer:
+        # Write parquet data
+        df = cudf.DataFrame(
+            {
+                "a": [1, 2, 3, 4, 5] * 10,
+                "b": [0, 1, 2, 3, 4] * 10,
+                "c": range(50),
+                "d": [6, 7] * 25,
+                "e": [8, 9] * 25,
+            }
+        )
+        df.to_parquet(buffer)
+
+        # Read back with filter and projection
+        columns = ["b"]
+        filters = [[("a", "==", 5), ("c", ">", 20)]]
+        got = cudf.read_parquet(buffer, columns=columns, filters=filters)
+
+        # Check result
+        expected = df[(df.a == 5) & (df.c > 20)][columns].reset_index(drop=True)
+        assert_eq(got, expected)
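
For the data above, the predicate `(a == 5) & (c > 20)` selects rows 24, 29, 34, 39, 44, and 49, so `got` should contain six rows of column `b`, each equal to 4.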
