Skip to content

Commit

Permalink
Fetch and use BS thread_pool (#408)
Browse files Browse the repository at this point in the history
Instead of hard coding the header of [`BS::thread_pool`](https://github.com/bshoshany/thread-pool?tab=readme-ov-file), we now fetch the implementation. Done for cudf in rapidsai/cudf#16210.

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)
  - Kyle Edwards (https://github.com/KyleFromNVIDIA)

Approvers:
  - Bradley Dice (https://github.com/bdice)

URL: #408
  • Loading branch information
madsbk authored Jul 24, 2024
1 parent e7bc8b2 commit 52e0ff5
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 476 deletions.
6 changes: 4 additions & 2 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ else()
endif()

include(cmake/thirdparty/get_nvtx.cmake)
include(cmake/thirdparty/get_thread_pool.cmake)

# library targets
add_library(kvikio INTERFACE)
Expand Down Expand Up @@ -127,8 +128,9 @@ target_include_directories(
kvikio INTERFACE "$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>"
"$<INSTALL_INTERFACE:include>"
)

target_link_libraries(kvikio INTERFACE Threads::Threads ${CMAKE_DL_LIBS} nvtx3::nvtx3-cpp)
target_link_libraries(
kvikio INTERFACE Threads::Threads ${CMAKE_DL_LIBS} nvtx3::nvtx3-cpp BS::thread_pool
)
target_compile_features(kvikio INTERFACE cxx_std_17)

# optionally build examples
Expand Down
25 changes: 25 additions & 0 deletions cpp/cmake/thirdparty/get_thread_pool.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# =============================================================================
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
# =============================================================================

# Locate BS::thread_pool through rapids-cmake. rapids_cpm_bs_thread_pool is used so that an
# already-installed version of thread-pool is supported, and so that we can install it ourselves.
function(find_and_configure_thread_pool)
  # Bring rapids_cpm_bs_thread_pool() into scope
  include(${rapids-cmake-dir}/cpm/bs_thread_pool.cmake)

  # Find or install thread-pool, naming kvikio-exports as both the build and install export set
  rapids_cpm_bs_thread_pool(
    BUILD_EXPORT_SET kvikio-exports
    INSTALL_EXPORT_SET kvikio-exports
  )
endfunction()

find_and_configure_thread_pool()
12 changes: 5 additions & 7 deletions cpp/include/kvikio/defaults.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,8 +23,9 @@
#include <string>
#include <utility>

#include <BS_thread_pool.hpp>

#include <kvikio/shim/cufile.hpp>
#include <kvikio/thread_pool.hpp>

namespace kvikio {
namespace detail {
Expand Down Expand Up @@ -78,7 +79,7 @@ inline bool getenv_or(std::string_view env_var_name, bool default_val)
*/
class defaults {
private:
kvikio::third_party::thread_pool _thread_pool{get_num_threads_from_env()};
BS::thread_pool _thread_pool{get_num_threads_from_env()};
bool _compat_mode;
std::size_t _task_size;
std::size_t _gds_threshold;
Expand Down Expand Up @@ -166,10 +167,7 @@ class defaults {
*
* @return The default thread pool instance.
*/
[[nodiscard]] static kvikio::third_party::thread_pool& thread_pool()
{
return instance()->_thread_pool;
}
[[nodiscard]] static BS::thread_pool& thread_pool() { return instance()->_thread_pool; }

/**
* @brief Get the number of threads in the default thread pool.
Expand Down
22 changes: 16 additions & 6 deletions cpp/include/kvikio/parallel_operation.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,18 @@

namespace kvikio {

namespace detail {

/**
 * @brief Enqueue a single operation on the default thread pool.
 *
 * The callable and every argument are captured by value (copied into the closure) before the
 * task is handed to the pool.
 *
 * @tparam F Type of the callable; invoked as `op(buf, size, file_offset, devPtr_offset)`.
 * @tparam T Type of the buffer argument handed to `op`.
 * @param op The operation to run on the pool.
 * @param buf Buffer argument passed to `op`.
 * @param size Size argument passed to `op`.
 * @param file_offset File offset argument passed to `op`.
 * @param devPtr_offset Device pointer offset argument passed to `op`.
 * @return Future holding the value returned by `op`.
 */
template <typename F, typename T>
auto submit_task(F op, T buf, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset)
  -> std::future<std::size_t>
{
  auto& pool = defaults::thread_pool();
  return pool.submit_task([op, buf, size, file_offset, devPtr_offset] {
    return op(buf, size, file_offset, devPtr_offset);
  });
}

}  // namespace detail

/**
* @brief Apply read or write operation in parallel.
*
Expand All @@ -52,7 +64,7 @@ std::future<std::size_t> parallel_io(F op,

// Single-task guard
if (task_size >= size || page_size >= size) {
return defaults::thread_pool().submit(op, buf, size, file_offset, devPtr_offset);
return detail::submit_task(op, buf, size, file_offset, devPtr_offset);
}

// We know an upper bound of the total number of tasks
Expand All @@ -61,16 +73,14 @@ std::future<std::size_t> parallel_io(F op,

// 1) Submit `task_size` sized tasks
while (size >= task_size) {
tasks.push_back(defaults::thread_pool().submit(op, buf, task_size, file_offset, devPtr_offset));
tasks.push_back(detail::submit_task(op, buf, task_size, file_offset, devPtr_offset));
file_offset += task_size;
devPtr_offset += task_size;
size -= task_size;
}

// 2) Submit a task for the remainder
if (size > 0) {
tasks.push_back(defaults::thread_pool().submit(op, buf, size, file_offset, devPtr_offset));
}
if (size > 0) { tasks.push_back(detail::submit_task(op, buf, size, file_offset, devPtr_offset)); }

// Finally, we sum the result of all tasks.
auto gather_tasks = [](std::vector<std::future<std::size_t>>&& tasks) -> std::size_t {
Expand Down
Loading

0 comments on commit 52e0ff5

Please sign in to comment.