Add new dask_cudf.read_parquet API #17250

Merged: 28 commits from new-read-parquet-api into branch-24.12 on Nov 20, 2024

Changes shown are from 17 of the 28 commits.

Commits
3e853f8: add new read_parquet API to dask_cudf (rjzamora, Nov 5, 2024)
b30c529: fix non-expr deprecation (rjzamora, Nov 5, 2024)
e3c640a: fix CudfReadParquetFSSpec fusion (rjzamora, Nov 6, 2024)
e482026: correct for aggregate_files=False (rjzamora, Nov 6, 2024)
b9af7b7: Merge remote-tracking branch 'upstream/branch-24.12' into new-read-pa… (rjzamora, Nov 6, 2024)
2ad1867: update default blocksize, and add docstring (rjzamora, Nov 6, 2024)
6c37a9c: Merge branch 'branch-24.12' into new-read-parquet-api (rjzamora, Nov 6, 2024)
53dfbdf: Merge branch 'branch-24.12' into new-read-parquet-api (rjzamora, Nov 7, 2024)
900ebf6: Merge remote-tracking branch 'upstream/branch-24.12' into new-read-pa… (rjzamora, Nov 12, 2024)
3fb23fd: revise _normalize_blocksize (rjzamora, Nov 12, 2024)
7af233e: Merge remote-tracking branch 'upstream/branch-24.12' into new-read-pa… (rjzamora, Nov 13, 2024)
0552b33: fix test (rjzamora, Nov 13, 2024)
564c13c: proper test fix - and disable dataset processing unless necessary (rjzamora, Nov 13, 2024)
64dd105: preserve default hive handling in cudf (rjzamora, Nov 13, 2024)
68b8faf: Merge remote-tracking branch 'upstream/branch-24.12' into new-read-pa… (rjzamora, Nov 13, 2024)
e074004: Merge branch 'branch-24.12' into new-read-parquet-api (rjzamora, Nov 14, 2024)
a305398: Merge branch 'branch-24.12' into new-read-parquet-api (vyasr, Nov 15, 2024)
b539ed4: Merge remote-tracking branch 'upstream/branch-24.12' into new-read-pa… (rjzamora, Nov 18, 2024)
0291b95: address code review (rjzamora, Nov 18, 2024)
beafffb: Merge remote-tracking branch 'upstream/branch-24.12' into new-read-pa… (rjzamora, Nov 18, 2024)
0832e55: sample single worker for device size (rjzamora, Nov 18, 2024)
db82e0b: Merge remote-tracking branch 'upstream/branch-24.12' into new-read-pa… (rjzamora, Nov 18, 2024)
df1e283: Merge branch 'branch-24.12' into new-read-parquet-api (rjzamora, Nov 19, 2024)
2195947: use CUDA_VISIBLE_DEVICES (rjzamora, Nov 19, 2024)
1038170: Merge branch 'new-read-parquet-api' of github.com:rjzamora/cudf into … (rjzamora, Nov 19, 2024)
0ecf6dd: support mig (rjzamora, Nov 19, 2024)
fa03396: Update python/dask_cudf/dask_cudf/io/parquet.py (rjzamora, Nov 19, 2024)
1b79270: Merge branch 'branch-24.12' into new-read-parquet-api (madsbk, Nov 20, 2024)
8 changes: 8 additions & 0 deletions python/cudf/cudf/io/parquet.py
@@ -368,6 +368,14 @@ def _process_dataset(
file_list = paths
if len(paths) == 1 and ioutils.is_directory(paths[0]):
paths = ioutils.stringify_pathlike(paths[0])
elif (
filters is None
and isinstance(dataset_kwargs, dict)
and dataset_kwargs.get("partitioning") is None
):
# Skip dataset processing if we have no filters
# or hive/directory partitioning to deal with.
return paths, row_groups, [], {}
Comment on lines +371 to +378

rjzamora (Member, Author):

The pyarrow.dataset logic below has non-negligible overhead on remote storage. This code block allows us to pass in dataset_kwargs={"partitioning": None} to skip unnecessary PyArrow processing when we know we are not reading from hive-partitioned data (and we aren't applying filters).

By default (when we don't pass in dataset_kwargs={"partitioning": None}), cudf will still pre-process the dataset with PyArrow just in case it is hive partitioned.

galipremsagar (Contributor), Nov 18, 2024:

Thanks for the context @rjzamora! Do you happen to have rough overhead numbers/magnitude?

rjzamora (Member, Author), Nov 18, 2024:

[profiling screenshot: profile-with-dataset-overhead]

The overhead is measurable but not huge (~4-5%) for the specific case I was benchmarking.
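
For illustration, a minimal sketch of the optimization discussed above. The path is hypothetical, and this assumes cudf.read_parquet forwards dataset_kwargs down to _process_dataset (as the dask_cudf reader does):

```python
import cudf

# Flat (non-hive-partitioned) data with no filters: passing
# dataset_kwargs={"partitioning": None} lets _process_dataset return early,
# skipping the pyarrow.dataset pre-processing step.
df = cudf.read_parquet(
    "s3://bucket/flat-dataset/part.0.parquet",  # hypothetical remote path
    dataset_kwargs={"partitioning": None},
)

# Omitting dataset_kwargs keeps the default behavior: cudf still pre-processes
# the dataset with PyArrow in case it is hive-partitioned.
df_default = cudf.read_parquet("s3://bucket/flat-dataset/part.0.parquet")
```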


# Convert filters to ds.Expression
if filters is not None:
Expand Down
3 changes: 2 additions & 1 deletion python/dask_cudf/dask_cudf/_legacy/io/parquet.py
@@ -86,7 +86,8 @@ def _read_paths(
)

dataset_kwargs = dataset_kwargs or {}
dataset_kwargs["partitioning"] = partitioning or "hive"
if partitions:
dataset_kwargs["partitioning"] = partitioning or "hive"

# Use cudf to read in data
try:
Expand Down
136 changes: 3 additions & 133 deletions python/dask_cudf/dask_cudf/backends.py
@@ -700,140 +700,10 @@ def from_dict(
)

@staticmethod
def read_parquet(path, *args, filesystem="fsspec", engine=None, **kwargs):
import dask_expr as dx
import fsspec

if (
isinstance(filesystem, fsspec.AbstractFileSystem)
or isinstance(filesystem, str)
and filesystem.lower() == "fsspec"
):
# Default "fsspec" filesystem
from dask_cudf._legacy.io.parquet import CudfEngine
def read_parquet(*args, **kwargs):
from dask_cudf.io.parquet import read_parquet as read_parquet_expr

_raise_unsupported_parquet_kwargs(**kwargs)
return _default_backend(
dx.read_parquet,
path,
*args,
filesystem=filesystem,
engine=CudfEngine,
**kwargs,
)

else:
# EXPERIMENTAL filesystem="arrow" support.
# This code path uses PyArrow for IO, which is only
# beneficial for remote storage (e.g. S3)

from fsspec.utils import stringify_path
from pyarrow import fs as pa_fs

# CudfReadParquetPyarrowFS requires import of distributed beforehand
# (See: https://github.com/dask/dask/issues/11352)
import distributed # noqa: F401
from dask.core import flatten
from dask.dataframe.utils import pyarrow_strings_enabled

from dask_cudf.io.parquet import CudfReadParquetPyarrowFS

if args:
raise ValueError(f"Unexpected positional arguments: {args}")

if not (
isinstance(filesystem, pa_fs.FileSystem)
or isinstance(filesystem, str)
and filesystem.lower() in ("arrow", "pyarrow")
):
raise ValueError(f"Unexpected filesystem value: {filesystem}.")

if not PYARROW_GE_15:
raise NotImplementedError(
"Experimental Arrow filesystem support requires pyarrow>=15"
)

if not isinstance(path, str):
path = stringify_path(path)

# Extract kwargs
columns = kwargs.pop("columns", None)
filters = kwargs.pop("filters", None)
categories = kwargs.pop("categories", None)
index = kwargs.pop("index", None)
storage_options = kwargs.pop("storage_options", None)
dtype_backend = kwargs.pop("dtype_backend", None)
calculate_divisions = kwargs.pop("calculate_divisions", False)
ignore_metadata_file = kwargs.pop("ignore_metadata_file", False)
metadata_task_size = kwargs.pop("metadata_task_size", None)
split_row_groups = kwargs.pop("split_row_groups", "infer")
blocksize = kwargs.pop("blocksize", "default")
aggregate_files = kwargs.pop("aggregate_files", None)
parquet_file_extension = kwargs.pop(
"parquet_file_extension", (".parq", ".parquet", ".pq")
)
arrow_to_pandas = kwargs.pop("arrow_to_pandas", None)
open_file_options = kwargs.pop("open_file_options", None)

# Validate and normalize kwargs
kwargs["dtype_backend"] = dtype_backend
if arrow_to_pandas is not None:
raise ValueError(
"arrow_to_pandas not supported for the 'cudf' backend."
)
if open_file_options is not None:
raise ValueError(
"The open_file_options argument is no longer supported "
"by the 'cudf' backend."
)
if filters is not None:
for filter in flatten(filters, container=list):
_, op, val = filter
if op == "in" and not isinstance(val, (set, list, tuple)):
raise TypeError(
"Value of 'in' filter must be a list, set or tuple."
)
if metadata_task_size is not None:
raise NotImplementedError(
"metadata_task_size is not supported when using the pyarrow filesystem."
)
if split_row_groups != "infer":
raise NotImplementedError(
"split_row_groups is not supported when using the pyarrow filesystem."
)
if parquet_file_extension != (".parq", ".parquet", ".pq"):
raise NotImplementedError(
"parquet_file_extension is not supported when using the pyarrow filesystem."
)
if blocksize is not None and blocksize != "default":
warnings.warn(
"blocksize is not supported when using the pyarrow filesystem."
"blocksize argument will be ignored."
)
if aggregate_files is not None:
warnings.warn(
"aggregate_files is not supported when using the pyarrow filesystem. "
"Please use the 'dataframe.parquet.minimum-partition-size' config."
"aggregate_files argument will be ignored."
)

return dx.new_collection(
CudfReadParquetPyarrowFS(
path,
columns=dx._util._convert_to_list(columns),
filters=filters,
categories=categories,
index=index,
calculate_divisions=calculate_divisions,
storage_options=storage_options,
filesystem=filesystem,
ignore_metadata_file=ignore_metadata_file,
arrow_to_pandas=arrow_to_pandas,
pyarrow_strings_enabled=pyarrow_strings_enabled(),
kwargs=kwargs,
_series=isinstance(columns, str),
)
)
return read_parquet_expr(*args, **kwargs)

@staticmethod
def read_csv(
Expand Down
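
With the backend dispatch reduced to a thin wrapper around dask_cudf.io.parquet.read_parquet, the new reader is reached through the top-level API. A minimal usage sketch, with hypothetical paths and only keywords that appear in this PR (blocksize, filesystem):

```python
import dask_cudf

# Default fsspec-based IO. blocksize controls the target partition size;
# leaving it at "default" lets the new API pick a value for the device.
ddf = dask_cudf.read_parquet("/data/parquet/", blocksize="512MiB")  # hypothetical path

# EXPERIMENTAL PyArrow-filesystem IO, mainly beneficial for remote storage
# such as S3 (requires pyarrow>=15).
ddf_s3 = dask_cudf.read_parquet("s3://bucket/dataset/", filesystem="arrow")  # hypothetical path
```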
13 changes: 9 additions & 4 deletions python/dask_cudf/dask_cudf/io/__init__.py
@@ -1,6 +1,6 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from dask_cudf import _deprecated_api
from dask_cudf import _deprecated_api, QUERY_PLANNING_ON

from . import csv, orc, json, parquet, text # noqa: F401

@@ -22,9 +22,14 @@
read_text = _deprecated_api(
"dask_cudf.io.read_text", new_api="dask_cudf.read_text"
)
read_parquet = _deprecated_api(
"dask_cudf.io.read_parquet", new_api="dask_cudf.read_parquet"
)
if QUERY_PLANNING_ON:
read_parquet = parquet.read_parquet
else:
read_parquet = _deprecated_api(
"The legacy dask_cudf.io.read_parquet API",
new_api="dask_cudf.read_parquet",
rec="",
)
to_parquet = _deprecated_api(
"dask_cudf.io.to_parquet",
new_api="dask_cudf._legacy.io.parquet.to_parquet",
Expand Down