From 673374ee6269b8eb52baeebc98369f530ae9d66f Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Mon, 8 Jul 2024 08:19:11 +0000 Subject: [PATCH 1/8] Vendor thread pool implementation directly via rapids_cpm_find --- cpp/CMakeLists.txt | 7 +- .../io/orc/orc_reader_multithreaded.cpp | 6 +- .../io/parquet/parquet_reader_multithread.cpp | 6 +- cpp/cmake/thirdparty/get_thread_pool.cmake | 28 ++ cpp/include/cudf/utilities/thread_pool.hpp | 381 ------------------ cpp/src/io/utilities/file_io_utilities.cpp | 2 +- cpp/src/io/utilities/file_io_utilities.hpp | 7 +- 7 files changed, 44 insertions(+), 393 deletions(-) create mode 100644 cpp/cmake/thirdparty/get_thread_pool.cmake delete mode 100644 cpp/include/cudf/utilities/thread_pool.hpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 2811711d58c..8e3f8dd2927 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -216,6 +216,8 @@ include(cmake/thirdparty/get_fmt.cmake) include(cmake/thirdparty/get_spdlog.cmake) # find nanoarrow include(cmake/thirdparty/get_nanoarrow.cmake) +# find thread_pool +include(cmake/thirdparty/get_thread_pool.cmake) # Workaround until https://github.com/rapidsai/rapids-cmake/issues/176 is resolved if(NOT BUILD_SHARED_LIBS) @@ -796,6 +798,9 @@ target_compile_definitions(cudf PRIVATE "RMM_LOGGING_LEVEL=LIBCUDF_LOGGING_LEVEL # Define spdlog level target_compile_definitions(cudf PUBLIC "SPDLOG_ACTIVE_LEVEL=SPDLOG_LEVEL_${LIBCUDF_LOGGING_LEVEL}") +# Enable pause functions in thread_pool implementation +target_compile_definitions(cudf PRIVATE "BS_THREAD_POOL_ENABLE_PAUSE=1") + # Compile stringified JIT sources first add_dependencies(cudf jitify_preprocess_run) @@ -804,7 +809,7 @@ target_link_libraries( cudf PUBLIC ${ARROW_LIBRARIES} CCCL::CCCL rmm::rmm PRIVATE $ cuco::cuco ZLIB::ZLIB nvcomp::nvcomp - kvikio::kvikio $ nanoarrow + kvikio::kvikio $ nanoarrow BS_thread_pool ) # Add Conda library, and include paths if specified diff --git a/cpp/benchmarks/io/orc/orc_reader_multithreaded.cpp b/cpp/benchmarks/io/orc/orc_reader_multithreaded.cpp index aa0ee39a179..43c553955bb 100644 --- a/cpp/benchmarks/io/orc/orc_reader_multithreaded.cpp +++ b/cpp/benchmarks/io/orc/orc_reader_multithreaded.cpp @@ -24,8 +24,8 @@ #include #include #include -#include +#include #include #include @@ -90,7 +90,7 @@ void BM_orc_multithreaded_read_common(nvbench::state& state, auto const num_threads = state.get_int64("num_threads"); auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads); - cudf::detail::thread_pool threads(num_threads); + BS::thread_pool threads(num_threads); auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types); std::vector source_info_vector; @@ -170,7 +170,7 @@ void BM_orc_multithreaded_read_chunked_common(nvbench::state& state, size_t const output_limit = state.get_int64("output_limit"); auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads); - cudf::detail::thread_pool threads(num_threads); + BS::thread_pool threads(num_threads); auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types); std::vector source_info_vector; std::transform(source_sink_vector.begin(), diff --git a/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp b/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp index b4c8ed78ed8..a8a3fb7e553 100644 --- a/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp +++ b/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp @@ -23,10 +23,10 @@ #include #include #include 
-#include #include +#include #include #include @@ -93,7 +93,7 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state, auto const num_threads = state.get_int64("num_threads"); auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads); - cudf::detail::thread_pool threads(num_threads); + BS::thread_pool threads(num_threads); auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types); std::vector source_info_vector; @@ -176,7 +176,7 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state, size_t const output_limit = state.get_int64("output_limit"); auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads); - cudf::detail::thread_pool threads(num_threads); + BS::thread_pool threads(num_threads); auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types); std::vector source_info_vector; std::transform(source_sink_vector.begin(), diff --git a/cpp/cmake/thirdparty/get_thread_pool.cmake b/cpp/cmake/thirdparty/get_thread_pool.cmake new file mode 100644 index 00000000000..e61e24505fa --- /dev/null +++ b/cpp/cmake/thirdparty/get_thread_pool.cmake @@ -0,0 +1,28 @@ +# ============================================================================= +# Copyright (c) 2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under +# the License. +# ============================================================================= + +# This function finds rmm and sets any additional necessary environment variables. +function(find_and_configure_thread_pool) + rapids_cpm_find( + BS_thread_pool 4.1.0 + CPM_ARGS + GIT_REPOSITORY https://github.com/bshoshany/thread-pool.git + GIT_TAG 097aa718f25d44315cadb80b407144ad455ee4f9 + GIT_SHALLOW TRUE + ) + add_library(BS_thread_pool INTERFACE) + target_include_directories(BS_thread_pool INTERFACE ${BS_thread_pool_SOURCE_DIR}/include) +endfunction() + +find_and_configure_thread_pool() diff --git a/cpp/include/cudf/utilities/thread_pool.hpp b/cpp/include/cudf/utilities/thread_pool.hpp deleted file mode 100644 index c8c3eb097c4..00000000000 --- a/cpp/include/cudf/utilities/thread_pool.hpp +++ /dev/null @@ -1,381 +0,0 @@ -/* - * Copyright (c) 2021-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -/** - * Modified from https://github.com/bshoshany/thread-pool - * @copyright Copyright (c) 2021 Barak Shoshany. Licensed under the MIT license. 
- * See file LICENSE for detail or copy at https://opensource.org/licenses/MIT - */ - -#include // std::atomic -#include // std::chrono -#include // std::int_fast64_t, std::uint_fast32_t -#include // std::function -#include // std::future, std::promise -#include // std::shared_ptr, std::unique_ptr -#include // std::mutex, std::scoped_lock -#include // std::queue -#include // std::this_thread, std::thread -#include // std::decay_t, std::enable_if_t, std::is_void_v, std::invoke_result_t -#include // std::move, std::swap - -namespace cudf { -namespace detail { - -/** - * @brief A C++17 thread pool class. The user submits tasks to be executed into a queue. Whenever a - * thread becomes available, it pops a task from the queue and executes it. Each task is - * automatically assigned a future, which can be used to wait for the task to finish executing - * and/or obtain its eventual return value. - */ -class thread_pool { - using ui32 = int; - - public: - /** - * @brief Construct a new thread pool. - * - * @param _thread_count The number of threads to use. The default value is the total number of - * hardware threads available, as reported by the implementation. With a hyperthreaded CPU, this - * will be twice the number of CPU cores. If the argument is zero, the default value will be used - * instead. - */ - thread_pool(ui32 const& _thread_count = std::thread::hardware_concurrency()) - : thread_count(_thread_count ? _thread_count : std::thread::hardware_concurrency()), - threads(new std::thread[_thread_count ? _thread_count : std::thread::hardware_concurrency()]) - { - create_threads(); - } - - /** - * @brief Destruct the thread pool. Waits for all tasks to complete, then destroys all threads. - * Note that if the variable paused is set to true, then any tasks still in the queue will never - * be executed. - */ - ~thread_pool() - { - wait_for_tasks(); - running = false; - destroy_threads(); - } - - /** - * @brief Get the number of tasks currently waiting in the queue to be executed by the threads. - * - * @return The number of queued tasks. - */ - [[nodiscard]] size_t get_tasks_queued() const - { - std::scoped_lock const lock(queue_mutex); - return tasks.size(); - } - - /** - * @brief Get the number of tasks currently being executed by the threads. - * - * @return The number of running tasks. - */ - [[nodiscard]] ui32 get_tasks_running() const { return tasks_total - (ui32)get_tasks_queued(); } - - /** - * @brief Get the total number of unfinished tasks - either still in the queue, or running in a - * thread. - * - * @return The total number of tasks. - */ - [[nodiscard]] ui32 get_tasks_total() const { return tasks_total; } - - /** - * @brief Get the number of threads in the pool. - * - * @return The number of threads. - */ - [[nodiscard]] ui32 get_thread_count() const { return thread_count; } - - /** - * @brief Parallelize a loop by splitting it into blocks, submitting each block separately to the - * thread pool, and waiting for all blocks to finish executing. The loop will be equivalent to: - * for (T i = first_index; i <= last_index; i++) loop(i); - * - * @tparam T The type of the loop index. Should be a signed or unsigned integer. - * @tparam F The type of the function to loop through. - * @param first_index The first index in the loop (inclusive). - * @param last_index The last index in the loop (inclusive). - * @param loop The function to loop through. Should take exactly one argument, the loop index. - * @param num_tasks The maximum number of tasks to split the loop into. 
The default is to use the - * number of threads in the pool. - */ - template - void parallelize_loop(T first_index, T last_index, F const& loop, ui32 num_tasks = 0) - { - if (num_tasks == 0) num_tasks = thread_count; - if (last_index < first_index) std::swap(last_index, first_index); - size_t total_size = last_index - first_index + 1; - size_t block_size = total_size / num_tasks; - if (block_size == 0) { - block_size = 1; - num_tasks = (ui32)total_size > 1 ? (ui32)total_size : 1; - } - std::atomic blocks_running = 0; - for (ui32 t = 0; t < num_tasks; t++) { - T start = (T)(t * block_size + first_index); - T end = (t == num_tasks - 1) ? last_index : (T)((t + 1) * block_size + first_index - 1); - blocks_running++; - push_task([start, end, &loop, &blocks_running] { - for (T i = start; i <= end; i++) - loop(i); - blocks_running--; - }); - } - while (blocks_running != 0) { - sleep_or_yield(); - } - } - - /** - * @brief Push a function with no arguments or return value into the task queue. - * - * @tparam F The type of the function. - * @param task The function to push. - */ - template - void push_task(F const& task) - { - tasks_total++; - { - std::scoped_lock const lock(queue_mutex); - tasks.push(std::function(task)); - } - } - - /** - * @brief Push a function with arguments, but no return value, into the task queue. - * @details The function is wrapped inside a lambda in order to hide the arguments, as the tasks - * in the queue must be of type std::function, so they cannot have any arguments or return - * value. If no arguments are provided, the other overload will be used, in order to avoid the - * (slight) overhead of using a lambda. - * - * @tparam F The type of the function. - * @tparam A The types of the arguments. - * @param task The function to push. - * @param args The arguments to pass to the function. - */ - template - void push_task(F const& task, A const&... args) - { - push_task([task, args...] { task(args...); }); - } - - /** - * @brief Reset the number of threads in the pool. Waits for all currently running tasks to be - * completed, then destroys all threads in the pool and creates a new thread pool with the new - * number of threads. Any tasks that were waiting in the queue before the pool was reset will then - * be executed by the new threads. If the pool was paused before resetting it, the new pool will - * be paused as well. - * - * @param _thread_count The number of threads to use. The default value is the total number of - * hardware threads available, as reported by the implementation. With a hyperthreaded CPU, this - * will be twice the number of CPU cores. If the argument is zero, the default value will be used - * instead. - */ - void reset(ui32 const& _thread_count = std::thread::hardware_concurrency()) - { - bool was_paused = paused; - paused = true; - wait_for_tasks(); - running = false; - destroy_threads(); - thread_count = _thread_count ? _thread_count : std::thread::hardware_concurrency(); - threads = std::make_unique(thread_count); - paused = was_paused; - create_threads(); - running = true; - } - - /** - * @brief Submit a function with zero or more arguments and a return value into the task queue, - * and get a future for its eventual returned value. - * - * @tparam F The type of the function. - * @tparam A The types of the zero or more arguments to pass to the function. - * @tparam R The return type of the function. - * @param task The function to submit. - * @param args The zero or more arguments to pass to the function. 
- * @return A future to be used later to obtain the function's returned value, waiting for it to - * finish its execution if needed. - */ - template , std::decay_t...>> - std::future submit(F const& task, A const&... args) - { - std::shared_ptr> promise(new std::promise); - std::future future = promise->get_future(); - push_task([task, args..., promise] { - try { - if constexpr (std::is_void_v) { - task(args...); - promise->set_value(); - } else { - promise->set_value(task(args...)); - } - } catch (...) { - promise->set_exception(std::current_exception()); - }; - }); - return future; - } - - /** - * @brief Wait for tasks to be completed. Normally, this function waits for all tasks, both those - * that are currently running in the threads and those that are still waiting in the queue. - * However, if the variable paused is set to true, this function only waits for the currently - * running tasks (otherwise it would wait forever). To wait for a specific task, use submit() - * instead, and call the wait() member function of the generated future. - */ - void wait_for_tasks() - { - while (true) { - if (!paused) { - if (tasks_total == 0) break; - } else { - if (get_tasks_running() == 0) break; - } - sleep_or_yield(); - } - } - - /** - * @brief An atomic variable indicating to the workers to pause. When set to true, the workers - * temporarily stop popping new tasks out of the queue, although any tasks already executed will - * keep running until they are done. Set to false again to resume popping tasks. - */ - std::atomic paused = false; - - /** - * @brief The duration, in microseconds, that the worker function should sleep for when it cannot - * find any tasks in the queue. If set to 0, then instead of sleeping, the worker function will - * execute std::this_thread::yield() if there are no tasks in the queue. The default value is - * 1000. - */ - ui32 sleep_duration = 1000; - - private: - /** - * @brief Create the threads in the pool and assign a worker to each thread. - */ - void create_threads() - { - for (ui32 i = 0; i < thread_count; i++) { - threads[i] = std::thread(&thread_pool::worker, this); - } - } - - /** - * @brief Destroy the threads in the pool by joining them. - */ - void destroy_threads() - { - for (ui32 i = 0; i < thread_count; i++) { - threads[i].join(); - } - } - - /** - * @brief Try to pop a new task out of the queue. - * - * @param task A reference to the task. Will be populated with a function if the queue is not - * empty. - * @return true if a task was found, false if the queue is empty. - */ - bool pop_task(std::function& task) - { - std::scoped_lock const lock(queue_mutex); - if (tasks.empty()) - return false; - else { - task = std::move(tasks.front()); - tasks.pop(); - return true; - } - } - - /** - * @brief Sleep for sleep_duration microseconds. If that variable is set to zero, yield instead. - * - */ - void sleep_or_yield() - { - if (sleep_duration) - std::this_thread::sleep_for(std::chrono::microseconds(sleep_duration)); - else - std::this_thread::yield(); - } - - /** - * @brief A worker function to be assigned to each thread in the pool. Continuously pops tasks out - * of the queue and executes them, as long as the atomic variable running is set to true. - */ - void worker() - { - while (running) { - std::function task; - if (!paused && pop_task(task)) { - task(); - tasks_total--; - } else { - sleep_or_yield(); - } - } - } - - /** - * @brief A mutex to synchronize access to the task queue by different threads. 
- */ - mutable std::mutex queue_mutex; - - /** - * @brief An atomic variable indicating to the workers to keep running. When set to false, the - * workers permanently stop working. - */ - std::atomic running = true; - - /** - * @brief A queue of tasks to be executed by the threads. - */ - std::queue> tasks; - - /** - * @brief The number of threads in the pool. - */ - ui32 thread_count; - - /** - * @brief A smart pointer to manage the memory allocated for the threads. - */ - std::unique_ptr threads; - - /** - * @brief An atomic variable to keep track of the total number of unfinished tasks - either still - * in the queue, or running in a thread. - */ - std::atomic tasks_total = 0; -}; - -} // namespace detail -} // namespace cudf diff --git a/cpp/src/io/utilities/file_io_utilities.cpp b/cpp/src/io/utilities/file_io_utilities.cpp index a9d4f19c848..fb4baffec09 100644 --- a/cpp/src/io/utilities/file_io_utilities.cpp +++ b/cpp/src/io/utilities/file_io_utilities.cpp @@ -230,7 +230,7 @@ template > std::vector> make_sliced_tasks( - F function, DataT* ptr, size_t offset, size_t size, cudf::detail::thread_pool& pool) + F function, DataT* ptr, size_t offset, size_t size, BS::thread_pool& pool) { constexpr size_t default_max_slice_size = 4 * 1024 * 1024; static auto const max_slice_size = getenv_or("LIBCUDF_CUFILE_SLICE_SIZE", default_max_slice_size); diff --git a/cpp/src/io/utilities/file_io_utilities.hpp b/cpp/src/io/utilities/file_io_utilities.hpp index 91ef41fba6e..441bede200d 100644 --- a/cpp/src/io/utilities/file_io_utilities.hpp +++ b/cpp/src/io/utilities/file_io_utilities.hpp @@ -19,8 +19,7 @@ #ifdef CUFILE_FOUND #include -#include - +#include #include #endif @@ -150,7 +149,7 @@ class cufile_input_impl final : public cufile_input { private: cufile_shim const* shim = nullptr; cufile_registered_file const cf_file; - cudf::detail::thread_pool pool; + BS::thread_pool pool; }; /** @@ -167,7 +166,7 @@ class cufile_output_impl final : public cufile_output { private: cufile_shim const* shim = nullptr; cufile_registered_file const cf_file; - cudf::detail::thread_pool pool; + BS::thread_pool pool; }; #else From 23e8aaa0d2b541bdbc748ccb0904383f91541507 Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Mon, 8 Jul 2024 09:21:06 +0000 Subject: [PATCH 2/8] Update usage of thread pool for new vendored implementation --- .../io/orc/orc_reader_multithreaded.cpp | 20 ++++++++----------- .../io/parquet/parquet_reader_multithread.cpp | 20 ++++++++----------- cpp/src/io/utilities/file_io_utilities.cpp | 4 ++-- 3 files changed, 18 insertions(+), 26 deletions(-) diff --git a/cpp/benchmarks/io/orc/orc_reader_multithreaded.cpp b/cpp/benchmarks/io/orc/orc_reader_multithreaded.cpp index 43c553955bb..e91bf06fdfa 100644 --- a/cpp/benchmarks/io/orc/orc_reader_multithreaded.cpp +++ b/cpp/benchmarks/io/orc/orc_reader_multithreaded.cpp @@ -112,13 +112,11 @@ void BM_orc_multithreaded_read_common(nvbench::state& state, cudf::io::read_orc(read_opts, stream, rmm::mr::get_current_device_resource()); }; - threads.paused = true; - for (size_t i = 0; i < num_files; ++i) { - threads.submit(read_func, i); - } + threads.pause(); + threads.detach_sequence(decltype(num_files){0}, num_files, read_func); timer.start(); - threads.paused = false; - threads.wait_for_tasks(); + threads.unpause(); + threads.wait(); cudf::detail::join_streams(streams, cudf::get_default_stream()); timer.stop(); }); @@ -203,13 +201,11 @@ void BM_orc_multithreaded_read_chunked_common(nvbench::state& state, } while (reader.has_next()); }; - threads.paused = 
true; - for (size_t i = 0; i < num_files; ++i) { - threads.submit(read_func, i); - } + threads.pause(); + threads.detach_sequence(decltype(num_files){0}, num_files, read_func); timer.start(); - threads.paused = false; - threads.wait_for_tasks(); + threads.unpause(); + threads.wait(); cudf::detail::join_streams(streams, cudf::get_default_stream()); timer.stop(); }); diff --git a/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp b/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp index a8a3fb7e553..9e76ebb71ab 100644 --- a/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp +++ b/cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp @@ -114,13 +114,11 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state, cudf::io::read_parquet(read_opts, stream, rmm::mr::get_current_device_resource()); }; - threads.paused = true; - for (size_t i = 0; i < num_files; ++i) { - threads.submit(read_func, i); - } + threads.pause(); + threads.detach_sequence(decltype(num_files){0}, num_files, read_func); timer.start(); - threads.paused = false; - threads.wait_for_tasks(); + threads.unpause(); + threads.wait(); cudf::detail::join_streams(streams, cudf::get_default_stream()); timer.stop(); }); @@ -207,13 +205,11 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state, } while (reader.has_next()); }; - threads.paused = true; - for (size_t i = 0; i < num_files; ++i) { - threads.submit(read_func, i); - } + threads.pause(); + threads.detach_sequence(decltype(num_files){0}, num_files, read_func); timer.start(); - threads.paused = false; - threads.wait_for_tasks(); + threads.unpause(); + threads.wait(); cudf::detail::join_streams(streams, cudf::get_default_stream()); timer.stop(); }); diff --git a/cpp/src/io/utilities/file_io_utilities.cpp b/cpp/src/io/utilities/file_io_utilities.cpp index fb4baffec09..036e647e275 100644 --- a/cpp/src/io/utilities/file_io_utilities.cpp +++ b/cpp/src/io/utilities/file_io_utilities.cpp @@ -221,7 +221,6 @@ cufile_input_impl::cufile_input_impl(std::string const& filepath) // The benefit from multithreaded read plateaus around 16 threads pool(getenv_or("LIBCUDF_CUFILE_THREAD_COUNT", 16)) { - pool.sleep_duration = 10; } namespace { @@ -237,7 +236,8 @@ std::vector> make_sliced_tasks( auto const slices = make_file_io_slices(size, max_slice_size); std::vector> slice_tasks; std::transform(slices.cbegin(), slices.cend(), std::back_inserter(slice_tasks), [&](auto& slice) { - return pool.submit(function, ptr + slice.offset, slice.size, offset + slice.offset); + return pool.submit_task( + [&] { return function(ptr + slice.offset, slice.size, offset + slice.offset); }); }); return slice_tasks; } From beb835e63e9c2630a7cbea8ac17f680bd43ca4b9 Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Wed, 10 Jul 2024 15:41:59 +0000 Subject: [PATCH 3/8] Cargo-cult probable cmake fix --- cpp/CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 8e3f8dd2927..cda6011b075 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -807,9 +807,9 @@ add_dependencies(cudf jitify_preprocess_run) # Specify the target module library dependencies target_link_libraries( cudf - PUBLIC ${ARROW_LIBRARIES} CCCL::CCCL rmm::rmm + PUBLIC ${ARROW_LIBRARIES} CCCL::CCCL rmm::rmm $ PRIVATE $ cuco::cuco ZLIB::ZLIB nvcomp::nvcomp - kvikio::kvikio $ nanoarrow BS_thread_pool + kvikio::kvikio $ nanoarrow ) # Add Conda library, and include paths if specified From 42730817b396bf904a4fed768a113224d132b501 Mon 
Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Wed, 10 Jul 2024 15:46:52 +0000 Subject: [PATCH 4/8] Adapt groupby-max benchmark to new thread pool --- cpp/benchmarks/groupby/group_max_multithreaded.cpp | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/cpp/benchmarks/groupby/group_max_multithreaded.cpp b/cpp/benchmarks/groupby/group_max_multithreaded.cpp index 3b8faba618f..cb4de1dfab8 100644 --- a/cpp/benchmarks/groupby/group_max_multithreaded.cpp +++ b/cpp/benchmarks/groupby/group_max_multithreaded.cpp @@ -20,8 +20,8 @@ #include #include #include -#include +#include #include template @@ -58,7 +58,7 @@ void bench_groupby_max_multithreaded(nvbench::state& state, nvbench::type_list> requests(num_threads); for (auto& thread_requests : requests) { @@ -75,9 +75,7 @@ void bench_groupby_max_multithreaded(nvbench::state& state, nvbench::type_list Date: Wed, 10 Jul 2024 18:48:43 +0100 Subject: [PATCH 5/8] Thinko Co-authored-by: Srinivas Yadav <43375352+srinivasyadav18@users.noreply.github.com> --- cpp/benchmarks/groupby/group_max_multithreaded.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/benchmarks/groupby/group_max_multithreaded.cpp b/cpp/benchmarks/groupby/group_max_multithreaded.cpp index cb4de1dfab8..69fa2cdb43d 100644 --- a/cpp/benchmarks/groupby/group_max_multithreaded.cpp +++ b/cpp/benchmarks/groupby/group_max_multithreaded.cpp @@ -76,7 +76,7 @@ void bench_groupby_max_multithreaded(nvbench::state& state, nvbench::type_list Date: Thu, 11 Jul 2024 13:38:07 +0000 Subject: [PATCH 6/8] Turn on pause when finding the library --- cpp/CMakeLists.txt | 3 --- cpp/cmake/thirdparty/get_thread_pool.cmake | 1 + 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 1f0462ee685..65347bd6689 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -800,9 +800,6 @@ target_compile_definitions(cudf PRIVATE "RMM_LOGGING_LEVEL=LIBCUDF_LOGGING_LEVEL # Define spdlog level target_compile_definitions(cudf PUBLIC "SPDLOG_ACTIVE_LEVEL=SPDLOG_LEVEL_${LIBCUDF_LOGGING_LEVEL}") -# Enable pause functions in thread_pool implementation -target_compile_definitions(cudf PRIVATE "BS_THREAD_POOL_ENABLE_PAUSE=1") - # Compile stringified JIT sources first add_dependencies(cudf jitify_preprocess_run) diff --git a/cpp/cmake/thirdparty/get_thread_pool.cmake b/cpp/cmake/thirdparty/get_thread_pool.cmake index e61e24505fa..18e56d50a6e 100644 --- a/cpp/cmake/thirdparty/get_thread_pool.cmake +++ b/cpp/cmake/thirdparty/get_thread_pool.cmake @@ -23,6 +23,7 @@ function(find_and_configure_thread_pool) ) add_library(BS_thread_pool INTERFACE) target_include_directories(BS_thread_pool INTERFACE ${BS_thread_pool_SOURCE_DIR}/include) + target_compile_definitions(BS_thread_pool INTERFACE "BS_THREAD_POOL_ENABLE_PAUSE=1") endfunction() find_and_configure_thread_pool() From c732790636f423acc7e7507db77bf1167d9eff9e Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Thu, 11 Jul 2024 14:49:58 +0100 Subject: [PATCH 7/8] pragma once for cmake find. 
Co-authored-by: Robert Maynard --- cpp/cmake/thirdparty/get_thread_pool.cmake | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/cpp/cmake/thirdparty/get_thread_pool.cmake b/cpp/cmake/thirdparty/get_thread_pool.cmake index 18e56d50a6e..264257c7199 100644 --- a/cpp/cmake/thirdparty/get_thread_pool.cmake +++ b/cpp/cmake/thirdparty/get_thread_pool.cmake @@ -21,9 +21,11 @@ function(find_and_configure_thread_pool) GIT_TAG 097aa718f25d44315cadb80b407144ad455ee4f9 GIT_SHALLOW TRUE ) - add_library(BS_thread_pool INTERFACE) - target_include_directories(BS_thread_pool INTERFACE ${BS_thread_pool_SOURCE_DIR}/include) - target_compile_definitions(BS_thread_pool INTERFACE "BS_THREAD_POOL_ENABLE_PAUSE=1") + if(NOT TARGET BS_thread_pool) + add_library(BS_thread_pool INTERFACE) + target_include_directories(BS_thread_pool INTERFACE ${BS_thread_pool_SOURCE_DIR}/include) + target_compile_definitions(BS_thread_pool INTERFACE "BS_THREAD_POOL_ENABLE_PAUSE=1") + endif() endfunction() find_and_configure_thread_pool() From 6e81007b2e5593090a913fd3ae62555c0b1a578e Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Thu, 11 Jul 2024 17:09:51 +0100 Subject: [PATCH 8/8] Typo --- cpp/benchmarks/groupby/group_max_multithreaded.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/benchmarks/groupby/group_max_multithreaded.cpp b/cpp/benchmarks/groupby/group_max_multithreaded.cpp index 69fa2cdb43d..bf1a1a5fcf7 100644 --- a/cpp/benchmarks/groupby/group_max_multithreaded.cpp +++ b/cpp/benchmarks/groupby/group_max_multithreaded.cpp @@ -76,7 +76,7 @@ void bench_groupby_max_multithreaded(nvbench::state& state, nvbench::type_list
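
For reference, below is a minimal standalone C++ sketch (not part of the patch series) of the BS::thread_pool 4.1.0 usage pattern these patches adopt: queue work while paused, then unpause and wait (as in the multithreaded read benchmarks), plus submit_task returning a std::future per slice (as in file_io_utilities.cpp). The per-task work here is a placeholder lambda rather than the cudf::io calls used in the benchmarks, and BS_THREAD_POOL_ENABLE_PAUSE is defined directly in the source instead of via the BS_thread_pool CMake interface target added above.

// Sketch of the vendored BS::thread_pool 4.1.0 API as used by these patches.
// The "file read" work is a stand-in; the real benchmarks call cudf::io readers.
#define BS_THREAD_POOL_ENABLE_PAUSE  // required for pause()/unpause()
#include <BS_thread_pool.hpp>

#include <cstddef>
#include <future>
#include <iostream>
#include <vector>

int main()
{
  BS::thread_pool pool(4);  // fixed thread count, like the benchmarks' num_threads

  // Benchmark pattern: queue one task per file while the pool is paused so that
  // no work starts early, then unpause and wait for everything to finish.
  std::size_t const num_files = 8;
  auto read_one = [](std::size_t file_index) {
    // placeholder for cudf::io::read_parquet / read_orc on file `file_index`
    std::cout << "processing file " << file_index << '\n';
  };
  pool.pause();
  pool.detach_sequence(std::size_t{0}, num_files, read_one);  // tasks for indices 0..num_files-1
  pool.unpause();
  pool.wait();

  // file_io_utilities.cpp pattern: submit_task returns a std::future, so the
  // caller can collect a result per slice after all tasks complete.
  std::vector<std::future<std::size_t>> slice_results;
  for (std::size_t slice = 0; slice < 4; ++slice) {
    slice_results.push_back(pool.submit_task([slice] { return slice * std::size_t{1024}; }));
  }
  std::size_t total = 0;
  for (auto& r : slice_results) { total += r.get(); }
  std::cout << "total bytes (dummy): " << total << '\n';
  return 0;
}

Pausing before detaching the per-file tasks matters for the benchmarks: the timer starts only after all tasks are queued, and unpause() releases them together, so the measured interval covers the concurrent reads rather than the task submission.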