Comparing changes

base repository: rapidsai/cudf
base: bd302d773c50552531bc7f11f782f8ed876e8fab
head repository: rapidsai/cudf
compare: 0f07b0bb5e2cc89ca66e9d9639ff6ac961ec0471
  • 2 commits
  • 3 files changed
  • 2 contributors

Commits on Jul 30, 2024

  1. [Bug] Remove loud NativeFile deprecation noise for read_parquet from S3 (#16415)
    
    Important follow-up to #16132
    
    Without this PR, using `dask_cudf.read_parquet("s3://...", ...)` will
    result in loud deprecation warnings after `compute`/`persist` is called.
    This is because dask will always pass `NativeFile` objects down to cudf.
    
    My fault for missing this earlier!
    rjzamora authored Jul 30, 2024

    Verified: signed with GitHub's verified signature (commit 5feeaf3)
  2. Enable prefetching before runpy (#16427)

    This PR enables prefetching before we execute the `runpy` module and
    script code.
    galipremsagar authored Jul 30, 2024

    Verified: signed with GitHub's verified signature (commit 0f07b0b)
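The first commit works by suppressing the NativeFile FutureWarning only when the deprecated code path was chosen by dask internally rather than by the user. The `maybe_filter_deprecation` helper it relies on is defined in cudf outside this diff; the following is only a minimal sketch of that general pattern built on Python's standard warnings machinery, not the cudf implementation (the helper name `filter_warning_if` is invented for illustration):

    import contextlib
    import warnings

    @contextlib.contextmanager
    def filter_warning_if(condition, message, category):
        # Suppress warnings matching `message` only when `condition` holds;
        # otherwise let them propagate to the user unchanged.
        with warnings.catch_warnings():
            if condition:
                # `message` is matched as a regex against the warning text.
                warnings.filterwarnings(
                    "ignore", message=message, category=category
                )
            yield

    # Mirrors the parquet.py change below: silence the deprecation unless
    # the caller explicitly opted into NativeFile-based reading.
    user_opted_in = False
    with filter_warning_if(
        not user_opted_in,
        message="Support for reading pyarrow's NativeFile is deprecated",
        category=FutureWarning,
    ):
        warnings.warn(
            "Support for reading pyarrow's NativeFile is deprecated",
            FutureWarning,
        )  # suppressed, because user_opted_in is False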
Showing with 138 additions and 45 deletions.
  1. +10 −11 python/cudf/cudf/pandas/__main__.py
  2. +44 −32 python/dask_cudf/dask_cudf/io/parquet.py
  3. +84 −2 python/dask_cudf/dask_cudf/io/tests/test_s3.py
21 changes: 10 additions & 11 deletions python/cudf/cudf/pandas/__main__.py
@@ -73,6 +73,16 @@ def main():
     args = parser.parse_args()
 
     rmm_mode = install()
+    if "managed" in rmm_mode:
+        for key in {
+            "column_view::get_data",
+            "mutable_column_view::get_data",
+            "gather",
+            "hash_join",
+        }:
+            from cudf._lib import pylibcudf
+
+            pylibcudf.experimental.enable_prefetching(key)
     with profile(args.profile, args.line_profile, args.args[0]) as fn:
         args.args[0] = fn
         if args.module:
@@ -86,17 +96,6 @@ def main():
             sys.argv[:] = args.args
             runpy.run_path(args.args[0], run_name="__main__")
 
-    if "managed" in rmm_mode:
-        for key in {
-            "column_view::get_data",
-            "mutable_column_view::get_data",
-            "gather",
-            "hash_join",
-        }:
-            from cudf._lib import pylibcudf
-
-            pylibcudf.experimental.enable_prefetching(key)
-
 
 if __name__ == "__main__":
     main()
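After this change, the effective order of operations in the `cudf.pandas` entry point is: install RMM, enable prefetching (managed-memory modes only), and only then run the user's script. A condensed sketch of that flow, eliding argument parsing and profiling (`install`, the key set, and the pylibcudf call are taken from the diff above; the script path is a placeholder):

    import runpy

    rmm_mode = install()  # configures RMM and returns the active mode string
    if "managed" in rmm_mode:
        from cudf._lib import pylibcudf

        for key in {
            "column_view::get_data",
            "mutable_column_view::get_data",
            "gather",
            "hash_join",
        }:
            pylibcudf.experimental.enable_prefetching(key)

    # User code now starts with prefetching already enabled, instead of the
    # old ordering where it was enabled only after the script had finished.
    runpy.run_path("user_script.py", run_name="__main__")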
76 changes: 44 additions & 32 deletions python/dask_cudf/dask_cudf/io/parquet.py
@@ -33,6 +33,7 @@
     _is_local_filesystem,
     _open_remote_files,
 )
+from cudf.utils.utils import maybe_filter_deprecation
 
 
 class CudfEngine(ArrowDatasetEngine):
@@ -110,39 +111,50 @@ def _read_paths(
                 ),
             )
 
-        # Use cudf to read in data
-        try:
-            df = cudf.read_parquet(
-                paths_or_fobs,
-                engine="cudf",
-                columns=columns,
-                row_groups=row_groups if row_groups else None,
-                dataset_kwargs=dataset_kwargs,
-                categorical_partitions=False,
-                **kwargs,
-            )
-        except RuntimeError as err:
-            # TODO: Remove try/except after null-schema issue is resolved
-            # (See: https://github.com/rapidsai/cudf/issues/12702)
-            if len(paths_or_fobs) > 1:
-                df = cudf.concat(
-                    [
-                        cudf.read_parquet(
-                            pof,
-                            engine="cudf",
-                            columns=columns,
-                            row_groups=row_groups[i]
-                            if row_groups
-                            else None,
-                            dataset_kwargs=dataset_kwargs,
-                            categorical_partitions=False,
-                            **kwargs,
-                        )
-                        for i, pof in enumerate(paths_or_fobs)
-                    ]
-                )
-            else:
-                raise err
+        # Filter out deprecation warning unless the user
+        # specifies open_file_options and/or use_python_file_object.
+        # Otherwise, the FutureWarning is out of their control.
+        with maybe_filter_deprecation(
+            (
+                not open_file_options
+                and "use_python_file_object" not in kwargs
+            ),
+            message="Support for reading pyarrow's NativeFile is deprecated",
+            category=FutureWarning,
+        ):
+            # Use cudf to read in data
+            try:
+                df = cudf.read_parquet(
+                    paths_or_fobs,
+                    engine="cudf",
+                    columns=columns,
+                    row_groups=row_groups if row_groups else None,
+                    dataset_kwargs=dataset_kwargs,
+                    categorical_partitions=False,
+                    **kwargs,
+                )
+            except RuntimeError as err:
+                # TODO: Remove try/except after null-schema issue is resolved
+                # (See: https://github.com/rapidsai/cudf/issues/12702)
+                if len(paths_or_fobs) > 1:
+                    df = cudf.concat(
+                        [
+                            cudf.read_parquet(
+                                pof,
+                                engine="cudf",
+                                columns=columns,
+                                row_groups=row_groups[i]
+                                if row_groups
+                                else None,
+                                dataset_kwargs=dataset_kwargs,
+                                categorical_partitions=False,
+                                **kwargs,
+                            )
+                            for i, pof in enumerate(paths_or_fobs)
+                        ]
+                    )
+                else:
+                    raise err
 
         # Apply filters (if any are defined)
         df = _apply_post_filters(df, filters)
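The user-visible effect: a plain `dask_cudf.read_parquet("s3://...")` no longer prints the NativeFile FutureWarning when `compute`/`persist` runs, while callers who explicitly pass `open_file_options` or `use_python_file_object` still see it. A usage sketch (the bucket name and `storage_options` values are placeholders, not part of this diff):

    import dask_cudf

    # No warning: dask chose the NativeFile path internally, so the
    # FutureWarning is out of the user's control and gets filtered.
    df = dask_cudf.read_parquet(
        "s3://example-bucket/data.parquet",
        storage_options={"anon": True},
    )
    df.compute()

    # Still warns: the user explicitly opted into the deprecated API.
    df = dask_cudf.read_parquet(
        "s3://example-bucket/data.parquet",
        storage_options={"anon": True},
        read={"use_python_file_object": True},
    )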
86 changes: 84 additions & 2 deletions python/dask_cudf/dask_cudf/io/tests/test_s3.py
@@ -9,6 +9,8 @@
 import pyarrow.fs as pa_fs
 import pytest
 
+from dask.dataframe import assert_eq
+
 import dask_cudf
 
 moto = pytest.importorskip("moto", minversion="3.1.6")
@@ -102,6 +104,11 @@ def s3_context(s3_base, bucket, files=None):
         pass
 
 
+@pytest.fixture
+def pdf(scope="module"):
+    return pd.DataFrame({"a": [1, 2, 3, 4], "b": [2.1, 2.2, 2.3, 2.4]})
+
+
 def test_read_csv(s3_base, s3so):
     with s3_context(
         s3_base=s3_base, bucket="daskcsv", files={"a.csv": b"a,b\n1,2\n3,4\n"}
@@ -112,6 +119,22 @@ def test_read_csv(s3_base, s3so):
         assert df.a.sum().compute() == 4
 
 
+def test_read_csv_warns(s3_base, s3so):
+    with s3_context(
+        s3_base=s3_base,
+        bucket="daskcsv_warns",
+        files={"a.csv": b"a,b\n1,2\n3,4\n"},
+    ):
+        with pytest.warns(FutureWarning):
+            df = dask_cudf.read_csv(
+                "s3://daskcsv_warns/*.csv",
+                blocksize="50 B",
+                storage_options=s3so,
+                use_python_file_object=True,
+            )
+        assert df.a.sum().compute() == 4
+
+
 @pytest.mark.parametrize(
     "open_file_options",
     [
@@ -120,8 +143,7 @@ def test_read_csv(s3_base, s3so):
         {"open_file_func": None},
     ],
 )
-def test_read_parquet(s3_base, s3so, open_file_options):
-    pdf = pd.DataFrame({"a": [1, 2, 3, 4], "b": [2.1, 2.2, 2.3, 2.4]})
+def test_read_parquet_open_file_options(s3_base, s3so, open_file_options, pdf):
     buffer = BytesIO()
     pdf.to_parquet(path=buffer)
     buffer.seek(0)
@@ -142,3 +164,63 @@ def test_read_parquet(s3_base, s3so, open_file_options):
         assert df.a.sum().compute() == 10
         with pytest.warns(FutureWarning):
             assert df.b.sum().compute() == 9
+
+
+def test_read_parquet(s3_base, s3so, pdf):
+    fname = "test_parquet_reader_dask.parquet"
+    bucket = "parquet"
+    buffer = BytesIO()
+    pdf.to_parquet(path=buffer)
+    buffer.seek(0)
+    with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}):
+        got = dask_cudf.read_parquet(
+            f"s3://{bucket}/{fname}",
+            storage_options=s3so,
+        )
+        assert_eq(pdf, got)
+
+
+def test_read_parquet_use_python_file_object(s3_base, s3so, pdf):
+    fname = "test_parquet_use_python_file_object.parquet"
+    bucket = "parquet"
+    buffer = BytesIO()
+    pdf.to_parquet(path=buffer)
+    buffer.seek(0)
+    with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}):
+        with pytest.warns(FutureWarning):
+            got = dask_cudf.read_parquet(
+                f"s3://{bucket}/{fname}",
+                storage_options=s3so,
+                read={"use_python_file_object": True},
+            ).head()
+        assert_eq(pdf, got)
+
+
+def test_read_orc(s3_base, s3so, pdf):
+    fname = "test_orc_reader_dask.orc"
+    bucket = "orc"
+    buffer = BytesIO()
+    pdf.to_orc(path=buffer)
+    buffer.seek(0)
+    with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}):
+        got = dask_cudf.read_orc(
+            f"s3://{bucket}/{fname}",
+            storage_options=s3so,
+        )
+        assert_eq(pdf, got)
+
+
+def test_read_orc_use_python_file_object(s3_base, s3so, pdf):
+    fname = "test_orc_use_python_file_object.orc"
+    bucket = "orc"
+    buffer = BytesIO()
+    pdf.to_orc(path=buffer)
+    buffer.seek(0)
+    with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}):
+        with pytest.warns(FutureWarning):
+            got = dask_cudf.read_orc(
+                f"s3://{bucket}/{fname}",
+                storage_options=s3so,
+                use_python_file_object=True,
+            ).head()
+        assert_eq(pdf, got)
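These tests exercise the S3 code paths against a mocked S3 endpoint provided by moto rather than real S3. Assuming the optional test dependencies (moto's server extra, boto3, s3fs) are installed, a plausible local invocation would be:

    python -m pytest python/dask_cudf/dask_cudf/io/tests/test_s3.py -v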