From 6a7894fd62ae600064ff41620ea7c233ed09070b Mon Sep 17 00:00:00 2001 From: afender Date: Tue, 9 Feb 2021 17:57:55 -0600 Subject: [PATCH 1/7] get_handle_from_internal_pool --- cpp/include/raft/handle.hpp | 23 ++++++++++++++++++++++- cpp/test/handle.cpp | 11 +++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/cpp/include/raft/handle.hpp b/cpp/include/raft/handle.hpp index af53968653..f38aec394c 100644 --- a/cpp/include/raft/handle.hpp +++ b/cpp/include/raft/handle.hpp @@ -67,9 +67,22 @@ class handle_t { host_allocator_(std::make_shared()) { create_resources(); } + handle_t(const handle_t& h) : dev_id_(h.get_device()), num_streams_(0) {} + handle_t(const handle_t&& h) : dev_id_(h.get_device()), num_streams_(0) {} + + handle_t& operator=(const handle_t& h) { + prop_ = h.get_device_properties(); + device_prop_initialized_ = true; + device_allocator_ = get_device_allocator(); + host_allocator_ = get_host_allocator(); + return *this; + } /** Destroys all held-up resources */ - virtual ~handle_t() { destroy_resources(); } + virtual ~handle_t() { + std::cout << "dtor" << std::endl; + destroy_resources(); + } int get_device() const { return dev_id_; } @@ -136,6 +149,14 @@ class handle_t { return int_streams_vec; } + handle_t get_handle_from_internal_pool( + int stream_id, int n_streams = kNumDefaultWorkerStreams) const { + handle_t handle(n_streams); + handle = *this; + handle.set_stream(this->get_internal_stream(stream_id)); + return handle; + } + void wait_on_user_stream() const { CUDA_CHECK(cudaEventRecord(event_, user_stream_)); for (auto s : streams_) { diff --git a/cpp/test/handle.cpp b/cpp/test/handle.cpp index 5f6f3ceece..4c8b327e76 100644 --- a/cpp/test/handle.cpp +++ b/cpp/test/handle.cpp @@ -15,6 +15,7 @@ */ #include +#include #include #include #include @@ -49,4 +50,14 @@ TEST(Raft, GetInternalStreams) { ASSERT_EQ(4U, streams.size()); } +TEST(Raft, GetHandleFromPool) { + handle_t parent(4); + int sid = 2; + auto child = 
parent.get_handle_from_internal_pool(sid); + std::cout << "done" << std::endl; + + ASSERT_EQ(parent.get_internal_stream(sid), child.get_stream()); + ASSERT_EQ(0, child.get_num_internal_streams()); + ASSERT_EQ(parent.get_device(), child.get_device()); +} } // namespace raft From d88bb146b71251a845b960db56958fca6c5855b7 Mon Sep 17 00:00:00 2001 From: afender Date: Wed, 10 Feb 2021 16:16:58 -0600 Subject: [PATCH 2/7] added rmm stream pool as backend --- cpp/include/raft/handle.hpp | 43 ++++++++++++++----------------------- cpp/test/handle.cpp | 2 -- 2 files changed, 16 insertions(+), 29 deletions(-) diff --git a/cpp/include/raft/handle.hpp b/cpp/include/raft/handle.hpp index f38aec394c..8b2aa58611 100644 --- a/cpp/include/raft/handle.hpp +++ b/cpp/include/raft/handle.hpp @@ -38,6 +38,7 @@ #include #include #include +#include #include "cudart_utils.h" namespace raft { @@ -62,13 +63,13 @@ class handle_t { CUDA_CHECK(cudaGetDevice(&cur_dev)); return cur_dev; }()), - num_streams_(n_streams), + streams_(n_streams), device_allocator_(std::make_shared()), host_allocator_(std::make_shared()) { create_resources(); } - handle_t(const handle_t& h) : dev_id_(h.get_device()), num_streams_(0) {} - handle_t(const handle_t&& h) : dev_id_(h.get_device()), num_streams_(0) {} + handle_t(const handle_t& h) : dev_id_(h.get_device()) {} + handle_t(const handle_t&& h) : dev_id_(h.get_device()) {} handle_t& operator=(const handle_t& h) { prop_ = h.get_device_properties(); @@ -79,10 +80,7 @@ class handle_t { } /** Destroys all held-up resources */ - virtual ~handle_t() { - std::cout << "dtor" << std::endl; - destroy_resources(); - } + virtual ~handle_t() { destroy_resources(); } int get_device() const { return dev_id_; } @@ -139,12 +137,14 @@ class handle_t { return cusparse_handle_; } - cudaStream_t get_internal_stream(int sid) const { return streams_[sid]; } - int get_num_internal_streams() const { return num_streams_; } + cudaStream_t get_internal_stream(int sid) const { + return 
streams_.get_stream(sid).value(); + } + int get_num_internal_streams() const { return streams_.get_pool_size(); } std::vector get_internal_streams() const { std::vector int_streams_vec; - for (auto s : streams_) { - int_streams_vec.push_back(s); + for (int i = 0; i < get_num_internal_streams(); i++) { + int_streams_vec.push_back(get_internal_stream(i)); } return int_streams_vec; } @@ -159,14 +159,14 @@ class handle_t { void wait_on_user_stream() const { CUDA_CHECK(cudaEventRecord(event_, user_stream_)); - for (auto s : streams_) { - CUDA_CHECK(cudaStreamWaitEvent(s, event_, 0)); + for (int i = 0; i < get_num_internal_streams(); i++) { + CUDA_CHECK(cudaStreamWaitEvent(get_internal_stream(i), event_, 0)); } } void wait_on_internal_streams() const { - for (auto s : streams_) { - CUDA_CHECK(cudaEventRecord(event_, s)); + for (int i = 0; i < get_num_internal_streams(); i++) { + CUDA_CHECK(cudaEventRecord(event_, get_internal_stream(i))); CUDA_CHECK(cudaStreamWaitEvent(user_stream_, event_, 0)); } } @@ -213,8 +213,7 @@ class handle_t { std::unordered_map> subcomms_; const int dev_id_; - const int num_streams_; - std::vector streams_; + rmm::cuda_stream_pool streams_{0}; mutable cublasHandle_t cublas_handle_; mutable bool cublas_initialized_{false}; mutable cusolverDnHandle_t cusolver_dn_handle_; @@ -232,11 +231,6 @@ class handle_t { mutable std::mutex mutex_; void create_resources() { - for (int i = 0; i < num_streams_; ++i) { - cudaStream_t stream; - CUDA_CHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking)); - streams_.push_back(stream); - } CUDA_CHECK(cudaEventCreateWithFlags(&event_, cudaEventDisableTiming)); } @@ -258,11 +252,6 @@ class handle_t { //CUBLAS_CHECK_NO_THROW(cublasDestroy(cublas_handle_)); CUBLAS_CHECK(cublasDestroy(cublas_handle_)); } - while (!streams_.empty()) { - //CUDA_CHECK_NO_THROW(cudaStreamDestroy(streams_.back())); - CUDA_CHECK(cudaStreamDestroy(streams_.back())); - streams_.pop_back(); - } 
//CUDA_CHECK_NO_THROW(cudaEventDestroy(event_)); CUDA_CHECK(cudaEventDestroy(event_)); } diff --git a/cpp/test/handle.cpp b/cpp/test/handle.cpp index 4c8b327e76..ee6d6d2a48 100644 --- a/cpp/test/handle.cpp +++ b/cpp/test/handle.cpp @@ -54,8 +54,6 @@ TEST(Raft, GetHandleFromPool) { handle_t parent(4); int sid = 2; auto child = parent.get_handle_from_internal_pool(sid); - std::cout << "done" << std::endl; - ASSERT_EQ(parent.get_internal_stream(sid), child.get_stream()); ASSERT_EQ(0, child.get_num_internal_streams()); ASSERT_EQ(parent.get_device(), child.get_device()); From cf92c412371166cacd1ef262a0556d3df717581f Mon Sep 17 00:00:00 2001 From: afender Date: Wed, 10 Feb 2021 16:27:31 -0600 Subject: [PATCH 3/7] added rmm stream pool as backend --- cpp/include/raft/handle.hpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/cpp/include/raft/handle.hpp b/cpp/include/raft/handle.hpp index 8b2aa58611..a42fdd67b2 100644 --- a/cpp/include/raft/handle.hpp +++ b/cpp/include/raft/handle.hpp @@ -86,6 +86,9 @@ class handle_t { void set_stream(cudaStream_t stream) { user_stream_ = stream; } cudaStream_t get_stream() const { return user_stream_; } + rmm::cuda_stream_view get_stream_view() const { + return rmm::cuda_stream_view(user_stream_); + } void set_device_allocator(std::shared_ptr allocator) { device_allocator_ = allocator; @@ -137,9 +140,15 @@ class handle_t { return cusparse_handle_; } + // legacy compatibility for cuML cudaStream_t get_internal_stream(int sid) const { return streams_.get_stream(sid).value(); } + // new accessor return rmm::cuda_stream_view + rmm::cuda_stream_view get_internal_stream_view(int sid) const { + return streams_.get_stream(sid); + } + int get_num_internal_streams() const { return streams_.get_pool_size(); } std::vector get_internal_streams() const { std::vector int_streams_vec; From 4cebf2453eec8c48fadf5ff5cb12a4e21e914509 Mon Sep 17 00:00:00 2001 From: afender Date: Wed, 10 Feb 2021 16:54:23 -0600 Subject: [PATCH 4/7] exposed 
rmm::cuda_stream_view for streams access --- cpp/test/handle.cpp | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/cpp/test/handle.cpp b/cpp/test/handle.cpp index ee6d6d2a48..8fef4ead61 100644 --- a/cpp/test/handle.cpp +++ b/cpp/test/handle.cpp @@ -52,10 +52,25 @@ TEST(Raft, GetInternalStreams) { TEST(Raft, GetHandleFromPool) { handle_t parent(4); - int sid = 2; - auto child = parent.get_handle_from_internal_pool(sid); - ASSERT_EQ(parent.get_internal_stream(sid), child.get_stream()); + + auto child = parent.get_handle_from_internal_pool(2); + ASSERT_EQ(parent.get_internal_stream(2), child.get_stream()); ASSERT_EQ(0, child.get_num_internal_streams()); + + child.set_stream(parent.get_internal_stream(3)); + ASSERT_EQ(parent.get_internal_stream(3), child.get_stream()); + ASSERT_NE(parent.get_internal_stream(2), child.get_stream()); + ASSERT_EQ(parent.get_device(), child.get_device()); } + +TEST(Raft, GetHandleStreamViews) { + handle_t parent(4); + + auto child = parent.get_handle_from_internal_pool(2); + ASSERT_EQ(parent.get_internal_stream_view(2), child.get_stream_view()); + ASSERT_EQ(parent.get_internal_stream_view(2).value(), + child.get_stream_view().value()); + EXPECT_FALSE(child.get_stream_view().is_default()); +} } // namespace raft From 9798885207aded2cceb37e05053da5a8f59ab206 Mon Sep 17 00:00:00 2001 From: afender Date: Thu, 11 Feb 2021 15:55:38 -0600 Subject: [PATCH 5/7] perf check --- cpp/include/raft/handle.hpp | 2 ++ cpp/test/handle.cpp | 12 ++++++++++++ 2 files changed, 14 insertions(+) diff --git a/cpp/include/raft/handle.hpp b/cpp/include/raft/handle.hpp index a42fdd67b2..92fd6c2663 100644 --- a/cpp/include/raft/handle.hpp +++ b/cpp/include/raft/handle.hpp @@ -71,6 +71,8 @@ class handle_t { handle_t(const handle_t& h) : dev_id_(h.get_device()) {} handle_t(const handle_t&& h) : dev_id_(h.get_device()) {} + // light copy operator + // skip streams, comms, and libs handles handle_t& operator=(const handle_t& h) { 
prop_ = h.get_device_properties(); device_prop_initialized_ = true; diff --git a/cpp/test/handle.cpp b/cpp/test/handle.cpp index 8fef4ead61..ead7382b1b 100644 --- a/cpp/test/handle.cpp +++ b/cpp/test/handle.cpp @@ -64,6 +64,18 @@ TEST(Raft, GetHandleFromPool) { ASSERT_EQ(parent.get_device(), child.get_device()); } +TEST(Raft, GetHandleFromPoolPerf) { + handle_t parent(100); + auto start = curTimeMillis(); + for (int i = 0; i < parent.get_num_internal_streams(); i++) { + auto child = parent.get_handle_from_internal_pool(i); + ASSERT_EQ(parent.get_internal_stream(i), child.get_stream()); + child.wait_on_user_stream(); + } + // upperbound on 0.1ms per child handle + ASSERT_LE(curTimeMillis() - start, 10); +} + TEST(Raft, GetHandleStreamViews) { handle_t parent(4); From def166f7c26d73e91db6a1d554dea6a32381f603 Mon Sep 17 00:00:00 2001 From: afender Date: Fri, 12 Feb 2021 12:26:36 -0600 Subject: [PATCH 6/7] reviews --- cpp/include/raft/handle.hpp | 34 +++++++++++++++++----------------- cpp/test/handle.cpp | 6 +++--- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/cpp/include/raft/handle.hpp b/cpp/include/raft/handle.hpp index 92fd6c2663..42a1e4ebb8 100644 --- a/cpp/include/raft/handle.hpp +++ b/cpp/include/raft/handle.hpp @@ -68,17 +68,25 @@ class handle_t { host_allocator_(std::make_shared()) { create_resources(); } - handle_t(const handle_t& h) : dev_id_(h.get_device()) {} - handle_t(const handle_t&& h) : dev_id_(h.get_device()) {} - // light copy operator - // skip streams, comms, and libs handles - handle_t& operator=(const handle_t& h) { - prop_ = h.get_device_properties(); + /** + * @brief Construct a light handle copy from another handle. + * The user stream, cuda handles, comms and worker pool are not copied. + * The user_stream of the returned handle is set to the specified stream + * of the other handle's worker pool. + * @param[in] stream_id stream id in `other` worker streams + * to be set as user stream in the constructed handle + * @param[in] n_streams
number of worker streams to be created + */ + handle_t(const handle_t& other, int stream_id, + int n_streams = kNumDefaultWorkerStreams) + : dev_id_(other.get_device()), streams_(n_streams) { + prop_ = other.get_device_properties(); device_prop_initialized_ = true; - device_allocator_ = get_device_allocator(); - host_allocator_ = get_host_allocator(); - return *this; + device_allocator_ = other.get_device_allocator(); + host_allocator_ = other.get_host_allocator(); + create_resources(); + set_stream(other.get_internal_stream(stream_id)); } /** Destroys all held-up resources */ @@ -160,14 +168,6 @@ class handle_t { return int_streams_vec; } - handle_t get_handle_from_internal_pool( - int stream_id, int n_streams = kNumDefaultWorkerStreams) const { - handle_t handle(n_streams); - handle = *this; - handle.set_stream(this->get_internal_stream(stream_id)); - return handle; - } - void wait_on_user_stream() const { CUDA_CHECK(cudaEventRecord(event_, user_stream_)); for (int i = 0; i < get_num_internal_streams(); i++) { diff --git a/cpp/test/handle.cpp b/cpp/test/handle.cpp index ead7382b1b..4cb9809844 100644 --- a/cpp/test/handle.cpp +++ b/cpp/test/handle.cpp @@ -53,7 +53,7 @@ TEST(Raft, GetInternalStreams) { TEST(Raft, GetHandleFromPool) { handle_t parent(4); - auto child = parent.get_handle_from_internal_pool(2); + handle_t child(parent, 2); ASSERT_EQ(parent.get_internal_stream(2), child.get_stream()); ASSERT_EQ(0, child.get_num_internal_streams()); @@ -68,7 +68,7 @@ TEST(Raft, GetHandleFromPoolPerf) { handle_t parent(100); auto start = curTimeMillis(); for (int i = 0; i < parent.get_num_internal_streams(); i++) { - auto child = parent.get_handle_from_internal_pool(i); + handle_t child(parent, i); ASSERT_EQ(parent.get_internal_stream(i), child.get_stream()); child.wait_on_user_stream(); } @@ -79,7 +79,7 @@ TEST(Raft, GetHandleFromPoolPerf) { TEST(Raft, GetHandleStreamViews) { handle_t parent(4); - auto child = parent.get_handle_from_internal_pool(2); + handle_t
child(parent, 2); ASSERT_EQ(parent.get_internal_stream_view(2), child.get_stream_view()); ASSERT_EQ(parent.get_internal_stream_view(2).value(), child.get_stream_view().value()); From 30e341f483003a1094e326c49780c9382846d867 Mon Sep 17 00:00:00 2001 From: afender Date: Wed, 17 Feb 2021 17:56:56 -0600 Subject: [PATCH 7/7] error check --- cpp/include/raft/handle.hpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cpp/include/raft/handle.hpp b/cpp/include/raft/handle.hpp index 42a1e4ebb8..dbe7e83189 100644 --- a/cpp/include/raft/handle.hpp +++ b/cpp/include/raft/handle.hpp @@ -81,6 +81,9 @@ class handle_t { handle_t(const handle_t& other, int stream_id, int n_streams = kNumDefaultWorkerStreams) : dev_id_(other.get_device()), streams_(n_streams) { + RAFT_EXPECTS( + other.get_num_internal_streams() > 0, + "ERROR: the main handle must have at least one worker stream\n"); prop_ = other.get_device_properties(); device_prop_initialized_ = true; device_allocator_ = other.get_device_allocator();