Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REVIEW] Fix an issue with reading raw string in cudf.read_json #10924

Merged
merged 7 commits into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/cudf/cudf/io/avro.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def read_avro(
"`read_avro` does not yet support reading multiple files"
)

filepath_or_buffer, compression = ioutils.get_filepath_or_buffer(
filepath_or_buffer, compression = ioutils.get_reader_path_or_buf(
path_or_data=filepath_or_buffer, compression=None, **kwargs
)
if compression is not None:
Expand Down
4 changes: 2 additions & 2 deletions python/cudf/cudf/io/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def read_csv(
"`read_csv` does not yet support reading multiple files"
)

filepath_or_buffer, compression = ioutils.get_filepath_or_buffer(
filepath_or_buffer, compression = ioutils.get_reader_path_or_buf(
path_or_data=filepath_or_buffer,
compression=compression,
iotypes=(BytesIO, StringIO, NativeFile),
Expand Down Expand Up @@ -146,7 +146,7 @@ def to_csv(
path_or_buf = StringIO()
return_as_string = True

path_or_buf = ioutils.get_writer_filepath_or_buffer(
path_or_buf = ioutils.get_writer_path_or_buf(
path_or_data=path_or_buf, mode="w", **kwargs
)

Expand Down
6 changes: 4 additions & 2 deletions python/cudf/cudf/io/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ def read_json(
source = ioutils.stringify_pathlike(source)
source = fs.sep.join([source, "*.json"])

tmp_source, compression = ioutils.get_filepath_or_buffer(
tmp_source, compression = ioutils.get_reader_path_or_buf(
path_or_data=source,
compression=compression,
iotypes=(BytesIO, StringIO),
allow_raw_text_input=True,
**kwargs,
)
if isinstance(tmp_source, list):
Expand Down Expand Up @@ -73,10 +74,11 @@ def read_json(
"multiple files via pandas"
)

path_or_buf, compression = ioutils.get_filepath_or_buffer(
path_or_buf, compression = ioutils.get_reader_path_or_buf(
path_or_data=path_or_buf,
compression=compression,
iotypes=(BytesIO, StringIO),
allow_raw_text_input=True,
**kwargs,
)

Expand Down
6 changes: 3 additions & 3 deletions python/cudf/cudf/io/orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def read_orc_statistics(
files_statistics = []
stripes_statistics = []
for source in filepaths_or_buffers:
filepath_or_buffer, compression = ioutils.get_filepath_or_buffer(
(filepath_or_buffer, compression,) = ioutils.get_reader_path_or_buf(
galipremsagar marked this conversation as resolved.
Show resolved Hide resolved
path_or_data=source, compression=None, **kwargs
)
if compression is not None:
Expand Down Expand Up @@ -323,7 +323,7 @@ def read_orc(
source = stringify_path(source)
source = fs.sep.join([source, "*.orc"])

tmp_source, compression = ioutils.get_filepath_or_buffer(
tmp_source, compression = ioutils.get_reader_path_or_buf(
path_or_data=source,
compression=None,
use_python_file_object=use_python_file_object,
Expand Down Expand Up @@ -422,7 +422,7 @@ def to_orc(
"Categorical columns."
)

path_or_buf = ioutils.get_writer_filepath_or_buffer(
path_or_buf = ioutils.get_writer_path_or_buf(
path_or_data=fname, mode="wb", **kwargs
)
if ioutils.is_fsspec_open_file(path_or_buf):
Expand Down
4 changes: 2 additions & 2 deletions python/cudf/cudf/io/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def _write_parquet(
ValueError("paths must be list-like when partitions_info provided")

paths_or_bufs = [
ioutils.get_writer_filepath_or_buffer(path, mode="wb", **kwargs)
ioutils.get_writer_path_or_buf(path, mode="wb", **kwargs)
for path in paths
]
common_args = {
Expand Down Expand Up @@ -435,7 +435,7 @@ def read_parquet(
fs=fs,
)
for i, source in enumerate(filepath_or_buffer):
tmp_source, compression = ioutils.get_filepath_or_buffer(
tmp_source, compression = ioutils.get_reader_path_or_buf(
path_or_data=source,
compression=None,
fs=fs,
Expand Down
2 changes: 1 addition & 1 deletion python/cudf/cudf/io/text.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def read_text(
):
"""{docstring}"""

filepath_or_buffer, compression = ioutils.get_filepath_or_buffer(
filepath_or_buffer, compression = ioutils.get_reader_path_or_buf(
path_or_data=filepath_or_buffer,
compression=None,
iotypes=(BytesIO, StringIO),
Expand Down
28 changes: 19 additions & 9 deletions python/cudf/cudf/utils/ioutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1319,7 +1319,7 @@ def _open_remote_files(
]


def get_filepath_or_buffer(
def get_reader_path_or_buf(
path_or_data,
compression,
mode="rb",
Expand All @@ -1328,6 +1328,7 @@ def get_filepath_or_buffer(
byte_ranges=None,
use_python_file_object=False,
open_file_options=None,
allow_raw_text_input=False,
**kwargs,
):
"""Return either a filepath string to data, or a memory buffer of data.
Expand All @@ -1352,6 +1353,11 @@ def get_filepath_or_buffer(
open_file_options : dict, optional
Optional dictionary of key-word arguments to pass to
`_open_remote_files` (used for remote storage only).
allow_raw_text_input : boolean, default False
If True, this indicates the input `path_or_data` could be a raw text
input and will not check for its existence in the filesystem. If False,
the input must be a path and an error will be raised if it does not
exist.

Returns
-------
Expand All @@ -1372,18 +1378,22 @@ def get_filepath_or_buffer(
if fs is None:
return path_or_data, compression

if len(paths) == 0:
raise FileNotFoundError(
f"{path_or_data} could not be resolved to any files"
)

if _is_local_filesystem(fs):
# Doing this as `read_json` accepts a json string
# path_or_data need not be a filepath like string
if os.path.exists(paths[0]):
path_or_data = paths if len(paths) > 1 else paths[0]
if len(paths):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we lose the check for len(paths) == 0 above this intentionally? Even if _is_local_filesystem returns false, I think we still need paths to be nonempty.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, this was intentional because fsspec tends to not return any paths now. However, this comment lead me to think about the check when _is_local_filesystem is False i.e., in else block I was missing this check. Added it now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, we should be erroring based on len(paths) == 0 only in the case when _is_local_filesystem returns False. In the True case we should check for paths being nonempty but only error if paths is nonempty and those paths don't exist. Empty paths does not constitute an error, which is the change in fsspec that is causing our tests to fail.

if fs.exists(paths[0]):
vyasr marked this conversation as resolved.
Show resolved Hide resolved
path_or_data = paths if len(paths) > 1 else paths[0]
elif not allow_raw_text_input:
raise FileNotFoundError(
f"{path_or_data} could not be resolved to any files"
)

else:
if len(paths) == 0:
raise FileNotFoundError(
f"{path_or_data} could not be resolved to any files"
)
if use_python_file_object:
path_or_data = _open_remote_files(
paths,
Expand Down Expand Up @@ -1418,7 +1428,7 @@ def get_filepath_or_buffer(
return path_or_data, compression


def get_writer_filepath_or_buffer(path_or_data, mode, **kwargs):
def get_writer_path_or_buf(path_or_data, mode, **kwargs):
"""
Return either a filepath string to data,
or a open file object to the output filesystem
Expand Down