Drop force_nullable_schema from chunked parquet writer (#12996)
`force_nullable_schema` was introduced in #12952. Strangely, only after it was merged to `branch-23.04` did we start seeing the following pytest failure locally:
```python
(cudfdev) pgali@dt07:/nvme/0/pgali/cudf$ pytest python/dask_cudf/dask_cudf/io/tests/test_parquet.py::test_cudf_list_struct_write
====================================================================================== test session starts =======================================================================================
platform linux -- Python 3.10.9, pytest-7.2.2, pluggy-1.0.0
benchmark: 4.0.0 (defaults: timer=time.perf_counter disable_gc=False min_rounds=5 min_time=0.000005 max_time=1.0 calibration_precision=10 warmup=False warmup_iterations=100000)
rootdir: /nvme/0/pgali/cudf/python/dask_cudf
plugins: cases-3.6.14, anyio-3.6.2, benchmark-4.0.0, xdist-3.2.1, hypothesis-6.70.0, cov-4.0.0
collected 1 item                                                                                                                                                                                 

python/dask_cudf/dask_cudf/io/tests/test_parquet.py F                                                                                                                                      [100%]

============================================================================================ FAILURES ============================================================================================
__________________________________________________________________________________ test_cudf_list_struct_write ___________________________________________________________________________________

tmpdir = local('/tmp/pytest-of-pgali/pytest-84/test_cudf_list_struct_write0')

    def test_cudf_list_struct_write(tmpdir):
        df = cudf.DataFrame(
            {
                "a": [1, 2, 3],
                "b": [[[1, 2]], [[2, 3]], None],
                "c": [[[["a", "z"]]], [[["b", "d", "e"]]], None],
            }
        )
        df["d"] = df.to_struct()
    
        ddf = dask_cudf.from_cudf(df, 3)
        temp_file = str(tmpdir.join("list_struct.parquet"))
    
>       ddf.to_parquet(temp_file)

python/dask_cudf/dask_cudf/io/tests/test_parquet.py:493: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../envs/cudfdev/lib/python3.10/contextlib.py:79: in inner
    return func(*args, **kwds)
python/dask_cudf/dask_cudf/core.py:252: in to_parquet
    return to_parquet(self, path, *args, **kwargs)
../envs/cudfdev/lib/python3.10/site-packages/dask/dataframe/io/parquet/core.py:1061: in to_parquet
    out = out.compute(**compute_kwargs)
../envs/cudfdev/lib/python3.10/site-packages/dask/base.py:314: in compute
    (result,) = compute(self, traverse=False, **kwargs)
../envs/cudfdev/lib/python3.10/site-packages/dask/base.py:599: in compute
    results = schedule(dsk, keys, **kwargs)
../envs/cudfdev/lib/python3.10/site-packages/dask/threaded.py:89: in get
    results = get_async(
../envs/cudfdev/lib/python3.10/site-packages/dask/local.py:511: in get_async
    raise_exception(exc, tb)
../envs/cudfdev/lib/python3.10/site-packages/dask/local.py:319: in reraise
    raise exc
../envs/cudfdev/lib/python3.10/site-packages/dask/local.py:224: in execute_task
    result = _execute_task(task, data)
../envs/cudfdev/lib/python3.10/site-packages/dask/core.py:119: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
../envs/cudfdev/lib/python3.10/site-packages/dask/optimization.py:990: in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
../envs/cudfdev/lib/python3.10/site-packages/dask/core.py:149: in get
    result = _execute_task(task, cache)
../envs/cudfdev/lib/python3.10/site-packages/dask/core.py:119: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
../envs/cudfdev/lib/python3.10/site-packages/dask/dataframe/io/parquet/core.py:171: in __call__
    return self.engine.write_partition(
python/dask_cudf/dask_cudf/io/parquet.py:349: in write_partition
    md = df.to_parquet(
../envs/cudfdev/lib/python3.10/site-packages/cudf/core/dataframe.py:6322: in to_parquet
    return parquet.to_parquet(
../envs/cudfdev/lib/python3.10/contextlib.py:79: in inner
    return func(*args, **kwds)
../envs/cudfdev/lib/python3.10/site-packages/cudf/io/parquet.py:783: in to_parquet
    return _write_parquet(
../envs/cudfdev/lib/python3.10/contextlib.py:79: in inner
    return func(*args, **kwds)
../envs/cudfdev/lib/python3.10/site-packages/cudf/io/parquet.py:105: in _write_parquet
    write_parquet_res = libparquet.write_parquet(
../envs/cudfdev/lib/python3.10/contextlib.py:79: in inner
    return func(*args, **kwds)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

>   ???
E   RuntimeError: CUDF failure at: /nvme/0/pgali/cudf/cpp/src/io/parquet/writer_impl.cu:513: Mismatch in metadata prescribed nullability and input column nullability. Metadata for nullable input column cannot prescribe nullability = false

parquet.pyx:432: RuntimeError
==================================================================================== short test summary info =====================================================================================
FAILED python/dask_cudf/dask_cudf/io/tests/test_parquet.py::test_cudf_list_struct_write - RuntimeError: CUDF failure at: /nvme/0/pgali/cudf/cpp/src/io/parquet/writer_impl.cu:513: Mismatch in metadata prescribed nullability and input column nullability. Metadata for nullable inpu...
======================================================================================= 1 failed in 3.90s ========================================================================================
```

This PR fixes the issue by dropping `force_nullable_schema` from the chunked parquet writer (`ParquetWriter`).
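
For context, a minimal sketch of the split after this change, assuming a cudf build that includes it and that `DataFrame.to_parquet` keeps the keyword (as the remaining non-chunked test suggests):

```python
# Minimal sketch, assuming a cudf build that includes this change and that
# DataFrame.to_parquet still accepts force_nullable_schema.
from io import BytesIO

import cudf
from cudf.io.parquet import ParquetWriter

df = cudf.DataFrame({"a": [1, None, 3]})

# Non-chunked path: the option remains available here.
df.to_parquet(BytesIO(), force_nullable_schema=True)

# Chunked path: the keyword is gone, so the writer simply follows the
# nullability of the input columns for every chunk it writes.
writer = ParquetWriter(BytesIO())
writer.write_table(df)
writer.close()
```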

Authors:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Bradley Dice (https://github.com/bdice)

URL: #12996
galipremsagar authored Mar 24, 2023
1 parent 9fbc249 commit 4c4fdd2
Showing 2 changed files with 6 additions and 37 deletions.
23 changes: 6 additions & 17 deletions python/cudf/cudf/_lib/parquet.pyx
```diff
@@ -470,16 +470,6 @@ cdef class ParquetWriter:
     max_page_size_rows: int, default 20000
         Maximum number of rows of each page of the output.
         By default, 20000 will be used.
-    force_nullable_schema : bool, default True.
-        If True, writes all columns as `null` in schema.
-        If False, columns are written as `null` if they contain null values,
-        otherwise as `not null`.
-
-    Notes
-    -----
-    `DataFrame.to_parquet` and `ParquetWriter` differ in the default
-    value for `force_nullable_schema` to enable all the chunks being
-    written by chunked parquet writer to be schema identical.

     See Also
     --------
@@ -497,15 +487,13 @@ cdef class ParquetWriter:
     cdef size_type row_group_size_rows
     cdef size_t max_page_size_bytes
     cdef size_type max_page_size_rows
-    cdef bool force_nullable_schema

     def __cinit__(self, object filepath_or_buffer, object index=None,
                   object compression="snappy", str statistics="ROWGROUP",
                   int row_group_size_bytes=_ROW_GROUP_SIZE_BYTES_DEFAULT,
                   int row_group_size_rows=1000000,
                   int max_page_size_bytes=524288,
-                  int max_page_size_rows=20000,
-                  bool force_nullable_schema=True):
+                  int max_page_size_rows=20000):
         filepaths_or_buffers = (
             list(filepath_or_buffer)
             if is_list_like(filepath_or_buffer)
@@ ... @@
         self.row_group_size_rows = row_group_size_rows
         self.max_page_size_bytes = max_page_size_bytes
         self.max_page_size_rows = max_page_size_rows
-        self.force_nullable_schema = force_nullable_schema

     def write_table(self, table, object partitions_info=None):
         """ Writes a single table to the file """
@@ -615,7 +602,6 @@ cdef class ParquetWriter:
             _set_col_metadata(
                 table[name]._column,
                 self.tbl_meta.get().column_metadata[i],
-                self.force_nullable_schema
             )

         index = (
@@ -696,9 +682,12 @@ cdef cudf_io_types.compression_type _get_comp_type(object compression):
 cdef _set_col_metadata(
     Column col,
     column_in_metadata& col_meta,
-    bool force_nullable_schema
+    bool force_nullable_schema=False,
 ):
-    col_meta.set_nullability(force_nullable_schema or col.nullable)
+    if force_nullable_schema:
+        # Only set nullability if `force_nullable_schema`
+        # is true.
+        col_meta.set_nullability(True)

     if is_struct_dtype(col):
         for i, (child_col, name) in enumerate(
```
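
A hedged illustration of what the new `_set_col_metadata` default means in practice (inferred from the diff above, not verified against a build): when nullability is not forced, the schema the chunked writer emits is expected to follow the nullability of the input columns, so every chunk written through the same writer stays schema-identical.

```python
# Sketch only; assumes cudf (with this change) and pyarrow are installed,
# and that ParquetWriter is importable from cudf.io.parquet as in the tests.
from io import BytesIO

import cudf
import pyarrow.parquet as pq
from cudf.io.parquet import ParquetWriter

buf = BytesIO()
writer = ParquetWriter(buf)
writer.write_table(cudf.DataFrame({"a": [1, 2, 3]}))  # chunk with no nulls
writer.write_table(cudf.DataFrame({"a": [4, 5, 6]}))  # same schema for this chunk
writer.close()

# Expected (not verified here): the field is non-nullable because the input
# columns carry no null mask and nullability is no longer forced to True.
print(pq.read_schema(buf).field(0).nullable)
```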
20 changes: 0 additions & 20 deletions python/cudf/cudf/tests/test_parquet.py
```diff
@@ -2788,23 +2788,3 @@ def test_parquet_writer_schema_nullability(data, force_nullable_schema):
     assert pa.parquet.read_schema(file_obj).field(0).nullable == (
         force_nullable_schema or df.isnull().any().any()
     )
-
-
-@pytest.mark.parametrize("data", [{"a": [1, 2, 3, 4]}, {"b": [1, None, 2, 3]}])
-@pytest.mark.parametrize("force_nullable_schema", [True, False])
-def test_parquet_chunked_writer_schema_nullability(
-    data, force_nullable_schema
-):
-    df = cudf.DataFrame(data)
-    file_obj = BytesIO()
-
-    writer = ParquetWriter(
-        file_obj, force_nullable_schema=force_nullable_schema
-    )
-
-    writer.write_table(df)
-
-    writer.close()
-    assert pa.parquet.read_schema(file_obj).field(0).nullable == (
-        force_nullable_schema or df.isnull().any().any()
-    )
```