Expose engine argument in dask_cudf.read_json (#12101)
Exposes the `engine` argument in `dask_cudf.read_json`, enabling `dask_cudf.read_json(..., engine="cudf_experimental")` for nested JSON data.
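For illustration, a minimal usage sketch of the newly exposed argument (the glob path and reader options are placeholders, not part of this change):

```python
import dask_cudf

# Read nested, line-delimited JSON with the experimental cuDF engine
ddf = dask_cudf.read_json(
    "data/records-*.json",  # hypothetical path
    orient="records",
    lines=True,
    engine="cudf_experimental",
)
```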

TODO (~maybe this PR?~):

- [ ] (**EDIT**: This should be done in a separate PR) Add a simple, optimized code path that leverages the `byte_range` parameter for local storage (similar to what is done in [`dask_cudf.read_csv`](https://github.com/rapidsai/cudf/blob/7535f31cfaf7e01578c413bb3ba46b03d2014806/python/dask_cudf/dask_cudf/io/csv.py#L72)); a rough sketch of the idea follows this list. This would depend on #12017 for nested JSON data.
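A hedged sketch of what that optimized path could look like, mirroring the `read_csv` approach. Everything here is illustrative and not part of this PR: the helper names are hypothetical, and it assumes `cudf.read_json` accepts `byte_range=(offset, size)` for line-delimited input:

```python
import os

import dask.dataframe as dd
from dask.delayed import delayed

import cudf


def _read_json_byte_range(path, start, size, **kwargs):
    # Hypothetical helper: read one chunk of a line-delimited JSON
    # file; cudf is assumed to resolve rows straddling the boundary.
    return cudf.read_json(path, byte_range=(start, size), **kwargs)


def _read_json_local(path, blocksize=2**28, **kwargs):
    # Hypothetical local-storage fast path: one partition per
    # `blocksize` byte range, skipping remote-storage machinery.
    size = os.path.getsize(path)
    meta = cudf.read_json(
        path, byte_range=(0, min(blocksize, size)), **kwargs
    ).head(0)
    parts = [
        delayed(_read_json_byte_range)(path, start, blocksize, **kwargs)
        for start in range(0, size, blocksize)
    ]
    return dd.from_delayed(parts, meta=meta)
```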

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #12101
rjzamora authored Nov 9, 2022
1 parent fbac4b4 commit 6f78e74
Showing 3 changed files with 85 additions and 8 deletions.
11 changes: 4 additions & 7 deletions python/dask_cudf/dask_cudf/backends.py
@@ -493,13 +493,10 @@ def read_parquet(*args, engine=None, **kwargs):
         )
 
     @staticmethod
-    def read_json(*args, engine=None, **kwargs):
-        return _default_backend(
-            dd.read_json,
-            *args,
-            engine=cudf.read_json,
-            **kwargs,
-        )
+    def read_json(*args, **kwargs):
+        from dask_cudf.io.json import read_json
+
+        return read_json(*args, **kwargs)
 
     @staticmethod
     def read_orc(*args, **kwargs):
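As context for the hunk above: this `read_json` staticmethod belongs to dask_cudf's backend entrypoint, so collection-creation dispatching in dask now routes through the new wrapper instead of calling `dd.read_json` with a fixed engine. A rough sketch of that path, assuming a dask version with backend-dispatching support (the path is a placeholder):

```python
import dask
import dask.dataframe as dd

# With the cudf backend active, dd.read_json is dispatched to
# dask_cudf.io.json.read_json via the entrypoint patched above.
with dask.config.set({"dataframe.backend": "cudf"}):
    ddf = dd.read_json("data/*.json", lines=True)  # hypothetical path
```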
64 changes: 63 additions & 1 deletion python/dask_cudf/dask_cudf/io/json.py
@@ -6,4 +6,66 @@
 
 import cudf
 
-read_json = partial(dask.dataframe.read_json, engine=cudf.read_json)
+from dask_cudf.backends import _default_backend
+
+
+def read_json(url_path, engine="auto", **kwargs):
+    """Create a dask_cudf DataFrame collection from JSON data
+
+    This function wraps ``dask.dataframe.read_json``, and passes
+    ``engine=partial(cudf.read_json, engine="auto")`` by default.
+
+    Parameters
+    ----------
+    url_path : str, list of str
+        Location to read from. If a string, can include a glob character
+        to find a set of file names.
+        Supports protocol specifications such as ``"s3://"``.
+    engine : str or Callable, default "auto"
+        If str, this value will be used as the ``engine`` argument when
+        ``cudf.read_json`` is used to create each partition. If a
+        Callable, this value will be used as the underlying function
+        used to create each partition from JSON data. The default value
+        is "auto", so that ``engine=partial(cudf.read_json, engine="auto")``
+        will be passed to ``dask.dataframe.read_json`` by default.
+    **kwargs :
+        Keyword arguments to pass through to ``dask.dataframe.read_json``.
+
+    Returns
+    -------
+    dask_cudf.DataFrame
+
+    Examples
+    --------
+    Load single file
+
+    >>> from dask_cudf import read_json
+    >>> read_json('myfile.json')  # doctest: +SKIP
+
+    Load large line-delimited JSON files using partitions of approx
+    256 MB in size
+
+    >>> read_json('data/file*.json', blocksize=2**28)  # doctest: +SKIP
+
+    Load nested JSON data
+
+    >>> read_json('myfile.json', engine='cudf_experimental')  # doctest: +SKIP
+
+    See Also
+    --------
+    dask.dataframe.io.json.read_json
+    """
+
+    # TODO: Add an optimized code path that leverages the
+    # `byte_range` argument to `cudf.read_json` for local
+    # storage (see `dask_cudf.read_csv`)
+    return _default_backend(
+        dask.dataframe.read_json,
+        url_path,
+        engine=(
+            partial(cudf.read_json, engine=engine)
+            if isinstance(engine, str)
+            else engine
+        ),
+        **kwargs,
+    )
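To illustrate the Callable form of ``engine`` documented above, a hedged sketch (the path is a placeholder): passing a partial is equivalent to the default string path, but shows how an arbitrary per-partition reader can be supplied.

```python
from functools import partial

import cudf
import dask_cudf

# Equivalent to engine="cudf_experimental", spelled as a Callable
ddf = dask_cudf.read_json(
    "data/file-*.json",  # hypothetical path
    orient="records",
    lines=True,
    engine=partial(cudf.read_json, engine="cudf_experimental"),
)
```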
18 changes: 18 additions & 0 deletions python/dask_cudf/dask_cudf/io/tests/test_json.py
@@ -71,3 +71,21 @@ def test_read_json_lines(lines):
     actual = dask_cudf.read_json(f, orient="records", lines=lines)
     actual_pd = pd.read_json(f, orient="records", lines=lines)
     dd.assert_eq(actual, actual_pd)
+
+
+def test_read_json_nested_experimental(tmp_path):
+    # Check that `engine="cudf_experimental"` can be
+    # used to support nested data
+    df = pd.DataFrame(
+        {
+            "a": [{"y": 2}, {"y": 4}, {"y": 6}, {"y": 8}],
+            "b": [[1, 2, 3], [4, 5], [6], [7]],
+            "c": [1, 3, 5, 7],
+        }
+    )
+    kwargs = dict(orient="records", lines=True)
+    f = tmp_path / "data.json"
+    df.to_json(f, **kwargs)
+    actual = dask_cudf.read_json(f, engine="cudf_experimental", **kwargs)
+    actual_pd = pd.read_json(f, **kwargs)
+    dd.assert_eq(actual, actual_pd)
