diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py
index 9ff4db2822b..0dec8e1c67f 100644
--- a/python/cudf/cudf/io/parquet.py
+++ b/python/cudf/cudf/io/parquet.py
@@ -1,6 +1,7 @@
 # Copyright (c) 2019-2023, NVIDIA CORPORATION.
 from __future__ import annotations
 
+import itertools
 import math
 import operator
 import shutil
@@ -558,6 +559,18 @@ def read_parquet(
             "for full CPU-based filtering functionality."
         )
 
+    # Make sure we read in the columns needed for row-wise
+    # filtering after IO. This means that one or more columns
+    # will be dropped almost immediately after IO. However,
+    # we do NEED these columns for accurate filtering.
+    projected_columns = None
+    if columns and filters:
+        projected_columns = columns
+        columns = sorted(
+            set(v[0] for v in itertools.chain.from_iterable(filters))
+            | set(columns)
+        )
+
     # Convert parquet data to a cudf.DataFrame
     df = _parquet_to_frame(
         filepaths_or_buffers,
@@ -574,7 +587,15 @@
     )
 
     # Apply filters row-wise (if any are defined), and return
-    return _apply_post_filters(df, filters)
+    df = _apply_post_filters(df, filters)
+    if projected_columns:
+        # Elements of `projected_columns` may now be in the index.
+        # We must filter these names from our projection
+        projected_columns = [
+            col for col in projected_columns if col in df._column_names
+        ]
+        return df[projected_columns]
+    return df
 
 
 def _normalize_filters(filters: list | None) -> List[List[tuple]] | None:
diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py
index 3a35a0088ff..cdece1397c3 100644
--- a/python/cudf/cudf/tests/test_parquet.py
+++ b/python/cudf/cudf/tests/test_parquet.py
@@ -2793,3 +2793,30 @@ def test_parquet_writer_schema_nullability(data, force_nullable_schema):
     assert pa.parquet.read_schema(file_obj).field(0).nullable == (
         force_nullable_schema or df.isnull().any().any()
     )
+
+
+def test_parquet_read_filter_and_project():
+    # Filter on columns that are not included
+    # in the current column projection
+
+    with BytesIO() as buffer:
+        # Write parquet data
+        df = cudf.DataFrame(
+            {
+                "a": [1, 2, 3, 4, 5] * 10,
+                "b": [0, 1, 2, 3, 4] * 10,
+                "c": range(50),
+                "d": [6, 7] * 25,
+                "e": [8, 9] * 25,
+            }
+        )
+        df.to_parquet(buffer)
+
+        # Read back with filter and projection
+        columns = ["b"]
+        filters = [[("a", "==", 5), ("c", ">", 20)]]
+        got = cudf.read_parquet(buffer, columns=columns, filters=filters)
+
+        # Check result
+        expected = df[(df.a == 5) & (df.c > 20)][columns].reset_index(drop=True)
+        assert_eq(got, expected)