Update vendored thread_pool implementation (#16210)
Since we introduced the vendored thread_pool in #8752, upstream has added new features; in particular, it now uses condition variables and notification to handle the case where there are no tasks in the queue. This avoids the issue described in #16209, where the thread pool by default artificially introduced a delay of 1000 microseconds into every task whenever the task queue was emptied.
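
To see why the delay disappears, here is a minimal sketch of the technique (this is not the vendored or upstream code, and all names are illustrative): the old pool's idle workers polled the queue with a fixed sleep between checks, whereas a condition variable parks a worker until a submit notifies it.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>

std::mutex m;
std::condition_variable cv;
std::queue<std::function<void()>> tasks;

// Old strategy (sketch): poll and sleep, adding up to ~1000us of latency per task:
//   while (queue_is_empty()) std::this_thread::sleep_for(std::chrono::microseconds(1000));

void worker_loop()  // shutdown handling omitted for brevity
{
  while (true) {
    std::unique_lock<std::mutex> lock(m);
    cv.wait(lock, [] { return !tasks.empty(); });  // sleeps until notified, no polling
    auto task = std::move(tasks.front());
    tasks.pop();
    lock.unlock();
    task();
  }
}

void submit(std::function<void()> task)
{
  {
    std::lock_guard<std::mutex> lock(m);
    tasks.push(std::move(task));
  }
  cv.notify_one();  // wakes a worker immediately, removing the artificial delay
}
```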

- Closes #16209
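
For reference, the API migration applied throughout the benchmark diffs below, as a self-contained sketch (the task body and thread count are illustrative; `detach_sequence`, `wait`, `pause`, and `unpause` are the upstream 4.x entry points this diff uses):

```cpp
#include <BS_thread_pool.hpp>

#include <cstdint>
#include <cstdio>

int main()
{
  std::int64_t const num_threads = 4;
  BS::thread_pool threads(num_threads);

  auto perform_task = [](std::int64_t index) {
    std::printf("task %ld\n", static_cast<long>(index));  // per-task work goes here
  };

  // Old vendored API, removed by this PR:
  //   for (std::int64_t i = 0; i < num_threads; ++i) { threads.submit(perform_task, i); }
  //   threads.wait_for_tasks();

  // Upstream API: enqueue perform_task(i) for each i in [0, num_threads), then wait.
  threads.detach_sequence(std::int64_t{0}, num_threads, perform_task);
  threads.wait();
  return 0;
}
```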

Authors:
  - Lawrence Mitchell (https://github.com/wence-)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - Robert Maynard (https://github.com/robertmaynard)

URL: #16210
wence- authored Jul 19, 2024
1 parent 4acca4d commit debbef0
Showing 8 changed files with 66 additions and 425 deletions.
4 changes: 3 additions & 1 deletion cpp/CMakeLists.txt
@@ -216,6 +216,8 @@ include(cmake/thirdparty/get_fmt.cmake)
 include(cmake/thirdparty/get_spdlog.cmake)
 # find nanoarrow
 include(cmake/thirdparty/get_nanoarrow.cmake)
+# find thread_pool
+include(cmake/thirdparty/get_thread_pool.cmake)
 
 # Workaround until https://github.com/rapidsai/rapids-cmake/issues/176 is resolved
 if(NOT BUILD_SHARED_LIBS)
@@ -804,7 +806,7 @@ add_dependencies(cudf jitify_preprocess_run)
 # Specify the target module library dependencies
 target_link_libraries(
   cudf
-  PUBLIC ${ARROW_LIBRARIES} CCCL::CCCL rmm::rmm
+  PUBLIC ${ARROW_LIBRARIES} CCCL::CCCL rmm::rmm $<BUILD_LOCAL_INTERFACE:BS_thread_pool>
   PRIVATE $<BUILD_LOCAL_INTERFACE:nvtx3::nvtx3-cpp> cuco::cuco ZLIB::ZLIB nvcomp::nvcomp
           kvikio::kvikio $<TARGET_NAME_IF_EXISTS:cuFile_interface> nanoarrow
 )
10 changes: 4 additions & 6 deletions cpp/benchmarks/groupby/group_max_multithreaded.cpp
@@ -20,8 +20,8 @@
 #include <cudf/detail/utilities/stream_pool.hpp>
 #include <cudf/groupby.hpp>
 #include <cudf/utilities/default_stream.hpp>
-#include <cudf/utilities/thread_pool.hpp>
 
+#include <BS_thread_pool.hpp>
 #include <nvbench/nvbench.cuh>
 
 template <typename Type>
@@ -58,7 +58,7 @@ void bench_groupby_max_multithreaded(nvbench::state& state, nvbench::type_list<T
   auto gb_obj = cudf::groupby::groupby(cudf::table_view({keys_view, keys_view, keys_view}));
 
   auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
-  cudf::detail::thread_pool threads(num_threads);
+  BS::thread_pool threads(num_threads);
 
   std::vector<std::vector<cudf::groupby::aggregation_request>> requests(num_threads);
   for (auto& thread_requests : requests) {
@@ -75,10 +75,8 @@
     nvbench::exec_tag::sync | nvbench::exec_tag::timer, [&](nvbench::launch& launch, auto& timer) {
       auto perform_agg = [&](int64_t index) { gb_obj.aggregate(requests[index], streams[index]); };
       timer.start();
-      for (int64_t i = 0; i < num_threads; ++i) {
-        threads.submit(perform_agg, i);
-      }
-      threads.wait_for_tasks();
+      threads.detach_sequence(decltype(num_threads){0}, num_threads, perform_agg);
+      threads.wait();
       cudf::detail::join_streams(streams, cudf::get_default_stream());
       cudf::get_default_stream().synchronize();
       timer.stop();
26 changes: 11 additions & 15 deletions cpp/benchmarks/io/orc/orc_reader_multithreaded.cpp
@@ -24,8 +24,8 @@
 #include <cudf/io/orc.hpp>
 #include <cudf/utilities/default_stream.hpp>
 #include <cudf/utilities/pinned_memory.hpp>
-#include <cudf/utilities/thread_pool.hpp>
 
+#include <BS_thread_pool.hpp>
 #include <nvbench/nvbench.cuh>
 
 #include <vector>
@@ -90,7 +90,7 @@ void BM_orc_multithreaded_read_common(nvbench::state& state,
   auto const num_threads = state.get_int64("num_threads");
 
   auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
-  cudf::detail::thread_pool threads(num_threads);
+  BS::thread_pool threads(num_threads);
 
   auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);
   std::vector<cudf::io::source_info> source_info_vector;
@@ -112,13 +112,11 @@
       cudf::io::read_orc(read_opts, stream, rmm::mr::get_current_device_resource());
     };
 
-    threads.paused = true;
-    for (size_t i = 0; i < num_files; ++i) {
-      threads.submit(read_func, i);
-    }
+    threads.pause();
+    threads.detach_sequence(decltype(num_files){0}, num_files, read_func);
     timer.start();
-    threads.paused = false;
-    threads.wait_for_tasks();
+    threads.unpause();
+    threads.wait();
     cudf::detail::join_streams(streams, cudf::get_default_stream());
     timer.stop();
   });
@@ -170,7 +168,7 @@ void BM_orc_multithreaded_read_chunked_common(nvbench::state& state,
   size_t const output_limit = state.get_int64("output_limit");
 
   auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
-  cudf::detail::thread_pool threads(num_threads);
+  BS::thread_pool threads(num_threads);
   auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);
   std::vector<cudf::io::source_info> source_info_vector;
   std::transform(source_sink_vector.begin(),
@@ -203,13 +201,11 @@
       } while (reader.has_next());
     };
 
-    threads.paused = true;
-    for (size_t i = 0; i < num_files; ++i) {
-      threads.submit(read_func, i);
-    }
+    threads.pause();
+    threads.detach_sequence(decltype(num_files){0}, num_files, read_func);
     timer.start();
-    threads.paused = false;
-    threads.wait_for_tasks();
+    threads.unpause();
+    threads.wait();
     cudf::detail::join_streams(streams, cudf::get_default_stream());
     timer.stop();
   });
26 changes: 11 additions & 15 deletions cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp
@@ -23,10 +23,10 @@
 #include <cudf/io/parquet.hpp>
 #include <cudf/utilities/default_stream.hpp>
 #include <cudf/utilities/pinned_memory.hpp>
-#include <cudf/utilities/thread_pool.hpp>
 
 #include <nvtx3/nvtx3.hpp>
 
+#include <BS_thread_pool.hpp>
 #include <nvbench/nvbench.cuh>
 
 #include <vector>
@@ -93,7 +93,7 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state,
   auto const num_threads = state.get_int64("num_threads");
 
   auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
-  cudf::detail::thread_pool threads(num_threads);
+  BS::thread_pool threads(num_threads);
 
   auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);
   std::vector<cudf::io::source_info> source_info_vector;
@@ -114,13 +114,11 @@
       cudf::io::read_parquet(read_opts, stream, rmm::mr::get_current_device_resource());
     };
 
-    threads.paused = true;
-    for (size_t i = 0; i < num_files; ++i) {
-      threads.submit(read_func, i);
-    }
+    threads.pause();
+    threads.detach_sequence(decltype(num_files){0}, num_files, read_func);
     timer.start();
-    threads.paused = false;
-    threads.wait_for_tasks();
+    threads.unpause();
+    threads.wait();
     cudf::detail::join_streams(streams, cudf::get_default_stream());
     timer.stop();
   });
@@ -176,7 +174,7 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
   size_t const output_limit = state.get_int64("output_limit");
 
   auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
-  cudf::detail::thread_pool threads(num_threads);
+  BS::thread_pool threads(num_threads);
   auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);
   std::vector<cudf::io::source_info> source_info_vector;
   std::transform(source_sink_vector.begin(),
@@ -207,13 +205,11 @@
       } while (reader.has_next());
     };
 
-    threads.paused = true;
-    for (size_t i = 0; i < num_files; ++i) {
-      threads.submit(read_func, i);
-    }
+    threads.pause();
+    threads.detach_sequence(decltype(num_files){0}, num_files, read_func);
     timer.start();
-    threads.paused = false;
-    threads.wait_for_tasks();
+    threads.unpause();
+    threads.wait();
     cudf::detail::join_streams(streams, cudf::get_default_stream());
     timer.stop();
   });
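A note on the pause/unpause pattern in the four read benchmarks above: the pool is paused before the tasks are detached so that enqueueing stays outside the timed region, and all tasks are released at once when the timer starts. A minimal sketch of the idea (file count and task body are illustrative; requires BS_THREAD_POOL_ENABLE_PAUSE=1, which the new CMake module below defines):

```cpp
#include <BS_thread_pool.hpp>

#include <cstddef>
#include <cstdio>

int main()
{
  std::size_t const num_files = 8;
  BS::thread_pool threads(4);
  auto read_func = [](std::size_t i) { std::printf("reading file %zu\n", i); };

  threads.pause();  // hold queued tasks so submission cost stays outside the measurement
  threads.detach_sequence(std::size_t{0}, num_files, read_func);
  // timer.start() goes here in the benchmarks
  threads.unpause();  // release every queued task at once
  threads.wait();     // block until the queue drains
  // timer.stop()
  return 0;
}
```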
31 changes: 31 additions & 0 deletions cpp/cmake/thirdparty/get_thread_pool.cmake
@@ -0,0 +1,31 @@
+# =============================================================================
+# Copyright (c) 2024, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+# in compliance with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing permissions and limitations under
+# the License.
+# =============================================================================
+
+# This function finds BS::thread_pool and sets any additional necessary environment variables.
+function(find_and_configure_thread_pool)
+  rapids_cpm_find(
+    BS_thread_pool 4.1.0
+    CPM_ARGS
+    GIT_REPOSITORY https://github.com/bshoshany/thread-pool.git
+    GIT_TAG 097aa718f25d44315cadb80b407144ad455ee4f9
+    GIT_SHALLOW TRUE
+  )
+  if(NOT TARGET BS_thread_pool)
+    add_library(BS_thread_pool INTERFACE)
+    target_include_directories(BS_thread_pool INTERFACE ${BS_thread_pool_SOURCE_DIR}/include)
+    target_compile_definitions(BS_thread_pool INTERFACE "BS_THREAD_POOL_ENABLE_PAUSE=1")
+  endif()
+endfunction()
+
+find_and_configure_thread_pool()
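
The interface target created here is what the main CMakeLists.txt change links via `$<BUILD_LOCAL_INTERFACE:BS_thread_pool>`. A hypothetical standalone consumer (target name illustrative) would link it the same way to pick up both the header path and the pause definition:

```cmake
# Hypothetical consumer: linking the interface target provides the
# <BS_thread_pool.hpp> include path and BS_THREAD_POOL_ENABLE_PAUSE=1,
# which pause()/unpause() require.
add_executable(my_benchmark my_benchmark.cpp)
target_link_libraries(my_benchmark PRIVATE BS_thread_pool)
```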