Add Arrow-NativeFile and PythonFile support to read_parquet and read_csv in cudf (#9304)

This PR implements a simple but critical subset of the features implemented and discussed in #8961 and #9225. Note that I suggest those PRs be closed in favor of a few simpler PRs (like this one).

**What this PR DOES do**:

- Enables users to pass Arrow-based file objects directly to the cudf `read_parquet` and `read_csv` functions. For example:

```python
import cudf
import pyarrow.fs as pa_fs

fs, path = pa_fs.FileSystem.from_uri("s3://my-bucket/some-file.parquet")
with fs.open_input_file(path) as fil:
    gdf = cudf.read_parquet(fil)
```

- Adds automatic conversion of fsspec `AbstractBufferedFile` objects into Arrow-backed `PythonFile` objects. For `read_parquet`, an Arrow-backed `PythonFile` object can be used (in place of an optimized fsspec transfer) by passing `use_python_file_object=True`:

```python
import cudf

gdf = cudf.read_parquet(path, use_python_file_object=True)
```

or 

```python
import cudf
from fsspec.core import get_fs_token_paths

fs = get_fs_token_paths(path)[0]
with fs.open(path, mode="rb") as fil:
    gdf = cudf.read_parquet(fil, use_python_file_object=True)
```
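
When `use_python_file_object=True`, cudf skips its fsspec byte-range transfer optimization and instead wraps the open file in an Arrow-backed `PythonFile` datasource, so the underlying reader performs its reads through Arrow's file interface (see the `io/parquet.py` and `_lib/io/datasource.pyx` changes below).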


**What this PR does NOT do**:

- cudf will **not** automatically produce "direct" (e.g. HadoopFileSystem/S3FileSystem-based) Arrow NativeFile objects for explicit file-path input. It is still up to the user to create and supply a direct NativeFile object to `read_csv`/`read_parquet` if they want to avoid Python overhead (see the sketch after this list).
- cudf will **not** accept NativeFile input for IO functions other than `read_csv` and `read_parquet`.
- dask-cudf does not yet have a mechanism to open/process S3 files as "direct" NativeFile objects; those changes only apply to direct cudf usage.
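
For reference, creating and supplying a "direct" NativeFile yourself looks roughly like the sketch below. The bucket, object names, and region are hypothetical, and S3 credentials are assumed to be configured in your environment (the new S3 tests use the same `S3FileSystem`/`open_input_file` pattern):

```python
import cudf
import pyarrow.fs as pa_fs

# Hypothetical bucket/objects; region and credentials are assumed to be
# configured for your environment.
fs = pa_fs.S3FileSystem(region="us-east-1")

# Read Parquet through a "direct" Arrow NativeFile
with fs.open_input_file("my-bucket/some-file.parquet") as fil:
    gdf = cudf.read_parquet(fil)

# read_csv accepts the same kind of NativeFile input
with fs.open_input_file("my-bucket/some-file.csv") as fil:
    gdf_csv = cudf.read_csv(fil)
```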


Props to @shridharathi for doing most of the work for this in #8961 (this PR only extends that work to include parquet and add tests).

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Charles Blackmon-Luca (https://github.com/charlesbluca)
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Jake Hemstad (https://github.com/jrhemstad)

URL: #9304
rjzamora authored Oct 4, 2021
1 parent 8bd7d68 commit fb18491
Showing 11 changed files with 195 additions and 30 deletions.
4 changes: 4 additions & 0 deletions python/cudf/cudf/_lib/cpp/io/types.pxd
@@ -119,3 +119,7 @@ cdef extern from "cudf/io/datasource.hpp" \

cdef cppclass datasource:
pass

cdef cppclass arrow_io_source(datasource):
arrow_io_source(string arrow_uri) except +
arrow_io_source(shared_ptr[CRandomAccessFile]) except +
8 changes: 7 additions & 1 deletion python/cudf/cudf/_lib/csv.pyx
@@ -9,6 +9,7 @@ from libcpp.vector cimport vector

cimport cudf._lib.cpp.types as libcudf_types
from cudf._lib.cpp.types cimport data_type, type_id
from cudf._lib.io.datasource cimport Datasource, NativeFileDatasource
from cudf._lib.types cimport dtype_to_data_type

import numpy as np
@@ -50,6 +51,8 @@ from cudf._lib.utils cimport (
table_view_from_table,
)

from pyarrow.lib import NativeFile

ctypedef int32_t underlying_type_t_compression


@@ -393,7 +396,8 @@ def read_csv(
"""

if not isinstance(datasource, (BytesIO, StringIO, bytes,
cudf._lib.io.datasource.Datasource)):
Datasource,
NativeFile)):
if not os.path.isfile(datasource):
raise FileNotFoundError(
errno.ENOENT, os.strerror(errno.ENOENT), datasource
@@ -403,6 +407,8 @@
datasource = datasource.read().encode()
elif isinstance(datasource, str) and not os.path.isfile(datasource):
datasource = datasource.encode()
elif isinstance(datasource, NativeFile):
datasource = NativeFileDatasource(datasource)

validate_args(delimiter, sep, delim_whitespace, decimal, thousands,
nrows, skipfooter, byte_range, skiprows)
9 changes: 6 additions & 3 deletions python/cudf/cudf/_lib/io/datasource.pxd
@@ -1,10 +1,13 @@
# Copyright (c) 2020, NVIDIA CORPORATION.

from libcpp.memory cimport unique_ptr
from libcpp.memory cimport shared_ptr

from cudf._lib.cpp.io.types cimport datasource
from cudf._lib.cpp.io.types cimport arrow_io_source, datasource


cdef class Datasource:

cdef datasource* get_datasource(self) nogil except *

cdef class NativeFileDatasource(Datasource):
cdef shared_ptr[arrow_io_source] c_datasource
cdef datasource* get_datasource(self) nogil
18 changes: 16 additions & 2 deletions python/cudf/cudf/_lib/io/datasource.pyx
@@ -1,12 +1,26 @@
# Copyright (c) 2020, NVIDIA CORPORATION.

from libcpp.memory cimport unique_ptr
from libcpp.memory cimport shared_ptr
from pyarrow.includes.libarrow cimport CRandomAccessFile
from pyarrow.lib cimport NativeFile

from cudf._lib.cpp.io.types cimport datasource
from cudf._lib.cpp.io.types cimport arrow_io_source, datasource


cdef class Datasource:
cdef datasource* get_datasource(self) nogil except *:
with gil:
raise NotImplementedError("get_datasource() should not "
+ "be directly invoked here")

cdef class NativeFileDatasource(Datasource):

def __cinit__(self, NativeFile native_file,):

cdef shared_ptr[CRandomAccessFile] ra_src

ra_src = native_file.get_random_access_file()
self.c_datasource.reset(new arrow_io_source(ra_src))

cdef datasource* get_datasource(self) nogil:
return <datasource *> (self.c_datasource.get())
7 changes: 6 additions & 1 deletion python/cudf/cudf/_lib/parquet.pyx
@@ -56,13 +56,16 @@ from cudf._lib.cpp.io.types cimport column_in_metadata, table_input_metadata
from cudf._lib.cpp.table.table cimport table
from cudf._lib.cpp.table.table_view cimport table_view
from cudf._lib.cpp.types cimport data_type, size_type
from cudf._lib.io.datasource cimport Datasource, NativeFileDatasource
from cudf._lib.io.utils cimport (
make_sink_info,
make_source_info,
update_struct_field_names,
)
from cudf._lib.utils cimport table_view_from_table

from pyarrow.lib import NativeFile


cdef class BufferArrayFromVector:
cdef Py_ssize_t length
@@ -115,7 +118,9 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
cudf.io.parquet.read_parquet
cudf.io.parquet.to_parquet
"""

for i, datasource in enumerate(filepaths_or_buffers):
if isinstance(datasource, NativeFile):
filepaths_or_buffers[i] = NativeFileDatasource(datasource)
cdef cudf_io_types.source_info source = make_source_info(
filepaths_or_buffers)

3 changes: 2 additions & 1 deletion python/cudf/cudf/io/csv.py
@@ -3,6 +3,7 @@
from io import BytesIO, StringIO

from nvtx import annotate
from pyarrow.lib import NativeFile

import cudf
from cudf import _lib as libcudf
@@ -60,7 +61,7 @@ def read_csv(
filepath_or_buffer, compression = ioutils.get_filepath_or_buffer(
path_or_data=filepath_or_buffer,
compression=compression,
iotypes=(BytesIO, StringIO),
iotypes=(BytesIO, StringIO, NativeFile),
byte_ranges=[byte_range] if byte_range else None,
clip_local_buffer=True if byte_range else False,
**kwargs,
23 changes: 14 additions & 9 deletions python/cudf/cudf/io/parquet.py
@@ -296,6 +296,7 @@ def read_parquet(
num_rows=None,
strings_to_categorical=False,
use_pandas_metadata=True,
use_python_file_object=False,
*args,
**kwargs,
):
@@ -340,16 +341,19 @@
# local filesystem object. We can also do it without a
# file-system object for `AbstractBufferedFile` buffers
byte_ranges, footers, file_sizes = None, None, None
need_byte_ranges = fs is not None and not ioutils._is_local_filesystem(fs)
if need_byte_ranges or (
filepath_or_buffer
and isinstance(
filepath_or_buffer[0], fsspec.spec.AbstractBufferedFile,
)
):
byte_ranges, footers, file_sizes = _get_byte_ranges(
filepath_or_buffer, row_groups, columns, fs,
if not use_python_file_object:
need_byte_ranges = fs is not None and not ioutils._is_local_filesystem(
fs
)
if need_byte_ranges or (
filepath_or_buffer
and isinstance(
filepath_or_buffer[0], fsspec.spec.AbstractBufferedFile,
)
):
byte_ranges, footers, file_sizes = _get_byte_ranges(
filepath_or_buffer, row_groups, columns, fs,
)

filepaths_or_buffers = []
for i, source in enumerate(filepath_or_buffer):
@@ -371,6 +375,7 @@
footer=footers[i] if footers else None,
file_size=file_sizes[i] if file_sizes else None,
add_par1_magic=True,
use_python_file_object=use_python_file_object,
**kwargs,
)

12 changes: 12 additions & 0 deletions python/cudf/cudf/tests/test_csv.py
@@ -11,6 +11,7 @@
import numpy as np
import pandas as pd
import pytest
from pyarrow import fs as pa_fs

import cudf
from cudf import read_csv
@@ -978,6 +979,17 @@ def test_csv_reader_filepath_or_buffer(tmpdir, path_or_buf, src):
assert_eq(expect, got)


def test_csv_reader_arrow_nativefile(path_or_buf):
# Check that we can read a file opened with the
# Arrow FileSystem interface
expect = cudf.read_csv(path_or_buf("filepath"))
fs, path = pa_fs.FileSystem.from_uri(path_or_buf("filepath"))
with fs.open_input_file(path) as fil:
got = cudf.read_csv(fil)

assert_eq(expect, got)


def test_small_zip(tmpdir):
df = pd.DataFrame(
{
30 changes: 29 additions & 1 deletion python/cudf/cudf/tests/test_parquet.py
@@ -13,8 +13,9 @@
import pandas as pd
import pyarrow as pa
import pytest
from fsspec.core import get_fs_token_paths
from packaging import version
from pyarrow import parquet as pq
from pyarrow import fs as pa_fs, parquet as pq

import cudf
from cudf.io.parquet import ParquetWriter, merge_parquet_filemetadata
@@ -678,6 +679,33 @@ def test_parquet_reader_filepath_or_buffer(parquet_path_or_buf, src):
assert_eq(expect, got)


def test_parquet_reader_arrow_nativefile(parquet_path_or_buf):
# Check that we can read a file opened with the
# Arrow FileSystem interface
expect = cudf.read_parquet(parquet_path_or_buf("filepath"))
fs, path = pa_fs.FileSystem.from_uri(parquet_path_or_buf("filepath"))
with fs.open_input_file(path) as fil:
got = cudf.read_parquet(fil)

assert_eq(expect, got)


def test_parquet_reader_use_python_file_object(parquet_path_or_buf):
# Check that the non-default `use_python_file_object=True`
# option works as expected
expect = cudf.read_parquet(parquet_path_or_buf("filepath"))
fs, _, paths = get_fs_token_paths(parquet_path_or_buf("filepath"))

# Pass open fsspec file
with fs.open(paths[0], mode="rb") as fil:
got1 = cudf.read_parquet(fil, use_python_file_object=True)
assert_eq(expect, got1)

# Pass path only
got2 = cudf.read_parquet(paths[0], use_python_file_object=True)
assert_eq(expect, got2)


def create_parquet_source(df, src_type, fname):
if src_type == "filepath":
df.to_parquet(fname, engine="pyarrow")
60 changes: 58 additions & 2 deletions python/cudf/cudf/tests/test_s3.py
@@ -10,8 +10,10 @@
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.fs as pa_fs
import pyarrow.orc
import pytest
from fsspec.core import get_fs_token_paths

import cudf
from cudf.testing._utils import assert_eq
@@ -138,6 +140,21 @@ def test_read_csv(s3_base, s3so, pdf, bytes_per_thread):
assert_eq(pdf, got)


def test_read_csv_arrow_nativefile(s3_base, s3so, pdf):
# Write to buffer
fname = "test_csv_reader_arrow_nativefile.csv"
bname = "csv"
buffer = pdf.to_csv(index=False)
with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
fs = pa_fs.S3FileSystem(
endpoint_override=s3so["client_kwargs"]["endpoint_url"],
)
with fs.open_input_file("{}/{}".format(bname, fname)) as fil:
got = cudf.read_csv(fil)

assert_eq(pdf, got)


@pytest.mark.parametrize("bytes_per_thread", [32, 1024])
def test_read_csv_byte_range(s3_base, s3so, pdf, bytes_per_thread):
# Write to buffer
@@ -180,19 +197,58 @@ def test_write_csv(s3_base, s3so, pdf, chunksize):

@pytest.mark.parametrize("bytes_per_thread", [32, 1024])
@pytest.mark.parametrize("columns", [None, ["Float", "String"]])
def test_read_parquet(s3_base, s3so, pdf, bytes_per_thread, columns):
@pytest.mark.parametrize("use_python_file_object", [False, True])
def test_read_parquet(
s3_base, s3so, pdf, bytes_per_thread, columns, use_python_file_object
):
fname = "test_parquet_reader.parquet"
bname = "parquet"
buffer = BytesIO()
pdf.to_parquet(path=buffer)

# Check direct path handling
buffer.seek(0)
with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
got = cudf.read_parquet(
got1 = cudf.read_parquet(
"s3://{}/{}".format(bname, fname),
use_python_file_object=use_python_file_object,
storage_options=s3so,
bytes_per_thread=bytes_per_thread,
columns=columns,
)
expect = pdf[columns] if columns else pdf
assert_eq(expect, got1)

# Check fsspec file-object handling
buffer.seek(0)
with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
fs = get_fs_token_paths(
"s3://{}/{}".format(bname, fname), storage_options=s3so
)[0]
with fs.open("s3://{}/{}".format(bname, fname), mode="rb") as f:
got2 = cudf.read_parquet(
f,
use_python_file_object=use_python_file_object,
bytes_per_thread=bytes_per_thread,
columns=columns,
)
assert_eq(expect, got2)


@pytest.mark.parametrize("columns", [None, ["Float", "String"]])
def test_read_parquet_arrow_nativefile(s3_base, s3so, pdf, columns):
# Write to buffer
fname = "test_parquet_reader_arrow_nativefile.parquet"
bname = "parquet"
buffer = BytesIO()
pdf.to_parquet(path=buffer)
buffer.seek(0)
with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}):
fs = pa_fs.S3FileSystem(
endpoint_override=s3so["client_kwargs"]["endpoint_url"],
)
with fs.open_input_file("{}/{}".format(bname, fname)) as fil:
got = cudf.read_parquet(fil, columns=columns)

expect = pdf[columns] if columns else pdf
assert_eq(expect, got)