Skip to content

Commit

Permalink
One cudaStream_t instance per raft::handle_t (#291)
Browse files Browse the repository at this point in the history
closes #293 
closes #115 

This PR also updates the cython build to `std=c++17`.

Authors:
  - Divye Gala (https://github.com/divyegala)
  - Corey J. Nolet (https://github.com/cjnolet)

Approvers:
  - Chuck Hastings (https://github.com/ChuckHastings)
  - William Hicks (https://github.com/wphicks)
  - Seunghwa Kang (https://github.com/seunghwak)
  - Corey J. Nolet (https://github.com/cjnolet)
  - Tamas Bela Feher (https://github.com/tfeher)
  - AJ Schmidt (https://github.com/ajschmidt8)

URL: #291
  • Loading branch information
divyegala authored Dec 13, 2021
1 parent a68a4aa commit 906e5e6
Show file tree
Hide file tree
Showing 57 changed files with 515 additions and 409 deletions.
2 changes: 1 addition & 1 deletion cpp/cmake/modules/ConfigureCUDA.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ if(CMAKE_COMPILER_IS_GNUCXX)
list(APPEND RAFT_CXX_FLAGS -Wall -Werror -Wno-unknown-pragmas -Wno-error=deprecated-declarations)
endif()

list(APPEND RAFT_CUDA_FLAGS --expt-extended-lambda --expt-relaxed-constexpr)
list(APPEND RAFT_CUDA_FLAGS --expt-extended-lambda --expt-relaxed-constexpr --default-stream per-thread)

# set warnings as errors
if(CMAKE_CUDA_COMPILER_VERSION VERSION_GREATER_EQUAL 11.2.0)
Expand Down
8 changes: 4 additions & 4 deletions cpp/include/raft/comms/test.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,10 +528,10 @@ bool test_commsplit(const handle_t& h, int n_colors)
if (n_colors > size) n_colors = size;

// first we need to assign to a color, then assign the rank within the color
int color = rank % n_colors;
int key = rank / n_colors;

handle_t new_handle(1);
int color = rank % n_colors;
int key = rank / n_colors;
auto stream_pool = std::make_shared<rmm::cuda_stream_pool>(1);
handle_t new_handle(rmm::cuda_stream_default, stream_pool);
auto shared_comm = std::make_shared<comms_t>(communicator.comm_split(color, key));
new_handle.set_comms(shared_comm);

Expand Down
174 changes: 107 additions & 67 deletions cpp/include/raft/handle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,64 +47,44 @@ namespace raft {
* necessary cuda kernels and/or libraries
*/
class handle_t {
private:
static constexpr int kNumDefaultWorkerStreams = 0;

public:
// delete copy/move constructors and assignment operators as
// copying and moving underlying resources is unsafe
handle_t(const handle_t&) = delete;
handle_t& operator=(const handle_t&) = delete;
handle_t(handle_t&&) = delete;
handle_t& operator=(handle_t&&) = delete;

/**
* @brief Construct a handle with the specified number of worker streams
* @brief Construct a handle with a stream view and stream pool
*
* @param[in] n_streams number worker streams to be created
* @param[in] stream the default stream (which has the default per-thread stream if unspecified)
* @param[in] stream_pool the stream pool used (which has default of nullptr if unspecified)
*/
explicit handle_t(int n_streams = kNumDefaultWorkerStreams)
handle_t(rmm::cuda_stream_view stream_view = rmm::cuda_stream_per_thread,
std::shared_ptr<rmm::cuda_stream_pool> stream_pool = {nullptr})
: dev_id_([]() -> int {
int cur_dev = -1;
RAFT_CUDA_TRY(cudaGetDevice(&cur_dev));
return cur_dev;
}())
}()),
stream_view_{stream_view},
stream_pool_{stream_pool}
{
if (n_streams != 0) { streams_ = std::make_unique<rmm::cuda_stream_pool>(n_streams); }
create_resources();
thrust_policy_ = std::make_unique<rmm::exec_policy>(user_stream_);
}

/**
* @brief Construct a light handle copy from another
* user stream, cuda handles, comms and worker pool are not copied
* The user_stream of the returned handle is set to the specified stream
* of the other handle worker pool
* @param[in] other other handle for which to use streams
* @param[in] stream_id stream id in `other` worker streams
* to be set as user stream in the constructed handle
* @param[in] n_streams number worker streams to be created
*/
handle_t(const handle_t& other, int stream_id, int n_streams = kNumDefaultWorkerStreams)
: dev_id_(other.get_device())
{
RAFT_EXPECTS(other.get_num_internal_streams() > 0,
"ERROR: the main handle must have at least one worker stream\n");
if (n_streams != 0) { streams_ = std::make_unique<rmm::cuda_stream_pool>(n_streams); }
prop_ = other.get_device_properties();
device_prop_initialized_ = true;
create_resources();
set_stream(other.get_internal_stream(stream_id));
thrust_policy_ = std::make_unique<rmm::exec_policy>(user_stream_);
}

/** Destroys all held-up resources */
virtual ~handle_t() { destroy_resources(); }

int get_device() const { return dev_id_; }

void set_stream(cudaStream_t stream) { user_stream_ = stream; }
cudaStream_t get_stream() const { return user_stream_; }
rmm::cuda_stream_view get_stream_view() const { return rmm::cuda_stream_view(user_stream_); }

cublasHandle_t get_cublas_handle() const
{
std::lock_guard<std::mutex> _(mutex_);
if (!cublas_initialized_) {
RAFT_CUBLAS_TRY_NO_THROW(cublasCreate(&cublas_handle_));
RAFT_CUBLAS_TRY_NO_THROW(cublasSetStream(cublas_handle_, stream_view_));
cublas_initialized_ = true;
}
return cublas_handle_;
Expand All @@ -115,6 +95,7 @@ class handle_t {
std::lock_guard<std::mutex> _(mutex_);
if (!cusolver_dn_initialized_) {
RAFT_CUSOLVER_TRY_NO_THROW(cusolverDnCreate(&cusolver_dn_handle_));
RAFT_CUSOLVER_TRY_NO_THROW(cusolverDnSetStream(cusolver_dn_handle_, stream_view_));
cusolver_dn_initialized_ = true;
}
return cusolver_dn_handle_;
Expand All @@ -125,6 +106,7 @@ class handle_t {
std::lock_guard<std::mutex> _(mutex_);
if (!cusolver_sp_initialized_) {
RAFT_CUSOLVER_TRY_NO_THROW(cusolverSpCreate(&cusolver_sp_handle_));
RAFT_CUSOLVER_TRY_NO_THROW(cusolverSpSetStream(cusolver_sp_handle_, stream_view_));
cusolver_sp_initialized_ = true;
}
return cusolver_sp_handle_;
Expand All @@ -135,55 +117,111 @@ class handle_t {
std::lock_guard<std::mutex> _(mutex_);
if (!cusparse_initialized_) {
RAFT_CUSPARSE_TRY_NO_THROW(cusparseCreate(&cusparse_handle_));
RAFT_CUSPARSE_TRY_NO_THROW(cusparseSetStream(cusparse_handle_, stream_view_));
cusparse_initialized_ = true;
}
return cusparse_handle_;
}

rmm::exec_policy& get_thrust_policy() const { return *thrust_policy_; }

// legacy compatibility for cuML
cudaStream_t get_internal_stream(int sid) const
/**
* @brief synchronize main stream on the handle
*/
void sync_stream() const { stream_view_.synchronize(); }

/**
* @brief returns main stream on the handle
*/
rmm::cuda_stream_view get_stream() const { return stream_view_; }

/**
* @brief returns whether stream pool was initialized on the handle
*/

bool is_stream_pool_initialized() const { return stream_pool_.get() != nullptr; }

/**
* @brief returns stream pool on the handle
*/
const rmm::cuda_stream_pool& get_stream_pool() const
{
RAFT_EXPECTS(streams_.get() != nullptr,
"ERROR: rmm::cuda_stream_pool was not initialized with a non-zero value");
return streams_->get_stream(sid).value();
RAFT_EXPECTS(stream_pool_, "ERROR: rmm::cuda_stream_pool was not initialized");
return *stream_pool_;
}
// new accessor return rmm::cuda_stream_view
rmm::cuda_stream_view get_internal_stream_view(int sid) const

std::size_t get_stream_pool_size() const
{
RAFT_EXPECTS(streams_.get() != nullptr,
"ERROR: rmm::cuda_stream_pool was not initialized with a non-zero value");
return streams_->get_stream(sid);
return is_stream_pool_initialized() ? stream_pool_->get_pool_size() : 0;
}

int get_num_internal_streams() const
/**
* @brief return stream from pool
*/
rmm::cuda_stream_view get_stream_from_stream_pool() const
{
return streams_.get() != nullptr ? streams_->get_pool_size() : 0;
RAFT_EXPECTS(stream_pool_, "ERROR: rmm::cuda_stream_pool was not initialized");
return stream_pool_->get_stream();
}

std::vector<cudaStream_t> get_internal_streams() const
/**
* @brief return stream from pool at index
*/
rmm::cuda_stream_view get_stream_from_stream_pool(std::size_t stream_idx) const
{
RAFT_EXPECTS(stream_pool_, "ERROR: rmm::cuda_stream_pool was not initialized");
return stream_pool_->get_stream(stream_idx);
}

/**
* @brief return stream from pool if size > 0, else main stream on handle
*/
rmm::cuda_stream_view get_next_usable_stream() const
{
return is_stream_pool_initialized() ? get_stream_from_stream_pool() : stream_view_;
}

/**
* @brief return stream from pool at index if size > 0, else main stream on handle
*
* @param[in] stream_index the required index of the stream in the stream pool if available
*/
rmm::cuda_stream_view get_next_usable_stream(std::size_t stream_idx) const
{
return is_stream_pool_initialized() ? get_stream_from_stream_pool(stream_idx) : stream_view_;
}

/**
* @brief synchronize the stream pool on the handle
*/
void sync_stream_pool() const
{
std::vector<cudaStream_t> int_streams_vec;
for (int i = 0; i < get_num_internal_streams(); i++) {
int_streams_vec.push_back(get_internal_stream(i));
for (std::size_t i = 0; i < get_stream_pool_size(); i++) {
stream_pool_->get_stream(i).synchronize();
}
return int_streams_vec;
}

void wait_on_user_stream() const
/**
* @brief synchronize subset of stream pool
*
* @param[in] stream_indices the indices of the streams in the stream pool to synchronize
*/
void sync_stream_pool(const std::vector<std::size_t> stream_indices) const
{
RAFT_CUDA_TRY(cudaEventRecord(event_, user_stream_));
for (int i = 0; i < get_num_internal_streams(); i++) {
RAFT_CUDA_TRY(cudaStreamWaitEvent(get_internal_stream(i), event_, 0));
RAFT_EXPECTS(stream_pool_, "ERROR: rmm::cuda_stream_pool was not initialized");
for (const auto& stream_index : stream_indices) {
stream_pool_->get_stream(stream_index).synchronize();
}
}

void wait_on_internal_streams() const
/**
* @brief ask stream pool to wait on last event in main stream
*/
void wait_stream_pool_on_stream() const
{
for (int i = 0; i < get_num_internal_streams(); i++) {
RAFT_CUDA_TRY(cudaEventRecord(event_, get_internal_stream(i)));
RAFT_CUDA_TRY(cudaStreamWaitEvent(user_stream_, event_, 0));
RAFT_CUDA_TRY(cudaEventRecord(event_, stream_view_));
for (std::size_t i = 0; i < get_stream_pool_size(); i++) {
RAFT_CUDA_TRY(cudaStreamWaitEvent(stream_pool_->get_stream(i), event_, 0));
}
}

Expand Down Expand Up @@ -229,7 +267,6 @@ class handle_t {
std::unordered_map<std::string, std::shared_ptr<comms::comms_t>> subcomms_;

const int dev_id_;
std::unique_ptr<rmm::cuda_stream_pool> streams_{nullptr};
mutable cublasHandle_t cublas_handle_;
mutable bool cublas_initialized_{false};
mutable cusolverDnHandle_t cusolver_dn_handle_;
Expand All @@ -239,20 +276,22 @@ class handle_t {
mutable cusparseHandle_t cusparse_handle_;
mutable bool cusparse_initialized_{false};
std::unique_ptr<rmm::exec_policy> thrust_policy_{nullptr};
cudaStream_t user_stream_{nullptr};
rmm::cuda_stream_view stream_view_{rmm::cuda_stream_per_thread};
std::shared_ptr<rmm::cuda_stream_pool> stream_pool_{nullptr};
cudaEvent_t event_;
mutable cudaDeviceProp prop_;
mutable bool device_prop_initialized_{false};
mutable std::mutex mutex_;

void create_resources()
{
thrust_policy_ = std::make_unique<rmm::exec_policy>(stream_view_);

RAFT_CUDA_TRY(cudaEventCreateWithFlags(&event_, cudaEventDisableTiming));
}

void destroy_resources()
{
///@todo: enable *_NO_THROW variants once we have enabled logging
if (cusparse_initialized_) { RAFT_CUSPARSE_TRY_NO_THROW(cusparseDestroy(cusparse_handle_)); }
if (cusolver_dn_initialized_) {
RAFT_CUSOLVER_TRY_NO_THROW(cusolverDnDestroy(cusolver_dn_handle_));
Expand All @@ -270,11 +309,12 @@ class handle_t {
*/
class stream_syncer {
public:
explicit stream_syncer(const handle_t& handle) : handle_(handle)
explicit stream_syncer(const handle_t& handle) : handle_(handle) { handle_.sync_stream(); }
~stream_syncer()
{
handle_.wait_on_user_stream();
handle_.wait_stream_pool_on_stream();
handle_.sync_stream_pool();
}
~stream_syncer() { handle_.wait_on_internal_streams(); }

stream_syncer(const stream_syncer& other) = delete;
stream_syncer& operator=(const stream_syncer& other) = delete;
Expand Down
17 changes: 12 additions & 5 deletions cpp/include/raft/label/classlabels.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,23 @@ int getUniquelabels(rmm::device_uvector<value_t>& unique, value_t* y, size_t n,

// Query how much temporary storage we will need for cub operations
// and allocate it
cub::DeviceRadixSort::SortKeys(NULL, bytes, y, workspace.data(), n);
cub::DeviceRadixSort::SortKeys(
NULL, bytes, y, workspace.data(), n, 0, sizeof(value_t) * 8, stream);
cub::DeviceSelect::Unique(
NULL, bytes2, workspace.data(), workspace.data(), d_num_selected.data(), n);
NULL, bytes2, workspace.data(), workspace.data(), d_num_selected.data(), n, stream);
bytes = max(bytes, bytes2);
rmm::device_uvector<char> cub_storage(bytes, stream);

// Select Unique classes
cub::DeviceRadixSort::SortKeys(cub_storage.data(), bytes, y, workspace.data(), n);
cub::DeviceSelect::Unique(
cub_storage.data(), bytes, workspace.data(), workspace.data(), d_num_selected.data(), n);
cub::DeviceRadixSort::SortKeys(
cub_storage.data(), bytes, y, workspace.data(), n, 0, sizeof(value_t) * 8, stream);
cub::DeviceSelect::Unique(cub_storage.data(),
bytes,
workspace.data(),
workspace.data(),
d_num_selected.data(),
n,
stream);

int n_unique = d_num_selected.value(stream);
// Copy unique classes to output
Expand Down
6 changes: 2 additions & 4 deletions cpp/include/raft/spatial/knn/detail/ball_cover.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -164,17 +164,15 @@ void k_closest_landmarks(const raft::handle_t& handle,
std::vector<value_t*> input = {index.get_R()};
std::vector<std::uint32_t> sizes = {index.n_landmarks};

brute_force_knn_impl<std::uint32_t, std::int64_t>(input,
brute_force_knn_impl<std::uint32_t, std::int64_t>(handle,
input,
sizes,
index.n,
const_cast<value_t*>(query_pts),
n_query_pts,
R_knn_inds,
R_knn_dists,
k,
handle.get_stream(),
nullptr,
0,
true,
true,
nullptr,
Expand Down
Loading

0 comments on commit 906e5e6

Please sign in to comment.