diff --git a/python/cudf/cudf/_lib/orc.pyx b/python/cudf/cudf/_lib/orc.pyx index f57e4e8f281..cb364c86dd6 100644 --- a/python/cudf/cudf/_lib/orc.pyx +++ b/python/cudf/cudf/_lib/orc.pyx @@ -120,15 +120,15 @@ cpdef read_orc(object filepaths_or_buffers, c_result = move(libcudf_read_orc(c_orc_reader_options)) names = [name.decode() for name in c_result.metadata.column_names] - actual_index_names, names, is_range_index, reset_index_name, range_idx = \ - _get_index_from_metadata(c_result.metadata.user_data, - names, - skip_rows, - num_rows) + actual_index_names, col_names, is_range_index, reset_index_name, \ + range_idx = _get_index_from_metadata(c_result.metadata.user_data, + names, + skip_rows, + num_rows) data, index = data_from_unique_ptr( move(c_result.tbl), - names, + col_names if columns is None else names, actual_index_names ) @@ -238,9 +238,10 @@ cpdef write_orc(table, object stripe_size_bytes=None, object stripe_size_rows=None, object row_index_stride=None, - object cols_as_map_type=None): + object cols_as_map_type=None, + object index=None): """ - Cython function to call into libcudf API, see `write_orc`. + Cython function to call into libcudf API, see `cudf::io::write_orc`. See Also -------- @@ -252,10 +253,12 @@ cpdef write_orc(table, cdef unique_ptr[table_input_metadata] tbl_meta cdef map[string, string] user_data user_data[str.encode("pandas")] = str.encode(generate_pandas_metadata( - table, None) + table, index) ) - if not isinstance(table._index, cudf.RangeIndex): + if index is True or ( + index is None and not isinstance(table._index, cudf.RangeIndex) + ): tv = table_view_from_table(table) tbl_meta = make_unique[table_input_metadata](tv) for level, idx_name in enumerate(table._index.names): diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 6de84ce90c3..2667279e205 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -63,6 +63,8 @@ from cudf._lib.utils cimport table_view_from_table from pyarrow.lib import NativeFile +from cudf.utils.ioutils import _ROW_GROUP_SIZE_BYTES_DEFAULT + cdef class BufferArrayFromVector: cdef Py_ssize_t length @@ -312,7 +314,7 @@ cpdef write_parquet( object statistics="ROWGROUP", object metadata_file_path=None, object int96_timestamps=False, - object row_group_size_bytes=None, + object row_group_size_bytes=_ROW_GROUP_SIZE_BYTES_DEFAULT, object row_group_size_rows=None, object max_page_size_bytes=None, object max_page_size_rows=None, @@ -481,7 +483,7 @@ cdef class ParquetWriter: def __cinit__(self, object filepath_or_buffer, object index=None, object compression="snappy", str statistics="ROWGROUP", - int row_group_size_bytes=134217728, + int row_group_size_bytes=_ROW_GROUP_SIZE_BYTES_DEFAULT, int row_group_size_rows=1000000, int max_page_size_bytes=524288, int max_page_size_rows=20000): diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 92ca5148c1e..82a4a4a8b65 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -6019,11 +6019,51 @@ def select_dtypes(self, include=None, exclude=None): return df @ioutils.doc_to_parquet() - def to_parquet(self, path, *args, **kwargs): + def to_parquet( + self, + path, + engine="cudf", + compression="snappy", + index=None, + partition_cols=None, + partition_file_name=None, + partition_offsets=None, + statistics="ROWGROUP", + metadata_file_path=None, + int96_timestamps=False, + row_group_size_bytes=ioutils._ROW_GROUP_SIZE_BYTES_DEFAULT, + row_group_size_rows=None, + 
max_page_size_bytes=None, + max_page_size_rows=None, + storage_options=None, + return_metadata=False, + *args, + **kwargs, + ): """{docstring}""" from cudf.io import parquet - return parquet.to_parquet(self, path, *args, **kwargs) + return parquet.to_parquet( + self, + path=path, + engine=engine, + compression=compression, + index=index, + partition_cols=partition_cols, + partition_file_name=partition_file_name, + partition_offsets=partition_offsets, + statistics=statistics, + metadata_file_path=metadata_file_path, + int96_timestamps=int96_timestamps, + row_group_size_bytes=row_group_size_bytes, + row_group_size_rows=row_group_size_rows, + max_page_size_bytes=max_page_size_bytes, + max_page_size_rows=max_page_size_rows, + storage_options=storage_options, + return_metadata=return_metadata, + *args, + **kwargs, + ) @ioutils.doc_to_feather() def to_feather(self, path, *args, **kwargs): @@ -6066,11 +6106,33 @@ def to_csv( ) @ioutils.doc_to_orc() - def to_orc(self, fname, compression="snappy", *args, **kwargs): + def to_orc( + self, + fname, + compression="snappy", + statistics="ROWGROUP", + stripe_size_bytes=None, + stripe_size_rows=None, + row_index_stride=None, + cols_as_map_type=None, + storage_options=None, + index=None, + ): """{docstring}""" from cudf.io import orc - orc.to_orc(self, fname, compression, *args, **kwargs) + return orc.to_orc( + df=self, + fname=fname, + compression=compression, + statistics=statistics, + stripe_size_bytes=stripe_size_bytes, + stripe_size_rows=stripe_size_rows, + row_index_stride=row_index_stride, + cols_as_map_type=cols_as_map_type, + storage_options=storage_options, + index=index, + ) @_cudf_nvtx_annotate def stack(self, level=-1, dropna=True): diff --git a/python/cudf/cudf/io/avro.py b/python/cudf/cudf/io/avro.py index 66c5c1c5a56..aaafe60d03f 100644 --- a/python/cudf/cudf/io/avro.py +++ b/python/cudf/cudf/io/avro.py @@ -1,4 +1,7 @@ # Copyright (c) 2019-2022, NVIDIA CORPORATION. 
+ +import warnings + import cudf from cudf import _lib as libcudf from cudf.utils import ioutils @@ -11,13 +14,13 @@ def read_avro( columns=None, skiprows=None, num_rows=None, - **kwargs, + storage_options=None, ): """{docstring}""" is_single_filepath_or_buffer = ioutils.ensure_single_filepath_or_buffer( path_or_data=filepath_or_buffer, - **kwargs, + storage_options=storage_options, ) if not is_single_filepath_or_buffer: raise NotImplementedError( @@ -25,12 +28,19 @@ def read_avro( ) filepath_or_buffer, compression = ioutils.get_reader_filepath_or_buffer( - path_or_data=filepath_or_buffer, compression=None, **kwargs + path_or_data=filepath_or_buffer, + compression=None, + storage_options=storage_options, ) if compression is not None: ValueError("URL content-encoding decompression is not supported") if engine == "cudf": + warnings.warn( + "The `engine` parameter is deprecated and will be removed in a " + "future release", + FutureWarning, + ) return cudf.DataFrame._from_data( *libcudf.avro.read_avro( filepath_or_buffer, columns, skiprows, num_rows diff --git a/python/cudf/cudf/io/csv.py b/python/cudf/cudf/io/csv.py index 0adf432c31d..1eacbbb4458 100644 --- a/python/cudf/cudf/io/csv.py +++ b/python/cudf/cudf/io/csv.py @@ -61,6 +61,9 @@ def read_csv( "`use_python_file_object=False`" ) + if bytes_per_thread is None: + bytes_per_thread = ioutils._BYTES_PER_THREAD_DEFAULT + is_single_filepath_or_buffer = ioutils.ensure_single_filepath_or_buffer( path_or_data=filepath_or_buffer, storage_options=storage_options, @@ -76,9 +79,7 @@ def read_csv( iotypes=(BytesIO, StringIO, NativeFile), use_python_file_object=use_python_file_object, storage_options=storage_options, - bytes_per_thread=256_000_000 - if bytes_per_thread is None - else bytes_per_thread, + bytes_per_thread=bytes_per_thread, ) if na_values is not None and is_scalar(na_values): diff --git a/python/cudf/cudf/io/json.py b/python/cudf/cudf/io/json.py index 2a0ae565974..0ae02dcb62b 100644 --- a/python/cudf/cudf/io/json.py +++ b/python/cudf/cudf/io/json.py @@ -1,4 +1,5 @@ # Copyright (c) 2019-2022, NVIDIA CORPORATION. + import warnings from collections import abc from io import BytesIO, StringIO @@ -17,22 +18,23 @@ def read_json( path_or_buf, engine="auto", - dtype=True, + orient=None, + dtype=None, lines=False, compression="infer", byte_range=None, keep_quotes=False, + storage_options=None, *args, **kwargs, ): """{docstring}""" - if not isinstance(dtype, (abc.Mapping, bool)): - warnings.warn( - "passing 'dtype' as list is deprecated, instead pass " - "a dict of column name and types key-value paris." - "in future versions 'dtype' can only be a dict or bool", - FutureWarning, + if dtype is not None and not isinstance(dtype, (abc.Mapping, bool)): + raise TypeError( + "'dtype' parameter only supports " + "a dict of column names and types as key-value pairs, " + f"or a bool, or None. Got {type(dtype)}" ) if engine == "cudf" and not lines: @@ -45,6 +47,20 @@ def read_json( if engine == "auto": engine = "cudf" if lines else "pandas" if engine == "cudf" or engine == "cudf_experimental": + if dtype is None: + dtype = True + + if kwargs: + raise ValueError( + "cudf engine doesn't support the " + f"following keyword arguments: {list(kwargs.keys())}" + ) + if args: + raise ValueError( + "cudf engine doesn't support the " + f"following positional arguments: {list(args)}" + ) + # Multiple sources are passed as a list. If a single source is passed, # wrap it in a list for unified processing downstream. 
if not is_list_like(path_or_buf): @@ -52,9 +68,13 @@ def read_json( filepaths_or_buffers = [] for source in path_or_buf: - if ioutils.is_directory(source, **kwargs): + if ioutils.is_directory( + path_or_data=source, storage_options=storage_options + ): fs = ioutils._ensure_filesystem( - passed_filesystem=None, path=source, **kwargs + passed_filesystem=None, + path=source, + storage_options=storage_options, ) source = ioutils.stringify_pathlike(source) source = fs.sep.join([source, "*.json"]) @@ -64,7 +84,7 @@ def read_json( compression=compression, iotypes=(BytesIO, StringIO), allow_raw_text_input=True, - **kwargs, + storage_options=storage_options, ) if isinstance(tmp_source, list): filepaths_or_buffers.extend(tmp_source) @@ -88,7 +108,7 @@ def read_json( if not ioutils.ensure_single_filepath_or_buffer( path_or_data=path_or_buf, - **kwargs, + storage_options=storage_options, ): raise NotImplementedError( "`read_json` does not yet support reading " @@ -100,28 +120,24 @@ def read_json( compression=compression, iotypes=(BytesIO, StringIO), allow_raw_text_input=True, - **kwargs, + storage_options=storage_options, ) - if kwargs.get("orient") == "table": - pd_value = pd.read_json( - path_or_buf, - lines=lines, - compression=compression, - *args, - **kwargs, - ) - else: - pd_value = pd.read_json( - path_or_buf, - lines=lines, - dtype=dtype, - compression=compression, - *args, - **kwargs, - ) + pd_value = pd.read_json( + path_or_buf, + lines=lines, + dtype=dtype, + compression=compression, + storage_options=storage_options, + orient=orient, + *args, + **kwargs, + ) df = cudf.from_pandas(pd_value) + if dtype is None: + dtype = True + if dtype is True or isinstance(dtype, abc.Mapping): # There exists some dtypes in the result columns that is inferred. # Find them and map them to the default dtypes. 
diff --git a/python/cudf/cudf/io/orc.py b/python/cudf/cudf/io/orc.py index b9ce07466e5..8865bdd9d33 100644 --- a/python/cudf/cudf/io/orc.py +++ b/python/cudf/cudf/io/orc.py @@ -289,7 +289,8 @@ def read_orc( use_index=True, timestamp_type=None, use_python_file_object=True, - **kwargs, + storage_options=None, + bytes_per_thread=None, ): """{docstring}""" from cudf import DataFrame @@ -326,11 +327,13 @@ def read_orc( filepaths_or_buffers = [] for source in filepath_or_buffer: - if ioutils.is_directory(source, **kwargs): + if ioutils.is_directory( + path_or_data=source, storage_options=storage_options + ): fs = ioutils._ensure_filesystem( passed_filesystem=None, path=source, - **kwargs, + storage_options=storage_options, ) source = stringify_path(source) source = fs.sep.join([source, "*.orc"]) @@ -339,7 +342,8 @@ def read_orc( path_or_data=source, compression=None, use_python_file_object=use_python_file_object, - **kwargs, + storage_options=storage_options, + bytes_per_thread=bytes_per_thread, ) if compression is not None: raise ValueError( @@ -413,7 +417,8 @@ def to_orc( stripe_size_rows=None, row_index_stride=None, cols_as_map_type=None, - **kwargs, + storage_options=None, + index=None, ): """{docstring}""" @@ -434,7 +439,7 @@ def to_orc( raise TypeError("cols_as_map_type must be a list of column names.") path_or_buf = ioutils.get_writer_filepath_or_buffer( - path_or_data=fname, mode="wb", **kwargs + path_or_data=fname, mode="wb", storage_options=storage_options ) if ioutils.is_fsspec_open_file(path_or_buf): with path_or_buf as file_obj: @@ -448,6 +453,7 @@ def to_orc( stripe_size_rows, row_index_stride, cols_as_map_type, + index, ) else: liborc.write_orc( @@ -459,6 +465,7 @@ def to_orc( stripe_size_rows, row_index_stride, cols_as_map_type, + index, ) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 7ac391c5f3d..ceb08cb8058 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -9,7 +9,6 @@ from typing import Dict, List, Tuple from uuid import uuid4 -import numpy as np from pyarrow import dataset as ds, parquet as pq import cudf @@ -54,12 +53,12 @@ def _write_parquet( statistics="ROWGROUP", metadata_file_path=None, int96_timestamps=False, - row_group_size_bytes=None, + row_group_size_bytes=ioutils._ROW_GROUP_SIZE_BYTES_DEFAULT, row_group_size_rows=None, max_page_size_bytes=None, max_page_size_rows=None, partitions_info=None, - **kwargs, + storage_options=None, ): if is_list_like(paths) and len(paths) > 1: if partitions_info is None: @@ -73,7 +72,9 @@ def _write_parquet( ValueError("paths must be list-like when partitions_info provided") paths_or_bufs = [ - ioutils.get_writer_filepath_or_buffer(path, mode="wb", **kwargs) + ioutils.get_writer_filepath_or_buffer( + path_or_data=path, mode="wb", storage_options=storage_options + ) for path in paths ] common_args = { @@ -111,12 +112,19 @@ def _write_parquet( def write_to_dataset( df, root_path, + compression="snappy", filename=None, partition_cols=None, fs=None, preserve_index=False, return_metadata=False, - **kwargs, + statistics="ROWGROUP", + int96_timestamps=False, + row_group_size_bytes=ioutils._ROW_GROUP_SIZE_BYTES_DEFAULT, + row_group_size_rows=None, + max_page_size_bytes=None, + max_page_size_rows=None, + storage_options=None, ): """Wraps `to_parquet` to write partitioned Parquet datasets. 
For each combination of partition group and value, @@ -136,25 +144,51 @@ def write_to_dataset( df : cudf.DataFrame root_path : string, The root directory of the dataset + compression : {'snappy', 'ZSTD', None}, default 'snappy' + Name of the compression to use. Use ``None`` for no compression. filename : string, default None The file name to use (within each partition directory). If None, a random uuid4 hex string will be used for each file name. + partition_cols : list, + Column names by which to partition the dataset. + Columns are partitioned in the order they are given. fs : FileSystem, default None If nothing passed, paths assumed to be found in the local on-disk filesystem preserve_index : bool, default False Preserve index values in each parquet file. - partition_cols : list, - Column names by which to partition the dataset - Columns are partitioned in the order they are given return_metadata : bool, default False Return parquet metadata for written data. Returned metadata will include the file-path metadata (relative to `root_path`). - **kwargs : dict, - kwargs for to_parquet function. + int96_timestamps : bool, default False + If ``True``, write timestamps in int96 format. This will convert + timestamps from timestamp[ns], timestamp[ms], timestamp[s], and + timestamp[us] to the int96 format, which is the number of Julian + days and the number of nanoseconds since midnight of 1970-01-01. + If ``False``, timestamps will not be altered. + row_group_size_bytes: integer or None, default None + Maximum size of each stripe of the output. + If None, 134217728 (128MB) will be used. + row_group_size_rows: integer or None, default None + Maximum number of rows of each stripe of the output. + If None, 1000000 will be used. + max_page_size_bytes: integer or None, default None + Maximum uncompressed size of each page of the output. + If None, 524288 (512KB) will be used. + max_page_size_rows: integer or None, default None + Maximum number of rows of each page of the output. + If None, 20000 will be used. + + storage_options : dict, optional, default None + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. For HTTP(S) URLs the + key-value pairs are forwarded to ``urllib.request.Request`` as + header options. For other URLs (e.g. starting with "s3://", and + "gcs://") the key-value pairs are forwarded to ``fsspec.open``. + Please see ``fsspec`` and ``urllib`` for more details. 
""" - fs = ioutils._ensure_filesystem(fs, root_path, **kwargs) + fs = ioutils._ensure_filesystem(fs, root_path, storage_options) fs.mkdirs(root_path, exist_ok=True) if partition_cols is not None and len(partition_cols) > 0: @@ -166,31 +200,50 @@ def write_to_dataset( part_offsets, _, ) = _get_partitioned( - df, - root_path, - partition_cols, - filename, - fs, - preserve_index, - **kwargs, + df=df, + root_path=root_path, + partition_cols=partition_cols, + filename=filename, + fs=fs, + preserve_index=preserve_index, + storage_options=storage_options, ) - - if return_metadata: - kwargs["metadata_file_path"] = metadata_file_paths + metadata_file_path = metadata_file_paths if return_metadata else None metadata = to_parquet( - grouped_df, - full_paths, + df=grouped_df, + path=full_paths, + compression=compression, index=preserve_index, partition_offsets=part_offsets, - **kwargs, + storage_options=storage_options, + metadata_file_path=metadata_file_path, + statistics=statistics, + int96_timestamps=int96_timestamps, + row_group_size_bytes=row_group_size_bytes, + row_group_size_rows=row_group_size_rows, + max_page_size_bytes=max_page_size_bytes, + max_page_size_rows=max_page_size_rows, ) else: filename = filename or _generate_filename() full_path = fs.sep.join([root_path, filename]) - if return_metadata: - kwargs["metadata_file_path"] = filename - metadata = df.to_parquet(full_path, index=preserve_index, **kwargs) + + metadata_file_path = filename if return_metadata else None + + metadata = df.to_parquet( + path=full_path, + compression=compression, + index=preserve_index, + storage_options=storage_options, + metadata_file_path=metadata_file_path, + statistics=statistics, + int96_timestamps=int96_timestamps, + row_group_size_bytes=row_group_size_bytes, + row_group_size_rows=row_group_size_rows, + max_page_size_bytes=max_page_size_bytes, + max_page_size_rows=max_page_size_rows, + ) return metadata @@ -361,6 +414,7 @@ def read_parquet( filepath_or_buffer, engine="cudf", columns=None, + storage_options=None, filters=None, row_groups=None, strings_to_categorical=False, @@ -368,6 +422,7 @@ def read_parquet( use_python_file_object=True, categorical_partitions=True, open_file_options=None, + bytes_per_thread=None, *args, **kwargs, ): @@ -383,6 +438,9 @@ def read_parquet( ) open_file_options = {} + if bytes_per_thread is None: + bytes_per_thread = ioutils._BYTES_PER_THREAD_DEFAULT + # Multiple sources are passed as a list. If a single source is passed, # wrap it in a list for unified processing downstream. if not is_list_like(filepath_or_buffer): @@ -403,7 +461,9 @@ def read_parquet( # Start by trying construct a filesystem object, so we # can apply filters on remote file-systems - fs, paths = ioutils._get_filesystem_and_paths(filepath_or_buffer, **kwargs) + fs, paths = ioutils._get_filesystem_and_paths( + path_or_data=filepath_or_buffer, storage_options=storage_options + ) # Use pyarrow dataset to detect/process directory-partitioned # data and apply filters. 
Note that we can only support partitioned @@ -418,8 +478,8 @@ def read_parquet( partition_keys, partition_categories, ) = _process_dataset( - paths, - fs, + paths=paths, + fs=fs, filters=filters, row_groups=row_groups, categorical_partitions=categorical_partitions, @@ -431,19 +491,20 @@ def read_parquet( filepaths_or_buffers = [] if use_python_file_object: open_file_options = _default_open_file_options( - open_file_options, - columns, - row_groups, + open_file_options=open_file_options, + columns=columns, + row_groups=row_groups, fs=fs, ) - for i, source in enumerate(filepath_or_buffer): + for source in filepath_or_buffer: tmp_source, compression = ioutils.get_reader_filepath_or_buffer( path_or_data=source, compression=None, fs=fs, use_python_file_object=use_python_file_object, open_file_options=open_file_options, - **kwargs, + storage_options=storage_options, + bytes_per_thread=bytes_per_thread, ) if compression is not None: @@ -571,6 +632,16 @@ def _read_parquet( # Simple helper function to dispatch between # cudf and pyarrow to read parquet data if engine == "cudf": + if kwargs: + raise ValueError( + "cudf engine doesn't support the " + f"following keyword arguments: {list(kwargs.keys())}" + ) + if args: + raise ValueError( + "cudf engine doesn't support the " + f"following positional arguments: {list(args)}" + ) return libparquet.read_parquet( filepaths_or_buffers, columns=columns, @@ -600,16 +671,28 @@ def to_parquet( statistics="ROWGROUP", metadata_file_path=None, int96_timestamps=False, - row_group_size_bytes=None, + row_group_size_bytes=ioutils._ROW_GROUP_SIZE_BYTES_DEFAULT, row_group_size_rows=None, max_page_size_bytes=None, max_page_size_rows=None, + storage_options=None, + return_metadata=False, *args, **kwargs, ): """{docstring}""" if engine == "cudf": + if kwargs: + raise ValueError( + "cudf engine doesn't support the " + f"following keyword arguments: {list(kwargs.keys())}" + ) + if args: + raise ValueError( + "cudf engine doesn't support the " + f"following positional arguments: {list(args)}" + ) # Ensure that no columns dtype is 'category' for col in df._column_names: if partition_cols is None or col not in partition_cols: @@ -626,34 +709,32 @@ def to_parquet( "partition_cols are provided. 
To request returning the " "metadata binary blob, pass `return_metadata=True`" ) - kwargs.update( - { - "compression": compression, - "statistics": statistics, - "int96_timestamps": int96_timestamps, - "row_group_size_bytes": row_group_size_bytes, - "row_group_size_rows": row_group_size_rows, - "max_page_size_bytes": max_page_size_bytes, - "max_page_size_rows": max_page_size_rows, - } - ) + return write_to_dataset( df, filename=partition_file_name, partition_cols=partition_cols, root_path=path, preserve_index=index, - **kwargs, + compression=compression, + statistics=statistics, + int96_timestamps=int96_timestamps, + row_group_size_bytes=row_group_size_bytes, + row_group_size_rows=row_group_size_rows, + max_page_size_bytes=max_page_size_bytes, + max_page_size_rows=max_page_size_rows, + return_metadata=return_metadata, + storage_options=storage_options, ) - if partition_offsets: - kwargs["partitions_info"] = list( - zip( - partition_offsets, - np.roll(partition_offsets, -1) - partition_offsets, - ) - )[:-1] - + partition_info = ( + [ + (i, j - i) + for i, j in zip(partition_offsets, partition_offsets[1:]) + ] + if partition_offsets is not None + else None + ) return _write_parquet( df, paths=path if is_list_like(path) else [path], @@ -666,7 +747,8 @@ def to_parquet( row_group_size_rows=row_group_size_rows, max_page_size_bytes=max_page_size_bytes, max_page_size_rows=max_page_size_rows, - **kwargs, + partitions_info=partition_info, + storage_options=storage_options, ) else: @@ -730,9 +812,11 @@ def _get_partitioned( filename=None, fs=None, preserve_index=False, - **kwargs, + storage_options=None, ): - fs = ioutils._ensure_filesystem(fs, root_path, **kwargs) + fs = ioutils._ensure_filesystem( + fs, root_path, storage_options=storage_options + ) fs.mkdirs(root_path, exist_ok=True) part_names, grouped_df, part_offsets = _get_groups_and_offsets( @@ -872,6 +956,13 @@ class ParquetDatasetWriter: file_name_prefix : str This is a prefix to file names generated only when `max_file_size` is specified. + storage_options : dict, optional, default None + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. For HTTP(S) URLs the + key-value pairs are forwarded to ``urllib.request.Request`` as + header options. For other URLs (e.g. starting with "s3://", and + "gcs://") the key-value pairs are forwarded to ``fsspec.open``. + Please see ``fsspec`` and ``urllib`` for more details. 
Examples @@ -915,7 +1006,7 @@ def __init__( statistics="ROWGROUP", max_file_size=None, file_name_prefix=None, - **kwargs, + storage_options=None, ) -> None: if isinstance(path, str) and path.startswith("s3://"): self.fs_meta = {"is_s3": True, "actual_path": path} @@ -938,7 +1029,7 @@ def __init__( # Map of partition_col values to their ParquetWriter's index # in self._chunked_writers for reverse lookup self.path_cw_map: Dict[str, int] = {} - self.kwargs = kwargs + self.storage_options = storage_options self.filename = file_name_prefix self.max_file_size = max_file_size if max_file_size is not None: @@ -961,7 +1052,7 @@ def write_table(self, df): partition_cols=self.partition_cols, preserve_index=self.common_args["index"], ) - fs = ioutils._ensure_filesystem(None, self.path) + fs = ioutils._ensure_filesystem(None, self.path, None) fs.mkdirs(self.path, exist_ok=True) full_paths = [] @@ -1044,10 +1135,11 @@ def write_table(self, df): ) existing_cw_batch = defaultdict(dict) new_cw_paths = [] + partition_info = [(i, j - i) for i, j in zip(offsets, offsets[1:])] for path, part_info, meta_path in zip( paths, - zip(offsets, np.roll(offsets, -1) - offsets), + partition_info, metadata_file_paths, ): if path in self.path_cw_map: # path is a currently open file @@ -1097,7 +1189,7 @@ def close(self, return_metadata=False): local_path = self.path s3_path = self.fs_meta["actual_path"] s3_file, _ = ioutils._get_filesystem_and_paths( - s3_path, **self.kwargs + s3_path, storage_options=self.storage_options ) s3_file.put(local_path, s3_path, recursive=True) shutil.rmtree(self.path) diff --git a/python/cudf/cudf/io/text.py b/python/cudf/cudf/io/text.py index f341edbf6c1..eb2c7fa7ef6 100644 --- a/python/cudf/cudf/io/text.py +++ b/python/cudf/cudf/io/text.py @@ -17,7 +17,7 @@ def read_text( strip_delimiters=False, compression=None, compression_offsets=None, - **kwargs, + storage_options=None, ): """{docstring}""" @@ -28,7 +28,7 @@ def read_text( path_or_data=filepath_or_buffer, compression=None, iotypes=(BytesIO, StringIO), - **kwargs, + storage_options=storage_options, ) return cudf.Series._from_data( diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index 5aa049db31a..fbd9b83330e 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -1759,11 +1759,26 @@ def test_orc_writer_nvcomp(compression): assert_eq(expected, got) +@pytest.mark.parametrize("index_obj", [None, [10, 11, 12], ["x", "y", "z"]]) @pytest.mark.parametrize("index", [True, False, None]) -@pytest.mark.parametrize("columns", [None, [], ["b", "a"]]) -def test_orc_columns_and_index_param(index, columns): +@pytest.mark.parametrize( + "columns", + [ + None, + [], + pytest.param( + ["b", "a"], + marks=pytest.mark.xfail( + reason="https://github.com/rapidsai/cudf/issues/12026" + ), + ), + ], +) +def test_orc_columns_and_index_param(index_obj, index, columns): buffer = BytesIO() - df = cudf.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]}) + df = cudf.DataFrame( + {"a": [1, 2, 3], "b": ["a", "b", "c"]}, index=index_obj + ) df.to_orc(buffer, index=index) expected = pd.read_orc(buffer, columns=columns) diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index 5c06dea4ca6..d2339930b91 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -298,7 +298,6 @@ def test_read_parquet_ext( f"s3://{bucket}/{fname}", storage_options=s3so, bytes_per_thread=bytes_per_thread, - footer_sample_size=3200, columns=columns, ) if 
index: diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 5298e470a91..ebb73ba0ca6 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -25,6 +25,9 @@ fsspec_parquet = None +_BYTES_PER_THREAD_DEFAULT = 256 * 1024 * 1024 +_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024 + _docstring_remote_sources = """ - cuDF supports local and remote data stores. See configuration details for available sources @@ -43,12 +46,20 @@ (such as builtin `open()` file handler function or `BytesIO`). engine : ['cudf'], default 'cudf' Parser engine to use. + This parameter is deprecated. columns : list, default None If not None, only these columns will be read. skiprows : int, default None If not None, the number of rows to skip from the start of the file. num_rows : int, default None If not None, the total number of rows to read. +storage_options : dict, optional, default None + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. For HTTP(S) URLs the key-value + pairs are forwarded to ``urllib.request.Request`` as header options. + For other URLs (e.g. starting with "s3://", and "gcs://") the key-value + pairs are forwarded to ``fsspec.open``. Please see ``fsspec`` and + ``urllib`` for more details. Returns ------- @@ -132,6 +143,13 @@ Parser engine to use. columns : list, default None If not None, only these columns will be read. +storage_options : dict, optional, default None + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. For HTTP(S) URLs the key-value + pairs are forwarded to ``urllib.request.Request`` as header options. + For other URLs (e.g. starting with "s3://", and "gcs://") the key-value + pairs are forwarded to ``fsspec.open``. Please see ``fsspec`` and + ``urllib`` for more details. filters : list of tuple, list of lists of tuples default None If not None, specifies a filter predicate used to filter out row groups using statistics stored for each row group as Parquet metadata. Row groups @@ -170,6 +188,13 @@ deactivate optimized precaching, set the "method" to `None` under the "precache_options" key. Note that the `open_file_func` key can also be used to specify a custom file-open function. +bytes_per_thread : int, default None + Determines the number of bytes to be allocated per thread to read the + files in parallel. When there is a file of large size, we get slightly + better throughput by decomposing it and transferring multiple "blocks" + in parallel (using a python thread pool). Default allocation is + {bytes_per_thread} bytes. + This parameter is functional only when `use_python_file_object=False`. Returns ------- @@ -195,7 +220,8 @@ cudf.DataFrame.to_parquet cudf.read_orc """.format( - remote_data_sources=_docstring_remote_sources + remote_data_sources=_docstring_remote_sources, + bytes_per_thread=_BYTES_PER_THREAD_DEFAULT, ) doc_read_parquet = docfmt_partial(docstring=_docstring_read_parquet) @@ -208,14 +234,15 @@ File path or Root Directory path. Will be used as Root Directory path while writing a partitioned dataset. Use list of str with partition_offsets to write parts of the dataframe to different files. -compression : {'snappy', 'ZSTD', None}, default 'snappy' +compression : {{'snappy', 'ZSTD', None}}, default 'snappy' Name of the compression to use. Use ``None`` for no compression. index : bool, default None - If ``True``, include the dataframe's index(es) in the file output. 
If - ``False``, they will not be written to the file. If ``None``, the - engine's default behavior will be used. However, instead of being saved - as values, the ``RangeIndex`` will be stored as a range in the metadata - so it doesn’t require much space and is faster. Other indexes will + If ``True``, include the dataframe's index(es) in the file output. + If ``False``, they will not be written to the file. + If ``None``, similar to ``True`` the dataframe's index(es) will + be saved, however, instead of being saved as values any + ``RangeIndex`` will be stored as a range in the metadata so it + doesn't require much space and is faster. Other indexes will be included as columns in the file output. partition_cols : list, optional, default None Column names by which to partition the dataset @@ -228,7 +255,7 @@ partition_offsets : list, optional, default None Offsets to partition the dataframe by. Should be used when path is list of str. Should be a list of integers of size ``len(path) + 1`` -statistics : {'ROWGROUP', 'PAGE', 'COLUMN', 'NONE'}, default 'ROWGROUP' +statistics : {{'ROWGROUP', 'PAGE', 'COLUMN', 'NONE'}}, default 'ROWGROUP' Level at which column statistics should be included in file. metadata_file_path : str, optional, default None If specified, this function will return a binary blob containing the footer @@ -239,11 +266,12 @@ If ``True``, write timestamps in int96 format. This will convert timestamps from timestamp[ns], timestamp[ms], timestamp[s], and timestamp[us] to the int96 format, which is the number of Julian - days and the number of nanoseconds since midnight. If ``False``, - timestamps will not be altered. -row_group_size_bytes: integer or None, default None + days and the number of nanoseconds since midnight of 1970-01-01. + If ``False``, timestamps will not be altered. +row_group_size_bytes: integer, default {row_group_size_bytes_val} Maximum size of each stripe of the output. - If None, 134217728 (128MB) will be used. + If None, {row_group_size_bytes_val} + ({row_group_size_bytes_val_in_mb} MB) will be used. row_group_size_rows: integer or None, default None Maximum number of rows of each stripe of the output. If None, 1000000 will be used. @@ -253,15 +281,30 @@ max_page_size_rows: integer or None, default None Maximum number of rows of each page of the output. If None, 20000 will be used. -**kwargs +storage_options : dict, optional, default None + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. For HTTP(S) URLs the key-value + pairs are forwarded to ``urllib.request.Request`` as header options. + For other URLs (e.g. starting with "s3://", and "gcs://") the key-value + pairs are forwarded to ``fsspec.open``. Please see ``fsspec`` and + ``urllib`` for more details. +return_metadata : bool, default False + Return parquet metadata for written data. Returned metadata will + include the file path metadata (relative to `root_path`). To request metadata binary blob when using with ``partition_cols``, Pass ``return_metadata=True`` instead of specifying ``metadata_file_path`` +**kwargs + Additional parameters will be passed to execution engines other + than ``cudf``. 
See Also -------- cudf.read_parquet -""" +""".format( + row_group_size_bytes_val=_ROW_GROUP_SIZE_BYTES_DEFAULT, + row_group_size_bytes_val_in_mb=_ROW_GROUP_SIZE_BYTES_DEFAULT / 1024 / 1024, +) doc_to_parquet = docfmt_partial(docstring=_docstring_to_parquet) _docstring_merge_parquet_filemetadata = """ @@ -392,7 +435,20 @@ If True, Arrow-backed PythonFile objects will be used in place of fsspec AbstractBufferedFile objects at IO time. This option is likely to improve performance when making small reads from larger ORC files. -kwargs are passed to the engine +storage_options : dict, optional, default None + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. For HTTP(S) URLs the key-value + pairs are forwarded to ``urllib.request.Request`` as header options. + For other URLs (e.g. starting with "s3://", and "gcs://") the key-value + pairs are forwarded to ``fsspec.open``. Please see ``fsspec`` and + ``urllib`` for more details. +bytes_per_thread : int, default None + Determines the number of bytes to be allocated per thread to read the + files in parallel. When there is a file of large size, we get slightly + better throughput by decomposing it and transferring multiple "blocks" + in parallel (using a python thread pool). Default allocation is + {bytes_per_thread} bytes. + This parameter is functional only when `use_python_file_object=False`. Returns ------- @@ -416,7 +472,8 @@ -------- cudf.DataFrame.to_orc """.format( - remote_data_sources=_docstring_remote_sources + remote_data_sources=_docstring_remote_sources, + bytes_per_thread=_BYTES_PER_THREAD_DEFAULT, ) doc_read_orc = docfmt_partial(docstring=_docstring_read_orc) @@ -429,8 +486,9 @@ File path or object where the ORC dataset will be stored. compression : {{ 'snappy', 'ZSTD', None }}, default 'snappy' Name of the compression to use. Use None for no compression. -enable_statistics: boolean, default True - Enable writing column statistics. +statistics: str {{ "ROWGROUP", "STRIPE", None }}, default "ROWGROUP" + The granularity with which column statistics must + be written to the file. stripe_size_bytes: integer or None, default None Maximum size of each stripe of the output. If None, 67108864 (64MB) will be used. @@ -444,6 +502,21 @@ A list of column names which should be written as map type in the ORC file. Note that this option only affects columns of ListDtype. Names of other column types will be ignored. +storage_options : dict, optional, default None + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. For HTTP(S) URLs the key-value + pairs are forwarded to ``urllib.request.Request`` as header options. + For other URLs (e.g. starting with "s3://", and "gcs://") the key-value + pairs are forwarded to ``fsspec.open``. Please see ``fsspec`` and + ``urllib`` for more details. +index : bool, default None + If ``True``, include the dataframe's index(es) in the file output. + If ``False``, they will not be written to the file. + If ``None``, similar to ``True`` the dataframe's index(es) will + be saved, however, instead of being saved as values any + ``RangeIndex`` will be stored as a range in the metadata so it + doesn’t require much space and is faster. Other indexes will + be included as columns in the file output. See Also -------- @@ -504,10 +577,11 @@ ``'columns'``, and ``'records'``. typ : type of object to recover (series or frame), default 'frame' With cudf engine, only frame output is supported. 
-dtype : boolean or dict, default True +dtype : boolean or dict, default None If True, infer dtypes for all columns; if False, then don't infer dtypes at all, if a dict, provide a mapping from column names to their respective dtype (any missing columns will have their dtype inferred). Applies only to the data. + For all ``orient`` values except ``'table'``, default is ``True``. convert_axes : boolean, default True .. admonition:: Not GPU-accelerated @@ -613,6 +687,13 @@ If `True`, any string values are read literally (and wrapped in an additional set of quotes). If `False` string values are parsed into Python strings. +storage_options : dict, optional, default None + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. For HTTP(S) URLs the key-value + pairs are forwarded to ``urllib.request.Request`` as header options. + For other URLs (e.g. starting with "s3://", and "gcs://") the key-value + pairs are forwarded to ``fsspec.open``. Please see ``fsspec`` and + ``urllib`` for more details. Returns ------- @@ -1043,7 +1124,7 @@ Extra options that make sense for a particular storage connection, e.g. host, port, username, password, etc. For HTTP(S) URLs the key-value pairs are forwarded to ``urllib.request.Request`` as header options. - For other URLs (e.g. starting with “s3://”, and “gcs://”) the key-value + For other URLs (e.g. starting with "s3://", and "gcs://") the key-value pairs are forwarded to ``fsspec.open``. Please see ``fsspec`` and ``urllib`` for more details. bytes_per_thread : int, default None @@ -1051,7 +1132,7 @@ files in parallel. When there is a file of large size, we get slightly better throughput by decomposing it and transferring multiple "blocks" in parallel (using a python thread pool). Default allocation is - 256_000_000 bytes. + {bytes_per_thread} bytes. This parameter is functional only when `use_python_file_object=False`. Returns ------- @@ -1089,7 +1170,8 @@ -------- cudf.DataFrame.to_csv """.format( - remote_data_sources=_docstring_remote_sources + remote_data_sources=_docstring_remote_sources, + bytes_per_thread=_BYTES_PER_THREAD_DEFAULT, ) doc_read_csv = docfmt_partial(docstring=_docstring_read_csv) @@ -1139,7 +1221,7 @@ Extra options that make sense for a particular storage connection, e.g. host, port, username, password, etc. For HTTP(S) URLs the key-value pairs are forwarded to ``urllib.request.Request`` as header options. - For other URLs (e.g. starting with “s3://”, and “gcs://”) the key-value + For other URLs (e.g. starting with "s3://", and "gcs://") the key-value pairs are forwarded to ``fsspec.open``. Please see ``fsspec`` and ``urllib`` for more details. Returns @@ -1215,6 +1297,12 @@ delimiter : string, default None The delimiter that should be used for splitting text chunks into separate cudf column rows. The delimiter may be one or more characters. +byte_range : list or tuple, default None + Byte range within the input file to be read. The first number is the + offset in bytes, the second number is the range size in bytes. + The output contains all rows that start inside the byte range + (i.e. at or after the offset, and before the end at `offset + size`), + which may include rows that continue past the end. 
strip_delimiters : boolean, default False Unlike the `str.split()` function, `read_text` preserves the delimiter at the end of a field in output by default, meaning `a;b;c` will turn into @@ -1222,12 +1310,6 @@ Setting this option to `True` will strip these trailing delimiters, leaving only the contents between delimiters in the resulting column: `['a','b','c']` -byte_range : list or tuple, default None - Byte range within the input file to be read. The first number is the - offset in bytes, the second number is the range size in bytes. - The output contains all rows that start inside the byte range - (i.e. at or after the offset, and before the end at `offset + size`), - which may include rows that continue past the end. compression : string, default None Which compression type is the input compressed with. Currently supports only `bgzip`, and requires the path to a file as input. @@ -1238,6 +1320,13 @@ compressed file (upper 48 bits). The start offset points to the first byte to be read, the end offset points one past the last byte to be read. +storage_options : dict, optional, default None + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. For HTTP(S) URLs the key-value + pairs are forwarded to ``urllib.request.Request`` as header options. + For other URLs (e.g. starting with "s3://", and "gcs://") the key-value + pairs are forwarded to ``fsspec.open``. Please see ``fsspec`` and + ``urllib`` for more details. Returns ------- @@ -1247,6 +1336,66 @@ doc_read_text = docfmt_partial(docstring=_docstring_text_datasource) +_docstring_get_reader_filepath_or_buffer = """ +Return either a filepath string to data, or a memory buffer of data. +If filepath, then the source filepath is expanded to user's environment. +If buffer, then data is returned in-memory as bytes or a ByteIO object. + +Parameters +---------- +path_or_data : str, file-like object, bytes, ByteIO + Path to data or the data itself. +compression : str + Type of compression algorithm for the content +mode : str + Mode in which file is opened +iotypes : (), default (BytesIO) + Object type to exclude from file-like check +use_python_file_object : boolean, default False + If True, Arrow-backed PythonFile objects will be used in place + of fsspec AbstractBufferedFile objects. +open_file_options : dict, optional + Optional dictionary of keyword arguments to pass to + `_open_remote_files` (used for remote storage only). +allow_raw_text_input : boolean, default False + If True, this indicates the input `path_or_data` could be a raw text + input and will not check for its existence in the filesystem. If False, + the input must be a path and an error will be raised if it does not + exist. +storage_options : dict, optional + Extra options that make sense for a particular storage connection, e.g. + host, port, username, password, etc. For HTTP(S) URLs the key-value + pairs are forwarded to ``urllib.request.Request`` as header options. + For other URLs (e.g. starting with "s3://", and "gcs://") the key-value + pairs are forwarded to ``fsspec.open``. Please see ``fsspec`` and + ``urllib`` for more details, and for more examples on storage options + refer `here `__. +bytes_per_thread : int, default None + Determines the number of bytes to be allocated per thread to read the + files in parallel. When there is a file of large size, we get slightly + better throughput by decomposing it and transferring multiple "blocks" + in parallel (using a Python thread pool). 
Default allocation is + {bytes_per_thread} bytes. + This parameter is functional only when `use_python_file_object=False`. + +Returns +------- +filepath_or_buffer : str, bytes, BytesIO, list + Filepath string or in-memory buffer of data or a + list of Filepath strings or in-memory buffers of data. +compression : str + Type of compression algorithm for the content + """.format( + bytes_per_thread=_BYTES_PER_THREAD_DEFAULT +) + + +doc_get_reader_filepath_or_buffer = docfmt_partial( + docstring=_docstring_get_reader_filepath_or_buffer +) + + def is_url(url): """Check if a string is a valid URL to a network location. @@ -1295,13 +1444,12 @@ def _is_local_filesystem(fs): return isinstance(fs, fsspec.implementations.local.LocalFileSystem) -def ensure_single_filepath_or_buffer(path_or_data, **kwargs): +def ensure_single_filepath_or_buffer(path_or_data, storage_options=None): """Return False if `path_or_data` resolves to multiple filepaths or buffers. """ path_or_data = stringify_pathlike(path_or_data) if isinstance(path_or_data, str): - storage_options = kwargs.get("storage_options") path_or_data = os.path.expanduser(path_or_data) try: fs, _, paths = get_fs_token_paths( @@ -1321,11 +1469,10 @@ def ensure_single_filepath_or_buffer(path_or_data, **kwargs): return True -def is_directory(path_or_data, **kwargs): +def is_directory(path_or_data, storage_options=None): """Returns True if the provided filepath is a directory""" path_or_data = stringify_pathlike(path_or_data) if isinstance(path_or_data, str): - storage_options = kwargs.get("storage_options") path_or_data = os.path.expanduser(path_or_data) try: fs = get_fs_token_paths( @@ -1342,7 +1489,7 @@ def is_directory(path_or_data, **kwargs): return False -def _get_filesystem_and_paths(path_or_data, **kwargs): +def _get_filesystem_and_paths(path_or_data, storage_options): # Returns a filesystem object and the filesystem-normalized # paths. If `path_or_data` does not correspond to a path or # list of paths (or if the protocol is not supported), the @@ -1355,7 +1502,6 @@ def _get_filesystem_and_paths(path_or_data, **kwargs): and isinstance(stringify_pathlike(path_or_data[0]), str) ): # Ensure we are always working with a list - storage_options = kwargs.get("storage_options") if isinstance(path_or_data, list): path_or_data = [ os.path.expanduser(stringify_pathlike(source)) @@ -1472,54 +1618,21 @@ def _open_remote_files( ] +@doc_get_reader_filepath_or_buffer() def get_reader_filepath_or_buffer( path_or_data, compression, mode="rb", fs=None, iotypes=(BytesIO, NativeFile), - byte_ranges=None, use_python_file_object=False, open_file_options=None, allow_raw_text_input=False, - **kwargs, + storage_options=None, + bytes_per_thread=_BYTES_PER_THREAD_DEFAULT, ): - """Return either a filepath string to data, or a memory buffer of data. - If filepath, then the source filepath is expanded to user's environment. - If buffer, then data is returned in-memory as bytes or a ByteIO object. - - Parameters - ---------- - path_or_data : str, file-like object, bytes, ByteIO - Path to data or the data itself. - compression : str - Type of compression algorithm for the content - mode : str - Mode in which file is opened - iotypes : (), default (BytesIO) - Object type to exclude from file-like check - byte_ranges : list, optional - List of known byte ranges that will be read from path_or_data - use_python_file_object : boolean, default False - If True, Arrow-backed PythonFile objects will be used in place - of fsspec AbstractBufferedFile objects. 
- open_file_options : dict, optional - Optional dictionary of key-word arguments to pass to - `_open_remote_files` (used for remote storage only). - allow_raw_text_input : boolean, default False - If True, this indicates the input `path_or_data` could be a raw text - input and will not check for its existence in the filesystem. If False, - the input must be a path and an error will be raised if it does not - exist. + """{docstring}""" - Returns - ------- - filepath_or_buffer : str, bytes, BytesIO, list - Filepath string or in-memory buffer of data or a - list of Filepath strings or in-memory buffers of data. - compression : str - Type of compression algorithm for the content - """ path_or_data = stringify_pathlike(path_or_data) if isinstance(path_or_data, str): @@ -1527,7 +1640,9 @@ def get_reader_filepath_or_buffer( # Get a filesystem object if one isn't already available paths = [path_or_data] if fs is None: - fs, paths = _get_filesystem_and_paths(path_or_data, **kwargs) + fs, paths = _get_filesystem_and_paths( + path_or_data, storage_options + ) if fs is None: return path_or_data, compression @@ -1560,7 +1675,7 @@ def get_reader_filepath_or_buffer( fpath, fs=fs, mode=mode, - **kwargs, + bytes_per_thread=bytes_per_thread, ) ) for fpath in paths @@ -1575,13 +1690,15 @@ def get_reader_filepath_or_buffer( path_or_data = ArrowPythonFile(path_or_data) else: path_or_data = BytesIO( - _fsspec_data_transfer(path_or_data, mode=mode, **kwargs) + _fsspec_data_transfer( + path_or_data, mode=mode, bytes_per_thread=bytes_per_thread + ) ) return path_or_data, compression -def get_writer_filepath_or_buffer(path_or_data, mode, **kwargs): +def get_writer_filepath_or_buffer(path_or_data, mode, storage_options=None): """ Return either a filepath string to data, or a open file object to the output filesystem @@ -1592,14 +1709,23 @@ def get_writer_filepath_or_buffer(path_or_data, mode, **kwargs): Path to data or the data itself. mode : str Mode in which file is opened + storage_options : dict, optional, default None + Extra options that make sense for a particular storage connection, + e.g. host, port, username, password, etc. For HTTP(S) URLs the + key-value pairs are forwarded to ``urllib.request.Request`` as + header options. For other URLs (e.g. starting with "s3://", and + "gcs://") the key-value pairs are forwarded to ``fsspec.open``. + Please see ``fsspec`` and ``urllib`` for more details. 
Returns ------- filepath_or_buffer : str, Filepath string or buffer of data """ + if storage_options is None: + storage_options = {} + if isinstance(path_or_data, str): - storage_options = kwargs.get("storage_options", {}) path_or_data = os.path.expanduser(path_or_data) fs = get_fs_token_paths( path_or_data, mode=mode or "w", storage_options=storage_options @@ -1793,11 +1919,11 @@ def _prepare_filters(filters): return filters -def _ensure_filesystem(passed_filesystem, path, **kwargs): +def _ensure_filesystem(passed_filesystem, path, storage_options): if passed_filesystem is None: return get_fs_token_paths( path[0] if isinstance(path, list) else path, - storage_options=kwargs.get("storage_options", {}), + storage_options={} if storage_options is None else storage_options, )[0] return passed_filesystem @@ -1811,11 +1937,12 @@ def _fsspec_data_transfer( path_or_fob, fs=None, file_size=None, - bytes_per_thread=256_000_000, + bytes_per_thread=_BYTES_PER_THREAD_DEFAULT, max_gap=64_000, mode="rb", - **kwargs, ): + if bytes_per_thread is None: + bytes_per_thread = _BYTES_PER_THREAD_DEFAULT # Require `fs` if `path_or_fob` is not file-like file_like = is_file_like(path_or_fob) @@ -1848,7 +1975,6 @@ def _fsspec_data_transfer( byte_ranges, buf, fs=fs, - **kwargs, ) return buf.tobytes() @@ -1898,7 +2024,6 @@ def _read_byte_ranges( ranges, local_buffer, fs=None, - **kwargs, ): # Simple utility to copy remote byte ranges # into a local buffer for IO in libcudf diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index e64847948cf..bd398cb9607 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -22,7 +22,11 @@ from cudf.io import write_to_dataset from cudf.io.parquet import _default_open_file_options from cudf.utils.dtypes import cudf_dtype_from_pa_type -from cudf.utils.ioutils import _is_local_filesystem, _open_remote_files +from cudf.utils.ioutils import ( + _ROW_GROUP_SIZE_BYTES_DEFAULT, + _is_local_filesystem, + _open_remote_files, +) class CudfEngine(ArrowDatasetEngine): @@ -292,24 +296,47 @@ def write_partition( preserve_index = True if partition_on: md = write_to_dataset( - df, - path, + df=df, + root_path=path, + compression=compression, filename=filename, partition_cols=partition_on, fs=fs, preserve_index=preserve_index, return_metadata=return_metadata, - **kwargs, + statistics=kwargs.get("statistics", "ROWGROUP"), + int96_timestamps=kwargs.get("int96_timestamps", False), + row_group_size_bytes=kwargs.get( + "row_group_size_bytes", _ROW_GROUP_SIZE_BYTES_DEFAULT + ), + row_group_size_rows=kwargs.get("row_group_size_rows", None), + max_page_size_bytes=kwargs.get("max_page_size_bytes", None), + max_page_size_rows=kwargs.get("max_page_size_rows", None), + storage_options=kwargs.get("storage_options", None), ) else: with fs.open(fs.sep.join([path, filename]), mode="wb") as out_file: if not isinstance(out_file, IOBase): out_file = BufferedWriter(out_file) md = df.to_parquet( - out_file, - compression=compression, + path=out_file, + engine=kwargs.get("engine", "cudf"), + index=kwargs.get("index", None), + partition_cols=kwargs.get("partition_cols", None), + partition_file_name=kwargs.get( + "partition_file_name", None + ), + partition_offsets=kwargs.get("partition_offsets", None), + statistics=kwargs.get("statistics", "ROWGROUP"), + int96_timestamps=kwargs.get("int96_timestamps", False), + row_group_size_bytes=kwargs.get( + "row_group_size_bytes", _ROW_GROUP_SIZE_BYTES_DEFAULT + ), + 
row_group_size_rows=kwargs.get( + "row_group_size_rows", None + ), + storage_options=kwargs.get("storage_options", None), metadata_file_path=filename if return_metadata else None, - **kwargs, ) # Return the schema needed to write the metadata if return_metadata:
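For reference, a minimal usage sketch of the keyword arguments these readers and writers accept explicitly after this change; it is not part of the diff, and the file names below are hypothetical placeholders.

# A small, self-contained sketch of the explicit keyword plumbing added above.
import cudf

df = cudf.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})

# Writers: storage_options and an explicit index flag are now first-class
# keywords, and row_group_size_bytes gets a concrete 128 MB default.
df.to_parquet("sketch.parquet", index=None, statistics="ROWGROUP",
              storage_options=None)
df.to_orc("sketch.orc", compression="snappy", statistics="ROWGROUP",
          index=False, storage_options=None)

# Readers: storage_options / bytes_per_thread replace the old **kwargs.
pq_df = cudf.read_parquet("sketch.parquet", storage_options=None)
orc_df = cudf.read_orc("sketch.orc", storage_options=None,
                       bytes_per_thread=None)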