diff --git a/cpp/include/cudf/io/types.hpp b/cpp/include/cudf/io/types.hpp
index abf400da102..50119e60882 100644
--- a/cpp/include/cudf/io/types.hpp
+++ b/cpp/include/cudf/io/types.hpp
@@ -195,9 +195,9 @@ class writer_compression_statistics {
  * @brief Control use of dictionary encoding for parquet writer
  */
 enum dictionary_policy {
-  NEVER,     ///< Never use dictionary encoding
-  ADAPTIVE,  ///< Use dictionary when it will not impact compression
-  ALWAYS     ///< Use dictionary reqardless of impact on compression
+  NEVER = 0,     ///< Never use dictionary encoding
+  ADAPTIVE = 1,  ///< Use dictionary when it will not impact compression
+  ALWAYS = 2     ///< Use dictionary regardless of impact on compression
 };
 
 /**
diff --git a/cpp/src/io/parquet/delta_enc.cuh b/cpp/src/io/parquet/delta_enc.cuh
index 28f8cdfe2c1..b0a7493fcab 100644
--- a/cpp/src/io/parquet/delta_enc.cuh
+++ b/cpp/src/io/parquet/delta_enc.cuh
@@ -46,6 +46,8 @@ inline __device__ void put_zz128(uint8_t*& p, zigzag128_t v)
 // too much shared memory.
 // The parquet spec requires block_size to be a multiple of 128, and values_per_mini_block
 // to be a multiple of 32.
+// TODO: if these are ever made configurable, be sure to fix the page size calculation in
+// delta_data_len() (page_enc.cu).
 constexpr int block_size = 128;
 constexpr int num_mini_blocks = 4;
 constexpr int values_per_mini_block = block_size / num_mini_blocks;
diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu
index 9acafd50585..2b7980c93e9 100644
--- a/cpp/src/io/parquet/page_enc.cu
+++ b/cpp/src/io/parquet/page_enc.cu
@@ -393,13 +393,20 @@ __device__ size_t delta_data_len(Type physical_type, cudf::type_id type_id, uint
   auto const vals_per_block = delta::block_size;
   size_t const num_blocks = util::div_rounding_up_unsafe(num_values, vals_per_block);
 
-  // need max dtype_len + 1 bytes for min_delta
+  // need max dtype_len + 1 bytes for min_delta (because we only encode 7 bits per byte)
   // one byte per mini block for the bitwidth
-  // and block_size * dtype_len bytes for the actual encoded data
-  auto const block_size = dtype_len + 1 + delta::num_mini_blocks + vals_per_block * dtype_len;
+  auto const mini_block_header_size = dtype_len + 1 + delta::num_mini_blocks;
+  // each encoded value can be at most sizeof(type) * 8 + 1 bits
+  auto const max_bits = dtype_len * 8 + 1;
+  // each data block will then be max_bits * values per block. vals_per_block is guaranteed to be
+  // divisible by 128 (via static assert on delta::block_size), but do safe division anyway.
+  auto const bytes_per_block = cudf::util::div_rounding_up_unsafe(max_bits * vals_per_block, 8);
+  auto const block_size = mini_block_header_size + bytes_per_block;
 
   // delta header is 2 bytes for the block_size, 1 byte for number of mini-blocks,
   // max 5 bytes for number of values, and max dtype_len + 1 for first value.
+  // TODO: if we ever allow configurable block sizes then this calculation will need to be
+  // modified.
   auto const header_size = 2 + 1 + 5 + dtype_len + 1;
 
   return header_size + num_blocks * block_size;
@@ -1279,6 +1286,7 @@ __device__ void finish_page_encode(state_buf* s,
   uint8_t const* const base = s->page.page_data + s->page.max_hdr_size;
   auto const actual_data_size = static_cast<uint32_t>(end_ptr - base);
   if (actual_data_size > s->page.max_data_size) {
+    // FIXME(ets): this needs to do error propagation back to the host
     CUDF_UNREACHABLE("detected possible page data corruption");
   }
   s->page.max_data_size = actual_data_size;
diff --git a/python/cudf/cudf/_lib/cpp/io/parquet.pxd b/python/cudf/cudf/_lib/cpp/io/parquet.pxd
index cace29b5d45..a6a7ba034aa 100644
--- a/python/cudf/cudf/_lib/cpp/io/parquet.pxd
+++ b/python/cudf/cudf/_lib/cpp/io/parquet.pxd
@@ -100,6 +100,8 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         void set_row_group_size_rows(size_type val) except +
         void set_max_page_size_bytes(size_t val) except +
         void set_max_page_size_rows(size_type val) except +
+        void enable_write_v2_headers(bool val) except +
+        void set_dictionary_policy(cudf_io_types.dictionary_policy policy) except +
 
         @staticmethod
         parquet_writer_options_builder builder(
@@ -150,6 +152,12 @@
         parquet_writer_options_builder& max_page_size_rows(
            size_type val
         ) except +
+        parquet_writer_options_builder& write_v2_headers(
+            bool val
+        ) except +
+        parquet_writer_options_builder& dictionary_policy(
+            cudf_io_types.dictionary_policy val
+        ) except +
 
         parquet_writer_options build() except +
 
@@ -191,6 +199,8 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         void set_row_group_size_rows(size_type val) except +
         void set_max_page_size_bytes(size_t val) except +
         void set_max_page_size_rows(size_type val) except +
+        void enable_write_v2_headers(bool val) except +
+        void set_dictionary_policy(cudf_io_types.dictionary_policy policy) except +
 
         @staticmethod
         chunked_parquet_writer_options_builder builder(
@@ -232,6 +242,12 @@
         chunked_parquet_writer_options_builder& max_page_size_rows(
            size_type val
         ) except +
+        chunked_parquet_writer_options_builder& write_v2_headers(
+            bool val
+        ) except +
+        chunked_parquet_writer_options_builder& dictionary_policy(
+            cudf_io_types.dictionary_policy val
+        ) except +
 
         chunked_parquet_writer_options build() except +
 
diff --git a/python/cudf/cudf/_lib/cpp/io/types.pxd b/python/cudf/cudf/_lib/cpp/io/types.pxd
index 01eaca82692..d8cc329b0a0 100644
--- a/python/cudf/cudf/_lib/cpp/io/types.pxd
+++ b/python/cudf/cudf/_lib/cpp/io/types.pxd
@@ -52,6 +52,11 @@ cdef extern from "cudf/io/types.hpp" \
         STATISTICS_PAGE = 2,
         STATISTICS_COLUMN = 3,
 
+    ctypedef enum dictionary_policy:
+        NEVER = 0,
+        ADAPTIVE = 1,
+        ALWAYS = 2,
+
     cdef cppclass column_name_info:
         string name
         vector[column_name_info] children
diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx
index f75a6c2b20e..d8d363686cc 100644
--- a/python/cudf/cudf/_lib/parquet.pyx
+++ b/python/cudf/cudf/_lib/parquet.pyx
@@ -321,6 +321,8 @@ def write_parquet(
     object max_page_size_rows=None,
     object partitions_info=None,
     object force_nullable_schema=False,
+    header_version="1.0",
+    use_dictionary=True,
 ):
     """
     Cython function to call into libcudf API, see `write_parquet`.
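For reference, the worst-case page size reserved by the reworked `delta_data_len()` above can be sanity-checked with a few lines of arithmetic. The sketch below mirrors the new formula in plain Python; the constants come from `delta_enc.cuh`, while the `delta_data_len` wrapper and the example numbers are illustrative only, not cudf code:

```python
from math import ceil

BLOCK_SIZE = 128      # delta::block_size
NUM_MINI_BLOCKS = 4   # delta::num_mini_blocks


def delta_data_len(dtype_len, num_values):
    """Worst-case DELTA_BINARY_PACKED size, mirroring the revised page_enc.cu math."""
    num_blocks = ceil(num_values / BLOCK_SIZE)
    # min_delta can need dtype_len + 1 bytes (only 7 bits per byte), plus one
    # bitwidth byte per mini block
    mini_block_header_size = dtype_len + 1 + NUM_MINI_BLOCKS
    # each encoded delta can need up to dtype_len * 8 + 1 bits
    max_bits = dtype_len * 8 + 1
    bytes_per_block = ceil(max_bits * BLOCK_SIZE / 8)
    # block size (2) + mini-block count (1) + value count (<= 5) + first value
    header_size = 2 + 1 + 5 + dtype_len + 1
    return header_size + num_blocks * (mini_block_header_size + bytes_per_block)


# 100_000 int64 values: 782 blocks of (13 + 1040) bytes plus a 17-byte header
assert delta_data_len(8, 100_000) == 17 + 782 * 1053
```

The old formula reserved only `vals_per_block * dtype_len` data bytes per block (1024 for int64); the new `bytes_per_block` term adds the extra bit per value that zigzag-encoded deltas can require.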
@@ -383,6 +385,18 @@ def write_parquet(
     tmp_user_data[str.encode("pandas")] = str.encode(pandas_metadata)
     user_data.push_back(tmp_user_data)
 
+    if header_version not in ("1.0", "2.0"):
+        raise ValueError(
+            f"Invalid parquet header version: {header_version}. "
+            "Valid values are '1.0' and '2.0'"
+        )
+
+    dict_policy = (
+        cudf_io_types.dictionary_policy.ALWAYS
+        if use_dictionary
+        else cudf_io_types.dictionary_policy.NEVER
+    )
+
     cdef cudf_io_types.compression_type comp_type = _get_comp_type(compression)
     cdef cudf_io_types.statistics_freq stat_freq = _get_stat_freq(statistics)
 
@@ -399,6 +413,8 @@ def write_parquet(
         .compression(comp_type)
         .stats_level(stat_freq)
         .int96_timestamps(_int96_timestamps)
+        .write_v2_headers(header_version == "2.0")
+        .dictionary_policy(dict_policy)
         .utc_timestamps(False)
         .build()
     )
diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py
index b38345af83d..fb498442a7d 100644
--- a/python/cudf/cudf/core/dataframe.py
+++ b/python/cudf/cudf/core/dataframe.py
@@ -6370,6 +6370,8 @@ def to_parquet(
         max_page_size_rows=None,
         storage_options=None,
         return_metadata=False,
+        use_dictionary=True,
+        header_version="1.0",
         *args,
         **kwargs,
     ):
@@ -6394,6 +6396,8 @@ def to_parquet(
             max_page_size_rows=max_page_size_rows,
             storage_options=storage_options,
             return_metadata=return_metadata,
+            use_dictionary=use_dictionary,
+            header_version=header_version,
             *args,
             **kwargs,
         )
diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py
index 1f346578d70..494f90f170a 100644
--- a/python/cudf/cudf/io/parquet.py
+++ b/python/cudf/cudf/io/parquet.py
@@ -66,6 +66,8 @@ def _write_parquet(
     partitions_info=None,
     storage_options=None,
     force_nullable_schema=False,
+    header_version="1.0",
+    use_dictionary=True,
 ):
     if is_list_like(paths) and len(paths) > 1:
         if partitions_info is None:
@@ -96,6 +98,8 @@ def _write_parquet(
         "max_page_size_rows": max_page_size_rows,
         "partitions_info": partitions_info,
         "force_nullable_schema": force_nullable_schema,
+        "header_version": header_version,
+        "use_dictionary": use_dictionary,
     }
     if all(ioutils.is_fsspec_open_file(buf) for buf in paths_or_bufs):
         with ExitStack() as stack:
@@ -204,7 +208,6 @@ def write_to_dataset(
     fs.mkdirs(root_path, exist_ok=True)
 
     if partition_cols is not None and len(partition_cols) > 0:
-
         (
             full_paths,
             metadata_file_paths,
@@ -709,7 +712,6 @@ def _parquet_to_frame(
     dataset_kwargs=None,
     **kwargs,
 ):
-
     # If this is not a partitioned read, only need
     # one call to `_read_parquet`
     if not partition_keys:
@@ -753,7 +755,7 @@ def _parquet_to_frame(
                 )
             )
             # Add partition columns to the last DataFrame
-            for (name, value) in part_key:
+            for name, value in part_key:
                 _len = len(dfs[-1])
                 if partition_categories and name in partition_categories:
                     # Build the categorical column from `codes`
@@ -858,6 +860,8 @@ def to_parquet(
     storage_options=None,
     return_metadata=False,
     force_nullable_schema=False,
+    header_version="1.0",
+    use_dictionary=True,
     *args,
     **kwargs,
 ):
@@ -932,6 +936,8 @@ def to_parquet(
            partitions_info=partition_info,
            storage_options=storage_options,
            force_nullable_schema=force_nullable_schema,
+           header_version=header_version,
+           use_dictionary=use_dictionary,
        )
 
    else:
@@ -1034,7 +1040,6 @@ def _get_groups_and_offsets(
     preserve_index=False,
     **kwargs,
 ):
-
     if not (set(df._data) - set(partition_cols)):
         warnings.warn("No data left to save outside partition columns")
 
diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py
index d2c08246518..da41c9cd679 100644
--- a/python/cudf/cudf/tests/test_parquet.py
+++ b/python/cudf/cudf/tests/test_parquet.py
@@ -1285,32 +1285,29 @@ def test_parquet_reader_v2(tmpdir, simple_pdf):
     simple_pdf.to_parquet(pdf_fname, data_page_version="2.0")
     assert_eq(cudf.read_parquet(pdf_fname), simple_pdf)
 
+    cudf.from_pandas(simple_pdf).to_parquet(pdf_fname, header_version="2.0")
+    assert_eq(cudf.read_parquet(pdf_fname), simple_pdf)
+
 
 @pytest.mark.parametrize("nrows", [1, 100000])
 @pytest.mark.parametrize("add_nulls", [True, False])
-def test_delta_binary(nrows, add_nulls, tmpdir):
+@pytest.mark.parametrize(
+    "dtype",
+    [
+        "int8",
+        "int16",
+        "int32",
+        "int64",
+    ],
+)
+def test_delta_binary(nrows, add_nulls, dtype, tmpdir):
     null_frequency = 0.25 if add_nulls else 0
 
     # Create a pandas dataframe with random data of mixed types
     arrow_table = dg.rand_dataframe(
         dtypes_meta=[
             {
-                "dtype": "int8",
-                "null_frequency": null_frequency,
-                "cardinality": nrows,
-            },
-            {
-                "dtype": "int16",
-                "null_frequency": null_frequency,
-                "cardinality": nrows,
-            },
-            {
-                "dtype": "int32",
-                "null_frequency": null_frequency,
-                "cardinality": nrows,
-            },
-            {
-                "dtype": "int64",
+                "dtype": dtype,
                 "null_frequency": null_frequency,
                 "cardinality": nrows,
             },
@@ -1335,6 +1332,28 @@
     pcdf = cudf.from_pandas(test_pdf)
     assert_eq(cdf, pcdf)
 
+    # Write back out with cudf and make sure pyarrow can read it
+    cudf_fname = tmpdir.join("cudfv2.parquet")
+    pcdf.to_parquet(
+        cudf_fname,
+        compression=None,
+        header_version="2.0",
+        use_dictionary=False,
+    )
+
+    # FIXME(ets): should probably not use more bits than the data type
+    try:
+        cdf2 = cudf.from_pandas(pd.read_parquet(cudf_fname))
+    except OSError as e:
+        if dtype == "int32" and nrows == 100000:
+            pytest.xfail(
+                reason="arrow does not support 33-bit delta encoding"
+            )
+        else:
+            raise e
+    else:
+        assert_eq(cdf2, cdf)
+
 
 @pytest.mark.parametrize(
     "data",
@@ -1469,7 +1488,6 @@ def test_parquet_writer_int96_timestamps(tmpdir, pdf, gdf):
 
 
 def test_multifile_parquet_folder(tmpdir):
-
     test_pdf1 = make_pdf(nrows=10, nvalids=10 // 2)
     test_pdf2 = make_pdf(nrows=20)
     expect = pd.concat([test_pdf1, test_pdf2])
diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py
index d2739b35049..6641bd8290a 100644
--- a/python/cudf/cudf/utils/ioutils.py
+++ b/python/cudf/cudf/utils/ioutils.py
@@ -287,6 +287,14 @@
     include the file path metadata (relative to `root_path`).
     To request metadata binary blob when using with ``partition_cols``, Pass
     ``return_metadata=True`` instead of specifying ``metadata_file_path``
+use_dictionary : bool, default True
+    When ``False``, prevents the use of dictionary encoding for Parquet page
+    data. When ``True``, dictionary encoding is used unless it is disabled due
+    to dictionary size constraints.
+header_version : {{'1.0', '2.0'}}, default "1.0"
+    Controls whether to use version 1.0 or version 2.0 page headers when
+    encoding. Version 1.0 is more portable, but version 2.0 enables the
+    use of newer encoding schemes.
 force_nullable_schema : bool, default False.
     If True, writes all columns as `null` in schema.
     If False, columns are written as `null` if they contain null values,
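For completeness, a minimal usage sketch of the two new user-facing keywords; the file and column names below are illustrative and not taken from the test suite:

```python
import cudf
import pandas as pd

gdf = cudf.DataFrame({"a": list(range(1_000))})

# Both keywords default to the old behavior ("1.0" headers, dictionary enabled).
# With V2 headers and dictionary encoding disabled, integer columns may be
# written with DELTA_BINARY_PACKED, as exercised by test_delta_binary above.
gdf.to_parquet("example.parquet", header_version="2.0", use_dictionary=False)

# Files written this way should still round-trip through other readers.
assert pd.read_parquet("example.parquet").equals(gdf.to_pandas())
```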