diff --git a/python/cudf/cudf/_lib/cpp/io/types.pxd b/python/cudf/cudf/_lib/cpp/io/types.pxd index 721d90f1f5b..6b68902d22f 100644 --- a/python/cudf/cudf/_lib/cpp/io/types.pxd +++ b/python/cudf/cudf/_lib/cpp/io/types.pxd @@ -119,3 +119,7 @@ cdef extern from "cudf/io/datasource.hpp" \ cdef cppclass datasource: pass + + cdef cppclass arrow_io_source(datasource): + arrow_io_source(string arrow_uri) except + + arrow_io_source(shared_ptr[CRandomAccessFile]) except + diff --git a/python/cudf/cudf/_lib/csv.pyx b/python/cudf/cudf/_lib/csv.pyx index be89bc0d17a..18efbcfa18b 100644 --- a/python/cudf/cudf/_lib/csv.pyx +++ b/python/cudf/cudf/_lib/csv.pyx @@ -9,6 +9,7 @@ from libcpp.vector cimport vector cimport cudf._lib.cpp.types as libcudf_types from cudf._lib.cpp.types cimport data_type, type_id +from cudf._lib.io.datasource cimport Datasource, NativeFileDatasource from cudf._lib.types cimport dtype_to_data_type import numpy as np @@ -51,6 +52,8 @@ from cudf._lib.table cimport ( ) from cudf._lib.utils cimport data_from_unique_ptr +from pyarrow.lib import NativeFile + ctypedef int32_t underlying_type_t_compression @@ -394,7 +397,8 @@ def read_csv( """ if not isinstance(datasource, (BytesIO, StringIO, bytes, - cudf._lib.io.datasource.Datasource)): + Datasource, + NativeFile)): if not os.path.isfile(datasource): raise FileNotFoundError( errno.ENOENT, os.strerror(errno.ENOENT), datasource @@ -404,6 +408,8 @@ def read_csv( datasource = datasource.read().encode() elif isinstance(datasource, str) and not os.path.isfile(datasource): datasource = datasource.encode() + elif isinstance(datasource, NativeFile): + datasource = NativeFileDatasource(datasource) validate_args(delimiter, sep, delim_whitespace, decimal, thousands, nrows, skipfooter, byte_range, skiprows) diff --git a/python/cudf/cudf/_lib/io/datasource.pxd b/python/cudf/cudf/_lib/io/datasource.pxd index 705a3600f68..a7a3731a0e6 100644 --- a/python/cudf/cudf/_lib/io/datasource.pxd +++ b/python/cudf/cudf/_lib/io/datasource.pxd @@ -1,10 +1,13 @@ # Copyright (c) 2020, NVIDIA CORPORATION. -from libcpp.memory cimport unique_ptr +from libcpp.memory cimport shared_ptr -from cudf._lib.cpp.io.types cimport datasource +from cudf._lib.cpp.io.types cimport arrow_io_source, datasource cdef class Datasource: - cdef datasource* get_datasource(self) nogil except * + +cdef class NativeFileDatasource(Datasource): + cdef shared_ptr[arrow_io_source] c_datasource + cdef datasource* get_datasource(self) nogil diff --git a/python/cudf/cudf/_lib/io/datasource.pyx b/python/cudf/cudf/_lib/io/datasource.pyx index ddfd9a3540a..7402779a6ac 100644 --- a/python/cudf/cudf/_lib/io/datasource.pyx +++ b/python/cudf/cudf/_lib/io/datasource.pyx @@ -1,8 +1,10 @@ # Copyright (c) 2020, NVIDIA CORPORATION. -from libcpp.memory cimport unique_ptr +from libcpp.memory cimport shared_ptr +from pyarrow.includes.libarrow cimport CRandomAccessFile +from pyarrow.lib cimport NativeFile -from cudf._lib.cpp.io.types cimport datasource +from cudf._lib.cpp.io.types cimport arrow_io_source, datasource cdef class Datasource: @@ -10,3 +12,15 @@ cdef class Datasource: with gil: raise NotImplementedError("get_datasource() should not " + "be directly invoked here") + +cdef class NativeFileDatasource(Datasource): + + def __cinit__(self, NativeFile native_file,): + + cdef shared_ptr[CRandomAccessFile] ra_src + + ra_src = native_file.get_random_access_file() + self.c_datasource.reset(new arrow_io_source(ra_src)) + + cdef datasource* get_datasource(self) nogil: + return (self.c_datasource.get()) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 2ae3c53cb1b..ba91e6362d8 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -56,6 +56,7 @@ from cudf._lib.cpp.io.types cimport column_in_metadata, table_input_metadata from cudf._lib.cpp.table.table cimport table from cudf._lib.cpp.table.table_view cimport table_view from cudf._lib.cpp.types cimport data_type, size_type +from cudf._lib.io.datasource cimport Datasource, NativeFileDatasource from cudf._lib.io.utils cimport ( make_sink_info, make_source_info, @@ -63,6 +64,8 @@ from cudf._lib.io.utils cimport ( ) from cudf._lib.table cimport Table, table_view_from_table +from pyarrow.lib import NativeFile + cdef class BufferArrayFromVector: cdef Py_ssize_t length @@ -115,7 +118,9 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, cudf.io.parquet.read_parquet cudf.io.parquet.to_parquet """ - + for i, datasource in enumerate(filepaths_or_buffers): + if isinstance(datasource, NativeFile): + filepaths_or_buffers[i] = NativeFileDatasource(datasource) cdef cudf_io_types.source_info source = make_source_info( filepaths_or_buffers) diff --git a/python/cudf/cudf/io/csv.py b/python/cudf/cudf/io/csv.py index 966ede655c6..565a109eb79 100644 --- a/python/cudf/cudf/io/csv.py +++ b/python/cudf/cudf/io/csv.py @@ -3,6 +3,7 @@ from io import BytesIO, StringIO from nvtx import annotate +from pyarrow.lib import NativeFile import cudf from cudf import _lib as libcudf @@ -60,7 +61,7 @@ def read_csv( filepath_or_buffer, compression = ioutils.get_filepath_or_buffer( path_or_data=filepath_or_buffer, compression=compression, - iotypes=(BytesIO, StringIO), + iotypes=(BytesIO, StringIO, NativeFile), byte_ranges=[byte_range] if byte_range else None, clip_local_buffer=True if byte_range else False, **kwargs, diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 56cfd563435..a60ec07b894 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -296,6 +296,7 @@ def read_parquet( num_rows=None, strings_to_categorical=False, use_pandas_metadata=True, + use_python_file_object=False, *args, **kwargs, ): @@ -340,16 +341,19 @@ def read_parquet( # local filesystem object. We can also do it without a # file-system object for `AbstractBufferedFile` buffers byte_ranges, footers, file_sizes = None, None, None - need_byte_ranges = fs is not None and not ioutils._is_local_filesystem(fs) - if need_byte_ranges or ( - filepath_or_buffer - and isinstance( - filepath_or_buffer[0], fsspec.spec.AbstractBufferedFile, - ) - ): - byte_ranges, footers, file_sizes = _get_byte_ranges( - filepath_or_buffer, row_groups, columns, fs, + if not use_python_file_object: + need_byte_ranges = fs is not None and not ioutils._is_local_filesystem( + fs ) + if need_byte_ranges or ( + filepath_or_buffer + and isinstance( + filepath_or_buffer[0], fsspec.spec.AbstractBufferedFile, + ) + ): + byte_ranges, footers, file_sizes = _get_byte_ranges( + filepath_or_buffer, row_groups, columns, fs, + ) filepaths_or_buffers = [] for i, source in enumerate(filepath_or_buffer): @@ -371,6 +375,7 @@ def read_parquet( footer=footers[i] if footers else None, file_size=file_sizes[i] if file_sizes else None, add_par1_magic=True, + use_python_file_object=use_python_file_object, **kwargs, ) diff --git a/python/cudf/cudf/tests/test_csv.py b/python/cudf/cudf/tests/test_csv.py index 0b8b6dd565f..9208b8c7cd4 100644 --- a/python/cudf/cudf/tests/test_csv.py +++ b/python/cudf/cudf/tests/test_csv.py @@ -11,6 +11,7 @@ import numpy as np import pandas as pd import pytest +from pyarrow import fs as pa_fs import cudf from cudf import read_csv @@ -978,6 +979,17 @@ def test_csv_reader_filepath_or_buffer(tmpdir, path_or_buf, src): assert_eq(expect, got) +def test_csv_reader_arrow_nativefile(path_or_buf): + # Check that we can read a file opened with the + # Arrow FileSystem inferface + expect = cudf.read_csv(path_or_buf("filepath")) + fs, path = pa_fs.FileSystem.from_uri(path_or_buf("filepath")) + with fs.open_input_file(path) as fil: + got = cudf.read_csv(fil) + + assert_eq(expect, got) + + def test_small_zip(tmpdir): df = pd.DataFrame( { diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 21c385e3299..3db5037c224 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -13,8 +13,9 @@ import pandas as pd import pyarrow as pa import pytest +from fsspec.core import get_fs_token_paths from packaging import version -from pyarrow import parquet as pq +from pyarrow import fs as pa_fs, parquet as pq import cudf from cudf.io.parquet import ParquetWriter, merge_parquet_filemetadata @@ -678,6 +679,33 @@ def test_parquet_reader_filepath_or_buffer(parquet_path_or_buf, src): assert_eq(expect, got) +def test_parquet_reader_arrow_nativefile(parquet_path_or_buf): + # Check that we can read a file opened with the + # Arrow FileSystem inferface + expect = cudf.read_parquet(parquet_path_or_buf("filepath")) + fs, path = pa_fs.FileSystem.from_uri(parquet_path_or_buf("filepath")) + with fs.open_input_file(path) as fil: + got = cudf.read_parquet(fil) + + assert_eq(expect, got) + + +def test_parquet_reader_use_python_file_object(parquet_path_or_buf): + # Check that the non-default `use_python_file_object=True` + # option works as expected + expect = cudf.read_parquet(parquet_path_or_buf("filepath")) + fs, _, paths = get_fs_token_paths(parquet_path_or_buf("filepath")) + + # Pass open fsspec file + with fs.open(paths[0], mode="rb") as fil: + got1 = cudf.read_parquet(fil, use_python_file_object=True) + assert_eq(expect, got1) + + # Pass path only + got2 = cudf.read_parquet(paths[0], use_python_file_object=True) + assert_eq(expect, got2) + + def create_parquet_source(df, src_type, fname): if src_type == "filepath": df.to_parquet(fname, engine="pyarrow") diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index 11ed68056b6..e23e8bbef89 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -10,8 +10,10 @@ import numpy as np import pandas as pd import pyarrow as pa +import pyarrow.fs as pa_fs import pyarrow.orc import pytest +from fsspec.core import get_fs_token_paths import cudf from cudf.testing._utils import assert_eq @@ -138,6 +140,21 @@ def test_read_csv(s3_base, s3so, pdf, bytes_per_thread): assert_eq(pdf, got) +def test_read_csv_arrow_nativefile(s3_base, s3so, pdf): + # Write to buffer + fname = "test_csv_reader_arrow_nativefile.csv" + bname = "csv" + buffer = pdf.to_csv(index=False) + with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + fs = pa_fs.S3FileSystem( + endpoint_override=s3so["client_kwargs"]["endpoint_url"], + ) + with fs.open_input_file("{}/{}".format(bname, fname)) as fil: + got = cudf.read_csv(fil) + + assert_eq(pdf, got) + + @pytest.mark.parametrize("bytes_per_thread", [32, 1024]) def test_read_csv_byte_range(s3_base, s3so, pdf, bytes_per_thread): # Write to buffer @@ -180,19 +197,58 @@ def test_write_csv(s3_base, s3so, pdf, chunksize): @pytest.mark.parametrize("bytes_per_thread", [32, 1024]) @pytest.mark.parametrize("columns", [None, ["Float", "String"]]) -def test_read_parquet(s3_base, s3so, pdf, bytes_per_thread, columns): +@pytest.mark.parametrize("use_python_file_object", [False, True]) +def test_read_parquet( + s3_base, s3so, pdf, bytes_per_thread, columns, use_python_file_object +): fname = "test_parquet_reader.parquet" bname = "parquet" buffer = BytesIO() pdf.to_parquet(path=buffer) + + # Check direct path handling buffer.seek(0) with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): - got = cudf.read_parquet( + got1 = cudf.read_parquet( "s3://{}/{}".format(bname, fname), + use_python_file_object=use_python_file_object, storage_options=s3so, bytes_per_thread=bytes_per_thread, columns=columns, ) + expect = pdf[columns] if columns else pdf + assert_eq(expect, got1) + + # Check fsspec file-object handling + buffer.seek(0) + with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + fs = get_fs_token_paths( + "s3://{}/{}".format(bname, fname), storage_options=s3so + )[0] + with fs.open("s3://{}/{}".format(bname, fname), mode="rb") as f: + got2 = cudf.read_parquet( + f, + use_python_file_object=use_python_file_object, + bytes_per_thread=bytes_per_thread, + columns=columns, + ) + assert_eq(expect, got2) + + +@pytest.mark.parametrize("columns", [None, ["Float", "String"]]) +def test_read_parquet_arrow_nativefile(s3_base, s3so, pdf, columns): + # Write to buffer + fname = "test_parquet_reader_arrow_nativefile.parquet" + bname = "parquet" + buffer = BytesIO() + pdf.to_parquet(path=buffer) + buffer.seek(0) + with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + fs = pa_fs.S3FileSystem( + endpoint_override=s3so["client_kwargs"]["endpoint_url"], + ) + with fs.open_input_file("{}/{}".format(bname, fname)) as fil: + got = cudf.read_parquet(fil, columns=columns) expect = pdf[columns] if columns else pdf assert_eq(expect, got) diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index e23318eb999..feae7ccd62d 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -11,6 +11,9 @@ import numpy as np import pandas as pd from fsspec.core import get_fs_token_paths +from pyarrow import PythonFile as ArrowPythonFile +from pyarrow.fs import FSSpecHandler, PyFileSystem +from pyarrow.lib import NativeFile from cudf.utils.docutils import docfmt_partial @@ -154,6 +157,10 @@ use_pandas_metadata : boolean, default True If True and dataset has custom PANDAS schema metadata, ensure that index columns are also loaded. +use_python_file_object : boolean, default False + If True, Arrow-backed PythonFile objects will be used in place of fsspec + AbstractBufferedFile objects at IO time. This option is likely to improve + performance when making small reads from larger parquet files. Returns ------- @@ -1180,7 +1187,9 @@ def get_filepath_or_buffer( compression, mode="rb", fs=None, - iotypes=(BytesIO,), + iotypes=(BytesIO, NativeFile), + byte_ranges=None, + use_python_file_object=False, **kwargs, ): """Return either a filepath string to data, or a memory buffer of data. @@ -1197,6 +1206,11 @@ def get_filepath_or_buffer( Mode in which file is opened iotypes : (), default (BytesIO) Object type to exclude from file-like check + byte_ranges : list, optional + List of known byte ranges that will be read from path_or_data + use_python_file_object : boolean, default False + If True, Arrow-backed PythonFile objects will be used in place + of fsspec AbstractBufferedFile objects. Returns ------- @@ -1229,21 +1243,38 @@ def get_filepath_or_buffer( path_or_data = paths if len(paths) > 1 else paths[0] else: - path_or_data = [ - BytesIO( - _fsspec_data_transfer(fpath, fs=fs, mode=mode, **kwargs) - ) - for fpath in paths - ] + if use_python_file_object: + pa_fs = PyFileSystem(FSSpecHandler(fs)) + path_or_data = [ + pa_fs.open_input_file(fpath) for fpath in paths + ] + else: + path_or_data = [ + BytesIO( + _fsspec_data_transfer( + fpath, + fs=fs, + mode=mode, + byte_ranges=byte_ranges, + **kwargs, + ) + ) + for fpath in paths + ] if len(path_or_data) == 1: path_or_data = path_or_data[0] elif not isinstance(path_or_data, iotypes) and is_file_like(path_or_data): if isinstance(path_or_data, TextIOWrapper): path_or_data = path_or_data.buffer - path_or_data = BytesIO( - _fsspec_data_transfer(path_or_data, mode=mode, **kwargs) - ) + if use_python_file_object: + path_or_data = ArrowPythonFile(path_or_data) + else: + path_or_data = BytesIO( + _fsspec_data_transfer( + path_or_data, mode=mode, byte_ranges=byte_ranges, **kwargs + ) + ) return path_or_data, compression