
[REVIEW] Reduce/Remove reliance on **kwargs and *args in IO readers & writers #12025

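In essence, this PR replaces the opaque `*args`/`**kwargs` pass-throughs in cudf's IO readers and writers with explicit keyword parameters, so that unsupported or misspelled options can fail loudly instead of being silently forwarded. A minimal sketch of the pattern (illustrative names, not cudf's actual code):

# Before: any keyword is accepted and forwarded downstream, typos included.
def read_thing(path, **kwargs):
    return _backend_read(path, **kwargs)

# After: the supported options are spelled out in the signature;
# anything else fails at the call site.
def read_thing(path, columns=None, storage_options=None):
    return _backend_read(path, columns=columns, storage_options=storage_options)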
Merged 52 commits on Nov 1, 2022

Commits
400df15
Initial work on removing **kwargs from IO code.
bdice Jun 1, 2022
abc632a
Merge remote-tracking branch 'bdice/io-kwargs' into 11780
galipremsagar Oct 5, 2022
ce0748c
Merge remote-tracking branch 'upstream/branch-22.12' into 11780
galipremsagar Oct 26, 2022
8466097
first pass
galipremsagar Oct 26, 2022
f733a95
Merge remote-tracking branch 'upstream/branch-22.12' into 11780
galipremsagar Oct 27, 2022
29c1bfa
update avro docstring
galipremsagar Oct 27, 2022
43b5699
add docs
galipremsagar Oct 27, 2022
81c07a2
improve json reader
galipremsagar Oct 27, 2022
e74644a
parameter update
galipremsagar Oct 27, 2022
367cac6
update docstring
galipremsagar Oct 27, 2022
a2baf55
orc update
galipremsagar Oct 27, 2022
ad35233
updates
galipremsagar Oct 27, 2022
393a43b
Merge remote-tracking branch 'upstream/branch-22.12' into 11780
galipremsagar Oct 27, 2022
49bbe87
update
galipremsagar Oct 27, 2022
8511242
Merge remote-tracking branch 'upstream/branch-22.12' into 11780
galipremsagar Oct 27, 2022
006d871
updates
galipremsagar Oct 28, 2022
05824ff
Merge remote-tracking branch 'upstream/branch-22.12' into 11780
galipremsagar Oct 28, 2022
420f452
update
galipremsagar Oct 28, 2022
643cabe
Merge remote-tracking branch 'upstream/branch-22.12' into 11780
galipremsagar Oct 28, 2022
d89039d
update
galipremsagar Oct 28, 2022
8b86691
cleanup
galipremsagar Oct 28, 2022
ed9c126
docstring update
galipremsagar Oct 28, 2022
26acc52
Merge remote-tracking branch 'upstream/branch-22.12' into 11780
galipremsagar Oct 28, 2022
99d5da1
refactor
galipremsagar Oct 28, 2022
7dbfa66
add issue for xfail
galipremsagar Oct 28, 2022
3a5ca50
Apply suggestions from code review
galipremsagar Oct 31, 2022
174f5ef
Merge remote-tracking branch 'upstream/branch-22.12' into 11780
galipremsagar Oct 31, 2022
8e1f2a3
Merge branch '11780' of https://github.com/galipremsagar/cudf into 11780
galipremsagar Oct 31, 2022
8817667
docs
galipremsagar Oct 31, 2022
1960213
address reviews
galipremsagar Oct 31, 2022
7f4b043
address reviews
galipremsagar Oct 31, 2022
9f6fbf8
review
galipremsagar Oct 31, 2022
9d41534
reviews
galipremsagar Oct 31, 2022
c96c5ab
reviews
galipremsagar Oct 31, 2022
e97442c
Update python/cudf/cudf/io/avro.py
galipremsagar Oct 31, 2022
e3e2859
Update python/cudf/cudf/utils/ioutils.py
galipremsagar Oct 31, 2022
05b5cfd
switch to readable bytes
galipremsagar Oct 31, 2022
c0cd57a
Merge branch '11780' of https://github.com/galipremsagar/cudf into 11780
galipremsagar Oct 31, 2022
f03cd19
Update python/cudf/cudf/io/json.py
galipremsagar Oct 31, 2022
5f3da21
Update python/cudf/cudf/utils/ioutils.py
galipremsagar Oct 31, 2022
c8b7cc0
bytes_per_thread rename
galipremsagar Oct 31, 2022
43bd35c
remove np usage
galipremsagar Oct 31, 2022
23478de
address review
galipremsagar Oct 31, 2022
5812fa6
reviews
galipremsagar Oct 31, 2022
24c27df
dynamic docstring
galipremsagar Oct 31, 2022
199b57d
Merge branch 'rapidsai:branch-22.12' into 11780
galipremsagar Nov 1, 2022
62f3d09
fix
galipremsagar Nov 1, 2022
238e7a9
fix docstring
galipremsagar Nov 1, 2022
1e7d763
use common defaults
galipremsagar Nov 1, 2022
35aa27e
docs
galipremsagar Nov 1, 2022
4041457
Apply suggestions from code review
galipremsagar Nov 1, 2022
15a0314
style
galipremsagar Nov 1, 2022
23 changes: 13 additions & 10 deletions python/cudf/cudf/_lib/orc.pyx
@@ -120,15 +120,15 @@ cpdef read_orc(object filepaths_or_buffers,
c_result = move(libcudf_read_orc(c_orc_reader_options))

names = [name.decode() for name in c_result.metadata.column_names]
actual_index_names, names, is_range_index, reset_index_name, range_idx = \
_get_index_from_metadata(c_result.metadata.user_data,
names,
skip_rows,
num_rows)
actual_index_names, col_names, is_range_index, reset_index_name, \
range_idx = _get_index_from_metadata(c_result.metadata.user_data,
names,
skip_rows,
num_rows)

data, index = data_from_unique_ptr(
move(c_result.tbl),
names,
col_names if columns is None else names,
actual_index_names
)

@@ -238,9 +238,10 @@ cpdef write_orc(table,
object stripe_size_bytes=None,
object stripe_size_rows=None,
object row_index_stride=None,
object cols_as_map_type=None):
object cols_as_map_type=None,
object index=None):
"""
Cython function to call into libcudf API, see `write_orc`.
Cython function to call into libcudf API, see `cudf::io::write_orc`.

See Also
--------
@@ -252,10 +252,12 @@
cdef unique_ptr[table_input_metadata] tbl_meta
cdef map[string, string] user_data
user_data[str.encode("pandas")] = str.encode(generate_pandas_metadata(
table, None)
table, index)
)

if not isinstance(table._index, cudf.RangeIndex):
if index is True or (
index is None and not isinstance(table._index, cudf.RangeIndex)
):
tv = table_view_from_table(table)
tbl_meta = make_unique[table_input_metadata](tv)
for level, idx_name in enumerate(table._index.names):
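The new `index` argument to `write_orc` controls whether index columns are written: by default (`index=None`) the index is written only when it is not a plain RangeIndex. A hedged restatement of that condition at the Python level, assuming a cudf DataFrame `df`:

import cudf

def should_write_index(df, index=None):
    # Mirrors the check in write_orc: write the index when explicitly
    # requested (index=True), or by default (index=None) when the index
    # carries information, i.e. is not a RangeIndex.
    return index is True or (
        index is None and not isinstance(df.index, cudf.RangeIndex)
    )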
2 changes: 1 addition & 1 deletion python/cudf/cudf/_lib/parquet.pyx
@@ -312,7 +312,7 @@ cpdef write_parquet(
object statistics="ROWGROUP",
object metadata_file_path=None,
object int96_timestamps=False,
object row_group_size_bytes=None,
object row_group_size_bytes=134217728,
object row_group_size_rows=None,
object max_page_size_bytes=None,
object max_page_size_rows=None,
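For reference, the new `row_group_size_bytes` default of 134217728 is 128 MiB:

# 128 * 1024 * 1024 == 134217728 bytes
assert 134_217_728 == 128 * 1024 * 1024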
70 changes: 66 additions & 4 deletions python/cudf/cudf/core/dataframe.py
@@ -6019,11 +6019,51 @@ def select_dtypes(self, include=None, exclude=None):
return df

@ioutils.doc_to_parquet()
def to_parquet(self, path, *args, **kwargs):
def to_parquet(
self,
path,
engine="cudf",
compression="snappy",
index=None,
partition_cols=None,
partition_file_name=None,
partition_offsets=None,
statistics="ROWGROUP",
metadata_file_path=None,
int96_timestamps=False,
row_group_size_bytes=ioutils._ROW_GROUP_SIZE_BYTES_DEFAULT,
row_group_size_rows=None,
max_page_size_bytes=None,
max_page_size_rows=None,
storage_options=None,
return_metadata=False,
*args,
**kwargs,
):
"""{docstring}"""
from cudf.io import parquet

return parquet.to_parquet(self, path, *args, **kwargs)
return parquet.to_parquet(
self,
path=path,
engine=engine,
compression=compression,
index=index,
partition_cols=partition_cols,
partition_file_name=partition_file_name,
partition_offsets=partition_offsets,
statistics=statistics,
metadata_file_path=metadata_file_path,
int96_timestamps=int96_timestamps,
row_group_size_bytes=row_group_size_bytes,
row_group_size_rows=row_group_size_rows,
max_page_size_bytes=max_page_size_bytes,
max_page_size_rows=max_page_size_rows,
storage_options=storage_options,
return_metadata=return_metadata,
*args,
**kwargs,
)

@ioutils.doc_to_feather()
def to_feather(self, path, *args, **kwargs):
@@ -6066,11 +6106,33 @@ def to_csv(
)

@ioutils.doc_to_orc()
def to_orc(self, fname, compression="snappy", *args, **kwargs):
def to_orc(
self,
fname,
compression="snappy",
statistics="ROWGROUP",
stripe_size_bytes=None,
stripe_size_rows=None,
row_index_stride=None,
cols_as_map_type=None,
storage_options=None,
index=None,
):
"""{docstring}"""
from cudf.io import orc

orc.to_orc(self, fname, compression, *args, **kwargs)
return orc.to_orc(
df=self,
fname=fname,
compression=compression,
statistics=statistics,
stripe_size_bytes=stripe_size_bytes,
stripe_size_rows=stripe_size_rows,
row_index_stride=row_index_stride,
cols_as_map_type=cols_as_map_type,
storage_options=storage_options,
index=index,
)

@_cudf_nvtx_annotate
def stack(self, level=-1, dropna=True):
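With the explicit signatures above, a typical call spells out its options as keywords; a usage sketch (file names and values are illustrative):

import cudf

df = cudf.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})

# All documented options are now named parameters in the signature;
# to_orc no longer accepts arbitrary *args/**kwargs at all.
df.to_parquet("out.parquet", compression="snappy", row_group_size_rows=5000)
df.to_orc("out.orc", compression="snappy", index=False)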
16 changes: 13 additions & 3 deletions python/cudf/cudf/io/avro.py
@@ -1,4 +1,7 @@
# Copyright (c) 2019-2022, NVIDIA CORPORATION.

import warnings

import cudf
from cudf import _lib as libcudf
from cudf.utils import ioutils
@@ -11,26 +14,33 @@ def read_avro(
columns=None,
skiprows=None,
num_rows=None,
**kwargs,
storage_options=None,
):
"""{docstring}"""

is_single_filepath_or_buffer = ioutils.ensure_single_filepath_or_buffer(
path_or_data=filepath_or_buffer,
**kwargs,
storage_options=storage_options,
)
if not is_single_filepath_or_buffer:
raise NotImplementedError(
"`read_avro` does not yet support reading multiple files"
)

filepath_or_buffer, compression = ioutils.get_reader_filepath_or_buffer(
path_or_data=filepath_or_buffer, compression=None, **kwargs
path_or_data=filepath_or_buffer,
compression=None,
storage_options=storage_options,
)
if compression is not None:
ValueError("URL content-encoding decompression is not supported")

if engine == "cudf":
warnings.warn(
"The `engine` parameter is deprecated and will be removed in a "
"future release",
FutureWarning,
)
return cudf.DataFrame._from_data(
*libcudf.avro.read_avro(
filepath_or_buffer, columns, skiprows, num_rows
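A usage sketch of the updated `read_avro` (the path is illustrative): `storage_options` is now an explicit parameter, `**kwargs` is gone, and passing `engine` emits a FutureWarning because that parameter is deprecated.

import cudf

# Only the documented keywords remain.
df = cudf.read_avro("data.avro", columns=["a", "b"], skiprows=0, num_rows=100)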
7 changes: 4 additions & 3 deletions python/cudf/cudf/io/csv.py
@@ -61,6 +61,9 @@ def read_csv(
"`use_python_file_object=False`"
)

if bytes_per_thread is None:
bytes_per_thread = ioutils._BYTES_PER_THREAD_DEFAULT

is_single_filepath_or_buffer = ioutils.ensure_single_filepath_or_buffer(
path_or_data=filepath_or_buffer,
storage_options=storage_options,
@@ -76,9 +79,7 @@
iotypes=(BytesIO, StringIO, NativeFile),
use_python_file_object=use_python_file_object,
storage_options=storage_options,
bytes_per_thread=256_000_000
if bytes_per_thread is None
else bytes_per_thread,
bytes_per_thread=bytes_per_thread,
)

if na_values is not None and is_scalar(na_values):
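A usage sketch of the affected option (path and value illustrative): callers can still tune `bytes_per_thread`, and omitting it now falls back to the shared `ioutils._BYTES_PER_THREAD_DEFAULT` constant rather than an inline literal.

import cudf

# Explicit override of the per-thread read size; omit it to use the default.
df = cudf.read_csv("data.csv", bytes_per_thread=64_000_000)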
74 changes: 45 additions & 29 deletions python/cudf/cudf/io/json.py
@@ -1,4 +1,5 @@
# Copyright (c) 2019-2022, NVIDIA CORPORATION.

import warnings
from collections import abc
from io import BytesIO, StringIO
@@ -17,22 +18,23 @@
def read_json(
path_or_buf,
engine="auto",
dtype=True,
orient=None,
dtype=None,
lines=False,
compression="infer",
byte_range=None,
keep_quotes=False,
storage_options=None,
*args,
**kwargs,
):
"""{docstring}"""

if not isinstance(dtype, (abc.Mapping, bool)):
warnings.warn(
"passing 'dtype' as list is deprecated, instead pass "
"a dict of column name and types key-value paris."
"in future versions 'dtype' can only be a dict or bool",
FutureWarning,
if dtype is not None and not isinstance(dtype, (abc.Mapping, bool)):
raise TypeError(
"'dtype' parameter only supports "
"a dict of column names and types as key-value pairs, "
f"or a bool, or None. Got {type(dtype)}"
)

if engine == "cudf" and not lines:
@@ -45,16 +47,34 @@ def read_json(
if engine == "auto":
engine = "cudf" if lines else "pandas"
if engine == "cudf" or engine == "cudf_experimental":
if dtype is None:
dtype = True

if kwargs:
raise ValueError(
"cudf engine doesn't support the "
f"following keyword arguments: {list(kwargs.keys())}"
)
if args:
raise ValueError(
"cudf engine doesn't support the "
f"following positional arguments: {list(args)}"
)

# Multiple sources are passed as a list. If a single source is passed,
# wrap it in a list for unified processing downstream.
if not is_list_like(path_or_buf):
path_or_buf = [path_or_buf]

filepaths_or_buffers = []
for source in path_or_buf:
if ioutils.is_directory(source, **kwargs):
if ioutils.is_directory(
path_or_data=source, storage_options=storage_options
):
fs = ioutils._ensure_filesystem(
passed_filesystem=None, path=source, **kwargs
passed_filesystem=None,
path=source,
storage_options=storage_options,
)
source = ioutils.stringify_pathlike(source)
source = fs.sep.join([source, "*.json"])
@@ -64,7 +84,7 @@
compression=compression,
iotypes=(BytesIO, StringIO),
allow_raw_text_input=True,
**kwargs,
storage_options=storage_options,
)
if isinstance(tmp_source, list):
filepaths_or_buffers.extend(tmp_source)
@@ -88,7 +108,7 @@

if not ioutils.ensure_single_filepath_or_buffer(
path_or_data=path_or_buf,
**kwargs,
storage_options=storage_options,
):
raise NotImplementedError(
"`read_json` does not yet support reading "
@@ -100,28 +120,24 @@
compression=compression,
iotypes=(BytesIO, StringIO),
allow_raw_text_input=True,
**kwargs,
storage_options=storage_options,
)

if kwargs.get("orient") == "table":
pd_value = pd.read_json(
path_or_buf,
lines=lines,
compression=compression,
*args,
**kwargs,
)
else:
pd_value = pd.read_json(
path_or_buf,
lines=lines,
dtype=dtype,
compression=compression,
*args,
**kwargs,
)
pd_value = pd.read_json(
path_or_buf,
lines=lines,
dtype=dtype,
compression=compression,
storage_options=storage_options,
orient=orient,
*args,
**kwargs,
)
df = cudf.from_pandas(pd_value)

if dtype is None:
dtype = True

if dtype is True or isinstance(dtype, abc.Mapping):
# There exists some dtypes in the result columns that is inferred.
# Find them and map them to the default dtypes.
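A usage sketch of the updated `read_json` (paths illustrative): with the cudf engine, unknown positional or keyword arguments now raise a ValueError, while `orient` and `storage_options` are explicit parameters forwarded on the pandas engine path.

import cudf

# cudf engine: only the documented keywords are accepted.
gdf = cudf.read_json("records.jsonl", engine="cudf", lines=True)

# pandas engine: `orient` is now a named parameter rather than a stray kwarg.
pdf_backed = cudf.read_json("table.json", engine="pandas", orient="table")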