diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 7ce4bf1d97..f4f3f13109 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -81,6 +81,7 @@ else() endif() include(cmake/thirdparty/get_nvtx.cmake) +include(cmake/thirdparty/get_thread_pool.cmake) # library targets add_library(kvikio INTERFACE) @@ -127,8 +128,9 @@ target_include_directories( kvikio INTERFACE "$" "$" ) - -target_link_libraries(kvikio INTERFACE Threads::Threads ${CMAKE_DL_LIBS} nvtx3::nvtx3-cpp) +target_link_libraries( + kvikio INTERFACE Threads::Threads ${CMAKE_DL_LIBS} nvtx3::nvtx3-cpp BS::thread_pool +) target_compile_features(kvikio INTERFACE cxx_std_17) # optionally build examples diff --git a/cpp/cmake/thirdparty/get_thread_pool.cmake b/cpp/cmake/thirdparty/get_thread_pool.cmake new file mode 100644 index 0000000000..3faf21366a --- /dev/null +++ b/cpp/cmake/thirdparty/get_thread_pool.cmake @@ -0,0 +1,25 @@ +# ============================================================================= +# Copyright (c) 2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under +# the License. +# ============================================================================= + +# Need to call rapids_cpm_bs_thread_pool to get support for an installed version of thread-pool and +# to support installing it ourselves +function(find_and_configure_thread_pool) + include(${rapids-cmake-dir}/cpm/bs_thread_pool.cmake) + + # Find or install thread-pool + rapids_cpm_bs_thread_pool(BUILD_EXPORT_SET kvikio-exports INSTALL_EXPORT_SET kvikio-exports) + +endfunction() + +find_and_configure_thread_pool() diff --git a/cpp/include/kvikio/defaults.hpp b/cpp/include/kvikio/defaults.hpp index d2ee6b8d91..c192763ccd 100644 --- a/cpp/include/kvikio/defaults.hpp +++ b/cpp/include/kvikio/defaults.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,8 +23,9 @@ #include #include +#include + #include -#include namespace kvikio { namespace detail { @@ -78,7 +79,7 @@ inline bool getenv_or(std::string_view env_var_name, bool default_val) */ class defaults { private: - kvikio::third_party::thread_pool _thread_pool{get_num_threads_from_env()}; + BS::thread_pool _thread_pool{get_num_threads_from_env()}; bool _compat_mode; std::size_t _task_size; std::size_t _gds_threshold; @@ -166,10 +167,7 @@ class defaults { * * @return The the default thread pool instance. */ - [[nodiscard]] static kvikio::third_party::thread_pool& thread_pool() - { - return instance()->_thread_pool; - } + [[nodiscard]] static BS::thread_pool& thread_pool() { return instance()->_thread_pool; } /** * @brief Get the number of threads in the default thread pool. diff --git a/cpp/include/kvikio/parallel_operation.hpp b/cpp/include/kvikio/parallel_operation.hpp index 4312e988dc..f345333c4f 100644 --- a/cpp/include/kvikio/parallel_operation.hpp +++ b/cpp/include/kvikio/parallel_operation.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2022, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,18 @@ namespace kvikio { +namespace detail { + +template +std::future submit_task( + F op, T buf, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset) +{ + return defaults::thread_pool().submit_task( + [=] { return op(buf, size, file_offset, devPtr_offset); }); +} + +} // namespace detail + /** * @brief Apply read or write operation in parallel. * @@ -52,7 +64,7 @@ std::future parallel_io(F op, // Single-task guard if (task_size >= size || page_size >= size) { - return defaults::thread_pool().submit(op, buf, size, file_offset, devPtr_offset); + return detail::submit_task(op, buf, size, file_offset, devPtr_offset); } // We know an upper bound of the total number of tasks @@ -61,16 +73,14 @@ std::future parallel_io(F op, // 1) Submit `task_size` sized tasks while (size >= task_size) { - tasks.push_back(defaults::thread_pool().submit(op, buf, task_size, file_offset, devPtr_offset)); + tasks.push_back(detail::submit_task(op, buf, task_size, file_offset, devPtr_offset)); file_offset += task_size; devPtr_offset += task_size; size -= task_size; } // 2) Submit a task for the remainder - if (size > 0) { - tasks.push_back(defaults::thread_pool().submit(op, buf, size, file_offset, devPtr_offset)); - } + if (size > 0) { tasks.push_back(detail::submit_task(op, buf, size, file_offset, devPtr_offset)); } // Finally, we sum the result of all tasks. auto gather_tasks = [](std::vector>&& tasks) -> std::size_t { diff --git a/cpp/include/kvikio/thread_pool.hpp b/cpp/include/kvikio/thread_pool.hpp deleted file mode 100644 index 008f119486..0000000000 --- a/cpp/include/kvikio/thread_pool.hpp +++ /dev/null @@ -1,461 +0,0 @@ -#pragma once - -/** - * @file thread_pool.hpp - * @author Barak Shoshany (baraksh@gmail.com) (http://baraksh.com) - * @version 2.0.0 - * @date 2021-08-14 - * @copyright Copyright (c) 2021 Barak Shoshany. Licensed under the MIT license. If you use this - * library in published research, please cite it as follows: - * - Barak Shoshany, "A C++17 Thread Pool for High-Performance Scientific Computing", - * doi:10.5281/zenodo.4742687, arXiv:2105.00613 (May 2021) - * - * @brief A C++17 thread pool for high-performance scientific computing. - * @details A modern C++17-compatible thread pool implementation, built from scratch with - * high-performance scientific computing in mind. The thread pool is implemented as a single - * lightweight and self-contained class, and does not have any dependencies other than the C++17 - * standard library, thus allowing a great degree of portability. In particular, this implementation - * does not utilize OpenMP or any other high-level multithreading APIs, and thus gives the - * programmer precise low-level control over the details of the parallelization, which permits more - * robust optimizations. The thread pool was extensively tested on both AMD and Intel CPUs with up - * to 40 cores and 80 threads. Other features include automatic generation of futures and easy - * parallelization of loops. Two helper classes enable synchronizing printing to an output stream by - * different threads and measuring execution time for benchmarking purposes. Please visit the GitHub - * repository at https://github.com/bshoshany/thread-pool for documentation and updates, or to - * submit feature requests and bug reports. - */ - -#define THREAD_POOL_VERSION "v2.0.0 (2021-08-14)" - -#include // std::atomic -#include // std::chrono -#include // std::int_fast64_t, std::uint_fast32_t -#include // std::function -#include // std::future, std::promise -#include // std::cout, std::ostream -#include // std::shared_ptr, std::unique_ptr -#include // std::mutex, std::scoped_lock -#include // std::queue -#include // std::this_thread, std::thread -#include // std::common_type_t, std::decay_t, std::enable_if_t, std::is_void_v, std::invoke_result_t -#include // std::move - -// ============================================================================================= // -// Begin class thread_pool // - -namespace kvikio::third_party { - -/** - * @brief A C++17 thread pool class. The user submits tasks to be executed into a queue. Whenever a - * thread becomes available, it pops a task from the queue and executes it. Each task is - * automatically assigned a future, which can be used to wait for the task to finish executing - * and/or obtain its eventual return value. - */ -class thread_pool { - typedef std::uint_fast32_t ui32; - typedef std::uint_fast64_t ui64; - - public: - // ============================ - // Constructors and destructors - // ============================ - - /** - * @brief Construct a new thread pool. - * - * @param _thread_count The number of threads to use. The default value is the total number of - * hardware threads available, as reported by the implementation. With a hyperthreaded CPU, this - * will be twice the number of CPU cores. If the argument is zero, the default value will be used - * instead. - */ - thread_pool(const ui32& _thread_count = std::thread::hardware_concurrency()) - : thread_count(_thread_count ? _thread_count : std::thread::hardware_concurrency()), - threads(new std::thread[_thread_count ? _thread_count : std::thread::hardware_concurrency()]) - { - create_threads(); - } - - /** - * @brief Destruct the thread pool. Waits for all tasks to complete, then destroys all threads. - * Note that if the variable paused is set to true, then any tasks still in the queue will never - * be executed. - */ - ~thread_pool() - { - wait_for_tasks(); - running = false; - destroy_threads(); - } - - // ======================= - // Public member functions - // ======================= - - /** - * @brief Get the number of tasks currently waiting in the queue to be executed by the threads. - * - * @return The number of queued tasks. - */ - ui64 get_tasks_queued() const - { - const std::scoped_lock lock(queue_mutex); - return tasks.size(); - } - - /** - * @brief Get the number of tasks currently being executed by the threads. - * - * @return The number of running tasks. - */ - ui32 get_tasks_running() const { return tasks_total - (ui32)get_tasks_queued(); } - - /** - * @brief Get the total number of unfinished tasks - either still in the queue, or running in a - * thread. - * - * @return The total number of tasks. - */ - ui32 get_tasks_total() const { return tasks_total; } - - /** - * @brief Get the number of threads in the pool. - * - * @return The number of threads. - */ - ui32 get_thread_count() const { return thread_count; } - - /** - * @brief Parallelize a loop by splitting it into blocks, submitting each block separately to the - * thread pool, and waiting for all blocks to finish executing. The user supplies a loop function, - * which will be called once per block and should iterate over the block's range. - * - * @tparam T1 The type of the first index in the loop. Should be a signed or unsigned integer. - * @tparam T2 The type of the index after the last index in the loop. Should be a signed or - * unsigned integer. If T1 is not the same as T2, a common type will be automatically inferred. - * @tparam F The type of the function to loop through. - * @param first_index The first index in the loop. - * @param index_after_last The index after the last index in the loop. The loop will iterate from - * first_index to (index_after_last - 1) inclusive. In other words, it will be equivalent to "for - * (T i = first_index; i < index_after_last; i++)". Note that if first_index == index_after_last, - * the function will terminate without doing anything. - * @param loop The function to loop through. Will be called once per block. Should take exactly - * two arguments: the first index in the block and the index after the last index in the block. - * loop(start, end) should typically involve a loop of the form "for (T i = start; i < end; i++)". - * @param num_blocks The maximum number of blocks to split the loop into. The default is to use - * the number of threads in the pool. - */ - template - void parallelize_loop(const T1& first_index, - const T2& index_after_last, - const F& loop, - ui32 num_blocks = 0) - { - typedef std::common_type_t T; - T the_first_index = (T)first_index; - T last_index = (T)index_after_last; - if (the_first_index == last_index) return; - if (last_index < the_first_index) { - T temp = last_index; - last_index = the_first_index; - the_first_index = temp; - } - last_index--; - if (num_blocks == 0) num_blocks = thread_count; - ui64 total_size = (ui64)(last_index - the_first_index + 1); - ui64 block_size = (ui64)(total_size / num_blocks); - if (block_size == 0) { - block_size = 1; - num_blocks = (ui32)total_size > 1 ? (ui32)total_size : 1; - } - std::atomic blocks_running = 0; - for (ui32 t = 0; t < num_blocks; t++) { - T start = ((T)(t * block_size) + the_first_index); - T end = - (t == num_blocks - 1) ? last_index + 1 : ((T)((t + 1) * block_size) + the_first_index); - blocks_running++; - push_task([start, end, &loop, &blocks_running] { - loop(start, end); - blocks_running--; - }); - } - while (blocks_running != 0) { - sleep_or_yield(); - } - } - - /** - * @brief Push a function with no arguments or return value into the task queue. - * - * @tparam F The type of the function. - * @param task The function to push. - */ - template - void push_task(const F& task) - { - tasks_total++; - { - const std::scoped_lock lock(queue_mutex); - tasks.push(std::function(task)); - } - } - - /** - * @brief Push a function with arguments, but no return value, into the task queue. - * @details The function is wrapped inside a lambda in order to hide the arguments, as the tasks - * in the queue must be of type std::function, so they cannot have any arguments or return - * value. If no arguments are provided, the other overload will be used, in order to avoid the - * (slight) overhead of using a lambda. - * - * @tparam F The type of the function. - * @tparam A The types of the arguments. - * @param task The function to push. - * @param args The arguments to pass to the function. - */ - template - void push_task(const F& task, const A&... args) - { - push_task([task, args...] { task(args...); }); - } - - /** - * @brief Reset the number of threads in the pool. Waits for all currently running tasks to be - * completed, then destroys all threads in the pool and creates a new thread pool with the new - * number of threads. Any tasks that were waiting in the queue before the pool was reset will then - * be executed by the new threads. If the pool was paused before resetting it, the new pool will - * be paused as well. - * - * @param _thread_count The number of threads to use. The default value is the total number of - * hardware threads available, as reported by the implementation. With a hyperthreaded CPU, this - * will be twice the number of CPU cores. If the argument is zero, the default value will be used - * instead. - */ - void reset(const ui32& _thread_count = std::thread::hardware_concurrency()) - { - bool was_paused = paused; - paused = true; - wait_for_tasks(); - running = false; - destroy_threads(); - thread_count = _thread_count ? _thread_count : std::thread::hardware_concurrency(); - threads.reset(new std::thread[thread_count]); - paused = was_paused; - running = true; - create_threads(); - } - - /** - * @brief Submit a function with zero or more arguments and no return value into the task queue, - * and get an std::future that will be set to true upon completion of the task. - * - * @tparam F The type of the function. - * @tparam A The types of the zero or more arguments to pass to the function. - * @param task The function to submit. - * @param args The zero or more arguments to pass to the function. - * @return A future to be used later to check if the function has finished its execution. - */ - template , std::decay_t...>>>> - std::future submit(const F& task, const A&... args) - { - std::shared_ptr> task_promise(new std::promise); - std::future future = task_promise->get_future(); - push_task([task, args..., task_promise] { - try { - task(args...); - task_promise->set_value(true); - } catch (...) { - try { - task_promise->set_exception(std::current_exception()); - } catch (...) { - } - } - }); - return future; - } - - /** - * @brief Submit a function with zero or more arguments and a return value into the task queue, - * and get a future for its eventual returned value. - * - * @tparam F The type of the function. - * @tparam A The types of the zero or more arguments to pass to the function. - * @tparam R The return type of the function. - * @param task The function to submit. - * @param args The zero or more arguments to pass to the function. - * @return A future to be used later to obtain the function's returned value, waiting for it to - * finish its execution if needed. - */ - template , std::decay_t...>, - typename = std::enable_if_t>> - std::future submit(const F& task, const A&... args) - { - std::shared_ptr> task_promise(new std::promise); - std::future future = task_promise->get_future(); - push_task([task, args..., task_promise] { - try { - task_promise->set_value(task(args...)); - } catch (...) { - try { - task_promise->set_exception(std::current_exception()); - } catch (...) { - } - } - }); - return future; - } - - /** - * @brief Wait for tasks to be completed. Normally, this function waits for all tasks, both those - * that are currently running in the threads and those that are still waiting in the queue. - * However, if the variable paused is set to true, this function only waits for the currently - * running tasks (otherwise it would wait forever). To wait for a specific task, use submit() - * instead, and call the wait() member function of the generated future. - */ - void wait_for_tasks() - { - while (true) { - if (!paused) { - if (tasks_total == 0) break; - } else { - if (get_tasks_running() == 0) break; - } - sleep_or_yield(); - } - } - - // =========== - // Public data - // =========== - - /** - * @brief An atomic variable indicating to the workers to pause. When set to true, the workers - * temporarily stop popping new tasks out of the queue, although any tasks already executed will - * keep running until they are done. Set to false again to resume popping tasks. - */ - std::atomic paused = false; - - /** - * @brief The duration, in microseconds, that the worker function should sleep for when it cannot - * find any tasks in the queue. If set to 0, then instead of sleeping, the worker function will - * execute std::this_thread::yield() if there are no tasks in the queue. The default value is - * 1000. - */ - ui32 sleep_duration = 1000; - - private: - // ======================== - // Private member functions - // ======================== - - /** - * @brief Create the threads in the pool and assign a worker to each thread. - */ - void create_threads() - { - for (ui32 i = 0; i < thread_count; i++) { - threads[i] = std::thread(&thread_pool::worker, this); - } - } - - /** - * @brief Destroy the threads in the pool by joining them. - */ - void destroy_threads() - { - for (ui32 i = 0; i < thread_count; i++) { - threads[i].join(); - } - } - - /** - * @brief Try to pop a new task out of the queue. - * - * @param task A reference to the task. Will be populated with a function if the queue is not - * empty. - * @return true if a task was found, false if the queue is empty. - */ - bool pop_task(std::function& task) - { - const std::scoped_lock lock(queue_mutex); - if (tasks.empty()) - return false; - else { - task = std::move(tasks.front()); - tasks.pop(); - return true; - } - } - - /** - * @brief Sleep for sleep_duration microseconds. If that variable is set to zero, yield instead. - * - */ - void sleep_or_yield() - { - if (sleep_duration) - std::this_thread::sleep_for(std::chrono::microseconds(sleep_duration)); - else - std::this_thread::yield(); - } - - /** - * @brief A worker function to be assigned to each thread in the pool. Continuously pops tasks out - * of the queue and executes them, as long as the atomic variable running is set to true. - */ - void worker() - { - while (running) { - std::function task; - if (!paused && pop_task(task)) { - task(); - tasks_total--; - } else { - sleep_or_yield(); - } - } - } - - // ============ - // Private data - // ============ - - /** - * @brief A mutex to synchronize access to the task queue by different threads. - */ - mutable std::mutex queue_mutex = {}; - - /** - * @brief An atomic variable indicating to the workers to keep running. When set to false, the - * workers permanently stop working. - */ - std::atomic running = true; - - /** - * @brief A queue of tasks to be executed by the threads. - */ - std::queue> tasks = {}; - - /** - * @brief The number of threads in the pool. - */ - ui32 thread_count; - - /** - * @brief A smart pointer to manage the memory allocated for the threads. - */ - std::unique_ptr threads; - - /** - * @brief An atomic variable to keep track of the total number of unfinished tasks - either still - * in the queue, or running in a thread. - */ - std::atomic tasks_total = 0; -}; -} // namespace kvikio::third_party - -// End class thread_pool // -// ============================================================================================= //