From 7197258315269d22d9b538f285ac216a8c014016 Mon Sep 17 00:00:00 2001 From: Divye Gala Date: Mon, 13 Dec 2021 19:55:03 -0800 Subject: [PATCH] Raft Handle Updates to cuGraph (#1894) Depends on https://github.com/rapidsai/raft/pull/291 Authors: - Divye Gala (https://github.com/divyegala) Approvers: - Brad Rees (https://github.com/BradReesWork) - AJ Schmidt (https://github.com/ajschmidt8) - Rick Ratzel (https://github.com/rlratzel) - Chuck Hastings (https://github.com/ChuckHastings) URL: https://github.com/rapidsai/cugraph/pull/1894 --- ci/cpu/build.sh | 7 ++ conda/recipes/pylibcugraph/meta.yaml | 1 + ...ransform_reduce_key_aggregated_out_nbr.cuh | 4 +- .../update_frontier_v_push_if_out_nbr.cuh | 5 +- cpp/include/cugraph/prims/vertex_frontier.cuh | 2 +- cpp/src/c_api/array.cpp | 2 +- cpp/src/community/legacy/ecg.cu | 10 ++- cpp/src/community/legacy/egonet.cu | 14 ++-- cpp/src/community/legacy/leiden.cuh | 14 ++-- cpp/src/community/legacy/louvain.cuh | 66 +++++++++---------- cpp/src/community/louvain.cuh | 22 +++---- .../weakly_connected_components_impl.cuh | 47 +++++++------ cpp/src/generators/erdos_renyi_generator.cu | 2 +- cpp/src/generators/generate_rmat_edgelist.cu | 8 +-- cpp/src/generators/generator_tools.cu | 6 +- cpp/src/generators/simple_generators.cu | 8 +-- cpp/src/layout/barnes_hut.cuh | 4 +- cpp/src/layout/exact_fa2.cuh | 4 +- cpp/src/linear_assignment/hungarian.cu | 21 +++--- cpp/src/sampling/random_walks.cuh | 6 +- cpp/src/structure/coarsen_graph_impl.cuh | 2 +- .../create_graph_from_edgelist_impl.cuh | 4 +- cpp/src/structure/graph_impl.cuh | 10 +-- cpp/src/structure/graph_view_impl.cuh | 8 +-- cpp/src/structure/induced_subgraph_impl.cuh | 26 ++++---- cpp/src/structure/relabel_impl.cuh | 45 ++++++------- cpp/src/structure/renumber_edgelist_impl.cuh | 8 +-- cpp/src/structure/renumber_utils_impl.cuh | 40 ++++++----- cpp/src/utilities/cython.cu | 16 ++--- cpp/src/utilities/graph_bcast.cuh | 2 +- cpp/tests/centrality/katz_centrality_test.cpp | 6 +- .../centrality/mg_katz_centrality_test.cpp | 2 +- cpp/tests/community/egonet_test.cu | 5 +- .../mg_weakly_connected_components_test.cpp | 2 +- cpp/tests/components/wcc_graphs.cu | 4 +- .../weakly_connected_components_test.cpp | 7 +- cpp/tests/cores/core_number_test.cpp | 6 +- cpp/tests/cores/mg_core_number_test.cpp | 2 +- cpp/tests/generators/erdos_renyi_test.cpp | 4 +- cpp/tests/linear_assignment/hungarian_test.cu | 52 +++++++-------- cpp/tests/link_analysis/hits_test.cpp | 2 +- cpp/tests/link_analysis/mg_hits_test.cpp | 2 +- cpp/tests/link_analysis/mg_pagerank_test.cpp | 2 +- cpp/tests/link_analysis/pagerank_test.cpp | 4 +- .../count_self_loops_and_multi_edges_test.cpp | 2 +- cpp/tests/structure/renumbering_test.cpp | 6 +- cpp/tests/structure/streams.cu | 11 ++-- cpp/tests/traversal/bfs_test.cpp | 8 +-- cpp/tests/traversal/mg_bfs_test.cpp | 4 +- cpp/tests/traversal/mg_sssp_test.cpp | 4 +- cpp/tests/traversal/ms_bfs_test.cu | 9 +-- cpp/tests/traversal/sssp_test.cpp | 8 +-- .../utilities/matrix_market_file_utilities.cu | 4 +- cpp/tests/utilities/test_graphs.hpp | 6 +- cpp/tests/utilities/test_utilities.hpp | 4 +- .../cugraph/community/egonet_wrapper.pyx | 6 +- 56 files changed, 290 insertions(+), 296 deletions(-) diff --git a/ci/cpu/build.sh b/ci/cpu/build.sh index c83ebfb9445..e8292522be3 100755 --- a/ci/cpu/build.sh +++ b/ci/cpu/build.sh @@ -62,6 +62,13 @@ conda list --show-channel-urls # FIX Added to deal with Anancoda SSL verification issues during conda builds conda config --set ssl_verify False +# FIXME: for now, force the building of all packages so they are built on a +# machine with a single CUDA version, then have the gpu/build.sh script simply +# install. This should eliminate a mismatch between different CUDA versions on +# cpu vs. gpu builds that is problematic with CUDA 11.5 Enhanced Compat. +BUILD_LIBCUGRAPH=1 +BUILD_CUGRAPH=1 + ############################################################################### # BUILD - Conda package builds ############################################################################### diff --git a/conda/recipes/pylibcugraph/meta.yaml b/conda/recipes/pylibcugraph/meta.yaml index 306864f0d4d..a8001344e1a 100644 --- a/conda/recipes/pylibcugraph/meta.yaml +++ b/conda/recipes/pylibcugraph/meta.yaml @@ -31,6 +31,7 @@ requirements: - ucx-py 0.24 - ucx-proc=*=gpu - cudatoolkit {{ cuda_version }}.* + - rmm {{ minor_version }}.* run: - python x.x - libcugraph={{ version }} diff --git a/cpp/include/cugraph/prims/copy_v_transform_reduce_key_aggregated_out_nbr.cuh b/cpp/include/cugraph/prims/copy_v_transform_reduce_key_aggregated_out_nbr.cuh index 9aca0361ccf..38259918ac1 100644 --- a/cpp/include/cugraph/prims/copy_v_transform_reduce_key_aggregated_out_nbr.cuh +++ b/cpp/include/cugraph/prims/copy_v_transform_reduce_key_aggregated_out_nbr.cuh @@ -266,7 +266,7 @@ void copy_v_transform_reduce_key_aggregated_out_nbr( handle.get_stream()); } - handle.get_stream_view().synchronize(); // cuco::static_map currently does not take stream + handle.sync_stream(); // cuco::static_map currently does not take stream kv_map_ptr.reset(); @@ -284,7 +284,7 @@ void copy_v_transform_reduce_key_aggregated_out_nbr( thrust::make_tuple(map_keys.begin(), get_dataframe_buffer_begin(map_value_buffer))); kv_map_ptr->insert(pair_first, pair_first + map_keys.size()); } else { - handle.get_stream_view().synchronize(); // cuco::static_map currently does not take stream + handle.sync_stream(); // cuco::static_map currently does not take stream kv_map_ptr.reset(); diff --git a/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh b/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh index fe9d884499b..a1004a76c63 100644 --- a/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh +++ b/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh @@ -637,8 +637,7 @@ typename GraphViewType::edge_type compute_num_out_nbrs_from_frontier( auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name()); auto const col_comm_rank = col_comm.get_rank(); - rmm::device_uvector frontier_vertices(local_frontier_sizes[i], - handle.get_stream_view()); + rmm::device_uvector frontier_vertices(local_frontier_sizes[i], handle.get_stream()); device_bcast(col_comm, local_frontier_vertex_first, frontier_vertices.data(), @@ -1096,7 +1095,7 @@ void update_frontier_v_push_if_out_nbr( d_tx_buffer_last_boundaries.data(), d_tx_buffer_last_boundaries.size(), handle.get_stream()); - handle.get_stream_view().synchronize(); + handle.sync_stream(); std::vector tx_counts(h_tx_buffer_last_boundaries.size()); std::adjacent_difference( h_tx_buffer_last_boundaries.begin(), h_tx_buffer_last_boundaries.end(), tx_counts.begin()); diff --git a/cpp/include/cugraph/prims/vertex_frontier.cuh b/cpp/include/cugraph/prims/vertex_frontier.cuh index 82e0f4ab880..86b55ab3f16 100644 --- a/cpp/include/cugraph/prims/vertex_frontier.cuh +++ b/cpp/include/cugraph/prims/vertex_frontier.cuh @@ -433,7 +433,7 @@ class VertexFrontier { h_indices.data(), d_indices.data(), d_indices.size(), handle_ptr_->get_stream()); raft::update_host( h_counts.data(), d_counts.data(), d_counts.size(), handle_ptr_->get_stream()); - handle_ptr_->get_stream_view().synchronize(); + handle_ptr_->sync_stream(); size_t offset{0}; for (size_t i = 0; i < h_indices.size(); ++i) { diff --git a/cpp/src/c_api/array.cpp b/cpp/src/c_api/array.cpp index d7d982ae5f0..01253954b1e 100644 --- a/cpp/src/c_api/array.cpp +++ b/cpp/src/c_api/array.cpp @@ -52,7 +52,7 @@ extern "C" cugraph_error_code_t cugraph_type_erased_device_array_create( size_t n_bytes = n_elems * (::data_type_sz[dtype]); auto ret_value = new cugraph::c_api::cugraph_type_erased_device_array_t( - n_elems, n_bytes, dtype, raft_handle->get_stream_view()); + n_elems, n_bytes, dtype, raft_handle->get_stream()); *array = reinterpret_cast(ret_value); return CUGRAPH_SUCCESS; diff --git a/cpp/src/community/legacy/ecg.cu b/cpp/src/community/legacy/ecg.cu index 30af37ec2e5..40a2ae27b5b 100644 --- a/cpp/src/community/legacy/ecg.cu +++ b/cpp/src/community/legacy/ecg.cu @@ -116,12 +116,10 @@ class EcgLouvain : public cugraph::legacy::Louvain { void initialize_dendrogram_level(vertex_t num_vertices) override { - this->dendrogram_->add_level(0, num_vertices, this->handle_.get_stream_view()); + this->dendrogram_->add_level(0, num_vertices, this->handle_.get_stream()); - get_permutation_vector(num_vertices, - seed_, - this->dendrogram_->current_level_begin(), - this->handle_.get_stream_view()); + get_permutation_vector( + num_vertices, seed_, this->dendrogram_->current_level_begin(), this->handle_.get_stream()); } private: @@ -147,7 +145,7 @@ void ecg(raft::handle_t const& handle, "Invalid input argument: clustering is NULL, should be a device pointer to " "memory for storing the result"); - rmm::device_uvector ecg_weights_v(graph.number_of_edges, handle.get_stream_view()); + rmm::device_uvector ecg_weights_v(graph.number_of_edges, handle.get_stream()); thrust::copy(handle.get_thrust_policy(), graph.edge_data, diff --git a/cpp/src/community/legacy/egonet.cu b/cpp/src/community/legacy/egonet.cu index 3f79e58615e..b4cfd51100d 100644 --- a/cpp/src/community/legacy/egonet.cu +++ b/cpp/src/community/legacy/egonet.cu @@ -66,7 +66,7 @@ extract(raft::handle_t const& handle, vertex_t radius) { auto v = csr_view.get_number_of_vertices(); - auto user_stream_view = handle.get_stream_view(); + auto user_stream_view = handle.get_stream(); rmm::device_vector neighbors_offsets(n_subgraphs + 1); rmm::device_vector neighbors; @@ -77,7 +77,7 @@ extract(raft::handle_t const& handle, reached.reserve(n_subgraphs); for (vertex_t i = 0; i < n_subgraphs; i++) { // Allocations and operations are attached to the worker stream - rmm::device_uvector local_reach(v, handle.get_internal_stream_view(i)); + rmm::device_uvector local_reach(v, handle.get_next_usable_stream(i)); reached.push_back(std::move(local_reach)); } @@ -89,8 +89,8 @@ extract(raft::handle_t const& handle, for (vertex_t i = 0; i < n_subgraphs; i++) { // get light handle from worker pool - raft::handle_t light_handle(handle, i); - auto worker_stream_view = light_handle.get_stream_view(); + raft::handle_t light_handle(handle.get_next_usable_stream(i)); + auto worker_stream_view = light_handle.get_stream(); // BFS with cutoff // consider adding a device API to BFS (ie. accept source on the device) @@ -132,7 +132,7 @@ extract(raft::handle_t const& handle, } // wait on every one to identify their neighboors before proceeding to concatenation - handle.wait_on_internal_streams(); + handle.sync_stream_pool(); // Construct neighboors offsets (just a scan on neighborhod vector sizes) h_neighbors_offsets[0] = 0; @@ -148,7 +148,7 @@ extract(raft::handle_t const& handle, // Construct the neighboors list concurrently for (vertex_t i = 0; i < n_subgraphs; i++) { - auto worker_stream_view = handle.get_internal_stream_view(i); + auto worker_stream_view = handle.get_next_usable_stream(i); thrust::copy(rmm::exec_policy(worker_stream_view), reached[i].begin(), reached[i].end(), @@ -160,7 +160,7 @@ extract(raft::handle_t const& handle, } // wait on every one before proceeding to grouped extraction - handle.wait_on_internal_streams(); + handle.sync_stream_pool(); #ifdef TIMING hr_timer.stop(); diff --git a/cpp/src/community/legacy/leiden.cuh b/cpp/src/community/legacy/leiden.cuh index 36778d9ab37..e8c0c518707 100644 --- a/cpp/src/community/legacy/leiden.cuh +++ b/cpp/src/community/legacy/leiden.cuh @@ -43,12 +43,11 @@ class Leiden : public Louvain { this->timer_start("update_clustering_constrained"); rmm::device_uvector next_cluster_v(this->dendrogram_->current_level_size(), - this->handle_.get_stream_view()); - rmm::device_uvector delta_Q_v(graph.number_of_edges, this->handle_.get_stream_view()); - rmm::device_uvector cluster_hash_v(graph.number_of_edges, - this->handle_.get_stream_view()); + this->handle_.get_stream()); + rmm::device_uvector delta_Q_v(graph.number_of_edges, this->handle_.get_stream()); + rmm::device_uvector cluster_hash_v(graph.number_of_edges, this->handle_.get_stream()); rmm::device_uvector old_cluster_sum_v(graph.number_of_vertices, - this->handle_.get_stream_view()); + this->handle_.get_stream()); vertex_t const* d_src_indices = this->src_indices_v_.data(); vertex_t const* d_dst_indices = graph.indices; @@ -105,7 +104,7 @@ class Leiden : public Louvain { } } - this->timer_stop(this->handle_.get_stream_view()); + this->timer_stop(this->handle_.get_stream()); return cur_Q; } @@ -134,8 +133,7 @@ class Leiden : public Louvain { // // Initialize every cluster to reference each vertex to itself // - this->dendrogram_->add_level( - 0, current_graph.number_of_vertices, this->handle_.get_stream_view()); + this->dendrogram_->add_level(0, current_graph.number_of_vertices, this->handle_.get_stream()); thrust::sequence(this->handle_.get_thrust_policy(), this->dendrogram_->current_level_begin(), diff --git a/cpp/src/community/legacy/louvain.cuh b/cpp/src/community/legacy/louvain.cuh index c7292c2590a..19e26b31071 100644 --- a/cpp/src/community/legacy/louvain.cuh +++ b/cpp/src/community/legacy/louvain.cuh @@ -54,14 +54,14 @@ class Louvain { // to change the logic to populate this properly // in generate_superverticies_graph. // - offsets_v_(graph.number_of_vertices + 1, handle.get_stream_view()), - indices_v_(graph.number_of_edges, handle.get_stream_view()), - weights_v_(graph.number_of_edges, handle.get_stream_view()), - src_indices_v_(graph.number_of_edges, handle.get_stream_view()), - vertex_weights_v_(graph.number_of_vertices, handle.get_stream_view()), - cluster_weights_v_(graph.number_of_vertices, handle.get_stream_view()), - tmp_arr_v_(graph.number_of_vertices, handle.get_stream_view()), - cluster_inverse_v_(graph.number_of_vertices, handle.get_stream_view()), + offsets_v_(graph.number_of_vertices + 1, handle.get_stream()), + indices_v_(graph.number_of_edges, handle.get_stream()), + weights_v_(graph.number_of_edges, handle.get_stream()), + src_indices_v_(graph.number_of_edges, handle.get_stream()), + vertex_weights_v_(graph.number_of_vertices, handle.get_stream()), + cluster_weights_v_(graph.number_of_vertices, handle.get_stream()), + tmp_arr_v_(graph.number_of_vertices, handle.get_stream()), + cluster_inverse_v_(graph.number_of_vertices, handle.get_stream()), number_of_vertices_(graph.number_of_vertices), number_of_edges_(graph.number_of_edges) { @@ -90,8 +90,8 @@ class Louvain { { vertex_t n_verts = graph.number_of_vertices; - rmm::device_uvector inc(n_verts, handle_.get_stream_view()); - rmm::device_uvector deg(n_verts, handle_.get_stream_view()); + rmm::device_uvector inc(n_verts, handle_.get_stream()); + rmm::device_uvector deg(n_verts, handle_.get_stream()); thrust::fill(handle_.get_thrust_policy(), inc.begin(), inc.end(), weight_t{0.0}); thrust::fill(handle_.get_thrust_policy(), deg.begin(), deg.end(), weight_t{0.0}); @@ -211,7 +211,7 @@ class Louvain { virtual void initialize_dendrogram_level(vertex_t num_vertices) { - dendrogram_->add_level(0, num_vertices, handle_.get_stream_view()); + dendrogram_->add_level(0, num_vertices, handle_.get_stream()); thrust::sequence(handle_.get_thrust_policy(), dendrogram_->current_level_begin(), @@ -245,7 +245,7 @@ class Louvain { d_cluster_weights[src] = sum; }); - timer_stop(handle_.get_stream_view()); + timer_stop(handle_.get_stream()); } virtual weight_t update_clustering(weight_t total_edge_weight, @@ -255,11 +255,10 @@ class Louvain { timer_start("update_clustering"); rmm::device_uvector next_cluster_v(dendrogram_->current_level_size(), - handle_.get_stream_view()); - rmm::device_uvector delta_Q_v(graph.number_of_edges, handle_.get_stream_view()); - rmm::device_uvector cluster_hash_v(graph.number_of_edges, handle_.get_stream_view()); - rmm::device_uvector old_cluster_sum_v(graph.number_of_vertices, - handle_.get_stream_view()); + handle_.get_stream()); + rmm::device_uvector delta_Q_v(graph.number_of_edges, handle_.get_stream()); + rmm::device_uvector cluster_hash_v(graph.number_of_edges, handle_.get_stream()); + rmm::device_uvector old_cluster_sum_v(graph.number_of_vertices, handle_.get_stream()); vertex_t* d_cluster = dendrogram_->current_level_begin(); weight_t const* d_vertex_weights = vertex_weights_v_.data(); @@ -301,7 +300,7 @@ class Louvain { } } - timer_stop(handle_.get_stream_view()); + timer_stop(handle_.get_stream()); return cur_Q; } @@ -409,12 +408,9 @@ class Louvain { rmm::device_uvector& delta_Q_v, bool up_down) { - rmm::device_uvector temp_vertices_v(graph.number_of_vertices, - handle_.get_stream_view()); - rmm::device_uvector temp_cluster_v(graph.number_of_vertices, - handle_.get_stream_view()); - rmm::device_uvector temp_delta_Q_v(graph.number_of_vertices, - handle_.get_stream_view()); + rmm::device_uvector temp_vertices_v(graph.number_of_vertices, handle_.get_stream()); + rmm::device_uvector temp_cluster_v(graph.number_of_vertices, handle_.get_stream()); + rmm::device_uvector temp_delta_Q_v(graph.number_of_vertices, handle_.get_stream()); thrust::fill( handle_.get_thrust_policy(), temp_cluster_v.begin(), temp_cluster_v.end(), vertex_t{-1}); @@ -479,12 +475,12 @@ class Louvain { // renumber the clusters to the range 0..(num_clusters-1) vertex_t num_clusters = renumber_clusters(); - cluster_weights_v_.resize(num_clusters, handle_.get_stream_view()); + cluster_weights_v_.resize(num_clusters, handle_.get_stream()); // shrink our graph to represent the graph of supervertices generate_superverticies_graph(graph, num_clusters); - timer_stop(handle_.get_stream_view()); + timer_stop(handle_.get_stream()); } vertex_t renumber_clusters() @@ -526,7 +522,7 @@ class Louvain { [d_cluster_inverse] __device__(const vertex_t idx) { return d_cluster_inverse[idx] == 1; }); vertex_t new_num_clusters = thrust::distance(tmp_arr_v_.begin(), copy_end); - tmp_arr_v_.resize(new_num_clusters, handle_.get_stream_view()); + tmp_arr_v_.resize(new_num_clusters, handle_.get_stream()); // // Now we can set each value in cluster_inverse of a cluster to its index @@ -545,16 +541,16 @@ class Louvain { d_cluster[i] = d_cluster_inverse[d_cluster[i]]; }); - cluster_inverse_v_.resize(new_num_clusters, handle_.get_stream_view()); + cluster_inverse_v_.resize(new_num_clusters, handle_.get_stream()); return new_num_clusters; } void generate_superverticies_graph(graph_t& graph, vertex_t num_clusters) { - rmm::device_uvector new_src_v(graph.number_of_edges, handle_.get_stream_view()); - rmm::device_uvector new_dst_v(graph.number_of_edges, handle_.get_stream_view()); - rmm::device_uvector new_weight_v(graph.number_of_edges, handle_.get_stream_view()); + rmm::device_uvector new_src_v(graph.number_of_edges, handle_.get_stream()); + rmm::device_uvector new_dst_v(graph.number_of_edges, handle_.get_stream()); + rmm::device_uvector new_weight_v(graph.number_of_edges, handle_.get_stream()); // // Renumber the COO @@ -609,11 +605,11 @@ class Louvain { graph.offsets, num_clusters, graph.number_of_edges, - handle_.get_stream_view()); + handle_.get_stream()); - src_indices_v_.resize(graph.number_of_edges, handle_.get_stream_view()); - indices_v_.resize(graph.number_of_edges, handle_.get_stream_view()); - weights_v_.resize(graph.number_of_edges, handle_.get_stream_view()); + src_indices_v_.resize(graph.number_of_edges, handle_.get_stream()); + indices_v_.resize(graph.number_of_edges, handle_.get_stream()); + weights_v_.resize(graph.number_of_edges, handle_.get_stream()); } protected: diff --git a/cpp/src/community/louvain.cuh b/cpp/src/community/louvain.cuh index a202b8394cc..da9425e0a4b 100644 --- a/cpp/src/community/louvain.cuh +++ b/cpp/src/community/louvain.cuh @@ -138,11 +138,11 @@ class Louvain { handle_(handle), dendrogram_(std::make_unique>()), current_graph_view_(graph_view), - cluster_keys_v_(0, handle.get_stream_view()), - cluster_weights_v_(0, handle.get_stream_view()), + cluster_keys_v_(0, handle.get_stream()), + cluster_weights_v_(0, handle.get_stream()), vertex_weights_v_(0, handle.get_stream()), src_vertex_weights_cache_(), - next_clusters_v_(0, handle.get_stream_view()), + next_clusters_v_(0, handle.get_stream()), src_clusters_cache_(), dst_clusters_cache_() { @@ -231,7 +231,7 @@ class Louvain { { dendrogram_->add_level(current_graph_view_.get_local_vertex_first(), current_graph_view_.get_number_of_local_vertices(), - handle_.get_stream_view()); + handle_.get_stream()); thrust::sequence(handle_.get_thrust_policy(), dendrogram_->current_level_begin(), @@ -286,8 +286,8 @@ class Louvain { timer_start("compute_vertex_and_cluster_weights"); vertex_weights_v_ = current_graph_view_.compute_out_weight_sums(handle_); - cluster_keys_v_.resize(vertex_weights_v_.size(), handle_.get_stream_view()); - cluster_weights_v_.resize(vertex_weights_v_.size(), handle_.get_stream_view()); + cluster_keys_v_.resize(vertex_weights_v_.size(), handle_.get_stream()); + cluster_weights_v_.resize(vertex_weights_v_.size(), handle_.get_stream()); thrust::sequence(handle_.get_thrust_policy(), cluster_keys_v_.begin(), @@ -301,8 +301,8 @@ class Louvain { if constexpr (graph_view_t::is_multi_gpu) { auto const comm_size = handle_.get_comms().get_size(); - rmm::device_uvector rx_keys_v(0, handle_.get_stream_view()); - rmm::device_uvector rx_weights_v(0, handle_.get_stream_view()); + rmm::device_uvector rx_keys_v(0, handle_.get_stream()); + rmm::device_uvector rx_weights_v(0, handle_.get_stream()); auto pair_first = thrust::make_zip_iterator( thrust::make_tuple(cluster_keys_v_.begin(), cluster_weights_v_.begin())); @@ -315,7 +315,7 @@ class Louvain { [key_func = cugraph::detail::compute_gpu_id_from_vertex_t{ comm_size}] __device__(auto val) { return key_func(thrust::get<0>(val)); }, - handle_.get_stream_view()); + handle_.get_stream()); cluster_keys_v_ = std::move(rx_keys_v); cluster_weights_v_ = std::move(rx_weights_v); @@ -330,7 +330,7 @@ class Louvain { vertex_weights_v_.shrink_to_fit(handle_.get_stream()); } - timer_stop(handle_.get_stream_view()); + timer_stop(handle_.get_stream()); } virtual weight_t update_clustering(weight_t total_edge_weight, weight_t resolution) @@ -379,7 +379,7 @@ class Louvain { } } - timer_stop(handle_.get_stream_view()); + timer_stop(handle_.get_stream()); return cur_Q; } diff --git a/cpp/src/components/weakly_connected_components_impl.cuh b/cpp/src/components/weakly_connected_components_impl.cuh index 66c9447605d..21e9571fbb2 100644 --- a/cpp/src/components/weakly_connected_components_impl.cuh +++ b/cpp/src/components/weakly_connected_components_impl.cuh @@ -77,7 +77,7 @@ accumulate_new_roots(raft::handle_t const& handle, vertex_t max_scan_size = static_cast(handle.get_device_properties().multiProcessorCount) * vertex_t{16384}; - rmm::device_uvector new_roots(max_new_roots, handle.get_stream_view()); + rmm::device_uvector new_roots(max_new_roots, handle.get_stream()); vertex_t num_new_roots{0}; vertex_t num_scanned{0}; edge_t degree_sum{0}; @@ -87,8 +87,8 @@ accumulate_new_roots(raft::handle_t const& handle, max_scan_size, static_cast(thrust::distance(candidate_first + num_scanned, candidate_last))); - rmm::device_uvector tmp_new_roots(scan_size, handle.get_stream_view()); - rmm::device_uvector tmp_indices(tmp_new_roots.size(), handle.get_stream_view()); + rmm::device_uvector tmp_new_roots(scan_size, handle.get_stream()); + rmm::device_uvector tmp_indices(tmp_new_roots.size(), handle.get_stream()); auto input_pair_first = thrust::make_zip_iterator(thrust::make_tuple( candidate_first + num_scanned, thrust::make_counting_iterator(vertex_t{0}))); auto output_pair_first = @@ -106,12 +106,11 @@ accumulate_new_roots(raft::handle_t const& handle, return (components[vertex_partition.get_local_vertex_offset_from_vertex_nocheck(v)] == invalid_component_id::value); }))), - handle.get_stream_view()); - tmp_indices.resize(tmp_new_roots.size(), handle.get_stream_view()); + handle.get_stream()); + tmp_indices.resize(tmp_new_roots.size(), handle.get_stream()); if (tmp_new_roots.size() > 0) { - rmm::device_uvector tmp_cumulative_degrees(tmp_new_roots.size(), - handle.get_stream_view()); + rmm::device_uvector tmp_cumulative_degrees(tmp_new_roots.size(), handle.get_stream()); thrust::transform( handle.get_thrust_policy(), tmp_new_roots.begin(), @@ -150,7 +149,7 @@ accumulate_new_roots(raft::handle_t const& handle, tmp_cumulative_degrees.data() + (tmp_num_new_roots - 1), size_t{1}, handle.get_stream()); - handle.get_stream_view().synchronize(); + handle.sync_stream(); num_scanned += tmp_num_scanned; degree_sum += tmp_degree_sum; } else { @@ -158,8 +157,8 @@ accumulate_new_roots(raft::handle_t const& handle, } } - new_roots.resize(num_new_roots, handle.get_stream_view()); - new_roots.shrink_to_fit(handle.get_stream_view()); + new_roots.resize(num_new_roots, handle.get_stream()); + new_roots.shrink_to_fit(handle.get_stream()); return std::make_tuple(std::move(new_roots), num_scanned, degree_sum); } @@ -262,7 +261,7 @@ void weakly_connected_components_impl(raft::handle_t const& handle, GraphViewType::is_adj_matrix_transposed, GraphViewType::is_multi_gpu> level_graph(handle); - rmm::device_uvector level_renumber_map(0, handle.get_stream_view()); + rmm::device_uvector level_renumber_map(0, handle.get_stream()); std::vector> level_component_vectors{}; // vertex ID in this level to the component ID in the previous level std::vector> level_renumber_map_vectors{}; @@ -273,7 +272,7 @@ void weakly_connected_components_impl(raft::handle_t const& handle, level_graph_view.get_vertex_partition_view()); level_component_vectors.push_back(rmm::device_uvector( num_levels == 0 ? vertex_t{0} : level_graph_view.get_number_of_local_vertices(), - handle.get_stream_view())); + handle.get_stream())); level_renumber_map_vectors.push_back(std::move(level_renumber_map)); level_local_vertex_first_vectors.push_back(level_graph_view.get_local_vertex_first()); auto level_components = @@ -308,7 +307,7 @@ void weakly_connected_components_impl(raft::handle_t const& handle, // selected as roots in the remaining connected components. rmm::device_uvector new_root_candidates( - level_graph_view.get_number_of_local_vertices(), handle.get_stream_view()); + level_graph_view.get_number_of_local_vertices(), handle.get_stream()); new_root_candidates.resize( thrust::distance( new_root_candidates.begin(), @@ -321,7 +320,7 @@ void weakly_connected_components_impl(raft::handle_t const& handle, return level_components[vertex_partition.get_local_vertex_offset_from_vertex_nocheck( v)] == invalid_component_id::value; })), - handle.get_stream_view()); + handle.get_stream()); auto high_degree_partition_last = thrust::stable_partition( handle.get_thrust_policy(), new_root_candidates.begin(), @@ -412,7 +411,7 @@ void weakly_connected_components_impl(raft::handle_t const& handle, // FIXME: we need to add host_scalar_scatter #if 1 - rmm::device_uvector d_counts(comm_size, handle.get_stream_view()); + rmm::device_uvector d_counts(comm_size, handle.get_stream()); raft::update_device(d_counts.data(), init_max_new_root_counts.data(), init_max_new_root_counts.size(), @@ -428,7 +427,7 @@ void weakly_connected_components_impl(raft::handle_t const& handle, } else { // FIXME: we need to add host_scalar_scatter #if 1 - rmm::device_uvector d_counts(comm_size, handle.get_stream_view()); + rmm::device_uvector d_counts(comm_size, handle.get_stream()); device_bcast( comm, d_counts.data(), d_counts.data(), d_counts.size(), int{0}, handle.get_stream()); raft::update_host( @@ -439,7 +438,7 @@ void weakly_connected_components_impl(raft::handle_t const& handle, #endif } - handle.get_stream_view().synchronize(); + handle.sync_stream(); init_max_new_roots = std::min(init_max_new_roots, max_new_roots); } @@ -457,7 +456,7 @@ void weakly_connected_components_impl(raft::handle_t const& handle, allocate_dataframe_buffer>(0, handle.get_stream()); // FIXME: we can use cuda::atomic instead but currently on a system with x86 + GPU, this // requires placing the atomic variable on managed memory and this make it less attractive. - rmm::device_scalar num_edge_inserts(size_t{0}, handle.get_stream_view()); + rmm::device_scalar num_edge_inserts(size_t{0}, handle.get_stream()); auto adj_matrix_col_components = GraphViewType::is_multi_gpu @@ -528,7 +527,7 @@ void weakly_connected_components_impl(raft::handle_t const& handle, // FIXME: if we use cuco::static_map (no duplicates, ideally we need static_set), edge_buffer // size cannot exceed (# roots)^2 and we can avoid additional sort & unique (but resizing the // buffer may be more expensive). - auto old_num_edge_inserts = num_edge_inserts.value(handle.get_stream_view()); + auto old_num_edge_inserts = num_edge_inserts.value(handle.get_stream()); resize_dataframe_buffer(edge_buffer, old_num_edge_inserts + max_pushes, handle.get_stream()); update_frontier_v_push_if_out_nbr( @@ -577,7 +576,7 @@ void weakly_connected_components_impl(raft::handle_t const& handle, static_cast(Bucket::conflict)}); if (GraphViewType::is_multi_gpu) { - auto cur_num_edge_inserts = num_edge_inserts.value(handle.get_stream_view()); + auto cur_num_edge_inserts = num_edge_inserts.value(handle.get_stream()); auto& conflict_bucket = vertex_frontier.get_bucket(static_cast(Bucket::conflict)); resize_dataframe_buffer( edge_buffer, cur_num_edge_inserts + conflict_bucket.size(), handle.get_stream()); @@ -605,7 +604,7 @@ void weakly_connected_components_impl(raft::handle_t const& handle, // maintain the list of sorted unique edges (we can avoid this if we use cuco::static_map(no // duplicates, ideally we need static_set)). - auto new_num_edge_inserts = num_edge_inserts.value(handle.get_stream_view()); + auto new_num_edge_inserts = num_edge_inserts.value(handle.get_stream()); if (new_num_edge_inserts > old_num_edge_inserts) { auto edge_first = get_dataframe_buffer_begin(edge_buffer); thrust::sort(handle.get_thrust_policy(), @@ -627,7 +626,7 @@ void weakly_connected_components_impl(raft::handle_t const& handle, auto unique_edge_last = thrust::unique(handle.get_thrust_policy(), edge_first, edge_first + new_num_edge_inserts); auto num_unique_edges = static_cast(thrust::distance(edge_first, unique_edge_last)); - num_edge_inserts.set_value_async(num_unique_edges, handle.get_stream_view()); + num_edge_inserts.set_value_async(num_unique_edges, handle.get_stream()); } vertex_frontier.get_bucket(static_cast(Bucket::cur)).clear(); @@ -652,7 +651,7 @@ void weakly_connected_components_impl(raft::handle_t const& handle, // 2-5. construct the next level graph from the edges emitted on conflicts - auto num_inserts = num_edge_inserts.value(handle.get_stream_view()); + auto num_inserts = num_edge_inserts.value(handle.get_stream()); auto aggregate_num_inserts = num_inserts; if (GraphViewType::is_multi_gpu) { auto& comm = handle.get_comms(); @@ -725,7 +724,7 @@ void weakly_connected_components_impl(raft::handle_t const& handle, size_t current_level = next_level - 1; rmm::device_uvector next_local_vertices(level_renumber_map_vectors[next_level].size(), - handle.get_stream_view()); + handle.get_stream()); thrust::sequence(handle.get_thrust_policy(), next_local_vertices.begin(), next_local_vertices.end(), diff --git a/cpp/src/generators/erdos_renyi_generator.cu b/cpp/src/generators/erdos_renyi_generator.cu index 3f8f558e4fe..c89c534e0f3 100644 --- a/cpp/src/generators/erdos_renyi_generator.cu +++ b/cpp/src/generators/erdos_renyi_generator.cu @@ -74,7 +74,7 @@ generate_erdos_renyi_graph_edgelist_gnp(raft::handle_t const& handle, static_cast(dst)); }); - handle.get_stream_view().synchronize(); + handle.sync_stream(); return std::make_tuple(std::move(src_v), std::move(dst_v)); } diff --git a/cpp/src/generators/generate_rmat_edgelist.cu b/cpp/src/generators/generate_rmat_edgelist.cu index 2b63fd699b1..8ee99d61747 100644 --- a/cpp/src/generators/generate_rmat_edgelist.cu +++ b/cpp/src/generators/generate_rmat_edgelist.cu @@ -51,10 +51,10 @@ std::tuple, rmm::device_uvector> generat auto max_edges_to_generate_per_iteration = static_cast(handle.get_device_properties().multiProcessorCount) * 1024; rmm::device_uvector rands( - std::min(num_edges, max_edges_to_generate_per_iteration) * 2 * scale, handle.get_stream_view()); + std::min(num_edges, max_edges_to_generate_per_iteration) * 2 * scale, handle.get_stream()); - rmm::device_uvector srcs(num_edges, handle.get_stream_view()); - rmm::device_uvector dsts(num_edges, handle.get_stream_view()); + rmm::device_uvector srcs(num_edges, handle.get_stream()); + rmm::device_uvector dsts(num_edges, handle.get_stream()); size_t num_edges_generated{0}; while (num_edges_generated < num_edges) { @@ -64,7 +64,7 @@ std::tuple, rmm::device_uvector> generat num_edges_generated; detail::uniform_random_fill( - handle.get_stream_view(), rands.data(), num_edges_to_generate * 2 * scale, 0.0f, 1.0f, seed); + handle.get_stream(), rands.data(), num_edges_to_generate * 2 * scale, 0.0f, 1.0f, seed); seed += num_edges_to_generate * 2 * scale; thrust::transform( diff --git a/cpp/src/generators/generator_tools.cu b/cpp/src/generators/generator_tools.cu index ed09bb9c336..b128dcd0196 100644 --- a/cpp/src/generators/generator_tools.cu +++ b/cpp/src/generators/generator_tools.cu @@ -217,8 +217,8 @@ symmetrize_edgelist_from_triangular( } auto offset = d_src_v.size(); - d_src_v.resize(offset + num_strictly_triangular_edges, handle.get_stream_view()); - d_dst_v.resize(offset + num_strictly_triangular_edges, handle.get_stream_view()); + d_src_v.resize(offset + num_strictly_triangular_edges, handle.get_stream()); + d_dst_v.resize(offset + num_strictly_triangular_edges, handle.get_stream()); thrust::copy(handle.get_thrust_policy(), d_dst_v.begin(), @@ -229,7 +229,7 @@ symmetrize_edgelist_from_triangular( d_src_v.begin() + num_strictly_triangular_edges, d_dst_v.begin() + offset); if (optional_d_weights_v) { - optional_d_weights_v->resize(d_src_v.size(), handle.get_stream_view()); + optional_d_weights_v->resize(d_src_v.size(), handle.get_stream()); thrust::copy(handle.get_thrust_policy(), optional_d_weights_v->begin(), optional_d_weights_v->begin() + num_strictly_triangular_edges, diff --git a/cpp/src/generators/simple_generators.cu b/cpp/src/generators/simple_generators.cu index cd133e1fc6a..94b7bb48579 100644 --- a/cpp/src/generators/simple_generators.cu +++ b/cpp/src/generators/simple_generators.cu @@ -77,7 +77,7 @@ generate_path_graph_edgelist(raft::handle_t const& handle, dst_iterator += num_edges; } - handle.get_stream_view().synchronize(); + handle.sync_stream(); return std::make_tuple(std::move(d_src_v), std::move(d_dst_v)); } @@ -141,7 +141,7 @@ generate_2d_mesh_graph_edgelist( }); } - handle.get_stream_view().synchronize(); + handle.sync_stream(); return std::make_tuple(std::move(d_src_v), std::move(d_dst_v)); } @@ -219,7 +219,7 @@ generate_3d_mesh_graph_edgelist( }); } - handle.get_stream_view().synchronize(); + handle.sync_stream(); return std::make_tuple(std::move(d_src_v), std::move(d_dst_v)); } @@ -289,7 +289,7 @@ generate_complete_graph_edgelist( }); } - handle.get_stream_view().synchronize(); + handle.sync_stream(); return std::make_tuple(std::move(d_src_v), std::move(d_dst_v)); } diff --git a/cpp/src/layout/barnes_hut.cuh b/cpp/src/layout/barnes_hut.cuh index 61e47b03b5c..f412e7cb402 100644 --- a/cpp/src/layout/barnes_hut.cuh +++ b/cpp/src/layout/barnes_hut.cuh @@ -53,7 +53,7 @@ void barnes_hut(raft::handle_t const& handle, bool verbose = false, internals::GraphBasedDimRedCallback* callback = nullptr) { - rmm::cuda_stream_view stream_view(handle.get_stream_view()); + rmm::cuda_stream_view stream_view(handle.get_stream()); const edge_t e = graph.number_of_edges; const vertex_t n = graph.number_of_vertices; @@ -135,7 +135,7 @@ void barnes_hut(raft::handle_t const& handle, raft::copy(nodes_pos + nnodes + 1, y_start, n, stream_view.value()); } else { uniform_random_fill( - handle.get_stream_view(), nodes_pos, (nnodes + 1) * 2, -100.0f, 100.0f, random_state); + handle.get_stream(), nodes_pos, (nnodes + 1) * 2, -100.0f, 100.0f, random_state); } // Allocate arrays for force computation diff --git a/cpp/src/layout/exact_fa2.cuh b/cpp/src/layout/exact_fa2.cuh index db84594c8b8..965e6ddaaf2 100644 --- a/cpp/src/layout/exact_fa2.cuh +++ b/cpp/src/layout/exact_fa2.cuh @@ -51,7 +51,7 @@ void exact_fa2(raft::handle_t const& handle, bool verbose = false, internals::GraphBasedDimRedCallback* callback = nullptr) { - auto stream_view = handle.get_stream_view(); + auto stream_view = handle.get_stream(); const edge_t e = graph.number_of_edges; const vertex_t n = graph.number_of_vertices; @@ -79,7 +79,7 @@ void exact_fa2(raft::handle_t const& handle, d_swinging = swinging.data(); d_traction = traction.data(); - uniform_random_fill(handle.get_stream_view(), pos, n * 2, -100.0f, 100.0f, uint64_t{0}); + uniform_random_fill(handle.get_stream(), pos, n * 2, -100.0f, 100.0f, uint64_t{0}); if (x_start && y_start) { raft::copy(pos, x_start, n, stream_view.value()); diff --git a/cpp/src/linear_assignment/hungarian.cu b/cpp/src/linear_assignment/hungarian.cu index 7af829da2b3..27e3e230920 100644 --- a/cpp/src/linear_assignment/hungarian.cu +++ b/cpp/src/linear_assignment/hungarian.cu @@ -63,7 +63,7 @@ weight_t hungarian(raft::handle_t const& handle, weight_t epsilon) { if (num_rows == num_cols) { - rmm::device_uvector col_assignments_v(num_rows, handle.get_stream_view()); + rmm::device_uvector col_assignments_v(num_rows, handle.get_stream()); // Create an instance of LinearAssignmentProblem using problem size, number of subproblems raft::lap::LinearAssignmentProblem lpx(handle, num_rows, 1, epsilon); @@ -84,9 +84,9 @@ weight_t hungarian(raft::handle_t const& handle, weight_t{0}, thrust::maximum()); - rmm::device_uvector tmp_cost_v(n * n, handle.get_stream_view()); - rmm::device_uvector tmp_row_assignment_v(n, handle.get_stream_view()); - rmm::device_uvector tmp_col_assignment_v(n, handle.get_stream_view()); + rmm::device_uvector tmp_cost_v(n * n, handle.get_stream()); + rmm::device_uvector tmp_row_assignment_v(n, handle.get_stream()); + rmm::device_uvector tmp_col_assignment_v(n, handle.get_stream()); thrust::transform(handle.get_thrust_policy(), thrust::make_counting_iterator(0), @@ -141,11 +141,10 @@ weight_t hungarian_sparse(raft::handle_t const& handle, vertex_t matrix_dimension = std::max(num_rows, num_cols); - rmm::device_uvector cost_v(matrix_dimension * matrix_dimension, - handle.get_stream_view()); - rmm::device_uvector tasks_v(num_cols, handle.get_stream_view()); - rmm::device_uvector temp_tasks_v(graph.number_of_vertices, handle.get_stream_view()); - rmm::device_uvector temp_workers_v(graph.number_of_vertices, handle.get_stream_view()); + rmm::device_uvector cost_v(matrix_dimension * matrix_dimension, handle.get_stream()); + rmm::device_uvector tasks_v(num_cols, handle.get_stream()); + rmm::device_uvector temp_tasks_v(graph.number_of_vertices, handle.get_stream()); + rmm::device_uvector temp_workers_v(graph.number_of_vertices, handle.get_stream()); weight_t* d_cost = cost_v.data(); vertex_t* d_tasks = tasks_v.data(); @@ -173,7 +172,7 @@ weight_t hungarian_sparse(raft::handle_t const& handle, [] __device__(vertex_t v) { return v >= 0; }); vertex_t size = thrust::distance(d_tasks, temp_end); - tasks_v.resize(size, handle.get_stream_view()); + tasks_v.resize(size, handle.get_stream()); // // Now we'll assign costs into the dense array @@ -223,7 +222,7 @@ weight_t hungarian_sparse(raft::handle_t const& handle, // temp_assignment_v will hold the assignment in the dense // bipartite matrix numbering // - rmm::device_uvector temp_assignment_v(matrix_dimension, handle.get_stream_view()); + rmm::device_uvector temp_assignment_v(matrix_dimension, handle.get_stream()); vertex_t* d_temp_assignment = temp_assignment_v.data(); weight_t min_cost = detail::hungarian( diff --git a/cpp/src/sampling/random_walks.cuh b/cpp/src/sampling/random_walks.cuh index dfcd589e14e..d0f25b7b845 100644 --- a/cpp/src/sampling/random_walks.cuh +++ b/cpp/src/sampling/random_walks.cuh @@ -140,7 +140,7 @@ struct rrandom_gen_t { static void generate_random(raft::handle_t const& handle, real_t* p_d_rnd, size_t sz, seed_t seed) { cugraph::detail::uniform_random_fill( - handle.get_stream_view(), p_d_rnd, sz, real_t{0.0}, real_t{1.0}, seed); + handle.get_stream(), p_d_rnd, sz, real_t{0.0}, real_t{1.0}, seed); } private: @@ -533,7 +533,7 @@ struct random_walker_t { thrust::make_counting_iterator(0), predicate_w); - handle_.get_stream_view().synchronize(); + handle_.sync_stream(); d_coalesced_v.resize(thrust::distance(d_coalesced_v.begin(), new_end_v), handle_.get_stream()); d_coalesced_w.resize(thrust::distance(d_coalesced_w.begin(), new_end_w), handle_.get_stream()); @@ -1295,7 +1295,7 @@ query_rw_sizes_offsets(raft::handle_t const& handle, index_t num_paths, index_t d_weight_sizes.begin(), [] __device__(auto vertex_path_sz) { return vertex_path_sz - 1; }); - handle.get_stream_view().synchronize(); + handle.sync_stream(); thrust::exclusive_scan(handle.get_thrust_policy(), d_weight_sizes.begin(), diff --git a/cpp/src/structure/coarsen_graph_impl.cuh b/cpp/src/structure/coarsen_graph_impl.cuh index 44f87c71085..b0f6c7eca05 100644 --- a/cpp/src/structure/coarsen_graph_impl.cuh +++ b/cpp/src/structure/coarsen_graph_impl.cuh @@ -276,7 +276,7 @@ coarsen_graph( std::vector h_counts(counts.size()); raft::update_host(h_counts.data(), counts.data(), counts.size(), handle.get_stream()); - handle.get_stream_view().synchronize(); + handle.sync_stream(); std::vector h_displacements(h_counts.size(), size_t{0}); std::partial_sum(h_counts.begin(), h_counts.end() - 1, h_displacements.begin() + 1); diff --git a/cpp/src/structure/create_graph_from_edgelist_impl.cuh b/cpp/src/structure/create_graph_from_edgelist_impl.cuh index 849705df4ac..f05f5f957c6 100644 --- a/cpp/src/structure/create_graph_from_edgelist_impl.cuh +++ b/cpp/src/structure/create_graph_from_edgelist_impl.cuh @@ -229,7 +229,7 @@ create_graph_from_edgelist_impl(raft::handle_t const& handle, std::vector h_edge_counts(edge_counts.size()); raft::update_host( h_edge_counts.data(), edge_counts.data(), edge_counts.size(), handle.get_stream()); - handle.get_stream_view().synchronize(); + handle.sync_stream(); std::vector edgelist_edge_counts(col_comm_size, edge_t{0}); auto edgelist_intra_partition_segment_offsets = @@ -347,7 +347,7 @@ create_graph_from_edgelist_impl(raft::handle_t const& handle, num_vertices = input_vertex_list_size; } else { num_vertices = 1 + cugraph::detail::compute_maximum_vertex_id( - handle.get_stream_view(), edgelist_rows, edgelist_cols); + handle.get_stream(), edgelist_rows, edgelist_cols); } } diff --git a/cpp/src/structure/graph_impl.cuh b/cpp/src/structure/graph_impl.cuh index 222b51ed9cf..9cde74ec1c4 100644 --- a/cpp/src/structure/graph_impl.cuh +++ b/cpp/src/structure/graph_impl.cuh @@ -539,7 +539,7 @@ graph_tget_handle_ptr()->get_subcomm(cugraph::partition_2d::key_naming_t().col_name()); auto const col_comm_rank = col_comm.get_rank(); auto const col_comm_size = col_comm.get_size(); - auto default_stream_view = this->get_handle_ptr()->get_stream_view(); + auto default_stream_view = this->get_handle_ptr()->get_stream(); CUGRAPH_EXPECTS(edgelists.size() == static_cast(col_comm_size), "Invalid input argument: erroneous edgelists.size()."); @@ -914,13 +914,13 @@ graph_t( handle, meta.number_of_vertices, edgelist.number_of_edges, meta.properties), - offsets_(rmm::device_uvector(0, handle.get_stream_view())), - indices_(rmm::device_uvector(0, handle.get_stream_view())), + offsets_(rmm::device_uvector(0, handle.get_stream())), + indices_(rmm::device_uvector(0, handle.get_stream())), segment_offsets_(meta.segment_offsets) { // cheap error checks - auto default_stream_view = this->get_handle_ptr()->get_stream_view(); + auto default_stream_view = this->get_handle_ptr()->get_stream(); auto is_weighted = edgelist.p_edge_weights.has_value(); @@ -1341,7 +1341,7 @@ graph_t(0, handle.get_stream_view()), - rmm::device_uvector(0, handle.get_stream_view()), - rmm::device_uvector(0, handle.get_stream_view()), - rmm::device_uvector(0, handle.get_stream_view())); + return std::make_tuple(rmm::device_uvector(0, handle.get_stream()), + rmm::device_uvector(0, handle.get_stream()), + rmm::device_uvector(0, handle.get_stream()), + rmm::device_uvector(0, handle.get_stream())); } else { // 2-1. Phase 1: calculate memory requirements size_t num_aggregate_subgraph_vertices{}; raft::update_host( &num_aggregate_subgraph_vertices, subgraph_offsets + num_subgraphs, 1, handle.get_stream()); - handle.get_stream_view().synchronize(); + handle.sync_stream(); rmm::device_uvector subgraph_vertex_output_offsets( num_aggregate_subgraph_vertices + 1, - handle.get_stream_view()); // for each element of subgraph_vertices + handle.get_stream()); // for each element of subgraph_vertices auto matrix_partition = matrix_partition_device_view_t( graph_view.get_matrix_partition_view()); @@ -174,15 +174,15 @@ extract_induced_subgraphs( subgraph_vertex_output_offsets.data() + num_aggregate_subgraph_vertices, 1, handle.get_stream()); - handle.get_stream_view().synchronize(); + handle.sync_stream(); // 2-2. Phase 2: find the edges in the induced subgraphs - rmm::device_uvector edge_majors(num_aggregate_edges, handle.get_stream_view()); - rmm::device_uvector edge_minors(num_aggregate_edges, handle.get_stream_view()); + rmm::device_uvector edge_majors(num_aggregate_edges, handle.get_stream()); + rmm::device_uvector edge_minors(num_aggregate_edges, handle.get_stream()); auto edge_weights = graph_view.is_weighted() - ? std::make_optional>( - num_aggregate_edges, handle.get_stream_view()) + ? std::make_optional>(num_aggregate_edges, + handle.get_stream()) : std::nullopt; // fill the edge list buffer (to be returned) for each vetex in the aggregate subgraph vertex @@ -245,7 +245,7 @@ extract_induced_subgraphs( } }); - rmm::device_uvector subgraph_edge_offsets(num_subgraphs + 1, handle.get_stream_view()); + rmm::device_uvector subgraph_edge_offsets(num_subgraphs + 1, handle.get_stream()); thrust::gather(handle.get_thrust_policy(), subgraph_offsets, subgraph_offsets + (num_subgraphs + 1), diff --git a/cpp/src/structure/relabel_impl.cuh b/cpp/src/structure/relabel_impl.cuh index a1fa063ba04..e9d9220de81 100644 --- a/cpp/src/structure/relabel_impl.cuh +++ b/cpp/src/structure/relabel_impl.cuh @@ -61,30 +61,28 @@ void relabel(raft::handle_t const& handle, // find unique old labels (to be relabeled) - rmm::device_uvector unique_old_labels(num_labels, handle.get_stream_view()); + rmm::device_uvector unique_old_labels(num_labels, handle.get_stream()); thrust::copy(handle.get_thrust_policy(), labels, labels + num_labels, unique_old_labels.data()); thrust::sort(handle.get_thrust_policy(), unique_old_labels.begin(), unique_old_labels.end()); unique_old_labels.resize(thrust::distance(unique_old_labels.begin(), thrust::unique(handle.get_thrust_policy(), unique_old_labels.begin(), unique_old_labels.end())), - handle.get_stream_view()); - unique_old_labels.shrink_to_fit(handle.get_stream_view()); + handle.get_stream()); + unique_old_labels.shrink_to_fit(handle.get_stream()); // collect new labels for the unique old labels - rmm::device_uvector new_labels_for_unique_old_labels(0, handle.get_stream_view()); + rmm::device_uvector new_labels_for_unique_old_labels(0, handle.get_stream()); { // shuffle the old_new_label_pairs based on applying the compute_gpu_id_from_vertex_t functor // to the old labels - rmm::device_uvector rx_label_pair_old_labels(0, handle.get_stream_view()); - rmm::device_uvector rx_label_pair_new_labels(0, handle.get_stream_view()); + rmm::device_uvector rx_label_pair_old_labels(0, handle.get_stream()); + rmm::device_uvector rx_label_pair_new_labels(0, handle.get_stream()); { - rmm::device_uvector label_pair_old_labels(num_label_pairs, - handle.get_stream_view()); - rmm::device_uvector label_pair_new_labels(num_label_pairs, - handle.get_stream_view()); + rmm::device_uvector label_pair_old_labels(num_label_pairs, handle.get_stream()); + rmm::device_uvector label_pair_new_labels(num_label_pairs, handle.get_stream()); thrust::copy(handle.get_thrust_policy(), std::get<0>(old_new_label_pairs), std::get<0>(old_new_label_pairs) + num_label_pairs, @@ -102,12 +100,12 @@ void relabel(raft::handle_t const& handle, pair_first, pair_first + num_label_pairs, [key_func] __device__(auto val) { return key_func(thrust::get<0>(val)); }, - handle.get_stream_view()); + handle.get_stream()); } // update intermediate relabel map - handle.get_stream_view().synchronize(); // cuco::static_map currently does not take stream + handle.sync_stream(); // cuco::static_map currently does not take stream auto poly_alloc = rmm::mr::polymorphic_allocator(rmm::mr::get_current_device_resource()); @@ -126,24 +124,24 @@ void relabel(raft::handle_t const& handle, thrust::make_tuple(rx_label_pair_old_labels.begin(), rx_label_pair_new_labels.begin())); relabel_map.insert(pair_first, pair_first + rx_label_pair_old_labels.size()); - rx_label_pair_old_labels.resize(0, handle.get_stream_view()); - rx_label_pair_new_labels.resize(0, handle.get_stream_view()); - rx_label_pair_old_labels.shrink_to_fit(handle.get_stream_view()); - rx_label_pair_new_labels.shrink_to_fit(handle.get_stream_view()); + rx_label_pair_old_labels.resize(0, handle.get_stream()); + rx_label_pair_new_labels.resize(0, handle.get_stream()); + rx_label_pair_old_labels.shrink_to_fit(handle.get_stream()); + rx_label_pair_new_labels.shrink_to_fit(handle.get_stream()); // shuffle unique_old_labels, relabel using the intermediate relabel map, and shuffle back { - rmm::device_uvector rx_unique_old_labels(0, handle.get_stream_view()); + rmm::device_uvector rx_unique_old_labels(0, handle.get_stream()); std::vector rx_value_counts{}; std::tie(rx_unique_old_labels, rx_value_counts) = groupby_gpuid_and_shuffle_values( handle.get_comms(), unique_old_labels.begin(), unique_old_labels.end(), [key_func] __device__(auto val) { return key_func(val); }, - handle.get_stream_view()); + handle.get_stream()); - handle.get_stream_view().synchronize(); // cuco::static_map currently does not take stream + handle.sync_stream(); // cuco::static_map currently does not take stream if (skip_missing_labels) { thrust::transform(handle.get_thrust_policy(), @@ -164,15 +162,12 @@ void relabel(raft::handle_t const& handle, // corresponding old labels } - std::tie(new_labels_for_unique_old_labels, std::ignore) = - shuffle_values(handle.get_comms(), - rx_unique_old_labels.begin(), - rx_value_counts, - handle.get_stream_view()); + std::tie(new_labels_for_unique_old_labels, std::ignore) = shuffle_values( + handle.get_comms(), rx_unique_old_labels.begin(), rx_value_counts, handle.get_stream()); } } - handle.get_stream_view().synchronize(); // cuco::static_map currently does not take stream + handle.sync_stream(); // cuco::static_map currently does not take stream { auto poly_alloc = diff --git a/cpp/src/structure/renumber_edgelist_impl.cuh b/cpp/src/structure/renumber_edgelist_impl.cuh index 8d4eb0abf10..d4940e94634 100644 --- a/cpp/src/structure/renumber_edgelist_impl.cuh +++ b/cpp/src/structure/renumber_edgelist_impl.cuh @@ -203,13 +203,13 @@ compute_renumber_map(raft::handle_t const& handle, minor_labels = std::move(rx_minor_labels); } } - minor_labels.shrink_to_fit(handle.get_stream_view()); + minor_labels.shrink_to_fit(handle.get_stream()); // 3. merge major and minor labels and vertex labels rmm::device_uvector merged_labels(major_labels.size() + minor_labels.size(), - handle.get_stream_view()); - rmm::device_uvector merged_counts(merged_labels.size(), handle.get_stream_view()); + handle.get_stream()); + rmm::device_uvector merged_counts(merged_labels.size(), handle.get_stream()); thrust::merge_by_key(handle.get_thrust_policy(), major_labels.begin(), major_labels.end(), @@ -338,7 +338,7 @@ compute_renumber_map(raft::handle_t const& handle, d_segment_offsets.data(), d_segment_offsets.size(), handle.get_stream()); - handle.get_stream_view().synchronize(); + handle.sync_stream(); return std::make_tuple(std::move(labels), h_segment_offsets, diff --git a/cpp/src/structure/renumber_utils_impl.cuh b/cpp/src/structure/renumber_utils_impl.cuh index 871d71975df..79ebf3d4feb 100644 --- a/cpp/src/structure/renumber_utils_impl.cuh +++ b/cpp/src/structure/renumber_utils_impl.cuh @@ -336,7 +336,7 @@ void renumber_ext_vertices(raft::handle_t const& handle, if (do_expensive_check) { rmm::device_uvector labels(local_int_vertex_last - local_int_vertex_first, - handle.get_stream_view()); + handle.get_stream()); thrust::copy(handle.get_thrust_policy(), renumber_map_labels, renumber_map_labels + labels.size(), @@ -359,8 +359,7 @@ void renumber_ext_vertices(raft::handle_t const& handle, auto& comm = handle.get_comms(); auto const comm_size = comm.get_size(); - rmm::device_uvector sorted_unique_ext_vertices(num_vertices, - handle.get_stream_view()); + rmm::device_uvector sorted_unique_ext_vertices(num_vertices, handle.get_stream()); sorted_unique_ext_vertices.resize( thrust::distance( sorted_unique_ext_vertices.begin(), @@ -369,7 +368,7 @@ void renumber_ext_vertices(raft::handle_t const& handle, vertices + num_vertices, sorted_unique_ext_vertices.begin(), [] __device__(auto v) { return v != invalid_vertex_id::value; })), - handle.get_stream_view()); + handle.get_stream()); thrust::sort(handle.get_thrust_policy(), sorted_unique_ext_vertices.begin(), sorted_unique_ext_vertices.end()); @@ -378,7 +377,7 @@ void renumber_ext_vertices(raft::handle_t const& handle, thrust::unique(handle.get_thrust_policy(), sorted_unique_ext_vertices.begin(), sorted_unique_ext_vertices.end())), - handle.get_stream_view()); + handle.get_stream()); auto int_vertices_for_sorted_unique_ext_vertices = collect_values_for_unique_keys( comm, @@ -388,9 +387,9 @@ void renumber_ext_vertices(raft::handle_t const& handle, sorted_unique_ext_vertices.begin(), sorted_unique_ext_vertices.end(), detail::compute_gpu_id_from_vertex_t{comm_size}, - handle.get_stream_view()); + handle.get_stream()); - handle.get_stream_view().synchronize(); // cuco::static_map currently does not take stream + handle.sync_stream(); // cuco::static_map currently does not take stream renumber_map_ptr.reset(); @@ -408,7 +407,7 @@ void renumber_ext_vertices(raft::handle_t const& handle, sorted_unique_ext_vertices.begin(), int_vertices_for_sorted_unique_ext_vertices.begin())); renumber_map_ptr->insert(kv_pair_first, kv_pair_first + sorted_unique_ext_vertices.size()); } else { - handle.get_stream_view().synchronize(); // cuco::static_map currently does not take stream + handle.sync_stream(); // cuco::static_map currently does not take stream renumber_map_ptr.reset(); @@ -429,7 +428,7 @@ void renumber_ext_vertices(raft::handle_t const& handle, } if (do_expensive_check) { - rmm::device_uvector contains(num_vertices, handle.get_stream_view()); + rmm::device_uvector contains(num_vertices, handle.get_stream()); renumber_map_ptr->contains(vertices, vertices + num_vertices, contains.begin()); auto vc_pair_first = thrust::make_zip_iterator(thrust::make_tuple(vertices, contains.begin())); CUGRAPH_EXPECTS(thrust::count_if(handle.get_thrust_policy(), @@ -515,8 +514,7 @@ void unrenumber_int_vertices(raft::handle_t const& handle, comm_rank == 0 ? vertex_t{0} : vertex_partition_lasts[comm_rank - 1]; auto local_int_vertex_last = vertex_partition_lasts[comm_rank]; - rmm::device_uvector sorted_unique_int_vertices(num_vertices, - handle.get_stream_view()); + rmm::device_uvector sorted_unique_int_vertices(num_vertices, handle.get_stream()); sorted_unique_int_vertices.resize( thrust::distance( sorted_unique_int_vertices.begin(), @@ -525,7 +523,7 @@ void unrenumber_int_vertices(raft::handle_t const& handle, vertices + num_vertices, sorted_unique_int_vertices.begin(), [] __device__(auto v) { return v != invalid_vertex_id::value; })), - handle.get_stream_view()); + handle.get_stream()); thrust::sort(handle.get_thrust_policy(), sorted_unique_int_vertices.begin(), sorted_unique_int_vertices.end()); @@ -534,16 +532,16 @@ void unrenumber_int_vertices(raft::handle_t const& handle, thrust::unique(handle.get_thrust_policy(), sorted_unique_int_vertices.begin(), sorted_unique_int_vertices.end())), - handle.get_stream_view()); + handle.get_stream()); rmm::device_uvector d_vertex_partition_lasts(vertex_partition_lasts.size(), - handle.get_stream_view()); + handle.get_stream()); raft::update_device(d_vertex_partition_lasts.data(), vertex_partition_lasts.data(), vertex_partition_lasts.size(), handle.get_stream()); rmm::device_uvector d_tx_int_vertex_offsets(d_vertex_partition_lasts.size(), - handle.get_stream_view()); + handle.get_stream()); thrust::lower_bound(handle.get_thrust_policy(), sorted_unique_int_vertices.begin(), sorted_unique_int_vertices.end(), @@ -555,14 +553,14 @@ void unrenumber_int_vertices(raft::handle_t const& handle, d_tx_int_vertex_offsets.data(), d_tx_int_vertex_offsets.size(), handle.get_stream()); - handle.get_stream_view().synchronize(); + handle.sync_stream(); std::adjacent_difference( h_tx_int_vertex_counts.begin(), h_tx_int_vertex_counts.end(), h_tx_int_vertex_counts.begin()); - rmm::device_uvector rx_int_vertices(0, handle.get_stream_view()); + rmm::device_uvector rx_int_vertices(0, handle.get_stream()); std::vector rx_int_vertex_counts{}; std::tie(rx_int_vertices, rx_int_vertex_counts) = shuffle_values( - comm, sorted_unique_int_vertices.begin(), h_tx_int_vertex_counts, handle.get_stream_view()); + comm, sorted_unique_int_vertices.begin(), h_tx_int_vertex_counts, handle.get_stream()); auto tx_ext_vertices = std::move(rx_int_vertices); thrust::transform(handle.get_thrust_policy(), @@ -574,11 +572,11 @@ void unrenumber_int_vertices(raft::handle_t const& handle, }); rmm::device_uvector rx_ext_vertices_for_sorted_unique_int_vertices( - 0, handle.get_stream_view()); + 0, handle.get_stream()); std::tie(rx_ext_vertices_for_sorted_unique_int_vertices, std::ignore) = - shuffle_values(comm, tx_ext_vertices.begin(), rx_int_vertex_counts, handle.get_stream_view()); + shuffle_values(comm, tx_ext_vertices.begin(), rx_int_vertex_counts, handle.get_stream()); - handle.get_stream_view().synchronize(); // cuco::static_map currently does not take stream + handle.sync_stream(); // cuco::static_map currently does not take stream auto poly_alloc = rmm::mr::polymorphic_allocator(rmm::mr::get_current_device_resource()); auto stream_adapter = rmm::mr::make_stream_allocator_adaptor(poly_alloc, cudaStream_t{nullptr}); diff --git a/cpp/src/utilities/cython.cu b/cpp/src/utilities/cython.cu index 0f5ba693f28..35a6be4edc3 100644 --- a/cpp/src/utilities/cython.cu +++ b/cpp/src/utilities/cython.cu @@ -112,7 +112,7 @@ std::vector compute_edge_counts(raft::handle_t const& handle, std::vector h_edge_counts(num_local_partitions, 0); raft::update_host( h_edge_counts.data(), d_edge_counts.data(), d_edge_counts.size(), handle.get_stream()); - handle.get_stream_view().synchronize(); + handle.sync_stream(); return h_edge_counts; } @@ -821,9 +821,9 @@ std::unique_ptr call_egonet(raft::handle_t const& handle, static_cast(n_subgraphs), std::make_unique(std::get<0>(g).release()), std::make_unique(std::get<1>(g).release()), - std::make_unique( - std::get<2>(g) ? (*std::get<2>(g)).release() - : rmm::device_buffer(size_t{0}, handle.get_stream_view())), + std::make_unique(std::get<2>(g) + ? (*std::get<2>(g)).release() + : rmm::device_buffer(size_t{0}, handle.get_stream())), std::make_unique(std::get<3>(g).release())}; return std::make_unique(std::move(coo_contents)); } else if (graph_container.edgeType == numberTypeEnum::int64Type) { @@ -840,9 +840,9 @@ std::unique_ptr call_egonet(raft::handle_t const& handle, static_cast(n_subgraphs), std::make_unique(std::get<0>(g).release()), std::make_unique(std::get<1>(g).release()), - std::make_unique( - std::get<2>(g) ? (*std::get<2>(g)).release() - : rmm::device_buffer(size_t{0}, handle.get_stream_view())), + std::make_unique(std::get<2>(g) + ? (*std::get<2>(g)).release() + : rmm::device_buffer(size_t{0}, handle.get_stream())), std::make_unique(std::get<3>(g).release())}; return std::make_unique(std::move(coo_contents)); } else { @@ -1258,7 +1258,7 @@ std::unique_ptr> call_shuffle( std::vector h_edge_counts(edge_counts.size()); raft::update_host( h_edge_counts.data(), edge_counts.data(), edge_counts.size(), handle.get_stream()); - handle.get_stream_view().synchronize(); + handle.sync_stream(); ptr_ret->get_edge_counts().resize(h_edge_counts.size()); for (size_t i = 0; i < h_edge_counts.size(); ++i) { diff --git a/cpp/src/utilities/graph_bcast.cuh b/cpp/src/utilities/graph_bcast.cuh index fccd2e63ee1..8cd8bcfda71 100644 --- a/cpp/src/utilities/graph_bcast.cuh +++ b/cpp/src/utilities/graph_bcast.cuh @@ -83,7 +83,7 @@ graph_t graph_broadcast(raft::handle_t const& handle, graph_t* graph_ptr) CUGRAPH_EXPECTS(total_graph_dev_sz > 0, "Graph size comm failure."); rmm::device_uvector data_buffer(total_graph_dev_sz, - handle.get_stream_view()); + handle.get_stream()); device_bcast(handle.get_comms(), data_buffer.data(), diff --git a/cpp/tests/centrality/katz_centrality_test.cpp b/cpp/tests/centrality/katz_centrality_test.cpp index 84ec1427de5..826fc3cd473 100644 --- a/cpp/tests/centrality/katz_centrality_test.cpp +++ b/cpp/tests/centrality/katz_centrality_test.cpp @@ -138,7 +138,7 @@ class Tests_KatzCentrality auto degrees = graph_view.compute_in_degrees(handle); std::vector h_degrees(degrees.size()); raft::update_host(h_degrees.data(), degrees.data(), degrees.size(), handle.get_stream()); - handle.get_stream_view().synchronize(); + handle.sync_stream(); auto max_it = std::max_element(h_degrees.begin(), h_degrees.end()); result_t const alpha = result_t{1.0} / static_cast(*max_it + 1); @@ -202,7 +202,7 @@ class Tests_KatzCentrality handle.get_stream()); } - handle.get_stream_view().synchronize(); + handle.sync_stream(); std::vector h_reference_katz_centralities( unrenumbered_graph_view.get_number_of_vertices()); @@ -238,7 +238,7 @@ class Tests_KatzCentrality handle.get_stream()); } - handle.get_stream_view().synchronize(); + handle.sync_stream(); auto threshold_ratio = 1e-3; auto threshold_magnitude = diff --git a/cpp/tests/centrality/mg_katz_centrality_test.cpp b/cpp/tests/centrality/mg_katz_centrality_test.cpp index 41270d81f84..d78544a1805 100644 --- a/cpp/tests/centrality/mg_katz_centrality_test.cpp +++ b/cpp/tests/centrality/mg_katz_centrality_test.cpp @@ -189,7 +189,7 @@ class Tests_MGKatzCentrality d_sg_katz_centralities.size(), handle.get_stream()); - handle.get_stream_view().synchronize(); + handle.sync_stream(); auto threshold_ratio = 1e-3; auto threshold_magnitude = diff --git a/cpp/tests/community/egonet_test.cu b/cpp/tests/community/egonet_test.cu index f2b80bc6cb0..c9aba9f640b 100644 --- a/cpp/tests/community/egonet_test.cu +++ b/cpp/tests/community/egonet_test.cu @@ -69,8 +69,9 @@ class Tests_InducedEgo : public ::testing::TestWithParam { template void run_current_test(InducedEgo_Usecase const& configuration) { - int n_streams = std::min(configuration.ego_sources.size(), static_cast(128)); - raft::handle_t handle(n_streams); + int n_streams = std::min(configuration.ego_sources.size(), static_cast(128)); + auto stream_pool = std::make_shared(n_streams); + raft::handle_t handle(rmm::cuda_stream_per_thread, stream_pool); cugraph::graph_t graph(handle); std::tie(graph, std::ignore) = cugraph::test:: diff --git a/cpp/tests/components/mg_weakly_connected_components_test.cpp b/cpp/tests/components/mg_weakly_connected_components_test.cpp index fccf14b8371..7187477cf7d 100644 --- a/cpp/tests/components/mg_weakly_connected_components_test.cpp +++ b/cpp/tests/components/mg_weakly_connected_components_test.cpp @@ -168,7 +168,7 @@ class Tests_MGWeaklyConnectedComponents d_sg_components.size(), handle.get_stream()); - handle.get_stream_view().synchronize(); + handle.sync_stream(); std::unordered_map mg_to_sg_map{}; for (size_t i = 0; i < h_sg_components.size(); ++i) { diff --git a/cpp/tests/components/wcc_graphs.cu b/cpp/tests/components/wcc_graphs.cu index 3429ad3cf21..5144f719c44 100644 --- a/cpp/tests/components/wcc_graphs.cu +++ b/cpp/tests/components/wcc_graphs.cu @@ -46,7 +46,7 @@ LineGraph_Usecase::construct_graph(raft::handle_t const& handle, thrust::sequence(execution_policy, vertices_v.begin(), vertices_v.end(), vertex_t{0}); cugraph::detail::uniform_random_fill( - handle.get_stream_view(), order_v.data(), num_vertices_, double{0.0}, double{1.0}, seed); + handle.get_stream(), order_v.data(), num_vertices_, double{0.0}, double{1.0}, seed); thrust::sort_by_key(execution_policy, order_v.begin(), order_v.end(), vertices_v.begin()); @@ -64,7 +64,7 @@ LineGraph_Usecase::construct_graph(raft::handle_t const& handle, thrust::sequence(execution_policy, vertices_v.begin(), vertices_v.end(), vertex_t{0}); - handle.get_stream_view().synchronize(); + handle.sync_stream(); return cugraph:: create_graph_from_edgelist( diff --git a/cpp/tests/components/weakly_connected_components_test.cpp b/cpp/tests/components/weakly_connected_components_test.cpp index fb870bcd1a4..9022c913ff3 100644 --- a/cpp/tests/components/weakly_connected_components_test.cpp +++ b/cpp/tests/components/weakly_connected_components_test.cpp @@ -163,7 +163,7 @@ class Tests_WeaklyConnectedComponent unrenumbered_graph_view.get_number_of_edges(), handle.get_stream()); - handle.get_stream_view().synchronize(); + handle.sync_stream(); std::vector h_reference_components( unrenumbered_graph_view.get_number_of_vertices()); @@ -175,8 +175,7 @@ class Tests_WeaklyConnectedComponent std::vector h_cugraph_components(graph_view.get_number_of_vertices()); if (renumber) { - rmm::device_uvector d_unrenumbered_components(size_t{0}, - handle.get_stream_view()); + rmm::device_uvector d_unrenumbered_components(size_t{0}, handle.get_stream()); std::tie(std::ignore, d_unrenumbered_components) = cugraph::test::sort_by_key(handle, *d_renumber_map_labels, d_components); raft::update_host(h_cugraph_components.data(), @@ -189,7 +188,7 @@ class Tests_WeaklyConnectedComponent d_components.size(), handle.get_stream()); } - handle.get_stream_view().synchronize(); + handle.sync_stream(); std::unordered_map cuda_to_reference_map{}; for (size_t i = 0; i < h_reference_components.size(); ++i) { diff --git a/cpp/tests/cores/core_number_test.cpp b/cpp/tests/cores/core_number_test.cpp index 647ef83b88d..dd1d2c5c908 100644 --- a/cpp/tests/cores/core_number_test.cpp +++ b/cpp/tests/cores/core_number_test.cpp @@ -284,7 +284,7 @@ class Tests_CoreNumber unrenumbered_graph_view.get_number_of_edges(), handle.get_stream()); - handle.get_stream_view().synchronize(); + handle.sync_stream(); auto h_reference_core_numbers = core_number_reference(h_offsets.data(), h_indices.data(), @@ -303,14 +303,14 @@ class Tests_CoreNumber d_unrenumbered_core_numbers.size(), handle.get_stream()); - handle.get_stream_view().synchronize(); + handle.sync_stream(); } else { raft::update_host(h_cugraph_core_numbers.data(), d_core_numbers.data(), d_core_numbers.size(), handle.get_stream()); - handle.get_stream_view().synchronize(); + handle.sync_stream(); } ASSERT_TRUE(std::equal(h_reference_core_numbers.begin(), diff --git a/cpp/tests/cores/mg_core_number_test.cpp b/cpp/tests/cores/mg_core_number_test.cpp index 78efd735781..675d4deea39 100644 --- a/cpp/tests/cores/mg_core_number_test.cpp +++ b/cpp/tests/cores/mg_core_number_test.cpp @@ -182,7 +182,7 @@ class Tests_MGCoreNumber d_sg_core_numbers.size(), handle.get_stream()); - handle.get_stream_view().synchronize(); + handle.sync_stream(); ASSERT_TRUE(std::equal(h_mg_aggregate_core_numbers.begin(), h_mg_aggregate_core_numbers.end(), diff --git a/cpp/tests/generators/erdos_renyi_test.cpp b/cpp/tests/generators/erdos_renyi_test.cpp index 3fdf8c1eda3..c668db26f2a 100644 --- a/cpp/tests/generators/erdos_renyi_test.cpp +++ b/cpp/tests/generators/erdos_renyi_test.cpp @@ -60,7 +60,7 @@ void er_test(size_t num_vertices, float p) std::tie(d_src_v, d_dst_v) = cugraph::generate_erdos_renyi_graph_edgelist_gnp(handle, num_vertices, p, 0); - handle.get_stream_view().synchronize(); + handle.sync_stream(); std::vector h_src_v(d_src_v.size()); std::vector h_dst_v(d_dst_v.size()); @@ -68,7 +68,7 @@ void er_test(size_t num_vertices, float p) raft::update_host(h_src_v.data(), d_src_v.data(), d_src_v.size(), handle.get_stream()); raft::update_host(h_dst_v.data(), d_dst_v.data(), d_dst_v.size(), handle.get_stream()); - handle.get_stream_view().synchronize(); + handle.sync_stream(); float expected_edge_count = p * num_vertices * num_vertices; diff --git a/cpp/tests/linear_assignment/hungarian_test.cu b/cpp/tests/linear_assignment/hungarian_test.cu index 81ffec90221..9c1e7083bd3 100644 --- a/cpp/tests/linear_assignment/hungarian_test.cu +++ b/cpp/tests/linear_assignment/hungarian_test.cu @@ -75,11 +75,11 @@ TEST_F(HungarianTest, Bipartite4x4) int32_t num_vertices = 1 + std::max(*std::max_element(src_data, src_data + length), *std::max_element(dst_data, dst_data + length)); - rmm::device_uvector src_v(length, handle.get_stream_view()); - rmm::device_uvector dst_v(length, handle.get_stream_view()); - rmm::device_uvector cost_v(length, handle.get_stream_view()); - rmm::device_uvector workers_v(length_workers, handle.get_stream_view()); - rmm::device_uvector assignment_v(length_workers, handle.get_stream_view()); + rmm::device_uvector src_v(length, handle.get_stream()); + rmm::device_uvector dst_v(length, handle.get_stream()); + rmm::device_uvector cost_v(length, handle.get_stream()); + rmm::device_uvector workers_v(length_workers, handle.get_stream()); + rmm::device_uvector assignment_v(length_workers, handle.get_stream()); raft::update_device(src_v.begin(), src_data, length, handle.get_stream()); raft::update_device(dst_v.begin(), dst_data, length, handle.get_stream()); @@ -117,11 +117,11 @@ TEST_F(HungarianTest, Bipartite5x5) int32_t num_vertices = 1 + std::max(*std::max_element(src_data, src_data + length), *std::max_element(dst_data, dst_data + length)); - rmm::device_uvector src_v(length, handle.get_stream_view()); - rmm::device_uvector dst_v(length, handle.get_stream_view()); - rmm::device_uvector cost_v(length, handle.get_stream_view()); - rmm::device_uvector workers_v(length_workers, handle.get_stream_view()); - rmm::device_uvector assignment_v(length_workers, handle.get_stream_view()); + rmm::device_uvector src_v(length, handle.get_stream()); + rmm::device_uvector dst_v(length, handle.get_stream()); + rmm::device_uvector cost_v(length, handle.get_stream()); + rmm::device_uvector workers_v(length_workers, handle.get_stream()); + rmm::device_uvector assignment_v(length_workers, handle.get_stream()); raft::update_device(src_v.begin(), src_data, length, handle.get_stream()); raft::update_device(dst_v.begin(), dst_data, length, handle.get_stream()); @@ -163,11 +163,11 @@ TEST_F(HungarianTest, Bipartite4x4_multiple_answers) int32_t num_vertices = 1 + std::max(*std::max_element(src_data, src_data + length), *std::max_element(dst_data, dst_data + length)); - rmm::device_uvector src_v(length, handle.get_stream_view()); - rmm::device_uvector dst_v(length, handle.get_stream_view()); - rmm::device_uvector cost_v(length, handle.get_stream_view()); - rmm::device_uvector workers_v(length_workers, handle.get_stream_view()); - rmm::device_uvector assignment_v(length_workers, handle.get_stream_view()); + rmm::device_uvector src_v(length, handle.get_stream()); + rmm::device_uvector dst_v(length, handle.get_stream()); + rmm::device_uvector cost_v(length, handle.get_stream()); + rmm::device_uvector workers_v(length_workers, handle.get_stream()); + rmm::device_uvector assignment_v(length_workers, handle.get_stream()); raft::update_device(src_v.begin(), src_data, length, handle.get_stream()); raft::update_device(dst_v.begin(), dst_data, length, handle.get_stream()); @@ -203,8 +203,8 @@ TEST_F(HungarianTest, May29InfLoop) std::vector expected({3, 2, 1, 0}); std::vector assignment({0, 0, 0, 0}); - rmm::device_uvector cost_v(num_rows * num_cols, handle.get_stream_view()); - rmm::device_uvector assignment_v(num_rows, handle.get_stream_view()); + rmm::device_uvector cost_v(num_rows * num_cols, handle.get_stream()); + rmm::device_uvector assignment_v(num_rows, handle.get_stream()); raft::update_device(cost_v.begin(), cost, num_rows * num_cols, handle.get_stream()); @@ -232,8 +232,8 @@ TEST_F(HungarianTest, Dense4x6) std::vector expected({3, 2, 1, 0}); std::vector assignment({0, 0, 0, 0}); - rmm::device_uvector cost_v(num_rows * num_cols, handle.get_stream_view()); - rmm::device_uvector assignment_v(num_rows, handle.get_stream_view()); + rmm::device_uvector cost_v(num_rows * num_cols, handle.get_stream()); + rmm::device_uvector assignment_v(num_rows, handle.get_stream()); raft::update_device(cost_v.begin(), cost, num_rows * num_cols, handle.get_stream()); @@ -262,8 +262,8 @@ TEST_F(HungarianTest, Dense6x4) std::vector expected2({3, 2, 5, 1, 4, 0}); std::vector assignment({0, 0, 0, 0, 0, 0}); - rmm::device_uvector cost_v(num_rows * num_cols, handle.get_stream_view()); - rmm::device_uvector assignment_v(num_rows, handle.get_stream_view()); + rmm::device_uvector cost_v(num_rows * num_cols, handle.get_stream()); + rmm::device_uvector assignment_v(num_rows, handle.get_stream()); raft::update_device(cost_v.begin(), cost, num_rows * num_cols, handle.get_stream()); @@ -320,8 +320,8 @@ TEST_F(HungarianTest, PythonTestFailure) std::vector expected({0, 2, 1, 4, 3}); std::vector assignment({0, 0, 0, 0, 0}); - rmm::device_uvector cost_v(num_rows * num_cols, handle.get_stream_view()); - rmm::device_uvector assignment_v(num_rows, handle.get_stream_view()); + rmm::device_uvector cost_v(num_rows * num_cols, handle.get_stream()); + rmm::device_uvector assignment_v(num_rows, handle.get_stream()); raft::update_device(cost_v.begin(), cost, num_rows * num_cols, handle.get_stream()); @@ -346,9 +346,9 @@ void random_test(int32_t num_rows, int32_t num_cols, int32_t upper_bound, int re HighResTimer hr_timer; - rmm::device_uvector data_v(num_rows * num_cols, handle.get_stream_view()); - rmm::device_uvector state_vals_v(num_threads, handle.get_stream_view()); - rmm::device_uvector assignment_v(num_rows, handle.get_stream_view()); + rmm::device_uvector data_v(num_rows * num_cols, handle.get_stream()); + rmm::device_uvector state_vals_v(num_threads, handle.get_stream()); + rmm::device_uvector assignment_v(num_rows, handle.get_stream()); std::vector validate(num_cols); diff --git a/cpp/tests/link_analysis/hits_test.cpp b/cpp/tests/link_analysis/hits_test.cpp index edd8c40286d..b5c2159fdb4 100644 --- a/cpp/tests/link_analysis/hits_test.cpp +++ b/cpp/tests/link_analysis/hits_test.cpp @@ -248,7 +248,7 @@ class Tests_Hits : public ::testing::TestWithParam(graph_view.get_number_of_vertices())) * diff --git a/cpp/tests/link_analysis/mg_hits_test.cpp b/cpp/tests/link_analysis/mg_hits_test.cpp index 4875641f7bf..7499a75f9ce 100644 --- a/cpp/tests/link_analysis/mg_hits_test.cpp +++ b/cpp/tests/link_analysis/mg_hits_test.cpp @@ -213,7 +213,7 @@ class Tests_MGHits : public ::testing::TestWithParam h_reference_pageranks(unrenumbered_graph_view.get_number_of_vertices()); @@ -381,7 +381,7 @@ class Tests_PageRank h_cugraph_pageranks.data(), d_pageranks.data(), d_pageranks.size(), handle.get_stream()); } - handle.get_stream_view().synchronize(); + handle.sync_stream(); auto threshold_ratio = 1e-3; auto threshold_magnitude = diff --git a/cpp/tests/structure/count_self_loops_and_multi_edges_test.cpp b/cpp/tests/structure/count_self_loops_and_multi_edges_test.cpp index 3fa211fbda7..60aac430a06 100644 --- a/cpp/tests/structure/count_self_loops_and_multi_edges_test.cpp +++ b/cpp/tests/structure/count_self_loops_and_multi_edges_test.cpp @@ -151,7 +151,7 @@ class Tests_CountSelfLoopsAndMultiEdges unrenumbered_graph_view.get_number_of_edges(), handle.get_stream()); - handle.get_stream_view().synchronize(); + handle.sync_stream(); auto self_loop_and_multi_edge_counts = count_self_loops_and_multi_edges_reference( h_offsets.data(), h_indices.data(), unrenumbered_graph_view.get_number_of_vertices()); diff --git a/cpp/tests/structure/renumbering_test.cpp b/cpp/tests/structure/renumbering_test.cpp index 9ca2472670b..a42a99f83e2 100644 --- a/cpp/tests/structure/renumbering_test.cpp +++ b/cpp/tests/structure/renumbering_test.cpp @@ -65,9 +65,9 @@ class Tests_Renumbering std::vector h_final_src_v{}; std::vector h_final_dst_v{}; - rmm::device_uvector src_v(0, handle.get_stream_view()); - rmm::device_uvector dst_v(0, handle.get_stream_view()); - rmm::device_uvector renumber_map_labels_v(0, handle.get_stream_view()); + rmm::device_uvector src_v(0, handle.get_stream()); + rmm::device_uvector dst_v(0, handle.get_stream()); + rmm::device_uvector renumber_map_labels_v(0, handle.get_stream()); vertex_t number_of_vertices{}; std::tie(src_v, dst_v, std::ignore, std::ignore, number_of_vertices, std::ignore) = diff --git a/cpp/tests/structure/streams.cu b/cpp/tests/structure/streams.cu index c89ffe1e532..3390daa6575 100644 --- a/cpp/tests/structure/streams.cu +++ b/cpp/tests/structure/streams.cu @@ -25,16 +25,17 @@ struct StreamTest : public ::testing::Test { }; TEST_F(StreamTest, basic_test) { - int n_streams = 4; - raft::handle_t handle(n_streams); + int n_streams = 4; + auto stream_pool = std::make_shared(n_streams); + raft::handle_t handle(rmm::cuda_stream_per_thread, stream_pool); const size_t intput_size = 4096; #pragma omp parallel for for (int i = 0; i < n_streams; i++) { - rmm::device_uvector u(intput_size, handle.get_internal_stream_view(i)), - v(intput_size, handle.get_internal_stream_view(i)); - thrust::transform(rmm::exec_policy(handle.get_internal_stream_view(i)), + rmm::device_uvector u(intput_size, handle.get_next_usable_stream(i)), + v(intput_size, handle.get_next_usable_stream(i)); + thrust::transform(rmm::exec_policy(handle.get_next_usable_stream(i)), u.begin(), u.end(), v.begin(), diff --git a/cpp/tests/traversal/bfs_test.cpp b/cpp/tests/traversal/bfs_test.cpp index bb2ab014236..7914cc61548 100644 --- a/cpp/tests/traversal/bfs_test.cpp +++ b/cpp/tests/traversal/bfs_test.cpp @@ -172,7 +172,7 @@ class Tests_BFS : public ::testing::TestWithParam(bfs_usecase.source); if (renumber) { @@ -182,7 +182,7 @@ class Tests_BFS : public ::testing::TestWithParam const d_sg_source(unrenumbered_source, handle.get_stream()); cugraph::bfs(handle, @@ -232,7 +232,7 @@ class Tests_MGBFS : public ::testing::TestWithParam { template void run_current_test(MsBfs_Usecase const& configuration) { - using weight_t = float; - raft::handle_t handle(16); + using weight_t = float; + auto stream_pool = std::make_shared(16); + raft::handle_t handle(rmm::cuda_stream_per_thread, stream_pool); auto edgelists = cugraph::generate_rmat_edgelists(handle, @@ -172,9 +173,9 @@ class Tests_MsBfs : public ::testing::TestWithParam { d_predecessors_ref.reserve(h_sources.size()); for (size_t i = 0; i < h_sources.size(); i++) { rmm::device_uvector tmp_distances(graph_view.get_number_of_vertices(), - handle.get_internal_stream_view(i)); + handle.get_next_usable_stream(i)); rmm::device_uvector tmp_predecessors(graph_view.get_number_of_vertices(), - handle.get_internal_stream_view(i)); + handle.get_next_usable_stream(i)); d_distances_ref.push_back(std::move(tmp_distances)); d_predecessors_ref.push_back(std::move(tmp_predecessors)); diff --git a/cpp/tests/traversal/sssp_test.cpp b/cpp/tests/traversal/sssp_test.cpp index 0242ebc1a3a..f2bcbd4161a 100644 --- a/cpp/tests/traversal/sssp_test.cpp +++ b/cpp/tests/traversal/sssp_test.cpp @@ -176,7 +176,7 @@ class Tests_SSSP : public ::testing::TestWithParam(sssp_usecase.source); if (renumber) { @@ -186,7 +186,7 @@ class Tests_SSSP : public ::testing::TestWithParamdata() : tmp_weights_v->data(), i == 0 ? weights_v->size() : tmp_weights_v->size(), weight_t{0.0}, @@ -265,7 +265,7 @@ class Rmat_Usecase : public detail::TranslateGraph_Usecase { auto start_offset = vertices_v.size(); vertices_v.resize(start_offset + (partition_vertex_lasts[i] - partition_vertex_firsts[i]), handle.get_stream()); - cugraph::detail::sequence_fill(handle.get_stream_view(), + cugraph::detail::sequence_fill(handle.get_stream(), vertices_v.begin() + start_offset, vertices_v.size() - start_offset, partition_vertex_firsts[i]); @@ -341,7 +341,7 @@ class PathGraph_Usecase { rmm::device_uvector d_vertices(num_vertices_, handle.get_stream()); cugraph::detail::sequence_fill( handle.get_stream(), d_vertices.data(), num_vertices_, vertex_t{0}); - handle.get_stream_view().synchronize(); + handle.sync_stream(); return std::make_tuple(std::move(src_v), std::move(dst_v), diff --git a/cpp/tests/utilities/test_utilities.hpp b/cpp/tests/utilities/test_utilities.hpp index 4729d3d9f13..be21c7b2b0d 100644 --- a/cpp/tests/utilities/test_utilities.hpp +++ b/cpp/tests/utilities/test_utilities.hpp @@ -263,7 +263,7 @@ std::pair compare_graphs(raft::handle_t const& handle, handle.get_stream()); } - handle.get_stream_view().synchronize(); + handle.sync_stream(); if (lv_ro != rv_ro) return std::make_pair(false, std::string("offsets")); @@ -352,7 +352,7 @@ std::vector to_host(raft::handle_t const& handle, T const* data, L size) { std::vector h_data(size); raft::update_host(h_data.data(), data, size, handle.get_stream()); - handle.get_stream_view().synchronize(); + handle.sync_stream(); return h_data; } diff --git a/python/cugraph/cugraph/community/egonet_wrapper.pyx b/python/cugraph/cugraph/community/egonet_wrapper.pyx index f5d1d51ed24..0d8eb312275 100644 --- a/python/cugraph/cugraph/community/egonet_wrapper.pyx +++ b/python/cugraph/cugraph/community/egonet_wrapper.pyx @@ -28,6 +28,8 @@ from cugraph.structure.graph_primtypes cimport move_device_buffer_to_column from cugraph.raft.common.handle cimport handle_t from cugraph.structure import graph_primtypes_wrapper from cugraph.community.egonet cimport call_egonet +from cugraph.raft.common.handle cimport handle_t +from cugraph.raft.common.handle import Handle def egonet(input_graph, vertices, radius=1): @@ -73,8 +75,8 @@ def egonet(input_graph, vertices, radius=1): if n_subgraphs > 1 : n_streams = min(n_subgraphs, 32) cdef unique_ptr[handle_t] handle_ptr - handle_ptr.reset(new handle_t(n_streams)) - handle_ = handle_ptr.get(); + handle = Handle(n_streams=n_streams) + cdef handle_t* handle_ = handle.getHandle() cdef graph_container_t graph_container populate_graph_container(graph_container,