Expose some Parquet per-column configuration options via the python API (#15613)

Several recent PRs (#15081, #15411, #15600) added the ability to control some aspects of Parquet file writing on a per-column basis. During discussion of #15081 it was suggested that these options be exposed by cuDF-python in a manner similar to pyarrow. This PR adds the ability to control per-column encoding, compression, binary output, and fixed-length data width, using fully qualified Parquet column names. For example, given a cuDF table with an integer column 'a' and a `list<int32>` column 'b', the fully qualified column names would be 'a' and 'b.list.element'.
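As a rough usage sketch (the data, file name, and option values below are illustrative assumptions, not taken from the PR), the new keyword arguments map fully qualified column names to the desired per-column behavior:

```python
import cudf

# Illustrative data: an integer column 'a' and a list<int32> column 'b'.
df = cudf.DataFrame({"a": [1, 2, 3], "b": [[1, 2], [3], [4, 5, 6]]})

df.to_parquet(
    "example.parquet",
    compression="SNAPPY",
    # Per-column page encoding, keyed by fully qualified column name.
    column_encoding={"b.list.element": "DELTA_BINARY_PACKED"},
    # Columns listed here are written uncompressed despite the global setting.
    skip_compression={"a"},
)
```

`column_type_length` (a dict mapping a column name to its byte width) and `output_as_binary` (a set of column names) follow the same naming convention; the new tests in `test_parquet.py` verify the written files by inspecting pyarrow `ParquetFile` metadata.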

Addresses the "Add cuDF-python API support for specifying encodings" task in #13501.

Authors:
  - Ed Seidl (https://github.com/etseidl)
  - Vukasin Milovanovic (https://github.com/vuule)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Vyas Ramasubramani (https://github.com/vyasr)

URL: #15613
etseidl authored May 22, 2024
1 parent 57444ed commit b4ce6e4
Showing 6 changed files with 252 additions and 4 deletions.
74 changes: 71 additions & 3 deletions python/cudf/cudf/_lib/parquet.pyx
@@ -408,6 +408,10 @@ def write_parquet(
object force_nullable_schema=False,
header_version="1.0",
use_dictionary=True,
object skip_compression=None,
object column_encoding=None,
object column_type_length=None,
object output_as_binary=None,
):
"""
Cython function to call into libcudf API, see `write_parquet`.
@@ -458,7 +462,12 @@ def write_parquet(
_set_col_metadata(
table[name]._column,
tbl_meta.column_metadata[i],
force_nullable_schema
force_nullable_schema,
None,
skip_compression,
column_encoding,
column_type_length,
output_as_binary
)

cdef map[string, string] tmp_user_data
@@ -810,16 +819,62 @@ cdef cudf_io_types.compression_type _get_comp_type(object compression):
raise ValueError("Unsupported `compression` type")


cdef cudf_io_types.column_encoding _get_encoding_type(object encoding):
if encoding is None:
return cudf_io_types.column_encoding.USE_DEFAULT

enc = str(encoding).upper()
if enc == "PLAIN":
return cudf_io_types.column_encoding.PLAIN
elif enc == "DICTIONARY":
return cudf_io_types.column_encoding.DICTIONARY
elif enc == "DELTA_BINARY_PACKED":
return cudf_io_types.column_encoding.DELTA_BINARY_PACKED
elif enc == "DELTA_LENGTH_BYTE_ARRAY":
return cudf_io_types.column_encoding.DELTA_LENGTH_BYTE_ARRAY
elif enc == "DELTA_BYTE_ARRAY":
return cudf_io_types.column_encoding.DELTA_BYTE_ARRAY
elif enc == "BYTE_STREAM_SPLIT":
return cudf_io_types.column_encoding.BYTE_STREAM_SPLIT
elif enc == "USE_DEFAULT":
return cudf_io_types.column_encoding.USE_DEFAULT
else:
raise ValueError("Unsupported `column_encoding` type")


cdef _set_col_metadata(
Column col,
column_in_metadata& col_meta,
bool force_nullable_schema=False,
str path=None,
object skip_compression=None,
object column_encoding=None,
object column_type_length=None,
object output_as_binary=None,
):
need_path = (skip_compression is not None or column_encoding is not None or
column_type_length is not None or output_as_binary is not None)
name = col_meta.get_name().decode('UTF-8') if need_path else None
full_path = path + "." + name if path is not None else name

if force_nullable_schema:
# Only set nullability if `force_nullable_schema`
# is true.
col_meta.set_nullability(True)

if skip_compression is not None and full_path in skip_compression:
col_meta.set_skip_compression(True)

if column_encoding is not None and full_path in column_encoding:
col_meta.set_encoding(_get_encoding_type(column_encoding[full_path]))

if column_type_length is not None and full_path in column_type_length:
col_meta.set_output_as_binary(True)
col_meta.set_type_length(column_type_length[full_path])

if output_as_binary is not None and full_path in output_as_binary:
col_meta.set_output_as_binary(True)

if isinstance(col.dtype, cudf.StructDtype):
for i, (child_col, name) in enumerate(
zip(col.children, list(col.dtype.fields))
@@ -828,13 +883,26 @@ cdef _set_col_metadata(
_set_col_metadata(
child_col,
col_meta.child(i),
force_nullable_schema
force_nullable_schema,
full_path,
skip_compression,
column_encoding,
column_type_length,
output_as_binary
)
elif isinstance(col.dtype, cudf.ListDtype):
if full_path is not None:
full_path = full_path + ".list"
col_meta.child(1).set_name("element".encode())
_set_col_metadata(
col.children[1],
col_meta.child(1),
force_nullable_schema
force_nullable_schema,
full_path,
skip_compression,
column_encoding,
column_type_length,
output_as_binary
)
elif isinstance(col.dtype, cudf.core.dtypes.DecimalDtype):
col_meta.set_decimal_precision(col.dtype.precision)
18 changes: 17 additions & 1 deletion python/cudf/cudf/_lib/pylibcudf/libcudf/io/types.pxd
@@ -1,6 +1,6 @@
# Copyright (c) 2020-2024, NVIDIA CORPORATION.

from libc.stdint cimport uint8_t
from libc.stdint cimport int32_t, uint8_t
from libcpp cimport bool
from libcpp.map cimport map
from libcpp.memory cimport shared_ptr, unique_ptr
@@ -57,6 +57,19 @@ cdef extern from "cudf/io/types.hpp" \
ADAPTIVE = 1,
ALWAYS = 2,

cdef extern from "cudf/io/types.hpp" namespace "cudf::io" nogil:
cpdef enum class column_encoding:
USE_DEFAULT = -1
DICTIONARY = 0
PLAIN = 1
DELTA_BINARY_PACKED = 2
DELTA_LENGTH_BYTE_ARRAY = 3
DELTA_BYTE_ARRAY = 4
BYTE_STREAM_SPLIT = 5
DIRECT = 6
DIRECT_V2 = 7
DICTIONARY_V2 = 8

cdef cppclass column_name_info:
string name
vector[column_name_info] children
Expand All @@ -81,6 +94,9 @@ cdef extern from "cudf/io/types.hpp" \
column_in_metadata& set_decimal_precision(uint8_t precision)
column_in_metadata& child(size_type i)
column_in_metadata& set_output_as_binary(bool binary)
column_in_metadata& set_type_length(int32_t type_length)
column_in_metadata& set_skip_compression(bool skip)
column_in_metadata& set_encoding(column_encoding enc)
string get_name()

cdef cppclass table_input_metadata:
8 changes: 8 additions & 0 deletions python/cudf/cudf/core/dataframe.py
@@ -6707,6 +6707,10 @@ def to_parquet(
return_metadata=False,
use_dictionary=True,
header_version="1.0",
skip_compression=None,
column_encoding=None,
column_type_length=None,
output_as_binary=None,
*args,
**kwargs,
):
Expand All @@ -6733,6 +6737,10 @@ def to_parquet(
return_metadata=return_metadata,
use_dictionary=use_dictionary,
header_version=header_version,
skip_compression=skip_compression,
column_encoding=column_encoding,
column_type_length=column_type_length,
output_as_binary=output_as_binary,
*args,
**kwargs,
)
64 changes: 64 additions & 0 deletions python/cudf/cudf/io/parquet.py
@@ -69,6 +69,10 @@ def _write_parquet(
force_nullable_schema=False,
header_version="1.0",
use_dictionary=True,
skip_compression=None,
column_encoding=None,
column_type_length=None,
output_as_binary=None,
):
if is_list_like(paths) and len(paths) > 1:
if partitions_info is None:
@@ -102,6 +106,10 @@
"force_nullable_schema": force_nullable_schema,
"header_version": header_version,
"use_dictionary": use_dictionary,
"skip_compression": skip_compression,
"column_encoding": column_encoding,
"column_type_length": column_type_length,
"output_as_binary": output_as_binary,
}
if all(ioutils.is_fsspec_open_file(buf) for buf in paths_or_bufs):
with ExitStack() as stack:
@@ -140,6 +148,12 @@ def write_to_dataset(
max_page_size_rows=None,
storage_options=None,
force_nullable_schema=False,
header_version="1.0",
use_dictionary=True,
skip_compression=None,
column_encoding=None,
column_type_length=None,
output_as_binary=None,
):
"""Wraps `to_parquet` to write partitioned Parquet datasets.
For each combination of partition group and value,
@@ -204,6 +218,30 @@
If True, writes all columns as `null` in schema.
If False, columns are written as `null` if they contain null values,
otherwise as `not null`.
header_version : {{'1.0', '2.0'}}, default "1.0"
Controls whether to use version 1.0 or version 2.0 page headers when
encoding. Version 1.0 is more portable, but version 2.0 enables the
use of newer encoding schemes.
force_nullable_schema : bool, default False.
If True, writes all columns as `null` in schema.
If False, columns are written as `null` if they contain null values,
otherwise as `not null`.
skip_compression : set, optional, default None
If a column name is present in the set, that column will not be compressed,
regardless of the ``compression`` setting.
column_encoding : dict, optional, default None
Sets the page encoding to use on a per-column basis. The key is a column
name, and the value is one of: 'PLAIN', 'DICTIONARY', 'DELTA_BINARY_PACKED',
'DELTA_LENGTH_BYTE_ARRAY', 'DELTA_BYTE_ARRAY', 'BYTE_STREAM_SPLIT', or
'USE_DEFAULT'.
column_type_length : dict, optional, default None
Specifies the width in bytes of ``FIXED_LEN_BYTE_ARRAY`` column elements.
The key is a column name and the value is an integer. The named column
will be output as unannotated binary (i.e. the column will behave as if
``output_as_binary`` was set).
output_as_binary : set, optional, default None
If a column name is present in the set, that column will be output as
unannotated binary, rather than the default 'UTF-8'.
"""

fs = ioutils._ensure_filesystem(fs, root_path, storage_options)
@@ -241,6 +279,12 @@
max_page_size_bytes=max_page_size_bytes,
max_page_size_rows=max_page_size_rows,
force_nullable_schema=force_nullable_schema,
header_version=header_version,
use_dictionary=use_dictionary,
skip_compression=skip_compression,
column_encoding=column_encoding,
column_type_length=column_type_length,
output_as_binary=output_as_binary,
)

else:
@@ -262,6 +306,12 @@
max_page_size_bytes=max_page_size_bytes,
max_page_size_rows=max_page_size_rows,
force_nullable_schema=force_nullable_schema,
header_version=header_version,
use_dictionary=use_dictionary,
skip_compression=skip_compression,
column_encoding=column_encoding,
column_type_length=column_type_length,
output_as_binary=output_as_binary,
)

return metadata
@@ -906,6 +956,10 @@ def to_parquet(
force_nullable_schema=False,
header_version="1.0",
use_dictionary=True,
skip_compression=None,
column_encoding=None,
column_type_length=None,
output_as_binary=None,
*args,
**kwargs,
):
@@ -955,6 +1009,12 @@
return_metadata=return_metadata,
storage_options=storage_options,
force_nullable_schema=force_nullable_schema,
header_version=header_version,
use_dictionary=use_dictionary,
skip_compression=skip_compression,
column_encoding=column_encoding,
column_type_length=column_type_length,
output_as_binary=output_as_binary,
)

partition_info = (
@@ -983,6 +1043,10 @@
force_nullable_schema=force_nullable_schema,
header_version=header_version,
use_dictionary=use_dictionary,
skip_compression=skip_compression,
column_encoding=column_encoding,
column_type_length=column_type_length,
output_as_binary=output_as_binary,
)

else:
76 changes: 76 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
@@ -2870,6 +2870,82 @@ def flba(i):
assert_eq(expect, got)


def test_parquet_flba_round_trip(tmpdir):
def flba(i):
hasher = hashlib.sha256()
hasher.update(i.to_bytes(4, "little"))
return hasher.digest()

# use pyarrow to write table of fixed_len_byte_array
num_rows = 200
data = pa.array([flba(i) for i in range(num_rows)], type=pa.binary(32))
padf = pa.Table.from_arrays([data], names=["flba"])
padf_fname = tmpdir.join("padf.parquet")
pq.write_table(padf, padf_fname)

# round trip data with cudf
cdf = cudf.read_parquet(padf_fname)
cdf_fname = tmpdir.join("cdf.parquet")
cdf.to_parquet(cdf_fname, column_type_length={"flba": 32})

# now read back in with pyarrow to test it was written properly by cudf
padf2 = pq.read_table(padf_fname)
padf3 = pq.read_table(cdf_fname)
assert_eq(padf2, padf3)
assert_eq(padf2.schema[0].type, padf3.schema[0].type)


@pytest.mark.parametrize(
"encoding",
[
"PLAIN",
"DICTIONARY",
"DELTA_BINARY_PACKED",
"BYTE_STREAM_SPLIT",
"USE_DEFAULT",
],
)
def test_per_column_options(tmpdir, encoding):
pdf = pd.DataFrame({"ilist": [[1, 2, 3, 1, 2, 3]], "i1": [1]})
cdf = cudf.from_pandas(pdf)
fname = tmpdir.join("ilist.parquet")
cdf.to_parquet(
fname,
column_encoding={"ilist.list.element": encoding},
compression="SNAPPY",
skip_compression={"ilist.list.element"},
)
# DICTIONARY and USE_DEFAULT should both result in a PLAIN_DICTIONARY encoding in parquet
encoding_name = (
"PLAIN_DICTIONARY"
if encoding == "DICTIONARY" or encoding == "USE_DEFAULT"
else encoding
)
pf = pq.ParquetFile(fname)
fmd = pf.metadata
assert encoding_name in fmd.row_group(0).column(0).encodings
assert fmd.row_group(0).column(0).compression == "UNCOMPRESSED"
assert fmd.row_group(0).column(1).compression == "SNAPPY"


@pytest.mark.parametrize(
"encoding",
["DELTA_LENGTH_BYTE_ARRAY", "DELTA_BYTE_ARRAY"],
)
def test_per_column_options_string_col(tmpdir, encoding):
pdf = pd.DataFrame({"s": ["a string"], "i1": [1]})
cdf = cudf.from_pandas(pdf)
fname = tmpdir.join("strcol.parquet")
cdf.to_parquet(
fname,
column_encoding={"s": encoding},
compression="SNAPPY",
)
pf = pq.ParquetFile(fname)
fmd = pf.metadata
assert encoding in fmd.row_group(0).column(0).encodings


def test_parquet_reader_rle_boolean(datadir):
fname = datadir / "rle_boolean_encoding.parquet"
