Deprecate Arrow support in I/O (#16132)
Contributes to #15193

Authors:
  - Thomas Li (https://github.com/lithomas1)
  - Vyas Ramasubramani (https://github.com/vyasr)

Approvers:
  - Richard (Rick) Zamora (https://github.com/rjzamora)
  - Lawrence Mitchell (https://github.com/wence-)

URL: #16132
lithomas1 authored Jul 20, 2024
1 parent e169e8e commit 535db9b
Showing 12 changed files with 247 additions and 132 deletions.
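
For context, the user-facing pattern being deprecated is passing a pyarrow NativeFile to a cudf reader. A minimal sketch of the before/after usage (the data.parquet path is illustrative, not taken from this diff):

    import warnings

    import pyarrow as pa

    import cudf

    # Deprecated: wrapping an open binary file in a pyarrow NativeFile
    # (here pa.PythonFile) and handing it to a cudf reader now emits a
    # FutureWarning, added in datasource.pyx below.
    with open("data.parquet", "rb") as f:
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            df = cudf.read_parquet(pa.PythonFile(f, mode="r"))
        assert any(issubclass(w.category, FutureWarning) for w in caught)

    # Preferred: pass the path (or a plain Python/fsspec file object).
    df = cudf.read_parquet("data.parquet")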
10 changes: 9 additions & 1 deletion python/cudf/cudf/_lib/pylibcudf/io/datasource.pyx
@@ -7,6 +7,8 @@ from pyarrow.lib cimport NativeFile
 from cudf._lib.pylibcudf.libcudf.io.arrow_io_source cimport arrow_io_source
 from cudf._lib.pylibcudf.libcudf.io.datasource cimport datasource

+import warnings
+

 cdef class Datasource:
     cdef datasource* get_datasource(self) except * nogil:
@@ -16,10 +18,16 @@ cdef class Datasource:

 cdef class NativeFileDatasource(Datasource):

-    def __cinit__(self, NativeFile native_file,):
+    def __cinit__(self, NativeFile native_file):

         cdef shared_ptr[CRandomAccessFile] ra_src

+        warnings.warn(
+            "Support for reading pyarrow's NativeFile is deprecated "
+            "and will be removed in a future release of cudf.",
+            FutureWarning,
+        )
+
         ra_src = native_file.get_random_access_file()
         self.c_datasource.reset(new arrow_io_source(ra_src))
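The warning added above can be checked directly; a minimal sketch using pytest (assuming the cudf._lib.pylibcudf import path as it stood at this commit):

    import io

    import pyarrow as pa
    import pytest

    from cudf._lib.pylibcudf.io.datasource import NativeFileDatasource

    # Constructing the datasource from a NativeFile now emits FutureWarning.
    with pytest.warns(FutureWarning, match="NativeFile is deprecated"):
        NativeFileDatasource(pa.PythonFile(io.BytesIO(), mode="r"))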
2 changes: 1 addition & 1 deletion python/cudf/cudf/io/csv.py
@@ -50,7 +50,7 @@ def read_csv(
     comment=None,
     delim_whitespace=False,
     byte_range=None,
-    use_python_file_object=True,
+    use_python_file_object=None,
     storage_options=None,
     bytes_per_thread=None,
 ):
33 changes: 22 additions & 11 deletions python/cudf/cudf/io/orc.py
@@ -10,6 +10,7 @@
 from cudf._lib import orc as liborc
 from cudf.api.types import is_list_like
 from cudf.utils import ioutils
+from cudf.utils.utils import maybe_filter_deprecation


 def _make_empty_df(filepath_or_buffer, columns):
@@ -280,7 +281,7 @@ def read_orc(
     num_rows=None,
     use_index=True,
     timestamp_type=None,
-    use_python_file_object=True,
+    use_python_file_object=None,
     storage_options=None,
     bytes_per_thread=None,
 ):
@@ -320,6 +321,9 @@
         )

     filepaths_or_buffers = []
+    have_nativefile = any(
+        isinstance(source, pa.NativeFile) for source in filepath_or_buffer
+    )
     for source in filepath_or_buffer:
         if ioutils.is_directory(
             path_or_data=source, storage_options=storage_options
@@ -360,17 +364,24 @@
             stripes = selected_stripes

     if engine == "cudf":
-        return DataFrame._from_data(
-            *liborc.read_orc(
-                filepaths_or_buffers,
-                columns,
-                stripes,
-                skiprows,
-                num_rows,
-                use_index,
-                timestamp_type,
+        # Don't want to warn if use_python_file_object causes us to get
+        # a NativeFile (there is a separate deprecation warning for that)
+        with maybe_filter_deprecation(
+            not have_nativefile,
+            message="Support for reading pyarrow's NativeFile is deprecated",
+            category=FutureWarning,
+        ):
+            return DataFrame._from_data(
+                *liborc.read_orc(
+                    filepaths_or_buffers,
+                    columns,
+                    stripes,
+                    skiprows,
+                    num_rows,
+                    use_index,
+                    timestamp_type,
+                )
             )
-        )
     else:
         from pyarrow import orc

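read_orc now suppresses the NativeFile deprecation warning when the NativeFile is created internally (via use_python_file_object) rather than passed in by the user. The maybe_filter_deprecation helper is added elsewhere in this PR (in cudf/utils/utils.py, whose diff is not shown here); a plausible sketch consistent with how the call sites above invoke it:

    import warnings
    from contextlib import contextmanager


    @contextmanager
    def maybe_filter_deprecation(condition, message, category):
        # Sketch only: ignore warnings matching `message`/`category` while
        # `condition` is true; otherwise let them propagate unchanged.
        with warnings.catch_warnings():
            if condition:
                warnings.filterwarnings(
                    "ignore", message=message, category=category
                )
            yield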
40 changes: 26 additions & 14 deletions python/cudf/cudf/io/parquet.py
@@ -15,6 +15,7 @@

 import numpy as np
 import pandas as pd
+import pyarrow as pa
 from pyarrow import dataset as ds

 import cudf
@@ -23,6 +24,7 @@
 from cudf.core.column import as_column, build_categorical_column, column_empty
 from cudf.utils import ioutils
 from cudf.utils.performance_tracking import _performance_tracking
+from cudf.utils.utils import maybe_filter_deprecation

 BYTE_SIZES = {
     "kb": 1000,
@@ -350,7 +352,7 @@ def read_parquet_metadata(filepath_or_buffer):
             path_or_data=source,
             compression=None,
             fs=fs,
-            use_python_file_object=True,
+            use_python_file_object=None,
             open_file_options=None,
             storage_options=None,
             bytes_per_thread=None,
@@ -532,7 +534,7 @@ def read_parquet(
     filters=None,
     row_groups=None,
     use_pandas_metadata=True,
-    use_python_file_object=True,
+    use_python_file_object=None,
     categorical_partitions=True,
     open_file_options=None,
     bytes_per_thread=None,
@@ -615,6 +617,9 @@
         row_groups=row_groups,
         fs=fs,
     )
+    have_nativefile = any(
+        isinstance(source, pa.NativeFile) for source in filepath_or_buffer
+    )
     for source in filepath_or_buffer:
         tmp_source, compression = ioutils.get_reader_filepath_or_buffer(
             path_or_data=source,
@@ -662,19 +667,26 @@
         )

     # Convert parquet data to a cudf.DataFrame
-    df = _parquet_to_frame(
-        filepaths_or_buffers,
-        engine,
-        *args,
-        columns=columns,
-        row_groups=row_groups,
-        use_pandas_metadata=use_pandas_metadata,
-        partition_keys=partition_keys,
-        partition_categories=partition_categories,
-        dataset_kwargs=dataset_kwargs,
-        **kwargs,
-    )
-
+    # Don't want to warn if use_python_file_object causes us to get
+    # a NativeFile (there is a separate deprecation warning for that)
+    with maybe_filter_deprecation(
+        not have_nativefile,
+        message="Support for reading pyarrow's NativeFile is deprecated",
+        category=FutureWarning,
+    ):
+        df = _parquet_to_frame(
+            filepaths_or_buffers,
+            engine,
+            *args,
+            columns=columns,
+            row_groups=row_groups,
+            use_pandas_metadata=use_pandas_metadata,
+            partition_keys=partition_keys,
+            partition_categories=partition_categories,
+            dataset_kwargs=dataset_kwargs,
+            **kwargs,
+        )
     # Apply filters row-wise (if any are defined), and return
     df = _apply_post_filters(df, filters)
     if projected_columns:
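The net effect for read_parquet callers, which the test changes below exercise: the new default use_python_file_object=None silently preserves the old behavior, while passing an explicit value (or a NativeFile source) now draws a FutureWarning. A short usage sketch (the file path is illustrative):

    import pytest

    import cudf

    # Silent: None is the new default and keeps the prior behavior.
    df = cudf.read_parquet("data.parquet")

    # Warns: explicitly opting in or out of Python file objects is deprecated.
    with pytest.warns(FutureWarning):
        df = cudf.read_parquet("data.parquet", use_python_file_object=False)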
21 changes: 5 additions & 16 deletions python/cudf/cudf/pylibcudf_tests/io/test_source_sink_info.py
@@ -2,11 +2,9 @@

 import io

-import pyarrow as pa
 import pytest

 import cudf._lib.pylibcudf as plc
-from cudf._lib.pylibcudf.io.datasource import NativeFileDatasource


 @pytest.fixture(params=[plc.io.SourceInfo, plc.io.SinkInfo])
@@ -18,10 +16,8 @@ def _skip_invalid_sinks(io_class, sink):
     """
     Skip invalid sinks for SinkInfo
     """
-    if io_class is plc.io.SinkInfo and isinstance(
-        sink, (bytes, NativeFileDatasource)
-    ):
-        pytest.skip(f"{sink} is not a valid input for SinkInfo")
+    if io_class is plc.io.SinkInfo and isinstance(sink, bytes):
+        pytest.skip("bytes is not a valid input for SinkInfo")


 @pytest.mark.parametrize(
@@ -30,7 +26,6 @@ def _skip_invalid_sinks(io_class, sink):
         "a.txt",
         b"hello world",
         io.BytesIO(b"hello world"),
-        NativeFileDatasource(pa.PythonFile(io.BytesIO(), mode="r")),
     ],
 )
 def test_source_info_ctor(io_class, source, tmp_path):
@@ -47,13 +42,12 @@
 @pytest.mark.parametrize(
     "sources",
     [
-        ["a.txt"],
-        [b"hello world"],
-        [io.BytesIO(b"hello world")],
+        ["a.txt", "a.txt"],
+        [b"hello world", b"hello there"],
+        [io.BytesIO(b"hello world"), io.BytesIO(b"hello there")],
-        [
-            NativeFileDatasource(pa.PythonFile(io.BytesIO(), mode="r")),
-            NativeFileDatasource(pa.PythonFile(io.BytesIO(), mode="r")),
-        ],
     ],
 )
 def test_source_info_ctor_multiple(io_class, sources, tmp_path):
@@ -79,11 +73,6 @@ def test_source_info_ctor_multiple(io_class, sources, tmp_path):
             io.BytesIO(b"hello there"),
             b"hello world",
         ],
-        [
-            NativeFileDatasource(pa.PythonFile(io.BytesIO(), mode="r")),
-            "awef.txt",
-            b"hello world",
-        ],
     ],
 )
 def test_source_info_ctor_mixing_invalid(io_class, sources, tmp_path):
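With NativeFileDatasource dropped from these pylibcudf tests, the tested SourceInfo inputs reduce to paths, raw bytes, and Python file objects. A minimal construction, assuming SourceInfo still accepts a list of sources as the tests above imply:

    import io

    import cudf._lib.pylibcudf as plc

    # SourceInfo wraps a list of sources; NativeFileDatasource no longer
    # appears among the tested inputs.
    src = plc.io.SourceInfo([io.BytesIO(b"hello world")])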
5 changes: 3 additions & 2 deletions python/cudf/cudf/tests/test_csv.py
@@ -1085,8 +1085,9 @@ def test_csv_reader_arrow_nativefile(path_or_buf):
     # Arrow FileSystem interface
     expect = cudf.read_csv(path_or_buf("filepath"))
     fs, path = pa_fs.FileSystem.from_uri(path_or_buf("filepath"))
-    with fs.open_input_file(path) as fil:
-        got = cudf.read_csv(fil)
+    with pytest.warns(FutureWarning):
+        with fs.open_input_file(path) as fil:
+            got = cudf.read_csv(fil)

     assert_eq(expect, got)

3 changes: 2 additions & 1 deletion python/cudf/cudf/tests/test_gcs.py
@@ -46,7 +46,8 @@ def mock_size(*args):
     # use_python_file_object=True, because the pyarrow
     # `open_input_file` command will fail (since it doesn't
     # use the monkey-patched `open` definition)
-    got = cudf.read_csv(f"gcs://{fpath}", use_python_file_object=False)
+    with pytest.warns(FutureWarning):
+        got = cudf.read_csv(f"gcs://{fpath}", use_python_file_object=False)
     assert_eq(pdf, got)

     # AbstractBufferedFile -> PythonFile conversion
19 changes: 11 additions & 8 deletions python/cudf/cudf/tests/test_parquet.py
@@ -711,7 +711,8 @@ def test_parquet_reader_arrow_nativefile(parquet_path_or_buf):
     expect = cudf.read_parquet(parquet_path_or_buf("filepath"))
     fs, path = pa_fs.FileSystem.from_uri(parquet_path_or_buf("filepath"))
     with fs.open_input_file(path) as fil:
-        got = cudf.read_parquet(fil)
+        with pytest.warns(FutureWarning):
+            got = cudf.read_parquet(fil)

     assert_eq(expect, got)

@@ -726,16 +727,18 @@ def test_parquet_reader_use_python_file_object(
     fs, _, paths = get_fs_token_paths(parquet_path_or_buf("filepath"))

     # Pass open fsspec file
-    with fs.open(paths[0], mode="rb") as fil:
-        got1 = cudf.read_parquet(
-            fil, use_python_file_object=use_python_file_object
-        )
+    with pytest.warns(FutureWarning):
+        with fs.open(paths[0], mode="rb") as fil:
+            got1 = cudf.read_parquet(
+                fil, use_python_file_object=use_python_file_object
+            )
     assert_eq(expect, got1)

     # Pass path only
-    got2 = cudf.read_parquet(
-        paths[0], use_python_file_object=use_python_file_object
-    )
+    with pytest.warns(FutureWarning):
+        got2 = cudf.read_parquet(
+            paths[0], use_python_file_object=use_python_file_object
+        )
     assert_eq(expect, got2)
