From 1d32613ee0649821fe7efb24cef97f06bb13eca0 Mon Sep 17 00:00:00 2001
From: Mike Wilson
Date: Tue, 19 Dec 2023 18:22:20 +0000
Subject: [PATCH 1/3] first pass at multi-threaded parquet benchmarks

Signed-off-by: Mike Wilson
---
 cpp/benchmarks/CMakeLists.txt                 |   6 +
 cpp/benchmarks/io/cuio_common.hpp             |   4 +
 .../io/parquet/parquet_reader_multithread.cpp | 121 ++++++++++++++++++
 .../cudf}/utilities/thread_pool.hpp           |   0
 cpp/src/io/utilities/file_io_utilities.hpp    |   2 +-
 5 files changed, 132 insertions(+), 1 deletion(-)
 create mode 100644 cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp
 rename cpp/{src/io => include/cudf}/utilities/thread_pool.hpp (100%)

diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt
index 9c3a05a2f5f..1b02a75b637 100644
--- a/cpp/benchmarks/CMakeLists.txt
+++ b/cpp/benchmarks/CMakeLists.txt
@@ -248,6 +248,12 @@ ConfigureNVBench(
   PARQUET_READER_NVBENCH io/parquet/parquet_reader_input.cpp io/parquet/parquet_reader_options.cpp
 )
 
+# ##################################################################################################
+# * parquet multithread reader benchmark -----------------------------------------------------------
+ConfigureNVBench(
+  PARQUET_MULTITHREAD_READER_NVBENCH io/parquet/parquet_reader_multithread.cpp
+)
+
 # ##################################################################################################
 # * orc reader benchmark --------------------------------------------------------------------------
 ConfigureNVBench(ORC_READER_NVBENCH io/orc/orc_reader_input.cpp io/orc/orc_reader_options.cpp)
diff --git a/cpp/benchmarks/io/cuio_common.hpp b/cpp/benchmarks/io/cuio_common.hpp
index fe509f196be..3d2a028e756 100644
--- a/cpp/benchmarks/io/cuio_common.hpp
+++ b/cpp/benchmarks/io/cuio_common.hpp
@@ -48,6 +48,10 @@ class cuio_source_sink_pair {
     // delete the temporary file
     std::remove(file_name.c_str());
   }
+  // move constructor and move assignment
+  cuio_source_sink_pair(cuio_source_sink_pair&& ss)            = default;
+  cuio_source_sink_pair& operator=(cuio_source_sink_pair&& ss) = default;
+
   /**
    * @brief Created a source info of the set type
    *
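(A convenience note for reviewers: this commit gives the benchmark its own nvbench binary via the ConfigureNVBench call above. Assuming the usual RAPIDS cpp build layout and the stock nvbench command line, building the PARQUET_MULTITHREAD_READER_NVBENCH target and running the resulting executable with an axis override such as `-a num_threads=4` exercises the new code; the exact binary path depends on the local build configuration.)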
diff --git a/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp b/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp
new file mode 100644
index 00000000000..dcb56b084d1
--- /dev/null
+++ b/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2022-2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <benchmarks/common/generate_input.hpp>
+#include <benchmarks/fixture/benchmark_fixture.hpp>
+#include <benchmarks/io/cuio_common.hpp>
+#include <benchmarks/io/nvbench_helpers.hpp>
+
+#include <cudf/detail/utilities/stream_pool.hpp>
+#include <cudf/io/detail/parquet.hpp>
+#include <cudf/utilities/thread_pool.hpp>
+
+#include <rmm/mr/device/per_device_resource.hpp>
+
+#include <nvbench/nvbench.cuh>
+
+#include <vector>
+
+template <data_type DataType>
+void BM_parquet_multithreaded_read(nvbench::state& state,
+                                   nvbench::type_list<nvbench::enum_type<DataType>>)
+{
+  auto const d_type                 = get_type_or_group(static_cast<int32_t>(DataType));
+  cudf::size_type const cardinality = state.get_int64("cardinality");
+  cudf::size_type const run_length  = state.get_int64("run_length");
+  cudf::size_type const num_cols    = state.get_int64("num_cols");
+  size_t const data_size            = state.get_int64("data_size");
+  size_t const chunk_size           = state.get_int64("chunk_size");
+  auto const num_threads            = state.get_int64("num_threads");
+
+  int const num_tables = data_size / chunk_size;
+  std::vector<cuio_source_sink_pair> source_sink_vector;
+  auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
+  cudf::detail::thread_pool threads(num_threads);
+
+  size_t total_file_size = 0;
+
+  for (auto i = 0; i < num_tables; ++i) {
+    cuio_source_sink_pair source_sink{cudf::io::io_type::HOST_BUFFER};
+
+    auto const tbl = create_random_table(
+      cycle_dtypes(d_type, num_cols),
+      table_size_bytes{chunk_size},
+      data_profile_builder().cardinality(cardinality).avg_run_length(run_length));
+    auto const view = tbl->view();
+
+    cudf::io::parquet_writer_options write_opts =
+      cudf::io::parquet_writer_options::builder(source_sink.make_sink_info(), view)
+        .compression(cudf::io::compression_type::SNAPPY);
+
+    cudf::io::write_parquet(write_opts);
+    total_file_size += source_sink.size();
+
+    source_sink_vector.push_back(std::move(source_sink));
+  }
+
+  auto mem_stats_logger = cudf::memory_stats_logger();
+
+  state.exec(
+    nvbench::exec_tag::sync | nvbench::exec_tag::timer, [&](nvbench::launch& launch, auto& timer) {
+      auto read_func = [&](int index) {
+        auto const stream = streams[index % num_threads];
+        auto& source_sink = source_sink_vector[index];
+        cudf::io::parquet_reader_options read_opts =
+          cudf::io::parquet_reader_options::builder(source_sink.make_source_info());
+        auto datasources = cudf::io::datasource::create(read_opts.get_source().host_buffers());
+        auto reader      = std::make_unique<cudf::io::parquet::detail::reader>(
+          std::move(datasources), read_opts, stream, rmm::mr::get_current_device_resource());
+
+        reader->read(read_opts);
+      };
+
+      threads.paused = true;
+      for (auto i = 0; i < num_tables; ++i) {
+        threads.submit(read_func, i);
+      }
+      timer.start();
+      threads.paused = false;
+      threads.wait_for_tasks();
+      cudf::detail::join_streams(streams, cudf::get_default_stream());
+      timer.stop();
+    });
+
+  auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
+  state.add_element_count(static_cast<double>(data_size) / time, "bytes_per_second");
+  state.add_buffer_size(
+    mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
+  state.add_buffer_size(total_file_size, "encoded_file_size", "encoded_file_size");
+}
+
+using d_type_list = nvbench::enum_type_list<data_type::INTEGRAL,
+                                            data_type::FLOAT,
+                                            data_type::DECIMAL,
+                                            data_type::TIMESTAMP,
+                                            data_type::DURATION,
+                                            data_type::STRING,
+                                            data_type::LIST,
+                                            data_type::STRUCT>;
+
+NVBENCH_BENCH_TYPES(BM_parquet_multithreaded_read, NVBENCH_TYPE_AXES(d_type_list))
+  .set_name("parquet_multithreaded_read_decode")
+  .set_min_samples(4)
+  .add_int64_axis("cardinality", {0, 1000})
+  .add_int64_axis("num_cols", {872})
+  .add_int64_axis("run_length", {1, 32})
+  .add_int64_axis("chunk_size", {128ul * 1024 * 1024})
+  .add_int64_axis("data_size", {5ul * 1024 * 1024 * 1024})
+  .add_int64_axis("num_threads", {4});
diff --git a/cpp/src/io/utilities/thread_pool.hpp b/cpp/include/cudf/utilities/thread_pool.hpp
similarity index 100%
rename from cpp/src/io/utilities/thread_pool.hpp
rename to cpp/include/cudf/utilities/thread_pool.hpp
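(The pool that just moved to cpp/include/cudf/utilities/thread_pool.hpp is what drives the timed region above: work is queued while the pool is paused, the timer starts, and unpausing releases every worker at once so thread-launch overhead stays out of the measurement. A minimal standalone sketch of that idiom, using only the members the benchmark itself exercises — paused, submit, wait_for_tasks — and not intended as a reference for the full thread_pool API:)

#include <cudf/utilities/thread_pool.hpp>

#include <cstdio>

int main()
{
  cudf::detail::thread_pool pool(4);

  // Queue all the work first, so no task starts before the clock does.
  pool.paused = true;
  for (int i = 0; i < 8; ++i) {
    pool.submit([](int task_id) { std::printf("running task %d\n", task_id); }, i);
  }

  // A benchmark would start its timer here, then release the workers.
  pool.paused = false;
  pool.wait_for_tasks();  // blocks until every queued task has finished
  return 0;
}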
diff --git a/cpp/src/io/utilities/file_io_utilities.hpp b/cpp/src/io/utilities/file_io_utilities.hpp
index b55dd3b1583..968b41bc760 100644
--- a/cpp/src/io/utilities/file_io_utilities.hpp
+++ b/cpp/src/io/utilities/file_io_utilities.hpp
@@ -17,7 +17,7 @@
 #pragma once
 
 #ifdef CUFILE_FOUND
-#include "thread_pool.hpp"
+#include <cudf/utilities/thread_pool.hpp>
 
 #include <cufile.h>
 #include <cudf/io/datasource.hpp>
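(Before the next commit, one piece of context: by the state it patches, the benchmark file already routes cuIO's host-side allocations through a pooled, pinned host memory resource — see the set_cuio_host_pinned_pool context lines below. A standalone sketch of that wiring follows; the two header paths and the pinned_host_memory_resource name are assumptions on my part, and a static upstream object is kept alive here instead of the temporary shared_ptr the hunk itself uses:)

#include <cudf/io/memory_resource.hpp>             // assumed home of set_host_memory_resource
#include <rmm/mr/device/pool_memory_resource.hpp>
#include <rmm/mr/pinned_host_memory_resource.hpp>  // assumed header/name

void set_cuio_host_pinned_pool()
{
  // Hand cuIO a 256 MiB pool of pinned host memory for its host-side buffers.
  using host_pooled_mr = rmm::mr::pool_memory_resource<rmm::mr::pinned_host_memory_resource>;
  static rmm::mr::pinned_host_memory_resource upstream{};
  static host_pooled_mr mr{&upstream, 256ul * 1024 * 1024};
  cudf::io::set_host_memory_resource(mr);
}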
From 21a6261f0237b0d38a909eb18c3a4f1f56f4dc04 Mon Sep 17 00:00:00 2001
From: Dave Baranec
Date: Mon, 20 May 2024 12:06:06 -0500
Subject: [PATCH 2/3] Rework test parameters so that we generally see at least
 small performance increases with more threads, and so that the benchmarks
 clearly show the multiple-thread-only performance regressions we have
 historically seen.

Each thread now performs exactly one read of total_data_size / num_threads
bytes: for example, with total_data_size = 1 GiB and num_threads = 4, each
thread reads its own 256 MiB file.

---
 .../io/parquet/parquet_reader_multithread.cpp | 195 ++++++++++--------
 1 file changed, 113 insertions(+), 82 deletions(-)

diff --git a/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp b/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp
index 8ee3d8c8038..4309e9cecd0 100644
--- a/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp
+++ b/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp
@@ -41,23 +41,27 @@ void set_cuio_host_pinned_pool()
   using host_pooled_mr = rmm::mr::pool_memory_resource<rmm::mr::pinned_host_memory_resource>;
   static std::shared_ptr<host_pooled_mr> mr = std::make_shared<host_pooled_mr>(
     std::make_shared<rmm::mr::pinned_host_memory_resource>().get(), 256ul * 1024 * 1024);
-
   cudf::io::set_host_memory_resource(*mr);
 }
 
 size_t get_num_reads(nvbench::state const& state)
 {
-  size_t const data_size          = state.get_int64("total_data_size");
-  size_t const per_file_data_size = state.get_int64("per_file_data_size");
-  return data_size / per_file_data_size;
+  return state.get_int64("num_threads");
 }
 
-std::string get_label(std::string const& test_name, nvbench::state const& state)
+size_t get_read_size(nvbench::state const& state)
 {
   auto const num_reads = get_num_reads(state);
+  return state.get_int64("total_data_size") / num_reads;
+}
+
+std::string get_label(std::string const& test_name, nvbench::state const& state)
+{
   auto const num_cols = state.get_int64("num_cols");
-  return {test_name + ", " + std::to_string(num_cols) + "columns, " + std::to_string(num_reads) +
-          "reads, " + std::to_string(state.get_int64("num_threads")) + " threads"};
+  size_t const read_size_mb = get_read_size(state) / (1024 * 1024);
+  return {test_name + ", " + std::to_string(num_cols) + " columns, " +
+          std::to_string(state.get_int64("num_threads")) + " threads " +
+          " (" + std::to_string(read_size_mb) + " MB each)"};
 }
 
 std::tuple<std::vector<cuio_source_sink_pair>, size_t, size_t> write_file_data(
@@ -66,14 +70,14 @@ std::tuple<std::vector<cuio_source_sink_pair>, size_t, size_t> write_file_data(
   cudf::size_type const cardinality = state.get_int64("cardinality");
   cudf::size_type const run_length  = state.get_int64("run_length");
   cudf::size_type const num_cols    = state.get_int64("num_cols");
-  size_t const per_file_data_size   = state.get_int64("per_file_data_size");
-
-  size_t const num_tables = get_num_reads(state);
+  size_t const num_files = get_num_reads(state);
+  size_t const per_file_data_size = get_read_size(state);
 
   std::vector<cuio_source_sink_pair> source_sink_vector;
   size_t total_file_size = 0;
 
-  for (size_t i = 0; i < num_tables; ++i) {
+  for (size_t i = 0; i < num_files; ++i) {
     cuio_source_sink_pair source_sink{cudf::io::io_type::HOST_BUFFER};
 
     auto const tbl = create_random_table(
@@ -84,7 +88,9 @@ std::tuple<std::vector<cuio_source_sink_pair>, size_t, size_t> write_file_data(
 
     cudf::io::parquet_writer_options write_opts =
       cudf::io::parquet_writer_options::builder(source_sink.make_sink_info(), view)
-        .compression(cudf::io::compression_type::SNAPPY);
+        .compression(cudf::io::compression_type::SNAPPY)
+        .max_page_size_rows(50000)
+        .max_page_size_bytes(1024 * 1024);
 
     cudf::io::write_parquet(write_opts);
     total_file_size += source_sink.size();
@@ -92,11 +98,12 @@ std::tuple<std::vector<cuio_source_sink_pair>, size_t, size_t> write_file_data(
     source_sink_vector.push_back(std::move(source_sink));
   }
 
-  return {std::move(source_sink_vector), total_file_size, num_tables};
+  return {std::move(source_sink_vector), total_file_size, num_files};
 }
 
 void BM_parquet_multithreaded_read_common(nvbench::state& state,
-                                          std::vector<cudf::type_id> const& d_types)
+                                          std::vector<cudf::type_id> const& d_types,
+                                          std::string const& label)
 {
   size_t const data_size = state.get_int64("total_data_size");
   auto const num_threads = state.get_int64("num_threads");
@@ -106,10 +113,11 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state,
   auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
   cudf::detail::thread_pool threads(num_threads);
 
-  auto [source_sink_vector, total_file_size, num_tables] = write_file_data(state, d_types);
+  auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);
 
   auto mem_stats_logger = cudf::memory_stats_logger();
 
+  nvtxRangePushA(("(read) " + label).c_str());
   state.exec(
     nvbench::exec_tag::sync | nvbench::exec_tag::timer, [&](nvbench::launch& launch, auto& timer) {
       auto read_func = [&](int index) {
@@ -121,7 +129,7 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state,
       };
 
       threads.paused = true;
-      for (size_t i = 0; i < num_tables; ++i) {
+      for (size_t i = 0; i < num_files; ++i) {
        threads.submit(read_func, i);
       }
       timer.start();
@@ -130,6 +138,7 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state,
       cudf::detail::join_streams(streams, cudf::get_default_stream());
       timer.stop();
     });
+  nvtxRangePop();
 
   auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
   state.add_element_count(static_cast<double>(data_size) / time, "bytes_per_second");
@@ -140,51 +149,56 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state,
 
 void BM_parquet_multithreaded_read_mixed(nvbench::state& state)
 {
-  nvtxRangePushA(get_label("mixed", state).c_str());
+  auto label = get_label("mixed", state);
+  nvtxRangePushA(label.c_str());
   BM_parquet_multithreaded_read_common(
     state,
-    {cudf::type_id::INT32, cudf::type_id::DECIMAL64, cudf::type_id::STRING, cudf::type_id::LIST});
+    {cudf::type_id::INT32, cudf::type_id::DECIMAL64, cudf::type_id::STRING}, label);
   nvtxRangePop();
 }
 
 void BM_parquet_multithreaded_read_fixed_width(nvbench::state& state)
 {
-  nvtxRangePushA(get_label("fixed width", state).c_str());
-  BM_parquet_multithreaded_read_common(state, {cudf::type_id::INT32});
+  auto label = get_label("fixed width", state);
+  nvtxRangePushA(label.c_str());
+  BM_parquet_multithreaded_read_common(state, {cudf::type_id::INT32}, label);
   nvtxRangePop();
 }
 
 void BM_parquet_multithreaded_read_string(nvbench::state& state)
 {
-  nvtxRangePushA(get_label("string", state).c_str());
-  BM_parquet_multithreaded_read_common(state, {cudf::type_id::STRING});
+  auto label = get_label("string", state);
+  nvtxRangePushA(label.c_str());
+  BM_parquet_multithreaded_read_common(state, {cudf::type_id::STRING}, label);
   nvtxRangePop();
 }
 
 void BM_parquet_multithreaded_read_list(nvbench::state& state)
 {
-  nvtxRangePushA(get_label("list", state).c_str());
-  BM_parquet_multithreaded_read_common(state, {cudf::type_id::LIST});
+  auto label = get_label("list", state);
+  nvtxRangePushA(label.c_str());
+  BM_parquet_multithreaded_read_common(state, {cudf::type_id::LIST}, label);
   nvtxRangePop();
 }
 
 void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
-                                                  std::vector<cudf::type_id> const& d_types)
+                                                  std::vector<cudf::type_id> const& d_types,
+                                                  std::string const& label)
 {
-  size_t const data_size = state.get_int64("total_data_size");
+  size_t const data_size   = state.get_int64("total_data_size");
+  auto const num_threads   = state.get_int64("num_threads");
   size_t const input_limit  = state.get_int64("input_limit");
   size_t const output_limit = state.get_int64("output_limit");
-  auto const num_threads = state.get_int64("num_threads");
 
   set_cuio_host_pinned_pool();
 
   auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
   cudf::detail::thread_pool threads(num_threads);
-
-  auto [source_sink_vector, total_file_size, num_tables] = write_file_data(state, d_types);
+  auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);
 
   auto mem_stats_logger = cudf::memory_stats_logger();
 
+  nvtxRangePushA(("(read) " + label).c_str());
   std::vector<cudf::io::table_with_metadata> chunks;
   state.exec(
     nvbench::exec_tag::sync | nvbench::exec_tag::timer, [&](nvbench::launch& launch, auto& timer) {
@@ -193,7 +207,10 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
         auto const stream = streams[index % num_threads];
         auto& source_sink = source_sink_vector[index];
         cudf::io::parquet_reader_options read_opts =
          cudf::io::parquet_reader_options::builder(source_sink.make_source_info());
-        auto reader = cudf::io::chunked_parquet_reader(output_limit, input_limit, read_opts);
+        // divide chunk limits by number of threads so the number of chunks produced is the same for all cases.
+        // this seems better than the alternative, which is to keep the limits the same. if we do that, as the
+        // number of threads goes up, the number of chunks goes down - so are we actually benchmarking the same thing in that case?
+        auto reader = cudf::io::chunked_parquet_reader(output_limit / num_threads, input_limit / num_threads, read_opts, stream);
 
         // read all the chunks
         do {
           auto table = reader.read_chunk();
         } while (reader.has_next());
       };
 
       threads.paused = true;
-      for (size_t i = 0; i < num_tables; ++i) {
+      for (size_t i = 0; i < num_files; ++i) {
        threads.submit(read_func, i);
       }
       timer.start();
@@ -211,6 +228,7 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
       cudf::detail::join_streams(streams, cudf::get_default_stream());
       timer.stop();
     });
+  nvtxRangePop();
 
   auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
   state.add_element_count(static_cast<double>(data_size) / time, "bytes_per_second");
@@ -221,103 +239,116 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
 
 void BM_parquet_multithreaded_read_chunked_mixed(nvbench::state& state)
 {
-  nvtxRangePushA(get_label("mixed", state).c_str());
+  auto label = get_label("mixed", state);
+  nvtxRangePushA(label.c_str());
   BM_parquet_multithreaded_read_chunked_common(
-    state, {cudf::type_id::INT32, cudf::type_id::STRING, cudf::type_id::LIST});
+    state, {cudf::type_id::INT32, cudf::type_id::DECIMAL64, cudf::type_id::STRING}, label);
+  nvtxRangePop();
+}
+
+void BM_parquet_multithreaded_read_chunked_fixed_width(nvbench::state& state)
+{
+  auto label = get_label("fixed width", state);
+  nvtxRangePushA(label.c_str());
+  BM_parquet_multithreaded_read_chunked_common(
+    state, {cudf::type_id::INT32}, label);
   nvtxRangePop();
 }
 
 void BM_parquet_multithreaded_read_chunked_string(nvbench::state& state)
 {
-  nvtxRangePushA(get_label("string", state).c_str());
-  BM_parquet_multithreaded_read_chunked_common(state, {cudf::type_id::STRING});
+  auto label = get_label("string", state);
+  nvtxRangePushA(label.c_str());
+  BM_parquet_multithreaded_read_chunked_common(state, {cudf::type_id::STRING}, label);
   nvtxRangePop();
 }
 
 void BM_parquet_multithreaded_read_chunked_list(nvbench::state& state)
 {
-  nvtxRangePushA(get_label("list", state).c_str());
-  BM_parquet_multithreaded_read_chunked_common(state, {cudf::type_id::LIST});
+  auto label = get_label("list", state);
+  nvtxRangePushA(label.c_str());
+  BM_parquet_multithreaded_read_chunked_common(state, {cudf::type_id::LIST}, label);
   nvtxRangePop();
 }
 
-// mixed data types, covering the 3 main families : fixed width, strings, and lists
+// mixed data types: fixed width and strings
 NVBENCH_BENCH(BM_parquet_multithreaded_read_mixed)
   .set_name("parquet_multithreaded_read_decode_mixed")
   .set_min_samples(4)
   .add_int64_axis("cardinality", {1000})
-  .add_int64_axis("num_cols", {4, 16})
-  .add_int64_axis("run_length", {8})
-  .add_int64_axis("per_file_data_size", {128ul * 1024 * 1024, 512ul * 1024 * 1024})
-  .add_int64_axis("total_data_size", {4ul * 1024 * 1024 * 1024})
-  .add_int64_axis("num_threads", {2, 4, 8});
+  .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
+  .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_cols", {4})
+  .add_int64_axis("run_length", {8});
 
 NVBENCH_BENCH(BM_parquet_multithreaded_read_fixed_width)
   .set_name("parquet_multithreaded_read_decode_fixed_width")
   .set_min_samples(4)
   .add_int64_axis("cardinality", {1000})
-  .add_int64_axis("num_cols", {4, 16})
-  .add_int64_axis("run_length", {8})
-  .add_int64_axis("per_file_data_size", {128ul * 1024 * 1024, 512ul * 1024 * 1024})
-  .add_int64_axis("total_data_size", {4ul * 1024 * 1024 * 1024})
-  .add_int64_axis("num_threads", {2, 4, 8})
+  .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
+  .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_cols", {4})
+  .add_int64_axis("run_length", {8});
 
 NVBENCH_BENCH(BM_parquet_multithreaded_read_string)
   .set_name("parquet_multithreaded_read_decode_string")
   .set_min_samples(4)
   .add_int64_axis("cardinality", {1000})
-  .add_int64_axis("num_cols", {4, 16})
-  .add_int64_axis("run_length", {8})
-  .add_int64_axis("per_file_data_size", {128ul * 1024 * 1024, 512ul * 1024 * 1024})
-  .add_int64_axis("total_data_size", {4ul * 1024 * 1024 * 1024})
-  .add_int64_axis("num_threads", {2, 4, 8});
+  .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
+  .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_cols", {4})
+  .add_int64_axis("run_length", {8});
 
 NVBENCH_BENCH(BM_parquet_multithreaded_read_list)
   .set_name("parquet_multithreaded_read_decode_list")
   .set_min_samples(4)
   .add_int64_axis("cardinality", {1000})
-  .add_int64_axis("num_cols", {4, 16})
-  .add_int64_axis("run_length", {8})
-  .add_int64_axis("per_file_data_size", {128ul * 1024 * 1024, 512ul * 1024 * 1024})
-  .add_int64_axis("total_data_size", {4ul * 1024 * 1024 * 1024})
-  .add_int64_axis("num_threads", {2, 4, 8});
+  .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
+  .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_cols", {4})
+  .add_int64_axis("run_length", {8});
 
-// mixed data types, covering the 3 main families : fixed width, strings, and lists
+// mixed data types: fixed width, strings
 NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_mixed)
   .set_name("parquet_multithreaded_read_decode_chunked_mixed")
   .set_min_samples(4)
   .add_int64_axis("cardinality", {1000})
-  .add_int64_axis("num_cols", {6})
+  .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
+  .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_cols", {4})
   .add_int64_axis("run_length", {8})
-  // divides into 10 GB exactly 8 times
-  .add_int64_axis("per_file_data_size", {1280ul * 1024 * 1024})
-  .add_int64_axis("total_data_size", {10ul * 1024 * 1024 * 1024})
-  .add_int64_axis("num_threads", {2, 4, 8})
-  .add_int64_axis("input_limit", {768 * 1024 * 1024})
-  .add_int64_axis("output_limit", {512 * 1024 * 1024});
+  .add_int64_axis("input_limit", {640 * 1024 * 1024})
+  .add_int64_axis("output_limit", {640 * 1024 * 1024});
+
+NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_fixed_width)
+  .set_name("parquet_multithreaded_read_decode_chunked_fixed_width")
+  .set_min_samples(4)
+  .add_int64_axis("cardinality", {1000})
+  .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
+  .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_cols", {4})
+  .add_int64_axis("run_length", {8})
+  .add_int64_axis("input_limit", {640 * 1024 * 1024})
+  .add_int64_axis("output_limit", {640 * 1024 * 1024});
 
 NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_string)
-  .set_name("parquet_multithreaded_read_decode_chunked_mixed")
+  .set_name("parquet_multithreaded_read_decode_chunked_string")
   .set_min_samples(4)
   .add_int64_axis("cardinality", {1000})
-  .add_int64_axis("num_cols", {6})
+  .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
+  .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_cols", {4})
   .add_int64_axis("run_length", {8})
-  // divides into 10 GB exactly 8 times
-  .add_int64_axis("per_file_data_size", {1280ul * 1024 * 1024})
-  .add_int64_axis("total_data_size", {10ul * 1024 * 1024 * 1024})
-  .add_int64_axis("num_threads", {2, 4, 8})
-  .add_int64_axis("input_limit", {768 * 1024 * 1024})
-  .add_int64_axis("output_limit", {512 * 1024 * 1024});
+  .add_int64_axis("input_limit", {640 * 1024 * 1024})
+  .add_int64_axis("output_limit", {640 * 1024 * 1024});
 
 NVBENCH_BENCH(BM_parquet_multithreaded_read_chunked_list)
-  .set_name("parquet_multithreaded_read_decode_chunked_mixed")
+  .set_name("parquet_multithreaded_read_decode_chunked_list")
   .set_min_samples(4)
   .add_int64_axis("cardinality", {1000})
-  .add_int64_axis("num_cols", {6})
+  .add_int64_axis("total_data_size", {512 * 1024 * 1024, 1024 * 1024 * 1024})
+  .add_int64_axis("num_threads", {1, 2, 4, 8})
+  .add_int64_axis("num_cols", {4})
   .add_int64_axis("run_length", {8})
-  // divides into 10 GB exactly 8 times
-  .add_int64_axis("per_file_data_size", {1280ul * 1024 * 1024})
-  .add_int64_axis("total_data_size", {10ul * 1024 * 1024 * 1024})
-  .add_int64_axis("num_threads", {2, 4, 8})
-  .add_int64_axis("input_limit", {768 * 1024 * 1024})
-  .add_int64_axis("output_limit", {512 * 1024 * 1024});
+  .add_int64_axis("input_limit", {640 * 1024 * 1024})
+  .add_int64_axis("output_limit", {640 * 1024 * 1024});

From ad747cbf65b5c4766e516a7b96f709a96844e708 Mon Sep 17 00:00:00 2001
From: Dave Baranec
Date: Mon, 20 May 2024 15:42:38 -0500
Subject: [PATCH 3/3] Formatting.

---
 .../io/parquet/parquet_reader_multithread.cpp | 85 +++++++++----------
 1 file changed, 41 insertions(+), 44 deletions(-)

diff --git a/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp b/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp
index 4309e9cecd0..fbdcfb0ade9 100644
--- a/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp
+++ b/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp
@@ -44,10 +44,7 @@ void set_cuio_host_pinned_pool()
   cudf::io::set_host_memory_resource(*mr);
 }
 
-size_t get_num_reads(nvbench::state const& state)
-{
-  return state.get_int64("num_threads");
-}
+size_t get_num_reads(nvbench::state const& state) { return state.get_int64("num_threads"); }
 
 size_t get_read_size(nvbench::state const& state)
 {
@@ -57,11 +54,11 @@ size_t get_read_size(nvbench::state const& state)
 
 std::string get_label(std::string const& test_name, nvbench::state const& state)
 {
-  auto const num_cols = state.get_int64("num_cols");
+  auto const num_cols       = state.get_int64("num_cols");
   size_t const read_size_mb = get_read_size(state) / (1024 * 1024);
   return {test_name + ", " + std::to_string(num_cols) + " columns, " +
-          std::to_string(state.get_int64("num_threads")) + " threads " +
-          " (" + std::to_string(read_size_mb) + " MB each)"};
+          std::to_string(state.get_int64("num_threads")) + " threads " + " (" +
+          std::to_string(read_size_mb) + " MB each)"};
 }
 
 std::tuple<std::vector<cuio_source_sink_pair>, size_t, size_t> write_file_data(
@@ -70,9 +67,9 @@ std::tuple<std::vector<cuio_source_sink_pair>, size_t, size_t> write_file_data(
   cudf::size_type const cardinality = state.get_int64("cardinality");
   cudf::size_type const run_length  = state.get_int64("run_length");
   cudf::size_type const num_cols    = state.get_int64("num_cols");
-  size_t const num_files = get_num_reads(state);
-  size_t const per_file_data_size = get_read_size(state);
-
+  size_t const num_files          = get_num_reads(state);
+  size_t const per_file_data_size = get_read_size(state);
+
   std::vector<cuio_source_sink_pair> source_sink_vector;
   size_t total_file_size = 0;
@@ -152,8 +149,7 @@ void BM_parquet_multithreaded_read_mixed(nvbench::state& state)
   auto label = get_label("mixed", state);
   nvtxRangePushA(label.c_str());
   BM_parquet_multithreaded_read_common(
-    state,
-    {cudf::type_id::INT32, cudf::type_id::DECIMAL64, cudf::type_id::STRING}, label);
+    state, {cudf::type_id::INT32, cudf::type_id::DECIMAL64, cudf::type_id::STRING}, label);
   nvtxRangePop();
 }
 
@@ -185,8 +181,8 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
                                                   std::vector<cudf::type_id> const& d_types,
                                                   std::string const& label)
 {
-  size_t const data_size   = state.get_int64("total_data_size");
-  auto const num_threads   = state.get_int64("num_threads");
+  size_t const data_size    = state.get_int64("total_data_size");
+  auto const num_threads    = state.get_int64("num_threads");
   size_t const input_limit  = state.get_int64("input_limit");
   size_t const output_limit = state.get_int64("output_limit");
 
@@ -200,34 +196,36 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
   set_cuio_host_pinned_pool();
 
   auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
   cudf::detail::thread_pool threads(num_threads);
   auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);
 
   auto mem_stats_logger = cudf::memory_stats_logger();
 
   nvtxRangePushA(("(read) " + label).c_str());
   std::vector<cudf::io::table_with_metadata> chunks;
-  state.exec(
-    nvbench::exec_tag::sync | nvbench::exec_tag::timer, [&](nvbench::launch& launch, auto& timer) {
-      auto read_func = [&](int index) {
-        auto const stream = streams[index % num_threads];
-        auto& source_sink = source_sink_vector[index];
-        cudf::io::parquet_reader_options read_opts =
-          cudf::io::parquet_reader_options::builder(source_sink.make_source_info());
-        // divide chunk limits by number of threads so the number of chunks produced is the same for all cases.
-        // this seems better than the alternative, which is to keep the limits the same. if we do that, as the
-        // number of threads goes up, the number of chunks goes down - so are we actually benchmarking the same thing in that case?
-        auto reader = cudf::io::chunked_parquet_reader(output_limit / num_threads, input_limit / num_threads, read_opts, stream);
-
-        // read all the chunks
-        do {
-          auto table = reader.read_chunk();
-        } while (reader.has_next());
-      };
-
-      threads.paused = true;
-      for (size_t i = 0; i < num_files; ++i) {
-        threads.submit(read_func, i);
-      }
-      timer.start();
-      threads.paused = false;
-      threads.wait_for_tasks();
-      cudf::detail::join_streams(streams, cudf::get_default_stream());
-      timer.stop();
-    });
+  state.exec(nvbench::exec_tag::sync | nvbench::exec_tag::timer,
+             [&](nvbench::launch& launch, auto& timer) {
+               auto read_func = [&](int index) {
+                 auto const stream = streams[index % num_threads];
+                 auto& source_sink = source_sink_vector[index];
+                 cudf::io::parquet_reader_options read_opts =
+                   cudf::io::parquet_reader_options::builder(source_sink.make_source_info());
+                 // divide chunk limits by number of threads so the number of chunks produced is the
+                 // same for all cases. this seems better than the alternative, which is to keep the
+                 // limits the same. if we do that, as the number of threads goes up, the number of
+                 // chunks goes down - so are we actually benchmarking the same thing in that case?
+                 auto reader = cudf::io::chunked_parquet_reader(
+                   output_limit / num_threads, input_limit / num_threads, read_opts, stream);
+
+                 // read all the chunks
+                 do {
+                   auto table = reader.read_chunk();
+                 } while (reader.has_next());
+               };
+
+               threads.paused = true;
+               for (size_t i = 0; i < num_files; ++i) {
+                 threads.submit(read_func, i);
+               }
+               timer.start();
+               threads.paused = false;
+               threads.wait_for_tasks();
+               cudf::detail::join_streams(streams, cudf::get_default_stream());
+               timer.stop();
+             });
   nvtxRangePop();
 
   auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
@@ -250,8 +248,7 @@ void BM_parquet_multithreaded_read_chunked_fixed_width(nvbench::state& state)
 {
   auto label = get_label("fixed width", state);
   nvtxRangePushA(label.c_str());
-  BM_parquet_multithreaded_read_chunked_common(
-    state, {cudf::type_id::INT32}, label);
+  BM_parquet_multithreaded_read_chunked_common(state, {cudf::type_id::INT32}, label);
   nvtxRangePop();
 }
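(A closing note on the limit-division comment above: with output_limit = 640 MiB and num_threads = 4, each of the four readers gets a 160 MiB chunk limit, so the total number of chunks produced stays roughly constant as the thread count varies. A condensed, standalone sketch of the per-thread loop the benchmark times — assuming read_opts was already built from a valid parquet source, as in the diff:)

#include <cudf/io/parquet.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <cstddef>

// One worker's share of a chunked read. `read_opts` must already point at a
// parquet source; `stream` would come from the forked stream pool.
void read_one_source(cudf::io::parquet_reader_options const& read_opts,
                     rmm::cuda_stream_view stream,
                     std::size_t output_limit,
                     std::size_t input_limit,
                     std::size_t num_threads)
{
  // Scale both limits down so the total chunk count is independent of thread count.
  auto reader = cudf::io::chunked_parquet_reader(
    output_limit / num_threads, input_limit / num_threads, read_opts, stream);

  // Drain the reader; each read_chunk() returns one table_with_metadata.
  do {
    auto chunk = reader.read_chunk();
  } while (reader.has_next());
}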