From 9d0c57a64d63d52182bd1c1e930180bf62404f1a Mon Sep 17 00:00:00 2001 From: Thomas Li <47963215+lithomas1@users.noreply.github.com> Date: Thu, 1 Aug 2024 10:59:27 -0700 Subject: [PATCH] Add skiprows and nrows to parquet reader (#16214) closes #15144 Authors: - Thomas Li (https://github.com/lithomas1) - Muhammad Haseeb (https://github.com/mhaseeb123) Approvers: - Muhammad Haseeb (https://github.com/mhaseeb123) - Lawrence Mitchell (https://github.com/wence-) URL: https://github.com/rapidsai/cudf/pull/16214 --- python/cudf/cudf/_lib/parquet.pyx | 35 ++++++++++++----- .../cudf/cudf/_lib/pylibcudf/io/parquet.pxd | 2 +- .../cudf/cudf/_lib/pylibcudf/io/parquet.pyx | 18 ++++----- python/cudf/cudf/io/parquet.py | 23 +++++++++++ .../cudf/pylibcudf_tests/io/test_parquet.py | 2 +- python/cudf/cudf/tests/test_parquet.py | 39 +++++++++++++++++++ python/cudf/cudf/utils/ioutils.py | 10 +++++ python/cudf_polars/cudf_polars/dsl/ir.py | 2 +- 8 files changed, 110 insertions(+), 21 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index a2eed94bb3c..4a4b13b0b31 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -22,7 +22,7 @@ from cudf._lib.utils cimport _data_from_columns, data_from_pylibcudf_io from cudf._lib.utils import _index_level_name, generate_pandas_metadata -from libc.stdint cimport uint8_t +from libc.stdint cimport int64_t, uint8_t from libcpp cimport bool from libcpp.map cimport map from libcpp.memory cimport make_unique, unique_ptr @@ -132,7 +132,10 @@ cdef object _process_metadata(object df, object filepaths_or_buffers, list pa_buffers, bool allow_range_index, - bool use_pandas_metadata): + bool use_pandas_metadata, + size_type nrows=-1, + int64_t skip_rows=0, + ): add_df_col_struct_names(df, child_names) index_col = None @@ -221,9 +224,13 @@ cdef object _process_metadata(object df, else: idx = cudf.Index(cudf.core.column.column_empty(0)) else: + start = range_index_meta["start"] + 
skip_rows + stop = range_index_meta["stop"] + if nrows != -1: + stop = start + nrows idx = cudf.RangeIndex( - start=range_index_meta['start'], - stop=range_index_meta['stop'], + start=start, + stop=stop, step=range_index_meta['step'], name=range_index_meta['name'] ) @@ -260,7 +267,9 @@ def read_parquet_chunked( row_groups=None, use_pandas_metadata=True, size_t chunk_read_limit=0, - size_t pass_read_limit=1024000000 + size_t pass_read_limit=1024000000, + size_type nrows=-1, + int64_t skip_rows=0 ): # Convert NativeFile buffers to NativeFileDatasource, # but save original buffers in case we need to use @@ -287,7 +296,9 @@ def read_parquet_chunked( row_groups, use_pandas_metadata, chunk_read_limit=chunk_read_limit, - pass_read_limit=pass_read_limit + pass_read_limit=pass_read_limit, + skip_rows=skip_rows, + nrows=nrows, ) tbl_w_meta = reader.read_chunk() @@ -320,13 +331,16 @@ def read_parquet_chunked( df = _process_metadata(df, column_names, child_names, per_file_user_data, row_groups, filepaths_or_buffers, pa_buffers, - allow_range_index, use_pandas_metadata) + allow_range_index, use_pandas_metadata, + nrows=nrows, skip_rows=skip_rows) return df cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, use_pandas_metadata=True, - Expression filters=None): + Expression filters=None, + size_type nrows=-1, + int64_t skip_rows=0): """ Cython function to call into libcudf API, see `read_parquet`. 
@@ -362,6 +376,8 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, filters, convert_strings_to_categories = False, use_pandas_metadata = use_pandas_metadata, + skip_rows = skip_rows, + nrows = nrows, ) df = cudf.DataFrame._from_data( @@ -371,7 +387,8 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, df = _process_metadata(df, tbl_w_meta.column_names(include_children=False), tbl_w_meta.child_names, tbl_w_meta.per_file_user_data, row_groups, filepaths_or_buffers, pa_buffers, - allow_range_index, use_pandas_metadata) + allow_range_index, use_pandas_metadata, + nrows=nrows, skip_rows=skip_rows) return df cpdef read_parquet_metadata(filepaths_or_buffers): diff --git a/python/cudf/cudf/_lib/pylibcudf/io/parquet.pxd b/python/cudf/cudf/_lib/pylibcudf/io/parquet.pxd index 027f215fb91..93ef849b813 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/parquet.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/io/parquet.pxd @@ -28,7 +28,7 @@ cpdef read_parquet( bool convert_strings_to_categories = *, bool use_pandas_metadata = *, int64_t skip_rows = *, - size_type num_rows = *, + size_type nrows = *, # disabled see comment in parquet.pyx for more # ReaderColumnSchema reader_column_schema = *, # DataType timestamp_type = * diff --git a/python/cudf/cudf/_lib/pylibcudf/io/parquet.pyx b/python/cudf/cudf/_lib/pylibcudf/io/parquet.pyx index 96119e1b714..84a79f9565f 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/parquet.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/io/parquet.pyx @@ -26,7 +26,7 @@ cdef parquet_reader_options _setup_parquet_reader_options( bool convert_strings_to_categories = False, bool use_pandas_metadata = True, int64_t skip_rows = 0, - size_type num_rows = -1, + size_type nrows = -1, # ReaderColumnSchema reader_column_schema = None, # DataType timestamp_type = DataType(type_id.EMPTY) ): @@ -40,8 +40,8 @@ cdef parquet_reader_options _setup_parquet_reader_options( ) if row_groups is not None: opts.set_row_groups(row_groups) - if 
num_rows != -1: - opts.set_num_rows(num_rows) + if nrows != -1: + opts.set_num_rows(nrows) if skip_rows != 0: opts.set_skip_rows(skip_rows) if columns is not None: @@ -73,7 +73,7 @@ cdef class ChunkedParquetReader: Whether to convert string columns to the category type skip_rows : int64_t, default 0 The number of rows to skip from the start of the file. - num_rows : size_type, default -1 + nrows : size_type, default -1 The number of rows to read. By default, read the entire file. chunk_read_limit : size_t, default 0 Limit on total number of bytes to be returned per read, @@ -90,7 +90,7 @@ cdef class ChunkedParquetReader: bool use_pandas_metadata=True, bool convert_strings_to_categories=False, int64_t skip_rows = 0, - size_type num_rows = -1, + size_type nrows = -1, size_t chunk_read_limit=0, size_t pass_read_limit=1024000000 ): @@ -103,7 +103,7 @@ cdef class ChunkedParquetReader: convert_strings_to_categories=convert_strings_to_categories, use_pandas_metadata=use_pandas_metadata, skip_rows=skip_rows, - num_rows=num_rows, + nrows=nrows, ) with nogil: @@ -152,7 +152,7 @@ cpdef read_parquet( bool convert_strings_to_categories = False, bool use_pandas_metadata = True, int64_t skip_rows = 0, - size_type num_rows = -1, + size_type nrows = -1, # Disabled, these aren't used by cudf-python # we should only add them back in if there's user demand # ReaderColumnSchema reader_column_schema = None, @@ -178,7 +178,7 @@ cpdef read_parquet( the per-file user metadata of the ``TableWithMetadata`` skip_rows : int64_t, default 0 The number of rows to skip from the start of the file. - num_rows : size_type, default -1 + nrows : size_type, default -1 The number of rows to read. By default, read the entire file. 
Returns @@ -195,7 +195,7 @@ cpdef read_parquet( convert_strings_to_categories, use_pandas_metadata, skip_rows, - num_rows, + nrows, ) with nogil: diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 7dab2f20100..4a419a2fbb6 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -539,6 +539,8 @@ def read_parquet( open_file_options=None, bytes_per_thread=None, dataset_kwargs=None, + nrows=None, + skip_rows=None, *args, **kwargs, ): @@ -685,6 +687,8 @@ def read_parquet( partition_keys=partition_keys, partition_categories=partition_categories, dataset_kwargs=dataset_kwargs, + nrows=nrows, + skip_rows=skip_rows, **kwargs, ) # Apply filters row-wise (if any are defined), and return @@ -813,6 +817,8 @@ def _parquet_to_frame( partition_keys=None, partition_categories=None, dataset_kwargs=None, + nrows=None, + skip_rows=None, **kwargs, ): # If this is not a partitioned read, only need @@ -820,11 +826,18 @@ def _parquet_to_frame( if not partition_keys: return _read_parquet( paths_or_buffers, + nrows=nrows, + skip_rows=skip_rows, *args, row_groups=row_groups, **kwargs, ) + if nrows is not None or skip_rows is not None: + raise NotImplementedError( + "nrows/skip_rows is not supported when reading a partitioned parquet dataset" + ) + partition_meta = None partitioning = (dataset_kwargs or {}).get("partitioning", None) if hasattr(partitioning, "schema"): @@ -912,6 +925,8 @@ def _read_parquet( columns=None, row_groups=None, use_pandas_metadata=None, + nrows=None, + skip_rows=None, *args, **kwargs, ): @@ -934,13 +949,21 @@ def _read_parquet( columns=columns, row_groups=row_groups, use_pandas_metadata=use_pandas_metadata, + nrows=nrows if nrows is not None else -1, + skip_rows=skip_rows if skip_rows is not None else 0, ) else: + if nrows is None: + nrows = -1 + if skip_rows is None: + skip_rows = 0 return libparquet.read_parquet( filepaths_or_buffers, columns=columns, row_groups=row_groups, 
use_pandas_metadata=use_pandas_metadata, + nrows=nrows, + skip_rows=skip_rows, ) else: if ( diff --git a/python/cudf/cudf/pylibcudf_tests/io/test_parquet.py b/python/cudf/cudf/pylibcudf_tests/io/test_parquet.py index 07d2ab3d69a..dbd20cd473e 100644 --- a/python/cudf/cudf/pylibcudf_tests/io/test_parquet.py +++ b/python/cudf/cudf/pylibcudf_tests/io/test_parquet.py @@ -31,7 +31,7 @@ def test_read_parquet_basic( res = plc.io.parquet.read_parquet( plc.io.SourceInfo([source]), - num_rows=nrows, + nrows=nrows, skip_rows=skiprows, columns=columns, ) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 3806b901b10..879a2c50db7 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1978,6 +1978,25 @@ def test_parquet_partitioned(tmpdir_factory, cols, filename): assert fn == filename +@pytest.mark.parametrize("kwargs", [{"nrows": 1}, {"skip_rows": 1}]) +def test_parquet_partitioned_notimplemented(tmpdir_factory, kwargs): + # Checks that passing nrows/skip_rows when reading a partitioned + # dataset raises NotImplementedError + pdf_dir = str(tmpdir_factory.mktemp("pdf_dir")) + size = 100 + pdf = pd.DataFrame( + { + "a": np.arange(0, stop=size, dtype="int64"), + "b": np.random.choice(list("abcd"), size=size), + "c": np.random.choice(np.arange(4), size=size), + } + ) + pdf.to_parquet(pdf_dir, index=False, partition_cols=["b"]) + + with pytest.raises(NotImplementedError): + cudf.read_parquet(pdf_dir, **kwargs) + + @pytest.mark.parametrize("return_meta", [True, False]) def test_parquet_writer_chunked_partitioned(tmpdir_factory, return_meta): pdf_dir = str(tmpdir_factory.mktemp("pdf_dir")) @@ -3768,6 +3787,26 @@ def test_parquet_chunked_reader( assert_eq(expected, actual) +@pytest.mark.parametrize( + "nrows,skip_rows", + [ + (0, 0), + (1000, 0), + (0, 1000), + (1000, 10000), + ], +) +def test_parquet_reader_nrows_skiprows(nrows, skip_rows): + df = pd.DataFrame( + {"a": [1, 2, 3, 4] * 100000, "b": ["av", "qw", "hi", "xyz"] * 
100000} + ) + expected = df[skip_rows : skip_rows + nrows] + buffer = BytesIO() + df.to_parquet(buffer) + got = cudf.read_parquet(buffer, nrows=nrows, skip_rows=skip_rows) + assert_eq(expected, got) + + def test_parquet_reader_pandas_compatibility(): df = pd.DataFrame( {"a": [1, 2, 3, 4] * 10000, "b": ["av", "qw", "hi", "xyz"] * 10000} diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 80555750b3a..448a815fe1b 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -199,6 +199,16 @@ in parallel (using a python thread pool). Default allocation is {bytes_per_thread} bytes. This parameter is functional only when `use_python_file_object=False`. +skip_rows : int, default None + If not None, the number of rows to skip from the start of the file. + + .. note:: + This option is not supported when the low-memory mode is on. +nrows : int, default None + If not None, the total number of rows to read. + + .. note:: + This option is not supported when the low-memory mode is on. Returns ------- diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index 7f62dff4389..3754addeb11 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -321,7 +321,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: tbl_w_meta = plc.io.parquet.read_parquet( plc.io.SourceInfo(self.paths), columns=with_columns, - num_rows=nrows, + nrows=nrows, ) df = DataFrame.from_table( tbl_w_meta.tbl,