Add force_nullable_schema parameter to Parquet writer. (#12952)
Requires: #12933

This PR adds a `force_nullable_schema` parameter to the Parquet writer. When it is `True`, all columns are written as `null` (nullable) in the schema. When it is `False`, only columns that actually contain null values are written as `null`; all other columns are written as `not null`.
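As a rough usage sketch (mirroring the new tests added below, and assuming `pyarrow` is available for the schema check):

```python
import io

import cudf
import pyarrow.parquet as pq

df = cudf.DataFrame({"a": [1, 2, 3, 4]})  # contains no nulls
buf = io.BytesIO()

# With force_nullable_schema=True the column is declared nullable in the
# Parquet schema even though it holds no null values.
df.to_parquet(buf, force_nullable_schema=True)
assert pq.read_schema(buf).field(0).nullable

# With the default (False), a column without nulls is written as "not null".
buf = io.BytesIO()
df.to_parquet(buf, force_nullable_schema=False)
assert not pq.read_schema(buf).field(0).nullable
```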

Authors:
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Bradley Dice (https://github.com/bdice)

URL: #12952
galipremsagar authored Mar 22, 2023
1 parent bf18cea commit 00c6000
Showing 5 changed files with 90 additions and 9 deletions.
3 changes: 2 additions & 1 deletion python/cudf/cudf/_lib/cpp/io/types.pxd
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
# Copyright (c) 2020-2023, NVIDIA CORPORATION.

from libc.stdint cimport uint8_t
from libcpp cimport bool
@@ -74,6 +74,7 @@ cdef extern from "cudf/io/types.hpp" \
column_in_metadata& set_decimal_precision(uint8_t precision)
column_in_metadata& child(size_type i)
column_in_metadata& set_output_as_binary(bool binary)
string get_name()

cdef cppclass table_input_metadata:
table_input_metadata() except +
46 changes: 39 additions & 7 deletions python/cudf/cudf/_lib/parquet.pyx
@@ -321,7 +321,8 @@ def write_parquet(
object row_group_size_rows=None,
object max_page_size_bytes=None,
object max_page_size_rows=None,
object partitions_info=None
object partitions_info=None,
object force_nullable_schema=False,
):
"""
Cython function to call into libcudf API, see `write_parquet`.
@@ -364,7 +365,9 @@

tbl_meta.get().column_metadata[i].set_name(name.encode())
_set_col_metadata(
table[name]._column, tbl_meta.get().column_metadata[i]
table[name]._column,
tbl_meta.get().column_metadata[i],
force_nullable_schema
)

cdef map[string, string] tmp_user_data
@@ -467,6 +470,16 @@ cdef class ParquetWriter:
max_page_size_rows: int, default 20000
Maximum number of rows of each page of the output.
By default, 20000 will be used.
force_nullable_schema : bool, default True.
If True, writes all columns as `null` in schema.
If False, columns are written as `null` if they contain null values,
otherwise as `not null`.

Notes
-----
`DataFrame.to_parquet` and `ParquetWriter` use different defaults for
`force_nullable_schema` so that every chunk written by the chunked
Parquet writer shares an identical schema.

See Also
--------
@@ -484,13 +497,15 @@ cdef class ParquetWriter:
cdef size_type row_group_size_rows
cdef size_t max_page_size_bytes
cdef size_type max_page_size_rows
cdef bool force_nullable_schema

def __cinit__(self, object filepath_or_buffer, object index=None,
object compression="snappy", str statistics="ROWGROUP",
int row_group_size_bytes=_ROW_GROUP_SIZE_BYTES_DEFAULT,
int row_group_size_rows=1000000,
int max_page_size_bytes=524288,
int max_page_size_rows=20000):
int max_page_size_rows=20000,
bool force_nullable_schema=True):
filepaths_or_buffers = (
list(filepath_or_buffer)
if is_list_like(filepath_or_buffer)
Expand All @@ -505,6 +520,7 @@ cdef class ParquetWriter:
self.row_group_size_rows = row_group_size_rows
self.max_page_size_bytes = max_page_size_bytes
self.max_page_size_rows = max_page_size_rows
self.force_nullable_schema = force_nullable_schema

def write_table(self, table, object partitions_info=None):
""" Writes a single table to the file """
@@ -597,7 +613,9 @@ cdef class ParquetWriter:
for i, name in enumerate(table._column_names, num_index_cols_meta):
self.tbl_meta.get().column_metadata[i].set_name(name.encode())
_set_col_metadata(
table[name]._column, self.tbl_meta.get().column_metadata[i]
table[name]._column,
self.tbl_meta.get().column_metadata[i],
self.force_nullable_schema
)

index = (
@@ -675,15 +693,29 @@ cdef cudf_io_types.compression_type _get_comp_type(object compression):
raise ValueError("Unsupported `compression` type")


cdef _set_col_metadata(Column col, column_in_metadata& col_meta):
cdef _set_col_metadata(
Column col,
column_in_metadata& col_meta,
bool force_nullable_schema
):
col_meta.set_nullability(force_nullable_schema or col.nullable)

if is_struct_dtype(col):
for i, (child_col, name) in enumerate(
zip(col.children, list(col.dtype.fields))
):
col_meta.child(i).set_name(name.encode())
_set_col_metadata(child_col, col_meta.child(i))
_set_col_metadata(
child_col,
col_meta.child(i),
force_nullable_schema
)
elif is_list_dtype(col):
_set_col_metadata(col.children[1], col_meta.child(1))
_set_col_metadata(
col.children[1],
col_meta.child(1),
force_nullable_schema
)
else:
if is_decimal_dtype(col):
col_meta.set_decimal_precision(col.dtype.precision)
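For the chunked writer changed above, `force_nullable_schema` defaults to `True` (per the docstring note) so that every chunk carries the same, all-nullable schema. A minimal sketch, assuming `ParquetWriter` is imported from `cudf.io.parquet` as in the test module below:

```python
import io

import cudf
from cudf.io.parquet import ParquetWriter

buf = io.BytesIO()

# With the default force_nullable_schema=True, both chunks are written
# against the same nullable schema, even though only the second chunk
# actually contains a null value.
writer = ParquetWriter(buf)
writer.write_table(cudf.DataFrame({"a": [1, 2]}))
writer.write_table(cudf.DataFrame({"a": [3, None]}))
writer.close()
```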
13 changes: 12 additions & 1 deletion python/cudf/cudf/io/parquet.py
@@ -60,6 +60,7 @@ def _write_parquet(
max_page_size_rows=None,
partitions_info=None,
storage_options=None,
force_nullable_schema=False,
):
if is_list_like(paths) and len(paths) > 1:
if partitions_info is None:
@@ -89,6 +90,7 @@
"max_page_size_bytes": max_page_size_bytes,
"max_page_size_rows": max_page_size_rows,
"partitions_info": partitions_info,
"force_nullable_schema": force_nullable_schema,
}
if all(ioutils.is_fsspec_open_file(buf) for buf in paths_or_bufs):
with ExitStack() as stack:
@@ -126,6 +128,7 @@ def write_to_dataset(
max_page_size_bytes=None,
max_page_size_rows=None,
storage_options=None,
force_nullable_schema=False,
):
"""Wraps `to_parquet` to write partitioned Parquet datasets.
For each combination of partition group and value,
@@ -179,14 +182,17 @@
max_page_size_rows: integer or None, default None
Maximum number of rows of each page of the output.
If None, 20000 will be used.
storage_options : dict, optional, default None
Extra options that make sense for a particular storage connection,
e.g. host, port, username, password, etc. For HTTP(S) URLs the
key-value pairs are forwarded to ``urllib.request.Request`` as
header options. For other URLs (e.g. starting with "s3://", and
"gcs://") the key-value pairs are forwarded to ``fsspec.open``.
Please see ``fsspec`` and ``urllib`` for more details.
force_nullable_schema : bool, default False.
If True, writes all columns as `null` in schema.
If False, columns are written as `null` if they contain null values,
otherwise as `not null`.
"""

fs = ioutils._ensure_filesystem(fs, root_path, storage_options)
@@ -224,6 +230,7 @@ def write_to_dataset(
row_group_size_rows=row_group_size_rows,
max_page_size_bytes=max_page_size_bytes,
max_page_size_rows=max_page_size_rows,
force_nullable_schema=force_nullable_schema,
)

else:
@@ -244,6 +251,7 @@
row_group_size_rows=row_group_size_rows,
max_page_size_bytes=max_page_size_bytes,
max_page_size_rows=max_page_size_rows,
force_nullable_schema=force_nullable_schema,
)

return metadata
@@ -712,6 +720,7 @@ def to_parquet(
max_page_size_rows=None,
storage_options=None,
return_metadata=False,
force_nullable_schema=False,
*args,
**kwargs,
):
@@ -760,6 +769,7 @@
max_page_size_rows=max_page_size_rows,
return_metadata=return_metadata,
storage_options=storage_options,
force_nullable_schema=force_nullable_schema,
)

partition_info = (
@@ -784,6 +794,7 @@
max_page_size_rows=max_page_size_rows,
partitions_info=partition_info,
storage_options=storage_options,
force_nullable_schema=force_nullable_schema,
)

else:
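Because `to_parquet` forwards the new flag to `write_to_dataset`, partitioned writes honor it as well. A hedged sketch, assuming a local output directory:

```python
import tempfile

import cudf

df = cudf.DataFrame({"key": ["x", "x", "y"], "val": [1, 2, 3]})

with tempfile.TemporaryDirectory() as out_dir:
    # Each per-partition file is written with an all-nullable schema.
    df.to_parquet(out_dir, partition_cols=["key"], force_nullable_schema=True)
```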
33 changes: 33 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
@@ -2775,3 +2775,36 @@ def test_parquet_reader_unsupported_page_encoding(datadir):
# expect a failure when reading the whole file
with pytest.raises(RuntimeError):
cudf.read_parquet(fname)


@pytest.mark.parametrize("data", [{"a": [1, 2, 3, 4]}, {"b": [1, None, 2, 3]}])
@pytest.mark.parametrize("force_nullable_schema", [True, False])
def test_parquet_writer_schema_nullability(data, force_nullable_schema):
df = cudf.DataFrame(data)
file_obj = BytesIO()

df.to_parquet(file_obj, force_nullable_schema=force_nullable_schema)

assert pa.parquet.read_schema(file_obj).field(0).nullable == (
force_nullable_schema or df.isnull().any().any()
)


@pytest.mark.parametrize("data", [{"a": [1, 2, 3, 4]}, {"b": [1, None, 2, 3]}])
@pytest.mark.parametrize("force_nullable_schema", [True, False])
def test_parquet_chunked_writer_schema_nullability(
data, force_nullable_schema
):
df = cudf.DataFrame(data)
file_obj = BytesIO()

writer = ParquetWriter(
file_obj, force_nullable_schema=force_nullable_schema
)

writer.write_table(df)

writer.close()
assert pa.parquet.read_schema(file_obj).field(0).nullable == (
force_nullable_schema or df.isnull().any().any()
)
4 changes: 4 additions & 0 deletions python/cudf/cudf/utils/ioutils.py
@@ -290,6 +290,10 @@
include the file path metadata (relative to `root_path`).
To request metadata binary blob when using with ``partition_cols``, Pass
``return_metadata=True`` instead of specifying ``metadata_file_path``
force_nullable_schema : bool, default False.
If True, writes all columns as `null` in schema.
If False, columns are written as `null` if they contain null values,
otherwise as `not null`.
**kwargs
Additional parameters will be passed to execution engines other
than ``cudf``.