From c8066a56d19f91b4f5471272ea5de2d377f062d1 Mon Sep 17 00:00:00 2001 From: Charles Hastings Date: Mon, 25 Mar 2024 11:45:38 -0700 Subject: [PATCH 1/3] restructure to use stream and synchronize appropriately --- .../mtmg/detail/per_device_edgelist.hpp | 76 +++++++++---------- cpp/include/cugraph/mtmg/edgelist.hpp | 14 +++- cpp/include/cugraph/mtmg/handle.hpp | 7 +- .../cugraph/mtmg/per_thread_edgelist.hpp | 10 ++- 4 files changed, 62 insertions(+), 45 deletions(-) diff --git a/cpp/include/cugraph/mtmg/detail/per_device_edgelist.hpp b/cpp/include/cugraph/mtmg/detail/per_device_edgelist.hpp index 7fd5bb726e6..e7c7fcb0061 100644 --- a/cpp/include/cugraph/mtmg/detail/per_device_edgelist.hpp +++ b/cpp/include/cugraph/mtmg/detail/per_device_edgelist.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -62,13 +62,13 @@ class per_device_edgelist_t { /** * @brief Construct a new per device edgelist t object * - * @param handle MTMG resource handle - used to identify GPU resources + * @param stream_view CUDA stream view * @param device_buffer_size Number of edges to store in each device buffer * @param use_weight Whether or not the edgelist will have weights * @param use_edge_id Whether or not the edgelist will have edge ids * @param use_edge_type Whether or not the edgelist will have edge types */ - per_device_edgelist_t(cugraph::mtmg::handle_t const& handle, + per_device_edgelist_t(rmm::cuda_stream_view stream_view, size_t device_buffer_size, bool use_weight, bool use_edge_id, @@ -89,7 +89,7 @@ class per_device_edgelist_t { edge_type_ = std::make_optional(std::vector>()); } - create_new_buffers(handle); + create_new_buffers(stream_view); } /** @@ -111,14 +111,14 @@ class per_device_edgelist_t { /** * @brief Append a list of edges to the edge list * - * @param handle The resource handle - * @param src Source vertex id - * @param dst Destination vertex id - * @param wgt Edge weight - * @param edge_id Edge id - * @param edge_type Edge type + * @param stream_view CUDA stream view + * @param src Source vertex id + * @param dst Destination vertex id + * @param wgt Edge weight + * @param edge_id Edge id + * @param edge_type Edge type */ - void append(handle_t const& handle, + void append(rmm::cuda_stream_view stream_view, raft::host_span src, raft::host_span dst, std::optional> wgt, @@ -142,13 +142,13 @@ class per_device_edgelist_t { pos += copy_count; current_pos_ += copy_count; - if (current_pos_ == src_.back().size()) { create_new_buffers(handle); } + if (current_pos_ == src_.back().size()) { create_new_buffers(stream_view); } } } std::for_each(copy_positions.begin(), copy_positions.end(), - [&handle, + [&stream_view, &this_src = src_, &src, &this_dst = dst_, @@ -164,47 +164,45 @@ class per_device_edgelist_t { raft::update_device(this_src[buffer_idx].begin() + buffer_pos, src.begin() + input_pos, copy_count, - handle.get_stream()); + stream_view); raft::update_device(this_dst[buffer_idx].begin() + buffer_pos, dst.begin() + input_pos, copy_count, - handle.get_stream()); + stream_view); if (this_wgt) raft::update_device((*this_wgt)[buffer_idx].begin() + buffer_pos, wgt->begin() + input_pos, copy_count, - handle.get_stream()); + stream_view); if (this_edge_id) raft::update_device((*this_edge_id)[buffer_idx].begin() + buffer_pos, edge_id->begin() + input_pos, copy_count, - handle.get_stream()); + stream_view); if (this_edge_type) raft::update_device((*this_edge_type)[buffer_idx].begin() + buffer_pos, edge_type->begin() + input_pos, copy_count, - handle.get_stream()); + stream_view); }); - - handle.sync_stream(); } /** * @brief Mark the edgelist as ready for reading (all writes are complete) * - * @param handle The resource handle + * @param stream_view CUDA stream view */ - void finalize_buffer(handle_t const& handle) + void finalize_buffer(rmm::cuda_stream_view stream_view) { - src_.back().resize(current_pos_, handle.get_stream()); - dst_.back().resize(current_pos_, handle.get_stream()); - if (wgt_) wgt_->back().resize(current_pos_, handle.get_stream()); - if (edge_id_) edge_id_->back().resize(current_pos_, handle.get_stream()); - if (edge_type_) edge_type_->back().resize(current_pos_, handle.get_stream()); + src_.back().resize(current_pos_, stream_view); + dst_.back().resize(current_pos_, stream_view); + if (wgt_) wgt_->back().resize(current_pos_, stream_view); + if (edge_id_) edge_id_->back().resize(current_pos_, stream_view); + if (edge_type_) edge_type_->back().resize(current_pos_, stream_view); } bool use_weight() const { return wgt_.has_value(); } @@ -230,16 +228,18 @@ class per_device_edgelist_t { void consolidate_and_shuffle(cugraph::mtmg::handle_t const& handle, bool store_transposed) { if (src_.size() > 1) { + auto stream = handle.raft_handle().get_stream(); + size_t total_size = std::transform_reduce( src_.begin(), src_.end(), size_t{0}, std::plus(), [](auto& d_vector) { return d_vector.size(); }); - resize_and_copy_buffers(handle.get_stream(), src_, total_size); - resize_and_copy_buffers(handle.get_stream(), dst_, total_size); - if (wgt_) resize_and_copy_buffers(handle.get_stream(), *wgt_, total_size); - if (edge_id_) resize_and_copy_buffers(handle.get_stream(), *edge_id_, total_size); - if (edge_type_) resize_and_copy_buffers(handle.get_stream(), *edge_type_, total_size); + resize_and_copy_buffers(stream, src_, total_size); + resize_and_copy_buffers(stream, dst_, total_size); + if (wgt_) resize_and_copy_buffers(stream, *wgt_, total_size); + if (edge_id_) resize_and_copy_buffers(stream, *edge_id_, total_size); + if (edge_type_) resize_and_copy_buffers(stream, *edge_type_, total_size); } auto tmp_wgt = wgt_ ? std::make_optional(std::move((*wgt_)[0])) : std::nullopt; @@ -286,16 +286,16 @@ class per_device_edgelist_t { buffer = std::move(new_buffer); } - void create_new_buffers(cugraph::mtmg::handle_t const& handle) + void create_new_buffers(rmm::cuda_stream_view stream_view) { - src_.emplace_back(device_buffer_size_, handle.get_stream()); - dst_.emplace_back(device_buffer_size_, handle.get_stream()); + src_.emplace_back(device_buffer_size_, stream_view); + dst_.emplace_back(device_buffer_size_, stream_view); - if (wgt_) { wgt_->emplace_back(device_buffer_size_, handle.get_stream()); } + if (wgt_) { wgt_->emplace_back(device_buffer_size_, stream_view); } - if (edge_id_) { edge_id_->emplace_back(device_buffer_size_, handle.get_stream()); } + if (edge_id_) { edge_id_->emplace_back(device_buffer_size_, stream_view); } - if (edge_type_) { edge_type_->emplace_back(device_buffer_size_, handle.get_stream()); } + if (edge_type_) { edge_type_->emplace_back(device_buffer_size_, stream_view); } current_pos_ = 0; } diff --git a/cpp/include/cugraph/mtmg/edgelist.hpp b/cpp/include/cugraph/mtmg/edgelist.hpp index 90c53dfbb64..f1866fae6cf 100644 --- a/cpp/include/cugraph/mtmg/edgelist.hpp +++ b/cpp/include/cugraph/mtmg/edgelist.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,7 +39,11 @@ class edgelist_t : public detail::device_shared_wrapper_t< bool use_edge_type) { detail::per_device_edgelist_t tmp( - handle, device_buffer_size, use_weight, use_edge_id, use_edge_type); + handle.raft_handle().get_stream(), + device_buffer_size, + use_weight, + use_edge_id, + use_edge_type); detail::device_shared_wrapper_t< detail::per_device_edgelist_t>::set(handle, @@ -49,7 +53,11 @@ class edgelist_t : public detail::device_shared_wrapper_t< /** * @brief Stop inserting edges into this edgelist so we can use the edges */ - void finalize_buffer(handle_t const& handle) { this->get(handle).finalize_buffer(handle); } + void finalize_buffer(handle_t const& handle) + { + handle.sync_stream_pool(); + this->get(handle).finalize_buffer(handle.get_stream()); + } /** * @brief Consolidate for the edgelist edges into a single edgelist and then diff --git a/cpp/include/cugraph/mtmg/handle.hpp b/cpp/include/cugraph/mtmg/handle.hpp index 0b02091a3cc..26c283f6acf 100644 --- a/cpp/include/cugraph/mtmg/handle.hpp +++ b/cpp/include/cugraph/mtmg/handle.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -79,6 +79,11 @@ class handle_t { */ void sync_stream() const { sync_stream(get_stream()); } + /** + * @brief Sync all streams in the stream pool + */ + void sync_stream_pool() const { raft::resource::sync_stream_pool(raft_handle_); } + /** * @brief get thrust policy for the stream * diff --git a/cpp/include/cugraph/mtmg/per_thread_edgelist.hpp b/cpp/include/cugraph/mtmg/per_thread_edgelist.hpp index b672db48719..4ae30e793ab 100644 --- a/cpp/include/cugraph/mtmg/per_thread_edgelist.hpp +++ b/cpp/include/cugraph/mtmg/per_thread_edgelist.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -142,11 +142,13 @@ class per_thread_edgelist_t { * @brief Flush thread data from host to GPU memory * * @param handle The resource handle + * @param sync If true, synchronize the asynchronous copy of data; + * defaults to false. */ - void flush(handle_t const& handle) + void flush(handle_t const& handle, bool sync = false) { edgelist_.append( - handle, + handle.get_stream(), raft::host_span{src_.data(), current_pos_}, raft::host_span{dst_.data(), current_pos_}, wgt_ ? std::make_optional(raft::host_span{wgt_->data(), current_pos_}) @@ -158,6 +160,8 @@ class per_thread_edgelist_t { : std::nullopt); current_pos_ = 0; + + if (sync) handle.sync_stream(); } private: From aba37670ce5003b2f8bfa2a204ba7517b948f997 Mon Sep 17 00:00:00 2001 From: Charles Hastings Date: Tue, 21 May 2024 13:48:07 -0700 Subject: [PATCH 2/3] Review use of streams, pass stream into functions where we might use stream pool vice default stream --- cpp/include/cugraph/mtmg/edge_property.hpp | 3 +- .../cugraph/mtmg/edge_property_view.hpp | 3 +- cpp/include/cugraph/mtmg/edgelist.hpp | 6 +--- .../cugraph/mtmg/per_thread_edgelist.hpp | 31 +++++++++---------- cpp/tests/mtmg/multi_node_threaded_test.cu | 4 +-- cpp/tests/mtmg/threaded_test.cu | 4 +-- cpp/tests/mtmg/threaded_test_jaccard.cu | 4 +-- cpp/tests/mtmg/threaded_test_louvain.cu | 4 +-- 8 files changed, 26 insertions(+), 33 deletions(-) diff --git a/cpp/include/cugraph/mtmg/edge_property.hpp b/cpp/include/cugraph/mtmg/edge_property.hpp index afa72492b9a..0b27ca85e46 100644 --- a/cpp/include/cugraph/mtmg/edge_property.hpp +++ b/cpp/include/cugraph/mtmg/edge_property.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,6 @@ #include #include -#include namespace cugraph { namespace mtmg { diff --git a/cpp/include/cugraph/mtmg/edge_property_view.hpp b/cpp/include/cugraph/mtmg/edge_property_view.hpp index c84a6458e1d..6416ea382ef 100644 --- a/cpp/include/cugraph/mtmg/edge_property_view.hpp +++ b/cpp/include/cugraph/mtmg/edge_property_view.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,6 @@ #pragma once #include -#include namespace cugraph { namespace mtmg { diff --git a/cpp/include/cugraph/mtmg/edgelist.hpp b/cpp/include/cugraph/mtmg/edgelist.hpp index f1866fae6cf..f60b1024eee 100644 --- a/cpp/include/cugraph/mtmg/edgelist.hpp +++ b/cpp/include/cugraph/mtmg/edgelist.hpp @@ -39,11 +39,7 @@ class edgelist_t : public detail::device_shared_wrapper_t< bool use_edge_type) { detail::per_device_edgelist_t tmp( - handle.raft_handle().get_stream(), - device_buffer_size, - use_weight, - use_edge_id, - use_edge_type); + handle.get_stream(), device_buffer_size, use_weight, use_edge_id, use_edge_type); detail::device_shared_wrapper_t< detail::per_device_edgelist_t>::set(handle, diff --git a/cpp/include/cugraph/mtmg/per_thread_edgelist.hpp b/cpp/include/cugraph/mtmg/per_thread_edgelist.hpp index 4ae30e793ab..8f8b74fe9ec 100644 --- a/cpp/include/cugraph/mtmg/per_thread_edgelist.hpp +++ b/cpp/include/cugraph/mtmg/per_thread_edgelist.hpp @@ -18,7 +18,6 @@ #include #include -#include namespace cugraph { namespace mtmg { @@ -70,21 +69,21 @@ class per_thread_edgelist_t { /** * @brief Append an edge to the edge list * - * @param handle The resource handle - * @param src Source vertex id - * @param dst Destination vertex id - * @param wgt Edge weight - * @param edge_id Edge id - * @param edge_type Edge type + * @param stream_view The cuda stream + * @param src Source vertex id + * @param dst Destination vertex id + * @param wgt Edge weight + * @param edge_id Edge id + * @param edge_type Edge type */ - void append(handle_t const& handle, + void append(rmm::cuda_stream_view stream_view, vertex_t src, vertex_t dst, std::optional wgt, std::optional edge_id, std::optional edge_type) { - if (current_pos_ == src_.size()) { flush(handle); } + if (current_pos_ == src_.size()) { flush(stream_view); } src_[current_pos_] = src; dst_[current_pos_] = dst; @@ -98,14 +97,14 @@ class per_thread_edgelist_t { /** * @brief Append a list of edges to the edge list * - * @param handle The resource handle + * @param stream_view The cuda stream * @param src Source vertex id * @param dst Destination vertex id * @param wgt Edge weight * @param edge_id Edge id * @param edge_type Edge type */ - void append(handle_t const& handle, + void append(rmm::cuda_stream_view stream_view, raft::host_span src, raft::host_span dst, std::optional> wgt, @@ -131,7 +130,7 @@ class per_thread_edgelist_t { edge_type.begin() + pos + copy_count, edge_type_->begin() + current_pos_); - if (current_pos_ == src_.size()) { flush(handle); } + if (current_pos_ == src_.size()) { flush(stream_view); } count -= copy_count; pos += copy_count; @@ -141,14 +140,14 @@ class per_thread_edgelist_t { /** * @brief Flush thread data from host to GPU memory * - * @param handle The resource handle + * @param stream_view The cuda stream * @param sync If true, synchronize the asynchronous copy of data; * defaults to false. */ - void flush(handle_t const& handle, bool sync = false) + void flush(rmm::cuda_stream_view stream_view, bool sync = false) { edgelist_.append( - handle.get_stream(), + stream_view, raft::host_span{src_.data(), current_pos_}, raft::host_span{dst_.data(), current_pos_}, wgt_ ? std::make_optional(raft::host_span{wgt_->data(), current_pos_}) @@ -161,7 +160,7 @@ class per_thread_edgelist_t { current_pos_ = 0; - if (sync) handle.sync_stream(); + if (sync) stream_view.synchronize(); } private: diff --git a/cpp/tests/mtmg/multi_node_threaded_test.cu b/cpp/tests/mtmg/multi_node_threaded_test.cu index 1ad83761d51..08e9d76dd02 100644 --- a/cpp/tests/mtmg/multi_node_threaded_test.cu +++ b/cpp/tests/mtmg/multi_node_threaded_test.cu @@ -175,7 +175,7 @@ class Tests_Multithreaded for (size_t j = starting_edge_offset; j < h_src_v.size(); j += stride) { per_thread_edgelist.append( - thread_handle, + thread_handle.get_stream(), h_src_v[j], h_dst_v[j], h_weights_v ? std::make_optional((*h_weights_v)[j]) : std::nullopt, @@ -183,7 +183,7 @@ class Tests_Multithreaded std::nullopt); } - per_thread_edgelist.flush(thread_handle); + per_thread_edgelist.flush(thread_handle.get_stream()); }); } diff --git a/cpp/tests/mtmg/threaded_test.cu b/cpp/tests/mtmg/threaded_test.cu index f55a102ea67..e320e6a9c25 100644 --- a/cpp/tests/mtmg/threaded_test.cu +++ b/cpp/tests/mtmg/threaded_test.cu @@ -191,7 +191,7 @@ class Tests_Multithreaded for (size_t j = i; j < h_src_v.size(); j += num_threads) { per_thread_edgelist.append( - thread_handle, + thread_handle.get_stream(), h_src_v[j], h_dst_v[j], h_weights_v ? std::make_optional((*h_weights_v)[j]) : std::nullopt, @@ -199,7 +199,7 @@ class Tests_Multithreaded std::nullopt); } - per_thread_edgelist.flush(thread_handle); + per_thread_edgelist.flush(thread_handle.get_stream()); }); } diff --git a/cpp/tests/mtmg/threaded_test_jaccard.cu b/cpp/tests/mtmg/threaded_test_jaccard.cu index a64cc8ee1fa..681e23d3266 100644 --- a/cpp/tests/mtmg/threaded_test_jaccard.cu +++ b/cpp/tests/mtmg/threaded_test_jaccard.cu @@ -184,7 +184,7 @@ class Tests_Multithreaded for (size_t j = i; j < h_src_v.size(); j += num_threads) { per_thread_edgelist.append( - thread_handle, + thread_handle.get_stream(), h_src_v[j], h_dst_v[j], h_weights_v ? std::make_optional((*h_weights_v)[j]) : std::nullopt, @@ -192,7 +192,7 @@ class Tests_Multithreaded std::nullopt); } - per_thread_edgelist.flush(thread_handle); + per_thread_edgelist.flush(thread_handle.get_stream()); }); } diff --git a/cpp/tests/mtmg/threaded_test_louvain.cu b/cpp/tests/mtmg/threaded_test_louvain.cu index c8faf33dae2..fb1a9412c3e 100644 --- a/cpp/tests/mtmg/threaded_test_louvain.cu +++ b/cpp/tests/mtmg/threaded_test_louvain.cu @@ -191,7 +191,7 @@ class Tests_Multithreaded for (size_t j = i; j < h_src_v.size(); j += num_threads) { per_thread_edgelist.append( - thread_handle, + thread_handle.get_stream(), h_src_v[j], h_dst_v[j], h_weights_v ? std::make_optional((*h_weights_v)[j]) : std::nullopt, @@ -199,7 +199,7 @@ class Tests_Multithreaded std::nullopt); } - per_thread_edgelist.flush(thread_handle); + per_thread_edgelist.flush(thread_handle.get_stream()); }); } From 9afba4b2bd1213f5775ec2ce1a0923f7f7812f2b Mon Sep 17 00:00:00 2001 From: Charles Hastings Date: Wed, 22 May 2024 14:59:45 -0700 Subject: [PATCH 3/3] change order of stream parameter to match convention --- .../mtmg/detail/per_device_edgelist.hpp | 32 +++++++++---------- cpp/include/cugraph/mtmg/edgelist.hpp | 2 +- .../cugraph/mtmg/per_thread_edgelist.hpp | 20 ++++++------ cpp/tests/mtmg/multi_node_threaded_test.cu | 4 +-- cpp/tests/mtmg/threaded_test.cu | 4 +-- cpp/tests/mtmg/threaded_test_jaccard.cu | 4 +-- cpp/tests/mtmg/threaded_test_louvain.cu | 4 +-- 7 files changed, 35 insertions(+), 35 deletions(-) diff --git a/cpp/include/cugraph/mtmg/detail/per_device_edgelist.hpp b/cpp/include/cugraph/mtmg/detail/per_device_edgelist.hpp index e7c7fcb0061..63d7fd9685e 100644 --- a/cpp/include/cugraph/mtmg/detail/per_device_edgelist.hpp +++ b/cpp/include/cugraph/mtmg/detail/per_device_edgelist.hpp @@ -62,17 +62,17 @@ class per_device_edgelist_t { /** * @brief Construct a new per device edgelist t object * - * @param stream_view CUDA stream view * @param device_buffer_size Number of edges to store in each device buffer * @param use_weight Whether or not the edgelist will have weights * @param use_edge_id Whether or not the edgelist will have edge ids * @param use_edge_type Whether or not the edgelist will have edge types + * @param stream_view CUDA stream view */ - per_device_edgelist_t(rmm::cuda_stream_view stream_view, - size_t device_buffer_size, + per_device_edgelist_t(size_t device_buffer_size, bool use_weight, bool use_edge_id, - bool use_edge_type) + bool use_edge_type, + rmm::cuda_stream_view stream_view) : device_buffer_size_{device_buffer_size}, current_pos_{0}, src_{}, @@ -111,19 +111,19 @@ class per_device_edgelist_t { /** * @brief Append a list of edges to the edge list * - * @param stream_view CUDA stream view * @param src Source vertex id * @param dst Destination vertex id * @param wgt Edge weight * @param edge_id Edge id * @param edge_type Edge type + * @param stream_view CUDA stream view */ - void append(rmm::cuda_stream_view stream_view, - raft::host_span src, + void append(raft::host_span src, raft::host_span dst, std::optional> wgt, std::optional> edge_id, - std::optional> edge_type) + std::optional> edge_type, + rmm::cuda_stream_view stream_view) { std::vector> copy_positions; @@ -235,11 +235,11 @@ class per_device_edgelist_t { return d_vector.size(); }); - resize_and_copy_buffers(stream, src_, total_size); - resize_and_copy_buffers(stream, dst_, total_size); - if (wgt_) resize_and_copy_buffers(stream, *wgt_, total_size); - if (edge_id_) resize_and_copy_buffers(stream, *edge_id_, total_size); - if (edge_type_) resize_and_copy_buffers(stream, *edge_type_, total_size); + resize_and_copy_buffers(src_, total_size, stream); + resize_and_copy_buffers(dst_, total_size, stream); + if (wgt_) resize_and_copy_buffers(*wgt_, total_size, stream); + if (edge_id_) resize_and_copy_buffers(*edge_id_, total_size, stream); + if (edge_type_) resize_and_copy_buffers(*edge_type_, total_size, stream); } auto tmp_wgt = wgt_ ? std::make_optional(std::move((*wgt_)[0])) : std::nullopt; @@ -267,9 +267,9 @@ class per_device_edgelist_t { private: template - void resize_and_copy_buffers(rmm::cuda_stream_view stream, - std::vector>& buffer, - size_t total_size) + void resize_and_copy_buffers(std::vector>& buffer, + size_t total_size, + rmm::cuda_stream_view stream) { size_t pos = buffer[0].size(); buffer[0].resize(total_size, stream); diff --git a/cpp/include/cugraph/mtmg/edgelist.hpp b/cpp/include/cugraph/mtmg/edgelist.hpp index f60b1024eee..d5d2bd2bca7 100644 --- a/cpp/include/cugraph/mtmg/edgelist.hpp +++ b/cpp/include/cugraph/mtmg/edgelist.hpp @@ -39,7 +39,7 @@ class edgelist_t : public detail::device_shared_wrapper_t< bool use_edge_type) { detail::per_device_edgelist_t tmp( - handle.get_stream(), device_buffer_size, use_weight, use_edge_id, use_edge_type); + device_buffer_size, use_weight, use_edge_id, use_edge_type, handle.get_stream()); detail::device_shared_wrapper_t< detail::per_device_edgelist_t>::set(handle, diff --git a/cpp/include/cugraph/mtmg/per_thread_edgelist.hpp b/cpp/include/cugraph/mtmg/per_thread_edgelist.hpp index 8f8b74fe9ec..73d69fdd5a7 100644 --- a/cpp/include/cugraph/mtmg/per_thread_edgelist.hpp +++ b/cpp/include/cugraph/mtmg/per_thread_edgelist.hpp @@ -69,19 +69,19 @@ class per_thread_edgelist_t { /** * @brief Append an edge to the edge list * - * @param stream_view The cuda stream * @param src Source vertex id * @param dst Destination vertex id * @param wgt Edge weight * @param edge_id Edge id * @param edge_type Edge type + * @param stream_view The cuda stream */ - void append(rmm::cuda_stream_view stream_view, - vertex_t src, + void append(vertex_t src, vertex_t dst, std::optional wgt, std::optional edge_id, - std::optional edge_type) + std::optional edge_type, + rmm::cuda_stream_view stream_view) { if (current_pos_ == src_.size()) { flush(stream_view); } @@ -97,19 +97,19 @@ class per_thread_edgelist_t { /** * @brief Append a list of edges to the edge list * - * @param stream_view The cuda stream * @param src Source vertex id * @param dst Destination vertex id * @param wgt Edge weight * @param edge_id Edge id * @param edge_type Edge type + * @param stream_view The cuda stream */ - void append(rmm::cuda_stream_view stream_view, - raft::host_span src, + void append(raft::host_span src, raft::host_span dst, std::optional> wgt, std::optional> edge_id, - std::optional> edge_type) + std::optional> edge_type, + rmm::cuda_stream_view stream_view) { size_t count = src.size(); size_t pos = 0; @@ -147,7 +147,6 @@ class per_thread_edgelist_t { void flush(rmm::cuda_stream_view stream_view, bool sync = false) { edgelist_.append( - stream_view, raft::host_span{src_.data(), current_pos_}, raft::host_span{dst_.data(), current_pos_}, wgt_ ? std::make_optional(raft::host_span{wgt_->data(), current_pos_}) @@ -156,7 +155,8 @@ class per_thread_edgelist_t { : std::nullopt, edge_type_ ? std::make_optional(raft::host_span{edge_type_->data(), current_pos_}) - : std::nullopt); + : std::nullopt, + stream_view); current_pos_ = 0; diff --git a/cpp/tests/mtmg/multi_node_threaded_test.cu b/cpp/tests/mtmg/multi_node_threaded_test.cu index 08e9d76dd02..24852562b86 100644 --- a/cpp/tests/mtmg/multi_node_threaded_test.cu +++ b/cpp/tests/mtmg/multi_node_threaded_test.cu @@ -175,12 +175,12 @@ class Tests_Multithreaded for (size_t j = starting_edge_offset; j < h_src_v.size(); j += stride) { per_thread_edgelist.append( - thread_handle.get_stream(), h_src_v[j], h_dst_v[j], h_weights_v ? std::make_optional((*h_weights_v)[j]) : std::nullopt, std::nullopt, - std::nullopt); + std::nullopt, + thread_handle.get_stream()); } per_thread_edgelist.flush(thread_handle.get_stream()); diff --git a/cpp/tests/mtmg/threaded_test.cu b/cpp/tests/mtmg/threaded_test.cu index e320e6a9c25..df5a9e079df 100644 --- a/cpp/tests/mtmg/threaded_test.cu +++ b/cpp/tests/mtmg/threaded_test.cu @@ -191,12 +191,12 @@ class Tests_Multithreaded for (size_t j = i; j < h_src_v.size(); j += num_threads) { per_thread_edgelist.append( - thread_handle.get_stream(), h_src_v[j], h_dst_v[j], h_weights_v ? std::make_optional((*h_weights_v)[j]) : std::nullopt, std::nullopt, - std::nullopt); + std::nullopt, + thread_handle.get_stream()); } per_thread_edgelist.flush(thread_handle.get_stream()); diff --git a/cpp/tests/mtmg/threaded_test_jaccard.cu b/cpp/tests/mtmg/threaded_test_jaccard.cu index 681e23d3266..0f531796cff 100644 --- a/cpp/tests/mtmg/threaded_test_jaccard.cu +++ b/cpp/tests/mtmg/threaded_test_jaccard.cu @@ -184,12 +184,12 @@ class Tests_Multithreaded for (size_t j = i; j < h_src_v.size(); j += num_threads) { per_thread_edgelist.append( - thread_handle.get_stream(), h_src_v[j], h_dst_v[j], h_weights_v ? std::make_optional((*h_weights_v)[j]) : std::nullopt, std::nullopt, - std::nullopt); + std::nullopt, + thread_handle.get_stream()); } per_thread_edgelist.flush(thread_handle.get_stream()); diff --git a/cpp/tests/mtmg/threaded_test_louvain.cu b/cpp/tests/mtmg/threaded_test_louvain.cu index fb1a9412c3e..ab51d701b57 100644 --- a/cpp/tests/mtmg/threaded_test_louvain.cu +++ b/cpp/tests/mtmg/threaded_test_louvain.cu @@ -191,12 +191,12 @@ class Tests_Multithreaded for (size_t j = i; j < h_src_v.size(); j += num_threads) { per_thread_edgelist.append( - thread_handle.get_stream(), h_src_v[j], h_dst_v[j], h_weights_v ? std::make_optional((*h_weights_v)[j]) : std::nullopt, std::nullopt, - std::nullopt); + std::nullopt, + thread_handle.get_stream()); } per_thread_edgelist.flush(thread_handle.get_stream());