From 4efe9c8112646b1bfd118cdf86ea2659920f7f68 Mon Sep 17 00:00:00 2001
From: seidl
Date: Mon, 20 Jun 2022 15:03:52 -0700
Subject: [PATCH 1/2] add page size controls to python

---
 python/cudf/cudf/_lib/cpp/io/parquet.pxd | 20 ++++++++++++++++++++
 python/cudf/cudf/_lib/parquet.pyx        | 22 +++++++++++++++++++++-
 python/cudf/cudf/io/parquet.py           | 10 ++++++++++
 python/cudf/cudf/utils/ioutils.py        |  6 ++++++
 4 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/python/cudf/cudf/_lib/cpp/io/parquet.pxd b/python/cudf/cudf/_lib/cpp/io/parquet.pxd
index d152503e82a..4710a26f985 100644
--- a/python/cudf/cudf/_lib/cpp/io/parquet.pxd
+++ b/python/cudf/cudf/_lib/cpp/io/parquet.pxd
@@ -75,6 +75,8 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         string get_column_chunks_file_paths() except+
         size_t get_row_group_size_bytes() except+
         size_type get_row_group_size_rows() except+
+        size_t get_max_page_size_bytes() except+
+        size_type get_max_page_size_rows() except+
 
         void set_partitions(
             vector[cudf_io_types.partition_info] partitions
@@ -96,6 +98,8 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         ) except +
         void set_row_group_size_bytes(size_t val) except+
         void set_row_group_size_rows(size_type val) except+
+        void set_max_page_size_bytes(size_t val) except+
+        void set_max_page_size_rows(size_type val) except+
 
         @staticmethod
         parquet_writer_options_builder builder(
@@ -137,6 +141,12 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         parquet_writer_options_builder& row_group_size_rows(
             size_type val
         ) except+
+        parquet_writer_options_builder& max_page_size_bytes(
+            size_t val
+        ) except+
+        parquet_writer_options_builder& max_page_size_rows(
+            size_type val
+        ) except+
 
         parquet_writer_options build() except +
 
@@ -153,6 +163,8 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         ) except+
         size_t get_row_group_size_bytes() except+
         size_type get_row_group_size_rows() except+
+        size_t get_max_page_size_bytes() except+
+        size_type get_max_page_size_rows() except+
 
         void set_metadata(
             cudf_io_types.table_input_metadata *m
@@ -168,6 +180,8 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         ) except +
         void set_row_group_size_bytes(size_t val) except+
         void set_row_group_size_rows(size_type val) except+
+        void set_max_page_size_bytes(size_t val) except+
+        void set_max_page_size_rows(size_type val) except+
 
         @staticmethod
         chunked_parquet_writer_options_builder builder(
@@ -197,6 +211,12 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
         chunked_parquet_writer_options_builder& row_group_size_rows(
             size_type val
         ) except+
+        chunked_parquet_writer_options_builder& max_page_size_bytes(
+            size_t val
+        ) except+
+        chunked_parquet_writer_options_builder& max_page_size_rows(
+            size_type val
+        ) except+
 
         chunked_parquet_writer_options build() except +
 
diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx
index 264b1fb507b..57f61e92afb 100644
--- a/python/cudf/cudf/_lib/parquet.pyx
+++ b/python/cudf/cudf/_lib/parquet.pyx
@@ -328,6 +328,8 @@ cpdef write_parquet(
         object int96_timestamps=False,
         object row_group_size_bytes=None,
         object row_group_size_rows=None,
+        object max_page_size_bytes=None,
+        object max_page_size_rows=None,
         object partitions_info=None):
     """
     Cython function to call into libcudf API, see `write_parquet`.
@@ -426,6 +428,10 @@ cpdef write_parquet(
         args.set_row_group_size_bytes(row_group_size_bytes)
     if row_group_size_rows is not None:
         args.set_row_group_size_rows(row_group_size_rows)
+    if max_page_size_bytes is not None:
+        args.set_max_page_size_bytes(max_page_size_bytes)
+    if max_page_size_rows is not None:
+        args.set_max_page_size_rows(max_page_size_rows)
 
     with nogil:
         out_metadata_c = move(parquet_writer(args))
@@ -463,6 +469,12 @@ cdef class ParquetWriter:
     row_group_size_rows: int, default 1000000
         Maximum number of rows of each stripe of the output.
        By default, 1000000 (10^6 rows) will be used.
+    max_page_size_bytes: int, default 524288
+        Maximum uncompressed size of each page of the output.
+        By default, 524288 (512KB) will be used.
+    max_page_size_rows: int, default 20000
+        Maximum number of rows of each page of the output.
+        By default, 20000 will be used.
 
     See Also
     --------
@@ -478,11 +490,15 @@ cdef class ParquetWriter:
     cdef object index
     cdef size_t row_group_size_bytes
     cdef size_type row_group_size_rows
+    cdef size_t max_page_size_bytes
+    cdef size_type max_page_size_rows
 
     def __cinit__(self, object filepath_or_buffer, object index=None,
                   object compression=None, str statistics="ROWGROUP",
                   int row_group_size_bytes=134217728,
-                  int row_group_size_rows=1000000):
+                  int row_group_size_rows=1000000,
+                  int max_page_size_bytes=524288,
+                  int max_page_size_rows=20000):
         filepaths_or_buffers = (
             list(filepath_or_buffer)
             if is_list_like(filepath_or_buffer)
@@ -495,6 +511,8 @@ cdef class ParquetWriter:
         self.initialized = False
         self.row_group_size_bytes = row_group_size_bytes
         self.row_group_size_rows = row_group_size_rows
+        self.max_page_size_bytes = max_page_size_bytes
+        self.max_page_size_rows = max_page_size_rows
 
     def write_table(self, table, object partitions_info=None):
         """ Writes a single table to the file """
@@ -609,6 +627,8 @@ cdef class ParquetWriter:
                 .stats_level(self.stat_freq)
                 .row_group_size_bytes(self.row_group_size_bytes)
                 .row_group_size_rows(self.row_group_size_rows)
+                .max_page_size_bytes(self.max_page_size_bytes)
+                .max_page_size_rows(self.max_page_size_rows)
                 .build()
             )
             self.writer.reset(new cpp_parquet_chunked_writer(args))
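For orientation, a minimal sketch of driving the chunked writer wired up above with the new knobs. This is illustrative only, not part of the patch: it assumes the `from cudf.io.parquet import ParquetWriter` import path used by cudf's test suite, and the file name and chunk sizes are made up.

import cudf
from cudf.io.parquet import ParquetWriter

# Cap each data page at 64 KiB uncompressed and at most 5000 rows; these
# keywords feed the max_page_size_bytes/max_page_size_rows builder calls
# added to chunked_parquet_writer_options_builder above.
writer = ParquetWriter(
    "chunked.parquet",
    max_page_size_bytes=64 * 1024,
    max_page_size_rows=5000,
)
for _ in range(4):
    writer.write_table(cudf.DataFrame({"a": range(25_000)}))
writer.close()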
diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py
index 51c2ac8b828..6f46061505c 100644
--- a/python/cudf/cudf/io/parquet.py
+++ b/python/cudf/cudf/io/parquet.py
@@ -56,6 +56,8 @@ def _write_parquet(
     int96_timestamps=False,
     row_group_size_bytes=None,
     row_group_size_rows=None,
+    max_page_size_bytes=None,
+    max_page_size_rows=None,
     partitions_info=None,
     **kwargs,
 ):
@@ -82,6 +84,8 @@ def _write_parquet(
         "int96_timestamps": int96_timestamps,
         "row_group_size_bytes": row_group_size_bytes,
         "row_group_size_rows": row_group_size_rows,
+        "max_page_size_bytes": max_page_size_bytes,
+        "max_page_size_rows": max_page_size_rows,
         "partitions_info": partitions_info,
     }
     if all(ioutils.is_fsspec_open_file(buf) for buf in paths_or_bufs):
@@ -606,6 +610,8 @@ def to_parquet(
     int96_timestamps=False,
     row_group_size_bytes=None,
     row_group_size_rows=None,
+    max_page_size_bytes=None,
+    max_page_size_rows=None,
     *args,
     **kwargs,
 ):
@@ -635,6 +641,8 @@ def to_parquet(
                 "int96_timestamps": int96_timestamps,
                 "row_group_size_bytes": row_group_size_bytes,
                 "row_group_size_rows": row_group_size_rows,
+                "max_page_size_bytes": max_page_size_bytes,
+                "max_page_size_rows": max_page_size_rows,
             }
         )
         return write_to_dataset(
@@ -664,6 +672,8 @@ def to_parquet(
             int96_timestamps=int96_timestamps,
             row_group_size_bytes=row_group_size_bytes,
             row_group_size_rows=row_group_size_rows,
+            max_page_size_bytes=max_page_size_bytes,
+            max_page_size_rows=max_page_size_rows,
             **kwargs,
         )
 
diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py
index 3771587eb47..51e29b79bec 100644
--- a/python/cudf/cudf/utils/ioutils.py
+++ b/python/cudf/cudf/utils/ioutils.py
@@ -251,6 +251,12 @@
 row_group_size_rows: integer or None, default None
     Maximum number of rows of each stripe of the output. If None, 1000000
     will be used.
+max_page_size_bytes: integer or None, default None
+    Maximum uncompressed size of each page of the output.
+    If None, 524288 (512KB) will be used.
+max_page_size_rows: integer or None, default None
+    Maximum number of rows of each page of the output.
+    If None, 20000 will be used.
 **kwargs
     To request metadata binary blob when using with ``partition_cols``, Pass
     ``return_metadata=True`` instead of specifying ``metadata_file_path``
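Before the test patch that follows, a quick sketch of the user-facing surface patch 1 adds. The defaults quoted in the comments come from the docstrings above; the frame and output path are invented for illustration.

import cudf

df = cudf.DataFrame({"id": range(1_000_000)})

# Left unset, pages are capped at 524288 bytes (512KB) and 20000 rows;
# tightening either bound produces more, smaller pages per column chunk.
df.to_parquet(
    "pages.parquet",
    max_page_size_bytes=128 * 1024,
    max_page_size_rows=10_000,
)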
From 40344c4fcfacf715cbe202abc8ad05de1c8e9fb5 Mon Sep 17 00:00:00 2001
From: seidl
Date: Wed, 22 Jun 2022 15:52:45 -0700
Subject: [PATCH 2/2] add test of max_page_size params

---
 python/cudf/cudf/tests/test_parquet.py | 31 ++++++++++++++++++++++++++
 1 file changed, 31 insertions(+)

diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py
index 1916417f306..1315e283e5f 100644
--- a/python/cudf/cudf/tests/test_parquet.py
+++ b/python/cudf/cudf/tests/test_parquet.py
@@ -1652,6 +1652,37 @@ def test_parquet_writer_row_group_size(tmpdir, row_group_size_kwargs):
     assert_eq(cudf.read_parquet(fname), gdf)
 
 
+@pytest.mark.parametrize(
+    "max_page_size_kwargs",
+    [
+        {"max_page_size_bytes": 4 * 1024},
+        {"max_page_size_rows": 5000},
+    ],
+)
+def test_parquet_writer_max_page_size(tmpdir, max_page_size_kwargs):
+    # Check that max_page_size options are exposed in Python.
+    # Since we don't have access to page metadata, instead check that
+    # a file written with more pages is slightly larger.
+
+    size = 20000
+    gdf = cudf.DataFrame({"a": range(size), "b": [1] * size})
+
+    fname = tmpdir.join("gdf.parquet")
+    with ParquetWriter(fname, **max_page_size_kwargs) as writer:
+        writer.write_table(gdf)
+    s1 = os.path.getsize(fname)
+
+    assert_eq(cudf.read_parquet(fname), gdf)
+
+    fname = tmpdir.join("gdf0.parquet")
+    with ParquetWriter(fname) as writer:
+        writer.write_table(gdf)
+    s2 = os.path.getsize(fname)
+
+    assert_eq(cudf.read_parquet(fname), gdf)
+    assert s1 > s2
+
+
 @pytest.mark.parametrize("filename", ["myfile.parquet", None])
 @pytest.mark.parametrize("cols", [["b"], ["c", "b"]])
 def test_parquet_partitioned(tmpdir_factory, cols, filename):
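The size effect the test asserts can also be reproduced by hand. A minimal sketch, assuming the row limit actually binds for 20000 rows of data; the file names are arbitrary.

import os

import cudf

size = 20000
df = cudf.DataFrame({"a": range(size), "b": [1] * size})

df.to_parquet("default_pages.parquet")
df.to_parquet("small_pages.parquet", max_page_size_rows=1000)

# Twenty pages per column chunk instead of one means extra page headers,
# so the tightly paged file should come out slightly larger on disk.
assert os.path.getsize("small_pages.parquet") > os.path.getsize(
    "default_pages.parquet"
)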