From f222b4adc78187539092ad14de9d407451975514 Mon Sep 17 00:00:00 2001
From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com>
Date: Wed, 17 Apr 2024 15:44:22 -0700
Subject: [PATCH] Bind `read_parquet_metadata` API to libcudf instead of
 pyarrow and extract `RowGroup` information (#15398)

`cudf.io.read_parquet_metadata` is now bound to the corresponding libcudf API
instead of relying on pyarrow. The libcudf API now also returns high-level
`RowGroup` metadata to solve #11214. Additional tests and doc updates are
included as well. More metadata, such as `min`/`max` values for each column in
each row group, can also be extracted and returned if needed; a minimal usage
sketch of the updated Python API follows the patch.

Recommend: Closing #15320 without merging in favor of this PR.

Authors:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Bradley Dice (https://github.com/bdice)

URL: https://github.com/rapidsai/cudf/pull/15398
---
 cpp/include/cudf/io/parquet_metadata.hpp   | 41 ++++++++++-
 cpp/src/io/parquet/reader_impl.cpp         |  3 +-
 cpp/src/io/parquet/reader_impl_helpers.cpp | 20 ++++++
 cpp/src/io/parquet/reader_impl_helpers.hpp |  8 +++
 .../cudf/_lib/cpp/io/parquet_metadata.pxd  | 32 +++++++++
 python/cudf/cudf/_lib/parquet.pyx          | 69 +++++++++++++++++++
 python/cudf/cudf/io/parquet.py             | 42 +++++++++--
 python/cudf/cudf/tests/test_parquet.py     | 47 +++++++++++--
 python/cudf/cudf/utils/ioutils.py          |  4 +-
 9 files changed, 249 insertions(+), 17 deletions(-)
 create mode 100644 python/cudf/cudf/_lib/cpp/io/parquet_metadata.pxd

diff --git a/cpp/include/cudf/io/parquet_metadata.hpp b/cpp/include/cudf/io/parquet_metadata.hpp
index 3149b5b5945..e0c406c180c 100644
--- a/cpp/include/cudf/io/parquet_metadata.hpp
+++ b/cpp/include/cudf/io/parquet_metadata.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -59,6 +59,13 @@ enum class TypeKind : int8_t {
  */
 struct parquet_column_schema {
  public:
+  /**
+   * @brief Default constructor.
+   *
+   * This has been added since Cython requires a default constructor to create objects on stack.
+   */
+  explicit parquet_column_schema() = default;
+
   /**
    * @brief constructor
    *
@@ -134,6 +141,13 @@ struct parquet_column_schema {
  */
 struct parquet_schema {
  public:
+  /**
+   * @brief Default constructor.
+   *
+   * This has been added since Cython requires a default constructor to create objects on stack.
+   */
+  explicit parquet_schema() = default;
+
   /**
    * @brief constructor
    *
@@ -165,6 +179,15 @@ class parquet_metadata {
  public:
   /// Key-value metadata in the file footer.
   using key_value_metadata = std::unordered_map<std::string, std::string>;
+  /// Row group metadata from each RowGroup element.
+  using row_group_metadata = std::unordered_map<std::string, int64_t>;
+
+  /**
+   * @brief Default constructor.
+   *
+   * This has been added since Cython requires a default constructor to create objects on stack.
+   */
+  explicit parquet_metadata() = default;

   /**
    * @brief constructor
    *
@@ -173,15 +196,18 @@ class parquet_metadata {
    * @param num_rows number of rows
    * @param num_rowgroups number of row groups
    * @param file_metadata key-value metadata in the file footer
+   * @param rg_metadata vector of maps containing metadata for each row group
    */
   parquet_metadata(parquet_schema schema,
                    int64_t num_rows,
                    size_type num_rowgroups,
-                   key_value_metadata file_metadata)
+                   key_value_metadata file_metadata,
+                   std::vector<row_group_metadata> rg_metadata)
     : _schema{std::move(schema)},
       _num_rows{num_rows},
       _num_rowgroups{num_rowgroups},
-      _file_metadata{std::move(file_metadata)}
+      _file_metadata{std::move(file_metadata)},
+      _rowgroup_metadata{std::move(rg_metadata)}
   {
   }
@@ -207,6 +233,7 @@ class parquet_metadata {
    * @return Number of row groups
    */
   [[nodiscard]] auto num_rowgroups() const { return _num_rowgroups; }
+
   /**
    * @brief Returns the Key value metadata in the file footer.
    *
@@ -214,11 +241,19 @@ class parquet_metadata {
    */
   [[nodiscard]] auto const& metadata() const { return _file_metadata; }

+  /**
+   * @brief Returns the row group metadata in the file footer.
+   *
+   * @return Vector of row group metadata as maps
+   */
+  [[nodiscard]] auto const& rowgroup_metadata() const { return _rowgroup_metadata; }
+
  private:
   parquet_schema _schema;
   int64_t _num_rows;
   size_type _num_rowgroups;
   key_value_metadata _file_metadata;
+  std::vector<row_group_metadata> _rowgroup_metadata;
 };

 /**
diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp
index e7409f45e13..a524e7c6dcc 100644
--- a/cpp/src/io/parquet/reader_impl.cpp
+++ b/cpp/src/io/parquet/reader_impl.cpp
@@ -609,7 +609,8 @@ parquet_metadata read_parquet_metadata(host_span<std::unique_ptr<datasource> con
   return parquet_metadata{parquet_schema{walk_schema(&metadata, 0)},
                           metadata.get_num_rows(),
                           metadata.get_num_row_groups(),
-                          metadata.get_key_value_metadata()[0]};
+                          metadata.get_key_value_metadata()[0],
+                          metadata.get_rowgroup_metadata()};
 }

 }  // namespace cudf::io::parquet::detail
diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp
index bfc69264ab2..402ccef7a15 100644
--- a/cpp/src/io/parquet/reader_impl_helpers.cpp
+++ b/cpp/src/io/parquet/reader_impl_helpers.cpp
@@ -560,6 +560,26 @@ ColumnChunkMetaData const& aggregate_reader_metadata::get_column_metadata(size_t
   return col->meta_data;
 }

+std::vector<std::unordered_map<std::string, int64_t>>
+aggregate_reader_metadata::get_rowgroup_metadata() const
+{
+  std::vector<std::unordered_map<std::string, int64_t>> rg_metadata;
+
+  std::for_each(
+    per_file_metadata.cbegin(), per_file_metadata.cend(), [&rg_metadata](auto const& pfm) {
+      std::transform(pfm.row_groups.cbegin(),
+                     pfm.row_groups.cend(),
+                     std::back_inserter(rg_metadata),
+                     [](auto const& rg) {
+                       std::unordered_map<std::string, int64_t> rg_meta_map;
+                       rg_meta_map["num_rows"]        = rg.num_rows;
+                       rg_meta_map["total_byte_size"] = rg.total_byte_size;
+                       return rg_meta_map;
+                     });
+    });
+  return rg_metadata;
+}
+
 std::string aggregate_reader_metadata::get_pandas_index() const
 {
   // Assumes that all input files have the same metadata
diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp
index 8295654764e..09f65f9c388 100644
--- a/cpp/src/io/parquet/reader_impl_helpers.hpp
+++ b/cpp/src/io/parquet/reader_impl_helpers.hpp
@@ -166,6 +166,13 @@ class aggregate_reader_metadata {
                                                    size_type src_idx,
                                                    int schema_idx) const;

+  /**
+   * @brief Extracts high-level metadata for all row groups
+   *
+   * @return List of maps containing metadata information for each row group
+   */
+  [[nodiscard]] std::vector<std::unordered_map<std::string, int64_t>> get_rowgroup_metadata()
+    const;
+
   [[nodiscard]] auto get_num_rows() const { return num_rows; }

   [[nodiscard]] auto get_num_row_groups() const { return num_row_groups; }
@@ -178,6 +185,7 @@ class aggregate_reader_metadata {
   [[nodiscard]] auto const& get_key_value_metadata() const& { return keyval_maps; }

   [[nodiscard]] auto&& get_key_value_metadata() && { return std::move(keyval_maps); }
+
   /**
    * @brief Gets the concrete nesting depth of output cudf columns
    *
diff --git a/python/cudf/cudf/_lib/cpp/io/parquet_metadata.pxd b/python/cudf/cudf/_lib/cpp/io/parquet_metadata.pxd
new file mode 100644
index 00000000000..e9def2aea5d
--- /dev/null
+++ b/python/cudf/cudf/_lib/cpp/io/parquet_metadata.pxd
@@ -0,0 +1,32 @@
+# Copyright (c) 2024, NVIDIA CORPORATION.
+
+from libc.stdint cimport int64_t
+from libcpp.string cimport string
+from libcpp.unordered_map cimport unordered_map
+from libcpp.vector cimport vector
+
+cimport cudf._lib.cpp.io.types as cudf_io_types
+from cudf._lib.cpp.types cimport size_type
+
+
+cdef extern from "cudf/io/parquet_metadata.hpp" namespace "cudf::io" nogil:
+    cdef cppclass parquet_column_schema:
+        parquet_column_schema() except +
+        string name() except +
+        size_type num_children() except +
+        parquet_column_schema child(int idx) except +
+        vector[parquet_column_schema] children() except +
+
+    cdef cppclass parquet_schema:
+        parquet_schema() except +
+        parquet_column_schema root() except +
+
+    cdef cppclass parquet_metadata:
+        parquet_metadata() except +
+        parquet_schema schema() except +
+        int64_t num_rows() except +
+        size_type num_rowgroups() except +
+        unordered_map[string, string] metadata() except +
+        vector[unordered_map[string, int64_t]] rowgroup_metadata() except +
+
+    cdef parquet_metadata read_parquet_metadata(cudf_io_types.source_info src) except +
diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx
index ce1cba59bec..9ce9aad18f7 100644
--- a/python/cudf/cudf/_lib/parquet.pyx
+++ b/python/cudf/cudf/_lib/parquet.pyx
@@ -46,6 +46,10 @@ from cudf._lib.cpp.io.parquet cimport (
     read_parquet as parquet_reader,
     write_parquet as parquet_writer,
 )
+from cudf._lib.cpp.io.parquet_metadata cimport (
+    parquet_metadata,
+    read_parquet_metadata as parquet_metadata_reader,
+)
 from cudf._lib.cpp.io.types cimport column_in_metadata, table_input_metadata
 from cudf._lib.cpp.table.table_view cimport table_view
 from cudf._lib.cpp.types cimport data_type, size_type
@@ -316,6 +320,71 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
     df._data.label_dtype = cudf.dtype(column_index_type)
     return df

+cpdef read_parquet_metadata(filepaths_or_buffers):
+    """
+    Cython function that calls into the libcudf `read_parquet_metadata` API.
+
+    See Also
+    --------
+    cudf.io.parquet.read_parquet
+    cudf.io.parquet.to_parquet
+    """
+    # Convert NativeFile buffers to NativeFileDatasource
+    for i, datasource in enumerate(filepaths_or_buffers):
+        if isinstance(datasource, NativeFile):
+            filepaths_or_buffers[i] = NativeFileDatasource(datasource)
+
+    cdef cudf_io_types.source_info source = make_source_info(filepaths_or_buffers)
+
+    args = move(source)
+
+    cdef parquet_metadata c_result
+
+    # Read Parquet metadata
+    with nogil:
+        c_result = move(parquet_metadata_reader(args))
+
+    # access and return results
+    num_rows = c_result.num_rows()
+    num_rowgroups = c_result.num_rowgroups()
+
+    # extract row group metadata and sanitize keys
+    row_group_metadata = [{k.decode(): v for k, v in metadata}
+                          for metadata in c_result.rowgroup_metadata()]
+
+    # read all column names including index column, if any
+    col_names = [info.name().decode() for info in c_result.schema().root().children()]
+
+    # access the Parquet file_footer to find the index
+    index_col = None
+    cdef unordered_map[string, string] file_footer = c_result.metadata()
+
+    # get index column name(s)
+    index_col_names = None
+    json_str = file_footer[b'pandas'].decode('utf-8')
+    meta = None
+    if json_str != "":
+        meta = json.loads(json_str)
+        file_is_range_index, index_col, _ = _parse_metadata(meta)
+        if not file_is_range_index and index_col is not None \
+                and index_col_names is None:
+            index_col_names = {}
+            for idx_col in index_col:
+                for c in meta['columns']:
+                    if c['field_name'] == idx_col:
+                        index_col_names[idx_col] = c['name']
+
+    # remove the index column from the list of column names
+    # only if index_col_names is not None
+    if index_col_names is not None:
+        col_names = [name for name in col_names if name not in index_col_names]
+
+    # number of remaining data columns
+    num_columns = len(col_names)
+
+    # return the metadata
+    return num_rows, num_rowgroups, col_names, num_columns, row_group_metadata
+

 @acquire_spill_lock()
 def write_parquet(
diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py
index e55898de675..e7f1ad0751f 100644
--- a/python/cudf/cudf/io/parquet.py
+++ b/python/cudf/cudf/io/parquet.py
@@ -267,17 +267,45 @@ def write_to_dataset(

 @ioutils.doc_read_parquet_metadata()
 @_cudf_nvtx_annotate
-def read_parquet_metadata(path):
+def read_parquet_metadata(filepath_or_buffer):
     """{docstring}"""
-    import pyarrow.parquet as pq
+    # Multiple sources are passed as a list. If a single source is passed,
+    # wrap it in a list for unified processing downstream.
+    if not is_list_like(filepath_or_buffer):
+        filepath_or_buffer = [filepath_or_buffer]

-    pq_file = pq.ParquetFile(path)
+    # Start by trying to construct a filesystem object
+    fs, paths = ioutils._get_filesystem_and_paths(
+        path_or_data=filepath_or_buffer, storage_options=None
+    )

-    num_rows = pq_file.metadata.num_rows
-    num_row_groups = pq_file.num_row_groups
-    col_names = pq_file.schema.names
+    # Check if filepath or buffer
+    filepath_or_buffer = paths if paths else filepath_or_buffer
+
+    # List of filepaths or buffers
+    filepaths_or_buffers = []
+
+    for source in filepath_or_buffer:
+        tmp_source, compression = ioutils.get_reader_filepath_or_buffer(
+            path_or_data=source,
+            compression=None,
+            fs=fs,
+            use_python_file_object=True,
+            open_file_options=None,
+            storage_options=None,
+            bytes_per_thread=None,
+        )
+
+        if compression is not None:
+            raise ValueError(
+                "URL content-encoding decompression is not supported"
+            )
+        if isinstance(tmp_source, list):
+            # Extending the list being iterated over makes the expanded
+            # sources get processed by later iterations of this same loop.
+            filepath_or_buffer.extend(tmp_source)
+        else:
+            filepaths_or_buffers.append(tmp_source)

-    return num_rows, num_row_groups, col_names
+    return libparquet.read_parquet_metadata(filepaths_or_buffers)
diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py
index 9ba71b28637..56a4281aad9 100644
--- a/python/cudf/cudf/tests/test_parquet.py
+++ b/python/cudf/cudf/tests/test_parquet.py
@@ -415,8 +415,15 @@ def num_row_groups(rows, group_size):
     row_group_size = 5
     pdf.to_parquet(fname, compression="snappy", row_group_size=row_group_size)

-    num_rows, row_groups, col_names = cudf.io.read_parquet_metadata(fname)
+    (
+        num_rows,
+        row_groups,
+        col_names,
+        num_columns,
+        _,  # rowgroup_metadata
+    ) = cudf.io.read_parquet_metadata(fname)

+    assert num_columns == len(pdf.columns)
     assert num_rows == len(pdf.index)
     assert row_groups == num_row_groups(num_rows, row_group_size)
     for a, b in zip(col_names, pdf.columns):
@@ -561,7 +568,9 @@ def test_parquet_read_row_groups(tmpdir, pdf, row_group_size):
     fname = tmpdir.join("row_group.parquet")
     pdf.to_parquet(fname, compression="gzip", row_group_size=row_group_size)

-    num_rows, row_groups, col_names = cudf.io.read_parquet_metadata(fname)
+    num_rows, row_groups, col_names, _, _ = cudf.io.read_parquet_metadata(
+        fname
+    )

     gdf = [cudf.read_parquet(fname, row_groups=[i]) for i in range(row_groups)]
     gdf = cudf.concat(gdf)
@@ -586,7 +595,9 @@ def test_parquet_read_row_groups_non_contiguous(tmpdir, pdf, row_group_size):
     fname = tmpdir.join("row_group.parquet")
     pdf.to_parquet(fname, compression="gzip", row_group_size=row_group_size)

-    num_rows, row_groups, col_names = cudf.io.read_parquet_metadata(fname)
+    num_rows, row_groups, col_names, _, _ = cudf.io.read_parquet_metadata(
+        fname
+    )

     # alternate rows between the two sources
     gdf = cudf.read_parquet(
@@ -1803,7 +1814,9 @@ def test_parquet_writer_row_group_size(tmpdir, row_group_size_kwargs):
             writer.write_table(gdf)

     # Simple check for multiple row-groups
-    nrows, nrow_groups, columns = cudf.io.parquet.read_parquet_metadata(fname)
+    nrows, nrow_groups, columns, _, _ = cudf.io.parquet.read_parquet_metadata(
+        fname
+    )
     assert nrows == size
     assert nrow_groups > 1
     assert columns == ["a", "b"]
@@ -2853,7 +2866,9 @@ def test_to_parquet_row_group_size(
         fname, row_group_size_bytes=size_bytes, row_group_size_rows=size_rows
     )

-    num_rows, row_groups, col_names = cudf.io.read_parquet_metadata(fname)
+    num_rows, row_groups, col_names, _, _ = cudf.io.read_parquet_metadata(
+        fname
+    )
     # 8 bytes per row, as the column is int64
     expected_num_rows = max(
         math.ceil(num_rows / size_rows), math.ceil(8 * num_rows / size_bytes)
     )
     assert expected_num_rows == row_groups
@@ -2861,6 +2876,28 @@ def test_to_parquet_row_group_size(

+@pytest.mark.parametrize("size_rows", [500_000, 100_000, 10_000])
+def test_parquet_row_group_metadata(tmpdir, large_int64_gdf, size_rows):
+    fname = tmpdir.join("row_group_size.parquet")
+    large_int64_gdf.to_parquet(fname, row_group_size_rows=size_rows)
+
+    # read file metadata from parquet
+    (
+        num_rows,
+        row_groups,
+        _,  # col_names
+        _,  # num_columns
+        row_group_metadata,
+    ) = cudf.io.read_parquet_metadata(fname)
+
+    # length of row group metadata list == number of row groups
+    assert len(row_group_metadata) == row_groups
+    # sum of rows in row groups == total rows
+    assert num_rows == sum(
+        [row_group["num_rows"] for row_group in row_group_metadata]
+    )
+
+
 def test_parquet_reader_decimal_columns():
     df = cudf.DataFrame(
         {
diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py
index 8c58f2b859e..66e14f4b9de 100644
--- a/python/cudf/cudf/utils/ioutils.py
+++ b/python/cudf/cudf/utils/ioutils.py
@@ -101,11 +101,13 @@
 Total number of rows
 Number of row groups
 List of column names
+Number of columns
+List of metadata of row groups

 Examples
 --------
 >>> import cudf
->>> num_rows, num_row_groups, names = cudf.io.read_parquet_metadata(filename)
+>>> num_rows, num_row_groups, names, num_columns, row_group_metadata = cudf.io.read_parquet_metadata(filename)
 >>> df = [cudf.read_parquet(fname, row_group=i) for i in range(row_groups)]
 >>> df = cudf.concat(df)
 >>> df
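
A minimal usage sketch of the updated `cudf.io.read_parquet_metadata` API (not
part of the patch; the file name `example.parquet` and the small frame below
are hypothetical, and the expected values follow from the five-field return
signature added above):

    import cudf

    # Write a small file with two row groups of 5 rows each (hypothetical path)
    cudf.DataFrame({"a": range(10), "b": range(10)}).to_parquet(
        "example.parquet", row_group_size_rows=5
    )

    (
        num_rows,
        num_row_groups,
        col_names,
        num_columns,
        row_group_metadata,
    ) = cudf.io.read_parquet_metadata("example.parquet")

    print(num_rows, num_row_groups, col_names, num_columns)
    # 10 2 ['a', 'b'] 2

    # Each row group entry maps "num_rows" and "total_byte_size" to integers
    print(row_group_metadata[0]["num_rows"])  # 5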