Add Arrow-NativeFile and PythonFile support to read_parquet and read_csv in cudf #9304

Merged
34 commits merged on Oct 4, 2021
Changes from 33 commits
Commits
82ab31d
save work related to byte-range collection
rjzamora Sep 9, 2021
ef02f3d
enable byte_ranges optimization for open file-like
rjzamora Sep 10, 2021
a32a7ae
fix bug for no column or row-group selection
rjzamora Sep 10, 2021
26b96f3
Merge remote-tracking branch 'upstream/branch-21.10' into nativefile-…
rjzamora Sep 10, 2021
bd2e59a
add arrow_filesystem flag for dask_cudf
rjzamora Sep 10, 2021
53bb32e
use cat_ranges when available
rjzamora Sep 10, 2021
ada8451
expose arrow_filesystem and legacy_transfer
rjzamora Sep 11, 2021
661ae58
most tests passing with reasonable defaults - arrow_filesystem=True n…
rjzamora Sep 13, 2021
42c55c9
fix bug
rjzamora Sep 13, 2021
51021ab
Merge remote-tracking branch 'upstream/branch-21.10' into nativefile-…
rjzamora Sep 13, 2021
ddec4df
legacy_transfer fix
rjzamora Sep 15, 2021
36e4c52
fix test failures
rjzamora Sep 16, 2021
98efb58
plumb in csv support since most of the work is already done
rjzamora Sep 16, 2021
63dd615
remove unnecessary BytesIO usage for optimized code path
rjzamora Sep 16, 2021
40639c2
Merge remote-tracking branch 'upstream/branch-21.10' into nativefile-…
rjzamora Sep 20, 2021
5524538
avoid memory leaks in _read_byte_ranges
rjzamora Sep 20, 2021
fd2998a
avoid full-file transfer for read_csv with byte_range defined
rjzamora Sep 20, 2021
d1cb7a6
avoid seeking before beginning of file
rjzamora Sep 20, 2021
491c69f
remove arrow_filesystem option from dask (for now)
rjzamora Sep 21, 2021
5994fd9
save state
rjzamora Sep 21, 2021
a821ba7
simplify PR to require NativeFile input (no more uri inference for now)
rjzamora Sep 22, 2021
587ee5b
Merge remote-tracking branch 'upstream/branch-21.12' into nativefile-…
rjzamora Sep 24, 2021
ab18cab
add test coverage (csv not passing yet)
rjzamora Sep 24, 2021
1095029
fixing datasource bug - requires code duplication for now
rjzamora Sep 24, 2021
d0a17c4
add s3-specific tests
rjzamora Sep 24, 2021
352a475
Merge remote-tracking branch 'upstream/branch-21.12' into native-file…
rjzamora Sep 29, 2021
183f930
add automatic conversion of fsspec AbstractBufferedFile objects to Ar…
rjzamora Sep 29, 2021
731799d
change use_python_file_object default to False (will need to enable f…
rjzamora Sep 30, 2021
43964e9
add s3 test for use_python_file_object
rjzamora Sep 30, 2021
42040a7
Merge remote-tracking branch 'upstream/branch-21.12' into native-file…
rjzamora Oct 1, 2021
dde7958
address import code review
rjzamora Oct 1, 2021
93376bd
remove unused changes
rjzamora Oct 4, 2021
7375af5
avoid default ctor for arrow_io_source
rjzamora Oct 4, 2021
c3591c8
Update cpp/include/cudf/io/datasource.hpp
rjzamora Oct 4, 2021
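Taken together, these commits let `cudf.read_csv` and `cudf.read_parquet` accept an already-open pyarrow `NativeFile`. A minimal usage sketch mirroring the tests added in this PR (the local file paths are placeholders):

```python
import cudf
from pyarrow import fs as pa_fs

# Resolve a filesystem + path from a URI or plain path, then hand the
# open NativeFile directly to the cudf readers.
fs, path = pa_fs.FileSystem.from_uri("/tmp/example.csv")  # placeholder path
with fs.open_input_file(path) as f:
    csv_df = cudf.read_csv(f)

fs, path = pa_fs.FileSystem.from_uri("/tmp/example.parquet")  # placeholder path
with fs.open_input_file(path) as f:
    pq_df = cudf.read_parquet(f)
```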
1 change: 1 addition & 0 deletions cpp/include/cudf/io/datasource.hpp
@@ -340,6 +340,7 @@ class arrow_io_source : public datasource {
*
* @param Apache Arrow Filesystem URI
*/

explicit arrow_io_source(std::string_view arrow_uri)
{
const std::string uri_start_delimiter = "//";
4 changes: 4 additions & 0 deletions python/cudf/cudf/_lib/cpp/io/types.pxd
@@ -119,3 +119,7 @@ cdef extern from "cudf/io/datasource.hpp" \

cdef cppclass datasource:
pass

cdef cppclass arrow_io_source(datasource):
arrow_io_source(string arrow_uri) except +
arrow_io_source(shared_ptr[CRandomAccessFile]) except +
8 changes: 7 additions & 1 deletion python/cudf/cudf/_lib/csv.pyx
@@ -9,6 +9,7 @@ from libcpp.vector cimport vector

cimport cudf._lib.cpp.types as libcudf_types
from cudf._lib.cpp.types cimport data_type, type_id
from cudf._lib.io.datasource cimport Datasource, NativeFileDatasource
from cudf._lib.types cimport dtype_to_data_type

import numpy as np
@@ -51,6 +52,8 @@ from cudf._lib.table cimport (
)
from cudf._lib.utils cimport data_from_unique_ptr

from pyarrow.lib import NativeFile

ctypedef int32_t underlying_type_t_compression


@@ -394,7 +397,8 @@ def read_csv(
"""

if not isinstance(datasource, (BytesIO, StringIO, bytes,
cudf._lib.io.datasource.Datasource)):
Datasource,
NativeFile)):
if not os.path.isfile(datasource):
raise FileNotFoundError(
errno.ENOENT, os.strerror(errno.ENOENT), datasource
@@ -404,6 +408,8 @@
datasource = datasource.read().encode()
elif isinstance(datasource, str) and not os.path.isfile(datasource):
datasource = datasource.encode()
elif isinstance(datasource, NativeFile):
datasource = NativeFileDatasource(datasource)

validate_args(delimiter, sep, delim_whitespace, decimal, thousands,
nrows, skipfooter, byte_range, skiprows)
9 changes: 6 additions & 3 deletions python/cudf/cudf/_lib/io/datasource.pxd
@@ -1,10 +1,13 @@
# Copyright (c) 2020, NVIDIA CORPORATION.

from libcpp.memory cimport unique_ptr
from libcpp.memory cimport shared_ptr

from cudf._lib.cpp.io.types cimport datasource
from cudf._lib.cpp.io.types cimport arrow_io_source, datasource


cdef class Datasource:

cdef datasource* get_datasource(self) nogil except *

cdef class NativeFileDatasource(Datasource):
cdef shared_ptr[arrow_io_source] c_datasource
cdef datasource* get_datasource(self) nogil
18 changes: 16 additions & 2 deletions python/cudf/cudf/_lib/io/datasource.pyx
@@ -1,12 +1,26 @@
# Copyright (c) 2020, NVIDIA CORPORATION.

from libcpp.memory cimport unique_ptr
from libcpp.memory cimport shared_ptr
from pyarrow.includes.libarrow cimport CRandomAccessFile
from pyarrow.lib cimport NativeFile

from cudf._lib.cpp.io.types cimport datasource
from cudf._lib.cpp.io.types cimport arrow_io_source, datasource


cdef class Datasource:
cdef datasource* get_datasource(self) nogil except *:
with gil:
raise NotImplementedError("get_datasource() should not "
+ "be directly invoked here")

cdef class NativeFileDatasource(Datasource):

def __cinit__(self, NativeFile native_file,):

cdef shared_ptr[CRandomAccessFile] ra_src

ra_src = native_file.get_random_access_file()
self.c_datasource.reset(new arrow_io_source(ra_src))

cdef datasource* get_datasource(self) nogil:
return <datasource *> (self.c_datasource.get())
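`NativeFileDatasource` is internal plumbing rather than public API, but the idea is simple: it pulls the Arrow `CRandomAccessFile` handle out of a readable pyarrow `NativeFile` and wraps it in an `arrow_io_source` that libcudf can consume anywhere a `datasource` is accepted. A rough Python-level sketch of that, using an in-memory `BufferReader` purely as an illustration (the internal module path comes from this PR and is not a supported entry point):

```python
import pyarrow as pa
from cudf._lib.io.datasource import NativeFileDatasource  # internal module from this PR

# BufferReader is a readable, seekable NativeFile backed by host memory.
native_file = pa.BufferReader(b"a,b\n1,2\n")

# The wrapper holds a shared_ptr to an arrow_io_source built from the file's
# RandomAccessFile handle; get_datasource() exposes it to libcudf readers.
source = NativeFileDatasource(native_file)
```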
7 changes: 6 additions & 1 deletion python/cudf/cudf/_lib/parquet.pyx
@@ -56,13 +56,16 @@ from cudf._lib.cpp.io.types cimport column_in_metadata, table_input_metadata
from cudf._lib.cpp.table.table cimport table
from cudf._lib.cpp.table.table_view cimport table_view
from cudf._lib.cpp.types cimport data_type, size_type
from cudf._lib.io.datasource cimport Datasource, NativeFileDatasource
from cudf._lib.io.utils cimport (
make_sink_info,
make_source_info,
update_struct_field_names,
)
from cudf._lib.table cimport Table, table_view_from_table

from pyarrow.lib import NativeFile


cdef class BufferArrayFromVector:
cdef Py_ssize_t length
@@ -115,7 +118,9 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
cudf.io.parquet.read_parquet
cudf.io.parquet.to_parquet
"""

for i, datasource in enumerate(filepaths_or_buffers):
if isinstance(datasource, NativeFile):
filepaths_or_buffers[i] = NativeFileDatasource(datasource)
cdef cudf_io_types.source_info source = make_source_info(
filepaths_or_buffers)

3 changes: 2 additions & 1 deletion python/cudf/cudf/io/csv.py
@@ -3,6 +3,7 @@
from io import BytesIO, StringIO

from nvtx import annotate
from pyarrow.lib import NativeFile

import cudf
from cudf import _lib as libcudf
@@ -60,7 +61,7 @@ def read_csv(
filepath_or_buffer, compression = ioutils.get_filepath_or_buffer(
path_or_data=filepath_or_buffer,
compression=compression,
iotypes=(BytesIO, StringIO),
iotypes=(BytesIO, StringIO, NativeFile),
byte_ranges=[byte_range] if byte_range else None,
clip_local_buffer=True if byte_range else False,
**kwargs,
23 changes: 14 additions & 9 deletions python/cudf/cudf/io/parquet.py
@@ -296,6 +296,7 @@ def read_parquet(
num_rows=None,
strings_to_categorical=False,
use_pandas_metadata=True,
use_python_file_object=False,
*args,
**kwargs,
):
@@ -340,16 +341,19 @@
# local filesystem object. We can also do it without a
# file-system object for `AbstractBufferedFile` buffers
byte_ranges, footers, file_sizes = None, None, None
need_byte_ranges = fs is not None and not ioutils._is_local_filesystem(fs)
if need_byte_ranges or (
filepath_or_buffer
and isinstance(
filepath_or_buffer[0], fsspec.spec.AbstractBufferedFile,
)
):
byte_ranges, footers, file_sizes = _get_byte_ranges(
filepath_or_buffer, row_groups, columns, fs,
if not use_python_file_object:
need_byte_ranges = fs is not None and not ioutils._is_local_filesystem(
fs
)
if need_byte_ranges or (
filepath_or_buffer
and isinstance(
filepath_or_buffer[0], fsspec.spec.AbstractBufferedFile,
)
):
byte_ranges, footers, file_sizes = _get_byte_ranges(
filepath_or_buffer, row_groups, columns, fs,
)

filepaths_or_buffers = []
for i, source in enumerate(filepath_or_buffer):
@@ -371,6 +375,7 @@
footer=footers[i] if footers else None,
file_size=file_sizes[i] if file_sizes else None,
add_par1_magic=True,
use_python_file_object=use_python_file_object,
**kwargs,
)

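The new `use_python_file_object` flag defaults to `False`, so the existing byte-range transfer path is untouched unless a caller opts in. When it is `True`, the byte-range pre-fetch above is skipped and the flag is forwarded with the other per-file options, so the open file reaches the reader as an Arrow-compatible Python file object rather than being copied into host memory first (per the PR title and commit messages). A hedged usage sketch with a placeholder bucket, assuming s3fs and valid credentials are available:

```python
import cudf

# Placeholder S3 URI; storage_options are passed through to fsspec/s3fs.
df = cudf.read_parquet(
    "s3://my-bucket/data.parquet",
    use_python_file_object=True,
    storage_options={"anon": False},
)
```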
12 changes: 12 additions & 0 deletions python/cudf/cudf/tests/test_csv.py
@@ -11,6 +11,7 @@
import numpy as np
import pandas as pd
import pytest
from pyarrow import fs as pa_fs

import cudf
from cudf import read_csv
@@ -978,6 +979,17 @@ def test_csv_reader_filepath_or_buffer(tmpdir, path_or_buf, src):
assert_eq(expect, got)


def test_csv_reader_arrow_nativefile(path_or_buf):
# Check that we can read a file opened with the
# Arrow FileSystem interface
expect = cudf.read_csv(path_or_buf("filepath"))
fs, path = pa_fs.FileSystem.from_uri(path_or_buf("filepath"))
with fs.open_input_file(path) as fil:
got = cudf.read_csv(fil)

assert_eq(expect, got)


def test_small_zip(tmpdir):
df = pd.DataFrame(
{
30 changes: 29 additions & 1 deletion python/cudf/cudf/tests/test_parquet.py
@@ -13,8 +13,9 @@
import pandas as pd
import pyarrow as pa
import pytest
from fsspec.core import get_fs_token_paths
from packaging import version
from pyarrow import parquet as pq
from pyarrow import fs as pa_fs, parquet as pq

import cudf
from cudf.io.parquet import ParquetWriter, merge_parquet_filemetadata
@@ -678,6 +679,33 @@ def test_parquet_reader_filepath_or_buffer(parquet_path_or_buf, src):
assert_eq(expect, got)


def test_parquet_reader_arrow_nativefile(parquet_path_or_buf):
# Check that we can read a file opened with the
# Arrow FileSystem interface
expect = cudf.read_parquet(parquet_path_or_buf("filepath"))
fs, path = pa_fs.FileSystem.from_uri(parquet_path_or_buf("filepath"))
with fs.open_input_file(path) as fil:
got = cudf.read_parquet(fil)

assert_eq(expect, got)


def test_parquet_reader_use_python_file_object(parquet_path_or_buf):
# Check that the non-default `use_python_file_object=True`
# option works as expected
expect = cudf.read_parquet(parquet_path_or_buf("filepath"))
fs, _, paths = get_fs_token_paths(parquet_path_or_buf("filepath"))

# Pass open fsspec file
with fs.open(paths[0], mode="rb") as fil:
got1 = cudf.read_parquet(fil, use_python_file_object=True)
assert_eq(expect, got1)

# Pass path only
got2 = cudf.read_parquet(paths[0], use_python_file_object=True)
assert_eq(expect, got2)


def create_parquet_source(df, src_type, fname):
if src_type == "filepath":
df.to_parquet(fname, engine="pyarrow")
60 changes: 58 additions & 2 deletions python/cudf/cudf/tests/test_s3.py
@@ -10,8 +10,10 @@
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.fs as pa_fs
import pyarrow.orc
import pytest
from fsspec.core import get_fs_token_paths

import cudf
from cudf.testing._utils import assert_eq
@@ -138,6 +140,21 @@ def test_read_csv(s3_base, s3so, pdf, bytes_per_thread):
assert_eq(pdf, got)


def test_read_csv_arrow_nativefile(s3_base, s3so, pdf):
# Write to buffer
fname = "test_csv_reader_arrow_nativefile.csv"
bname = "csv"
buffer = pdf.to_csv(index=False)
with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
fs = pa_fs.S3FileSystem(
endpoint_override=s3so["client_kwargs"]["endpoint_url"],
)
with fs.open_input_file("{}/{}".format(bname, fname)) as fil:
got = cudf.read_csv(fil)

assert_eq(pdf, got)


@pytest.mark.parametrize("bytes_per_thread", [32, 1024])
def test_read_csv_byte_range(s3_base, s3so, pdf, bytes_per_thread):
# Write to buffer
@@ -180,19 +197,58 @@ def test_write_csv(s3_base, s3so, pdf, chunksize):

@pytest.mark.parametrize("bytes_per_thread", [32, 1024])
@pytest.mark.parametrize("columns", [None, ["Float", "String"]])
def test_read_parquet(s3_base, s3so, pdf, bytes_per_thread, columns):
@pytest.mark.parametrize("use_python_file_object", [False, True])
def test_read_parquet(
s3_base, s3so, pdf, bytes_per_thread, columns, use_python_file_object
):
fname = "test_parquet_reader.parquet"
bname = "parquet"
buffer = BytesIO()
pdf.to_parquet(path=buffer)

# Check direct path handling
buffer.seek(0)
with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
got = cudf.read_parquet(
got1 = cudf.read_parquet(
"s3://{}/{}".format(bname, fname),
use_python_file_object=use_python_file_object,
storage_options=s3so,
bytes_per_thread=bytes_per_thread,
columns=columns,
)
expect = pdf[columns] if columns else pdf
assert_eq(expect, got1)

# Check fsspec file-object handling
buffer.seek(0)
with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
fs = get_fs_token_paths(
"s3://{}/{}".format(bname, fname), storage_options=s3so
)[0]
with fs.open("s3://{}/{}".format(bname, fname), mode="rb") as f:
got2 = cudf.read_parquet(
f,
use_python_file_object=use_python_file_object,
bytes_per_thread=bytes_per_thread,
columns=columns,
)
assert_eq(expect, got2)


@pytest.mark.parametrize("columns", [None, ["Float", "String"]])
def test_read_parquet_arrow_nativefile(s3_base, s3so, pdf, columns):
# Write to buffer
fname = "test_parquet_reader_arrow_nativefile.parquet"
bname = "parquet"
buffer = BytesIO()
pdf.to_parquet(path=buffer)
buffer.seek(0)
with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
fs = pa_fs.S3FileSystem(
endpoint_override=s3so["client_kwargs"]["endpoint_url"],
)
with fs.open_input_file("{}/{}".format(bname, fname)) as fil:
got = cudf.read_parquet(fil, columns=columns)

expect = pdf[columns] if columns else pdf
assert_eq(expect, got)
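Outside the mocked-S3 fixtures used in these tests, the same NativeFile pattern applies to real object stores; a sketch with a placeholder bucket and region, assuming AWS credentials are visible to Arrow:

```python
import cudf
from pyarrow import fs as pa_fs

fs = pa_fs.S3FileSystem(region="us-east-1")  # placeholder region
with fs.open_input_file("my-bucket/data.parquet") as f:  # placeholder bucket/key
    df = cudf.read_parquet(f)
```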