From e50fa00aed685395a16d252787a834d308a548bc Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Thu, 11 Jan 2024 14:58:50 -0800 Subject: [PATCH] Expose streams in Parquet reader and writer APIs (#14359) This PR contributes to https://github.com/rapidsai/cudf/issues/13744. -Added stream parameters to public APIs ``` cudf::io::read_parquet cudf::io::write_parquet cudf::io::parquet_chunked_writer cudf::io::chunked_parquet_reader ``` -Added stream gtests Authors: - Shruti Shivakumar (https://github.com/shrshi) Approvers: - Mark Harris (https://github.com/harrism) - Vukasin Milovanovic (https://github.com/vuule) URL: https://github.com/rapidsai/cudf/pull/14359 --- cpp/include/cudf/io/parquet.hpp | 16 ++- cpp/src/io/functions.cpp | 27 ++--- cpp/tests/CMakeLists.txt | 1 + cpp/tests/streams/io/parquet_test.cpp | 138 ++++++++++++++++++++++++++ 4 files changed, 166 insertions(+), 16 deletions(-) create mode 100644 cpp/tests/streams/io/parquet_test.cpp diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index ea18da74d5a..dc035db8d39 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -401,6 +401,7 @@ class parquet_reader_options_builder { * @endcode * * @param options Settings for controlling reading behavior + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate device memory of the table in the returned * table_with_metadata * @@ -408,6 +409,7 @@ class parquet_reader_options_builder { */ table_with_metadata read_parquet( parquet_reader_options const& options, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); /** @@ -438,11 +440,13 @@ class chunked_parquet_reader { * @param chunk_read_limit Limit on total number of bytes to be returned per read, * or `0` if there is no limit * @param options The options used to read Parquet file + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource to use for device memory allocation */ chunked_parquet_reader( std::size_t chunk_read_limit, parquet_reader_options const& options, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); /** @@ -461,12 +465,14 @@ class chunked_parquet_reader { * @param pass_read_limit Limit on the amount of memory used for reading and decompressing data or * `0` if there is no limit * @param options The options used to read Parquet file + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource to use for device memory allocation */ chunked_parquet_reader( std::size_t chunk_read_limit, std::size_t pass_read_limit, parquet_reader_options const& options, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); /** @@ -1163,11 +1169,13 @@ class parquet_writer_options_builder { * @endcode * * @param options Settings for controlling writing behavior + * @param stream CUDA stream used for device memory operations and kernel launches * @return A blob that contains the file metadata (parquet FileMetadata thrift message) if * requested in parquet_writer_options (empty blob otherwise). */ -std::unique_ptr> write_parquet(parquet_writer_options const& options); +std::unique_ptr> write_parquet( + parquet_writer_options const& options, rmm::cuda_stream_view stream = cudf::get_default_stream()); /** * @brief Merges multiple raw metadata blobs that were previously created by write_parquet @@ -1778,8 +1786,10 @@ class parquet_chunked_writer { * @brief Constructor with chunked writer options * * @param[in] options options used to write table + * @param[in] stream CUDA stream used for device memory operations and kernel launches */ - parquet_chunked_writer(chunked_parquet_writer_options const& options); + parquet_chunked_writer(chunked_parquet_writer_options const& options, + rmm::cuda_stream_view stream = cudf::get_default_stream()); /** * @brief Writes table to output. diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index a9049d5640e..e5489963618 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -488,13 +488,14 @@ using namespace cudf::io::parquet::detail; namespace detail_parquet = cudf::io::parquet::detail; table_with_metadata read_parquet(parquet_reader_options const& options, + rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { CUDF_FUNC_RANGE(); auto datasources = make_datasources(options.get_source()); - auto reader = std::make_unique( - std::move(datasources), options, cudf::get_default_stream(), mr); + auto reader = + std::make_unique(std::move(datasources), options, stream, mr); return reader->read(options); } @@ -554,7 +555,8 @@ table_input_metadata::table_input_metadata(table_metadata const& metadata) /** * @copydoc cudf::io::write_parquet */ -std::unique_ptr> write_parquet(parquet_writer_options const& options) +std::unique_ptr> write_parquet(parquet_writer_options const& options, + rmm::cuda_stream_view stream) { namespace io_detail = cudf::io::detail; @@ -562,7 +564,7 @@ std::unique_ptr> write_parquet(parquet_writer_options const auto sinks = make_datasinks(options.get_sink()); auto writer = std::make_unique( - std::move(sinks), options, io_detail::single_write_mode::YES, cudf::get_default_stream()); + std::move(sinks), options, io_detail::single_write_mode::YES, stream); writer->write(options.get_table(), options.get_partitions()); @@ -574,13 +576,10 @@ std::unique_ptr> write_parquet(parquet_writer_options const */ chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit, parquet_reader_options const& options, + rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) - : reader{std::make_unique(chunk_read_limit, - 0, - make_datasources(options.get_source()), - options, - cudf::get_default_stream(), - mr)} + : reader{std::make_unique( + chunk_read_limit, 0, make_datasources(options.get_source()), options, stream, mr)} { } @@ -590,12 +589,13 @@ chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit, chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit, std::size_t pass_read_limit, parquet_reader_options const& options, + rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) : reader{std::make_unique(chunk_read_limit, pass_read_limit, make_datasources(options.get_source()), options, - cudf::get_default_stream(), + stream, mr)} { } @@ -628,14 +628,15 @@ table_with_metadata chunked_parquet_reader::read_chunk() const /** * @copydoc cudf::io::parquet_chunked_writer::parquet_chunked_writer */ -parquet_chunked_writer::parquet_chunked_writer(chunked_parquet_writer_options const& options) +parquet_chunked_writer::parquet_chunked_writer(chunked_parquet_writer_options const& options, + rmm::cuda_stream_view stream) { namespace io_detail = cudf::io::detail; auto sinks = make_datasinks(options.get_sink()); writer = std::make_unique( - std::move(sinks), options, io_detail::single_write_mode::NO, cudf::get_default_stream()); + std::move(sinks), options, io_detail::single_write_mode::NO, stream); } /** diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 48bc4ac6fc1..f7b805b68f5 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -655,6 +655,7 @@ ConfigureTest(STREAM_INTEROP_TEST streams/interop_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_JSONIO_TEST streams/io/json_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_LISTS_TEST streams/lists_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_NULL_MASK_TEST streams/null_mask_test.cpp STREAM_MODE testing) +ConfigureTest(STREAM_PARQUETIO_TEST streams/io/parquet_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_POOL_TEST streams/pool_test.cu STREAM_MODE testing) ConfigureTest(STREAM_REPLACE_TEST streams/replace_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_SEARCH_TEST streams/search_test.cpp STREAM_MODE testing) diff --git a/cpp/tests/streams/io/parquet_test.cpp b/cpp/tests/streams/io/parquet_test.cpp new file mode 100644 index 00000000000..c6d531bc376 --- /dev/null +++ b/cpp/tests/streams/io/parquet_test.cpp @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +// Global environment for temporary files +auto const temp_env = static_cast( + ::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment)); + +class ParquetTest : public cudf::test::BaseFixture {}; + +template +std::vector> make_uniqueptrs_vector(UniqPtrs&&... uniqptrs) +{ + std::vector> ptrsvec; + (ptrsvec.push_back(std::forward(uniqptrs)), ...); + return ptrsvec; +} + +cudf::table construct_table() +{ + constexpr auto num_rows = 10; + + std::vector zeros(num_rows, 0); + std::vector ones(num_rows, 1); + + cudf::test::fixed_width_column_wrapper col0(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col1(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col2(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col3(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col4(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col5(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col6 = [&ones, num_rows] { + auto col6_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { + return numeric::decimal128{ones[i], numeric::scale_type{12}}; + }); + return cudf::test::fixed_width_column_wrapper(col6_data, + col6_data + num_rows); + }(); + cudf::test::fixed_width_column_wrapper col7 = [&ones, num_rows] { + auto col7_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { + return numeric::decimal128{ones[i], numeric::scale_type{-12}}; + }); + return cudf::test::fixed_width_column_wrapper(col7_data, + col7_data + num_rows); + }(); + + cudf::test::lists_column_wrapper col8{ + {1, 1}, {1, 1, 1}, {}, {1}, {1, 1, 1, 1}, {1, 1, 1, 1, 1}, {}, {1, -1}, {}, {-1, -1}}; + + cudf::test::structs_column_wrapper col9 = [&ones] { + cudf::test::fixed_width_column_wrapper child_col(ones.begin(), ones.end()); + return cudf::test::structs_column_wrapper{child_col}; + }(); + + cudf::test::strings_column_wrapper col10 = [] { + std::vector col10_data(num_rows, "rapids"); + return cudf::test::strings_column_wrapper(col10_data.begin(), col10_data.end()); + }(); + + auto colsptr = make_uniqueptrs_vector(col0.release(), + col1.release(), + col2.release(), + col3.release(), + col4.release(), + col5.release(), + col6.release(), + col7.release(), + col8.release(), + col9.release(), + col10.release()); + return cudf::table(std::move(colsptr)); +} + +TEST_F(ParquetTest, ParquetWriter) +{ + auto tab = construct_table(); + auto filepath = temp_env->get_temp_filepath("MultiColumn.parquet"); + cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tab); + cudf::io::write_parquet(out_opts, cudf::test::get_default_stream()); +} + +TEST_F(ParquetTest, ParquetReader) +{ + auto tab = construct_table(); + auto filepath = temp_env->get_temp_filepath("MultiColumn.parquet"); + cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tab); + cudf::io::write_parquet(out_opts, cudf::test::get_default_stream()); + + cudf::io::parquet_reader_options in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); + auto result = cudf::io::read_parquet(in_opts, cudf::test::get_default_stream()); + auto meta = cudf::io::read_parquet_metadata(cudf::io::source_info{filepath}); +} + +TEST_F(ParquetTest, ChunkedOperations) +{ + auto tab = construct_table(); + auto filepath = temp_env->get_temp_filepath("MultiColumn.parquet"); + cudf::io::chunked_parquet_writer_options out_opts = + cudf::io::chunked_parquet_writer_options::builder(cudf::io::sink_info{filepath}); + cudf::io::parquet_chunked_writer(out_opts, cudf::test::get_default_stream()).write(tab); + + auto reader = cudf::io::chunked_parquet_reader( + 1L << 31, + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}), + cudf::test::get_default_stream()); + while (reader.has_next()) { + auto chunk = reader.read_chunk(); + } +}