From 82ab31d1d5e2196e0932d3b3b375f421388ee771 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 9 Sep 2021 13:31:52 -0700 Subject: [PATCH 01/18] save work related to byte-range collection --- cpp/include/cudf/io/datasource.hpp | 4 + python/cudf/cudf/_lib/cpp/io/types.pxd | 5 + python/cudf/cudf/_lib/io/datasource.pxd | 9 +- python/cudf/cudf/_lib/io/datasource.pyx | 20 ++- python/cudf/cudf/_lib/io/utils.pyx | 11 +- python/cudf/cudf/io/parquet.py | 168 ++++++++++++++++++------ python/cudf/cudf/utils/ioutils.py | 107 ++++++++++++--- 7 files changed, 264 insertions(+), 60 deletions(-) diff --git a/cpp/include/cudf/io/datasource.hpp b/cpp/include/cudf/io/datasource.hpp index 93f68d43aff..712ffc97239 100644 --- a/cpp/include/cudf/io/datasource.hpp +++ b/cpp/include/cudf/io/datasource.hpp @@ -340,6 +340,10 @@ class arrow_io_source : public datasource { * * @param Apache Arrow Filesystem URI */ + + arrow_io_source() = default; + ~arrow_io_source() = default; + explicit arrow_io_source(std::string_view arrow_uri) { const std::string uri_start_delimiter = "//"; diff --git a/python/cudf/cudf/_lib/cpp/io/types.pxd b/python/cudf/cudf/_lib/cpp/io/types.pxd index 7fa6406bd29..9fb0e470950 100644 --- a/python/cudf/cudf/_lib/cpp/io/types.pxd +++ b/python/cudf/cudf/_lib/cpp/io/types.pxd @@ -102,3 +102,8 @@ cdef extern from "cudf/io/datasource.hpp" \ cdef cppclass datasource: pass + + cdef cppclass arrow_io_source(datasource): + arrow_io_source() except + + arrow_io_source(string arrow_uri) except + + arrow_io_source(shared_ptr[CRandomAccessFile]) except + diff --git a/python/cudf/cudf/_lib/io/datasource.pxd b/python/cudf/cudf/_lib/io/datasource.pxd index 705a3600f68..66aaad4a09b 100644 --- a/python/cudf/cudf/_lib/io/datasource.pxd +++ b/python/cudf/cudf/_lib/io/datasource.pxd @@ -1,10 +1,11 @@ # Copyright (c) 2020, NVIDIA CORPORATION. -from libcpp.memory cimport unique_ptr - -from cudf._lib.cpp.io.types cimport datasource +from cudf._lib.cpp.io.types cimport arrow_io_source, datasource cdef class Datasource: - cdef datasource* get_datasource(self) nogil except * + +cdef class NativeFileDatasource(Datasource): + cdef arrow_io_source c_datasource + cdef datasource* get_datasource(self) nogil diff --git a/python/cudf/cudf/_lib/io/datasource.pyx b/python/cudf/cudf/_lib/io/datasource.pyx index ddfd9a3540a..72d64bb9ad2 100644 --- a/python/cudf/cudf/_lib/io/datasource.pyx +++ b/python/cudf/cudf/_lib/io/datasource.pyx @@ -1,8 +1,11 @@ # Copyright (c) 2020, NVIDIA CORPORATION. -from libcpp.memory cimport unique_ptr +from libcpp.memory cimport shared_ptr +from libcpp.utility cimport move +from pyarrow.includes.libarrow cimport CRandomAccessFile +from pyarrow.lib cimport NativeFile -from cudf._lib.cpp.io.types cimport datasource +from cudf._lib.cpp.io.types cimport arrow_io_source, datasource, source_info cdef class Datasource: @@ -10,3 +13,16 @@ cdef class Datasource: with gil: raise NotImplementedError("get_datasource() should not " + "be directly invoked here") + +cdef class NativeFileDatasource(Datasource): + + def __cinit__(self, NativeFile native_file,): + + cdef shared_ptr[CRandomAccessFile] ra_src + cdef arrow_io_source arrow_src + + ra_src = native_file.get_random_access_file() + self.c_datasource = arrow_io_source(ra_src) + + cdef datasource* get_datasource(self) nogil: + return &(self.c_datasource) diff --git a/python/cudf/cudf/_lib/io/utils.pyx b/python/cudf/cudf/_lib/io/utils.pyx index 972a93e55ec..309586179c3 100644 --- a/python/cudf/cudf/_lib/io/utils.pyx +++ b/python/cudf/cudf/_lib/io/utils.pyx @@ -8,9 +8,12 @@ from libcpp.pair cimport pair from libcpp.string cimport string from libcpp.utility cimport move from libcpp.vector cimport vector +from pyarrow.includes.libarrow cimport CRandomAccessFile +from pyarrow.lib cimport NativeFile from cudf._lib.column cimport Column from cudf._lib.cpp.io.types cimport ( + arrow_io_source, column_name_info, data_sink, datasource, @@ -19,13 +22,15 @@ from cudf._lib.cpp.io.types cimport ( sink_info, source_info, ) -from cudf._lib.io.datasource cimport Datasource +from cudf._lib.io.datasource cimport Datasource, NativeFileDatasource import codecs import errno import io import os +from pyarrow.lib import NativeFile + import cudf from cudf.utils.dtypes import is_struct_dtype @@ -35,7 +40,6 @@ from cudf.utils.dtypes import is_struct_dtype cdef source_info make_source_info(list src) except*: if not src: raise ValueError("Need to pass at least one source") - cdef const unsigned char[::1] c_buffer cdef vector[host_buffer] c_host_buffers cdef vector[string] c_files @@ -60,6 +64,9 @@ cdef source_info make_source_info(list src) except*: elif isinstance(src[0], Datasource): csrc = src[0] return source_info(csrc.get_datasource()) + elif isinstance(src[0], NativeFile): + csrc = NativeFileDatasource(src[0]) + return source_info(csrc.get_datasource()) elif isinstance(src[0], (int, float, complex, basestring, os.PathLike)): # If source is a file, return source_info where type=FILEPATH if not all(os.path.isfile(file) for file in src): diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index fa748761695..7677a197bb0 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -1,10 +1,11 @@ # Copyright (c) 2019-2020, NVIDIA CORPORATION. +import io import warnings from collections import defaultdict from uuid import uuid4 -from pyarrow import dataset as ds, parquet as pq +from pyarrow import dataset as ds, fs as pa_fs, parquet as pq import cudf from cudf._lib import parquet as libparquet @@ -160,6 +161,97 @@ def read_parquet_metadata(path): return num_rows, num_row_groups, col_names +def _filter_row_groups(paths, fs, filters=None, row_groups=None): + + # Deal with case that the user passed in a directory name + file_list = paths + if len(paths) == 1 and ioutils.is_directory(paths[0]): + paths = ioutils.stringify_pathlike(paths[0]) + + # Convert filters to ds.Expression + if filters: + filters = pq._filters_to_expression(filters) + + # Initialize ds.FilesystemDataset + dataset = ds.dataset( + paths, filesystem=fs, format="parquet", partitioning="hive", + ) + file_list = dataset.files + + # Load IDs of filtered row groups for each file in dataset + filtered_rg_ids = defaultdict(list) + for fragment in dataset.get_fragments(filter=filters): + for rg_fragment in fragment.split_by_row_group(filters): + for rg_info in rg_fragment.row_groups: + filtered_rg_ids[rg_fragment.path].append(rg_info.id) + + # Initialize row_groups to be selected + if row_groups is None: + row_groups = [None for _ in dataset.files] + + # Store IDs of selected row groups for each file + for i, file in enumerate(dataset.files): + if row_groups[i] is None: + row_groups[i] = filtered_rg_ids[file] + else: + row_groups[i] = filter( + lambda id: id in row_groups[i], filtered_rg_ids[file] + ) + + return file_list, row_groups + + +def _get_byte_ranges(file_list, row_groups, columns, fs): + + if row_groups is None: + if columns is None: + return None, None # No reason to construct this + row_groups = [None for path in file_list] + + # Construct a list of required byte-ranges for every file + all_byte_ranges = [] + for path, rgs in zip(file_list, row_groups): + + # Step 0 - Get size of file + file_size = fs.size(path) + + # Step 1 - Get 32 KB from tail of file. + # + # This "sample size" can be tunable, but should + # always be >= 8 bytes (so we can read the footer size) + tail_size = 32_000 + footer_sample = fs.tail(path, tail_size) + + # Step 2 - Read the footer size and re-read a larger + # tail if necessary + footer_size = int.from_bytes(footer_sample[-8:-4], "little") + if tail_size < (footer_size + 8): + footer_sample = fs.tail(path, footer_size + 8) + + # Step 3 - Collect required byte ranges + byte_ranges = [] + md = pq.ParquetFile(io.BytesIO(footer_sample)).metadata + for r in range(md.num_row_groups): + # Skip this row-group if we are targetting + # specific row-groups + if row_groups is None or r in rgs: + row_group = md.row_group(r) + for c in range(row_group.num_columns): + column = row_group.column(c) + name = column.path_in_schema + # Skip this column if we are targetting a + # specific columns + if columns is None or name in columns: + file_offset0 = column.dictionary_page_offset + if file_offset0 is None: + file_offset0 = column.data_page_offset + num_bytes = column.total_uncompressed_size + byte_ranges.append((file_offset0, num_bytes)) + + all_byte_ranges.append(byte_ranges) + return file_size, all_byte_ranges + + @ioutils.doc_read_parquet() def read_parquet( filepath_or_buffer, @@ -189,17 +281,52 @@ def read_parquet( elif not is_list_like(row_groups[0]): row_groups = [row_groups] + # Check columns input + if columns is not None: + if not is_list_like(columns): + raise ValueError("Expected list like for columns") + + # Start by trying construct a filesystem object, so we + # can apply filters on remote file-systems + fs, paths = ioutils._get_filesystem_and_paths(filepath_or_buffer, **kwargs) + filepath_or_buffer = paths if paths else filepath_or_buffer + if fs is None and filters is not None: + raise ValueError("cudf cannot apply filters to open file objects.") + + # Check if we should calculate the specific byte-ranges + # needed for each parquet file. We do this when we have + # a file-system object to work with and it is not a local + # or pyarrow-backed filesystem object. + need_fs_blocks = fs is not None and not ( + ioutils._is_local_filesystem(fs) or isinstance(fs, pa_fs.FileSystem) + ) + + # Apply filters now (before converting non-local paths to buffers) + if fs is not None and filters is not None: + filepath_or_buffer, row_groups = _filter_row_groups( + filepath_or_buffer, fs, filters=filters, row_groups=row_groups, + ) + + # Get required byte ranges (used with non-local fsspec filesystems) + fs_blocks = None + if need_fs_blocks: + fs_blocks = _get_byte_ranges( + filepath_or_buffer, row_groups, columns, fs, + ) + print(fs_blocks) + filepaths_or_buffers = [] for source in filepath_or_buffer: + if ioutils.is_directory(source, **kwargs): - fs = ioutils._ensure_filesystem( + fsspec_fs = ioutils._ensure_filesystem( passed_filesystem=None, path=source ) source = ioutils.stringify_pathlike(source) - source = fs.sep.join([source, "*.parquet"]) + source = fsspec_fs.sep.join([source, "*.parquet"]) tmp_source, compression = ioutils.get_filepath_or_buffer( - path_or_data=source, compression=None, **kwargs, + path_or_data=source, compression=None, fs=fs, **kwargs, ) if compression is not None: raise ValueError( @@ -210,39 +337,6 @@ def read_parquet( else: filepaths_or_buffers.append(tmp_source) - if columns is not None: - if not is_list_like(columns): - raise ValueError("Expected list like for columns") - - if filters is not None: - # Convert filters to ds.Expression - filters = pq._filters_to_expression(filters) - - # Initialize ds.FilesystemDataset - dataset = ds.dataset( - filepaths_or_buffers, format="parquet", partitioning="hive" - ) - - # Load IDs of filtered row groups for each file in dataset - filtered_rg_ids = defaultdict(list) - for fragment in dataset.get_fragments(filter=filters): - for rg_fragment in fragment.split_by_row_group(filters): - for rg_info in rg_fragment.row_groups: - filtered_rg_ids[rg_fragment.path].append(rg_info.id) - - # Initialize row_groups to be selected - if row_groups is None: - row_groups = [None for _ in dataset.files] - - # Store IDs of selected row groups for each file - for i, file in enumerate(dataset.files): - if row_groups[i] is None: - row_groups[i] = filtered_rg_ids[file] - else: - row_groups[i] = filter( - lambda id: id in row_groups[i], filtered_rg_ids[file] - ) - if engine == "cudf": return libparquet.read_parquet( filepaths_or_buffers, diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index af91db6a9e6..52e959cbdcb 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -3,12 +3,16 @@ import datetime import os import urllib +import warnings from io import BufferedWriter, BytesIO, IOBase, TextIOWrapper import fsspec import fsspec.implementations.local import pandas as pd +import pyarrow.fs as pa_fs from fsspec.core import get_fs_token_paths +from pyarrow._fs import LocalFileSystem +from pyarrow.lib import ArrowInvalid, NativeFile from cudf.utils.docutils import docfmt_partial @@ -1057,7 +1061,9 @@ def is_file_like(obj): def _is_local_filesystem(fs): - return isinstance(fs, fsspec.implementations.local.LocalFileSystem) + return isinstance( + fs, (fsspec.implementations.local.LocalFileSystem, LocalFileSystem), + ) def ensure_single_filepath_or_buffer(path_or_data, **kwargs): @@ -1107,8 +1113,81 @@ def is_directory(path_or_data, **kwargs): return False +def _get_filesystem_and_paths(path_or_data, **kwargs): + # Returns a filesystem object and the filesystem-normalized + # paths. If `path_or_data` does not correspond to a path or + # list of paths (or if the protocol is not supported), the + # return will be `None` for the fs and `[]` for the paths. + fs = None + return_paths = path_or_data + if isinstance(path_or_data, str) or ( + isinstance(path_or_data, list) and isinstance(path_or_data[0], str) + ): + # Ensure we are always working with a list + storage_options = kwargs.get("storage_options") + if isinstance(path_or_data, list): + path_or_data = [ + os.path.expanduser(stringify_pathlike(source)) + for source in path_or_data + ] + else: + path_or_data = [path_or_data] + + # Try infering a pyarrow-backed filesystem + arrow_protocol_supported = True + arrow_options_supported = True + try: + fs, fs_paths = pa_fs.FileSystem.from_uri(path_or_data[0]) + fs_paths = [fs_paths] + for source in path_or_data[1:]: + fs_paths.append(pa_fs.FileSystem.from_uri(source)[1]) + if storage_options: + fs = type(fs)(**storage_options) + return_paths = fs_paths + except ArrowInvalid: + # The uri protocol is not supported by pyarrow + fs = None + arrow_protocol_supported = False + except TypeError: + # The `storage_options` are not supported by pyarrow + fs = None + arrow_options_supported = False + + if fs is None: + # Pyarrow did not support the protocol or storage options. + # Fall back to fsspec + try: + fs, _, fs_paths = fsspec.get_fs_token_paths( + path_or_data, mode="rb", storage_options=storage_options + ) + return_paths = fs_paths + except ValueError as e: + if str(e).startswith("Protocol not known"): + return None, [] + else: + raise e + + # Warn the user if they are not using an available pyarrow + # filesystem class due to incompatible `storage_options` + if arrow_protocol_supported and not arrow_options_supported: + warnings.warn( + f"Using an fsspec filesystem object ({fs}) for IO, even " + f"though the protocol is supported by a native pyarrow " + f"FileSystem class. IO performance will likely improve " + f"by passing `storage_options` that are compatible " + f"with the appropriate pyarrow FileSystem." + ) + + return fs, return_paths + + def get_filepath_or_buffer( - path_or_data, compression, mode="rb", iotypes=(BytesIO), **kwargs, + path_or_data, + compression, + mode="rb", + fs=None, + iotypes=(BytesIO, NativeFile), + **kwargs, ): """Return either a filepath string to data, or a memory buffer of data. If filepath, then the source filepath is expanded to user's environment. @@ -1136,19 +1215,11 @@ def get_filepath_or_buffer( path_or_data = stringify_pathlike(path_or_data) if isinstance(path_or_data, str): - storage_options = kwargs.get("storage_options") - # fsspec does not expanduser so handle here - path_or_data = os.path.expanduser(path_or_data) - try: - fs, _, paths = fsspec.get_fs_token_paths( - path_or_data, mode=mode, storage_options=storage_options - ) - except ValueError as e: - if str(e).startswith("Protocol not known"): - return path_or_data, compression - else: - raise e + # Get a filesystem object if one isn't already available + paths = [path_or_data] + if fs is None: + fs, paths = _get_filesystem_and_paths(path_or_data, **kwargs) if len(paths) == 0: raise FileNotFoundError( @@ -1162,7 +1233,13 @@ def get_filepath_or_buffer( path_or_data = paths if len(paths) > 1 else paths[0] else: - path_or_data = [BytesIO(fs.open(fpath).read()) for fpath in paths] + if isinstance(fs, pa_fs.FileSystem): + # We do not want to + path_or_data = [fs.open_input_file(fpath) for fpath in paths] + else: + path_or_data = [ + BytesIO(fs.open(fpath).read()) for fpath in paths + ] if len(path_or_data) == 1: path_or_data = path_or_data[0] From ef02f3de9ba03856d8925dd6913befde1f284cce Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 9 Sep 2021 19:49:11 -0700 Subject: [PATCH 02/18] enable byte_ranges optimization for open file-like --- python/cudf/cudf/io/parquet.py | 92 +++++++++++++-------- python/cudf/cudf/utils/ioutils.py | 130 +++++++++++++++++++++++++++++- 2 files changed, 187 insertions(+), 35 deletions(-) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 7677a197bb0..837d3a3124f 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -5,6 +5,7 @@ from collections import defaultdict from uuid import uuid4 +import fsspec from pyarrow import dataset as ds, fs as pa_fs, parquet as pq import cudf @@ -178,25 +179,26 @@ def _filter_row_groups(paths, fs, filters=None, row_groups=None): ) file_list = dataset.files - # Load IDs of filtered row groups for each file in dataset - filtered_rg_ids = defaultdict(list) - for fragment in dataset.get_fragments(filter=filters): - for rg_fragment in fragment.split_by_row_group(filters): - for rg_info in rg_fragment.row_groups: - filtered_rg_ids[rg_fragment.path].append(rg_info.id) - - # Initialize row_groups to be selected - if row_groups is None: - row_groups = [None for _ in dataset.files] - - # Store IDs of selected row groups for each file - for i, file in enumerate(dataset.files): - if row_groups[i] is None: - row_groups[i] = filtered_rg_ids[file] - else: - row_groups[i] = filter( - lambda id: id in row_groups[i], filtered_rg_ids[file] - ) + if filters: + # Load IDs of filtered row groups for each file in dataset + filtered_rg_ids = defaultdict(list) + for fragment in dataset.get_fragments(filter=filters): + for rg_fragment in fragment.split_by_row_group(filters): + for rg_info in rg_fragment.row_groups: + filtered_rg_ids[rg_fragment.path].append(rg_info.id) + + # Initialize row_groups to be selected + if row_groups is None: + row_groups = [None for _ in dataset.files] + + # Store IDs of selected row groups for each file + for i, file in enumerate(dataset.files): + if row_groups[i] is None: + row_groups[i] = filtered_rg_ids[file] + else: + row_groups[i] = filter( + lambda id: id in row_groups[i], filtered_rg_ids[file] + ) return file_list, row_groups @@ -209,24 +211,35 @@ def _get_byte_ranges(file_list, row_groups, columns, fs): row_groups = [None for path in file_list] # Construct a list of required byte-ranges for every file - all_byte_ranges = [] + all_byte_ranges, all_footers, all_sizes = [], [], [] for path, rgs in zip(file_list, row_groups): # Step 0 - Get size of file - file_size = fs.size(path) + if fs is None: + file_size = path.size + else: + file_size = fs.size(path) # Step 1 - Get 32 KB from tail of file. # # This "sample size" can be tunable, but should # always be >= 8 bytes (so we can read the footer size) tail_size = 32_000 - footer_sample = fs.tail(path, tail_size) + if fs is None: + path.seek(file_size - tail_size) + footer_sample = path.read(tail_size) + else: + footer_sample = fs.tail(path, tail_size) # Step 2 - Read the footer size and re-read a larger # tail if necessary footer_size = int.from_bytes(footer_sample[-8:-4], "little") if tail_size < (footer_size + 8): - footer_sample = fs.tail(path, footer_size + 8) + if fs is None: + path.seek(file_size - (footer_size + 8)) + footer_sample = path.read(footer_size + 8) + else: + footer_sample = fs.tail(path, footer_size + 8) # Step 3 - Collect required byte ranges byte_ranges = [] @@ -234,7 +247,7 @@ def _get_byte_ranges(file_list, row_groups, columns, fs): for r in range(md.num_row_groups): # Skip this row-group if we are targetting # specific row-groups - if row_groups is None or r in rgs: + if rgs is None or r in rgs: row_group = md.row_group(r) for c in range(row_group.num_columns): column = row_group.column(c) @@ -249,7 +262,9 @@ def _get_byte_ranges(file_list, row_groups, columns, fs): byte_ranges.append((file_offset0, num_bytes)) all_byte_ranges.append(byte_ranges) - return file_size, all_byte_ranges + all_footers.append(footer_sample) + all_sizes.append(file_size) + return all_byte_ranges, all_footers, all_sizes @ioutils.doc_read_parquet() @@ -297,26 +312,30 @@ def read_parquet( # needed for each parquet file. We do this when we have # a file-system object to work with and it is not a local # or pyarrow-backed filesystem object. - need_fs_blocks = fs is not None and not ( + need_byte_ranges = fs is not None and not ( ioutils._is_local_filesystem(fs) or isinstance(fs, pa_fs.FileSystem) ) # Apply filters now (before converting non-local paths to buffers) - if fs is not None and filters is not None: + if fs is not None and (filters or need_byte_ranges): filepath_or_buffer, row_groups = _filter_row_groups( filepath_or_buffer, fs, filters=filters, row_groups=row_groups, ) # Get required byte ranges (used with non-local fsspec filesystems) - fs_blocks = None - if need_fs_blocks: - fs_blocks = _get_byte_ranges( + byte_ranges, footers, file_sizes = None, None, None + if need_byte_ranges or ( + filepath_or_buffer + and isinstance( + filepath_or_buffer[0], fsspec.spec.AbstractBufferedFile, + ) + ): + byte_ranges, footers, file_sizes = _get_byte_ranges( filepath_or_buffer, row_groups, columns, fs, ) - print(fs_blocks) filepaths_or_buffers = [] - for source in filepath_or_buffer: + for i, source in enumerate(filepath_or_buffer): if ioutils.is_directory(source, **kwargs): fsspec_fs = ioutils._ensure_filesystem( @@ -326,7 +345,14 @@ def read_parquet( source = fsspec_fs.sep.join([source, "*.parquet"]) tmp_source, compression = ioutils.get_filepath_or_buffer( - path_or_data=source, compression=None, fs=fs, **kwargs, + path_or_data=source, + compression=None, + fs=fs, + byte_ranges=byte_ranges[i] if byte_ranges else None, + footer=footers[i] if footers else None, + file_size=file_sizes[i] if file_sizes else None, + add_par1_magic=True, + **kwargs, ) if compression is not None: raise ValueError( diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 52e959cbdcb..051e572bb82 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -5,9 +5,12 @@ import urllib import warnings from io import BufferedWriter, BytesIO, IOBase, TextIOWrapper +from queue import Queue +from threading import Thread import fsspec import fsspec.implementations.local +import numpy as np import pandas as pd import pyarrow.fs as pa_fs from fsspec.core import get_fs_token_paths @@ -1238,7 +1241,8 @@ def get_filepath_or_buffer( path_or_data = [fs.open_input_file(fpath) for fpath in paths] else: path_or_data = [ - BytesIO(fs.open(fpath).read()) for fpath in paths + BytesIO(_fsspec_data_transfer(fpath, fs=fs, **kwargs)) + for fpath in paths ] if len(path_or_data) == 1: path_or_data = path_or_data[0] @@ -1246,7 +1250,10 @@ def get_filepath_or_buffer( elif not isinstance(path_or_data, iotypes) and is_file_like(path_or_data): if isinstance(path_or_data, TextIOWrapper): path_or_data = path_or_data.buffer - path_or_data = BytesIO(path_or_data.read()) + path_or_data = BytesIO( + # path_or_data.read() + _fsspec_data_transfer(path_or_data, **kwargs) + ) return path_or_data, compression @@ -1468,3 +1475,122 @@ def _ensure_filesystem(passed_filesystem, path): 0 ] return passed_filesystem + + +# +# Fsspec Data-transfer Optimization Code +# + + +def _fsspec_data_transfer( + path_or_fob, + fs=None, + byte_ranges=None, + footer=None, + file_size=None, + add_par1_magic=None, + bytes_per_thread=128_000_000, + **kwargs, +): + + if is_file_like(path_or_fob): + file_size = path_or_fob.size + + # Start with empty buffer + file_size = file_size or fs.size(path_or_fob) + buf = np.zeros(file_size, dtype="b") + + if byte_ranges: + + # Call multi-threaded data transfer of + # remote byte-ranges to local buffer + _read_byte_ranges( + path_or_fob, byte_ranges, buf, fs=fs, num_threads=len(byte_ranges) + ) + + # Add Header & Footer bytes + if footer is not None: + footer_size = len(footer) + buf[-footer_size:] = np.frombuffer( + footer[-footer_size:], dtype="b" + ) + + # Add parquet magic bytes (optional) + if add_par1_magic: + buf[:4] = np.frombuffer(b"PAR1", dtype="b") + if footer is None: + buf[-4:] = np.frombuffer(b"PAR1", dtype="b") + + else: + # return fs.open(path_or_fob).read() + byte_ranges = [ + (b, min(bytes_per_thread, file_size - b)) + for b in range(0, file_size, bytes_per_thread) + ] + _read_byte_ranges( + path_or_fob, byte_ranges, buf, fs=fs, num_threads=len(byte_ranges) + ) + + return buf.tobytes() + + +def _assign_block(fs, path_or_fob, local_buffer, offset, nbytes): + if fs is None: + # We have an open fsspec file object + path_or_fob.seek(offset) + local_buffer[offset : offset + nbytes] = np.frombuffer( + path_or_fob.read(nbytes), dtype="b", + ) + else: + # We have an fsspec filesystem and a path + local_buffer[offset : offset + nbytes] = np.frombuffer( + fs.read_block(path_or_fob, offset, nbytes), dtype="b", + ) + + +class ReadBlockWorker(Thread): + def __init__(self, queue, fs, path_or_fob, local_buffer): + Thread.__init__(self) + self.queue = queue + self.fs = fs + self.path_or_fob = path_or_fob + self.local_buffer = local_buffer + + def run(self): + while True: + # Get the work from the queue and expand the tuple + offset, nbytes = self.queue.get() + try: + _assign_block( + self.fs, + self.path_or_fob, + self.local_buffer, + offset, + nbytes, + ) + finally: + self.queue.task_done() + + +def _read_byte_ranges( + path_or_fob, ranges, local_buffer, fs=None, num_threads=1 +): + + # No reason to generate more threads than byte-ranges + num_threads = min(num_threads, len(ranges)) + + if num_threads > 1: + queue = Queue() + for x in range(num_threads): + worker = ReadBlockWorker(queue, fs, path_or_fob, local_buffer) + worker.daemon = True + worker.start() + + for (offset, nbytes) in ranges: + if num_threads > 1: + queue.put((offset, nbytes)) + else: + _assign_block(fs, path_or_fob, local_buffer, offset, nbytes) + + if num_threads > 1: + queue.join() From a32a7ae9053383ed654216bab09de6b223446c61 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 9 Sep 2021 20:00:05 -0700 Subject: [PATCH 03/18] fix bug for no column or row-group selection --- python/cudf/cudf/io/parquet.py | 2 +- python/cudf/cudf/utils/ioutils.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 837d3a3124f..9133c96e9f1 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -207,7 +207,7 @@ def _get_byte_ranges(file_list, row_groups, columns, fs): if row_groups is None: if columns is None: - return None, None # No reason to construct this + return None, None, None # No reason to construct this row_groups = [None for path in file_list] # Construct a list of required byte-ranges for every file diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 051e572bb82..99ee9798e3c 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -1492,7 +1492,7 @@ def _fsspec_data_transfer( bytes_per_thread=128_000_000, **kwargs, ): - + # return fs.open(path_or_fob).read() if is_file_like(path_or_fob): file_size = path_or_fob.size @@ -1522,7 +1522,6 @@ def _fsspec_data_transfer( buf[-4:] = np.frombuffer(b"PAR1", dtype="b") else: - # return fs.open(path_or_fob).read() byte_ranges = [ (b, min(bytes_per_thread, file_size - b)) for b in range(0, file_size, bytes_per_thread) From bd2e59ab258e8199e867baca247d5bd434f196e8 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 10 Sep 2021 13:39:16 -0700 Subject: [PATCH 04/18] add arrow_filesystem flag for dask_cudf --- python/cudf/cudf/utils/ioutils.py | 124 +++++++++++++------ python/dask_cudf/dask_cudf/io/parquet.py | 145 +++++++++++------------ 2 files changed, 157 insertions(+), 112 deletions(-) diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 99ee9798e3c..5e958a920a1 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -1116,11 +1116,55 @@ def is_directory(path_or_data, **kwargs): return False +def _try_pyarrow_filesystem(path, storage_options=None): + # Returns a pyarrow filesystem object and a normalized + # path if the url includes a protocol supported by + # arrow. The current motivation for this utility is to + # improve s3 read performance over fsspec (s3fs). + + try: + fs, fs_path = pa_fs.FileSystem.from_uri(path) + except ArrowInvalid: + # Protocol not supported + return None, None + + if storage_options: + + import pdb + + pdb.set_trace() + # Translate known s3 options + _translation = { + "anon": "anonymous", + "key": "access_key", + "token": "session_token", + } + translated_storage_options = { + _translation.get(k, k): v for k, v in storage_options.items() + } + + try: + fs = type(fs)(**translated_storage_options) + except TypeError: + # Warn the user if they are not using an available pyarrow + # filesystem class due to incompatible `storage_options` + warnings.warn( + f"This url protocol is supported by the {fs} " + f"FileSystem subclass. IO performance will likely " + f"improve by passing `storage_options` that are " + f"compatible with this FileSystem implementation." + ) + return None, None + + return fs, fs_path + + def _get_filesystem_and_paths(path_or_data, **kwargs): # Returns a filesystem object and the filesystem-normalized # paths. If `path_or_data` does not correspond to a path or # list of paths (or if the protocol is not supported), the # return will be `None` for the fs and `[]` for the paths. + fs = None return_paths = path_or_data if isinstance(path_or_data, str) or ( @@ -1137,24 +1181,14 @@ def _get_filesystem_and_paths(path_or_data, **kwargs): path_or_data = [path_or_data] # Try infering a pyarrow-backed filesystem - arrow_protocol_supported = True - arrow_options_supported = True - try: - fs, fs_paths = pa_fs.FileSystem.from_uri(path_or_data[0]) + fs, fs_paths = _try_pyarrow_filesystem( + path_or_data[0], storage_options=storage_options + ) + if fs is not None: fs_paths = [fs_paths] for source in path_or_data[1:]: - fs_paths.append(pa_fs.FileSystem.from_uri(source)[1]) - if storage_options: - fs = type(fs)(**storage_options) + fs_paths.append(_try_pyarrow_filesystem(source)[1]) return_paths = fs_paths - except ArrowInvalid: - # The uri protocol is not supported by pyarrow - fs = None - arrow_protocol_supported = False - except TypeError: - # The `storage_options` are not supported by pyarrow - fs = None - arrow_options_supported = False if fs is None: # Pyarrow did not support the protocol or storage options. @@ -1170,17 +1204,6 @@ def _get_filesystem_and_paths(path_or_data, **kwargs): else: raise e - # Warn the user if they are not using an available pyarrow - # filesystem class due to incompatible `storage_options` - if arrow_protocol_supported and not arrow_options_supported: - warnings.warn( - f"Using an fsspec filesystem object ({fs}) for IO, even " - f"though the protocol is supported by a native pyarrow " - f"FileSystem class. IO performance will likely improve " - f"by passing `storage_options` that are compatible " - f"with the appropriate pyarrow FileSystem." - ) - return fs, return_paths @@ -1241,7 +1264,11 @@ def get_filepath_or_buffer( path_or_data = [fs.open_input_file(fpath) for fpath in paths] else: path_or_data = [ - BytesIO(_fsspec_data_transfer(fpath, fs=fs, **kwargs)) + BytesIO( + _fsspec_data_transfer( + fpath, fs=fs, mode=mode, **kwargs + ) + ) for fpath in paths ] if len(path_or_data) == 1: @@ -1252,7 +1279,7 @@ def get_filepath_or_buffer( path_or_data = path_or_data.buffer path_or_data = BytesIO( # path_or_data.read() - _fsspec_data_transfer(path_or_data, **kwargs) + _fsspec_data_transfer(path_or_data, mode=mode, **kwargs) ) return path_or_data, compression @@ -1490,22 +1517,37 @@ def _fsspec_data_transfer( file_size=None, add_par1_magic=None, bytes_per_thread=128_000_000, + mode="rb", **kwargs, ): - # return fs.open(path_or_fob).read() - if is_file_like(path_or_fob): - file_size = path_or_fob.size - # Start with empty buffer + # Require `fs` if `path_or_fob` is not file-like + file_like = is_file_like(path_or_fob) + if fs is None and not file_like: + raise ValueError( + "fs must be defined if `path_or_fob` is not file-like" + ) + + # Calculate total file size + if file_like: + file_size = path_or_fob.size file_size = file_size or fs.size(path_or_fob) - buf = np.zeros(file_size, dtype="b") + # Check if a direct read makes the most sense + if not byte_ranges and bytes_per_thread >= file_size: + if file_like: + return path_or_fob.read() + else: + return fs.open(path_or_fob, mode=mode, cache_type="bytes").read() + + # Threaded read into "dummy" buffer + buf = np.zeros(file_size, dtype="b") if byte_ranges: # Call multi-threaded data transfer of # remote byte-ranges to local buffer _read_byte_ranges( - path_or_fob, byte_ranges, buf, fs=fs, num_threads=len(byte_ranges) + path_or_fob, byte_ranges, buf, fs=fs, **kwargs, ) # Add Header & Footer bytes @@ -1527,7 +1569,7 @@ def _fsspec_data_transfer( for b in range(0, file_size, bytes_per_thread) ] _read_byte_ranges( - path_or_fob, byte_ranges, buf, fs=fs, num_threads=len(byte_ranges) + path_or_fob, byte_ranges, buf, fs=fs, **kwargs, ) return buf.tobytes() @@ -1542,9 +1584,11 @@ def _assign_block(fs, path_or_fob, local_buffer, offset, nbytes): ) else: # We have an fsspec filesystem and a path - local_buffer[offset : offset + nbytes] = np.frombuffer( - fs.read_block(path_or_fob, offset, nbytes), dtype="b", - ) + with fs.open(path_or_fob, mode="rb", cache_type="none") as fob: + fob.seek(offset) + local_buffer[offset : offset + nbytes] = np.frombuffer( + fob.read(nbytes), dtype="b", + ) class ReadBlockWorker(Thread): @@ -1572,10 +1616,12 @@ def run(self): def _read_byte_ranges( - path_or_fob, ranges, local_buffer, fs=None, num_threads=1 + path_or_fob, ranges, local_buffer, fs=None, num_threads=None, **kwargs, ): # No reason to generate more threads than byte-ranges + if num_threads is None: + num_threads = len(ranges) num_threads = min(num_threads, len(ranges)) if num_threads > 1: diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index ac1b0b9f3e3..1182ea0044b 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -65,12 +65,32 @@ def _read_paths( partitions=None, partitioning=None, partition_keys=None, + arrow_filesystem=None, **kwargs, ): + # Simplify row_groups if all None + if row_groups == [None for path in paths]: + row_groups = None + + # Non-local filesystem handling + paths_or_fobs = paths + if not cudf.utils.ioutils._is_local_filesystem(fs): + # Convert paths to file objects for remote data + if arrow_filesystem: + paths_or_fobs = [ + arrow_filesystem.open_input_file(path) for path in paths + ] + else: + cache_type = "none" if columns or row_groups else "bytes" + paths_or_fobs = [ + fs.open(path, mode="rb", cache_type=cache_type) + for path in paths + ] + # Use cudf to read in data df = cudf.read_parquet( - paths, + paths_or_fobs, engine="cudf", columns=columns, row_groups=row_groups if row_groups else None, @@ -147,89 +167,55 @@ def read_partition( strings_to_cats = kwargs.get("strings_to_categorical", False) - if len(pieces) > 1: - - # Multi-peice read - paths = [] - rgs = [] - last_partition_keys = None - dfs = [] - - for i, piece in enumerate(pieces): - - (path, row_group, partition_keys) = piece - row_group = None if row_group == [None] else row_group - - if i > 0 and partition_keys != last_partition_keys: - dfs.append( - cls._read_paths( - paths, - fs, - columns=columns, - row_groups=rgs if rgs else None, - strings_to_categorical=strings_to_cats, - partitions=partitions, - partitioning=partitioning, - partition_keys=last_partition_keys, - **kwargs.get("read", {}), - ) - ) - paths = rgs = [] - last_partition_keys = None - paths.append(path) - rgs.append( - [row_group] - if not isinstance(row_group, list) - else row_group - ) - last_partition_keys = partition_keys - - dfs.append( - cls._read_paths( - paths, - fs, - columns=columns, - row_groups=rgs if rgs else None, - strings_to_categorical=strings_to_cats, - partitions=partitions, - partitioning=partitioning, - partition_keys=last_partition_keys, - **kwargs.get("read", {}), - ) - ) - df = cudf.concat(dfs) + # Assume multi-peice read + paths = [] + rgs = [] + last_partition_keys = None + dfs = [] - else: + for i, piece in enumerate(pieces): - # Single-piece read - (path, row_group, partition_keys) = pieces[0] + (path, row_group, partition_keys) = piece row_group = None if row_group == [None] else row_group - if cudf.utils.ioutils._is_local_filesystem(fs): - df = cls._read_paths( - path, - fs, - columns=columns, - row_groups=row_group, - strings_to_categorical=strings_to_cats, - partitions=partitions, - partitioning=partitioning, - partition_keys=partition_keys, - **kwargs.get("read", {}), - ) - else: - with fs.open(path, mode="rb") as f: - df = cls._read_paths( - f, + if i > 0 and partition_keys != last_partition_keys: + dfs.append( + cls._read_paths( + paths, fs, columns=columns, - row_groups=row_group, + row_groups=rgs if rgs else None, strings_to_categorical=strings_to_cats, partitions=partitions, partitioning=partitioning, - partition_keys=partition_keys, + partition_keys=last_partition_keys, **kwargs.get("read", {}), ) + ) + paths = rgs = [] + last_partition_keys = None + paths.append(path) + rgs.append( + [row_group] + if not isinstance(row_group, list) and row_group is not None + else row_group + ) + last_partition_keys = partition_keys + + dfs.append( + cls._read_paths( + paths, + fs, + columns=columns, + row_groups=rgs if rgs else None, + strings_to_categorical=strings_to_cats, + partitions=partitions, + partitioning=partitioning, + partition_keys=last_partition_keys, + **kwargs.get("read", {}), + ) + ) + df = cudf.concat(dfs) if len(dfs) > 1 else dfs[0] # Re-set "object" dtypes align with pa schema set_object_dtypes_from_pa_schema(df, kwargs.get("schema", None)) @@ -350,6 +336,8 @@ def read_parquet( columns=None, split_row_groups=None, row_groups_per_part=None, + read=None, + arrow_filesystem=None, **kwargs, ): """ Read parquet files into a Dask DataFrame @@ -380,6 +368,17 @@ def read_parquet( if split_row_groups is None: split_row_groups = row_groups_per_part + # Check if we should use an arrow-backed filesystem + # on the workers (at IO time) + if arrow_filesystem is not False: + arrow_filesystem = cudf.utils.ioutils._try_pyarrow_filesystem( + path, kwargs.get("storage_options", {}), + )[0] + if arrow_filesystem: + read_kwargs = (read or {}).copy() + read_kwargs["arrow_filesystem"] = arrow_filesystem + kwargs["read"] = read_kwargs + return dd.read_parquet( path, columns=columns, From 53bb32e4c9b29e0066ae7e0e0d8c9210e89eb89f Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 10 Sep 2021 15:59:59 -0700 Subject: [PATCH 05/18] use cat_ranges when available --- python/cudf/cudf/utils/ioutils.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 5e958a920a1..b5f5e01fc39 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -1619,6 +1619,21 @@ def _read_byte_ranges( path_or_fob, ranges, local_buffer, fs=None, num_threads=None, **kwargs, ): + # If we have a fs object that supports `cat_ranges`, + # the ranges should be collected concurrently + if fs is not None and hasattr(fs, "cat_ranges"): + paths, starts, ends = [], [], [] + for offset, nbytes in ranges: + paths.append(path_or_fob) + starts.append(offset) + ends.append(offset + nbytes) + blocks = fs.cat_ranges(paths, starts, ends) + for block, (offset, nbytes) in zip(blocks, ranges): + local_buffer[offset : offset + nbytes] = np.frombuffer( + block, dtype="b", + ) + return + # No reason to generate more threads than byte-ranges if num_threads is None: num_threads = len(ranges) From ada8451a54d43148064e40819df973143c81208a Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 10 Sep 2021 18:33:23 -0700 Subject: [PATCH 06/18] expose arrow_filesystem and legacy_transfer --- python/cudf/cudf/utils/ioutils.py | 32 ++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index b5f5e01fc39..37e4002e9f1 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -1159,7 +1159,7 @@ def _try_pyarrow_filesystem(path, storage_options=None): return fs, fs_path -def _get_filesystem_and_paths(path_or_data, **kwargs): +def _get_filesystem_and_paths(path_or_data, arrow_filesystem=None, **kwargs): # Returns a filesystem object and the filesystem-normalized # paths. If `path_or_data` does not correspond to a path or # list of paths (or if the protocol is not supported), the @@ -1180,15 +1180,17 @@ def _get_filesystem_and_paths(path_or_data, **kwargs): else: path_or_data = [path_or_data] - # Try infering a pyarrow-backed filesystem - fs, fs_paths = _try_pyarrow_filesystem( - path_or_data[0], storage_options=storage_options - ) - if fs is not None: - fs_paths = [fs_paths] - for source in path_or_data[1:]: - fs_paths.append(_try_pyarrow_filesystem(source)[1]) - return_paths = fs_paths + if arrow_filesystem is not False: + + # Try infering a pyarrow-backed filesystem + fs, fs_paths = _try_pyarrow_filesystem( + path_or_data[0], storage_options=storage_options + ) + if fs is not None: + fs_paths = [fs_paths] + for source in path_or_data[1:]: + fs_paths.append(_try_pyarrow_filesystem(source)[1]) + return_paths = fs_paths if fs is None: # Pyarrow did not support the protocol or storage options. @@ -1213,6 +1215,7 @@ def get_filepath_or_buffer( mode="rb", fs=None, iotypes=(BytesIO, NativeFile), + legacy_transfer=False, **kwargs, ): """Return either a filepath string to data, or a memory buffer of data. @@ -1265,7 +1268,9 @@ def get_filepath_or_buffer( else: path_or_data = [ BytesIO( - _fsspec_data_transfer( + fs.open(fpath, mode=mode).read() + if legacy_transfer + else _fsspec_data_transfer( fpath, fs=fs, mode=mode, **kwargs ) ) @@ -1278,8 +1283,9 @@ def get_filepath_or_buffer( if isinstance(path_or_data, TextIOWrapper): path_or_data = path_or_data.buffer path_or_data = BytesIO( - # path_or_data.read() - _fsspec_data_transfer(path_or_data, mode=mode, **kwargs) + path_or_data.read() + if legacy_transfer + else _fsspec_data_transfer(path_or_data, mode=mode, **kwargs) ) return path_or_data, compression From 661ae584417a0423adbcced583f574f15e1716fa Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 13 Sep 2021 11:11:43 -0700 Subject: [PATCH 07/18] most tests passing with reasonable defaults - arrow_filesystem=True not stable --- python/cudf/cudf/io/parquet.py | 7 +- python/cudf/cudf/utils/ioutils.py | 41 +++++++++-- python/dask_cudf/dask_cudf/io/parquet.py | 88 +++++++++++++++--------- 3 files changed, 95 insertions(+), 41 deletions(-) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 9133c96e9f1..2a7d52d8d2f 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -170,7 +170,7 @@ def _filter_row_groups(paths, fs, filters=None, row_groups=None): paths = ioutils.stringify_pathlike(paths[0]) # Convert filters to ds.Expression - if filters: + if filters is not None: filters = pq._filters_to_expression(filters) # Initialize ds.FilesystemDataset @@ -179,7 +179,7 @@ def _filter_row_groups(paths, fs, filters=None, row_groups=None): ) file_list = dataset.files - if filters: + if filters is not None: # Load IDs of filtered row groups for each file in dataset filtered_rg_ids = defaultdict(list) for fragment in dataset.get_fragments(filter=filters): @@ -278,6 +278,7 @@ def read_parquet( num_rows=None, strings_to_categorical=False, use_pandas_metadata=True, + legacy_transfer=False, *args, **kwargs, ): @@ -329,6 +330,7 @@ def read_parquet( and isinstance( filepath_or_buffer[0], fsspec.spec.AbstractBufferedFile, ) + and not legacy_transfer ): byte_ranges, footers, file_sizes = _get_byte_ranges( filepath_or_buffer, row_groups, columns, fs, @@ -352,6 +354,7 @@ def read_parquet( footer=footers[i] if footers else None, file_size=file_sizes[i] if file_sizes else None, add_par1_magic=True, + legacy_transfer=legacy_transfer, **kwargs, ) if compression is not None: diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 37e4002e9f1..716bf0be1e1 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -1130,9 +1130,6 @@ def _try_pyarrow_filesystem(path, storage_options=None): if storage_options: - import pdb - - pdb.set_trace() # Translate known s3 options _translation = { "anon": "anonymous", @@ -1159,7 +1156,7 @@ def _try_pyarrow_filesystem(path, storage_options=None): return fs, fs_path -def _get_filesystem_and_paths(path_or_data, arrow_filesystem=None, **kwargs): +def _get_filesystem_and_paths(path_or_data, arrow_filesystem=False, **kwargs): # Returns a filesystem object and the filesystem-normalized # paths. If `path_or_data` does not correspond to a path or # list of paths (or if the protocol is not supported), the @@ -1168,7 +1165,8 @@ def _get_filesystem_and_paths(path_or_data, arrow_filesystem=None, **kwargs): fs = None return_paths = path_or_data if isinstance(path_or_data, str) or ( - isinstance(path_or_data, list) and isinstance(path_or_data[0], str) + isinstance(path_or_data, list) + and isinstance(stringify_pathlike(path_or_data[0]), str) ): # Ensure we are always working with a list storage_options = kwargs.get("storage_options") @@ -1180,7 +1178,7 @@ def _get_filesystem_and_paths(path_or_data, arrow_filesystem=None, **kwargs): else: path_or_data = [path_or_data] - if arrow_filesystem is not False: + if arrow_filesystem: # Try infering a pyarrow-backed filesystem fs, fs_paths = _try_pyarrow_filesystem( @@ -1282,6 +1280,8 @@ def get_filepath_or_buffer( elif not isinstance(path_or_data, iotypes) and is_file_like(path_or_data): if isinstance(path_or_data, TextIOWrapper): path_or_data = path_or_data.buffer + if legacy_transfer: + path_or_data.seek(0) path_or_data = BytesIO( path_or_data.read() if legacy_transfer @@ -1522,7 +1522,8 @@ def _fsspec_data_transfer( footer=None, file_size=None, add_par1_magic=None, - bytes_per_thread=128_000_000, + bytes_per_thread=256_000_000, + max_gap=64_000, mode="rb", **kwargs, ): @@ -1550,6 +1551,11 @@ def _fsspec_data_transfer( buf = np.zeros(file_size, dtype="b") if byte_ranges: + # Optimize/merge the ranges + byte_ranges = _merge_ranges( + byte_ranges, max_block=bytes_per_thread, max_gap=max_gap, + ) + # Call multi-threaded data transfer of # remote byte-ranges to local buffer _read_byte_ranges( @@ -1581,6 +1587,27 @@ def _fsspec_data_transfer( return buf.tobytes() +def _merge_ranges(byte_ranges, max_block=256_000_000, max_gap=64_000): + # Simple utility to merge small/adjacent byte ranges + new_ranges = [] + if not byte_ranges: + # Early return + return new_ranges + + offset, size = byte_ranges[0] + for (new_offset, new_size) in byte_ranges[1:]: + gap = new_offset - (offset + size) + if gap > max_gap or (size + new_size + gap) > max_block: + # Gap is too large or total read is too large + new_ranges.append((offset, size)) + offset = new_offset + size = new_size + continue + size += new_size + gap + new_ranges.append((offset, size)) + return new_ranges + + def _assign_block(fs, path_or_fob, local_buffer, offset, nbytes): if fs is None: # We have an open fsspec file object diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index 1182ea0044b..33a4d8feffa 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -1,5 +1,6 @@ # Copyright (c) 2019-2020, NVIDIA CORPORATION. import warnings +from contextlib import ExitStack from functools import partial from io import BufferedWriter, BytesIO, IOBase @@ -73,30 +74,38 @@ def _read_paths( if row_groups == [None for path in paths]: row_groups = None - # Non-local filesystem handling - paths_or_fobs = paths - if not cudf.utils.ioutils._is_local_filesystem(fs): - # Convert paths to file objects for remote data - if arrow_filesystem: - paths_or_fobs = [ - arrow_filesystem.open_input_file(path) for path in paths - ] - else: - cache_type = "none" if columns or row_groups else "bytes" - paths_or_fobs = [ - fs.open(path, mode="rb", cache_type=cache_type) - for path in paths - ] - - # Use cudf to read in data - df = cudf.read_parquet( - paths_or_fobs, - engine="cudf", - columns=columns, - row_groups=row_groups if row_groups else None, - strings_to_categorical=strings_to_categorical, - **kwargs, - ) + with ExitStack() as stack: + + # Non-local filesystem handling + paths_or_fobs = paths + if not cudf.utils.ioutils._is_local_filesystem(fs): + + # Convert paths to file objects for remote data + if arrow_filesystem: + paths_or_fobs = [ + stack.enter_context( + arrow_filesystem.open_input_file(path) + ) + for path in paths + ] + else: + cache_type = "none" if columns or row_groups else "bytes" + paths_or_fobs = [ + stack.enter_context( + fs.open(path, mode="rb", cache_type=cache_type) + ) + for path in paths + ] + + # Use cudf to read in data + df = cudf.read_parquet( + paths_or_fobs, + engine="cudf", + columns=columns, + row_groups=row_groups if row_groups else None, + strings_to_categorical=strings_to_categorical, + **kwargs, + ) if partitions and partition_keys is None: @@ -155,13 +164,22 @@ def read_partition( categories=(), partitions=(), partitioning=None, + schema=None, **kwargs, ): + if columns is not None: columns = [c for c in columns] if isinstance(index, list): columns += index + # Check if we are actually selecting any columns + read_columns = columns + if schema and columns: + ignored = set(schema.names) - set(columns) + if not ignored: + read_columns = None + if not isinstance(pieces, list): pieces = [pieces] @@ -183,7 +201,7 @@ def read_partition( cls._read_paths( paths, fs, - columns=columns, + columns=read_columns, row_groups=rgs if rgs else None, strings_to_categorical=strings_to_cats, partitions=partitions, @@ -206,7 +224,7 @@ def read_partition( cls._read_paths( paths, fs, - columns=columns, + columns=read_columns, row_groups=rgs if rgs else None, strings_to_categorical=strings_to_cats, partitions=partitions, @@ -336,8 +354,9 @@ def read_parquet( columns=None, split_row_groups=None, row_groups_per_part=None, + arrow_filesystem=False, + legacy_transfer=False, read=None, - arrow_filesystem=None, **kwargs, ): """ Read parquet files into a Dask DataFrame @@ -370,20 +389,25 @@ def read_parquet( # Check if we should use an arrow-backed filesystem # on the workers (at IO time) - if arrow_filesystem is not False: + read_kwargs = (read or {}).copy() + if arrow_filesystem: arrow_filesystem = cudf.utils.ioutils._try_pyarrow_filesystem( path, kwargs.get("storage_options", {}), )[0] - if arrow_filesystem: - read_kwargs = (read or {}).copy() - read_kwargs["arrow_filesystem"] = arrow_filesystem - kwargs["read"] = read_kwargs + if arrow_filesystem: + read_kwargs["arrow_filesystem"] = arrow_filesystem + + # Check if we are using legacy approach to remote + # data transfer (single `read` call into host memory) + if legacy_transfer: + read_kwargs["legacy_transfer"] = legacy_transfer return dd.read_parquet( path, columns=columns, split_row_groups=split_row_groups, engine=CudfEngine, + read=read_kwargs, **kwargs, ) From 42c55c97a8529422ecf31fc9ad8e9e30ed7bc459 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 13 Sep 2021 11:47:15 -0700 Subject: [PATCH 08/18] fix bug --- python/cudf/cudf/io/parquet.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 2a7d52d8d2f..18daa8fe457 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -178,6 +178,8 @@ def _filter_row_groups(paths, fs, filters=None, row_groups=None): paths, filesystem=fs, format="parquet", partitioning="hive", ) file_list = dataset.files + if len(file_list) == 0: + raise FileNotFoundError(f"{paths} could not be resolved to any files") if filters is not None: # Load IDs of filtered row groups for each file in dataset @@ -318,7 +320,8 @@ def read_parquet( ) # Apply filters now (before converting non-local paths to buffers) - if fs is not None and (filters or need_byte_ranges): + # if fs is not None and (filters is not None or need_byte_ranges): + if fs is not None: filepath_or_buffer, row_groups = _filter_row_groups( filepath_or_buffer, fs, filters=filters, row_groups=row_groups, ) From ddec4df3be1cb3d71a262125dc44fa951d67d93d Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 15 Sep 2021 14:48:05 -0700 Subject: [PATCH 09/18] legacy_transfer fix --- python/cudf/cudf/io/parquet.py | 14 ++++++++------ python/cudf/cudf/utils/ioutils.py | 2 +- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 18daa8fe457..6500c2b7dae 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -328,13 +328,15 @@ def read_parquet( # Get required byte ranges (used with non-local fsspec filesystems) byte_ranges, footers, file_sizes = None, None, None - if need_byte_ranges or ( - filepath_or_buffer - and isinstance( - filepath_or_buffer[0], fsspec.spec.AbstractBufferedFile, + if ( + need_byte_ranges + or ( + filepath_or_buffer + and isinstance( + filepath_or_buffer[0], fsspec.spec.AbstractBufferedFile, + ) ) - and not legacy_transfer - ): + ) and not legacy_transfer: byte_ranges, footers, file_sizes = _get_byte_ranges( filepath_or_buffer, row_groups, columns, fs, ) diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 716bf0be1e1..7cebe21ad48 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -14,7 +14,7 @@ import pandas as pd import pyarrow.fs as pa_fs from fsspec.core import get_fs_token_paths -from pyarrow._fs import LocalFileSystem +from pyarrow.fs import LocalFileSystem from pyarrow.lib import ArrowInvalid, NativeFile from cudf.utils.docutils import docfmt_partial From 36e4c52d51d93d2a7a9aa608cbe3ee4766f27376 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 15 Sep 2021 19:32:15 -0700 Subject: [PATCH 10/18] fix test failures --- python/cudf/cudf/tests/test_gcs.py | 6 +++++- python/cudf/cudf/utils/ioutils.py | 2 ++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/python/cudf/cudf/tests/test_gcs.py b/python/cudf/cudf/tests/test_gcs.py index 99d79e41520..03cd6c6f5cb 100644 --- a/python/cudf/cudf/tests/test_gcs.py +++ b/python/cudf/cudf/tests/test_gcs.py @@ -34,10 +34,14 @@ def test_read_csv(pdf, monkeypatch): fpath = TEST_BUCKET + "test_csv_reader.csv" buffer = pdf.to_csv(index=False) - def mock_open(*args): + def mock_open(*args, **kwargs): return io.BytesIO(buffer.encode()) + def mock_size(*args): + return len(buffer.encode()) + monkeypatch.setattr(gcsfs.core.GCSFileSystem, "open", mock_open) + monkeypatch.setattr(gcsfs.core.GCSFileSystem, "size", mock_size) got = cudf.read_csv("gcs://{}".format(fpath)) assert_eq(pdf, got) diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 7cebe21ad48..c3169bede7d 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -1247,6 +1247,8 @@ def get_filepath_or_buffer( paths = [path_or_data] if fs is None: fs, paths = _get_filesystem_and_paths(path_or_data, **kwargs) + if fs is None: + return path_or_data, compression if len(paths) == 0: raise FileNotFoundError( From 98efb5849396ecc938039283d6c916889c20aff8 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 15 Sep 2021 19:55:39 -0700 Subject: [PATCH 11/18] plumb in csv support since most of the work is already done --- python/cudf/cudf/_lib/csv.pyx | 5 ++++- python/cudf/cudf/io/csv.py | 3 ++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/_lib/csv.pyx b/python/cudf/cudf/_lib/csv.pyx index 9912a7801a4..9fe99305c21 100644 --- a/python/cudf/cudf/_lib/csv.pyx +++ b/python/cudf/cudf/_lib/csv.pyx @@ -47,6 +47,8 @@ from cudf._lib.io.utils cimport make_sink_info, make_source_info from cudf._lib.table cimport Table, make_table_view from cudf._lib.utils cimport data_from_unique_ptr +from pyarrow.lib import NativeFile + ctypedef int32_t underlying_type_t_compression @@ -390,7 +392,8 @@ def read_csv( """ if not isinstance(datasource, (BytesIO, StringIO, bytes, - cudf._lib.io.datasource.Datasource)): + cudf._lib.io.datasource.Datasource, + NativeFile)): if not os.path.isfile(datasource): raise FileNotFoundError( errno.ENOENT, os.strerror(errno.ENOENT), datasource diff --git a/python/cudf/cudf/io/csv.py b/python/cudf/cudf/io/csv.py index 582c5324b8f..d7624c263b6 100644 --- a/python/cudf/cudf/io/csv.py +++ b/python/cudf/cudf/io/csv.py @@ -3,6 +3,7 @@ from io import BytesIO, StringIO from nvtx import annotate +from pyarrow.lib import NativeFile import cudf from cudf import _lib as libcudf @@ -60,7 +61,7 @@ def read_csv( filepath_or_buffer, compression = ioutils.get_filepath_or_buffer( path_or_data=filepath_or_buffer, compression=compression, - iotypes=(BytesIO, StringIO), + iotypes=(BytesIO, StringIO, NativeFile), **kwargs, ) From 63dd615efbde67afda1d5bd4a3498178f0964fc5 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 16 Sep 2021 09:29:05 -0700 Subject: [PATCH 12/18] remove unncessary BytesIO usage for optimized code path --- python/cudf/cudf/utils/ioutils.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index c3169bede7d..f7ad9bd0df1 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -1267,12 +1267,10 @@ def get_filepath_or_buffer( path_or_data = [fs.open_input_file(fpath) for fpath in paths] else: path_or_data = [ - BytesIO( - fs.open(fpath, mode=mode).read() - if legacy_transfer - else _fsspec_data_transfer( - fpath, fs=fs, mode=mode, **kwargs - ) + BytesIO(fs.open(fpath, mode=mode).read()) + if legacy_transfer + else _fsspec_data_transfer( + fpath, fs=fs, mode=mode, **kwargs ) for fpath in paths ] @@ -1284,8 +1282,8 @@ def get_filepath_or_buffer( path_or_data = path_or_data.buffer if legacy_transfer: path_or_data.seek(0) - path_or_data = BytesIO( - path_or_data.read() + path_or_data = ( + BytesIO(path_or_data.read()) if legacy_transfer else _fsspec_data_transfer(path_or_data, mode=mode, **kwargs) ) From 55245382cd3fe2133f91982232c99360e49f6a21 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 20 Sep 2021 12:15:40 -0700 Subject: [PATCH 13/18] avoid memory leaks in _read_byte_ranges --- python/cudf/cudf/utils/ioutils.py | 72 ++++++------------------------- 1 file changed, 14 insertions(+), 58 deletions(-) diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index ef056cb65a3..1952d862966 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -5,7 +5,6 @@ import urllib import warnings from io import BufferedWriter, BytesIO, IOBase, TextIOWrapper -from queue import Queue from threading import Thread import fsspec @@ -1646,66 +1645,23 @@ def _assign_block(fs, path_or_fob, local_buffer, offset, nbytes): ) -class ReadBlockWorker(Thread): - def __init__(self, queue, fs, path_or_fob, local_buffer): - Thread.__init__(self) - self.queue = queue - self.fs = fs - self.path_or_fob = path_or_fob - self.local_buffer = local_buffer - - def run(self): - while True: - # Get the work from the queue and expand the tuple - offset, nbytes = self.queue.get() - try: - _assign_block( - self.fs, - self.path_or_fob, - self.local_buffer, - offset, - nbytes, - ) - finally: - self.queue.task_done() - - def _read_byte_ranges( - path_or_fob, ranges, local_buffer, fs=None, num_threads=None, **kwargs, + path_or_fob, ranges, local_buffer, fs=None, **kwargs, ): - - # If we have a fs object that supports `cat_ranges`, - # the ranges should be collected concurrently - if fs is not None and hasattr(fs, "cat_ranges"): - paths, starts, ends = [], [], [] - for offset, nbytes in ranges: - paths.append(path_or_fob) - starts.append(offset) - ends.append(offset + nbytes) - blocks = fs.cat_ranges(paths, starts, ends) - for block, (offset, nbytes) in zip(blocks, ranges): - local_buffer[offset : offset + nbytes] = np.frombuffer( - block, dtype="b", - ) - return - - # No reason to generate more threads than byte-ranges - if num_threads is None: - num_threads = len(ranges) - num_threads = min(num_threads, len(ranges)) - - if num_threads > 1: - queue = Queue() - for x in range(num_threads): - worker = ReadBlockWorker(queue, fs, path_or_fob, local_buffer) - worker.daemon = True - worker.start() - + # Simple utility to copy remote byte ranges + # into a local buffer for IO in libcudf + workers = [] for (offset, nbytes) in ranges: - if num_threads > 1: - queue.put((offset, nbytes)) + if len(ranges) > 1: + workers.append( + Thread( + target=_assign_block, + args=(fs, path_or_fob, local_buffer, offset, nbytes), + ) + ) + workers[-1].start() else: _assign_block(fs, path_or_fob, local_buffer, offset, nbytes) - if num_threads > 1: - queue.join() + for worker in workers: + worker.join() From fd2998a1dd19966b04bf2eca1aa57217fbd92dd8 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 20 Sep 2021 14:16:07 -0700 Subject: [PATCH 14/18] avoid full-file transfer for read_csv with byte_range defined --- python/cudf/cudf/io/csv.py | 10 +++++++++- python/cudf/cudf/utils/ioutils.py | 9 +++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/python/cudf/cudf/io/csv.py b/python/cudf/cudf/io/csv.py index fefe173ab96..43665fe6200 100644 --- a/python/cudf/cudf/io/csv.py +++ b/python/cudf/cudf/io/csv.py @@ -62,9 +62,17 @@ def read_csv( path_or_data=filepath_or_buffer, compression=compression, iotypes=(BytesIO, StringIO, NativeFile), + byte_ranges=[byte_range] if byte_range else None, + clip_dummy_buffer=True if byte_range else False, **kwargs, ) + # Adjust byte_range for clipped dummy buffers + use_byte_range = byte_range + if byte_range: + if byte_range[1] == len(filepath_or_buffer): + use_byte_range = (0, byte_range[1]) + if na_values is not None and is_scalar(na_values): na_values = [na_values] @@ -92,7 +100,7 @@ def read_csv( true_values=true_values, false_values=false_values, nrows=nrows, - byte_range=byte_range, + byte_range=use_byte_range, skip_blank_lines=skip_blank_lines, parse_dates=parse_dates, comment=comment, diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 1952d862966..7cbd02a7b54 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -1546,6 +1546,7 @@ def _fsspec_data_transfer( bytes_per_thread=256_000_000, max_gap=64_000, mode="rb", + clip_dummy_buffer=False, **kwargs, ): @@ -1605,6 +1606,14 @@ def _fsspec_data_transfer( path_or_fob, byte_ranges, buf, fs=fs, **kwargs, ) + if clip_dummy_buffer: + # If we only need the populated byte range + # (e.g. a csv byte-range read) then clip parts + # of the dummy buffer that are outside this range + start = byte_ranges[0][0] + end = byte_ranges[-1][0] + byte_ranges[-1][1] + return buf[start:end].tobytes() + return buf.tobytes() From d1cb7a698e93f06eb250832b1ea029cf71e01da6 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 20 Sep 2021 14:45:42 -0700 Subject: [PATCH 15/18] avoid seeking before beginning of file --- python/cudf/cudf/io/parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 9835dbb05bb..09cbb8ee53c 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -226,7 +226,7 @@ def _get_byte_ranges(file_list, row_groups, columns, fs): # # This "sample size" can be tunable, but should # always be >= 8 bytes (so we can read the footer size) - tail_size = 32_000 + tail_size = min(32_000, file_size) if fs is None: path.seek(file_size - tail_size) footer_sample = path.read(tail_size) From 491c69ffda15fb13c5e89a1154971dcdc39d626d Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 20 Sep 2021 20:08:45 -0700 Subject: [PATCH 16/18] remove arrow_filesystem option from dask (for now) --- python/cudf/cudf/io/csv.py | 2 ++ python/cudf/cudf/io/parquet.py | 17 ++++------ python/cudf/cudf/utils/ioutils.py | 30 +++++++++-------- python/dask_cudf/dask_cudf/io/parquet.py | 41 ++++-------------------- 4 files changed, 31 insertions(+), 59 deletions(-) diff --git a/python/cudf/cudf/io/csv.py b/python/cudf/cudf/io/csv.py index 43665fe6200..21b54d88e68 100644 --- a/python/cudf/cudf/io/csv.py +++ b/python/cudf/cudf/io/csv.py @@ -46,6 +46,7 @@ def read_csv( na_filter=True, prefix=None, index_col=None, + arrow_filesystem=False, **kwargs, ): """{docstring}""" @@ -64,6 +65,7 @@ def read_csv( iotypes=(BytesIO, StringIO, NativeFile), byte_ranges=[byte_range] if byte_range else None, clip_dummy_buffer=True if byte_range else False, + arrow_filesystem=arrow_filesystem, **kwargs, ) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 09cbb8ee53c..c4cacaa087d 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -280,7 +280,7 @@ def read_parquet( num_rows=None, strings_to_categorical=False, use_pandas_metadata=True, - legacy_transfer=False, + arrow_filesystem=False, *args, **kwargs, ): @@ -328,15 +328,12 @@ def read_parquet( # Get required byte ranges (used with non-local fsspec filesystems) byte_ranges, footers, file_sizes = None, None, None - if ( - need_byte_ranges - or ( - filepath_or_buffer - and isinstance( - filepath_or_buffer[0], fsspec.spec.AbstractBufferedFile, - ) + if need_byte_ranges or ( + filepath_or_buffer + and isinstance( + filepath_or_buffer[0], fsspec.spec.AbstractBufferedFile, ) - ) and not legacy_transfer: + ): byte_ranges, footers, file_sizes = _get_byte_ranges( filepath_or_buffer, row_groups, columns, fs, ) @@ -359,7 +356,7 @@ def read_parquet( footer=footers[i] if footers else None, file_size=file_sizes[i] if file_sizes else None, add_par1_magic=True, - legacy_transfer=legacy_transfer, + arrow_filesystem=arrow_filesystem, **kwargs, ) if compression is not None: diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 7cbd02a7b54..203b0b1cbd7 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -1148,6 +1148,14 @@ def _try_pyarrow_filesystem(path, storage_options=None): except ArrowInvalid: # Protocol not supported return None, None + except OSError: + # Bucket not found + warnings.warn( + "This protocol may be supported by pyarrow. " + "However, `FileSystem.from_uri` failed. " + "Using fsspec. " + ) + return None, None if storage_options: @@ -1199,7 +1207,7 @@ def _get_filesystem_and_paths(path_or_data, arrow_filesystem=False, **kwargs): else: path_or_data = [path_or_data] - if arrow_filesystem: + if arrow_filesystem is True: # Try infering a pyarrow-backed filesystem fs, fs_paths = _try_pyarrow_filesystem( @@ -1211,6 +1219,11 @@ def _get_filesystem_and_paths(path_or_data, arrow_filesystem=False, **kwargs): fs_paths.append(_try_pyarrow_filesystem(source)[1]) return_paths = fs_paths + elif arrow_filesystem: + # The filesystem is already specified explicitly + fs = arrow_filesystem + return_paths = [p.split("//")[-1] for p in path_or_data] + if fs is None: # Pyarrow did not support the protocol or storage options. # Fall back to fsspec @@ -1234,7 +1247,6 @@ def get_filepath_or_buffer( mode="rb", fs=None, iotypes=(BytesIO, NativeFile), - legacy_transfer=False, **kwargs, ): """Return either a filepath string to data, or a memory buffer of data. @@ -1288,11 +1300,7 @@ def get_filepath_or_buffer( path_or_data = [fs.open_input_file(fpath) for fpath in paths] else: path_or_data = [ - BytesIO(fs.open(fpath, mode=mode).read()) - if legacy_transfer - else _fsspec_data_transfer( - fpath, fs=fs, mode=mode, **kwargs - ) + _fsspec_data_transfer(fpath, fs=fs, mode=mode, **kwargs) for fpath in paths ] if len(path_or_data) == 1: @@ -1301,13 +1309,7 @@ def get_filepath_or_buffer( elif not isinstance(path_or_data, iotypes) and is_file_like(path_or_data): if isinstance(path_or_data, TextIOWrapper): path_or_data = path_or_data.buffer - if legacy_transfer: - path_or_data.seek(0) - path_or_data = ( - BytesIO(path_or_data.read()) - if legacy_transfer - else _fsspec_data_transfer(path_or_data, mode=mode, **kwargs) - ) + path_or_data = _fsspec_data_transfer(path_or_data, mode=mode, **kwargs) return path_or_data, compression diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index a03cb553827..850cc0843cc 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -65,7 +65,6 @@ def _read_paths( partitions=None, partitioning=None, partition_keys=None, - arrow_filesystem=None, **kwargs, ): @@ -80,21 +79,12 @@ def _read_paths( if not cudf.utils.ioutils._is_local_filesystem(fs): # Convert paths to file objects for remote data - if arrow_filesystem: - paths_or_fobs = [ - stack.enter_context( - arrow_filesystem.open_input_file(path) - ) - for path in paths - ] - else: - cache_type = "none" if columns or row_groups else "bytes" - paths_or_fobs = [ - stack.enter_context( - fs.open(path, mode="rb", cache_type=cache_type) - ) - for path in paths - ] + paths_or_fobs = [ + stack.enter_context( + fs.open(path, mode="rb", cache_type="none") + ) + for path in paths + ] # Use cudf to read in data df = cudf.read_parquet( @@ -355,9 +345,6 @@ def read_parquet( columns=None, split_row_groups=None, row_groups_per_part=None, - arrow_filesystem=False, - legacy_transfer=False, - read=None, **kwargs, ): """ Read parquet files into a Dask DataFrame @@ -388,27 +375,11 @@ def read_parquet( if split_row_groups is None: split_row_groups = row_groups_per_part - # Check if we should use an arrow-backed filesystem - # on the workers (at IO time) - read_kwargs = (read or {}).copy() - if arrow_filesystem: - arrow_filesystem = cudf.utils.ioutils._try_pyarrow_filesystem( - path, kwargs.get("storage_options", {}), - )[0] - if arrow_filesystem: - read_kwargs["arrow_filesystem"] = arrow_filesystem - - # Check if we are using legacy approach to remote - # data transfer (single `read` call into host memory) - if legacy_transfer: - read_kwargs["legacy_transfer"] = legacy_transfer - return dd.read_parquet( path, columns=columns, split_row_groups=split_row_groups, engine=CudfEngine, - read=read_kwargs, **kwargs, ) From 5994fd9c0d78eacd307f6cb2b6c7f84bfa3af8a6 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 20 Sep 2021 21:07:52 -0700 Subject: [PATCH 17/18] save state --- python/cudf/cudf/utils/ioutils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 203b0b1cbd7..e54f1f32f35 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -1569,7 +1569,7 @@ def _fsspec_data_transfer( if file_like: return path_or_fob.read() else: - return fs.open(path_or_fob, mode=mode, cache_type="bytes").read() + return fs.open(path_or_fob, mode=mode, cache_type="none").read() # Threaded read into "dummy" buffer buf = np.zeros(file_size, dtype="b") From a821ba774c08287117794ba0dd29d969d171d3d0 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 22 Sep 2021 12:19:50 -0700 Subject: [PATCH 18/18] simplify PR to require NativeFile input (no more uri inference for now) --- python/cudf/cudf/io/csv.py | 10 +-- python/cudf/cudf/io/parquet.py | 49 +++++++----- python/cudf/cudf/utils/ioutils.py | 123 +++++++----------------------- 3 files changed, 62 insertions(+), 120 deletions(-) diff --git a/python/cudf/cudf/io/csv.py b/python/cudf/cudf/io/csv.py index 21b54d88e68..565a109eb79 100644 --- a/python/cudf/cudf/io/csv.py +++ b/python/cudf/cudf/io/csv.py @@ -46,7 +46,6 @@ def read_csv( na_filter=True, prefix=None, index_col=None, - arrow_filesystem=False, **kwargs, ): """{docstring}""" @@ -64,15 +63,14 @@ def read_csv( compression=compression, iotypes=(BytesIO, StringIO, NativeFile), byte_ranges=[byte_range] if byte_range else None, - clip_dummy_buffer=True if byte_range else False, - arrow_filesystem=arrow_filesystem, + clip_local_buffer=True if byte_range else False, **kwargs, ) - # Adjust byte_range for clipped dummy buffers + # Adjust byte_range for clipped local buffers use_byte_range = byte_range - if byte_range: - if byte_range[1] == len(filepath_or_buffer): + if byte_range and isinstance(filepath_or_buffer, BytesIO): + if byte_range[1] == filepath_or_buffer.getbuffer().nbytes: use_byte_range = (0, byte_range[1]) if na_values is not None and is_scalar(na_values): diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index c4cacaa087d..56cfd563435 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -6,7 +6,7 @@ from uuid import uuid4 import fsspec -from pyarrow import dataset as ds, fs as pa_fs, parquet as pq +from pyarrow import dataset as ds, parquet as pq import cudf from cudf._lib import parquet as libparquet @@ -162,7 +162,11 @@ def read_parquet_metadata(path): return num_rows, num_row_groups, col_names -def _filter_row_groups(paths, fs, filters=None, row_groups=None): +def _process_row_groups(paths, fs, filters=None, row_groups=None): + + # The general purpose of this function is to (1) expand + # directory input into a list of paths (using the pyarrow + # dataset API), and (2) to apply row-group filters. # Deal with case that the user passed in a directory name file_list = paths @@ -207,6 +211,18 @@ def _filter_row_groups(paths, fs, filters=None, row_groups=None): def _get_byte_ranges(file_list, row_groups, columns, fs): + # This utility is used to collect the footer metadata + # from a parquet file. This metadata is used to define + # the exact byte-ranges that will be needed to read the + # target column-chunks from the file. + # + # This utility is only used for remote storage. + # + # The calculated byte-range information is used within + # cudf.io.ioutils.get_filepath_or_buffer (which uses + # _fsspec_data_transfer to convert non-local fsspec file + # objects into local byte buffers). + if row_groups is None: if columns is None: return None, None, None # No reason to construct this @@ -280,7 +296,6 @@ def read_parquet( num_rows=None, strings_to_categorical=False, use_pandas_metadata=True, - arrow_filesystem=False, *args, **kwargs, ): @@ -311,23 +326,21 @@ def read_parquet( if fs is None and filters is not None: raise ValueError("cudf cannot apply filters to open file objects.") - # Check if we should calculate the specific byte-ranges - # needed for each parquet file. We do this when we have - # a file-system object to work with and it is not a local - # or pyarrow-backed filesystem object. - need_byte_ranges = fs is not None and not ( - ioutils._is_local_filesystem(fs) or isinstance(fs, pa_fs.FileSystem) - ) - - # Apply filters now (before converting non-local paths to buffers) - # if fs is not None and (filters is not None or need_byte_ranges): + # Apply filters now (before converting non-local paths to buffers). + # Note that `_process_row_groups` will also expand `filepath_or_buffer` + # into a full list of files if it is a directory. if fs is not None: - filepath_or_buffer, row_groups = _filter_row_groups( + filepath_or_buffer, row_groups = _process_row_groups( filepath_or_buffer, fs, filters=filters, row_groups=row_groups, ) - # Get required byte ranges (used with non-local fsspec filesystems) + # Check if we should calculate the specific byte-ranges + # needed for each parquet file. We always do this when we + # have a file-system object to work with and it is not a + # local filesystem object. We can also do it without a + # file-system object for `AbstractBufferedFile` buffers byte_ranges, footers, file_sizes = None, None, None + need_byte_ranges = fs is not None and not ioutils._is_local_filesystem(fs) if need_byte_ranges or ( filepath_or_buffer and isinstance( @@ -342,8 +355,10 @@ def read_parquet( for i, source in enumerate(filepath_or_buffer): if ioutils.is_directory(source, **kwargs): + # Note: For now, we know `fs` is an fsspec filesystem + # object, but it may be an arrow object in the future fsspec_fs = ioutils._ensure_filesystem( - passed_filesystem=None, path=source + passed_filesystem=fs, path=source ) source = ioutils.stringify_pathlike(source) source = fsspec_fs.sep.join([source, "*.parquet"]) @@ -356,9 +371,9 @@ def read_parquet( footer=footers[i] if footers else None, file_size=file_sizes[i] if file_sizes else None, add_par1_magic=True, - arrow_filesystem=arrow_filesystem, **kwargs, ) + if compression is not None: raise ValueError( "URL content-encoding decompression is not supported" diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index e54f1f32f35..c2d55daabdf 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -3,7 +3,6 @@ import datetime import os import urllib -import warnings from io import BufferedWriter, BytesIO, IOBase, TextIOWrapper from threading import Thread @@ -11,10 +10,8 @@ import fsspec.implementations.local import numpy as np import pandas as pd -import pyarrow.fs as pa_fs from fsspec.core import get_fs_token_paths -from pyarrow.fs import LocalFileSystem -from pyarrow.lib import ArrowInvalid, NativeFile +from pyarrow.lib import NativeFile from cudf.utils.docutils import docfmt_partial @@ -1085,9 +1082,7 @@ def is_file_like(obj): def _is_local_filesystem(fs): - return isinstance( - fs, (fsspec.implementations.local.LocalFileSystem, LocalFileSystem), - ) + return isinstance(fs, fsspec.implementations.local.LocalFileSystem) def ensure_single_filepath_or_buffer(path_or_data, **kwargs): @@ -1137,55 +1132,7 @@ def is_directory(path_or_data, **kwargs): return False -def _try_pyarrow_filesystem(path, storage_options=None): - # Returns a pyarrow filesystem object and a normalized - # path if the url includes a protocol supported by - # arrow. The current motivation for this utility is to - # improve s3 read performance over fsspec (s3fs). - - try: - fs, fs_path = pa_fs.FileSystem.from_uri(path) - except ArrowInvalid: - # Protocol not supported - return None, None - except OSError: - # Bucket not found - warnings.warn( - "This protocol may be supported by pyarrow. " - "However, `FileSystem.from_uri` failed. " - "Using fsspec. " - ) - return None, None - - if storage_options: - - # Translate known s3 options - _translation = { - "anon": "anonymous", - "key": "access_key", - "token": "session_token", - } - translated_storage_options = { - _translation.get(k, k): v for k, v in storage_options.items() - } - - try: - fs = type(fs)(**translated_storage_options) - except TypeError: - # Warn the user if they are not using an available pyarrow - # filesystem class due to incompatible `storage_options` - warnings.warn( - f"This url protocol is supported by the {fs} " - f"FileSystem subclass. IO performance will likely " - f"improve by passing `storage_options` that are " - f"compatible with this FileSystem implementation." - ) - return None, None - - return fs, fs_path - - -def _get_filesystem_and_paths(path_or_data, arrow_filesystem=False, **kwargs): +def _get_filesystem_and_paths(path_or_data, **kwargs): # Returns a filesystem object and the filesystem-normalized # paths. If `path_or_data` does not correspond to a path or # list of paths (or if the protocol is not supported), the @@ -1207,36 +1154,18 @@ def _get_filesystem_and_paths(path_or_data, arrow_filesystem=False, **kwargs): else: path_or_data = [path_or_data] - if arrow_filesystem is True: - - # Try infering a pyarrow-backed filesystem - fs, fs_paths = _try_pyarrow_filesystem( - path_or_data[0], storage_options=storage_options + # Pyarrow did not support the protocol or storage options. + # Fall back to fsspec + try: + fs, _, fs_paths = fsspec.get_fs_token_paths( + path_or_data, mode="rb", storage_options=storage_options ) - if fs is not None: - fs_paths = [fs_paths] - for source in path_or_data[1:]: - fs_paths.append(_try_pyarrow_filesystem(source)[1]) - return_paths = fs_paths - - elif arrow_filesystem: - # The filesystem is already specified explicitly - fs = arrow_filesystem - return_paths = [p.split("//")[-1] for p in path_or_data] - - if fs is None: - # Pyarrow did not support the protocol or storage options. - # Fall back to fsspec - try: - fs, _, fs_paths = fsspec.get_fs_token_paths( - path_or_data, mode="rb", storage_options=storage_options - ) - return_paths = fs_paths - except ValueError as e: - if str(e).startswith("Protocol not known"): - return None, [] - else: - raise e + return_paths = fs_paths + except ValueError as e: + if str(e).startswith("Protocol not known"): + return None, [] + else: + raise e return fs, return_paths @@ -1295,21 +1224,21 @@ def get_filepath_or_buffer( path_or_data = paths if len(paths) > 1 else paths[0] else: - if isinstance(fs, pa_fs.FileSystem): - # We do not want to - path_or_data = [fs.open_input_file(fpath) for fpath in paths] - else: - path_or_data = [ + path_or_data = [ + BytesIO( _fsspec_data_transfer(fpath, fs=fs, mode=mode, **kwargs) - for fpath in paths - ] + ) + for fpath in paths + ] if len(path_or_data) == 1: path_or_data = path_or_data[0] elif not isinstance(path_or_data, iotypes) and is_file_like(path_or_data): if isinstance(path_or_data, TextIOWrapper): path_or_data = path_or_data.buffer - path_or_data = _fsspec_data_transfer(path_or_data, mode=mode, **kwargs) + path_or_data = BytesIO( + _fsspec_data_transfer(path_or_data, mode=mode, **kwargs) + ) return path_or_data, compression @@ -1548,7 +1477,7 @@ def _fsspec_data_transfer( bytes_per_thread=256_000_000, max_gap=64_000, mode="rb", - clip_dummy_buffer=False, + clip_local_buffer=False, **kwargs, ): @@ -1571,7 +1500,7 @@ def _fsspec_data_transfer( else: return fs.open(path_or_fob, mode=mode, cache_type="none").read() - # Threaded read into "dummy" buffer + # Threaded read into "local" buffer buf = np.zeros(file_size, dtype="b") if byte_ranges: @@ -1608,10 +1537,10 @@ def _fsspec_data_transfer( path_or_fob, byte_ranges, buf, fs=fs, **kwargs, ) - if clip_dummy_buffer: + if clip_local_buffer: # If we only need the populated byte range # (e.g. a csv byte-range read) then clip parts - # of the dummy buffer that are outside this range + # of the local buffer that are outside this range start = byte_ranges[0][0] end = byte_ranges[-1][0] + byte_ranges[-1][1] return buf[start:end].tobytes()