From b4ce6e4815dbf1af533312a2b0350303a7db785d Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Wed, 22 May 2024 13:20:10 -0700 Subject: [PATCH] Expose some Parquet per-column configuration options via the python API (#15613) Several recent PRs (#15081, #15411, #15600) added the ability to control some aspects of Parquet file writing on a per-column basis. During discussion of #15081 it was [suggested](https://github.com/rapidsai/cudf/pull/15081#issuecomment-1979731930) that these options be exposed by cuDF-python in a manner similar to pyarrow. This PR adds the ability to control per-column encoding, compression, binary output, and fixed-length data width, using fully qualified Parquet column names. For example, given a cuDF table with an integer column 'a', and a `list` column 'b', the fully qualified column names would be 'a' and 'b.list.element'. Addresses "Add cuDF-python API support for specifying encodings" task in #13501. Authors: - Ed Seidl (https://github.com/etseidl) - Vukasin Milovanovic (https://github.com/vuule) - GALI PREM SAGAR (https://github.com/galipremsagar) Approvers: - Muhammad Haseeb (https://github.com/mhaseeb123) - GALI PREM SAGAR (https://github.com/galipremsagar) - Vyas Ramasubramani (https://github.com/vyasr) URL: https://github.com/rapidsai/cudf/pull/15613 --- python/cudf/cudf/_lib/parquet.pyx | 74 +++++++++++++++++- .../cudf/_lib/pylibcudf/libcudf/io/types.pxd | 18 ++++- python/cudf/cudf/core/dataframe.py | 8 ++ python/cudf/cudf/io/parquet.py | 64 ++++++++++++++++ python/cudf/cudf/tests/test_parquet.py | 76 +++++++++++++++++++ python/cudf/cudf/utils/ioutils.py | 16 ++++ 6 files changed, 252 insertions(+), 4 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 70acb7f917b..f0eef9be124 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -408,6 +408,10 @@ def write_parquet( object force_nullable_schema=False, header_version="1.0", use_dictionary=True, + object skip_compression=None, + object column_encoding=None, + object column_type_length=None, + object output_as_binary=None, ): """ Cython function to call into libcudf API, see `write_parquet`. @@ -458,7 +462,12 @@ def write_parquet( _set_col_metadata( table[name]._column, tbl_meta.column_metadata[i], - force_nullable_schema + force_nullable_schema, + None, + skip_compression, + column_encoding, + column_type_length, + output_as_binary ) cdef map[string, string] tmp_user_data @@ -810,16 +819,62 @@ cdef cudf_io_types.compression_type _get_comp_type(object compression): raise ValueError("Unsupported `compression` type") +cdef cudf_io_types.column_encoding _get_encoding_type(object encoding): + if encoding is None: + return cudf_io_types.column_encoding.USE_DEFAULT + + enc = str(encoding).upper() + if enc == "PLAIN": + return cudf_io_types.column_encoding.PLAIN + elif enc == "DICTIONARY": + return cudf_io_types.column_encoding.DICTIONARY + elif enc == "DELTA_BINARY_PACKED": + return cudf_io_types.column_encoding.DELTA_BINARY_PACKED + elif enc == "DELTA_LENGTH_BYTE_ARRAY": + return cudf_io_types.column_encoding.DELTA_LENGTH_BYTE_ARRAY + elif enc == "DELTA_BYTE_ARRAY": + return cudf_io_types.column_encoding.DELTA_BYTE_ARRAY + elif enc == "BYTE_STREAM_SPLIT": + return cudf_io_types.column_encoding.BYTE_STREAM_SPLIT + elif enc == "USE_DEFAULT": + return cudf_io_types.column_encoding.USE_DEFAULT + else: + raise ValueError("Unsupported `column_encoding` type") + + cdef _set_col_metadata( Column col, column_in_metadata& col_meta, bool force_nullable_schema=False, + str path=None, + object skip_compression=None, + object column_encoding=None, + object column_type_length=None, + object output_as_binary=None, ): + need_path = (skip_compression is not None or column_encoding is not None or + column_type_length is not None or output_as_binary is not None) + name = col_meta.get_name().decode('UTF-8') if need_path else None + full_path = path + "." + name if path is not None else name + if force_nullable_schema: # Only set nullability if `force_nullable_schema` # is true. col_meta.set_nullability(True) + if skip_compression is not None and full_path in skip_compression: + col_meta.set_skip_compression(True) + + if column_encoding is not None and full_path in column_encoding: + col_meta.set_encoding(_get_encoding_type(column_encoding[full_path])) + + if column_type_length is not None and full_path in column_type_length: + col_meta.set_output_as_binary(True) + col_meta.set_type_length(column_type_length[full_path]) + + if output_as_binary is not None and full_path in output_as_binary: + col_meta.set_output_as_binary(True) + if isinstance(col.dtype, cudf.StructDtype): for i, (child_col, name) in enumerate( zip(col.children, list(col.dtype.fields)) @@ -828,13 +883,26 @@ cdef _set_col_metadata( _set_col_metadata( child_col, col_meta.child(i), - force_nullable_schema + force_nullable_schema, + full_path, + skip_compression, + column_encoding, + column_type_length, + output_as_binary ) elif isinstance(col.dtype, cudf.ListDtype): + if full_path is not None: + full_path = full_path + ".list" + col_meta.child(1).set_name("element".encode()) _set_col_metadata( col.children[1], col_meta.child(1), - force_nullable_schema + force_nullable_schema, + full_path, + skip_compression, + column_encoding, + column_type_length, + output_as_binary ) elif isinstance(col.dtype, cudf.core.dtypes.DecimalDtype): col_meta.set_decimal_precision(col.dtype.precision) diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd index 4725c4e5937..38fae1df1e5 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd @@ -1,6 +1,6 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. -from libc.stdint cimport uint8_t +from libc.stdint cimport int32_t, uint8_t from libcpp cimport bool from libcpp.map cimport map from libcpp.memory cimport shared_ptr, unique_ptr @@ -57,6 +57,19 @@ cdef extern from "cudf/io/types.hpp" \ ADAPTIVE = 1, ALWAYS = 2, + cdef extern from "cudf/io/types.hpp" namespace "cudf::io" nogil: + cpdef enum class column_encoding: + USE_DEFAULT = -1 + DICTIONARY = 0 + PLAIN = 1 + DELTA_BINARY_PACKED = 2 + DELTA_LENGTH_BYTE_ARRAY =3 + DELTA_BYTE_ARRAY = 4 + BYTE_STREAM_SPLIT = 5 + DIRECT = 6 + DIRECT_V2 = 7 + DICTIONARY_V2 = 8 + cdef cppclass column_name_info: string name vector[column_name_info] children @@ -81,6 +94,9 @@ cdef extern from "cudf/io/types.hpp" \ column_in_metadata& set_decimal_precision(uint8_t precision) column_in_metadata& child(size_type i) column_in_metadata& set_output_as_binary(bool binary) + column_in_metadata& set_type_length(int32_t type_length) + column_in_metadata& set_skip_compression(bool skip) + column_in_metadata& set_encoding(column_encoding enc) string get_name() cdef cppclass table_input_metadata: diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 9f3f756a1e7..1f530aa3108 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -6707,6 +6707,10 @@ def to_parquet( return_metadata=False, use_dictionary=True, header_version="1.0", + skip_compression=None, + column_encoding=None, + column_type_length=None, + output_as_binary=None, *args, **kwargs, ): @@ -6733,6 +6737,10 @@ def to_parquet( return_metadata=return_metadata, use_dictionary=use_dictionary, header_version=header_version, + skip_compression=skip_compression, + column_encoding=column_encoding, + column_type_length=column_type_length, + output_as_binary=output_as_binary, *args, **kwargs, ) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index a6c67d22af7..dbdb2093b72 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -69,6 +69,10 @@ def _write_parquet( force_nullable_schema=False, header_version="1.0", use_dictionary=True, + skip_compression=None, + column_encoding=None, + column_type_length=None, + output_as_binary=None, ): if is_list_like(paths) and len(paths) > 1: if partitions_info is None: @@ -102,6 +106,10 @@ def _write_parquet( "force_nullable_schema": force_nullable_schema, "header_version": header_version, "use_dictionary": use_dictionary, + "skip_compression": skip_compression, + "column_encoding": column_encoding, + "column_type_length": column_type_length, + "output_as_binary": output_as_binary, } if all(ioutils.is_fsspec_open_file(buf) for buf in paths_or_bufs): with ExitStack() as stack: @@ -140,6 +148,12 @@ def write_to_dataset( max_page_size_rows=None, storage_options=None, force_nullable_schema=False, + header_version="1.0", + use_dictionary=True, + skip_compression=None, + column_encoding=None, + column_type_length=None, + output_as_binary=None, ): """Wraps `to_parquet` to write partitioned Parquet datasets. For each combination of partition group and value, @@ -204,6 +218,30 @@ def write_to_dataset( If True, writes all columns as `null` in schema. If False, columns are written as `null` if they contain null values, otherwise as `not null`. + header_version : {{'1.0', '2.0'}}, default "1.0" + Controls whether to use version 1.0 or version 2.0 page headers when + encoding. Version 1.0 is more portable, but version 2.0 enables the + use of newer encoding schemes. + force_nullable_schema : bool, default False. + If True, writes all columns as `null` in schema. + If False, columns are written as `null` if they contain null values, + otherwise as `not null`. + skip_compression : set, optional, default None + If a column name is present in the set, that column will not be compressed, + regardless of the ``compression`` setting. + column_encoding : dict, optional, default None + Sets the page encoding to use on a per-column basis. The key is a column + name, and the value is one of: 'PLAIN', 'DICTIONARY', 'DELTA_BINARY_PACKED', + 'DELTA_LENGTH_BYTE_ARRAY', 'DELTA_BYTE_ARRAY', 'BYTE_STREAM_SPLIT', or + 'USE_DEFAULT'. + column_type_length : dict, optional, default None + Specifies the width in bytes of ``FIXED_LEN_BYTE_ARRAY`` column elements. + The key is a column name and the value is an integer. The named column + will be output as unannotated binary (i.e. the column will behave as if + ``output_as_binary`` was set). + output_as_binary : set, optional, default None + If a column name is present in the set, that column will be output as + unannotated binary, rather than the default 'UTF-8'. """ fs = ioutils._ensure_filesystem(fs, root_path, storage_options) @@ -241,6 +279,12 @@ def write_to_dataset( max_page_size_bytes=max_page_size_bytes, max_page_size_rows=max_page_size_rows, force_nullable_schema=force_nullable_schema, + header_version=header_version, + use_dictionary=use_dictionary, + skip_compression=skip_compression, + column_encoding=column_encoding, + column_type_length=column_type_length, + output_as_binary=output_as_binary, ) else: @@ -262,6 +306,12 @@ def write_to_dataset( max_page_size_bytes=max_page_size_bytes, max_page_size_rows=max_page_size_rows, force_nullable_schema=force_nullable_schema, + header_version=header_version, + use_dictionary=use_dictionary, + skip_compression=skip_compression, + column_encoding=column_encoding, + column_type_length=column_type_length, + output_as_binary=output_as_binary, ) return metadata @@ -906,6 +956,10 @@ def to_parquet( force_nullable_schema=False, header_version="1.0", use_dictionary=True, + skip_compression=None, + column_encoding=None, + column_type_length=None, + output_as_binary=None, *args, **kwargs, ): @@ -955,6 +1009,12 @@ def to_parquet( return_metadata=return_metadata, storage_options=storage_options, force_nullable_schema=force_nullable_schema, + header_version=header_version, + use_dictionary=use_dictionary, + skip_compression=skip_compression, + column_encoding=column_encoding, + column_type_length=column_type_length, + output_as_binary=output_as_binary, ) partition_info = ( @@ -983,6 +1043,10 @@ def to_parquet( force_nullable_schema=force_nullable_schema, header_version=header_version, use_dictionary=use_dictionary, + skip_compression=skip_compression, + column_encoding=column_encoding, + column_type_length=column_type_length, + output_as_binary=output_as_binary, ) else: diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index b2896d55b80..e32fdacd8d6 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -2870,6 +2870,82 @@ def flba(i): assert_eq(expect, got) +def test_parquet_flba_round_trip(tmpdir): + def flba(i): + hasher = hashlib.sha256() + hasher.update(i.to_bytes(4, "little")) + return hasher.digest() + + # use pyarrow to write table of fixed_len_byte_array + num_rows = 200 + data = pa.array([flba(i) for i in range(num_rows)], type=pa.binary(32)) + padf = pa.Table.from_arrays([data], names=["flba"]) + padf_fname = tmpdir.join("padf.parquet") + pq.write_table(padf, padf_fname) + + # round trip data with cudf + cdf = cudf.read_parquet(padf_fname) + cdf_fname = tmpdir.join("cdf.parquet") + cdf.to_parquet(cdf_fname, column_type_length={"flba": 32}) + + # now read back in with pyarrow to test it was written properly by cudf + padf2 = pq.read_table(padf_fname) + padf3 = pq.read_table(cdf_fname) + assert_eq(padf2, padf3) + assert_eq(padf2.schema[0].type, padf3.schema[0].type) + + +@pytest.mark.parametrize( + "encoding", + [ + "PLAIN", + "DICTIONARY", + "DELTA_BINARY_PACKED", + "BYTE_STREAM_SPLIT", + "USE_DEFAULT", + ], +) +def test_per_column_options(tmpdir, encoding): + pdf = pd.DataFrame({"ilist": [[1, 2, 3, 1, 2, 3]], "i1": [1]}) + cdf = cudf.from_pandas(pdf) + fname = tmpdir.join("ilist.parquet") + cdf.to_parquet( + fname, + column_encoding={"ilist.list.element": encoding}, + compression="SNAPPY", + skip_compression={"ilist.list.element"}, + ) + # DICTIONARY and USE_DEFAULT should both result in a PLAIN_DICTIONARY encoding in parquet + encoding_name = ( + "PLAIN_DICTIONARY" + if encoding == "DICTIONARY" or encoding == "USE_DEFAULT" + else encoding + ) + pf = pq.ParquetFile(fname) + fmd = pf.metadata + assert encoding_name in fmd.row_group(0).column(0).encodings + assert fmd.row_group(0).column(0).compression == "UNCOMPRESSED" + assert fmd.row_group(0).column(1).compression == "SNAPPY" + + +@pytest.mark.parametrize( + "encoding", + ["DELTA_LENGTH_BYTE_ARRAY", "DELTA_BYTE_ARRAY"], +) +def test_per_column_options_string_col(tmpdir, encoding): + pdf = pd.DataFrame({"s": ["a string"], "i1": [1]}) + cdf = cudf.from_pandas(pdf) + fname = tmpdir.join("strcol.parquet") + cdf.to_parquet( + fname, + column_encoding={"s": encoding}, + compression="SNAPPY", + ) + pf = pq.ParquetFile(fname) + fmd = pf.metadata + assert encoding in fmd.row_group(0).column(0).encodings + + def test_parquet_reader_rle_boolean(datadir): fname = datadir / "rle_boolean_encoding.parquet" diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index dd9b44c5a53..1366a0b8e84 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -306,6 +306,22 @@ If True, writes all columns as `null` in schema. If False, columns are written as `null` if they contain null values, otherwise as `not null`. +skip_compression : set, optional, default None + If a column name is present in the set, that column will not be compressed, + regardless of the ``compression`` setting. +column_encoding : dict, optional, default None + Sets the page encoding to use on a per-column basis. The key is a column + name, and the value is one of: 'PLAIN', 'DICTIONARY', 'DELTA_BINARY_PACKED', + 'DELTA_LENGTH_BYTE_ARRAY', 'DELTA_BYTE_ARRAY', 'BYTE_STREAM_SPLIT', or + 'USE_DEFAULT'. +column_type_length : dict, optional, default None + Specifies the width in bytes of ``FIXED_LEN_BYTE_ARRAY`` column elements. + The key is a column name and the value is an integer. The named column + will be output as unannotated binary (i.e. the column will behave as if + ``output_as_binary`` was set). +output_as_binary : set, optional, default None + If a column name is present in the set, that column will be output as + unannotated binary, rather than the default 'UTF-8'. **kwargs Additional parameters will be passed to execution engines other than ``cudf``.