diff --git a/ci/run_cudf_polars_polars_tests.sh b/ci/run_cudf_polars_polars_tests.sh index 49437510c7e..b1bfac2a1dd 100755 --- a/ci/run_cudf_polars_polars_tests.sh +++ b/ci/run_cudf_polars_polars_tests.sh @@ -13,6 +13,8 @@ DESELECTED_TESTS=( "tests/unit/test_cpu_check.py::test_check_cpu_flags_skipped_no_flags" # Mock library error "tests/docs/test_user_guide.py" # No dot binary in CI image "tests/unit/test_polars_import.py::test_fork_safety" # test started to hang in polars-1.14 + "tests/unit/operations/test_join.py::test_join_4_columns_with_validity" # fails in some systems, see https://github.com/pola-rs/polars/issues/19870 + "tests/unit/io/test_csv.py::test_read_web_file" # fails in rockylinux8 due to SSL CA issues ) if [[ $(arch) == "aarch64" ]]; then diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt index 7fdaff35525..ca2bdc24b25 100644 --- a/cpp/benchmarks/CMakeLists.txt +++ b/cpp/benchmarks/CMakeLists.txt @@ -286,6 +286,12 @@ ConfigureNVBench( ConfigureBench(HASHING_BENCH hashing/partition.cpp) ConfigureNVBench(HASHING_NVBENCH hashing/hash.cpp) +# ################################################################################################## +# * interop benchmark ------------------------------------------------------------------------------ +ConfigureNVBench(INTEROP_NVBENCH interop/interop.cpp) +target_link_libraries(INTEROP_NVBENCH PRIVATE nanoarrow) +target_include_directories(INTEROP_NVBENCH PRIVATE ${CMAKE_SOURCE_DIR}/tests/interop) + # ################################################################################################## # * merge benchmark ------------------------------------------------------------------------------- ConfigureBench(MERGE_BENCH merge/merge.cpp) diff --git a/cpp/benchmarks/interop/interop.cpp b/cpp/benchmarks/interop/interop.cpp new file mode 100644 index 00000000000..dad7e6f429e --- /dev/null +++ b/cpp/benchmarks/interop/interop.cpp @@ -0,0 +1,244 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include + +#include + +#include + +#include +#include +#include +#include + +#include +#include +#include + +template +void BM_to_arrow_device(nvbench::state& state, nvbench::type_list>) +{ + auto const num_rows = static_cast(state.get_int64("num_rows")); + auto const num_columns = static_cast(state.get_int64("num_columns")); + auto const num_elements = static_cast(num_rows) * num_columns; + + std::vector types(num_columns, data_type); + + auto const table = create_random_table(types, row_count{num_rows}); + int64_t const size_bytes = estimate_size(table->view()); + + state.add_element_count(num_elements, "num_elements"); + state.add_global_memory_reads(size_bytes); + state.add_global_memory_writes(size_bytes); + + state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) { + cudf::to_arrow_device(table->view(), rmm::cuda_stream_view{launch.get_stream()}); + }); +} + +template +void BM_to_arrow_host(nvbench::state& state, nvbench::type_list>) +{ + auto const num_rows = static_cast(state.get_int64("num_rows")); + auto const num_columns = static_cast(state.get_int64("num_columns")); + auto const num_elements = static_cast(num_rows) * num_columns; + + std::vector types(num_columns, data_type); + + auto const table = create_random_table(types, row_count{num_rows}); + int64_t const size_bytes = estimate_size(table->view()); + + state.add_element_count(num_elements, "num_elements"); + state.add_global_memory_reads(size_bytes); + state.add_global_memory_writes(size_bytes); + + state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) { + cudf::to_arrow_host(table->view(), rmm::cuda_stream_view{launch.get_stream()}); + }); +} + +template +void BM_from_arrow_device(nvbench::state& state, nvbench::type_list>) +{ + auto const num_rows = static_cast(state.get_int64("num_rows")); + auto const num_columns = static_cast(state.get_int64("num_columns")); + auto const num_elements = static_cast(num_rows) * num_columns; + + std::vector types(num_columns, data_type); + + data_profile profile; + profile.set_struct_depth(1); + profile.set_list_depth(1); + + auto const table = create_random_table(types, row_count{num_rows}, profile); + cudf::table_view table_view = table->view(); + int64_t const size_bytes = estimate_size(table_view); + + std::vector table_metadata; + + std::transform(thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_columns), + std::back_inserter(table_metadata), + [&](auto const column) { + cudf::column_metadata column_metadata{""}; + column_metadata.children_meta = std::vector( + table->get_column(column).num_children(), cudf::column_metadata{""}); + return column_metadata; + }); + + cudf::unique_schema_t schema = cudf::to_arrow_schema(table_view, table_metadata); + cudf::unique_device_array_t input = cudf::to_arrow_device(table_view); + + state.add_element_count(num_elements, "num_elements"); + state.add_global_memory_reads(size_bytes); + state.add_global_memory_writes(size_bytes); + + state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) { + cudf::from_arrow_device_column( + schema.get(), input.get(), rmm::cuda_stream_view{launch.get_stream()}); + }); +} + +template +void BM_from_arrow_host(nvbench::state& state, nvbench::type_list>) +{ + auto const num_rows = static_cast(state.get_int64("num_rows")); + auto const num_columns = static_cast(state.get_int64("num_columns")); + auto const num_elements = static_cast(num_rows) * num_columns; + + std::vector types(num_columns, data_type); + + data_profile profile; + 
profile.set_struct_depth(1); + profile.set_list_depth(1); + + auto const table = create_random_table(types, row_count{num_rows}, profile); + cudf::table_view table_view = table->view(); + int64_t const size_bytes = estimate_size(table_view); + + std::vector table_metadata; + + std::transform(thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_columns), + std::back_inserter(table_metadata), + [&](auto const column) { + cudf::column_metadata column_metadata{""}; + column_metadata.children_meta = std::vector( + table->get_column(column).num_children(), cudf::column_metadata{""}); + return column_metadata; + }); + + cudf::unique_schema_t schema = cudf::to_arrow_schema(table_view, table_metadata); + cudf::unique_device_array_t input = cudf::to_arrow_host(table_view); + + state.add_element_count(num_elements, "num_elements"); + state.add_global_memory_reads(size_bytes); + state.add_global_memory_writes(size_bytes); + + state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) { + cudf::from_arrow_host_column( + schema.get(), input.get(), rmm::cuda_stream_view{launch.get_stream()}); + }); +} + +using data_types = nvbench::enum_type_list; + +static char const* stringify_type(cudf::type_id value) +{ + switch (value) { + case cudf::type_id::INT8: return "INT8"; + case cudf::type_id::INT16: return "INT16"; + case cudf::type_id::INT32: return "INT32"; + case cudf::type_id::INT64: return "INT64"; + case cudf::type_id::UINT8: return "UINT8"; + case cudf::type_id::UINT16: return "UINT16"; + case cudf::type_id::UINT32: return "UINT32"; + case cudf::type_id::UINT64: return "UINT64"; + case cudf::type_id::FLOAT32: return "FLOAT32"; + case cudf::type_id::FLOAT64: return "FLOAT64"; + case cudf::type_id::BOOL8: return "BOOL8"; + case cudf::type_id::TIMESTAMP_DAYS: return "TIMESTAMP_DAYS"; + case cudf::type_id::TIMESTAMP_SECONDS: return "TIMESTAMP_SECONDS"; + case cudf::type_id::TIMESTAMP_MILLISECONDS: return "TIMESTAMP_MILLISECONDS"; + case cudf::type_id::TIMESTAMP_MICROSECONDS: return "TIMESTAMP_MICROSECONDS"; + case cudf::type_id::TIMESTAMP_NANOSECONDS: return "TIMESTAMP_NANOSECONDS"; + case cudf::type_id::DURATION_DAYS: return "DURATION_DAYS"; + case cudf::type_id::DURATION_SECONDS: return "DURATION_SECONDS"; + case cudf::type_id::DURATION_MILLISECONDS: return "DURATION_MILLISECONDS"; + case cudf::type_id::DURATION_MICROSECONDS: return "DURATION_MICROSECONDS"; + case cudf::type_id::DURATION_NANOSECONDS: return "DURATION_NANOSECONDS"; + case cudf::type_id::DICTIONARY32: return "DICTIONARY32"; + case cudf::type_id::STRING: return "STRING"; + case cudf::type_id::LIST: return "LIST"; + case cudf::type_id::DECIMAL32: return "DECIMAL32"; + case cudf::type_id::DECIMAL64: return "DECIMAL64"; + case cudf::type_id::DECIMAL128: return "DECIMAL128"; + case cudf::type_id::STRUCT: return "STRUCT"; + default: return "unknown"; + } +} + +NVBENCH_DECLARE_ENUM_TYPE_STRINGS(cudf::type_id, stringify_type, stringify_type) + +NVBENCH_BENCH_TYPES(BM_to_arrow_host, NVBENCH_TYPE_AXES(data_types)) + .set_type_axes_names({"data_type"}) + .set_name("to_arrow_host") + .add_int64_axis("num_rows", {10'000, 100'000, 1'000'000, 10'000'000}) + .add_int64_axis("num_columns", {1}); + +NVBENCH_BENCH_TYPES(BM_to_arrow_device, NVBENCH_TYPE_AXES(data_types)) + .set_type_axes_names({"data_type"}) + .set_name("to_arrow_device") + .add_int64_axis("num_rows", {10'000, 100'000, 1'000'000, 10'000'000}) + .add_int64_axis("num_columns", {1}); + +NVBENCH_BENCH_TYPES(BM_from_arrow_host, NVBENCH_TYPE_AXES(data_types)) + 
.set_type_axes_names({"data_type"}) + .set_name("from_arrow_host") + .add_int64_axis("num_rows", {10'000, 100'000, 1'000'000, 10'000'000}) + .add_int64_axis("num_columns", {1}); + +NVBENCH_BENCH_TYPES(BM_from_arrow_device, NVBENCH_TYPE_AXES(data_types)) + .set_type_axes_names({"data_type"}) + .set_name("from_arrow_device") + .add_int64_axis("num_rows", {10'000, 100'000, 1'000'000, 10'000'000}) + .add_int64_axis("num_columns", {1}); diff --git a/cpp/benchmarks/io/json/json_reader_input.cpp b/cpp/benchmarks/io/json/json_reader_input.cpp index 4366790f208..678f2f1a600 100644 --- a/cpp/benchmarks/io/json/json_reader_input.cpp +++ b/cpp/benchmarks/io/json/json_reader_input.cpp @@ -24,17 +24,19 @@ #include -// Size of the data in the benchmark dataframe; chosen to be low enough to allow benchmarks to -// run on most GPUs, but large enough to allow highest throughput -constexpr size_t data_size = 512 << 20; +// Default size of the data in the benchmark dataframe; chosen to be low enough to allow benchmarks +// to run on most GPUs, but large enough to allow highest throughput +constexpr size_t default_data_size = 512 << 20; constexpr cudf::size_type num_cols = 64; void json_read_common(cuio_source_sink_pair& source_sink, cudf::size_type num_rows_to_read, - nvbench::state& state) + nvbench::state& state, + cudf::io::compression_type comptype = cudf::io::compression_type::NONE, + size_t data_size = default_data_size) { cudf::io::json_reader_options read_opts = - cudf::io::json_reader_options::builder(source_sink.make_source_info()); + cudf::io::json_reader_options::builder(source_sink.make_source_info()).compression(comptype); auto mem_stats_logger = cudf::memory_stats_logger(); state.set_cuda_stream(nvbench::make_cuda_stream_view(cudf::get_default_stream().value())); @@ -57,15 +59,21 @@ void json_read_common(cuio_source_sink_pair& source_sink, state.add_buffer_size(source_sink.size(), "encoded_file_size", "encoded_file_size"); } -cudf::size_type json_write_bm_data(cudf::io::sink_info sink, - std::vector const& dtypes) +cudf::size_type json_write_bm_data( + cudf::io::sink_info sink, + std::vector const& dtypes, + cudf::io::compression_type comptype = cudf::io::compression_type::NONE, + size_t data_size = default_data_size) { auto const tbl = create_random_table( cycle_dtypes(dtypes, num_cols), table_size_bytes{data_size}, data_profile_builder()); auto const view = tbl->view(); cudf::io::json_writer_options const write_opts = - cudf::io::json_writer_options::builder(sink, view).na_rep("null").rows_per_chunk(100'000); + cudf::io::json_writer_options::builder(sink, view) + .na_rep("null") + .rows_per_chunk(100'000) + .compression(comptype); cudf::io::write_json(write_opts); return view.num_rows(); } @@ -87,6 +95,26 @@ void BM_json_read_io(nvbench::state& state, nvbench::type_list +void BM_json_read_compressed_io( + nvbench::state& state, nvbench::type_list, nvbench::enum_type>) +{ + size_t const data_size = state.get_int64("data_size"); + cuio_source_sink_pair source_sink(IO); + auto const d_type = get_type_or_group({static_cast(data_type::INTEGRAL), + static_cast(data_type::FLOAT), + static_cast(data_type::DECIMAL), + static_cast(data_type::TIMESTAMP), + static_cast(data_type::DURATION), + static_cast(data_type::STRING), + static_cast(data_type::LIST), + static_cast(data_type::STRUCT)}); + auto const num_rows = + json_write_bm_data(source_sink.make_sink_info(), d_type, comptype, data_size); + + json_read_common(source_sink, num_rows, state, comptype, data_size); +} + template void 
BM_json_read_data_type( nvbench::state& state, nvbench::type_list, nvbench::enum_type>) @@ -110,8 +138,9 @@ using d_type_list = nvbench::enum_type_list; -using compression_list = - nvbench::enum_type_list; +using compression_list = nvbench::enum_type_list; NVBENCH_BENCH_TYPES(BM_json_read_data_type, NVBENCH_TYPE_AXES(d_type_list, nvbench::enum_type_list)) @@ -123,3 +152,10 @@ NVBENCH_BENCH_TYPES(BM_json_read_io, NVBENCH_TYPE_AXES(io_list)) .set_name("json_read_io") .set_type_axes_names({"io"}) .set_min_samples(4); + +NVBENCH_BENCH_TYPES(BM_json_read_compressed_io, + NVBENCH_TYPE_AXES(compression_list, nvbench::enum_type_list)) + .set_name("json_read_compressed_io") + .set_type_axes_names({"compression_type", "io"}) + .add_int64_power_of_two_axis("data_size", nvbench::range(20, 29, 1)) + .set_min_samples(4); diff --git a/cpp/benchmarks/io/nvbench_helpers.hpp b/cpp/benchmarks/io/nvbench_helpers.hpp index cc548ccd3de..011b2590c6f 100644 --- a/cpp/benchmarks/io/nvbench_helpers.hpp +++ b/cpp/benchmarks/io/nvbench_helpers.hpp @@ -76,6 +76,7 @@ NVBENCH_DECLARE_ENUM_TYPE_STRINGS( [](auto value) { switch (value) { case cudf::io::compression_type::SNAPPY: return "SNAPPY"; + case cudf::io::compression_type::GZIP: return "GZIP"; case cudf::io::compression_type::NONE: return "NONE"; default: return "Unknown"; } diff --git a/cpp/src/io/utilities/config_utils.cpp b/cpp/src/io/utilities/config_utils.cpp index 3307b4fa539..cea0ebad8f5 100644 --- a/cpp/src/io/utilities/config_utils.cpp +++ b/cpp/src/io/utilities/config_utils.cpp @@ -56,7 +56,8 @@ void set_up_kvikio() { static std::once_flag flag{}; std::call_once(flag, [] { - auto const compat_mode = kvikio::detail::getenv_or("KVIKIO_COMPAT_MODE", true); + auto const compat_mode = + kvikio::detail::getenv_or("KVIKIO_COMPAT_MODE", kvikio::CompatMode::ON); kvikio::defaults::compat_mode_reset(compat_mode); auto const nthreads = getenv_or("KVIKIO_NTHREADS", 4u); diff --git a/cpp/src/io/utilities/data_sink.cpp b/cpp/src/io/utilities/data_sink.cpp index 68377ad6d5f..b37a5ac900a 100644 --- a/cpp/src/io/utilities/data_sink.cpp +++ b/cpp/src/io/utilities/data_sink.cpp @@ -45,7 +45,7 @@ class file_sink : public data_sink { cufile_integration::set_up_kvikio(); _kvikio_file = kvikio::FileHandle(filepath, "w"); CUDF_LOG_INFO("Writing a file using kvikIO, with compatibility mode {}.", - _kvikio_file.is_compat_mode_on() ? "on" : "off"); + _kvikio_file.is_compat_mode_preferred() ? "on" : "off"); } else { _cufile_out = detail::make_cufile_output(filepath); } diff --git a/cpp/src/io/utilities/datasource.cpp b/cpp/src/io/utilities/datasource.cpp index 0870e4a84a7..10814eea458 100644 --- a/cpp/src/io/utilities/datasource.cpp +++ b/cpp/src/io/utilities/datasource.cpp @@ -56,7 +56,7 @@ class file_source : public datasource { cufile_integration::set_up_kvikio(); _kvikio_file = kvikio::FileHandle(filepath); CUDF_LOG_INFO("Reading a file using kvikIO, with compatibility mode {}.", - _kvikio_file.is_compat_mode_on() ? "on" : "off"); + _kvikio_file.is_compat_mode_preferred() ? 
"on" : "off"); } else { _cufile_in = detail::make_cufile_input(filepath); } diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index ce99f98b559..750c6cec180 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -368,6 +368,14 @@ def _process_dataset( file_list = paths if len(paths) == 1 and ioutils.is_directory(paths[0]): paths = ioutils.stringify_pathlike(paths[0]) + elif ( + filters is None + and isinstance(dataset_kwargs, dict) + and dataset_kwargs.get("partitioning") is None + ): + # Skip dataset processing if we have no filters + # or hive/directory partitioning to deal with. + return paths, row_groups, [], {} # Convert filters to ds.Expression if filters is not None: diff --git a/python/dask_cudf/dask_cudf/_legacy/io/parquet.py b/python/dask_cudf/dask_cudf/_legacy/io/parquet.py index 39ac6474958..c0638e4a1c3 100644 --- a/python/dask_cudf/dask_cudf/_legacy/io/parquet.py +++ b/python/dask_cudf/dask_cudf/_legacy/io/parquet.py @@ -86,7 +86,8 @@ def _read_paths( ) dataset_kwargs = dataset_kwargs or {} - dataset_kwargs["partitioning"] = partitioning or "hive" + if partitions: + dataset_kwargs["partitioning"] = partitioning or "hive" # Use cudf to read in data try: diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index fb02e0ac772..9c5d5523019 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -700,140 +700,10 @@ def from_dict( ) @staticmethod - def read_parquet(path, *args, filesystem="fsspec", engine=None, **kwargs): - import dask_expr as dx - import fsspec - - if ( - isinstance(filesystem, fsspec.AbstractFileSystem) - or isinstance(filesystem, str) - and filesystem.lower() == "fsspec" - ): - # Default "fsspec" filesystem - from dask_cudf._legacy.io.parquet import CudfEngine + def read_parquet(*args, **kwargs): + from dask_cudf.io.parquet import read_parquet as read_parquet_expr - _raise_unsupported_parquet_kwargs(**kwargs) - return _default_backend( - dx.read_parquet, - path, - *args, - filesystem=filesystem, - engine=CudfEngine, - **kwargs, - ) - - else: - # EXPERIMENTAL filesystem="arrow" support. - # This code path uses PyArrow for IO, which is only - # beneficial for remote storage (e.g. 
S3) - - from fsspec.utils import stringify_path - from pyarrow import fs as pa_fs - - # CudfReadParquetPyarrowFS requires import of distributed beforehand - # (See: https://github.com/dask/dask/issues/11352) - import distributed # noqa: F401 - from dask.core import flatten - from dask.dataframe.utils import pyarrow_strings_enabled - - from dask_cudf.io.parquet import CudfReadParquetPyarrowFS - - if args: - raise ValueError(f"Unexpected positional arguments: {args}") - - if not ( - isinstance(filesystem, pa_fs.FileSystem) - or isinstance(filesystem, str) - and filesystem.lower() in ("arrow", "pyarrow") - ): - raise ValueError(f"Unexpected filesystem value: {filesystem}.") - - if not PYARROW_GE_15: - raise NotImplementedError( - "Experimental Arrow filesystem support requires pyarrow>=15" - ) - - if not isinstance(path, str): - path = stringify_path(path) - - # Extract kwargs - columns = kwargs.pop("columns", None) - filters = kwargs.pop("filters", None) - categories = kwargs.pop("categories", None) - index = kwargs.pop("index", None) - storage_options = kwargs.pop("storage_options", None) - dtype_backend = kwargs.pop("dtype_backend", None) - calculate_divisions = kwargs.pop("calculate_divisions", False) - ignore_metadata_file = kwargs.pop("ignore_metadata_file", False) - metadata_task_size = kwargs.pop("metadata_task_size", None) - split_row_groups = kwargs.pop("split_row_groups", "infer") - blocksize = kwargs.pop("blocksize", "default") - aggregate_files = kwargs.pop("aggregate_files", None) - parquet_file_extension = kwargs.pop( - "parquet_file_extension", (".parq", ".parquet", ".pq") - ) - arrow_to_pandas = kwargs.pop("arrow_to_pandas", None) - open_file_options = kwargs.pop("open_file_options", None) - - # Validate and normalize kwargs - kwargs["dtype_backend"] = dtype_backend - if arrow_to_pandas is not None: - raise ValueError( - "arrow_to_pandas not supported for the 'cudf' backend." - ) - if open_file_options is not None: - raise ValueError( - "The open_file_options argument is no longer supported " - "by the 'cudf' backend." - ) - if filters is not None: - for filter in flatten(filters, container=list): - _, op, val = filter - if op == "in" and not isinstance(val, (set, list, tuple)): - raise TypeError( - "Value of 'in' filter must be a list, set or tuple." - ) - if metadata_task_size is not None: - raise NotImplementedError( - "metadata_task_size is not supported when using the pyarrow filesystem." - ) - if split_row_groups != "infer": - raise NotImplementedError( - "split_row_groups is not supported when using the pyarrow filesystem." - ) - if parquet_file_extension != (".parq", ".parquet", ".pq"): - raise NotImplementedError( - "parquet_file_extension is not supported when using the pyarrow filesystem." - ) - if blocksize is not None and blocksize != "default": - warnings.warn( - "blocksize is not supported when using the pyarrow filesystem." - "blocksize argument will be ignored." - ) - if aggregate_files is not None: - warnings.warn( - "aggregate_files is not supported when using the pyarrow filesystem. " - "Please use the 'dataframe.parquet.minimum-partition-size' config." - "aggregate_files argument will be ignored." 
- ) - - return dx.new_collection( - CudfReadParquetPyarrowFS( - path, - columns=dx._util._convert_to_list(columns), - filters=filters, - categories=categories, - index=index, - calculate_divisions=calculate_divisions, - storage_options=storage_options, - filesystem=filesystem, - ignore_metadata_file=ignore_metadata_file, - arrow_to_pandas=arrow_to_pandas, - pyarrow_strings_enabled=pyarrow_strings_enabled(), - kwargs=kwargs, - _series=isinstance(columns, str), - ) - ) + return read_parquet_expr(*args, **kwargs) @staticmethod def read_csv( diff --git a/python/dask_cudf/dask_cudf/io/__init__.py b/python/dask_cudf/dask_cudf/io/__init__.py index 1e0f24d78ce..212951336c9 100644 --- a/python/dask_cudf/dask_cudf/io/__init__.py +++ b/python/dask_cudf/dask_cudf/io/__init__.py @@ -1,6 +1,6 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -from dask_cudf import _deprecated_api +from dask_cudf import _deprecated_api, QUERY_PLANNING_ON from . import csv, orc, json, parquet, text # noqa: F401 @@ -22,9 +22,14 @@ read_text = _deprecated_api( "dask_cudf.io.read_text", new_api="dask_cudf.read_text" ) -read_parquet = _deprecated_api( - "dask_cudf.io.read_parquet", new_api="dask_cudf.read_parquet" -) +if QUERY_PLANNING_ON: + read_parquet = parquet.read_parquet +else: + read_parquet = _deprecated_api( + "The legacy dask_cudf.io.read_parquet API", + new_api="dask_cudf.read_parquet", + rec="", + ) to_parquet = _deprecated_api( "dask_cudf.io.to_parquet", new_api="dask_cudf._legacy.io.parquet.to_parquet", diff --git a/python/dask_cudf/dask_cudf/io/parquet.py b/python/dask_cudf/dask_cudf/io/parquet.py index a7a116875ea..bf8fae552c2 100644 --- a/python/dask_cudf/dask_cudf/io/parquet.py +++ b/python/dask_cudf/dask_cudf/io/parquet.py @@ -1,58 +1,252 @@ # Copyright (c) 2024, NVIDIA CORPORATION. + +from __future__ import annotations + import functools +import itertools +import math +import os +import warnings +from typing import TYPE_CHECKING, Any +import numpy as np import pandas as pd -from dask_expr.io.io import FusedParquetIO -from dask_expr.io.parquet import FragmentWrapper, ReadParquetPyarrowFS +from dask_expr._expr import Elemwise +from dask_expr._util import _convert_to_list +from dask_expr.io.io import FusedIO, FusedParquetIO +from dask_expr.io.parquet import ( + FragmentWrapper, + ReadParquetFSSpec, + ReadParquetPyarrowFS, +) from dask._task_spec import Task +from dask.dataframe.io.parquet.arrow import _filters_to_expression +from dask.dataframe.io.parquet.core import ParquetFunctionWrapper +from dask.tokenize import tokenize +from dask.utils import parse_bytes import cudf -from dask_cudf import _deprecated_api +from dask_cudf import QUERY_PLANNING_ON, _deprecated_api # Dask-expr imports CudfEngine from this module from dask_cudf._legacy.io.parquet import CudfEngine # noqa: F401 +if TYPE_CHECKING: + from collections.abc import MutableMapping + + +_DEVICE_SIZE_CACHE: int | None = None + + +def _get_device_size(): + try: + # Use PyNVML to find the worker device size. + import pynvml + + pynvml.nvmlInit() + index = os.environ.get("CUDA_VISIBLE_DEVICES", "0").split(",")[0] + if index and not index.isnumeric(): + # This means index is UUID. This works for both MIG and non-MIG device UUIDs. 
+ handle = pynvml.nvmlDeviceGetHandleByUUID(str.encode(index)) + else: + # This is a device index + handle = pynvml.nvmlDeviceGetHandleByIndex(int(index)) + return pynvml.nvmlDeviceGetMemoryInfo(handle).total + + except (ImportError, ValueError): + # Fall back to a conservative 8GiB default + return 8 * 1024**3 + + +def _normalize_blocksize(fraction: float = 0.03125): + # Set the blocksize to fraction * . + # We use the smallest worker device to set . + # (Default blocksize is 1/32 * ) + global _DEVICE_SIZE_CACHE + + if _DEVICE_SIZE_CACHE is None: + try: + # Check distributed workers (if a client exists) + from distributed import get_client + + client = get_client() + # TODO: Check "GPU" worker resources only. + # Depends on (https://github.com/rapidsai/dask-cuda/pull/1401) + device_size = min(client.run(_get_device_size).values()) + except (ImportError, ValueError): + device_size = _get_device_size() + _DEVICE_SIZE_CACHE = device_size + + return int(_DEVICE_SIZE_CACHE * fraction) + + +class NoOp(Elemwise): + # Workaround - Always wrap read_parquet operations + # in a NoOp to trigger tune_up optimizations. + _parameters = ["frame"] + _is_length_preserving = True + _projection_passthrough = True + _filter_passthrough = True + _preserves_partitioning_information = True -class CudfFusedParquetIO(FusedParquetIO): @staticmethod - def _load_multiple_files( - frag_filters, - columns, - schema, - **to_pandas_kwargs, - ): - import pyarrow as pa + def operation(x): + return x - from dask.base import apply, tokenize - from dask.threaded import get - token = tokenize(frag_filters, columns, schema) - name = f"pq-file-{token}" - dsk = { - (name, i): ( - CudfReadParquetPyarrowFS._fragment_to_table, - frag, - filter, - columns, - schema, - ) - for i, (frag, filter) in enumerate(frag_filters) - } - dsk[name] = ( - apply, - pa.concat_tables, - [list(dsk.keys())], - {"promote_options": "permissive"}, - ) - return CudfReadParquetPyarrowFS._table_to_pandas( - get(dsk, name), - **to_pandas_kwargs, - ) +class CudfReadParquetFSSpec(ReadParquetFSSpec): + _STATS_CACHE: MutableMapping[str, Any] = {} + + def approx_statistics(self): + # Use a few files to approximate column-size statistics + key = tokenize(self._dataset_info["ds"].files[:10], self.filters) + try: + return self._STATS_CACHE[key] + + except KeyError: + # Account for filters + ds_filters = None + if self.filters is not None: + ds_filters = _filters_to_expression(self.filters) + + # Use average total_uncompressed_size of three files + n_sample = 3 + column_sizes = {} + for i, frag in enumerate( + self._dataset_info["ds"].get_fragments(ds_filters) + ): + md = frag.metadata + for rg in range(md.num_row_groups): + row_group = md.row_group(rg) + for col in range(row_group.num_columns): + column = row_group.column(col) + name = column.path_in_schema + if name not in column_sizes: + column_sizes[name] = np.zeros( + n_sample, dtype="int64" + ) + column_sizes[name][i] += column.total_uncompressed_size + if (i + 1) >= n_sample: + break + + # Reorganize stats to look like arrow-fs version + self._STATS_CACHE[key] = { + "columns": [ + { + "path_in_schema": name, + "total_uncompressed_size": np.mean(sizes), + } + for name, sizes in column_sizes.items() + ] + } + return self._STATS_CACHE[key] + + @functools.cached_property + def _fusion_compression_factor(self): + # Disable fusion when blocksize=None + if self.blocksize is None: + return 1 + + # At this point, we *may* have used `blockwise` + # already to split or aggregate files. 
We don't + # *know* if the current partitions correspond to + # individual/full files, multiple/aggregated files + # or partial/split files. + # + # Therefore, we need to use the statistics from + # a few files to estimate the current partition + # size. This size should be similar to `blocksize` + # *if* aggregate_files is True or if the files + # are *smaller* than `blocksize`. + + # Step 1: Sample statistics + approx_stats = self.approx_statistics() + projected_size, original_size = 0, 0 + col_op = self.operand("columns") or self.columns + for col in approx_stats["columns"]: + original_size += col["total_uncompressed_size"] + if col["path_in_schema"] in col_op or ( + (split_name := col["path_in_schema"].split(".")) + and split_name[0] in col_op + ): + projected_size += col["total_uncompressed_size"] + if original_size < 1 or projected_size < 1: + return 1 + + # Step 2: Estimate the correction factor + # (Correct for possible pre-optimization fusion/splitting) + blocksize = parse_bytes(self.blocksize) + if original_size > blocksize: + # Input files are bigger than blocksize + # and we already split these large files. + # (correction_factor > 1) + correction_factor = original_size / blocksize + elif self.aggregate_files: + # Input files are smaller than blocksize + # and we already aggregate small files. + # (correction_factor == 1) + correction_factor = 1 + else: + # Input files are smaller than blocksize + # but we haven't aggregate small files yet. + # (correction_factor < 1) + correction_factor = original_size / blocksize + + # Step 3. Estimate column-projection factor + if self.operand("columns") is None: + projection_factor = 1 + else: + projection_factor = projected_size / original_size + + return max(projection_factor * correction_factor, 0.001) + + def _tune_up(self, parent): + if self._fusion_compression_factor >= 1: + return + if isinstance(parent, FusedIO): + return + return parent.substitute(self, CudfFusedIO(self)) class CudfReadParquetPyarrowFS(ReadParquetPyarrowFS): + _parameters = [ + "path", + "columns", + "filters", + "categories", + "index", + "storage_options", + "filesystem", + "blocksize", + "ignore_metadata_file", + "calculate_divisions", + "arrow_to_pandas", + "pyarrow_strings_enabled", + "kwargs", + "_partitions", + "_series", + "_dataset_info_cache", + ] + _defaults = { + "columns": None, + "filters": None, + "categories": None, + "index": None, + "storage_options": None, + "filesystem": None, + "blocksize": "256 MiB", + "ignore_metadata_file": True, + "calculate_divisions": False, + "arrow_to_pandas": None, + "pyarrow_strings_enabled": True, + "kwargs": None, + "_partitions": None, + "_series": False, + "_dataset_info_cache": None, + } + @functools.cached_property def _dataset_info(self): from dask_cudf._legacy.io.parquet import ( @@ -86,11 +280,92 @@ def _dataset_info(self): @staticmethod def _table_to_pandas(table, index_name): - df = cudf.DataFrame.from_arrow(table) - if index_name is not None: - df = df.set_index(index_name) + if isinstance(table, cudf.DataFrame): + df = table + else: + df = cudf.DataFrame.from_arrow(table) + if index_name is not None: + return df.set_index(index_name) return df + @staticmethod + def _fragments_to_cudf_dataframe( + fragment_wrappers, + filters, + columns, + schema, + ): + from dask.dataframe.io.utils import _is_local_fs + + from cudf.io.parquet import _apply_post_filters, _normalize_filters + + if not isinstance(fragment_wrappers, list): + fragment_wrappers = [fragment_wrappers] + + filesystem = None + paths, row_groups = 
[], [] + for fw in fragment_wrappers: + frag = fw.fragment if isinstance(fw, FragmentWrapper) else fw + paths.append(frag.path) + row_groups.append( + [rg.id for rg in frag.row_groups] if frag.row_groups else None + ) + if filesystem is None: + filesystem = frag.filesystem + + if _is_local_fs(filesystem): + filesystem = None + else: + from fsspec.implementations.arrow import ArrowFSWrapper + + filesystem = ArrowFSWrapper(filesystem) + protocol = filesystem.protocol + paths = [f"{protocol}://{path}" for path in paths] + + filters = _normalize_filters(filters) + projected_columns = None + if columns and filters: + projected_columns = [c for c in columns if c is not None] + columns = sorted( + set(v[0] for v in itertools.chain.from_iterable(filters)) + | set(projected_columns) + ) + + if row_groups == [None for path in paths]: + row_groups = None + + df = cudf.read_parquet( + paths, + columns=columns, + filters=filters, + row_groups=row_groups, + dataset_kwargs={"schema": schema}, + ) + + # Apply filters (if any are defined) + df = _apply_post_filters(df, filters) + if projected_columns: + # Elements of `projected_columns` may now be in the index. + # We must filter these names from our projection + projected_columns = [ + col for col in projected_columns if col in df._column_names + ] + df = df[projected_columns] + + # TODO: Deal with hive partitioning. + # Note that ReadParquetPyarrowFS does NOT support this yet anyway. + return df + + @functools.cached_property + def _use_device_io(self): + from dask.dataframe.io.utils import _is_local_fs + + # Use host for remote filesystem only + # (Unless we are using kvikio-S3) + return _is_local_fs(self.fs) or ( + self.fs.type_name == "s3" and cudf.get_option("kvikio_remote_io") + ) + def _filtered_task(self, name, index: int): columns = self.columns.copy() index_name = self.index.name @@ -101,12 +376,17 @@ def _filtered_task(self, name, index: int): if columns is None: columns = list(schema.names) columns.append(index_name) + + frag_to_table = self._fragment_to_table + if self._use_device_io: + frag_to_table = self._fragments_to_cudf_dataframe + return Task( name, self._table_to_pandas, Task( None, - self._fragment_to_table, + frag_to_table, fragment_wrapper=FragmentWrapper( self.fragments[index], filesystem=self.fs ), @@ -117,18 +397,441 @@ def _filtered_task(self, name, index: int): index_name=index_name, ) + @property + def _fusion_compression_factor(self): + blocksize = self.blocksize + if blocksize is None: + return 1 + elif blocksize == "default": + blocksize = "256MiB" + + projected_size = 0 + approx_stats = self.approx_statistics() + col_op = self.operand("columns") or self.columns + for col in approx_stats["columns"]: + if col["path_in_schema"] in col_op or ( + (split_name := col["path_in_schema"].split(".")) + and split_name[0] in col_op + ): + projected_size += col["total_uncompressed_size"] + + if projected_size < 1: + return 1 + + aggregate_files = max(1, int(parse_bytes(blocksize) / projected_size)) + return max(1 / aggregate_files, 0.001) + def _tune_up(self, parent): if self._fusion_compression_factor >= 1: return - if isinstance(parent, CudfFusedParquetIO): + fused_cls = ( + CudfFusedParquetIO + if self._use_device_io + else CudfFusedParquetIOHost + ) + if isinstance(parent, fused_cls): return - return parent.substitute(self, CudfFusedParquetIO(self)) + return parent.substitute(self, fused_cls(self)) -read_parquet = _deprecated_api( - "dask_cudf.io.parquet.read_parquet", - new_api="dask_cudf.read_parquet", -) +class 
CudfFusedIO(FusedIO): + def _task(self, name, index: int): + expr = self.operand("_expr") + bucket = self._fusion_buckets[index] + io_func = expr._filtered_task(name, 0).func + if not isinstance( + io_func, ParquetFunctionWrapper + ) or io_func.common_kwargs.get("partitions", None): + # Just use "simple" fusion if we have an unexpected + # callable, or we are dealing with hive partitioning. + return Task( + name, + cudf.concat, + [expr._filtered_task(name, i) for i in bucket], + ) + + pieces = [] + for i in bucket: + piece = expr._filtered_task(name, i).args[0] + if isinstance(piece, list): + pieces.extend(piece) + else: + pieces.append(piece) + return Task(name, io_func, pieces) + + +class CudfFusedParquetIO(FusedParquetIO): + @functools.cached_property + def _fusion_buckets(self): + partitions = self.operand("_expr")._partitions + npartitions = len(partitions) + + step = math.ceil(1 / self.operand("_expr")._fusion_compression_factor) + + # TODO: Heuristic to limit fusion should probably + # account for the number of workers. For now, just + # limiting fusion to 100 partitions at once. + step = min(step, 100) + + buckets = [ + partitions[i : i + step] for i in range(0, npartitions, step) + ] + return buckets + + @classmethod + def _load_multiple_files( + cls, + frag_filters, + columns, + schema, + **to_pandas_kwargs, + ): + frag_to_table = CudfReadParquetPyarrowFS._fragments_to_cudf_dataframe + return CudfReadParquetPyarrowFS._table_to_pandas( + frag_to_table( + [frag[0] for frag in frag_filters], + frag_filters[0][1], # TODO: Check for consistent filters? + columns, + schema, + ), + **to_pandas_kwargs, + ) + + +class CudfFusedParquetIOHost(CudfFusedParquetIO): + @classmethod + def _load_multiple_files( + cls, + frag_filters, + columns, + schema, + **to_pandas_kwargs, + ): + import pyarrow as pa + + from dask.base import apply, tokenize + from dask.threaded import get + + token = tokenize(frag_filters, columns, schema) + name = f"pq-file-{token}" + dsk = { + (name, i): ( + CudfReadParquetPyarrowFS._fragment_to_table, + frag, + filter, + columns, + schema, + ) + for i, (frag, filter) in enumerate(frag_filters) + } + dsk[name] = ( + apply, + pa.concat_tables, + [list(dsk.keys())], + {"promote_options": "permissive"}, + ) + + return CudfReadParquetPyarrowFS._table_to_pandas( + get(dsk, name), + **to_pandas_kwargs, + ) + + +def read_parquet_expr( + path, + *args, + columns=None, + filters=None, + categories=None, + index=None, + storage_options=None, + dtype_backend=None, + calculate_divisions=False, + ignore_metadata_file=False, + metadata_task_size=None, + split_row_groups="infer", + blocksize="default", + aggregate_files=None, + parquet_file_extension=(".parq", ".parquet", ".pq"), + filesystem="fsspec", + engine=None, + arrow_to_pandas=None, + open_file_options=None, + **kwargs, +): + """ + Read a Parquet file into a Dask-cuDF DataFrame. + + This reads a directory of Parquet data into a DataFrame collection. + Partitioning behavior mostly depends on the ``blocksize`` argument. + + .. note:: + Dask may automatically resize partitions at optimization time. + Please set ``blocksize=None`` to disable this behavior in Dask cuDF. + (NOTE: This will not disable fusion for the "pandas" backend) + + .. note:: + Specifying ``filesystem="arrow"`` leverages a complete reimplementation of + the Parquet reader that is solely based on PyArrow. It is faster than the + legacy implementation in some cases, but doesn't yet support all features. 
+ + Parameters + ---------- + path : str or list + Source directory for data, or path(s) to individual parquet files. + Prefix with a protocol like ``s3://`` to read from alternative + filesystems. To read from multiple files you can pass a globstring or a + list of paths, with the caveat that they must all have the same + protocol. + columns : str or list, default None + Field name(s) to read in as columns in the output. By default all + non-index fields will be read (as determined by the pandas parquet + metadata, if present). Provide a single field name instead of a list to + read in the data as a Series. + filters : Union[List[Tuple[str, str, Any]], List[List[Tuple[str, str, Any]]]], default None + List of filters to apply, like ``[[('col1', '==', 0), ...], ...]``. + Using this argument will result in row-wise filtering of the final partitions. + + Predicates can be expressed in disjunctive normal form (DNF). This means that + the inner-most tuple describes a single column predicate. These inner predicates + are combined with an AND conjunction into a larger predicate. The outer-most + list then combines all of the combined filters with an OR disjunction. + + Predicates can also be expressed as a ``List[Tuple]``. These are evaluated + as an AND conjunction. To express OR in predicates, one must use the + (preferred for "pyarrow") ``List[List[Tuple]]`` notation. + index : str, list or False, default None + Field name(s) to use as the output frame index. By default will be + inferred from the pandas parquet file metadata, if present. Use ``False`` + to read all fields as columns. + categories : list or dict, default None + For any fields listed here, if the parquet encoding is Dictionary, + the column will be created with dtype category. Use only if it is + guaranteed that the column is encoded as dictionary in all row-groups. + If a list, assumes up to 2**16-1 labels; if a dict, specify the number + of labels expected; if None, will load categories automatically for + data written by dask, not otherwise. + storage_options : dict, default None + Key/value pairs to be passed on to the file-system backend, if any. + Note that the default file-system backend can be configured with the + ``filesystem`` argument, described below. + calculate_divisions : bool, default False + Whether to use min/max statistics from the footer metadata (or global + ``_metadata`` file) to calculate divisions for the output DataFrame + collection. Divisions will not be calculated if statistics are missing. + This option will be ignored if ``index`` is not specified and there is + no physical index column specified in the custom "pandas" Parquet + metadata. Note that ``calculate_divisions=True`` may be extremely slow + when no global ``_metadata`` file is present, especially when reading + from remote storage. Set this to ``True`` only when known divisions + are needed for your workload (see :ref:`dataframe-design-partitions`). + ignore_metadata_file : bool, default False + Whether to ignore the global ``_metadata`` file (when one is present). + If ``True``, or if the global ``_metadata`` file is missing, the parquet + metadata may be gathered and processed in parallel. Parallel metadata + processing is currently supported for ``ArrowDatasetEngine`` only. + metadata_task_size : int, default configurable + If parquet metadata is processed in parallel (see ``ignore_metadata_file`` + description above), this argument can be used to specify the number of + dataset files to be processed by each task in the Dask graph. 
If this + argument is set to ``0``, parallel metadata processing will be disabled. + The default values for local and remote filesystems can be specified + with the "metadata-task-size-local" and "metadata-task-size-remote" + config fields, respectively (see "dataframe.parquet"). + split_row_groups : 'infer', 'adaptive', bool, or int, default 'infer' + WARNING: The ``split_row_groups`` argument is now deprecated, please use + ``blocksize`` instead. + + blocksize : int, float or str, default 'default' + The desired size of each output ``DataFrame`` partition in terms of total + (uncompressed) parquet storage space. This argument may be used to split + large files or aggregate small files into the same partition. Use ``None`` + for a simple 1:1 mapping between files and partitions. Use a float value + less than 1.0 to specify the fractional size of the partitions with + respect to the total memory of the first NVIDIA GPU on your machine. + Default is 1/32 the total memory of a single GPU. + aggregate_files : bool or str, default None + WARNING: The behavior of ``aggregate_files=True`` is now obsolete + when query-planning is enabled (the default). Small files are now + aggregated automatically according to the ``blocksize`` setting. + Please expect this argument to be deprecated in a future release. + + WARNING: Passing a string argument to ``aggregate_files`` will result + in experimental behavior that may be removed at any time. + + parquet_file_extension: str, tuple[str], or None, default (".parq", ".parquet", ".pq") + A file extension or an iterable of extensions to use when discovering + parquet files in a directory. Files that don't match these extensions + will be ignored. This argument only applies when ``paths`` corresponds + to a directory and no ``_metadata`` file is present (or + ``ignore_metadata_file=True``). Passing in ``parquet_file_extension=None`` + will treat all files in the directory as parquet files. + + The purpose of this argument is to ensure that the engine will ignore + unsupported metadata files (like Spark's '_SUCCESS' and 'crc' files). + It may be necessary to change this argument if the data files in your + parquet dataset do not end in ".parq", ".parquet", or ".pq". + filesystem: "fsspec", "arrow", or fsspec.AbstractFileSystem backend to use. + dataset: dict, default None + Dictionary of options to use when creating a ``pyarrow.dataset.Dataset`` object. + These options may include a "filesystem" key to configure the desired + file-system backend. However, the top-level ``filesystem`` argument will always + take precedence. + + **Note**: The ``dataset`` options may include a "partitioning" key. + However, since ``pyarrow.dataset.Partitioning`` + objects cannot be serialized, the value can be a dict of key-word + arguments for the ``pyarrow.dataset.partitioning`` API + (e.g. ``dataset={"partitioning": {"flavor": "hive", "schema": ...}}``). + Note that partitioned columns will not be converted to categorical + dtypes when a custom partitioning schema is specified in this way. + read: dict, default None + Dictionary of options to pass through to ``CudfEngine.read_partitions`` + using the ``read`` key-word argument. 
+    """
+
+    import dask_expr as dx
+    from fsspec.utils import stringify_path
+    from pyarrow import fs as pa_fs
+
+    from dask.core import flatten
+    from dask.dataframe.utils import pyarrow_strings_enabled
+
+    from dask_cudf.backends import PYARROW_GE_15
+
+    if args:
+        raise ValueError(f"Unexpected positional arguments: {args}")
+
+    if open_file_options is not None:
+        raise ValueError(
+            "The open_file_options argument is no longer supported "
+            "by the 'cudf' backend."
+        )
+    if dtype_backend is not None:
+        raise NotImplementedError(
+            "dtype_backend is not supported by the 'cudf' backend."
+        )
+    if arrow_to_pandas is not None:
+        raise NotImplementedError(
+            "arrow_to_pandas is not supported by the 'cudf' backend."
+        )
+    if engine not in (None, "cudf", CudfEngine):
+        raise NotImplementedError(
+            f"engine={engine} is not supported by the 'cudf' backend."
+        )
+
+    if not isinstance(path, str):
+        path = stringify_path(path)
+
+    kwargs["dtype_backend"] = None
+    if arrow_to_pandas:
+        kwargs["arrow_to_pandas"] = None
+
+    if filters is not None:
+        for filter in flatten(filters, container=list):
+            _, op, val = filter
+            if op == "in" and not isinstance(val, (set, list, tuple)):
+                raise TypeError(
+                    "Value of 'in' filter must be a list, set or tuple."
+                )
+
+    # Normalize blocksize input
+    if blocksize == "default":
+        blocksize = _normalize_blocksize()
+    elif isinstance(blocksize, float) and blocksize < 1:
+        blocksize = _normalize_blocksize(blocksize)
+
+    if (
+        isinstance(filesystem, pa_fs.FileSystem)
+        or isinstance(filesystem, str)
+        and filesystem.lower() in ("arrow", "pyarrow")
+    ):
+        # EXPERIMENTAL filesystem="arrow" support.
+        # This code path may use PyArrow for remote IO.
+
+        # CudfReadParquetPyarrowFS requires import of distributed beforehand
+        # (See: https://github.com/dask/dask/issues/11352)
+        import distributed  # noqa: F401
+
+        if not PYARROW_GE_15:
+            raise ValueError(
+                "pyarrow>=15.0.0 is required to use the pyarrow filesystem."
+            )
+        if metadata_task_size is not None:
+            warnings.warn(
+                "metadata_task_size is not supported when using the pyarrow filesystem."
+                " This argument will be ignored!"
+            )
+        if aggregate_files is not None:
+            warnings.warn(
+                "aggregate_files is not supported when using the pyarrow filesystem."
+                " This argument will be ignored!"
+            )
+        if split_row_groups != "infer":
+            warnings.warn(
+                "split_row_groups is not supported when using the pyarrow filesystem."
+                " This argument will be ignored!"
+            )
+        if parquet_file_extension != (".parq", ".parquet", ".pq"):
+            raise NotImplementedError(
+                "parquet_file_extension is not supported when using the pyarrow filesystem."
+ ) + + return dx.new_collection( + NoOp( + CudfReadParquetPyarrowFS( + path, + columns=_convert_to_list(columns), + filters=filters, + categories=categories, + index=index, + calculate_divisions=calculate_divisions, + storage_options=storage_options, + filesystem=filesystem, + blocksize=blocksize, + ignore_metadata_file=ignore_metadata_file, + arrow_to_pandas=None, + pyarrow_strings_enabled=pyarrow_strings_enabled(), + kwargs=kwargs, + _series=isinstance(columns, str), + ), + ) + ) + + return dx.new_collection( + NoOp( + CudfReadParquetFSSpec( + path, + columns=_convert_to_list(columns), + filters=filters, + categories=categories, + index=index, + blocksize=blocksize, + storage_options=storage_options, + calculate_divisions=calculate_divisions, + ignore_metadata_file=ignore_metadata_file, + metadata_task_size=metadata_task_size, + split_row_groups=split_row_groups, + aggregate_files=aggregate_files, + parquet_file_extension=parquet_file_extension, + filesystem=filesystem, + engine=CudfEngine, + kwargs=kwargs, + _series=isinstance(columns, str), + ), + ) + ) + + +if QUERY_PLANNING_ON: + read_parquet = read_parquet_expr + read_parquet.__doc__ = read_parquet_expr.__doc__ +else: + read_parquet = _deprecated_api( + "The legacy dask_cudf.io.parquet.read_parquet API", + new_api="dask_cudf.read_parquet", + rec="", + ) to_parquet = _deprecated_api( "dask_cudf.io.parquet.to_parquet", new_api="dask_cudf._legacy.io.parquet.to_parquet", diff --git a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py index 522a21e12a5..6efe6c4f388 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py @@ -46,7 +46,7 @@ def test_roundtrip_backend_dispatch(tmpdir): tmpdir = str(tmpdir) ddf.to_parquet(tmpdir, engine="pyarrow") with dask.config.set({"dataframe.backend": "cudf"}): - ddf2 = dd.read_parquet(tmpdir, index=False) + ddf2 = dd.read_parquet(tmpdir, index=False, blocksize=None) assert isinstance(ddf2, dask_cudf.DataFrame) dd.assert_eq(ddf.reset_index(drop=False), ddf2) @@ -100,7 +100,7 @@ def test_roundtrip_from_dask_index_false(tmpdir): tmpdir = str(tmpdir) ddf.to_parquet(tmpdir, engine="pyarrow") - ddf2 = dask_cudf.read_parquet(tmpdir, index=False) + ddf2 = dask_cudf.read_parquet(tmpdir, index=False, blocksize=None) dd.assert_eq(ddf.reset_index(drop=False), ddf2) @@ -667,7 +667,7 @@ def test_to_parquet_append(tmpdir, write_metadata_file): write_metadata_file=write_metadata_file, write_index=False, ) - ddf2 = dask_cudf.read_parquet(tmpdir) + ddf2 = dask_cudf.read_parquet(tmpdir, blocksize=None) dd.assert_eq(cudf.concat([df, df]), ddf2) @@ -677,13 +677,17 @@ def test_deprecated_api_paths(tmpdir): with pytest.warns(match="dask_cudf.io.to_parquet is now deprecated"): dask_cudf.io.to_parquet(df, tmpdir) - # Encourage top-level read_parquet import only - with pytest.warns(match="dask_cudf.io.read_parquet is now deprecated"): + if dask_cudf.QUERY_PLANNING_ON: df2 = dask_cudf.io.read_parquet(tmpdir) - dd.assert_eq(df, df2, check_divisions=False) + dd.assert_eq(df, df2, check_divisions=False) - with pytest.warns( - match="dask_cudf.io.parquet.read_parquet is now deprecated" - ): df2 = dask_cudf.io.parquet.read_parquet(tmpdir) - dd.assert_eq(df, df2, check_divisions=False) + dd.assert_eq(df, df2, check_divisions=False) + else: + with pytest.warns(match="legacy dask_cudf.io.read_parquet"): + df2 = dask_cudf.io.read_parquet(tmpdir) + dd.assert_eq(df, df2, check_divisions=False) + + with 
pytest.warns(match="legacy dask_cudf.io.parquet.read_parquet"): + df2 = dask_cudf.io.parquet.read_parquet(tmpdir) + dd.assert_eq(df, df2, check_divisions=False)
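
A minimal usage sketch of the read_parquet behavior introduced above. The dataset paths ("dataset/", "s3://bucket/dataset/") and column names ("x", "y") are hypothetical placeholders; only the keyword arguments shown (columns, filters, blocksize, filesystem) come from this change.

    # Illustrative only: paths and column names are hypothetical.
    import dask_cudf

    # Default behavior: blocksize="default" targets roughly 1/32 of the
    # smallest visible GPU's memory per partition (e.g. ~1.25 GiB on a
    # 40 GiB GPU), via _normalize_blocksize()/_get_device_size() above.
    ddf = dask_cudf.read_parquet("dataset/")

    # blocksize=None disables automatic splitting/aggregation, giving a
    # 1:1 file-to-partition mapping (as used in the updated tests above).
    ddf_plain = dask_cudf.read_parquet("dataset/", blocksize=None)

    # DNF-style filters plus the experimental PyArrow filesystem
    # (requires pyarrow>=15 and an importable `distributed`).
    ddf_arrow = dask_cudf.read_parquet(
        "s3://bucket/dataset/",
        columns=["x", "y"],
        filters=[[("x", ">", 0)]],
        blocksize="256MiB",
        filesystem="arrow",
    )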