Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update vendored thread_pool implementation #16210

Merged
merged 9 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ include(cmake/thirdparty/get_fmt.cmake)
include(cmake/thirdparty/get_spdlog.cmake)
# find nanoarrow
include(cmake/thirdparty/get_nanoarrow.cmake)
# find thread_pool
include(cmake/thirdparty/get_thread_pool.cmake)

# Workaround until https://github.com/rapidsai/rapids-cmake/issues/176 is resolved
if(NOT BUILD_SHARED_LIBS)
Expand Down Expand Up @@ -796,6 +798,9 @@ target_compile_definitions(cudf PRIVATE "RMM_LOGGING_LEVEL=LIBCUDF_LOGGING_LEVEL
# Define spdlog level
target_compile_definitions(cudf PUBLIC "SPDLOG_ACTIVE_LEVEL=SPDLOG_LEVEL_${LIBCUDF_LOGGING_LEVEL}")

# Enable pause functions in thread_pool implementation
target_compile_definitions(cudf PRIVATE "BS_THREAD_POOL_ENABLE_PAUSE=1")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should move this off cudf and onto the BS_thread_pool target in get_thread_pool.cmake as an INTERFACE compile definition.

target_compile_definitions(BS_thread_pool INTERFACE "BS_THREAD_POOL_ENABLE_PAUSE=1")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, done.


# Compile stringified JIT sources first
add_dependencies(cudf jitify_preprocess_run)

Expand All @@ -804,7 +809,7 @@ target_link_libraries(
cudf
PUBLIC ${ARROW_LIBRARIES} CCCL::CCCL rmm::rmm
PRIVATE $<BUILD_LOCAL_INTERFACE:nvtx3::nvtx3-cpp> cuco::cuco ZLIB::ZLIB nvcomp::nvcomp
kvikio::kvikio $<TARGET_NAME_IF_EXISTS:cuFile_interface> nanoarrow
kvikio::kvikio $<TARGET_NAME_IF_EXISTS:cuFile_interface> nanoarrow BS_thread_pool
)

# Add Conda library, and include paths if specified
Expand Down
26 changes: 11 additions & 15 deletions cpp/benchmarks/io/orc/orc_reader_multithreaded.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
#include <cudf/io/orc.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/pinned_memory.hpp>
#include <cudf/utilities/thread_pool.hpp>

#include <BS_thread_pool.hpp>
#include <nvbench/nvbench.cuh>

#include <vector>
Expand Down Expand Up @@ -90,7 +90,7 @@ void BM_orc_multithreaded_read_common(nvbench::state& state,
auto const num_threads = state.get_int64("num_threads");

auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
cudf::detail::thread_pool threads(num_threads);
BS::thread_pool threads(num_threads);

auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);
std::vector<cudf::io::source_info> source_info_vector;
Expand All @@ -112,13 +112,11 @@ void BM_orc_multithreaded_read_common(nvbench::state& state,
cudf::io::read_orc(read_opts, stream, rmm::mr::get_current_device_resource());
};

threads.paused = true;
for (size_t i = 0; i < num_files; ++i) {
threads.submit(read_func, i);
}
threads.pause();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pause and unpause methods are only enabled when this macro BS_THREAD_POOL_ENABLE_PAUSE is explicitly defined. Reference link here .

threads.detach_sequence(decltype(num_files){0}, num_files, read_func);
timer.start();
threads.paused = false;
threads.wait_for_tasks();
threads.unpause();
threads.wait();
cudf::detail::join_streams(streams, cudf::get_default_stream());
timer.stop();
});
Expand Down Expand Up @@ -170,7 +168,7 @@ void BM_orc_multithreaded_read_chunked_common(nvbench::state& state,
size_t const output_limit = state.get_int64("output_limit");

auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
cudf::detail::thread_pool threads(num_threads);
BS::thread_pool threads(num_threads);
auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);
std::vector<cudf::io::source_info> source_info_vector;
std::transform(source_sink_vector.begin(),
Expand Down Expand Up @@ -203,13 +201,11 @@ void BM_orc_multithreaded_read_chunked_common(nvbench::state& state,
} while (reader.has_next());
};

threads.paused = true;
for (size_t i = 0; i < num_files; ++i) {
threads.submit(read_func, i);
}
threads.pause();
threads.detach_sequence(decltype(num_files){0}, num_files, read_func);
timer.start();
threads.paused = false;
threads.wait_for_tasks();
threads.unpause();
threads.wait();
cudf::detail::join_streams(streams, cudf::get_default_stream());
timer.stop();
});
Expand Down
26 changes: 11 additions & 15 deletions cpp/benchmarks/io/parquet/parquet_reader_multithread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
#include <cudf/io/parquet.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/pinned_memory.hpp>
#include <cudf/utilities/thread_pool.hpp>

#include <nvtx3/nvtx3.hpp>

#include <BS_thread_pool.hpp>
#include <nvbench/nvbench.cuh>

#include <vector>
Expand Down Expand Up @@ -93,7 +93,7 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state,
auto const num_threads = state.get_int64("num_threads");

auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
cudf::detail::thread_pool threads(num_threads);
BS::thread_pool threads(num_threads);

auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);
std::vector<cudf::io::source_info> source_info_vector;
Expand All @@ -114,13 +114,11 @@ void BM_parquet_multithreaded_read_common(nvbench::state& state,
cudf::io::read_parquet(read_opts, stream, rmm::mr::get_current_device_resource());
};

threads.paused = true;
for (size_t i = 0; i < num_files; ++i) {
threads.submit(read_func, i);
}
threads.pause();
threads.detach_sequence(decltype(num_files){0}, num_files, read_func);
timer.start();
threads.paused = false;
threads.wait_for_tasks();
threads.unpause();
threads.wait();
cudf::detail::join_streams(streams, cudf::get_default_stream());
timer.stop();
});
Expand Down Expand Up @@ -176,7 +174,7 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
size_t const output_limit = state.get_int64("output_limit");

auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
cudf::detail::thread_pool threads(num_threads);
BS::thread_pool threads(num_threads);
auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);
std::vector<cudf::io::source_info> source_info_vector;
std::transform(source_sink_vector.begin(),
Expand Down Expand Up @@ -207,13 +205,11 @@ void BM_parquet_multithreaded_read_chunked_common(nvbench::state& state,
} while (reader.has_next());
};

threads.paused = true;
for (size_t i = 0; i < num_files; ++i) {
threads.submit(read_func, i);
}
threads.pause();
threads.detach_sequence(decltype(num_files){0}, num_files, read_func);
timer.start();
threads.paused = false;
threads.wait_for_tasks();
threads.unpause();
threads.wait();
cudf::detail::join_streams(streams, cudf::get_default_stream());
timer.stop();
});
Expand Down
28 changes: 28 additions & 0 deletions cpp/cmake/thirdparty/get_thread_pool.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# =============================================================================
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations under
# the License.
# =============================================================================

# This function finds rmm and sets any additional necessary environment variables.
function(find_and_configure_thread_pool)
rapids_cpm_find(
BS_thread_pool 4.1.0
CPM_ARGS
GIT_REPOSITORY https://github.com/bshoshany/thread-pool.git
GIT_TAG 097aa718f25d44315cadb80b407144ad455ee4f9
GIT_SHALLOW TRUE
)
add_library(BS_thread_pool INTERFACE)
target_include_directories(BS_thread_pool INTERFACE ${BS_thread_pool_SOURCE_DIR}/include)
endfunction()

find_and_configure_thread_pool()
Loading
Loading