Prepare dask_cudf test_parquet.py for upcoming API changes #10709

Merged · Apr 28, 2022 · 67 commits

Commits (67)
19f5174
Merge pull request #4714 from rapidsai/branch-0.13
raydouglass Mar 30, 2020
a2804c3
REL v0.13.0 release
GPUtester Mar 31, 2020
fef2a2b
REL v0.13.0 CHANGELOG Updates
mike-wendt Apr 1, 2020
ab00eb0
Merge pull request #5310 from rapidsai/branch-0.14
raydouglass Jun 3, 2020
b34b838
REL v0.14.0 release
GPUtester Jun 3, 2020
9ff9cdb
update master references
ajschmidt8 Jul 14, 2020
789d19b
REL DOC Updates for main branch switch
mike-wendt Jul 16, 2020
819f514
Merge pull request #6079 from rapidsai/branch-0.15
raydouglass Aug 26, 2020
3a0f214
REL v0.15.0 release
GPUtester Aug 26, 2020
f947393
Merge pull request #6101 from rapidsai/branch-0.15
raydouglass Aug 27, 2020
71cb8c0
REL v0.15.0 release
GPUtester Aug 27, 2020
7ef8174
Merge pull request #6547 from rapidsai/branch-0.16
raydouglass Oct 21, 2020
2b8298f
REL v0.16.0 release
GPUtester Oct 21, 2020
d72b1eb
Merge pull request #6935 from rapidsai/branch-0.17
ajschmidt8 Dec 10, 2020
f56ef85
REL v0.17.0 release
GPUtester Dec 10, 2020
b7e1a85
Merge pull request #7405 from rapidsai/branch-0.18
raydouglass Feb 24, 2021
20778e5
REL v0.18.0 release
GPUtester Feb 24, 2021
042c20f
Merge pull request #7585 from rapidsai/branch-0.18
raydouglass Mar 15, 2021
999be56
REL v0.18.1 release
raydouglass Mar 15, 2021
2391864
Merge pull request #7969 from rapidsai/branch-0.18
raydouglass Apr 15, 2021
3341561
REL v0.18.2 release
raydouglass Apr 15, 2021
6573759
Merge pull request #7626 from rapidsai/branch-0.19
raydouglass Apr 21, 2021
f07b251
REL v0.19.0 release
GPUtester Apr 21, 2021
61e5a20
REL Changelog update
ajschmidt8 Apr 21, 2021
a13e8dc
Merge pull request #8037 from rapidsai/branch-0.19
raydouglass Apr 22, 2021
a9f3453
REL v0.19.1 release
GPUtester Apr 22, 2021
2089fc9
Merge pull request #8100 from rapidsai/branch-0.19
raydouglass Apr 28, 2021
ab3b3f6
REL v0.19.2 release
GPUtester Apr 28, 2021
f9d5e2e
Merge pull request #8418 from rapidsai/branch-21.06
raydouglass Jun 9, 2021
ae44046
REL v21.06.00 release
GPUtester Jun 9, 2021
3b831c3
Merge pull request #8488 from rapidsai/branch-21.06
ajschmidt8 Jun 10, 2021
d56ac1d
Merge pull request #8542 from rapidsai/branch-21.06
raydouglass Jun 17, 2021
cddc64f
REL v21.06.01 release
GPUtester Jun 17, 2021
101fc0f
REL Merge pull request #8544 from rapidsai/branch-21.06
raydouglass Jun 17, 2021
e9dabf8
Merge pull request #8840 from rapidsai/branch-21.08
raydouglass Aug 4, 2021
106039c
REL v21.08.00 release
GPUtester Aug 4, 2021
8055721
Merge pull request #8986 from rapidsai/branch-21.08
raydouglass Aug 6, 2021
e0a8114
REL v21.08.01 release
GPUtester Aug 6, 2021
a7391e6
Merge pull request #8990 from rapidsai/branch-21.08
raydouglass Aug 6, 2021
f6d31fa
REL v21.08.02 release
GPUtester Aug 6, 2021
dff45e5
Merge pull request #9116 from rapidsai/branch-21.08
ajschmidt8 Sep 16, 2021
e4313b6
REL v21.08.03 release
GPUtester Sep 16, 2021
5638329
Merge pull request #9301 from rapidsai/branch-21.10
ajschmidt8 Oct 6, 2021
072fd86
REL v21.10.00 release
GPUtester Oct 6, 2021
8cfb8e5
Merge pull request #9420 from rapidsai/branch-21.10
raydouglass Oct 12, 2021
a1d2d13
REL v21.10.01 release
GPUtester Oct 12, 2021
3ceb0c0
Merge pull request #9689 from rapidsai/branch-21.12
raydouglass Dec 3, 2021
f1ef2d2
REL v21.12.00 release
GPUtester Dec 3, 2021
fd04831
Merge pull request #9880 from rapidsai/branch-21.12
raydouglass Dec 9, 2021
a0a0a3a
REL v21.12.01 release
GPUtester Dec 9, 2021
c74e24f
Merge pull request #9924 from rapidsai/branch-21.12
raydouglass Dec 16, 2021
06540b9
REL v21.12.02 release
GPUtester Dec 16, 2021
f39f559
Merge pull request #10101 from rapidsai/branch-22.02
raydouglass Feb 2, 2022
774d859
REL v22.02.00 release
GPUtester Feb 2, 2022
803c42a
Merge pull request #10512 from rapidsai/branch-22.04
raydouglass Apr 6, 2022
8bf0520
REL v22.04.00 release
GPUtester Apr 6, 2022
0363197
REL Merge pull request #10633 from rapidsai/branch-22.04
raydouglass Apr 11, 2022
f92b0bb
remove row_groups_per_part and clean up divisions and split_row_group…
rjzamora Apr 21, 2022
8b4e11c
clarify comment
rjzamora Apr 21, 2022
bd5b692
add file-size and memory checks to better-inform the user of split_ro…
rjzamora Apr 22, 2022
0b6c7f8
Merge remote-tracking branch 'upstream/main' into remove-row_groups_p…
rjzamora Apr 25, 2022
d0423aa
simplify error and warning messages
rjzamora Apr 26, 2022
3798cf3
improve docstring for read_parquet to discuss split_row_groups argument
rjzamora Apr 26, 2022
d6eb7a6
another tweak
rjzamora Apr 26, 2022
3791b19
inform the user that setting split_row_groups will silence the file-s…
rjzamora Apr 26, 2022
17b0867
Merge remote-tracking branch 'upstream/branch-22.06' into remove-row_…
rjzamora Apr 27, 2022
0c775dc
Merge remote-tracking branch 'upstream/branch-22.06' into remove-row_…
rjzamora Apr 28, 2022
Changes from all commits

python/dask_cudf/dask_cudf/io/parquet.py (212 changes: 128 additions & 84 deletions)

@@ -177,65 +177,98 @@ def read_partition(
         strings_to_cats = kwargs.get("strings_to_categorical", False)
         read_kwargs = kwargs.get("read", {})
         read_kwargs.update(open_file_options or {})
-
-        # Assume multi-piece read
-        paths = []
-        rgs = []
-        last_partition_keys = None
-        dfs = []
-
-        for i, piece in enumerate(pieces):
-
-            (path, row_group, partition_keys) = piece
-            row_group = None if row_group == [None] else row_group
-
-            if i > 0 and partition_keys != last_partition_keys:
-                dfs.append(
-                    cls._read_paths(
-                        paths,
-                        fs,
-                        columns=read_columns,
-                        row_groups=rgs if rgs else None,
-                        strings_to_categorical=strings_to_cats,
-                        partitions=partitions,
-                        partitioning=partitioning,
-                        partition_keys=last_partition_keys,
-                        **read_kwargs,
-                    )
-                )
-                paths = rgs = []
-                last_partition_keys = None
-            paths.append(path)
-            rgs.append(
-                [row_group]
-                if not isinstance(row_group, list) and row_group is not None
-                else row_group
-            )
-            last_partition_keys = partition_keys
-
-        dfs.append(
-            cls._read_paths(
-                paths,
-                fs,
-                columns=read_columns,
-                row_groups=rgs if rgs else None,
-                strings_to_categorical=strings_to_cats,
-                partitions=partitions,
-                partitioning=partitioning,
-                partition_keys=last_partition_keys,
-                **read_kwargs,
-            )
-        )
-        df = cudf.concat(dfs) if len(dfs) > 1 else dfs[0]
-
-        # Re-set "object" dtypes align with pa schema
-        set_object_dtypes_from_pa_schema(df, schema)
-
-        if index and (index[0] in df.columns):
-            df = df.set_index(index[0])
-        elif index is False and df.index.names != (None,):
-            # If index=False, we shouldn't have a named index
-            df.reset_index(inplace=True)
+        check_file_size = read_kwargs.pop("check_file_size", None)
+
+        # Wrap reading logic in a `try` block so that we can
+        # inform the user that the `read_parquet` partition
+        # size is too large for the available memory
+        try:
+
+            # Assume multi-piece read
+            paths = []
+            rgs = []
+            last_partition_keys = None
+            dfs = []
+
+            for i, piece in enumerate(pieces):
+
+                (path, row_group, partition_keys) = piece
+                row_group = None if row_group == [None] else row_group
+
+                # File-size check to help "protect" users from change
+                # to up-stream `split_row_groups` default. We only
+                # check the file size if this partition corresponds
+                # to a full file, and `check_file_size` is defined
+                if check_file_size and len(pieces) == 1 and row_group is None:
+                    file_size = fs.size(path)
+                    if file_size > check_file_size:
+                        warnings.warn(
+                            f"A large parquet file ({file_size}B) is being "
+                            f"used to create a DataFrame partition in "
+                            f"read_parquet. This may cause out of memory "
+                            f"exceptions in operations downstream. See the "
+                            f"notes on split_row_groups in the read_parquet "
+                            f"documentation. Setting split_row_groups "
+                            f"explicitly will silence this warning."
+                        )
+
+                if i > 0 and partition_keys != last_partition_keys:
+                    dfs.append(
+                        cls._read_paths(
+                            paths,
+                            fs,
+                            columns=read_columns,
+                            row_groups=rgs if rgs else None,
+                            strings_to_categorical=strings_to_cats,
+                            partitions=partitions,
+                            partitioning=partitioning,
+                            partition_keys=last_partition_keys,
+                            **read_kwargs,
+                        )
+                    )
+                    paths = rgs = []
+                    last_partition_keys = None
+                paths.append(path)
+                rgs.append(
+                    [row_group]
+                    if not isinstance(row_group, list)
+                    and row_group is not None
+                    else row_group
+                )
+                last_partition_keys = partition_keys
+
+            dfs.append(
+                cls._read_paths(
+                    paths,
+                    fs,
+                    columns=read_columns,
+                    row_groups=rgs if rgs else None,
+                    strings_to_categorical=strings_to_cats,
+                    partitions=partitions,
+                    partitioning=partitioning,
+                    partition_keys=last_partition_keys,
+                    **read_kwargs,
+                )
+            )
+            df = cudf.concat(dfs) if len(dfs) > 1 else dfs[0]
+
+            # Re-set "object" dtypes align with pa schema
+            set_object_dtypes_from_pa_schema(df, schema)
+
+            if index and (index[0] in df.columns):
+                df = df.set_index(index[0])
+            elif index is False and df.index.names != (None,):
+                # If index=False, we shouldn't have a named index
+                df.reset_index(inplace=True)
+
+        except MemoryError as err:
+            raise MemoryError(
+                "Parquet data was larger than the available GPU memory!\n\n"
+                "See the notes on split_row_groups in the read_parquet "
+                "documentation.\n\n"
+                "Original Error: " + str(err)
+            )
 
         return df

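Editor's note on the hunk above: the new warning only fires when a single, whole parquet file is mapped to one output partition. Below is a minimal standalone sketch of that guard; the helper name and sample values are illustrative and not part of the PR.

import warnings

def should_warn_on_file_size(pieces, check_file_size, file_size):
    # Mirrors the guard added above: warn only when the partition
    # corresponds to one full file (no row-group selection) and a
    # size threshold was passed through the "read" kwargs.
    if check_file_size and len(pieces) == 1:
        _path, row_group, _partition_keys = pieces[0]
        if row_group is None and file_size > check_file_size:
            return True
    return False

# Example: a single ~2 GB file against the 500_000_000-byte default threshold
pieces = [("part.0.parquet", None, None)]
if should_warn_on_file_size(pieces, 500_000_000, 2_000_000_000):
    warnings.warn("large parquet file mapped to a single partition")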
@@ -349,25 +382,34 @@ def set_object_dtypes_from_pa_schema(df, schema):
                 df._data[col_name] = col.astype(typ)
 
 
-def read_parquet(
-    path,
-    columns=None,
-    split_row_groups=None,
-    row_groups_per_part=None,
-    **kwargs,
-):
+def read_parquet(path, columns=None, **kwargs):
     """Read parquet files into a Dask DataFrame
 
-    Calls ``dask.dataframe.read_parquet`` to cordinate the execution of
-    ``cudf.read_parquet``, and ultimately read multiple partitions into
-    a single Dask dataframe. The Dask version must supply an
-    ``ArrowDatasetEngine`` class to support full functionality.
-    See ``cudf.read_parquet`` and Dask documentation for further details.
+    Calls ``dask.dataframe.read_parquet`` with ``engine=CudfEngine``
+    to cordinate the execution of ``cudf.read_parquet``, and to
+    ultimately create a ``dask_cudf.DataFrame`` collection.
+
+    See the ``dask.dataframe.read_parquet`` documentation for
+    all available options.
 
     Examples
     --------
-    >>> import dask_cudf
-    >>> df = dask_cudf.read_parquet("/path/to/dataset/")  # doctest: +SKIP
+    >>> from dask_cudf import read_parquet
+    >>> df = read_parquet("/path/to/dataset/")  # doctest: +SKIP
+
+    When dealing with one or more large parquet files having an
+    in-memory footprint >15% device memory, the ``split_row_groups``
+    argument should be used to map Parquet **row-groups** to DataFrame
+    partitions (instead of **files** to partitions). For example, the
+    following code will map each row-group to a distinct partition:
+
+    >>> df = read_parquet(..., split_row_groups=True)  # doctest: +SKIP
+
+    To map **multiple** row-groups to each partition, an integer can be
+    passed to ``split_row_groups`` to specify the **maximum** number of
+    row-groups allowed in each output partition:
+
+    >>> df = read_parquet(..., split_row_groups=10)  # doctest: +SKIP
 
     See Also
     --------
@@ -376,22 +418,24 @@ def read_parquet(
     if isinstance(columns, str):
         columns = [columns]
 
-    if row_groups_per_part:
-        warnings.warn(
-            "row_groups_per_part is deprecated. "
-            "Pass an integer value to split_row_groups instead.",
-            FutureWarning,
-        )
-        if split_row_groups is None:
-            split_row_groups = row_groups_per_part
-
-    return dd.read_parquet(
-        path,
-        columns=columns,
-        split_row_groups=split_row_groups,
-        engine=CudfEngine,
-        **kwargs,
-    )
+    # Set "check_file_size" option to determine whether we
+    # should check the parquet-file size. This check is meant
+    # to "protect" users from `split_row_groups` default changes
+    check_file_size = kwargs.pop("check_file_size", 500_000_000)
+    if (
+        check_file_size
+        and ("split_row_groups" not in kwargs)
+        and ("chunksize" not in kwargs)
+    ):
+        # User is not specifying `split_row_groups` or `chunksize`,
+        # so we should warn them if/when a file is ~>0.5GB on disk.
+        # They can set `split_row_groups` explicitly to silence/skip
+        # this check
+        if "read" not in kwargs:
+            kwargs["read"] = {}
+        kwargs["read"]["check_file_size"] = check_file_size
+
+    return dd.read_parquet(path, columns=columns, engine=CudfEngine, **kwargs)
 
 
 to_parquet = partial(dd.to_parquet, engine=CudfEngine)
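From the user's side, the changes above amount to a leaner read_parquet signature plus an opt-out file-size check. A short usage sketch of the behavior introduced here (the dataset path is a placeholder; check_file_size is the new option with a 500_000_000-byte default):

import dask_cudf

# Default: with neither split_row_groups nor chunksize specified, a file
# larger than ~0.5 GB that maps to a single partition triggers the warning.
ddf = dask_cudf.read_parquet("/path/to/dataset/")

# Map each row-group (or at most 10 row-groups) to its own partition;
# setting split_row_groups explicitly also skips the file-size check.
ddf = dask_cudf.read_parquet("/path/to/dataset/", split_row_groups=True)
ddf = dask_cudf.read_parquet("/path/to/dataset/", split_row_groups=10)

# Pass a falsy check_file_size to disable the check, or another byte
# threshold to tune when the warning appears.
ddf = dask_cudf.read_parquet("/path/to/dataset/", check_file_size=0)

Note that read_parquet forwards the threshold through kwargs["read"], which is how it reaches CudfEngine.read_partition on the workers.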