Update vendored thread_pool implementation #16210

Merged: 9 commits, Jul 19, 2024. The diff below shows changes from 8 commits.
4 changes: 3 additions & 1 deletion cpp/CMakeLists.txt
@@ -216,6 +216,8 @@ include(cmake/thirdparty/get_fmt.cmake)
 include(cmake/thirdparty/get_spdlog.cmake)
 # find nanoarrow
 include(cmake/thirdparty/get_nanoarrow.cmake)
+# find thread_pool
+include(cmake/thirdparty/get_thread_pool.cmake)
 
 # Workaround until https://github.com/rapidsai/rapids-cmake/issues/176 is resolved
 if(NOT BUILD_SHARED_LIBS)
@@ -804,7 +806,7 @@ add_dependencies(cudf jitify_preprocess_run)
 # Specify the target module library dependencies
 target_link_libraries(
 cudf
-PUBLIC ${ARROW_LIBRARIES} CCCL::CCCL rmm::rmm
+PUBLIC ${ARROW_LIBRARIES} CCCL::CCCL rmm::rmm $<BUILD_LOCAL_INTERFACE:BS_thread_pool>
 PRIVATE $<BUILD_LOCAL_INTERFACE:nvtx3::nvtx3-cpp> cuco::cuco ZLIB::ZLIB nvcomp::nvcomp
 kvikio::kvikio $<TARGET_NAME_IF_EXISTS:cuFile_interface> nanoarrow
 )
10 changes: 4 additions & 6 deletions cpp/benchmarks/groupby/group_max_multithreaded.cpp
@@ -20,8 +20,8 @@
 #include <cudf/detail/utilities/stream_pool.hpp>
 #include <cudf/groupby.hpp>
 #include <cudf/utilities/default_stream.hpp>
-#include <cudf/utilities/thread_pool.hpp>
 
+#include <BS_thread_pool.hpp>
 #include <nvbench/nvbench.cuh>
 
 template <typename Type>
@@ -58,7 +58,7 @@ void bench_groupby_max_multithreaded(nvbench::state& state, nvbench::type_list<T
 auto gb_obj = cudf::groupby::groupby(cudf::table_view({keys_view, keys_view, keys_view}));
 
 auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
-cudf::detail::thread_pool threads(num_threads);
+BS::thread_pool threads(num_threads);
 
 std::vector<std::vector<cudf::groupby::aggregation_request>> requests(num_threads);
 for (auto& thread_requests : requests) {
@@ -75,10 +75,8 @@
 nvbench::exec_tag::sync | nvbench::exec_tag::timer, [&](nvbench::launch& launch, auto& timer) {
 auto perform_agg = [&](int64_t index) { gb_obj.aggregate(requests[index], streams[index]); };
 timer.start();
-for (int64_t i = 0; i < num_threads; ++i) {
-threads.submit(perform_agg, i);
-}
-threads.wait_for_tasks();
+threads.detach_sequence(decltype(num_threads){0}, num_threads, perform_agg);
+threads.wait();
 cudf::detail::join_streams(streams, cudf::get_default_stream());
 cudf::get_default_stream().synchronize();
 timer.stop();
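
For readers unfamiliar with the new API, the following is a minimal, self-contained sketch (not taken from the PR) of the pattern the benchmarks now use, assuming BS::thread_pool v4 semantics in which detach_sequence(first, last, f) enqueues one detached task per index in [first, last) and wait() blocks until the queue drains. The pool size and work function are illustrative only.

// Minimal sketch: detach_sequence + wait replacing the old submit loop + wait_for_tasks.
#include <BS_thread_pool.hpp>

#include <cstdint>
#include <cstdio>

int main()
{
  BS::thread_pool pool(4);  // worker count, standing in for num_threads

  auto work = [](std::int64_t index) {  // stand-in for perform_agg above
    std::printf("task %lld done\n", static_cast<long long>(index));
  };

  // One detached task per index in [0, 8); no futures are returned.
  pool.detach_sequence(std::int64_t{0}, std::int64_t{8}, work);

  // Block until every queued task has finished.
  pool.wait();
  return 0;
}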
26 changes: 11 additions & 15 deletions cpp/benchmarks/io/orc/orc_reader_multithreaded.cpp
@@ -24,8 +24,8 @@
 #include <cudf/io/orc.hpp>
 #include <cudf/utilities/default_stream.hpp>
 #include <cudf/utilities/pinned_memory.hpp>
-#include <cudf/utilities/thread_pool.hpp>
 
+#include <BS_thread_pool.hpp>
 #include <nvbench/nvbench.cuh>
 
 #include <vector>
@@ -90,7 +90,7 @@ void BM_orc_multithreaded_read_common(nvbench::state& state,
 auto const num_threads = state.get_int64("num_threads");
 
 auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
-cudf::detail::thread_pool threads(num_threads);
+BS::thread_pool threads(num_threads);
 
 auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);
 std::vector<cudf::io::source_info> source_info_vector;
@@ -112,13 +112,11 @@
 cudf::io::read_orc(read_opts, stream, rmm::mr::get_current_device_resource());
 };
 
-threads.paused = true;
-for (size_t i = 0; i < num_files; ++i) {
-threads.submit(read_func, i);
-}
+threads.pause();

Reviewer comment (Contributor, resolved): the pause and unpause methods are only enabled when the BS_THREAD_POOL_ENABLE_PAUSE macro is explicitly defined; the vendored target defines it in get_thread_pool.cmake below. A usage sketch follows this hunk.

+threads.detach_sequence(decltype(num_files){0}, num_files, read_func);
 timer.start();
-threads.paused = false;
-threads.wait_for_tasks();
+threads.unpause();
+threads.wait();
 cudf::detail::join_streams(streams, cudf::get_default_stream());
 timer.stop();
 });
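
As a companion to the comment above, here is a minimal sketch (not from the PR) of the pause/unpause pattern, assuming BS_THREAD_POOL_ENABLE_PAUSE is visible before the header is included, as the vendored CMake target arranges; the pool size, file count, and work function are illustrative only.

// Minimal sketch: queue work while paused so nothing runs before timing starts.
#define BS_THREAD_POOL_ENABLE_PAUSE 1  // normally supplied by the BS_thread_pool target
#include <BS_thread_pool.hpp>

#include <cstddef>
#include <cstdio>

int main()
{
  BS::thread_pool pool(2);

  auto read_one = [](std::size_t i) {  // stand-in for read_func above
    std::printf("reading file %zu\n", i);
  };

  pool.pause();                                            // workers idle; tasks only queue up
  pool.detach_sequence(std::size_t{0}, std::size_t{4}, read_one);
  // timer.start() goes here in the benchmarks
  pool.unpause();                                          // workers begin draining the queue
  pool.wait();                                             // block until all tasks finish
  return 0;
}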
@@ -170,7 +168,7 @@ void BM_orc_multithreaded_read_chunked_common(nvbench::state& state,
 size_t const output_limit = state.get_int64("output_limit");
 
 auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
-cudf::detail::thread_pool threads(num_threads);
+BS::thread_pool threads(num_threads);
 auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);
 std::vector<cudf::io::source_info> source_info_vector;
 std::transform(source_sink_vector.begin(),
@@ -203,13 +201,11 @@
 } while (reader.has_next());
 };
 
-threads.paused = true;
-for (size_t i = 0; i < num_files; ++i) {
-threads.submit(read_func, i);
-}
+threads.pause();
+threads.detach_sequence(decltype(num_files){0}, num_files, read_func);
 timer.start();
-threads.paused = false;
-threads.wait_for_tasks();
+threads.unpause();
+threads.wait();
 cudf::detail::join_streams(streams, cudf::get_default_stream());
 timer.stop();
 });
26 changes: 11 additions & 15 deletions cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp
@@ -23,10 +23,10 @@
 #include <cudf/io/parquet.hpp>
 #include <cudf/utilities/default_stream.hpp>
 #include <cudf/utilities/pinned_memory.hpp>
-#include <cudf/utilities/thread_pool.hpp>
 
 #include <nvtx3/nvtx3.hpp>
 
+#include <BS_thread_pool.hpp>
 #include <nvbench/nvbench.cuh>
 
 #include <vector>
@@ -93,7 +93,7 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state,
 auto const num_threads = state.get_int64("num_threads");
 
 auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
-cudf::detail::thread_pool threads(num_threads);
+BS::thread_pool threads(num_threads);
 
 auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);
 std::vector<cudf::io::source_info> source_info_vector;
@@ -114,13 +114,11 @@
 cudf::io::read_parquet(read_opts, stream, rmm::mr::get_current_device_resource());
 };
 
-threads.paused = true;
-for (size_t i = 0; i < num_files; ++i) {
-threads.submit(read_func, i);
-}
+threads.pause();
+threads.detach_sequence(decltype(num_files){0}, num_files, read_func);
 timer.start();
-threads.paused = false;
-threads.wait_for_tasks();
+threads.unpause();
+threads.wait();
 cudf::detail::join_streams(streams, cudf::get_default_stream());
 timer.stop();
 });
@@ -176,7 +174,7 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
 size_t const output_limit = state.get_int64("output_limit");
 
 auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
-cudf::detail::thread_pool threads(num_threads);
+BS::thread_pool threads(num_threads);
 auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);
 std::vector<cudf::io::source_info> source_info_vector;
 std::transform(source_sink_vector.begin(),
@@ -207,13 +205,11 @@
 } while (reader.has_next());
 };
 
-threads.paused = true;
-for (size_t i = 0; i < num_files; ++i) {
-threads.submit(read_func, i);
-}
+threads.pause();
+threads.detach_sequence(decltype(num_files){0}, num_files, read_func);
 timer.start();
-threads.paused = false;
-threads.wait_for_tasks();
+threads.unpause();
+threads.wait();
 cudf::detail::join_streams(streams, cudf::get_default_stream());
 timer.stop();
 });
31 changes: 31 additions & 0 deletions cpp/cmake/thirdparty/get_thread_pool.cmake
@@ -0,0 +1,31 @@
# =============================================================================
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
# =============================================================================

# This function finds BS_thread_pool and sets any additional necessary environment variables.
function(find_and_configure_thread_pool)
rapids_cpm_find(
BS_thread_pool 4.1.0
CPM_ARGS
GIT_REPOSITORY https://github.com/bshoshany/thread-pool.git
GIT_TAG 097aa718f25d44315cadb80b407144ad455ee4f9
GIT_SHALLOW TRUE
)
if(NOT TARGET BS_thread_pool)
add_library(BS_thread_pool INTERFACE)
target_include_directories(BS_thread_pool INTERFACE ${BS_thread_pool_SOURCE_DIR}/include)
target_compile_definitions(BS_thread_pool INTERFACE "BS_THREAD_POOL_ENABLE_PAUSE=1")
endif()
endfunction()

find_and_configure_thread_pool()
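
Since BS_THREAD_POOL_ENABLE_PAUSE is attached to BS_thread_pool as an INTERFACE compile definition, every translation unit in a target that links BS_thread_pool (for cudf, via the $<BUILD_LOCAL_INTERFACE:BS_thread_pool> entry above) sees the macro without defining it locally. The snippet below is a hypothetical compile-time check illustrating that propagation; it is not part of the PR.

// Hypothetical check: fail the build if the pause API was not enabled for this target.
#include <BS_thread_pool.hpp>

#ifndef BS_THREAD_POOL_ENABLE_PAUSE
#error "BS_THREAD_POOL_ENABLE_PAUSE was not propagated from the BS_thread_pool target"
#endif

// With the macro present, pause()/unpause() compile exactly as used in the benchmarks.
inline void pause_then_drain(BS::thread_pool& pool)
{
  pool.pause();
  pool.unpause();
  pool.wait();
}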