diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst index 697bce739de..e2d342ffe47 100644 --- a/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst +++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst @@ -18,3 +18,4 @@ I/O Functions avro csv json + parquet diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/io/parquet.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/parquet.rst new file mode 100644 index 00000000000..9dfbadfa216 --- /dev/null +++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/parquet.rst @@ -0,0 +1,6 @@ +======= +Parquet +======= + +.. automodule:: cudf._lib.pylibcudf.io.parquet + :members: diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index e7959d21e01..a2eed94bb3c 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -18,16 +18,14 @@ from cython.operator cimport dereference from cudf.api.types import is_list_like -from cudf._lib.utils cimport data_from_unique_ptr +from cudf._lib.utils cimport _data_from_columns, data_from_pylibcudf_io -from cudf._lib import pylibcudf from cudf._lib.utils import _index_level_name, generate_pandas_metadata from libc.stdint cimport uint8_t from libcpp cimport bool from libcpp.map cimport map from libcpp.memory cimport make_unique, unique_ptr -from libcpp.pair cimport pair from libcpp.string cimport string from libcpp.unordered_map cimport unordered_map from libcpp.utility cimport move @@ -35,25 +33,20 @@ from libcpp.vector cimport vector cimport cudf._lib.pylibcudf.libcudf.io.data_sink as cudf_io_data_sink cimport cudf._lib.pylibcudf.libcudf.io.types as cudf_io_types -cimport cudf._lib.pylibcudf.libcudf.types as cudf_types from cudf._lib.column cimport Column from cudf._lib.io.utils cimport ( + add_df_col_struct_names, make_sinks_info, make_source_info, - update_struct_field_names, ) from cudf._lib.pylibcudf.expressions cimport Expression from cudf._lib.pylibcudf.io.datasource cimport NativeFileDatasource -from cudf._lib.pylibcudf.libcudf.expressions cimport expression +from cudf._lib.pylibcudf.io.parquet cimport ChunkedParquetReader from cudf._lib.pylibcudf.libcudf.io.parquet cimport ( - chunked_parquet_reader as cpp_chunked_parquet_reader, chunked_parquet_writer_options, merge_row_group_metadata as parquet_merge_metadata, parquet_chunked_writer as cpp_parquet_chunked_writer, - parquet_reader_options, - parquet_reader_options_builder, parquet_writer_options, - read_parquet as parquet_reader, write_parquet as parquet_writer, ) from cudf._lib.pylibcudf.libcudf.io.parquet_metadata cimport ( @@ -63,19 +56,17 @@ from cudf._lib.pylibcudf.libcudf.io.parquet_metadata cimport ( from cudf._lib.pylibcudf.libcudf.io.types cimport ( column_in_metadata, table_input_metadata, - table_metadata, ) from cudf._lib.pylibcudf.libcudf.table.table_view cimport table_view -from cudf._lib.pylibcudf.libcudf.types cimport data_type, size_type +from cudf._lib.pylibcudf.libcudf.types cimport size_type from cudf._lib.utils cimport table_view_from_table from pyarrow.lib import NativeFile -from cudf._lib.concat import concat_columns +import cudf._lib.pylibcudf as plc +from cudf._lib.pylibcudf cimport Table from cudf.utils.ioutils import _ROW_GROUP_SIZE_BYTES_DEFAULT -from cudf._lib.utils cimport data_from_pylibcudf_table - cdef class BufferArrayFromVector: cdef Py_ssize_t length @@ -133,71 +124,37 @@ def _parse_metadata(meta): return file_is_range_index, file_index_cols, file_column_dtype -cdef pair[parquet_reader_options, bool] _setup_parquet_reader_options( - cudf_io_types.source_info source, - vector[vector[size_type]] row_groups, - bool use_pandas_metadata, - Expression filters, - object columns): - - cdef parquet_reader_options args - cdef parquet_reader_options_builder builder - cdef data_type cpp_timestamp_type = cudf_types.data_type( - cudf_types.type_id.EMPTY - ) - builder = ( - parquet_reader_options.builder(source) - .row_groups(row_groups) - .use_pandas_metadata(use_pandas_metadata) - .use_arrow_schema(True) - .timestamp_type(cpp_timestamp_type) - ) - if filters is not None: - builder = builder.filter(dereference(filters.c_obj.get())) - - args = move(builder.build()) - cdef vector[string] cpp_columns - allow_range_index = True - if columns is not None: - cpp_columns.reserve(len(columns)) - allow_range_index = len(columns) > 0 - for col in columns: - cpp_columns.push_back(str(col).encode()) - args.set_columns(cpp_columns) - allow_range_index &= filters is None - - return pair[parquet_reader_options, bool](args, allow_range_index) - cdef object _process_metadata(object df, - table_metadata table_meta, list names, + dict child_names, + list per_file_user_data, object row_groups, object filepaths_or_buffers, list pa_buffers, bool allow_range_index, bool use_pandas_metadata): - update_struct_field_names(df, table_meta.schema_info) + + add_df_col_struct_names(df, child_names) index_col = None is_range_index = True column_index_type = None index_col_names = None meta = None - cdef vector[unordered_map[string, string]] per_file_user_data = \ - table_meta.per_file_user_data for single_file in per_file_user_data: + if b'pandas' not in single_file: + continue json_str = single_file[b'pandas'].decode('utf-8') - if json_str != "": - meta = json.loads(json_str) - file_is_range_index, index_col, column_index_type = _parse_metadata(meta) - is_range_index &= file_is_range_index - - if not file_is_range_index and index_col is not None \ - and index_col_names is None: - index_col_names = {} - for idx_col in index_col: - for c in meta['columns']: - if c['field_name'] == idx_col: - index_col_names[idx_col] = c['name'] + meta = json.loads(json_str) + file_is_range_index, index_col, column_index_type = _parse_metadata(meta) + is_range_index &= file_is_range_index + + if not file_is_range_index and index_col is not None \ + and index_col_names is None: + index_col_names = {} + for idx_col in index_col: + for c in meta['columns']: + if c['field_name'] == idx_col: + index_col_names[idx_col] = c['name'] if meta is not None: # Book keep each column metadata as the order @@ -297,6 +254,76 @@ cdef object _process_metadata(object df, return df +def read_parquet_chunked( + filepaths_or_buffers, + columns=None, + row_groups=None, + use_pandas_metadata=True, + size_t chunk_read_limit=0, + size_t pass_read_limit=1024000000 +): + # Convert NativeFile buffers to NativeFileDatasource, + # but save original buffers in case we need to use + # pyarrow for metadata processing + # (See: https://github.com/rapidsai/cudf/issues/9599) + + pa_buffers = [] + + new_bufs = [] + for i, datasource in enumerate(filepaths_or_buffers): + if isinstance(datasource, NativeFile): + new_bufs.append(NativeFileDatasource(datasource)) + else: + new_bufs.append(datasource) + + # Note: If this function ever takes accepts filters + # allow_range_index needs to be False when a filter is passed + # (see read_parquet) + allow_range_index = columns is not None and len(columns) != 0 + + reader = ChunkedParquetReader( + plc.io.SourceInfo(new_bufs), + columns, + row_groups, + use_pandas_metadata, + chunk_read_limit=chunk_read_limit, + pass_read_limit=pass_read_limit + ) + + tbl_w_meta = reader.read_chunk() + column_names = tbl_w_meta.column_names(include_children=False) + child_names = tbl_w_meta.child_names + per_file_user_data = tbl_w_meta.per_file_user_data + concatenated_columns = tbl_w_meta.tbl.columns() + + # save memory + del tbl_w_meta + + cdef Table tbl + while reader.has_next(): + tbl = reader.read_chunk().tbl + + for i in range(tbl.num_columns()): + concatenated_columns[i] = plc.concatenate.concatenate( + [concatenated_columns[i], tbl._columns[i]] + ) + # Drop residual columns to save memory + tbl._columns[i] = None + + df = cudf.DataFrame._from_data( + *_data_from_columns( + columns=[Column.from_pylibcudf(plc) for plc in concatenated_columns], + column_names=column_names, + index_names=None + ) + ) + df = _process_metadata(df, column_names, child_names, + per_file_user_data, row_groups, + filepaths_or_buffers, pa_buffers, + allow_range_index, use_pandas_metadata) + return df + + cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, use_pandas_metadata=True, Expression filters=None): @@ -322,33 +349,28 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, pa_buffers.append(datasource) filepaths_or_buffers[i] = NativeFileDatasource(datasource) - cdef cudf_io_types.source_info source = make_source_info( - filepaths_or_buffers) - - cdef vector[vector[size_type]] cpp_row_groups - if row_groups is not None: - cpp_row_groups = row_groups - - # Setup parquet reader arguments - cdef parquet_reader_options args - cdef pair[parquet_reader_options, bool] c_res = _setup_parquet_reader_options( - source, cpp_row_groups, use_pandas_metadata, filters, columns) - args, allow_range_index = c_res.first, c_res.second + allow_range_index = True + if columns is not None and len(columns) == 0 or filters: + allow_range_index = False # Read Parquet - cdef cudf_io_types.table_with_metadata c_result - with nogil: - c_result = move(parquet_reader(args)) + tbl_w_meta = plc.io.parquet.read_parquet( + plc.io.SourceInfo(filepaths_or_buffers), + columns, + row_groups, + filters, + convert_strings_to_categories = False, + use_pandas_metadata = use_pandas_metadata, + ) - names = [info.name.decode() for info in c_result.metadata.schema_info] + df = cudf.DataFrame._from_data( + *data_from_pylibcudf_io(tbl_w_meta) + ) - df = cudf.DataFrame._from_data(*data_from_unique_ptr( - move(c_result.tbl), - column_names=names - )) - df = _process_metadata(df, c_result.metadata, names, row_groups, - filepaths_or_buffers, pa_buffers, + df = _process_metadata(df, tbl_w_meta.column_names(include_children=False), + tbl_w_meta.child_names, tbl_w_meta.per_file_user_data, + row_groups, filepaths_or_buffers, pa_buffers, allow_range_index, use_pandas_metadata) return df @@ -804,120 +826,6 @@ cdef class ParquetWriter: self.initialized = True -cdef class ParquetReader: - cdef bool initialized - cdef unique_ptr[cpp_chunked_parquet_reader] reader - cdef size_t chunk_read_limit - cdef size_t pass_read_limit - cdef size_t row_group_size_bytes - cdef table_metadata result_meta - cdef vector[unordered_map[string, string]] per_file_user_data - cdef object pandas_meta - cdef list pa_buffers - cdef bool allow_range_index - cdef object row_groups - cdef object filepaths_or_buffers - cdef object names - cdef object column_index_type - cdef object index_col_names - cdef bool is_range_index - cdef object index_col - cdef bool cpp_use_pandas_metadata - - def __cinit__(self, filepaths_or_buffers, columns=None, row_groups=None, - use_pandas_metadata=True, - size_t chunk_read_limit=0, - size_t pass_read_limit=1024000000): - - # Convert NativeFile buffers to NativeFileDatasource, - # but save original buffers in case we need to use - # pyarrow for metadata processing - # (See: https://github.com/rapidsai/cudf/issues/9599) - - pa_buffers = [] - for i, datasource in enumerate(filepaths_or_buffers): - if isinstance(datasource, NativeFile): - pa_buffers.append(datasource) - filepaths_or_buffers[i] = NativeFileDatasource(datasource) - self.pa_buffers = pa_buffers - cdef cudf_io_types.source_info source = make_source_info( - filepaths_or_buffers) - - self.cpp_use_pandas_metadata = use_pandas_metadata - - cdef vector[vector[size_type]] cpp_row_groups - if row_groups is not None: - cpp_row_groups = row_groups - cdef parquet_reader_options args - cdef pair[parquet_reader_options, bool] c_res = _setup_parquet_reader_options( - source, cpp_row_groups, use_pandas_metadata, None, columns) - args, self.allow_range_index = c_res.first, c_res.second - - with nogil: - self.reader.reset( - new cpp_chunked_parquet_reader( - chunk_read_limit, - pass_read_limit, - args - ) - ) - self.initialized = False - self.row_groups = row_groups - self.filepaths_or_buffers = filepaths_or_buffers - - def _has_next(self): - cdef bool res - with nogil: - res = self.reader.get()[0].has_next() - return res - - def _read_chunk(self): - # Read Parquet - cdef cudf_io_types.table_with_metadata c_result - - with nogil: - c_result = move(self.reader.get()[0].read_chunk()) - - if not self.initialized: - self.names = [info.name.decode() for info in c_result.metadata.schema_info] - self.result_meta = c_result.metadata - - df = cudf.DataFrame._from_data(*data_from_unique_ptr( - move(c_result.tbl), - column_names=self.names, - )) - - self.initialized = True - return df - - def read(self): - dfs = self._read_chunk() - column_names = dfs._column_names - concatenated_columns = list(dfs._columns) - del dfs - while self._has_next(): - new_chunk = list(self._read_chunk()._columns) - for i in range(len(column_names)): - concatenated_columns[i] = concat_columns( - [concatenated_columns[i], new_chunk[i]] - ) - # Must drop any residual GPU columns to save memory - new_chunk[i] = None - - dfs = cudf.DataFrame._from_data( - *data_from_pylibcudf_table( - pylibcudf.Table( - [col.to_pylibcudf(mode="read") for col in concatenated_columns] - ), - column_names=column_names, - index_names=None - ) - ) - - return _process_metadata(dfs, self.result_meta, self.names, self.row_groups, - self.filepaths_or_buffers, self.pa_buffers, - self.allow_range_index, self.cpp_use_pandas_metadata) - cpdef merge_filemetadata(object filemetadata_list): """ Cython function to call into libcudf API, see `merge_row_group_metadata`. diff --git a/python/cudf/cudf/_lib/pylibcudf/expressions.pyx b/python/cudf/cudf/_lib/pylibcudf/expressions.pyx index 38de11406ad..b983a617533 100644 --- a/python/cudf/cudf/_lib/pylibcudf/expressions.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/expressions.pyx @@ -38,6 +38,17 @@ from .types cimport DataType # Aliases for simplicity ctypedef unique_ptr[libcudf_exp.expression] expression_ptr +# Define this class just to have a docstring for it +cdef class Expression: + """ + The base class for all expression types. + This class cannot be instantiated directly, please + instantiate one of its child classes instead. + + For details, see :cpp:class:`cudf::ast::expression`. + """ + pass + cdef class Literal(Expression): """ A literal value used in an abstract syntax tree. diff --git a/python/cudf/cudf/_lib/pylibcudf/io/CMakeLists.txt b/python/cudf/cudf/_lib/pylibcudf/io/CMakeLists.txt index 8dd08d11dc8..55bea4fc262 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/CMakeLists.txt +++ b/python/cudf/cudf/_lib/pylibcudf/io/CMakeLists.txt @@ -12,7 +12,7 @@ # the License. # ============================================================================= -set(cython_sources avro.pyx csv.pyx datasource.pyx json.pyx types.pyx) +set(cython_sources avro.pyx csv.pyx datasource.pyx json.pyx parquet.pyx types.pyx) set(linked_libraries cudf::cudf) rapids_cython_create_modules( @@ -22,6 +22,6 @@ rapids_cython_create_modules( ) set(targets_using_arrow_headers pylibcudf_io_avro pylibcudf_io_csv pylibcudf_io_datasource - pylibcudf_io_json pylibcudf_io_types + pylibcudf_io_json pylibcudf_io_parquet pylibcudf_io_types ) link_to_pyarrow_headers("${targets_using_arrow_headers}") diff --git a/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd b/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd index 5b3272d60e0..62820048584 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/io/__init__.pxd @@ -1,5 +1,5 @@ # Copyright (c) 2024, NVIDIA CORPORATION. # CSV is removed since it is def not cpdef (to force kw-only arguments) -from . cimport avro, datasource, json, types +from . cimport avro, datasource, json, parquet, types from .types cimport SourceInfo, TableWithMetadata diff --git a/python/cudf/cudf/_lib/pylibcudf/io/__init__.py b/python/cudf/cudf/_lib/pylibcudf/io/__init__.py index e17deaa4663..27640f7d955 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/__init__.py +++ b/python/cudf/cudf/_lib/pylibcudf/io/__init__.py @@ -1,4 +1,4 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -from . import avro, csv, datasource, json, types +from . import avro, csv, datasource, json, parquet, types from .types import SinkInfo, SourceInfo, TableWithMetadata diff --git a/python/cudf/cudf/_lib/pylibcudf/io/parquet.pxd b/python/cudf/cudf/_lib/pylibcudf/io/parquet.pxd new file mode 100644 index 00000000000..027f215fb91 --- /dev/null +++ b/python/cudf/cudf/_lib/pylibcudf/io/parquet.pxd @@ -0,0 +1,35 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from libc.stdint cimport int64_t +from libcpp cimport bool +from libcpp.memory cimport unique_ptr + +from cudf._lib.pylibcudf.expressions cimport Expression +from cudf._lib.pylibcudf.io.types cimport SourceInfo, TableWithMetadata +from cudf._lib.pylibcudf.libcudf.io.parquet cimport ( + chunked_parquet_reader as cpp_chunked_parquet_reader, +) +from cudf._lib.pylibcudf.libcudf.types cimport size_type +from cudf._lib.pylibcudf.types cimport DataType + + +cdef class ChunkedParquetReader: + cdef unique_ptr[cpp_chunked_parquet_reader] reader + + cpdef bool has_next(self) + cpdef TableWithMetadata read_chunk(self) + + +cpdef read_parquet( + SourceInfo source_info, + list columns = *, + list row_groups = *, + Expression filters = *, + bool convert_strings_to_categories = *, + bool use_pandas_metadata = *, + int64_t skip_rows = *, + size_type num_rows = *, + # disabled see comment in parquet.pyx for more + # ReaderColumnSchema reader_column_schema = *, + # DataType timestamp_type = * +) diff --git a/python/cudf/cudf/_lib/pylibcudf/io/parquet.pyx b/python/cudf/cudf/_lib/pylibcudf/io/parquet.pyx new file mode 100644 index 00000000000..96119e1b714 --- /dev/null +++ b/python/cudf/cudf/_lib/pylibcudf/io/parquet.pyx @@ -0,0 +1,204 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +from cython.operator cimport dereference +from libc.stdint cimport int64_t +from libcpp cimport bool +from libcpp.string cimport string +from libcpp.utility cimport move +from libcpp.vector cimport vector + +from cudf._lib.pylibcudf.expressions cimport Expression +from cudf._lib.pylibcudf.io.types cimport SourceInfo, TableWithMetadata +from cudf._lib.pylibcudf.libcudf.expressions cimport expression +from cudf._lib.pylibcudf.libcudf.io.parquet cimport ( + chunked_parquet_reader as cpp_chunked_parquet_reader, + parquet_reader_options, + read_parquet as cpp_read_parquet, +) +from cudf._lib.pylibcudf.libcudf.io.types cimport table_with_metadata +from cudf._lib.pylibcudf.libcudf.types cimport size_type + + +cdef parquet_reader_options _setup_parquet_reader_options( + SourceInfo source_info, + list columns = None, + list row_groups = None, + Expression filters = None, + bool convert_strings_to_categories = False, + bool use_pandas_metadata = True, + int64_t skip_rows = 0, + size_type num_rows = -1, + # ReaderColumnSchema reader_column_schema = None, + # DataType timestamp_type = DataType(type_id.EMPTY) +): + cdef vector[string] col_vec + cdef parquet_reader_options opts = ( + parquet_reader_options.builder(source_info.c_obj) + .convert_strings_to_categories(convert_strings_to_categories) + .use_pandas_metadata(use_pandas_metadata) + .use_arrow_schema(True) + .build() + ) + if row_groups is not None: + opts.set_row_groups(row_groups) + if num_rows != -1: + opts.set_num_rows(num_rows) + if skip_rows != 0: + opts.set_skip_rows(skip_rows) + if columns is not None: + col_vec.reserve(len(columns)) + for col in columns: + col_vec.push_back(str(col).encode()) + opts.set_columns(col_vec) + if filters is not None: + opts.set_filter(dereference(filters.c_obj.get())) + return opts + + +cdef class ChunkedParquetReader: + """ + Reads chunks of a Parquet file into a :py:class:`~.types.TableWithMetadata`. + + Parameters + ---------- + source_info : SourceInfo + The SourceInfo object to read the Parquet file from. + columns : list, default None + The names of the columns to be read + row_groups : list[list[size_type]], default None + List of row groups to be read. + use_pandas_metadata : bool, default True + If True, return metadata about the index column in + the per-file user metadata of the ``TableWithMetadata`` + convert_strings_to_categories : bool, default False + Whether to convert string columns to the category type + skip_rows : int64_t, default 0 + The number of rows to skip from the start of the file. + num_rows : size_type, default -1 + The number of rows to read. By default, read the entire file. + chunk_read_limit : size_t, default 0 + Limit on total number of bytes to be returned per read, + or 0 if there is no limit. + pass_read_limit : size_t, default 1024000000 + Limit on the amount of memory used for reading and decompressing data + or 0 if there is no limit. + """ + def __init__( + self, + SourceInfo source_info, + list columns=None, + list row_groups=None, + bool use_pandas_metadata=True, + bool convert_strings_to_categories=False, + int64_t skip_rows = 0, + size_type num_rows = -1, + size_t chunk_read_limit=0, + size_t pass_read_limit=1024000000 + ): + + cdef parquet_reader_options opts = _setup_parquet_reader_options( + source_info, + columns, + row_groups, + filters=None, + convert_strings_to_categories=convert_strings_to_categories, + use_pandas_metadata=use_pandas_metadata, + skip_rows=skip_rows, + num_rows=num_rows, + ) + + with nogil: + self.reader.reset( + new cpp_chunked_parquet_reader( + chunk_read_limit, + pass_read_limit, + opts + ) + ) + + cpdef bool has_next(self): + """ + Returns True if there is another chunk in the Parquet file + to be read. + + Returns + ------- + True if we have not finished reading the file. + """ + with nogil: + return self.reader.get()[0].has_next() + + cpdef TableWithMetadata read_chunk(self): + """ + Read the next chunk into a :py:class:`~.types.TableWithMetadata` + + Returns + ------- + TableWithMetadata + The Table and its corresponding metadata (column names) that were read in. + """ + # Read Parquet + cdef table_with_metadata c_result + + with nogil: + c_result = move(self.reader.get()[0].read_chunk()) + + return TableWithMetadata.from_libcudf(c_result) + +cpdef read_parquet( + SourceInfo source_info, + list columns = None, + list row_groups = None, + Expression filters = None, + bool convert_strings_to_categories = False, + bool use_pandas_metadata = True, + int64_t skip_rows = 0, + size_type num_rows = -1, + # Disabled, these aren't used by cudf-python + # we should only add them back in if there's user demand + # ReaderColumnSchema reader_column_schema = None, + # DataType timestamp_type = DataType(type_id.EMPTY) +): + """Reads an Parquet file into a :py:class:`~.types.TableWithMetadata`. + + Parameters + ---------- + source_info : SourceInfo + The SourceInfo object to read the Parquet file from. + columns : list, default None + The string names of the columns to be read. + row_groups : list[list[size_type]], default None + List of row groups to be read. + filters : Expression, default None + An AST :py:class:`cudf._lib.pylibcudf.expressions.Expression` + to use for predicate pushdown. + convert_strings_to_categories : bool, default False + Whether to convert string columns to the category type + use_pandas_metadata : bool, default True + If True, return metadata about the index column in + the per-file user metadata of the ``TableWithMetadata`` + skip_rows : int64_t, default 0 + The number of rows to skip from the start of the file. + num_rows : size_type, default -1 + The number of rows to read. By default, read the entire file. + + Returns + ------- + TableWithMetadata + The Table and its corresponding metadata (column names) that were read in. + """ + cdef table_with_metadata c_result + cdef parquet_reader_options opts = _setup_parquet_reader_options( + source_info, + columns, + row_groups, + filters, + convert_strings_to_categories, + use_pandas_metadata, + skip_rows, + num_rows, + ) + + with nogil: + c_result = move(cpp_read_parquet(opts)) + + return TableWithMetadata.from_libcudf(c_result) diff --git a/python/cudf/cudf/_lib/pylibcudf/io/types.pyx b/python/cudf/cudf/_lib/pylibcudf/io/types.pyx index 68498ff88f4..95fa7d4c2ee 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/types.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/io/types.pyx @@ -122,6 +122,14 @@ cdef class TableWithMetadata: out.metadata = tbl_with_meta.metadata return out + @property + def per_file_user_data(self): + """ + Returns a list containing a dict + containing file-format specific metadata, + for each file being read in. + """ + return self.metadata.per_file_user_data cdef class SourceInfo: """A class containing details on a source to read from. diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd index c38f39f7749..d86915c7da9 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd @@ -1,6 +1,6 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. -from libc.stdint cimport uint8_t +from libc.stdint cimport int64_t, uint8_t from libcpp cimport bool from libcpp.functional cimport reference_wrapper from libcpp.map cimport map @@ -27,8 +27,11 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: # setter + void set_filter(expression &filter) except + void set_columns(vector[string] col_names) except + + void set_num_rows(size_type val) except + void set_row_groups(vector[vector[size_type]] row_grp) except + + void set_skip_rows(int64_t val) except + void enable_use_arrow_schema(bool val) except + void enable_use_pandas_metadata(bool val) except + void set_timestamp_type(data_type type) except + @@ -49,6 +52,9 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: parquet_reader_options_builder& row_groups( vector[vector[size_type]] row_grp ) except + + parquet_reader_options_builder& convert_strings_to_categories( + bool val + ) except + parquet_reader_options_builder& use_pandas_metadata( bool val ) except + diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 0f0a240b5d0..7dab2f20100 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -929,12 +929,12 @@ def _read_parquet( f"following positional arguments: {list(args)}" ) if cudf.get_option("io.parquet.low_memory"): - return libparquet.ParquetReader( + return libparquet.read_parquet_chunked( filepaths_or_buffers, columns=columns, row_groups=row_groups, use_pandas_metadata=use_pandas_metadata, - ).read() + ) else: return libparquet.read_parquet( filepaths_or_buffers, diff --git a/python/cudf/cudf/pylibcudf_tests/common/utils.py b/python/cudf/cudf/pylibcudf_tests/common/utils.py index ed2c5ca06c9..e19ff58927f 100644 --- a/python/cudf/cudf/pylibcudf_tests/common/utils.py +++ b/python/cudf/cudf/pylibcudf_tests/common/utils.py @@ -7,6 +7,7 @@ import numpy as np import pyarrow as pa import pytest +from pyarrow.parquet import write_table as pq_write_table from cudf._lib import pylibcudf as plc from cudf._lib.pylibcudf.io.types import CompressionType @@ -103,25 +104,68 @@ def _make_fields_nullable(typ): return pa.list_(new_fields[0]) return typ + def _contains_type(parent_typ, typ_checker): + """ + Check whether the parent or one of the children + satisfies the typ_checker. + """ + if typ_checker(parent_typ): + return True + if pa.types.is_nested(parent_typ): + for i in range(parent_typ.num_fields): + if _contains_type(parent_typ.field(i).type, typ_checker): + return True + return False + if not check_field_nullability: rhs_type = _make_fields_nullable(rhs.type) rhs = rhs.cast(rhs_type) lhs_type = _make_fields_nullable(lhs.type) - lhs = rhs.cast(lhs_type) - - if pa.types.is_floating(lhs.type) and pa.types.is_floating(rhs.type): - lhs_nans = pa.compute.is_nan(lhs) - rhs_nans = pa.compute.is_nan(rhs) - assert lhs_nans.equals(rhs_nans) - - if pa.compute.any(lhs_nans) or pa.compute.any(rhs_nans): - # masks must be equal at this point - mask = pa.compute.fill_null(pa.compute.invert(lhs_nans), True) - lhs = lhs.filter(mask) - rhs = rhs.filter(mask) + lhs = lhs.cast(lhs_type) - np.testing.assert_array_almost_equal(lhs, rhs) + assert lhs.type == rhs.type, f"{lhs.type} != {rhs.type}" + if _contains_type(lhs.type, pa.types.is_floating) and _contains_type( + rhs.type, pa.types.is_floating + ): + # Flatten nested arrays to liststo do comparisons if nested + # This is so we can do approximate comparisons + # for floats in numpy + def _flatten_arrays(arr): + if pa.types.is_nested(arr.type): + flattened = arr.flatten() + flat_arrs = [] + if isinstance(flattened, list): + for flat_arr in flattened: + flat_arrs += _flatten_arrays(flat_arr) + else: + flat_arrs = [flattened] + else: + flat_arrs = [arr] + return flat_arrs + + if isinstance(lhs, (pa.ListArray, pa.StructArray)): + lhs = _flatten_arrays(lhs) + rhs = _flatten_arrays(rhs) + else: + # Just a regular doublearray + lhs = [lhs] + rhs = [rhs] + + for lh_arr, rh_arr in zip(lhs, rhs): + # Check NaNs positions match + # and then filter out nans + lhs_nans = pa.compute.is_nan(lh_arr) + rhs_nans = pa.compute.is_nan(rh_arr) + assert lhs_nans.equals(rhs_nans) + + if pa.compute.any(lhs_nans) or pa.compute.any(rhs_nans): + # masks must be equal at this point + mask = pa.compute.fill_null(pa.compute.invert(lhs_nans), True) + lh_arr = lh_arr.filter(mask) + rh_arr = rh_arr.filter(mask) + + np.testing.assert_array_almost_equal(lh_arr, rh_arr) else: assert lhs.equals(rhs) @@ -276,6 +320,16 @@ def make_source(path_or_buf, pa_table, format, **kwargs): df.to_json(path_or_buf, mode=mode, **kwargs) elif format == "csv": df.to_csv(path_or_buf, mode=mode, **kwargs) + elif format == "parquet": + # The conversion to pandas is lossy (doesn't preserve + # nested types) so we + # will just use pyarrow directly to write this + pq_write_table( + pa_table, + pa.PythonFile(path_or_buf) + if isinstance(path_or_buf, io.IOBase) + else path_or_buf, + ) if isinstance(path_or_buf, io.IOBase): path_or_buf.seek(0) return path_or_buf diff --git a/python/cudf/cudf/pylibcudf_tests/conftest.py b/python/cudf/cudf/pylibcudf_tests/conftest.py index 4a7194a6d8d..945e1689229 100644 --- a/python/cudf/cudf/pylibcudf_tests/conftest.py +++ b/python/cudf/cudf/pylibcudf_tests/conftest.py @@ -170,6 +170,21 @@ def source_or_sink(request, tmp_path): return fp_or_buf() +@pytest.fixture( + params=["a.txt", pathlib.Path("a.txt"), io.BytesIO], +) +def binary_source_or_sink(request, tmp_path): + fp_or_buf = request.param + if isinstance(fp_or_buf, str): + return f"{tmp_path}/{fp_or_buf}" + elif isinstance(fp_or_buf, os.PathLike): + return tmp_path.joinpath(fp_or_buf) + elif issubclass(fp_or_buf, io.IOBase): + # Must construct io.StringIO/io.BytesIO inside + # fixture, or we'll end up re-using it + return fp_or_buf() + + unsupported_types = { # Not supported by pandas # TODO: find a way to test these diff --git a/python/cudf/cudf/pylibcudf_tests/io/test_parquet.py b/python/cudf/cudf/pylibcudf_tests/io/test_parquet.py new file mode 100644 index 00000000000..07d2ab3d69a --- /dev/null +++ b/python/cudf/cudf/pylibcudf_tests/io/test_parquet.py @@ -0,0 +1,109 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +import pyarrow as pa +import pyarrow.compute as pc +import pytest +from pyarrow.parquet import read_table +from utils import assert_table_and_meta_eq, make_source + +import cudf._lib.pylibcudf as plc +from cudf._lib.pylibcudf.expressions import ( + ASTOperator, + ColumnNameReference, + ColumnReference, + Literal, + Operation, +) + +# Shared kwargs to pass to make_source +_COMMON_PARQUET_SOURCE_KWARGS = {"format": "parquet"} + + +@pytest.mark.parametrize("columns", [None, ["col_int64", "col_bool"]]) +def test_read_parquet_basic( + table_data, binary_source_or_sink, nrows_skiprows, columns +): + _, pa_table = table_data + nrows, skiprows = nrows_skiprows + + source = make_source( + binary_source_or_sink, pa_table, **_COMMON_PARQUET_SOURCE_KWARGS + ) + + res = plc.io.parquet.read_parquet( + plc.io.SourceInfo([source]), + num_rows=nrows, + skip_rows=skiprows, + columns=columns, + ) + + if columns is not None: + pa_table = pa_table.select(columns) + + # Adapt to nrows/skiprows + pa_table = pa_table.slice( + offset=skiprows, length=nrows if nrows != -1 else None + ) + + assert_table_and_meta_eq(pa_table, res, check_field_nullability=False) + + +@pytest.mark.parametrize( + "pa_filters,plc_filters", + [ + ( + pc.field("col_int64") >= 10, + Operation( + ASTOperator.GREATER_EQUAL, + ColumnNameReference("col_int64"), + Literal(plc.interop.from_arrow(pa.scalar(10))), + ), + ), + ( + (pc.field("col_int64") >= 10) & (pc.field("col_double") < 0), + Operation( + ASTOperator.LOGICAL_AND, + Operation( + ASTOperator.GREATER_EQUAL, + ColumnNameReference("col_int64"), + Literal(plc.interop.from_arrow(pa.scalar(10))), + ), + Operation( + ASTOperator.LESS, + ColumnNameReference("col_double"), + Literal(plc.interop.from_arrow(pa.scalar(0.0))), + ), + ), + ), + ( + (pc.field(0) == 10), + Operation( + ASTOperator.EQUAL, + ColumnReference(0), + Literal(plc.interop.from_arrow(pa.scalar(10))), + ), + ), + ], +) +def test_read_parquet_filters( + table_data, binary_source_or_sink, pa_filters, plc_filters +): + _, pa_table = table_data + + source = make_source( + binary_source_or_sink, pa_table, **_COMMON_PARQUET_SOURCE_KWARGS + ) + + plc_table_w_meta = plc.io.parquet.read_parquet( + plc.io.SourceInfo([source]), filters=plc_filters + ) + exp = read_table(source, filters=pa_filters) + assert_table_and_meta_eq( + exp, plc_table_w_meta, check_field_nullability=False + ) + + +# TODO: Test these options +# list row_groups = None, +# ^^^ This one is not tested since it's not in pyarrow/pandas, deprecate? +# bool convert_strings_to_categories = False, +# bool use_pandas_metadata = True diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index f2820d9c112..3806b901b10 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -22,7 +22,7 @@ from pyarrow import fs as pa_fs, parquet as pq import cudf -from cudf._lib.parquet import ParquetReader +from cudf._lib.parquet import read_parquet_chunked from cudf.io.parquet import ( ParquetDatasetWriter, ParquetWriter, @@ -3755,7 +3755,7 @@ def test_parquet_chunked_reader( ) buffer = BytesIO() df.to_parquet(buffer) - reader = ParquetReader( + actual = read_parquet_chunked( [buffer], chunk_read_limit=chunk_read_limit, pass_read_limit=pass_read_limit, @@ -3765,7 +3765,6 @@ def test_parquet_chunked_reader( expected = cudf.read_parquet( buffer, use_pandas_metadata=use_pandas_metadata, row_groups=row_groups ) - actual = reader.read() assert_eq(expected, actual)