Skip to content

Commit

Permalink
Fix dask_cudf metadata-inference when first ORC path is empty (#8021)
Browse files Browse the repository at this point in the history
Closes #8011 

Dask-cuDF currently reads a single stripe to infer metadata in `read_orc`.  When the first path corresponds to an empty file, there is no stripe "0" to read.  This PR includes a simple fix (and test coverage).

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Keith Kraus (https://github.com/kkraus14)

URL: #8021
  • Loading branch information
rjzamora authored Apr 22, 2021
1 parent d4d64c0 commit dcebfe7
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
7 changes: 6 additions & 1 deletion python/dask_cudf/dask_cudf/io/orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,12 @@ def read_orc(path, columns=None, filters=None, storage_options=None, **kwargs):
columns = list(schema)

with fs.open(paths[0], "rb") as f:
meta = cudf.read_orc(f, stripes=[0], columns=columns, **kwargs)
meta = cudf.read_orc(
f,
stripes=[0] if nstripes_per_file[0] else None,
columns=columns,
**kwargs,
)

name = "read-orc-" + tokenize(fs_token, path, columns, **kwargs)
dsk = {}
Expand Down
21 changes: 19 additions & 2 deletions python/dask_cudf/dask_cudf/io/tests/test_orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

from dask import dataframe as dd

import dask_cudf

import cudf

import dask_cudf

# import pyarrow.orc as orc

cur_dir = os.path.dirname(__file__)
Expand Down Expand Up @@ -74,6 +74,23 @@ def test_read_orc_filtered(tmpdir, engine, predicate, expected_len):
dd.assert_eq(len(df), expected_len)


def test_read_orc_first_file_empty(tmpdir):

    # Regression test for https://github.com/rapidsai/cudf/issues/8011:
    # dask_cudf.read_orc infers metadata from the first path, so an empty
    # first file (zero stripes) must not break the read.
    path = str(tmpdir)
    os.makedirs(path, exist_ok=True)

    # Write a three-file dataset: an empty file first, then one row each.
    frame = cudf.DataFrame({"id": [1, 2], "float": [1.0, 2.0]})
    frame.iloc[:0].to_orc(os.path.join(path, "data.0"))
    frame.iloc[:1].to_orc(os.path.join(path, "data.1"))
    frame.iloc[1:].to_orc(os.path.join(path, "data.2"))

    # Reading the glob back should reconstruct the original frame.
    result = dask_cudf.read_orc(os.path.join(path, "*"))
    dd.assert_eq(frame, result, check_index=False)


@pytest.mark.parametrize("compute", [True, False])
@pytest.mark.parametrize("compression", [None, "snappy"])
@pytest.mark.parametrize(
Expand Down

0 comments on commit dcebfe7

Please sign in to comment.