From 61d8d3b87ee95b887401490df8c23cec33f286b5 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Mon, 4 Nov 2024 17:49:31 -0800 Subject: [PATCH 1/3] Add read_parquet_metadata to pylibcudf --- .../api_docs/pylibcudf/io/index.rst | 1 + .../pylibcudf/io/parquet_metadata.rst | 6 + python/cudf/cudf/_lib/io/utils.pxd | 1 - python/cudf/cudf/_lib/io/utils.pyx | 56 ----- python/cudf/cudf/_lib/parquet.pyx | 67 ++---- python/cudf/cudf/tests/test_parquet.py | 4 +- python/pylibcudf/pylibcudf/io/CMakeLists.txt | 4 +- python/pylibcudf/pylibcudf/io/__init__.pxd | 2 +- python/pylibcudf/pylibcudf/io/__init__.py | 12 +- .../pylibcudf/io/parquet_metadata.pxd | 51 +++++ .../pylibcudf/io/parquet_metadata.pyx | 207 ++++++++++++++++++ .../pylibcudf/libcudf/io/parquet_metadata.pxd | 4 +- 12 files changed, 305 insertions(+), 110 deletions(-) create mode 100644 docs/cudf/source/user_guide/api_docs/pylibcudf/io/parquet_metadata.rst create mode 100644 python/pylibcudf/pylibcudf/io/parquet_metadata.pxd create mode 100644 python/pylibcudf/pylibcudf/io/parquet_metadata.pyx diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst index 53638f071cc..1a1aae9adb9 100644 --- a/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst +++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/index.rst @@ -19,4 +19,5 @@ I/O Functions csv json parquet + parquet_metadata timezone diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/io/parquet_metadata.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/parquet_metadata.rst new file mode 100644 index 00000000000..fce964f9714 --- /dev/null +++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/io/parquet_metadata.rst @@ -0,0 +1,6 @@ +================ +Parquet Metadata +================ + +.. automodule:: pylibcudf.io.parquet_metadata + :members: diff --git a/python/cudf/cudf/_lib/io/utils.pxd b/python/cudf/cudf/_lib/io/utils.pxd index 76a6e32fde0..96504ebdd66 100644 --- a/python/cudf/cudf/_lib/io/utils.pxd +++ b/python/cudf/cudf/_lib/io/utils.pxd @@ -13,7 +13,6 @@ from pylibcudf.libcudf.io.types cimport ( from cudf._lib.column cimport Column -cdef source_info make_source_info(list src) except* cdef sink_info make_sinks_info( list src, vector[unique_ptr[data_sink]] & data) except* cdef sink_info make_sink_info(src, unique_ptr[data_sink] & data) except* diff --git a/python/cudf/cudf/_lib/io/utils.pyx b/python/cudf/cudf/_lib/io/utils.pyx index 564daefbae2..f23980b387a 100644 --- a/python/cudf/cudf/_lib/io/utils.pyx +++ b/python/cudf/cudf/_lib/io/utils.pyx @@ -7,76 +7,20 @@ from libcpp.string cimport string from libcpp.utility cimport move from libcpp.vector cimport vector -from pylibcudf.io.datasource cimport Datasource from pylibcudf.libcudf.io.data_sink cimport data_sink -from pylibcudf.libcudf.io.datasource cimport datasource from pylibcudf.libcudf.io.types cimport ( column_name_info, - host_buffer, sink_info, - source_info, ) from cudf._lib.column cimport Column import codecs -import errno import io import os from cudf.core.dtypes import StructDtype - -# Converts the Python source input to libcudf IO source_info -# with the appropriate type and source values -cdef source_info make_source_info(list src) except*: - if not src: - raise ValueError("Need to pass at least one source") - - cdef const unsigned char[::1] c_buffer - cdef vector[host_buffer] c_host_buffers - cdef vector[string] c_files - cdef Datasource csrc - cdef vector[datasource*] c_datasources - empty_buffer = False - if isinstance(src[0], bytes): - empty_buffer = True - for buffer in src: - if (len(buffer) > 0): - c_buffer = buffer - c_host_buffers.push_back(host_buffer(&c_buffer[0], - c_buffer.shape[0])) - empty_buffer = False - elif isinstance(src[0], io.BytesIO): - for bio in src: - c_buffer = bio.getbuffer() # check if empty? - c_host_buffers.push_back(host_buffer(&c_buffer[0], - c_buffer.shape[0])) - # Otherwise src is expected to be a numeric fd, string path, or PathLike. - # TODO (ptaylor): Might need to update this check if accepted input types - # change when UCX and/or cuStreamz support is added. - elif isinstance(src[0], Datasource): - for csrc in src: - c_datasources.push_back(csrc.get_datasource()) - return source_info(c_datasources) - elif isinstance(src[0], (int, float, complex, basestring, os.PathLike)): - # If source is a file, return source_info where type=FILEPATH - if not all(os.path.isfile(file) for file in src): - raise FileNotFoundError(errno.ENOENT, - os.strerror(errno.ENOENT), - src) - - files = [ str(elem).encode() for elem in src] - c_files = files - return source_info(c_files) - else: - raise TypeError("Unrecognized input type: {}".format(type(src[0]))) - - if empty_buffer is True: - c_host_buffers.push_back(host_buffer(NULL, 0)) - - return source_info(c_host_buffers) - # Converts the Python sink input to libcudf IO sink_info. cdef sink_info make_sinks_info( list src, vector[unique_ptr[data_sink]] & sink diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 1212637d330..1d959ddce40 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -27,7 +27,6 @@ from libcpp cimport bool from libcpp.map cimport map from libcpp.memory cimport make_unique, unique_ptr from libcpp.string cimport string -from libcpp.unordered_map cimport unordered_map from libcpp.utility cimport move from libcpp.vector cimport vector @@ -41,12 +40,7 @@ from pylibcudf.libcudf.io.parquet cimport ( parquet_writer_options, write_parquet as parquet_writer, ) -from pylibcudf.libcudf.io.parquet_metadata cimport ( - parquet_metadata, - read_parquet_metadata as parquet_metadata_reader, -) from pylibcudf.libcudf.io.types cimport ( - source_info, sink_info, column_in_metadata, table_input_metadata, @@ -62,7 +56,6 @@ from cudf._lib.column cimport Column from cudf._lib.io.utils cimport ( add_df_col_struct_names, make_sinks_info, - make_source_info, ) from cudf._lib.utils cimport table_view_from_table @@ -373,7 +366,7 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, nrows=nrows, skip_rows=skip_rows) return df -cpdef read_parquet_metadata(filepaths_or_buffers): +cpdef read_parquet_metadata(list filepaths_or_buffers): """ Cython function to call into libcudf API, see `read_parquet_metadata`. @@ -382,56 +375,40 @@ cpdef read_parquet_metadata(filepaths_or_buffers): cudf.io.parquet.read_parquet cudf.io.parquet.to_parquet """ - cdef source_info source = make_source_info(filepaths_or_buffers) - - args = move(source) - - cdef parquet_metadata c_result - - # Read Parquet metadata - with nogil: - c_result = move(parquet_metadata_reader(args)) - - # access and return results - num_rows = c_result.num_rows() - num_rowgroups = c_result.num_rowgroups() - - # extract row group metadata and sanitize keys - row_group_metadata = [{k.decode(): v for k, v in metadata} - for metadata in c_result.rowgroup_metadata()] + parquet_metadata = plc.io.parquet_metadata.read_parquet_metadata( + plc.io.SourceInfo(filepaths_or_buffers) + ) # read all column names including index column, if any - col_names = [info.name().decode() for info in c_result.schema().root().children()] - - # access the Parquet file_footer to find the index - index_col = None - cdef unordered_map[string, string] file_footer = c_result.metadata() + col_names = [info.name() for info in parquet_metadata.schema().root().children()] - # get index column name(s) - index_col_names = None - json_str = file_footer[b'pandas'].decode('utf-8') - meta = None + index_col_names = set() + json_str = parquet_metadata.metadata()[b'pandas'].decode('utf-8') if json_str != "": meta = json.loads(json_str) file_is_range_index, index_col, _ = _parse_metadata(meta) - if not file_is_range_index and index_col is not None \ - and index_col_names is None: - index_col_names = {} + if ( + not file_is_range_index + and index_col is not None + ): + columns = meta['columns'] for idx_col in index_col: - for c in meta['columns']: + for c in columns: if c['field_name'] == idx_col: - index_col_names[idx_col] = c['name'] + index_col_names.add(idx_col) # remove the index column from the list of column names # only if index_col_names is not None - if index_col_names is not None: + if len(index_col_names) >= 0: col_names = [name for name in col_names if name not in index_col_names] - # num_columns = length of list(col_names) - num_columns = len(col_names) - - # return the metadata - return num_rows, num_rowgroups, col_names, num_columns, row_group_metadata + return ( + parquet_metadata.num_rows(), + parquet_metadata.num_rowgroups(), + col_names, + len(col_names), + parquet_metadata.rowgroup_metadata() + ) @acquire_spill_lock() diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index c9ce24d2a5b..3c4398a87de 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -405,14 +405,14 @@ def test_parquet_range_index_pandas_metadata(tmpdir, pandas_compat, as_bytes): assert_eq(expect, got) -def test_parquet_read_metadata(tmpdir, pdf): +def test_parquet_read_metadata(tmp_path, pdf): if len(pdf) > 100: pytest.skip("Skipping long setup test") def num_row_groups(rows, group_size): return max(1, (rows + (group_size - 1)) // group_size) - fname = tmpdir.join("metadata.parquet") + fname = tmp_path / "metadata.parquet" row_group_size = 5 pdf.to_parquet(fname, compression="snappy", row_group_size=row_group_size) diff --git a/python/pylibcudf/pylibcudf/io/CMakeLists.txt b/python/pylibcudf/pylibcudf/io/CMakeLists.txt index 965724a47b1..11743f0779f 100644 --- a/python/pylibcudf/pylibcudf/io/CMakeLists.txt +++ b/python/pylibcudf/pylibcudf/io/CMakeLists.txt @@ -12,8 +12,8 @@ # the License. # ============================================================================= -set(cython_sources avro.pyx csv.pyx datasource.pyx json.pyx orc.pyx parquet.pyx timezone.pyx - types.pyx +set(cython_sources avro.pyx csv.pyx datasource.pyx json.pyx orc.pyx parquet.pyx + parquet_metadata.pyx timezone.pyx types.pyx ) set(linked_libraries cudf::cudf) diff --git a/python/pylibcudf/pylibcudf/io/__init__.pxd b/python/pylibcudf/pylibcudf/io/__init__.pxd index 1bcc0a3f963..cba4341a308 100644 --- a/python/pylibcudf/pylibcudf/io/__init__.pxd +++ b/python/pylibcudf/pylibcudf/io/__init__.pxd @@ -1,5 +1,5 @@ # Copyright (c) 2024, NVIDIA CORPORATION. # CSV is removed since it is def not cpdef (to force kw-only arguments) -from . cimport avro, datasource, json, orc, parquet, timezone, types +from . cimport avro, datasource, json, orc, parquet, parquet_metadata, timezone, types from .types cimport SourceInfo, TableWithMetadata diff --git a/python/pylibcudf/pylibcudf/io/__init__.py b/python/pylibcudf/pylibcudf/io/__init__.py index 2e4f215b12c..29fdb5a5c1b 100644 --- a/python/pylibcudf/pylibcudf/io/__init__.py +++ b/python/pylibcudf/pylibcudf/io/__init__.py @@ -1,4 +1,14 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -from . import avro, csv, datasource, json, orc, parquet, timezone, types +from . import ( + avro, + csv, + datasource, + json, + orc, + parquet, + parquet_metadata, + timezone, + types, +) from .types import SinkInfo, SourceInfo, TableWithMetadata diff --git a/python/pylibcudf/pylibcudf/io/parquet_metadata.pxd b/python/pylibcudf/pylibcudf/io/parquet_metadata.pxd new file mode 100644 index 00000000000..e421a64adc8 --- /dev/null +++ b/python/pylibcudf/pylibcudf/io/parquet_metadata.pxd @@ -0,0 +1,51 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from pylibcudf.io.types cimport SourceInfo +from pylibcudf.libcudf.io.parquet_metadata cimport( + parquet_metadata, + parquet_schema, + parquet_column_schema, +) + +cdef class ParquetColumnSchema: + cdef parquet_column_schema column_schema + + @staticmethod + cdef from_column_schema(parquet_column_schema column_schema) + + cpdef str name(self) + + cpdef int num_children(self) + + cpdef ParquetColumnSchema child(self, int idx) + + cpdef list children(self) + + +cdef class ParquetSchema: + cdef parquet_schema schema + + @staticmethod + cdef from_schema(parquet_schema schema) + + cpdef ParquetColumnSchema root(self) + + +cdef class ParquetMetadata: + cdef parquet_metadata meta + + @staticmethod + cdef from_metadata(parquet_metadata meta) + + cpdef ParquetSchema schema(self) + + cpdef int num_rows(self) + + cpdef int num_rowgroups(self) + + cpdef dict metadata(self) + + cpdef list rowgroup_metadata(self) + + +cpdef ParquetMetadata read_parquet_metadata(SourceInfo src_info) diff --git a/python/pylibcudf/pylibcudf/io/parquet_metadata.pyx b/python/pylibcudf/pylibcudf/io/parquet_metadata.pyx new file mode 100644 index 00000000000..6c66303b5b3 --- /dev/null +++ b/python/pylibcudf/pylibcudf/io/parquet_metadata.pyx @@ -0,0 +1,207 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from pylibcudf.io.types cimport SourceInfo +from pylibcudf.libcudf.io cimport parquet_metadata as cpp_parquet_metadata + + +cdef class ParquetColumnSchema: + """ + Schema of a parquet column, including the nested columns. + + Parameters + ---------- + parquet_column_schema + """ + def __init__(self): + raise ValueError("Construct ParquetColumnSchema with from_column_schema.") + + @staticmethod + cdef from_column_schema(cpp_parquet_metadata.parquet_column_schema column_schema): + cdef ParquetColumnSchema result = ParquetColumnSchema.__new__( + ParquetColumnSchema + ) + result.column_schema = column_schema + return result + + cpdef str name(self): + """ + Returns parquet column name; can be empty. + + Returns + ------- + str + Column name + """ + return self.column_schema.name().decode() + + cpdef int num_children(self): + """ + Returns the number of child columns. + + Returns + ------- + int + Children count + """ + return self.column_schema.num_children() + + cpdef ParquetColumnSchema child(self, int idx): + """ + Returns schema of the child with the given index. + + Parameters + ---------- + idx : int + Child Index + + Returns + ------- + ParquetColumnSchema + Child schema + """ + return ParquetColumnSchema.from_column_schema(self.column_schema.child(idx)) + + cpdef list children(self): + """ + Returns schemas of all child columns. + + Returns + ------- + list[ParquetColumnSchema] + Children schemas. + """ + cdef cpp_parquet_metadata.parquet_column_schema child + return [ + ParquetColumnSchema.from_column_schema(child) + for child in self.column_schema.children() + ] + + +cdef class ParquetSchema: + """ + Schema of a parquet file. + + Parameters + ---------- + parquet_schema + """ + + def __init__(self): + raise ValueError("Construct ParquetSchema with from_schema.") + + @staticmethod + cdef from_schema(cpp_parquet_metadata.parquet_schema schema): + cdef ParquetSchema result = ParquetSchema.__new__(ParquetSchema) + result.schema = schema + return result + + cpdef ParquetColumnSchema root(self): + """ + Returns the schema of the struct column that contains all columns as fields. + + Returns + ------- + ParquetColumnSchema + Root column schema + """ + return ParquetColumnSchema.from_column_schema(self.schema.root()) + + +cdef class ParquetMetadata: + """ + Information about content of a parquet file. + + Parameters + ---------- + parquet_metadata + """ + + def __init__(self): + raise ValueError("Construct ParquetMetadata with from_metadata.") + + @staticmethod + cdef from_metadata(cpp_parquet_metadata.parquet_metadata meta): + cdef ParquetMetadata result = ParquetMetadata.__new__(ParquetMetadata) + result.meta = meta + return result + + cpdef ParquetSchema schema(self): + """ + Returns the parquet schema. + + Returns + ------- + ParquetSchema + Parquet schema + """ + return ParquetSchema.from_schema(self.meta.schema()) + + cpdef int num_rows(self): + """ + Returns the number of rows of the root column. + + Returns + ------- + int + Number of rows + """ + return self.meta.num_rows() + + cpdef int num_rowgroups(self): + """ + Returns the number of rowgroups in the file. + + Returns + ------- + int + Number of row groups. + """ + return self.meta.num_rowgroups() + + cpdef dict metadata(self): + """ + Returns the Key value metadata in the file footer. + + Returns + ------- + dict[bytes, bytes] + Key value metadata as a map. + """ + return self.meta.metadata() + + cpdef list rowgroup_metadata(self): + """ + Returns the row group metadata in the file footer. + + Returns + ------- + list[dict[str, int]] + Vector of row group metadata as maps. + """ + return [ + {key.decode(): val for key, val in metadata} + for metadata in self.meta.rowgroup_metadata() + ] + + +cpdef ParquetMetadata read_parquet_metadata(SourceInfo src_info): + """ + Reads metadata of parquet dataset. + + Parameters + ---------- + src_info : SourceInfo + Dataset source. + + Returns + ------- + ParquetMetadata + Parquet_metadata with parquet schema, number of rows, + number of row groups and key-value metadata. + """ + cdef cpp_parquet_metadata.parquet_metadata c_result + + with nogil: + c_result = cpp_parquet_metadata.read_parquet_metadata(src_info.c_obj) + + return ParquetMetadata.from_metadata(c_result) diff --git a/python/pylibcudf/pylibcudf/libcudf/io/parquet_metadata.pxd b/python/pylibcudf/pylibcudf/libcudf/io/parquet_metadata.pxd index 8e6da56c9a6..b0ce13e4492 100644 --- a/python/pylibcudf/pylibcudf/libcudf/io/parquet_metadata.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/io/parquet_metadata.pxd @@ -1,11 +1,11 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -cimport pylibcudf.libcudf.io.types as cudf_io_types from libc.stdint cimport int64_t from libcpp.string cimport string from libcpp.unordered_map cimport unordered_map from libcpp.vector cimport vector from pylibcudf.libcudf.types cimport size_type +from pylibcudf.libcudf.io.types cimport source_info cdef extern from "cudf/io/parquet_metadata.hpp" namespace "cudf::io" nogil: @@ -28,4 +28,4 @@ cdef extern from "cudf/io/parquet_metadata.hpp" namespace "cudf::io" nogil: unordered_map[string, string] metadata() except+ vector[unordered_map[string, int64_t]] rowgroup_metadata() except+ - cdef parquet_metadata read_parquet_metadata(cudf_io_types.source_info src) except+ + cdef parquet_metadata read_parquet_metadata(source_info src_info) except+ From 4cdcc84f25178bf13e85559826bce00b43cd701e Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Wed, 6 Nov 2024 18:43:40 -0800 Subject: [PATCH 2/3] Address review --- python/cudf/cudf/_lib/parquet.pyx | 2 +- python/pylibcudf/pylibcudf/io/parquet_metadata.pyx | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 1d959ddce40..d4bd0cd306c 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -383,7 +383,7 @@ cpdef read_parquet_metadata(list filepaths_or_buffers): col_names = [info.name() for info in parquet_metadata.schema().root().children()] index_col_names = set() - json_str = parquet_metadata.metadata()[b'pandas'].decode('utf-8') + json_str = parquet_metadata.metadata()['pandas'] if json_str != "": meta = json.loads(json_str) file_is_range_index, index_col, _ = _parse_metadata(meta) diff --git a/python/pylibcudf/pylibcudf/io/parquet_metadata.pyx b/python/pylibcudf/pylibcudf/io/parquet_metadata.pyx index 6c66303b5b3..352905ff0f8 100644 --- a/python/pylibcudf/pylibcudf/io/parquet_metadata.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet_metadata.pyx @@ -68,7 +68,7 @@ cdef class ParquetColumnSchema: Returns ------- list[ParquetColumnSchema] - Children schemas. + Child schemas. """ cdef cpp_parquet_metadata.parquet_column_schema child return [ @@ -160,14 +160,14 @@ cdef class ParquetMetadata: cpdef dict metadata(self): """ - Returns the Key value metadata in the file footer. + Returns the key-value metadata in the file footer. Returns ------- dict[bytes, bytes] Key value metadata as a map. """ - return self.meta.metadata() + return {key.decode(): val.decode() for key, val in self.meta.metadata()} cpdef list rowgroup_metadata(self): """ From 1cc5f6930b0f78a7065012643024624be5880bcb Mon Sep 17 00:00:00 2001 From: Matthew Roeschke <10647082+mroeschke@users.noreply.github.com> Date: Thu, 7 Nov 2024 14:32:45 -0800 Subject: [PATCH 3/3] Ignore erroneously sphinx reference error --- docs/cudf/source/conf.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/cudf/source/conf.py b/docs/cudf/source/conf.py index 5942cc16850..0d463b918d3 100644 --- a/docs/cudf/source/conf.py +++ b/docs/cudf/source/conf.py @@ -554,6 +554,8 @@ def on_missing_reference(app, env, node, contnode): nitpick_ignore = [ + # Erroneously warned in ParquetColumnSchema.name + ("py:class", "unicode"), ("py:class", "SeriesOrIndex"), ("py:class", "Dtype"), # The following are erroneously warned due to