Prepare dask_cudf test_parquet.py for upcoming API changes (#10709)
This is a relatively simple PR to clean up `dask_cudf`'s `to/read_parquet` tests. These changes are mostly meant to avoid **future** test failures that will arise after impending changes are implemented in upstream Dask. These changes include:

- The default value for `write_metadata_file` will become `False` for `to_parquet` (because writing the `_metadata` file scales very poorly).
- The default value for `split_row_groups` will become `False` (because this setting is typically optimal when the files are not too large). Users with larger-than-memory files will need to specify `split_row_groups=True` (or an integer) explicitly.
- The `gather_statistics` argument will be removed in favor of a more descriptive `calculate_divisions` argument.

This PR also removes the long-deprecated `row_groups_per_part` argument from `dask_cudf.read_parquet` (the established replacement is `split_row_groups`).
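
For reference, a minimal sketch (not part of this PR) of how downstream code might spell these settings out explicitly once the upstream defaults change; the paths are placeholders, and `calculate_divisions` is the upcoming argument name mentioned above, so it may not exist in released Dask yet:

```python
import dask_cudf

# Opt in to row-group partitioning explicitly once the upstream
# `split_row_groups` default becomes False (needed for
# larger-than-memory files):
ddf = dask_cudf.read_parquet(
    "/path/to/dataset/",       # placeholder path
    split_row_groups=True,     # or an integer: max row-groups per partition
)

# Hypothetical until the Dask release that replaces `gather_statistics`:
# ddf = dask_cudf.read_parquet("/path/to/dataset/", calculate_divisions=True)

# Request the `_metadata` file explicitly if it is still wanted, since
# the upstream `write_metadata_file` default for `to_parquet` becomes False:
ddf.to_parquet("/path/to/output/", write_metadata_file=True)
```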

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)
  - Ray Douglass (https://github.com/raydouglass)
  - gpuCI (https://github.com/GPUtester)
  - Mike Wendt (https://github.com/mike-wendt)
  - AJ Schmidt (https://github.com/ajschmidt8)

Approvers:
  - Benjamin Zaitlen (https://github.com/quasiben)
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Randy Gelhausen (https://github.com/randerzander)

URL: #10709
rjzamora authored Apr 28, 2022
1 parent f0b9117 commit 03d419d
Showing 2 changed files with 197 additions and 121 deletions.
212 changes: 128 additions & 84 deletions python/dask_cudf/dask_cudf/io/parquet.py
@@ -177,65 +177,98 @@ def read_partition(
strings_to_cats = kwargs.get("strings_to_categorical", False)
read_kwargs = kwargs.get("read", {})
read_kwargs.update(open_file_options or {})

check_file_size = read_kwargs.pop("check_file_size", None)

# Wrap reading logic in a `try` block so that we can
# inform the user that the `read_parquet` partition
# size is too large for the available memory
try:

    # Assume multi-piece read
    paths = []
    rgs = []
    last_partition_keys = None
    dfs = []

    for i, piece in enumerate(pieces):

        (path, row_group, partition_keys) = piece
        row_group = None if row_group == [None] else row_group

        # File-size check to help "protect" users from the change
        # to the upstream `split_row_groups` default. We only
        # check the file size if this partition corresponds
        # to a full file, and `check_file_size` is defined
        if check_file_size and len(pieces) == 1 and row_group is None:
            file_size = fs.size(path)
            if file_size > check_file_size:
                warnings.warn(
                    f"A large parquet file ({file_size}B) is being "
                    f"used to create a DataFrame partition in "
                    f"read_parquet. This may cause out of memory "
                    f"exceptions in operations downstream. See the "
                    f"notes on split_row_groups in the read_parquet "
                    f"documentation. Setting split_row_groups "
                    f"explicitly will silence this warning."
                )

        if i > 0 and partition_keys != last_partition_keys:
            dfs.append(
                cls._read_paths(
                    paths,
                    fs,
                    columns=read_columns,
                    row_groups=rgs if rgs else None,
                    strings_to_categorical=strings_to_cats,
                    partitions=partitions,
                    partitioning=partitioning,
                    partition_keys=last_partition_keys,
                    **read_kwargs,
                )
            )
            paths = rgs = []
            last_partition_keys = None
        paths.append(path)
        rgs.append(
            [row_group]
            if not isinstance(row_group, list)
            and row_group is not None
            else row_group
        )
        last_partition_keys = partition_keys

    dfs.append(
        cls._read_paths(
            paths,
            fs,
            columns=read_columns,
            row_groups=rgs if rgs else None,
            strings_to_categorical=strings_to_cats,
            partitions=partitions,
            partitioning=partitioning,
            partition_keys=last_partition_keys,
            **read_kwargs,
        )
    )
    df = cudf.concat(dfs) if len(dfs) > 1 else dfs[0]

    # Re-set "object" dtypes to align with the pa schema
    set_object_dtypes_from_pa_schema(df, schema)

    if index and (index[0] in df.columns):
        df = df.set_index(index[0])
    elif index is False and df.index.names != (None,):
        # If index=False, we shouldn't have a named index
        df.reset_index(inplace=True)

except MemoryError as err:
    raise MemoryError(
        "Parquet data was larger than the available GPU memory!\n\n"
        "See the notes on split_row_groups in the read_parquet "
        "documentation.\n\n"
        "Original Error: " + str(err)
    )

return df

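As a usage-level illustration of the `MemoryError` path above, a hedged sketch of the recovery that the raised message suggests (the path is a placeholder):

```python
import dask_cudf

try:
    ddf = dask_cudf.read_parquet("/path/to/huge-files/")  # placeholder path
    head = ddf.head()  # materializes the first partition on the GPU
except MemoryError:
    # Follow the advice in the raised message: map row-groups (rather
    # than whole files) to DataFrame partitions and try again.
    ddf = dask_cudf.read_parquet(
        "/path/to/huge-files/",
        split_row_groups=True,
    )
    head = ddf.head()
```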
@@ -349,25 +382,34 @@ def set_object_dtypes_from_pa_schema(df, schema):
df._data[col_name] = col.astype(typ)


def read_parquet(path, columns=None, **kwargs):
    """Read parquet files into a Dask DataFrame

    Calls ``dask.dataframe.read_parquet`` with ``engine=CudfEngine``
    to coordinate the execution of ``cudf.read_parquet``, and to
    ultimately create a ``dask_cudf.DataFrame`` collection.

    See the ``dask.dataframe.read_parquet`` documentation for
    all available options.

    Examples
    --------
    >>> from dask_cudf import read_parquet
    >>> df = read_parquet("/path/to/dataset/")  # doctest: +SKIP

    When dealing with one or more large parquet files having an
    in-memory footprint >15% device memory, the ``split_row_groups``
    argument should be used to map Parquet **row-groups** to DataFrame
    partitions (instead of **files** to partitions). For example, the
    following code will map each row-group to a distinct partition:

    >>> df = read_parquet(..., split_row_groups=True)  # doctest: +SKIP

    To map **multiple** row-groups to each partition, an integer can be
    passed to ``split_row_groups`` to specify the **maximum** number of
    row-groups allowed in each output partition:

    >>> df = read_parquet(..., split_row_groups=10)  # doctest: +SKIP

    See Also
    --------
@@ -376,22 +418,24 @@ def read_parquet(
    if isinstance(columns, str):
        columns = [columns]

    # Set "check_file_size" option to determine whether we
    # should check the parquet-file size. This check is meant
    # to "protect" users from `split_row_groups` default changes
    check_file_size = kwargs.pop("check_file_size", 500_000_000)
    if (
        check_file_size
        and ("split_row_groups" not in kwargs)
        and ("chunksize" not in kwargs)
    ):
        # User is not specifying `split_row_groups` or `chunksize`,
        # so we should warn them if/when a file is ~>0.5GB on disk.
        # They can set `split_row_groups` explicitly to silence/skip
        # this check
        if "read" not in kwargs:
            kwargs["read"] = {}
        kwargs["read"]["check_file_size"] = check_file_size

    return dd.read_parquet(path, columns=columns, engine=CudfEngine, **kwargs)


to_parquet = partial(dd.to_parquet, engine=CudfEngine)
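
For context, a sketch of how the new file-size guard is expected to behave from the caller's side; the dataset path is a placeholder, and the ~500 MB figure is the `check_file_size` default shown above:

```python
import dask_cudf

# With neither `split_row_groups` nor `chunksize` specified, a partition
# backed by a single parquet file larger than ~500 MB on disk triggers
# the "large parquet file" warning when that partition is read.
ddf = dask_cudf.read_parquet("/path/to/large-file-dataset/")  # placeholder path

# Passing `split_row_groups` explicitly silences/skips the check (and
# keeps each partition smaller than a whole file):
ddf = dask_cudf.read_parquet(
    "/path/to/large-file-dataset/",
    split_row_groups=True,
)

# The threshold can also be adjusted or disabled through the
# dask_cudf-specific `check_file_size` argument introduced above:
ddf = dask_cudf.read_parquet(
    "/path/to/large-file-dataset/",
    check_file_size=None,
)
```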