Skip to content

Commit

Permalink
Expose streams in Parquet reader and writer APIs (#14359)
Browse files Browse the repository at this point in the history
This PR contributes to #13744.
-Added stream parameters to public APIs
```
cudf::io::read_parquet
cudf::io::write_parquet
cudf::io::parquet_chunked_writer
cudf::io::chunked_parquet_reader
```
-Added stream gtests

Authors:
  - Shruti Shivakumar (https://github.com/shrshi)

Approvers:
  - Mark Harris (https://github.com/harrism)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #14359
  • Loading branch information
shrshi authored Jan 11, 2024
1 parent 0d87bb7 commit e50fa00
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 16 deletions.
16 changes: 13 additions & 3 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -401,13 +401,15 @@ class parquet_reader_options_builder {
* @endcode
*
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate device memory of the table in the returned
* table_with_metadata
*
* @return The set of columns along with metadata
*/
table_with_metadata read_parquet(
parquet_reader_options const& options,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand Down Expand Up @@ -438,11 +440,13 @@ class chunked_parquet_reader {
* @param chunk_read_limit Limit on total number of bytes to be returned per read,
* or `0` if there is no limit
* @param options The options used to read Parquet file
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
chunked_parquet_reader(
std::size_t chunk_read_limit,
parquet_reader_options const& options,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -461,12 +465,14 @@ class chunked_parquet_reader {
* @param pass_read_limit Limit on the amount of memory used for reading and decompressing data or
* `0` if there is no limit
* @param options The options used to read Parquet file
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
chunked_parquet_reader(
std::size_t chunk_read_limit,
std::size_t pass_read_limit,
parquet_reader_options const& options,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
Expand Down Expand Up @@ -1163,11 +1169,13 @@ class parquet_writer_options_builder {
* @endcode
*
* @param options Settings for controlling writing behavior
* @param stream CUDA stream used for device memory operations and kernel launches
* @return A blob that contains the file metadata (parquet FileMetadata thrift message) if
* requested in parquet_writer_options (empty blob otherwise).
*/

std::unique_ptr<std::vector<uint8_t>> write_parquet(parquet_writer_options const& options);
std::unique_ptr<std::vector<uint8_t>> write_parquet(
parquet_writer_options const& options, rmm::cuda_stream_view stream = cudf::get_default_stream());

/**
* @brief Merges multiple raw metadata blobs that were previously created by write_parquet
Expand Down Expand Up @@ -1778,8 +1786,10 @@ class parquet_chunked_writer {
* @brief Constructor with chunked writer options
*
* @param[in] options options used to write table
* @param[in] stream CUDA stream used for device memory operations and kernel launches
*/
parquet_chunked_writer(chunked_parquet_writer_options const& options);
parquet_chunked_writer(chunked_parquet_writer_options const& options,
rmm::cuda_stream_view stream = cudf::get_default_stream());

/**
* @brief Writes table to output.
Expand Down
27 changes: 14 additions & 13 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -488,13 +488,14 @@ using namespace cudf::io::parquet::detail;
namespace detail_parquet = cudf::io::parquet::detail;

table_with_metadata read_parquet(parquet_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();

auto datasources = make_datasources(options.get_source());
auto reader = std::make_unique<detail_parquet::reader>(
std::move(datasources), options, cudf::get_default_stream(), mr);
auto reader =
std::make_unique<detail_parquet::reader>(std::move(datasources), options, stream, mr);

return reader->read(options);
}
Expand Down Expand Up @@ -554,15 +555,16 @@ table_input_metadata::table_input_metadata(table_metadata const& metadata)
/**
* @copydoc cudf::io::write_parquet
*/
std::unique_ptr<std::vector<uint8_t>> write_parquet(parquet_writer_options const& options)
std::unique_ptr<std::vector<uint8_t>> write_parquet(parquet_writer_options const& options,
rmm::cuda_stream_view stream)
{
namespace io_detail = cudf::io::detail;

CUDF_FUNC_RANGE();

auto sinks = make_datasinks(options.get_sink());
auto writer = std::make_unique<detail_parquet::writer>(
std::move(sinks), options, io_detail::single_write_mode::YES, cudf::get_default_stream());
std::move(sinks), options, io_detail::single_write_mode::YES, stream);

writer->write(options.get_table(), options.get_partitions());

Expand All @@ -574,13 +576,10 @@ std::unique_ptr<std::vector<uint8_t>> write_parquet(parquet_writer_options const
*/
chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
: reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit,
0,
make_datasources(options.get_source()),
options,
cudf::get_default_stream(),
mr)}
: reader{std::make_unique<detail_parquet::chunked_reader>(
chunk_read_limit, 0, make_datasources(options.get_source()), options, stream, mr)}
{
}

Expand All @@ -590,12 +589,13 @@ chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit,
chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit,
std::size_t pass_read_limit,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
: reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit,
pass_read_limit,
make_datasources(options.get_source()),
options,
cudf::get_default_stream(),
stream,
mr)}
{
}
Expand Down Expand Up @@ -628,14 +628,15 @@ table_with_metadata chunked_parquet_reader::read_chunk() const
/**
* @copydoc cudf::io::parquet_chunked_writer::parquet_chunked_writer
*/
parquet_chunked_writer::parquet_chunked_writer(chunked_parquet_writer_options const& options)
parquet_chunked_writer::parquet_chunked_writer(chunked_parquet_writer_options const& options,
rmm::cuda_stream_view stream)
{
namespace io_detail = cudf::io::detail;

auto sinks = make_datasinks(options.get_sink());

writer = std::make_unique<detail_parquet::writer>(
std::move(sinks), options, io_detail::single_write_mode::NO, cudf::get_default_stream());
std::move(sinks), options, io_detail::single_write_mode::NO, stream);
}

/**
Expand Down
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,7 @@ ConfigureTest(STREAM_INTEROP_TEST streams/interop_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_JSONIO_TEST streams/io/json_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_LISTS_TEST streams/lists_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_NULL_MASK_TEST streams/null_mask_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_PARQUETIO_TEST streams/io/parquet_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_POOL_TEST streams/pool_test.cu STREAM_MODE testing)
ConfigureTest(STREAM_REPLACE_TEST streams/replace_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_SEARCH_TEST streams/search_test.cpp STREAM_MODE testing)
Expand Down
138 changes: 138 additions & 0 deletions cpp/tests/streams/io/parquet_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf/io/detail/parquet.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/default_stream.hpp>
#include <cudf_test/iterator_utilities.hpp>

#include <string>
#include <vector>

// Global environment for temporary files
auto const temp_env = static_cast<cudf::test::TempDirTestEnvironment*>(
::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment));

class ParquetTest : public cudf::test::BaseFixture {};

template <typename... UniqPtrs>
std::vector<std::unique_ptr<cudf::column>> make_uniqueptrs_vector(UniqPtrs&&... uniqptrs)
{
std::vector<std::unique_ptr<cudf::column>> ptrsvec;
(ptrsvec.push_back(std::forward<UniqPtrs>(uniqptrs)), ...);
return ptrsvec;
}

cudf::table construct_table()
{
constexpr auto num_rows = 10;

std::vector<size_t> zeros(num_rows, 0);
std::vector<size_t> ones(num_rows, 1);

cudf::test::fixed_width_column_wrapper<bool> col0(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<int8_t> col1(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<int16_t> col2(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<int32_t> col3(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<float> col4(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<double> col5(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<numeric::decimal128> col6 = [&ones, num_rows] {
auto col6_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) {
return numeric::decimal128{ones[i], numeric::scale_type{12}};
});
return cudf::test::fixed_width_column_wrapper<numeric::decimal128>(col6_data,
col6_data + num_rows);
}();
cudf::test::fixed_width_column_wrapper<numeric::decimal128> col7 = [&ones, num_rows] {
auto col7_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) {
return numeric::decimal128{ones[i], numeric::scale_type{-12}};
});
return cudf::test::fixed_width_column_wrapper<numeric::decimal128>(col7_data,
col7_data + num_rows);
}();

cudf::test::lists_column_wrapper<int64_t> col8{
{1, 1}, {1, 1, 1}, {}, {1}, {1, 1, 1, 1}, {1, 1, 1, 1, 1}, {}, {1, -1}, {}, {-1, -1}};

cudf::test::structs_column_wrapper col9 = [&ones] {
cudf::test::fixed_width_column_wrapper<int32_t> child_col(ones.begin(), ones.end());
return cudf::test::structs_column_wrapper{child_col};
}();

cudf::test::strings_column_wrapper col10 = [] {
std::vector<std::string> col10_data(num_rows, "rapids");
return cudf::test::strings_column_wrapper(col10_data.begin(), col10_data.end());
}();

auto colsptr = make_uniqueptrs_vector(col0.release(),
col1.release(),
col2.release(),
col3.release(),
col4.release(),
col5.release(),
col6.release(),
col7.release(),
col8.release(),
col9.release(),
col10.release());
return cudf::table(std::move(colsptr));
}

TEST_F(ParquetTest, ParquetWriter)
{
auto tab = construct_table();
auto filepath = temp_env->get_temp_filepath("MultiColumn.parquet");
cudf::io::parquet_writer_options out_opts =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tab);
cudf::io::write_parquet(out_opts, cudf::test::get_default_stream());
}

TEST_F(ParquetTest, ParquetReader)
{
auto tab = construct_table();
auto filepath = temp_env->get_temp_filepath("MultiColumn.parquet");
cudf::io::parquet_writer_options out_opts =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tab);
cudf::io::write_parquet(out_opts, cudf::test::get_default_stream());

cudf::io::parquet_reader_options in_opts =
cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath});
auto result = cudf::io::read_parquet(in_opts, cudf::test::get_default_stream());
auto meta = cudf::io::read_parquet_metadata(cudf::io::source_info{filepath});
}

TEST_F(ParquetTest, ChunkedOperations)
{
auto tab = construct_table();
auto filepath = temp_env->get_temp_filepath("MultiColumn.parquet");
cudf::io::chunked_parquet_writer_options out_opts =
cudf::io::chunked_parquet_writer_options::builder(cudf::io::sink_info{filepath});
cudf::io::parquet_chunked_writer(out_opts, cudf::test::get_default_stream()).write(tab);

auto reader = cudf::io::chunked_parquet_reader(
1L << 31,
cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}),
cudf::test::get_default_stream());
while (reader.has_next()) {
auto chunk = reader.read_chunk();
}
}

0 comments on commit e50fa00

Please sign in to comment.