From 4c4fdd2c86deef16bb68a13bc401193274c45266 Mon Sep 17 00:00:00 2001
From: GALI PREM SAGAR
Date: Fri, 24 Mar 2023 14:12:37 -0500
Subject: [PATCH] Drop `force_nullable_schema` from chunked parquet writer (#12996)

`force_nullable_schema` was introduced in
https://github.com/rapidsai/cudf/pull/12952; however, strangely, we only
started seeing the following pytest failure locally after it was merged into
`branch-23.04`:

```python
(cudfdev) pgali@dt07:/nvme/0/pgali/cudf$ pytest python/dask_cudf/dask_cudf/io/tests/test_parquet.py::test_cudf_list_struct_write
====================================================================================== test session starts =======================================================================================
platform linux -- Python 3.10.9, pytest-7.2.2, pluggy-1.0.0
benchmark: 4.0.0 (defaults: timer=time.perf_counter disable_gc=False min_rounds=5 min_time=0.000005 max_time=1.0 calibration_precision=10 warmup=False warmup_iterations=100000)
rootdir: /nvme/0/pgali/cudf/python/dask_cudf
plugins: cases-3.6.14, anyio-3.6.2, benchmark-4.0.0, xdist-3.2.1, hypothesis-6.70.0, cov-4.0.0
collected 1 item

python/dask_cudf/dask_cudf/io/tests/test_parquet.py F                                                                                                    [100%]

============================================================================================ FAILURES ============================================================================================
__________________________________________________________________________________ test_cudf_list_struct_write ___________________________________________________________________________________

tmpdir = local('/tmp/pytest-of-pgali/pytest-84/test_cudf_list_struct_write0')

    def test_cudf_list_struct_write(tmpdir):
        df = cudf.DataFrame(
            {
                "a": [1, 2, 3],
                "b": [[[1, 2]], [[2, 3]], None],
                "c": [[[["a", "z"]]], [[["b", "d", "e"]]], None],
            }
        )
        df["d"] = df.to_struct()
        ddf = dask_cudf.from_cudf(df, 3)
        temp_file = str(tmpdir.join("list_struct.parquet"))
>       ddf.to_parquet(temp_file)

python/dask_cudf/dask_cudf/io/tests/test_parquet.py:493:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../envs/cudfdev/lib/python3.10/contextlib.py:79: in inner
    return func(*args, **kwds)
python/dask_cudf/dask_cudf/core.py:252: in to_parquet
    return to_parquet(self, path, *args, **kwargs)
../envs/cudfdev/lib/python3.10/site-packages/dask/dataframe/io/parquet/core.py:1061: in to_parquet
    out = out.compute(**compute_kwargs)
../envs/cudfdev/lib/python3.10/site-packages/dask/base.py:314: in compute
    (result,) = compute(self, traverse=False, **kwargs)
../envs/cudfdev/lib/python3.10/site-packages/dask/base.py:599: in compute
    results = schedule(dsk, keys, **kwargs)
../envs/cudfdev/lib/python3.10/site-packages/dask/threaded.py:89: in get
    results = get_async(
../envs/cudfdev/lib/python3.10/site-packages/dask/local.py:511: in get_async
    raise_exception(exc, tb)
../envs/cudfdev/lib/python3.10/site-packages/dask/local.py:319: in reraise
    raise exc
../envs/cudfdev/lib/python3.10/site-packages/dask/local.py:224: in execute_task
    result = _execute_task(task, data)
../envs/cudfdev/lib/python3.10/site-packages/dask/core.py:119: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
../envs/cudfdev/lib/python3.10/site-packages/dask/optimization.py:990: in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
../envs/cudfdev/lib/python3.10/site-packages/dask/core.py:149: in get
    result = _execute_task(task, cache)
../envs/cudfdev/lib/python3.10/site-packages/dask/core.py:119: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
../envs/cudfdev/lib/python3.10/site-packages/dask/dataframe/io/parquet/core.py:171: in __call__
    return self.engine.write_partition(
python/dask_cudf/dask_cudf/io/parquet.py:349: in write_partition
    md = df.to_parquet(
../envs/cudfdev/lib/python3.10/site-packages/cudf/core/dataframe.py:6322: in to_parquet
    return parquet.to_parquet(
../envs/cudfdev/lib/python3.10/contextlib.py:79: in inner
    return func(*args, **kwds)
../envs/cudfdev/lib/python3.10/site-packages/cudf/io/parquet.py:783: in to_parquet
    return _write_parquet(
../envs/cudfdev/lib/python3.10/contextlib.py:79: in inner
    return func(*args, **kwds)
../envs/cudfdev/lib/python3.10/site-packages/cudf/io/parquet.py:105: in _write_parquet
    write_parquet_res = libparquet.write_parquet(
../envs/cudfdev/lib/python3.10/contextlib.py:79: in inner
    return func(*args, **kwds)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

>   ???
E   RuntimeError: CUDF failure at: /nvme/0/pgali/cudf/cpp/src/io/parquet/writer_impl.cu:513: Mismatch in metadata prescribed nullability and input column nullability. Metadata for nullable input column cannot prescribe nullability = false

parquet.pyx:432: RuntimeError
==================================================================================== short test summary info =====================================================================================
FAILED python/dask_cudf/dask_cudf/io/tests/test_parquet.py::test_cudf_list_struct_write - RuntimeError: CUDF failure at: /nvme/0/pgali/cudf/cpp/src/io/parquet/writer_impl.cu:513: Mismatch in metadata prescribed nullability and input column nullability. Metadata for nullable inpu...
======================================================================================= 1 failed in 3.90s ========================================================================================
```

This PR fixes the issue by dropping `force_nullable_schema` from the chunked
parquet writer.
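For context, the keyword stays on the non-chunked `DataFrame.to_parquet` path.
A minimal sketch of that surviving behavior, inferred from the retained
`test_parquet_writer_schema_nullability` test further down in this patch (this
snippet is illustrative and not part of the change itself):

```python
from io import BytesIO

import cudf
import pyarrow.parquet as pq

df = cudf.DataFrame({"a": [1, 2, 3, 4]})  # column contains no nulls

# force_nullable_schema=True marks the column nullable in the parquet
# schema even though the data has no nulls.
forced = BytesIO()
df.to_parquet(forced, force_nullable_schema=True)
assert pq.read_schema(forced).field(0).nullable

# With the default force_nullable_schema=False, nullability is left for
# the writer to derive from the data, so a null-free column is written
# as not nullable.
inferred = BytesIO()
df.to_parquet(inferred, force_nullable_schema=False)
assert not pq.read_schema(inferred).field(0).nullable
```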
Authors:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Bradley Dice (https://github.com/bdice)

URL: https://github.com/rapidsai/cudf/pull/12996
---
 python/cudf/cudf/_lib/parquet.pyx      | 23 ++++++-----------------
 python/cudf/cudf/tests/test_parquet.py | 20 --------------------
 2 files changed, 6 insertions(+), 37 deletions(-)

diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx
index 59571b0e4b3..923f5c4089f 100644
--- a/python/cudf/cudf/_lib/parquet.pyx
+++ b/python/cudf/cudf/_lib/parquet.pyx
@@ -470,16 +470,6 @@ cdef class ParquetWriter:
     max_page_size_rows: int, default 20000
         Maximum number of rows of each page of the output.
         By default, 20000 will be used.
-    force_nullable_schema : bool, default True.
-        If True, writes all columns as `null` in schema.
-        If False, columns are written as `null` if they contain null values,
-        otherwise as `not null`.
-
-    Notes
-    -----
-    `DataFrame.to_parquet` and `ParquetWriter` differ in the default
-    value for `force_nullable_schema` to enable all the chunks being
-    written by chunked parquet writer to be schema identical.
 
     See Also
     --------
@@ -497,15 +487,13 @@ cdef class ParquetWriter:
     cdef size_type row_group_size_rows
     cdef size_t max_page_size_bytes
     cdef size_type max_page_size_rows
-    cdef bool force_nullable_schema
 
     def __cinit__(self, object filepath_or_buffer, object index=None,
                   object compression="snappy", str statistics="ROWGROUP",
                   int row_group_size_bytes=_ROW_GROUP_SIZE_BYTES_DEFAULT,
                   int row_group_size_rows=1000000,
                   int max_page_size_bytes=524288,
-                  int max_page_size_rows=20000,
-                  bool force_nullable_schema=True):
+                  int max_page_size_rows=20000):
         filepaths_or_buffers = (
             list(filepath_or_buffer)
             if is_list_like(filepath_or_buffer)
@@ -520,7 +508,6 @@ cdef class ParquetWriter:
         self.row_group_size_rows = row_group_size_rows
         self.max_page_size_bytes = max_page_size_bytes
         self.max_page_size_rows = max_page_size_rows
-        self.force_nullable_schema = force_nullable_schema
 
     def write_table(self, table, object partitions_info=None):
         """ Writes a single table to the file """
@@ -615,7 +602,6 @@ cdef class ParquetWriter:
             _set_col_metadata(
                 table[name]._column,
                 self.tbl_meta.get().column_metadata[i],
-                self.force_nullable_schema
             )
 
         index = (
@@ -696,9 +682,12 @@ cdef cudf_io_types.compression_type _get_comp_type(object compression):
 cdef _set_col_metadata(
     Column col,
     column_in_metadata& col_meta,
-    bool force_nullable_schema
+    bool force_nullable_schema=False,
 ):
-    col_meta.set_nullability(force_nullable_schema or col.nullable)
+    if force_nullable_schema:
+        # Only set nullability if `force_nullable_schema`
+        # is true.
+        col_meta.set_nullability(True)
 
     if is_struct_dtype(col):
         for i, (child_col, name) in enumerate(
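The last hunk above is the heart of the fix. Previously, `set_nullability`
was called unconditionally, so the metadata always prescribed a nullability
that libcudf then verified against the input column; any disagreement (as
with the nested columns in the failing dask_cudf test) raised the
writer_impl.cu:513 error. Now nullability is only prescribed when explicitly
forced and is otherwise left unset for the writer to infer. A rough
plain-Python paraphrase of the new logic, using a hypothetical stand-in for
libcudf's `column_in_metadata` (not cudf's actual type):

```python
class ColumnInMetadata:
    """Hypothetical stand-in for libcudf's column_in_metadata."""

    def __init__(self, num_children=0):
        self.nullability = None  # None = unset; the writer infers it
        self.children = [ColumnInMetadata() for _ in range(num_children)]

    def set_nullability(self, value):
        self.nullability = value


def set_col_metadata(col, col_meta, force_nullable_schema=False):
    # Old: col_meta.set_nullability(force_nullable_schema or col.nullable)
    # always prescribed a value, which could contradict the nullability the
    # C++ writer computes for (nested) input columns.
    # New: prescribe nullability only when the caller explicitly forces it.
    if force_nullable_schema:
        col_meta.set_nullability(True)

    # Recurse into children so nested (struct/list) columns are treated the
    # same way, mirroring the is_struct_dtype branch in the diff.
    for child_col, child_meta in zip(col.children, col_meta.children):
        set_col_metadata(child_col, child_meta, force_nullable_schema)
```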
diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py
index 9b783b03dad..c24ff080033 100644
--- a/python/cudf/cudf/tests/test_parquet.py
+++ b/python/cudf/cudf/tests/test_parquet.py
@@ -2788,23 +2788,3 @@ def test_parquet_writer_schema_nullability(data, force_nullable_schema):
     assert pa.parquet.read_schema(file_obj).field(0).nullable == (
         force_nullable_schema or df.isnull().any().any()
     )
-
-
-@pytest.mark.parametrize("data", [{"a": [1, 2, 3, 4]}, {"b": [1, None, 2, 3]}])
-@pytest.mark.parametrize("force_nullable_schema", [True, False])
-def test_parquet_chunked_writer_schema_nullability(
-    data, force_nullable_schema
-):
-    df = cudf.DataFrame(data)
-    file_obj = BytesIO()
-
-    writer = ParquetWriter(
-        file_obj, force_nullable_schema=force_nullable_schema
-    )
-
-    writer.write_table(df)
-
-    writer.close()
-    assert pa.parquet.read_schema(file_obj).field(0).nullable == (
-        force_nullable_schema or df.isnull().any().any()
-    )
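With the keyword removed, the removed chunked-writer test no longer applies.
A minimal sketch of chunked writing after this change (the import path is an
assumption based on how `test_parquet.py` gets `ParquetWriter`; passing
`force_nullable_schema` to the constructor would now raise a `TypeError`):

```python
from io import BytesIO

import cudf
from cudf.io.parquet import ParquetWriter  # import path is an assumption

file_obj = BytesIO()

# No force_nullable_schema keyword anymore: schema nullability is no longer
# prescribed up front; the writer derives it from the tables it is given.
writer = ParquetWriter(file_obj)
writer.write_table(cudf.DataFrame({"a": [1, None, 3]}))  # chunk with nulls
writer.write_table(cudf.DataFrame({"a": [4, 5, 6]}))     # chunk without
writer.close()
```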