diff --git a/cpp/include/cudf/io/datasource.hpp b/cpp/include/cudf/io/datasource.hpp index 93f68d43aff..712ffc97239 100644 --- a/cpp/include/cudf/io/datasource.hpp +++ b/cpp/include/cudf/io/datasource.hpp @@ -340,6 +340,10 @@ class arrow_io_source : public datasource { * * @param Apache Arrow Filesystem URI */ + + arrow_io_source() = default; + ~arrow_io_source() = default; + explicit arrow_io_source(std::string_view arrow_uri) { const std::string uri_start_delimiter = "//"; diff --git a/python/cudf/cudf/_lib/cpp/io/types.pxd b/python/cudf/cudf/_lib/cpp/io/types.pxd index 721d90f1f5b..4817cba9d74 100644 --- a/python/cudf/cudf/_lib/cpp/io/types.pxd +++ b/python/cudf/cudf/_lib/cpp/io/types.pxd @@ -119,3 +119,8 @@ cdef extern from "cudf/io/datasource.hpp" \ cdef cppclass datasource: pass + + cdef cppclass arrow_io_source(datasource): + arrow_io_source() except + + arrow_io_source(string arrow_uri) except + + arrow_io_source(shared_ptr[CRandomAccessFile]) except + diff --git a/python/cudf/cudf/_lib/csv.pyx b/python/cudf/cudf/_lib/csv.pyx index be89bc0d17a..b9aed0b3b77 100644 --- a/python/cudf/cudf/_lib/csv.pyx +++ b/python/cudf/cudf/_lib/csv.pyx @@ -51,6 +51,8 @@ from cudf._lib.table cimport ( ) from cudf._lib.utils cimport data_from_unique_ptr +from pyarrow.lib import NativeFile + ctypedef int32_t underlying_type_t_compression @@ -394,7 +396,8 @@ def read_csv( """ if not isinstance(datasource, (BytesIO, StringIO, bytes, - cudf._lib.io.datasource.Datasource)): + cudf._lib.io.datasource.Datasource, + NativeFile)): if not os.path.isfile(datasource): raise FileNotFoundError( errno.ENOENT, os.strerror(errno.ENOENT), datasource diff --git a/python/cudf/cudf/_lib/io/datasource.pxd b/python/cudf/cudf/_lib/io/datasource.pxd index 705a3600f68..66aaad4a09b 100644 --- a/python/cudf/cudf/_lib/io/datasource.pxd +++ b/python/cudf/cudf/_lib/io/datasource.pxd @@ -1,10 +1,11 @@ # Copyright (c) 2020, NVIDIA CORPORATION. -from libcpp.memory cimport unique_ptr - -from cudf._lib.cpp.io.types cimport datasource +from cudf._lib.cpp.io.types cimport arrow_io_source, datasource cdef class Datasource: - cdef datasource* get_datasource(self) nogil except * + +cdef class NativeFileDatasource(Datasource): + cdef arrow_io_source c_datasource + cdef datasource* get_datasource(self) nogil diff --git a/python/cudf/cudf/_lib/io/datasource.pyx b/python/cudf/cudf/_lib/io/datasource.pyx index ddfd9a3540a..72d64bb9ad2 100644 --- a/python/cudf/cudf/_lib/io/datasource.pyx +++ b/python/cudf/cudf/_lib/io/datasource.pyx @@ -1,8 +1,11 @@ # Copyright (c) 2020, NVIDIA CORPORATION. -from libcpp.memory cimport unique_ptr +from libcpp.memory cimport shared_ptr +from libcpp.utility cimport move +from pyarrow.includes.libarrow cimport CRandomAccessFile +from pyarrow.lib cimport NativeFile -from cudf._lib.cpp.io.types cimport datasource +from cudf._lib.cpp.io.types cimport arrow_io_source, datasource, source_info cdef class Datasource: @@ -10,3 +13,16 @@ cdef class Datasource: with gil: raise NotImplementedError("get_datasource() should not " + "be directly invoked here") + +cdef class NativeFileDatasource(Datasource): + + def __cinit__(self, NativeFile native_file,): + + cdef shared_ptr[CRandomAccessFile] ra_src + cdef arrow_io_source arrow_src + + ra_src = native_file.get_random_access_file() + self.c_datasource = arrow_io_source(ra_src) + + cdef datasource* get_datasource(self) nogil: + return &(self.c_datasource) diff --git a/python/cudf/cudf/_lib/io/utils.pyx b/python/cudf/cudf/_lib/io/utils.pyx index 8e45011228a..67adad63d42 100644 --- a/python/cudf/cudf/_lib/io/utils.pyx +++ b/python/cudf/cudf/_lib/io/utils.pyx @@ -8,9 +8,12 @@ from libcpp.pair cimport pair from libcpp.string cimport string from libcpp.utility cimport move from libcpp.vector cimport vector +from pyarrow.includes.libarrow cimport CRandomAccessFile +from pyarrow.lib cimport NativeFile from cudf._lib.column cimport Column from cudf._lib.cpp.io.types cimport ( + arrow_io_source, column_name_info, data_sink, datasource, @@ -19,13 +22,15 @@ from cudf._lib.cpp.io.types cimport ( sink_info, source_info, ) -from cudf._lib.io.datasource cimport Datasource +from cudf._lib.io.datasource cimport Datasource, NativeFileDatasource import codecs import errno import io import os +from pyarrow.lib import NativeFile + import cudf from cudf.api.types import is_struct_dtype @@ -35,7 +40,6 @@ from cudf.api.types import is_struct_dtype cdef source_info make_source_info(list src) except*: if not src: raise ValueError("Need to pass at least one source") - cdef const unsigned char[::1] c_buffer cdef vector[host_buffer] c_host_buffers cdef vector[string] c_files @@ -60,6 +64,9 @@ cdef source_info make_source_info(list src) except*: elif isinstance(src[0], Datasource): csrc = src[0] return source_info(csrc.get_datasource()) + elif isinstance(src[0], NativeFile): + csrc = NativeFileDatasource(src[0]) + return source_info(csrc.get_datasource()) elif isinstance(src[0], (int, float, complex, basestring, os.PathLike)): # If source is a file, return source_info where type=FILEPATH if not all(os.path.isfile(file) for file in src): diff --git a/python/cudf/cudf/io/csv.py b/python/cudf/cudf/io/csv.py index 966ede655c6..565a109eb79 100644 --- a/python/cudf/cudf/io/csv.py +++ b/python/cudf/cudf/io/csv.py @@ -3,6 +3,7 @@ from io import BytesIO, StringIO from nvtx import annotate +from pyarrow.lib import NativeFile import cudf from cudf import _lib as libcudf @@ -60,7 +61,7 @@ def read_csv( filepath_or_buffer, compression = ioutils.get_filepath_or_buffer( path_or_data=filepath_or_buffer, compression=compression, - iotypes=(BytesIO, StringIO), + iotypes=(BytesIO, StringIO, NativeFile), byte_ranges=[byte_range] if byte_range else None, clip_local_buffer=True if byte_range else False, **kwargs, diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index e23318eb999..a7105f3f35b 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -11,6 +11,7 @@ import numpy as np import pandas as pd from fsspec.core import get_fs_token_paths +from pyarrow.lib import NativeFile from cudf.utils.docutils import docfmt_partial @@ -1180,7 +1181,7 @@ def get_filepath_or_buffer( compression, mode="rb", fs=None, - iotypes=(BytesIO,), + iotypes=(BytesIO, NativeFile), **kwargs, ): """Return either a filepath string to data, or a memory buffer of data.