From 18d1b1e92bad92e5cd0a3ca9fabb664b568a908d Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Sun, 12 May 2024 22:25:58 +0000 Subject: [PATCH 1/8] Initial pass at cython implementation of chunked reader --- python/cudf/cudf/_lib/cpp/io/parquet.pxd | 12 +++++ python/cudf/cudf/_lib/parquet.pyx | 68 ++++++++++++++++++++++++ 2 files changed, 80 insertions(+) diff --git a/python/cudf/cudf/_lib/cpp/io/parquet.pxd b/python/cudf/cudf/_lib/cpp/io/parquet.pxd index 1680eb43700..e13b532a3c1 100644 --- a/python/cudf/cudf/_lib/cpp/io/parquet.pxd +++ b/python/cudf/cudf/_lib/cpp/io/parquet.pxd @@ -278,6 +278,18 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: vector[string] column_chunks_file_paths, ) except + + cdef cppclass chunked_parquet_reader: + chunked_parquet_reader() except + + chunked_parquet_reader( + size_t chunk_read_limit, + const parquet_reader_options& options) except + + chunked_parquet_reader( + size_t chunk_read_limit, + size_t pass_read_limit, + const parquet_reader_options& options) except + + bool has_next() except + + cudf_io_types.table_with_metadata read_chunk() except + + cdef unique_ptr[vector[uint8_t]] merge_row_group_metadata( const vector[unique_ptr[vector[uint8_t]]]& metadata_list ) except + diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index dcfa087a1fa..c657a11395e 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -37,6 +37,7 @@ cimport cudf._lib.cpp.types as cudf_types from cudf._lib.column cimport Column from cudf._lib.cpp.expressions cimport expression from cudf._lib.cpp.io.parquet cimport ( + chunked_parquet_reader as cpp_chunked_parquet_reader, chunked_parquet_writer_options, merge_row_group_metadata as parquet_merge_metadata, parquet_chunked_writer as cpp_parquet_chunked_writer, @@ -754,6 +755,73 @@ cdef class ParquetWriter: self.initialized = True +cdef class ParquetReader: + cdef bool initialized + cdef unique_ptr[cpp_chunked_parquet_reader] reader + cdef cudf_io_types.source_info source + cdef table_input_metadata tbl_meta + cdef cudf_io_types.sink_info sink + cdef vector[unique_ptr[cudf_io_data_sink.data_sink]] _data_sink + cdef cudf_io_types.statistics_freq stat_freq + cdef cudf_io_types.compression_type comp_type + cdef object index + cdef size_t chunk_read_limit + cdef size_t row_group_size_bytes + cdef size_type row_group_size_rows + cdef size_t max_page_size_bytes + cdef size_type max_page_size_rows + cdef size_t max_dictionary_size + cdef cudf_io_types.dictionary_policy dict_policy + + def __cinit__(self, object filepath_or_buffer, int chunk_read_limit): + filepaths_or_buffers = ( + list(filepath_or_buffer) + if is_list_like(filepath_or_buffer) + else [filepath_or_buffer] + ) + + self.chunk_read_limit = chunk_read_limit + source = make_source_info(filepaths_or_buffers) + # Setup parquet reader arguments + cdef parquet_reader_options args + cdef parquet_reader_options_builder builder + cdef vector[vector[size_type]] cpp_row_groups + cdef bool cpp_use_pandas_metadata = True + cdef data_type cpp_timestamp_type = cudf_types.data_type( + cudf_types.type_id.EMPTY + ) + builder = ( + parquet_reader_options.builder(source) + .row_groups(cpp_row_groups) + .use_pandas_metadata(cpp_use_pandas_metadata) + .timestamp_type(cpp_timestamp_type) + ) + + args = move(builder.build()) + + with nogil: + self.reader.reset(new cpp_chunked_parquet_reader(chunk_read_limit, args)) + + def has_next(self): + cdef bool res + with nogil: + res = self.reader.get()[0].has_next() + return 
res + + def read_chunk(self): + # Read Parquet + cdef cudf_io_types.table_with_metadata c_result + + with nogil: + c_result = move(self.reader.get()[0].read_chunk()) + + df = cudf.DataFrame._from_data(*data_from_unique_ptr( + move(c_result.tbl), + column_names=['a', 'b'] + )) + return df + + cpdef merge_filemetadata(object filemetadata_list): """ Cython function to call into libcudf API, see `merge_row_group_metadata`. From 359be2697049201c52251775ceef671207f2a30e Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Mon, 13 May 2024 21:52:23 +0000 Subject: [PATCH 2/8] implement chunked reader --- python/cudf/cudf/_lib/parquet.pyx | 199 +++++++++++++++++++++++++++--- 1 file changed, 183 insertions(+), 16 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index c657a11395e..82336c664eb 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -51,7 +51,11 @@ from cudf._lib.cpp.io.parquet_metadata cimport ( parquet_metadata, read_parquet_metadata as parquet_metadata_reader, ) -from cudf._lib.cpp.io.types cimport column_in_metadata, table_input_metadata +from cudf._lib.cpp.io.types cimport ( + column_in_metadata, + table_input_metadata, + table_metadata, +) from cudf._lib.cpp.table.table_view cimport table_view from cudf._lib.cpp.types cimport data_type, size_type from cudf._lib.expressions cimport Expression @@ -772,55 +776,218 @@ cdef class ParquetReader: cdef size_type max_page_size_rows cdef size_t max_dictionary_size cdef cudf_io_types.dictionary_policy dict_policy + cdef table_metadata result_meta + cdef vector[unordered_map[string, string]] per_file_user_data + cdef object pandas_meta + + def __cinit__(self, filepaths_or_buffers, columns=None, row_groups=None, + use_pandas_metadata=True, + Expression filters=None, int chunk_read_limit=100000): + + # Convert NativeFile buffers to NativeFileDatasource, + # but save original buffers in case we need to use + # pyarrow for metadata processing + # (See: https://github.com/rapidsai/cudf/issues/9599) + + pa_buffers = [] + for i, datasource in enumerate(filepaths_or_buffers): + if isinstance(datasource, NativeFile): + pa_buffers.append(datasource) + filepaths_or_buffers[i] = NativeFileDatasource(datasource) + self.pa_buffers = pa_buffers + cdef cudf_io_types.source_info source = make_source_info( + filepaths_or_buffers) + + cdef bool cpp_use_pandas_metadata = use_pandas_metadata - def __cinit__(self, object filepath_or_buffer, int chunk_read_limit): - filepaths_or_buffers = ( - list(filepath_or_buffer) - if is_list_like(filepath_or_buffer) - else [filepath_or_buffer] + cdef vector[vector[size_type]] cpp_row_groups + cdef data_type cpp_timestamp_type = cudf_types.data_type( + cudf_types.type_id.EMPTY ) + if row_groups is not None: + cpp_row_groups = row_groups - self.chunk_read_limit = chunk_read_limit - source = make_source_info(filepaths_or_buffers) # Setup parquet reader arguments cdef parquet_reader_options args cdef parquet_reader_options_builder builder - cdef vector[vector[size_type]] cpp_row_groups - cdef bool cpp_use_pandas_metadata = True - cdef data_type cpp_timestamp_type = cudf_types.data_type( - cudf_types.type_id.EMPTY - ) builder = ( parquet_reader_options.builder(source) .row_groups(cpp_row_groups) .use_pandas_metadata(cpp_use_pandas_metadata) .timestamp_type(cpp_timestamp_type) ) + if filters is not None: + builder = builder.filter(dereference(filters.c_obj.get())) args = move(builder.build()) + cdef vector[string] cpp_columns + 
self.allow_range_index = True + if columns is not None: + cpp_columns.reserve(len(columns)) + self.allow_range_index = len(columns) > 0 + for col in columns: + cpp_columns.push_back(str(col).encode()) + args.set_columns(cpp_columns) + # Filters don't handle the range index correctly + self.allow_range_index &= filters is None + + self.chunk_read_limit = chunk_read_limit with nogil: self.reader.reset(new cpp_chunked_parquet_reader(chunk_read_limit, args)) + self.initialized = False + self.row_groups = row_groups + self.filepaths_or_buffers = filepaths_or_buffers - def has_next(self): + def _has_next(self): cdef bool res with nogil: res = self.reader.get()[0].has_next() return res - def read_chunk(self): + def _read_chunk(self): # Read Parquet cdef cudf_io_types.table_with_metadata c_result with nogil: c_result = move(self.reader.get()[0].read_chunk()) + if not self.initialized: + self.names = [info.name.decode() for info in c_result.metadata.schema_info] + self.result_meta = c_result.metadata + self.per_file_user_data = c_result.metadata.per_file_user_data + + # Access the Parquet per_file_user_data to find the index + + self.column_index_type = None + self.index_col_names = None + self.is_range_index = True + for single_file in self.per_file_user_data: + json_str = single_file[b'pandas'].decode('utf-8') + if json_str != "": + self.pandas_meta = json.loads(json_str) + file_is_range_index, self.index_col, self.column_index_type = \ + _parse_metadata(self.pandas_meta) + self.is_range_index &= file_is_range_index + + if not file_is_range_index and self.index_col is not None \ + and self.index_col_names is None: + self.index_col_names = {} + for idx_col in self.index_col: + for c in self.pandas_meta['columns']: + if c['field_name'] == idx_col: + self.index_col_names[idx_col] = c['name'] + df = cudf.DataFrame._from_data(*data_from_unique_ptr( move(c_result.tbl), - column_names=['a', 'b'] + column_names=self.names, )) + if not self.initialized: + self.initialized = True return df + def read(self): + dfs = [] + while self._has_next(): + dfs.append(self._read_chunk()) + df = cudf.concat(dfs) + update_struct_field_names(df, self.result_meta.schema_info) + if self.pandas_meta is not None: + # Book keep each column metadata as the order + # of `meta["columns"]` and `column_names` are not + # guaranteed to be deterministic and same always. 
+ meta_data_per_column = { + col_meta['name']: col_meta for col_meta in self.pandas_meta["columns"] + } + + # update the decimal precision of each column + for col in self.names: + if isinstance(df._data[col].dtype, cudf.core.dtypes.DecimalDtype): + df._data[col].dtype.precision = ( + meta_data_per_column[col]["metadata"]["precision"] + ) + + # Set the index column + if self.index_col is not None and len(self.index_col) > 0: + if self.is_range_index: + if not self.allow_range_index: + return df + + if len(self.per_file_user_data) > 1: + range_index_meta = { + "kind": "range", + "name": None, + "start": 0, + "stop": len(df), + "step": 1 + } + else: + range_index_meta = self.index_col[0] + + if self.row_groups is not None: + per_file_metadata = [ + pa.parquet.read_metadata( + # Pyarrow cannot read directly from bytes + io.BytesIO(s) if isinstance(s, bytes) else s + ) for s in ( + self.pa_buffers or self.filepaths_or_buffers + ) + ] + + filtered_idx = [] + for i, file_meta in enumerate(per_file_metadata): + row_groups_i = [] + start = 0 + for row_group in range(file_meta.num_row_groups): + stop = start + file_meta.row_group(row_group).num_rows + row_groups_i.append((start, stop)) + start = stop + + for rg in self.row_groups[i]: + filtered_idx.append( + cudf.RangeIndex( + start=row_groups_i[rg][0], + stop=row_groups_i[rg][1], + step=range_index_meta['step'] + ) + ) + + if len(filtered_idx) > 0: + idx = cudf.concat(filtered_idx) + else: + idx = cudf.Index(cudf.core.column.column_empty(0)) + else: + idx = cudf.RangeIndex( + start=range_index_meta['start'], + stop=range_index_meta['stop'], + step=range_index_meta['step'], + name=range_index_meta['name'] + ) + + df._index = idx + elif set(self.index_col).issubset(self.names): + index_data = df[self.index_col] + actual_index_names = list(self.index_col_names.values()) + if len(index_data._data) == 1: + idx = cudf.Index( + index_data._data.columns[0], + name=actual_index_names[0] + ) + else: + idx = cudf.MultiIndex.from_frame( + index_data, + names=actual_index_names + ) + df.drop(columns=self.index_col, inplace=True) + df._index = idx + else: + if self.cpp_use_pandas_metadata: + df.index.names = self.index_col + + # Set column dtype for empty types. 
+ if len(df._data.names) == 0 and self.column_index_type is not None: + df._data.label_dtype = cudf.dtype(self.column_index_type) + return df cpdef merge_filemetadata(object filemetadata_list): """ From e909e922686f0be0700f322b8639f2408939c673 Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Tue, 14 May 2024 01:35:01 +0000 Subject: [PATCH 3/8] cleanup --- python/cudf/cudf/_lib/parquet.pyx | 32 +++++++++++++++---------------- python/cudf/cudf/io/parquet.py | 9 ++++++++- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 82336c664eb..ea9f594cf39 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -762,27 +762,25 @@ cdef class ParquetWriter: cdef class ParquetReader: cdef bool initialized cdef unique_ptr[cpp_chunked_parquet_reader] reader - cdef cudf_io_types.source_info source - cdef table_input_metadata tbl_meta - cdef cudf_io_types.sink_info sink - cdef vector[unique_ptr[cudf_io_data_sink.data_sink]] _data_sink - cdef cudf_io_types.statistics_freq stat_freq - cdef cudf_io_types.compression_type comp_type - cdef object index cdef size_t chunk_read_limit cdef size_t row_group_size_bytes - cdef size_type row_group_size_rows - cdef size_t max_page_size_bytes - cdef size_type max_page_size_rows - cdef size_t max_dictionary_size - cdef cudf_io_types.dictionary_policy dict_policy cdef table_metadata result_meta cdef vector[unordered_map[string, string]] per_file_user_data cdef object pandas_meta + cdef list pa_buffers + cdef bool allow_range_index + cdef object row_groups + cdef object filepaths_or_buffers + cdef object names + cdef object column_index_type + cdef object index_col_names + cdef bool is_range_index + cdef object index_col + cdef bool cpp_use_pandas_metadata def __cinit__(self, filepaths_or_buffers, columns=None, row_groups=None, use_pandas_metadata=True, - Expression filters=None, int chunk_read_limit=100000): + Expression filters=None, int chunk_read_limit=1024000000): # Convert NativeFile buffers to NativeFileDatasource, # but save original buffers in case we need to use @@ -798,7 +796,7 @@ cdef class ParquetReader: cdef cudf_io_types.source_info source = make_source_info( filepaths_or_buffers) - cdef bool cpp_use_pandas_metadata = use_pandas_metadata + self.cpp_use_pandas_metadata = use_pandas_metadata cdef vector[vector[size_type]] cpp_row_groups cdef data_type cpp_timestamp_type = cudf_types.data_type( @@ -813,7 +811,7 @@ cdef class ParquetReader: builder = ( parquet_reader_options.builder(source) .row_groups(cpp_row_groups) - .use_pandas_metadata(cpp_use_pandas_metadata) + .use_pandas_metadata(self.cpp_use_pandas_metadata) .timestamp_type(cpp_timestamp_type) ) if filters is not None: @@ -882,8 +880,8 @@ cdef class ParquetReader: move(c_result.tbl), column_names=self.names, )) - if not self.initialized: - self.initialized = True + + self.initialized = True return df def read(self): diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index a6c67d22af7..21d87a3a287 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -858,12 +858,19 @@ def _read_parquet( "cudf engine doesn't support the " f"following positional arguments: {list(args)}" ) - return libparquet.read_parquet( + x = libparquet.ParquetReader( filepaths_or_buffers, columns=columns, row_groups=row_groups, use_pandas_metadata=use_pandas_metadata, ) + return x.read() + # return libparquet.read_parquet( + # filepaths_or_buffers, + # 
columns=columns, + # row_groups=row_groups, + # use_pandas_metadata=use_pandas_metadata, + # ) else: if ( isinstance(filepaths_or_buffers, list) From 1c11945f6ddaf34d13a1f4db1e3eeb32c2717717 Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Wed, 22 May 2024 23:40:56 +0000 Subject: [PATCH 4/8] use pass_read_limit --- python/cudf/cudf/_lib/parquet.pyx | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 5561227eba4..3d999603627 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -773,6 +773,7 @@ cdef class ParquetReader: cdef bool initialized cdef unique_ptr[cpp_chunked_parquet_reader] reader cdef size_t chunk_read_limit + cdef size_t pass_read_limit cdef size_t row_group_size_bytes cdef table_metadata result_meta cdef vector[unordered_map[string, string]] per_file_user_data @@ -790,7 +791,9 @@ cdef class ParquetReader: def __cinit__(self, filepaths_or_buffers, columns=None, row_groups=None, use_pandas_metadata=True, - Expression filters=None, int chunk_read_limit=1024000000): + Expression filters=None, + size_t chunk_read_limit=0, + size_t pass_read_limit=1024000000): # Convert NativeFile buffers to NativeFileDatasource, # but save original buffers in case we need to use @@ -840,9 +843,16 @@ cdef class ParquetReader: self.allow_range_index &= filters is None self.chunk_read_limit = chunk_read_limit + self.pass_read_limit = pass_read_limit with nogil: - self.reader.reset(new cpp_chunked_parquet_reader(chunk_read_limit, args)) + self.reader.reset( + new cpp_chunked_parquet_reader( + chunk_read_limit, + pass_read_limit, + args + ) + ) self.initialized = False self.row_groups = row_groups self.filepaths_or_buffers = filepaths_or_buffers From 866b13d531da53277d2f1519d5cede9717508ff7 Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Thu, 30 May 2024 18:29:35 +0000 Subject: [PATCH 5/8] add tests --- python/cudf/cudf/io/parquet.py | 9 +-------- python/cudf/cudf/tests/test_parquet.py | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index a007ed3bc0d..dbdb2093b72 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -908,19 +908,12 @@ def _read_parquet( "cudf engine doesn't support the " f"following positional arguments: {list(args)}" ) - x = libparquet.ParquetReader( + return libparquet.read_parquet( filepaths_or_buffers, columns=columns, row_groups=row_groups, use_pandas_metadata=use_pandas_metadata, ) - return x.read() - # return libparquet.read_parquet( - # filepaths_or_buffers, - # columns=columns, - # row_groups=row_groups, - # use_pandas_metadata=use_pandas_metadata, - # ) else: if ( isinstance(filepaths_or_buffers, list) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index e32fdacd8d6..e95dc9cdf04 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -22,6 +22,7 @@ from pyarrow import fs as pa_fs, parquet as pq import cudf +from cudf._lib.parquet import ParquetReader from cudf.io.parquet import ( ParquetDatasetWriter, ParquetWriter, @@ -3407,3 +3408,18 @@ def test_parquet_reader_roundtrip_structs_with_arrow_schema(): # Check results assert_eq(expected, got) + + +@pytest.mark.parametrize("chunk_read_limit", [0, 240, 1024000000]) +@pytest.mark.parametrize("pass_read_limit", [0, 240, 1024000000]) +def 
test_parquet_chunked_reader(chunk_read_limit, pass_read_limit): + expected = pd.DataFrame({"a": [1, 2, 3, 4] * 1000000}) + buffer = BytesIO() + expected.to_parquet(buffer) + reader = ParquetReader( + [buffer], + chunk_read_limit=chunk_read_limit, + pass_read_limit=pass_read_limit, + ) + actual = reader.read() + assert_eq(expected, actual) From 8925162faf856d484307bdf40f55b7903081ae50 Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Mon, 3 Jun 2024 23:24:29 +0000 Subject: [PATCH 6/8] remove duplication --- python/cudf/cudf/_lib/parquet.pyx | 305 ++++++++----------------- python/cudf/cudf/tests/test_parquet.py | 15 +- 2 files changed, 101 insertions(+), 219 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 3d999603627..ac03c3e995c 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -26,6 +26,7 @@ from libc.stdint cimport uint8_t from libcpp cimport bool from libcpp.map cimport map from libcpp.memory cimport make_unique, unique_ptr +from libcpp.pair cimport pair from libcpp.string cimport string from libcpp.unordered_map cimport unordered_map from libcpp.utility cimport move @@ -128,50 +129,22 @@ def _parse_metadata(meta): return file_is_range_index, file_index_cols, file_column_dtype -cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, - use_pandas_metadata=True, - Expression filters=None): - """ - Cython function to call into libcudf API, see `read_parquet`. - - filters, if not None, should be an Expression that evaluates to a - boolean predicate as a function of columns being read. - - See Also - -------- - cudf.io.parquet.read_parquet - cudf.io.parquet.to_parquet - """ - - # Convert NativeFile buffers to NativeFileDatasource, - # but save original buffers in case we need to use - # pyarrow for metadata processing - # (See: https://github.com/rapidsai/cudf/issues/9599) - pa_buffers = [] - for i, datasource in enumerate(filepaths_or_buffers): - if isinstance(datasource, NativeFile): - pa_buffers.append(datasource) - filepaths_or_buffers[i] = NativeFileDatasource(datasource) +cdef pair[parquet_reader_options, bool] _setup_parquet_reader_options( + cudf_io_types.source_info source, + vector[vector[size_type]] row_groups, + bool use_pandas_metadata, + Expression filters, + object columns): - cdef cudf_io_types.source_info source = make_source_info( - filepaths_or_buffers) - - cdef bool cpp_use_pandas_metadata = use_pandas_metadata - - cdef vector[vector[size_type]] cpp_row_groups + cdef parquet_reader_options args + cdef parquet_reader_options_builder builder cdef data_type cpp_timestamp_type = cudf_types.data_type( cudf_types.type_id.EMPTY ) - if row_groups is not None: - cpp_row_groups = row_groups - - # Setup parquet reader arguments - cdef parquet_reader_options args - cdef parquet_reader_options_builder builder builder = ( parquet_reader_options.builder(source) - .row_groups(cpp_row_groups) - .use_pandas_metadata(cpp_use_pandas_metadata) + .row_groups(row_groups) + .use_pandas_metadata(use_pandas_metadata) .use_arrow_schema(True) .timestamp_type(cpp_timestamp_type) ) @@ -187,28 +160,28 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, for col in columns: cpp_columns.push_back(str(col).encode()) args.set_columns(cpp_columns) - # Filters don't handle the range index correctly allow_range_index &= filters is None - # Read Parquet - cdef cudf_io_types.table_with_metadata c_result - - with nogil: - c_result = move(parquet_reader(args)) - - names = 
[info.name.decode() for info in c_result.metadata.schema_info] - - # Access the Parquet per_file_user_data to find the index + return pair[parquet_reader_options, bool](args, allow_range_index) + +cdef object _process_metadata(object df, + table_metadata table_meta, + list names, + object row_groups, + object filepaths_or_buffers, + list pa_buffers, + bool allow_range_index, + bool use_pandas_metadata): + update_struct_field_names(df, table_meta.schema_info) index_col = None - cdef vector[unordered_map[string, string]] per_file_user_data = \ - c_result.metadata.per_file_user_data - + is_range_index = True column_index_type = None index_col_names = None - is_range_index = True + meta = None + cdef vector[unordered_map[string, string]] per_file_user_data = \ + table_meta.per_file_user_data for single_file in per_file_user_data: json_str = single_file[b'pandas'].decode('utf-8') - meta = None if json_str != "": meta = json.loads(json_str) file_is_range_index, index_col, column_index_type = _parse_metadata(meta) @@ -222,29 +195,16 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, if c['field_name'] == idx_col: index_col_names[idx_col] = c['name'] - df = cudf.DataFrame._from_data(*data_from_unique_ptr( - move(c_result.tbl), - column_names=names - )) - - update_struct_field_names(df, c_result.metadata.schema_info) - if meta is not None: - # Book keep each column metadata as the order - # of `meta["columns"]` and `column_names` are not - # guaranteed to be deterministic and same always. meta_data_per_column = { col_meta['name']: col_meta for col_meta in meta["columns"] } - - # update the decimal precision of each column for col in names: if isinstance(df._data[col].dtype, cudf.core.dtypes.DecimalDtype): df._data[col].dtype.precision = ( meta_data_per_column[col]["metadata"]["precision"] ) - # Set the index column if index_col is not None and len(index_col) > 0: if is_range_index: if not allow_range_index: @@ -264,7 +224,6 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, if row_groups is not None: per_file_metadata = [ pa.parquet.read_metadata( - # Pyarrow cannot read directly from bytes io.BytesIO(s) if isinstance(s, bytes) else s ) for s in ( pa_buffers or filepaths_or_buffers @@ -321,9 +280,65 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, if use_pandas_metadata: df.index.names = index_col - # Set column dtype for empty types. if len(df._data.names) == 0 and column_index_type is not None: df._data.label_dtype = cudf.dtype(column_index_type) + + return df + + +cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, + use_pandas_metadata=True, + Expression filters=None): + """ + Cython function to call into libcudf API, see `read_parquet`. + + filters, if not None, should be an Expression that evaluates to a + boolean predicate as a function of columns being read. 
+ + See Also + -------- + cudf.io.parquet.read_parquet + cudf.io.parquet.to_parquet + """ + + # Convert NativeFile buffers to NativeFileDatasource, + # but save original buffers in case we need to use + # pyarrow for metadata processing + # (See: https://github.com/rapidsai/cudf/issues/9599) + pa_buffers = [] + for i, datasource in enumerate(filepaths_or_buffers): + if isinstance(datasource, NativeFile): + pa_buffers.append(datasource) + filepaths_or_buffers[i] = NativeFileDatasource(datasource) + + cdef cudf_io_types.source_info source = make_source_info( + filepaths_or_buffers) + + cdef vector[vector[size_type]] cpp_row_groups + if row_groups is not None: + cpp_row_groups = row_groups + + # Setup parquet reader arguments + cdef parquet_reader_options args + cdef pair[parquet_reader_options, bool] c_res = _setup_parquet_reader_options( + source, cpp_row_groups, use_pandas_metadata, filters, columns) + args, allow_range_index = c_res.first, c_res.second + + # Read Parquet + cdef cudf_io_types.table_with_metadata c_result + + with nogil: + c_result = move(parquet_reader(args)) + + names = [info.name.decode() for info in c_result.metadata.schema_info] + + df = cudf.DataFrame._from_data(*data_from_unique_ptr( + move(c_result.tbl), + column_names=names + )) + df = _process_metadata(df, c_result.metadata, names, row_groups, + filepaths_or_buffers, pa_buffers, + allow_range_index, use_pandas_metadata) return df cpdef read_parquet_metadata(filepaths_or_buffers): @@ -791,7 +806,6 @@ cdef class ParquetReader: def __cinit__(self, filepaths_or_buffers, columns=None, row_groups=None, use_pandas_metadata=True, - Expression filters=None, size_t chunk_read_limit=0, size_t pass_read_limit=1024000000): @@ -812,38 +826,12 @@ cdef class ParquetReader: self.cpp_use_pandas_metadata = use_pandas_metadata cdef vector[vector[size_type]] cpp_row_groups - cdef data_type cpp_timestamp_type = cudf_types.data_type( - cudf_types.type_id.EMPTY - ) if row_groups is not None: cpp_row_groups = row_groups - - # Setup parquet reader arguments cdef parquet_reader_options args - cdef parquet_reader_options_builder builder - builder = ( - parquet_reader_options.builder(source) - .row_groups(cpp_row_groups) - .use_pandas_metadata(self.cpp_use_pandas_metadata) - .timestamp_type(cpp_timestamp_type) - ) - if filters is not None: - builder = builder.filter(dereference(filters.c_obj.get())) - - args = move(builder.build()) - cdef vector[string] cpp_columns - self.allow_range_index = True - if columns is not None: - cpp_columns.reserve(len(columns)) - self.allow_range_index = len(columns) > 0 - for col in columns: - cpp_columns.push_back(str(col).encode()) - args.set_columns(cpp_columns) - # Filters don't handle the range index correctly - self.allow_range_index &= filters is None - - self.chunk_read_limit = chunk_read_limit - self.pass_read_limit = pass_read_limit + cdef pair[parquet_reader_options, bool] c_res = _setup_parquet_reader_options( + source, cpp_row_groups, use_pandas_metadata, None, columns) + args, self.allow_range_index = c_res.first, c_res.second with nogil: self.reader.reset( @@ -873,28 +861,6 @@ cdef class ParquetReader: if not self.initialized: self.names = [info.name.decode() for info in c_result.metadata.schema_info] self.result_meta = c_result.metadata - self.per_file_user_data = c_result.metadata.per_file_user_data - - # Access the Parquet per_file_user_data to find the index - - self.column_index_type = None - self.index_col_names = None - self.is_range_index = True - for single_file in 
self.per_file_user_data: - json_str = single_file[b'pandas'].decode('utf-8') - if json_str != "": - self.pandas_meta = json.loads(json_str) - file_is_range_index, self.index_col, self.column_index_type = \ - _parse_metadata(self.pandas_meta) - self.is_range_index &= file_is_range_index - - if not file_is_range_index and self.index_col is not None \ - and self.index_col_names is None: - self.index_col_names = {} - for idx_col in self.index_col: - for c in self.pandas_meta['columns']: - if c['field_name'] == idx_col: - self.index_col_names[idx_col] = c['name'] df = cudf.DataFrame._from_data(*data_from_unique_ptr( move(c_result.tbl), @@ -909,102 +875,9 @@ cdef class ParquetReader: while self._has_next(): dfs.append(self._read_chunk()) df = cudf.concat(dfs) - update_struct_field_names(df, self.result_meta.schema_info) - if self.pandas_meta is not None: - # Book keep each column metadata as the order - # of `meta["columns"]` and `column_names` are not - # guaranteed to be deterministic and same always. - meta_data_per_column = { - col_meta['name']: col_meta for col_meta in self.pandas_meta["columns"] - } - - # update the decimal precision of each column - for col in self.names: - if isinstance(df._data[col].dtype, cudf.core.dtypes.DecimalDtype): - df._data[col].dtype.precision = ( - meta_data_per_column[col]["metadata"]["precision"] - ) - - # Set the index column - if self.index_col is not None and len(self.index_col) > 0: - if self.is_range_index: - if not self.allow_range_index: - return df - - if len(self.per_file_user_data) > 1: - range_index_meta = { - "kind": "range", - "name": None, - "start": 0, - "stop": len(df), - "step": 1 - } - else: - range_index_meta = self.index_col[0] - - if self.row_groups is not None: - per_file_metadata = [ - pa.parquet.read_metadata( - # Pyarrow cannot read directly from bytes - io.BytesIO(s) if isinstance(s, bytes) else s - ) for s in ( - self.pa_buffers or self.filepaths_or_buffers - ) - ] - - filtered_idx = [] - for i, file_meta in enumerate(per_file_metadata): - row_groups_i = [] - start = 0 - for row_group in range(file_meta.num_row_groups): - stop = start + file_meta.row_group(row_group).num_rows - row_groups_i.append((start, stop)) - start = stop - - for rg in self.row_groups[i]: - filtered_idx.append( - cudf.RangeIndex( - start=row_groups_i[rg][0], - stop=row_groups_i[rg][1], - step=range_index_meta['step'] - ) - ) - - if len(filtered_idx) > 0: - idx = cudf.concat(filtered_idx) - else: - idx = cudf.Index(cudf.core.column.column_empty(0)) - else: - idx = cudf.RangeIndex( - start=range_index_meta['start'], - stop=range_index_meta['stop'], - step=range_index_meta['step'], - name=range_index_meta['name'] - ) - - df._index = idx - elif set(self.index_col).issubset(self.names): - index_data = df[self.index_col] - actual_index_names = list(self.index_col_names.values()) - if len(index_data._data) == 1: - idx = cudf.Index( - index_data._data.columns[0], - name=actual_index_names[0] - ) - else: - idx = cudf.MultiIndex.from_frame( - index_data, - names=actual_index_names - ) - df.drop(columns=self.index_col, inplace=True) - df._index = idx - else: - if self.cpp_use_pandas_metadata: - df.index.names = self.index_col - - # Set column dtype for empty types. 
- if len(df._data.names) == 0 and self.column_index_type is not None: - df._data.label_dtype = cudf.dtype(self.column_index_type) + df = _process_metadata(df, self.result_meta, self.names, self.row_groups, + self.filepaths_or_buffers, self.pa_buffers, + self.allow_range_index, self.cpp_use_pandas_metadata) return df cpdef merge_filemetadata(object filemetadata_list): diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index e95dc9cdf04..18e8891d95b 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -3412,14 +3412,23 @@ def test_parquet_reader_roundtrip_structs_with_arrow_schema(): @pytest.mark.parametrize("chunk_read_limit", [0, 240, 1024000000]) @pytest.mark.parametrize("pass_read_limit", [0, 240, 1024000000]) -def test_parquet_chunked_reader(chunk_read_limit, pass_read_limit): - expected = pd.DataFrame({"a": [1, 2, 3, 4] * 1000000}) +@pytest.mark.parametrize("use_pandas_metadata", [True, False]) +@pytest.mark.parametrize("row_groups", [[[0]], None, [[0, 1]]]) +def test_parquet_chunked_reader( + chunk_read_limit, pass_read_limit, use_pandas_metadata, row_groups +): + df = pd.DataFrame({"a": [1, 2, 3, 4] * 1000000}) buffer = BytesIO() - expected.to_parquet(buffer) + df.to_parquet(buffer) reader = ParquetReader( [buffer], chunk_read_limit=chunk_read_limit, pass_read_limit=pass_read_limit, + use_pandas_metadata=use_pandas_metadata, + row_groups=row_groups, + ) + expected = cudf.read_parquet( + buffer, use_pandas_metadata=use_pandas_metadata, row_groups=row_groups ) actual = reader.read() assert_eq(expected, actual) From c0587baeac7fa7acde552bf3e91353f13bd692da Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Mon, 3 Jun 2024 23:26:44 +0000 Subject: [PATCH 7/8] cleanup --- python/cudf/cudf/_lib/parquet.pyx | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index ac03c3e995c..f0f32831d07 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -196,15 +196,21 @@ cdef object _process_metadata(object df, index_col_names[idx_col] = c['name'] if meta is not None: + # Book keep each column metadata as the order + # of `meta["columns"]` and `column_names` are not + # guaranteed to be deterministic and same always. 
meta_data_per_column = { col_meta['name']: col_meta for col_meta in meta["columns"] } + + # update the decimal precision of each column for col in names: if isinstance(df._data[col].dtype, cudf.core.dtypes.DecimalDtype): df._data[col].dtype.precision = ( meta_data_per_column[col]["metadata"]["precision"] ) + # Set the index column if index_col is not None and len(index_col) > 0: if is_range_index: if not allow_range_index: @@ -224,6 +230,7 @@ cdef object _process_metadata(object df, if row_groups is not None: per_file_metadata = [ pa.parquet.read_metadata( + # Pyarrow cannot read directly from bytes io.BytesIO(s) if isinstance(s, bytes) else s ) for s in ( pa_buffers or filepaths_or_buffers From dc79239e8e9237e8913c41cfb9dd4b40856ecfbb Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Tue, 4 Jun 2024 18:39:46 +0000 Subject: [PATCH 8/8] add strings --- python/cudf/cudf/tests/test_parquet.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 18e8891d95b..2596fe8cd37 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -3417,7 +3417,9 @@ def test_parquet_reader_roundtrip_structs_with_arrow_schema(): def test_parquet_chunked_reader( chunk_read_limit, pass_read_limit, use_pandas_metadata, row_groups ): - df = pd.DataFrame({"a": [1, 2, 3, 4] * 1000000}) + df = pd.DataFrame( + {"a": [1, 2, 3, 4] * 1000000, "b": ["av", "qw", "hi", "xyz"] * 1000000} + ) buffer = BytesIO() df.to_parquet(buffer) reader = ParquetReader(
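
Taken together, the series leaves `cudf._lib.parquet.ParquetReader` as a
chunked reader: `read()` drives `_has_next()`/`_read_chunk()` until the
underlying `cpp_chunked_parquet_reader` is exhausted, concatenates the
per-chunk DataFrames, and then applies the shared `_process_metadata`
pass. A minimal usage sketch, mirroring the test added in this series
(the limit values shown are ones the test parametrizes over; a limit of
0 means "no limit", following the libcudf chunked-reader convention):

    from io import BytesIO

    import pandas as pd

    from cudf._lib.parquet import ParquetReader

    # Build a parquet buffer large enough to span several chunks.
    df = pd.DataFrame({"a": [1, 2, 3, 4] * 1000000})
    buffer = BytesIO()
    df.to_parquet(buffer)

    # chunk_read_limit caps the bytes returned by each internal
    # read_chunk() call; pass_read_limit caps the temporary memory
    # used to read and decompress each pass over the file.
    reader = ParquetReader(
        [buffer],
        chunk_read_limit=240,
        pass_read_limit=1024000000,
    )

    # read() loops until has_next() is False and concatenates the
    # chunks, so the result matches a plain cudf.read_parquet() call.
    result = reader.read()
    assert len(result) == len(df)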