[Experimental] Optimize cudf/dask-cudf read_parquet for s3/remote filesystems #9225

Status: Closed (22 commits)

Commits
82ab31d  save work related to byte-range collection (rjzamora, Sep 9, 2021)
ef02f3d  enable byte_ranges optimization for open file-like (rjzamora, Sep 10, 2021)
a32a7ae  fix bug for no column or row-group selection (rjzamora, Sep 10, 2021)
26b96f3  Merge remote-tracking branch 'upstream/branch-21.10' into nativefile-… (rjzamora, Sep 10, 2021)
bd2e59a  add arrow_filesystem flag for dask_cudf (rjzamora, Sep 10, 2021)
53bb32e  use cat_ranges when available (rjzamora, Sep 10, 2021)
ada8451  expose arrow_filesystem and legacy_transfer (rjzamora, Sep 11, 2021)
661ae58  most tests passing with reasonable defaults - arrow_filesystem=True n… (rjzamora, Sep 13, 2021)
42c55c9  fix bug (rjzamora, Sep 13, 2021)
51021ab  Merge remote-tracking branch 'upstream/branch-21.10' into nativefile-… (rjzamora, Sep 13, 2021)
ddec4df  legacy_transfer fix (rjzamora, Sep 15, 2021)
36e4c52  fix test failures (rjzamora, Sep 16, 2021)
98efb58  plumb in csv support since most of the work is already done (rjzamora, Sep 16, 2021)
63dd615  remove unnecessary BytesIO usage for optimized code path (rjzamora, Sep 16, 2021)
40639c2  Merge remote-tracking branch 'upstream/branch-21.10' into nativefile-… (rjzamora, Sep 20, 2021)
5524538  avoid memory leaks in _read_byte_ranges (rjzamora, Sep 20, 2021)
fd2998a  avoid full-file transfer for read_csv with byte_range defined (rjzamora, Sep 20, 2021)
d1cb7a6  avoid seeking before beginning of file (rjzamora, Sep 20, 2021)
491c69f  remove arrow_filesystem option from dask (for now) (rjzamora, Sep 21, 2021)
5994fd9  save state (rjzamora, Sep 21, 2021)
a821ba7  simplify PR to require NativeFile input (no more uri inference for now) (rjzamora, Sep 22, 2021)
587ee5b  Merge remote-tracking branch 'upstream/branch-21.12' into nativefile-… (rjzamora, Sep 24, 2021)
4 changes: 4 additions & 0 deletions cpp/include/cudf/io/datasource.hpp
@@ -340,6 +340,10 @@ class arrow_io_source : public datasource {
*
* @param Apache Arrow Filesystem URI
*/

arrow_io_source() = default;
~arrow_io_source() = default;

explicit arrow_io_source(std::string_view arrow_uri)
{
const std::string uri_start_delimiter = "//";
5 changes: 5 additions & 0 deletions python/cudf/cudf/_lib/cpp/io/types.pxd
@@ -119,3 +119,8 @@ cdef extern from "cudf/io/datasource.hpp" \

cdef cppclass datasource:
pass

cdef cppclass arrow_io_source(datasource):
arrow_io_source() except +
arrow_io_source(string arrow_uri) except +
arrow_io_source(shared_ptr[CRandomAccessFile]) except +
5 changes: 4 additions & 1 deletion python/cudf/cudf/_lib/csv.pyx
@@ -51,6 +51,8 @@ from cudf._lib.table cimport (
)
from cudf._lib.utils cimport data_from_unique_ptr

from pyarrow.lib import NativeFile

ctypedef int32_t underlying_type_t_compression


@@ -394,7 +396,8 @@ def read_csv(
"""

if not isinstance(datasource, (BytesIO, StringIO, bytes,
cudf._lib.io.datasource.Datasource)):
cudf._lib.io.datasource.Datasource,
NativeFile)):
if not os.path.isfile(datasource):
raise FileNotFoundError(
errno.ENOENT, os.strerror(errno.ENOENT), datasource
9 changes: 5 additions & 4 deletions python/cudf/cudf/_lib/io/datasource.pxd
@@ -1,10 +1,11 @@
# Copyright (c) 2020, NVIDIA CORPORATION.

from libcpp.memory cimport unique_ptr

from cudf._lib.cpp.io.types cimport datasource
from cudf._lib.cpp.io.types cimport arrow_io_source, datasource


cdef class Datasource:

cdef datasource* get_datasource(self) nogil except *

cdef class NativeFileDatasource(Datasource):
cdef arrow_io_source c_datasource
cdef datasource* get_datasource(self) nogil
20 changes: 18 additions & 2 deletions python/cudf/cudf/_lib/io/datasource.pyx
@@ -1,12 +1,28 @@
# Copyright (c) 2020, NVIDIA CORPORATION.

from libcpp.memory cimport unique_ptr
from libcpp.memory cimport shared_ptr
from libcpp.utility cimport move
from pyarrow.includes.libarrow cimport CRandomAccessFile
from pyarrow.lib cimport NativeFile

from cudf._lib.cpp.io.types cimport datasource
from cudf._lib.cpp.io.types cimport arrow_io_source, datasource, source_info


cdef class Datasource:
cdef datasource* get_datasource(self) nogil except *:
with gil:
raise NotImplementedError("get_datasource() should not "
+ "be directly invoked here")

cdef class NativeFileDatasource(Datasource):

def __cinit__(self, NativeFile native_file,):

cdef shared_ptr[CRandomAccessFile] ra_src
cdef arrow_io_source arrow_src

ra_src = native_file.get_random_access_file()
self.c_datasource = arrow_io_source(ra_src)

cdef datasource* get_datasource(self) nogil:
return <datasource *> &(self.c_datasource)
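NativeFileDatasource holds the arrow_io_source by value (cdef arrow_io_source c_datasource), which is why the C++ header above gains a default constructor: Cython default-constructs the member before __cinit__ replaces it with a source built from NativeFile.get_random_access_file(). At the Python boundary the wrapper only needs a pyarrow NativeFile; a hedged sketch using an in-memory handle (the actual wrapping happens inside cudf's Cython layer):

```python
import pyarrow as pa

# Any pyarrow NativeFile works: OSFile, BufferReader, or a handle opened
# through a pyarrow filesystem. Here we use an in-memory BufferReader.
buf = pa.py_buffer(b"a,b\n1,2\n3,4\n")
source = pa.BufferReader(buf)

# make_source_info() (below) checks for this type before wrapping the
# handle in a NativeFileDatasource.
assert isinstance(source, pa.NativeFile)
```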
11 changes: 9 additions & 2 deletions python/cudf/cudf/_lib/io/utils.pyx
@@ -8,9 +8,12 @@ from libcpp.pair cimport pair
from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector
from pyarrow.includes.libarrow cimport CRandomAccessFile
from pyarrow.lib cimport NativeFile

from cudf._lib.column cimport Column
from cudf._lib.cpp.io.types cimport (
arrow_io_source,
column_name_info,
data_sink,
datasource,
@@ -19,13 +22,15 @@ from cudf._lib.cpp.io.types cimport (
sink_info,
source_info,
)
from cudf._lib.io.datasource cimport Datasource
from cudf._lib.io.datasource cimport Datasource, NativeFileDatasource

import codecs
import errno
import io
import os

from pyarrow.lib import NativeFile

import cudf
from cudf.api.types import is_struct_dtype

@@ -35,7 +40,6 @@ from cudf.api.types import is_struct_dtype
cdef source_info make_source_info(list src) except*:
if not src:
raise ValueError("Need to pass at least one source")

cdef const unsigned char[::1] c_buffer
cdef vector[host_buffer] c_host_buffers
cdef vector[string] c_files
@@ -60,6 +64,9 @@ cdef source_info make_source_info(list src) except*:
elif isinstance(src[0], Datasource):
csrc = src[0]
return source_info(csrc.get_datasource())
elif isinstance(src[0], NativeFile):
csrc = NativeFileDatasource(src[0])
return source_info(csrc.get_datasource())
elif isinstance(src[0], (int, float, complex, basestring, os.PathLike)):
# If source is a file, return source_info where type=FILEPATH
if not all(os.path.isfile(file) for file in src):
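With this hunk, make_source_info recognizes several kinds of single sources: in-memory buffers, Datasource objects, pyarrow NativeFile handles (wrapped on the fly in a NativeFileDatasource), and plain file paths. A rough Python-level illustration of the inputs that reach each branch, assuming this change is in place; the exact dispatch lives in the Cython code above, and the local path is hypothetical:

```python
import io
import pyarrow as pa
import cudf

csv_bytes = b"x,y\n1,2\n"

# In-memory buffer: handled as a host buffer source.
df1 = cudf.read_csv(io.BytesIO(csv_bytes))

# NativeFile: any pyarrow file handle (local, S3, HDFS, in-memory, ...).
df2 = cudf.read_csv(pa.BufferReader(csv_bytes))

# Filepath: a string path, validated with os.path.isfile().
# df3 = cudf.read_csv("/tmp/example.csv")  # hypothetical path

assert df1.equals(df2)
```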
3 changes: 2 additions & 1 deletion python/cudf/cudf/io/csv.py
@@ -3,6 +3,7 @@
from io import BytesIO, StringIO

from nvtx import annotate
from pyarrow.lib import NativeFile

import cudf
from cudf import _lib as libcudf
@@ -60,7 +61,7 @@ def read_csv(
filepath_or_buffer, compression = ioutils.get_filepath_or_buffer(
path_or_data=filepath_or_buffer,
compression=compression,
iotypes=(BytesIO, StringIO),
iotypes=(BytesIO, StringIO, NativeFile),
byte_ranges=[byte_range] if byte_range else None,
clip_local_buffer=True if byte_range else False,
**kwargs,
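Because NativeFile is now listed in read_csv's iotypes, a remote handle passes straight through get_filepath_or_buffer, and the byte_range argument can still be used to read only a slice of the file (see commit fd2998a, "avoid full-file transfer for read_csv with byte_range defined"). A hedged sketch, again with a hypothetical S3 path:

```python
import cudf
import pyarrow.fs as pa_fs

fs = pa_fs.S3FileSystem()  # region/credentials resolved from the environment

with fs.open_input_file("my-bucket/logs/big.csv") as f:
    # Read roughly the first 128 MiB of rows; with a NativeFile source this
    # should avoid pulling the whole object before parsing.
    part = cudf.read_csv(f, byte_range=(0, 128 * 1024 * 1024))
```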
3 changes: 2 additions & 1 deletion python/cudf/cudf/utils/ioutils.py
@@ -11,6 +11,7 @@
import numpy as np
import pandas as pd
from fsspec.core import get_fs_token_paths
from pyarrow.lib import NativeFile

from cudf.utils.docutils import docfmt_partial

@@ -1180,7 +1181,7 @@ def get_filepath_or_buffer(
compression,
mode="rb",
fs=None,
iotypes=(BytesIO,),
iotypes=(BytesIO, NativeFile),
**kwargs,
):
"""Return either a filepath string to data, or a memory buffer of data.
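Adding NativeFile to the default iotypes means get_filepath_or_buffer treats an Arrow file handle the way it already treats BytesIO: presumably the object is returned unchanged rather than being downloaded into a host buffer (the pass-through itself is outside this hunk). A small sketch of that assumption:

```python
from pyarrow import BufferReader

from cudf.utils import ioutils

native_handle = BufferReader(b"a,b\n1,2\n")

# Assumption: objects whose type is in `iotypes` come back untouched,
# so no bytes are copied or transferred here.
out, compression = ioutils.get_filepath_or_buffer(
    path_or_data=native_handle, compression=None
)
assert out is native_handle
```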