Add new dask_cudf.read_parquet API #17250
Conversation
elif (
    filters is None
    and isinstance(dataset_kwargs, dict)
    and dataset_kwargs.get("partitioning") is None
):
    # Skip dataset processing if we have no filters
    # or hive/directory partitioning to deal with.
    return paths, row_groups, [], {}
The `pyarrow.dataset` logic below has non-negligible overhead on remote storage. This code block allows us to pass in `dataset_kwargs={"partitioning": None}` to skip unnecessary PyArrow processing when we know we are not reading from hive-partitioned data (and we aren't applying filters). By default (when we don't pass in `dataset_kwargs={"partitioning": None}`), cudf will still pre-process the dataset with PyArrow just in case it is hive partitioned.
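For illustration, here is a minimal usage sketch of the shortcut described above. It assumes `dataset_kwargs` is forwarded down to this code path from the public `cudf.read_parquet` call, and the S3 path is a placeholder:

```python
import cudf

# Hypothetical flat (non-hive-partitioned) dataset with no filters applied;
# passing partitioning=None opts out of the pyarrow.dataset pre-processing.
df = cudf.read_parquet(
    "s3://my-bucket/flat-files/",  # placeholder path
    dataset_kwargs={"partitioning": None},
)
```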
Thanks for the context @rjzamora! Do you happen to have rough numbers for the overhead, or at least its order of magnitude?
Nice work @rjzamora, looks good to me
Some small suggestions
    **to_pandas_kwargs,
)
class CudfReadParquetFSSpec(ReadParquetFSSpec):
    _STATS_CACHE: MutableMapping[str, Any] = {}
nit: Should this be a cache of bounded size (e.g. via `lru_cache`)?
I don't think I'm very worried about this cache getting too large. We're just storing a dict with two elements (schema name and storage size) for each column.
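For what it's worth, a bounded variant would only be a small change. The sketch below illustrates the idea only; the helper name and the exact statistics collected are hypothetical, not the PR's implementation:

```python
from functools import lru_cache

import pyarrow.parquet as pq


@lru_cache(maxsize=1024)
def _column_storage_sizes(path: str) -> dict:
    # Cache one small dict (column name -> total compressed bytes) per file.
    md = pq.ParquetFile(path).metadata
    sizes: dict = {}
    for rg in range(md.num_row_groups):
        row_group = md.row_group(rg)
        for c in range(md.num_columns):
            col = row_group.column(c)
            sizes[col.path_in_schema] = (
                sizes.get(col.path_in_schema, 0) + col.total_compressed_size
            )
    return sizes
```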
cudf python approval.
/merge
@GregoryKimball @vyasr - FYI: I'd really like this to be included in 24.12 (it makes it much easier to use/demonstrate the recent KvikIO + S3 improvements).
All good! I had this on the board slated for this release.
#17250 started using `pynvml` but did not add the proper dependency; this change fixes the missing dependency.

Authors:
- Peter Andreas Entschev (https://github.com/pentschev)
- https://github.com/jakirkham

Approvers:
- GALI PREM SAGAR (https://github.com/galipremsagar)
- https://github.com/jakirkham

URL: #17386
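For context, the `pynvml` usage in question amounts to querying device memory totals so a representative device size can be chosen. This is a hedged sketch of that kind of query, not the code from the PR (which selects the device from the active Dask cluster):

```python
import pynvml

pynvml.nvmlInit()
try:
    # Total memory of every visible NVIDIA device, in bytes.
    totals = [
        pynvml.nvmlDeviceGetMemoryInfo(
            pynvml.nvmlDeviceGetHandleByIndex(i)
        ).total
        for i in range(pynvml.nvmlDeviceGetCount())
    ]
finally:
    pynvml.nvmlShutdown()

# The default blocksize described below is 1/32 of the smallest device size.
default_blocksize = min(totals) // 32
```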
Description

It's time to clean up the `dask_cudf.read_parquet` API and prioritize GPU-specific optimizations. To this end, it makes sense to expose our own `read_parquet` API within Dask cuDF.

Notes:

- The new `dask_cudf.read_parquet` API is only relevant when query-planning is enabled (the default).
- `filesystem="arrow"` now uses `cudf.read_parquet` when reading from local storage (rather than PyArrow).
- The default `blocksize` argument is now specific to the "smallest" NVIDIA device detected within the active dask cluster (or the first device visible to the client). More specifically, we use `pynvml` to find this representative device size, and we set `blocksize` to be 1/32 of that size.
- Users can pass `blocksize=0.125` to use 1/8 the minimum device size (or `blocksize='1GiB'` to bypass the default logic altogether).
- When `blocksize` is `None`, we disable partition fusion at optimization time.
- When `blocksize` is not `None`, we use the parquet metadata from the first few files to inform partition fusion at optimization time (instead of a rough column-count ratio). A usage sketch follows this list.
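To make the notes above concrete, here is a hedged usage sketch; the dataset path is a placeholder, and the comments simply restate the behavior described above rather than quoting the merged implementation:

```python
import dask_cudf

ddf = dask_cudf.read_parquet(
    "s3://my-bucket/dataset/",  # placeholder path
    filesystem="arrow",         # Arrow filesystem; local reads still use cudf.read_parquet
    blocksize=0.125,            # fraction of the smallest visible device's memory (default is 1/32)
)

# blocksize=None disables partition fusion at optimization time, while an
# explicit byte size such as blocksize="1GiB" bypasses the device-based default.
```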
Checklist