From 886c724a5e94f8edcca9e5aaa71f1253bf2b9fce Mon Sep 17 00:00:00 2001 From: Seunghwa Kang <45857425+seunghwak@users.noreply.github.com> Date: Mon, 18 Apr 2022 19:51:38 -0700 Subject: [PATCH] Graph primitives API updates (#2220) Partially address #2003. 1. Rename VertexFrontier & SortedUniqueKeyBucket to vertex_frontier_t & sorted_unique_key_bucket_t to be consistent with the naming schemes for graph_t & graph_view_t. 2. Update vertex_frontier_t to take the number of buckets as an input parameter (instead of a non-type template parameter) and rename `get_bucket` to `bucket`. 3. Use `constexpr size_t` instead of `enum class` for bucket indices to avoid unnecessary type casting. 4. Update `fill()` of `edge_partition_src|dst_property_t` to take `handle` instead of `stream` to be consistent with other member functions (e.g. `clear()`). 5. Remove `..._v` primitives that work on a subset of local vertices. 6. Update `v_op` to take vertex ID (to be consistent with `e_op` which takes source & destination IDs). 7. Other miscellaneous clean-ups. 
Authors: - Seunghwa Kang (https://github.com/seunghwak) Approvers: - Chuck Hastings (https://github.com/ChuckHastings) - Kumar Aatish (https://github.com/kaatish) URL: https://github.com/rapidsai/cugraph/pull/2220 --- .../copy_v_transform_reduce_in_out_nbr.cuh | 2 +- cpp/include/cugraph/prims/count_if_v.cuh | 71 +++++++--------- .../prims/edge_partition_src_dst_property.cuh | 22 +++-- .../cugraph/prims/property_op_utils.cuh | 4 +- cpp/include/cugraph/prims/reduce_v.cuh | 40 --------- .../cugraph/prims/transform_reduce_v.cuh | 85 +++++++------------ .../update_frontier_v_push_if_out_nbr.cuh | 48 +++++------ cpp/include/cugraph/prims/vertex_frontier.cuh | 33 ++++--- cpp/src/centrality/katz_centrality_impl.cuh | 10 ++- cpp/src/community/louvain.cuh | 1 - .../weakly_connected_components_impl.cuh | 82 +++++++----------- cpp/src/cores/core_number_impl.cuh | 66 +++++++------- cpp/src/link_analysis/hits_impl.cuh | 13 +-- cpp/src/link_analysis/pagerank_impl.cuh | 61 +++++++------ cpp/src/traversal/bfs_impl.cuh | 77 ++++++++--------- cpp/src/traversal/sssp_impl.cuh | 71 +++++++--------- cpp/tests/prims/mg_count_if_v.cu | 8 +- cpp/tests/prims/mg_reduce_v.cu | 7 +- cpp/tests/prims/mg_transform_reduce_v.cu | 23 +++-- .../mg_update_frontier_v_push_if_out_nbr.cu | 47 +++++----- 20 files changed, 331 insertions(+), 440 deletions(-) diff --git a/cpp/include/cugraph/prims/copy_v_transform_reduce_in_out_nbr.cuh b/cpp/include/cugraph/prims/copy_v_transform_reduce_in_out_nbr.cuh index 0f944a178e9..04fc797517a 100644 --- a/cpp/include/cugraph/prims/copy_v_transform_reduce_in_out_nbr.cuh +++ b/cpp/include/cugraph/prims/copy_v_transform_reduce_in_out_nbr.cuh @@ -503,7 +503,7 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle, auto execution_policy = handle.get_thrust_policy(); if constexpr (GraphViewType::is_multi_gpu) { - minor_tmp_buffer.fill(minor_init, handle.get_stream()); + minor_tmp_buffer.fill(handle, minor_init); } else { thrust::fill(execution_policy, 
vertex_value_output_first, diff --git a/cpp/include/cugraph/prims/count_if_v.cuh b/cpp/include/cugraph/prims/count_if_v.cuh index 76b889bc365..215fe125a87 100644 --- a/cpp/include/cugraph/prims/count_if_v.cuh +++ b/cpp/include/cugraph/prims/count_if_v.cuh @@ -27,6 +27,24 @@ namespace cugraph { +namespace detail { + +template +struct count_if_call_v_op_t { + vertex_t local_vertex_partition_range_first{}; + VertexValueInputIterator vertex_value_input_first{}; + VertexOp v_op{}; + + __device__ bool operator()(vertex_t i) + { + return v_op(local_vertex_partition_range_first + i, *(vertex_value_input_first + i)) + ? vertex_t{1} + : vertex_t{0}; + } +}; + +} // namespace detail + /** * @brief Count the number of vertices that satisfies the given predicate. * @@ -42,8 +60,8 @@ namespace cugraph { * @param vertex_value_input_first Iterator pointing to the vertex properties for the first * (inclusive) vertex (assigned to this process in multi-GPU). `vertex_value_input_last` (exclusive) * is deduced as @p vertex_value_input_first + @p graph_view.local_vertex_partition_range_size(). - * @param v_op Unary operator takes *(@p vertex_value_input_first + i) (where i is [0, @p - * graph_view.local_vertex_partition_range_size())) and returns true if this vertex should be + * @param v_op Binary operator takes vertex ID and *(@p vertex_value_input_first + i) (where i is + * [0, @p graph_view.local_vertex_partition_range_size())) and returns true if this vertex should be * included in the returned count. * @return GraphViewType::vertex_type Number of times @p v_op returned true. 
*/ @@ -53,47 +71,16 @@ typename GraphViewType::vertex_type count_if_v(raft::handle_t const& handle, VertexValueInputIterator vertex_value_input_first, VertexOp v_op) { - auto count = - thrust::count_if(handle.get_thrust_policy(), - vertex_value_input_first, - vertex_value_input_first + graph_view.local_vertex_partition_range_size(), - v_op); - if (GraphViewType::is_multi_gpu) { - count = - host_scalar_allreduce(handle.get_comms(), count, raft::comms::op_t::SUM, handle.get_stream()); - } - return count; -} + using vertex_t = typename GraphViewType::vertex_type; -/** - * @brief Count the number of vertices that satisfies the given predicate. - * - * This version (conceptually) iterates over only a subset of the graph vertices. This function - * actually works as thrust::count_if() on [@p input_first, @p input_last) (followed by - * inter-process reduction in multi-GPU). @p input_last - @p input_first (or the sum of @p - * input_last - @p input_first values in multi-GPU) should not overflow GraphViewType::vertex_type. - * - * @tparam GraphViewType Type of the passed non-owning graph object. - * @tparam InputIterator Type of the iterator for input values. - * @tparam VertexOp VertexOp Type of the unary predicate operator. - * @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and - * handles to various CUDA libraries) to run graph algorithms. - * @param graph_view Non-owning graph object. - * @param input_first Iterator pointing to the beginning (inclusive) of the values to be passed to - * @p v_op. - * @param input_last Iterator pointing to the end (exclusive) of the values to be passed to @p v_op. - * @param v_op Unary operator takes *(@p input_first + i) (where i is [0, @p input_last - @p - * input_first)) and returns true if this vertex should be included in the returned count. - * @return GraphViewType::vertex_type Number of times @p v_op returned true. 
- */ -template -typename GraphViewType::vertex_type count_if_v(raft::handle_t const& handle, - GraphViewType const& graph_view, - InputIterator input_first, - InputIterator input_last, - VertexOp v_op) -{ - auto count = thrust::count_if(handle.get_thrust_policy(), input_first, input_last, v_op); + auto it = thrust::make_transform_iterator( + thrust::make_counting_iterator(vertex_t{0}), + detail::count_if_call_v_op_t{ + graph_view.local_vertex_partition_range_first(), vertex_value_input_first, v_op}); + auto count = thrust::reduce(handle.get_thrust_policy(), + it, + it + graph_view.local_vertex_partition_range_size(), + vertex_t{0}); if (GraphViewType::is_multi_gpu) { count = host_scalar_allreduce(handle.get_comms(), count, raft::comms::op_t::SUM, handle.get_stream()); diff --git a/cpp/include/cugraph/prims/edge_partition_src_dst_property.cuh b/cpp/include/cugraph/prims/edge_partition_src_dst_property.cuh index b01ead105ae..25c28eca1e7 100644 --- a/cpp/include/cugraph/prims/edge_partition_src_dst_property.cuh +++ b/cpp/include/cugraph/prims/edge_partition_src_dst_property.cuh @@ -254,10 +254,12 @@ class edge_partition_major_property_t { edge_partition_major_value_start_offsets_ = std::nullopt; } - void fill(T value, rmm::cuda_stream_view stream) + void fill(raft::handle_t const& handle, T value) { - thrust::fill( - rmm::exec_policy(stream), value_data(), value_data() + size_dataframe_buffer(buffer_), value); + thrust::fill(handle.get_thrust_policy(), + value_data(), + value_data() + size_dataframe_buffer(buffer_), + value); } auto key_first() { return key_first_; } @@ -267,6 +269,7 @@ class edge_partition_major_property_t { (*edge_partition_key_offsets_).back()) : std::nullopt; } + auto value_data() { return get_dataframe_buffer_begin(buffer_); } auto device_view() const @@ -351,14 +354,17 @@ class edge_partition_minor_property_t { shrink_to_fit_dataframe_buffer(buffer_, handle.get_stream()); } - void fill(T value, rmm::cuda_stream_view stream) + void 
fill(raft::handle_t const& handle, T value) { - thrust::fill( - rmm::exec_policy(stream), value_data(), value_data() + size_dataframe_buffer(buffer_), value); + thrust::fill(handle.get_thrust_policy(), + value_data(), + value_data() + size_dataframe_buffer(buffer_), + value); } auto key_first() { return key_first_; } auto key_last() { return key_last_; } + auto value_data() { return get_dataframe_buffer_begin(buffer_); } auto device_view() const @@ -480,7 +486,7 @@ class edge_partition_src_property_t { void clear(raft::handle_t const& handle) { property_.clear(handle); } - void fill(T value, rmm::cuda_stream_view stream) { property_.fill(value, stream); } + void fill(raft::handle_t const& handle, T value) { property_.fill(handle, value); } auto key_first() { return property_.key_first(); } auto key_last() { return property_.key_last(); } @@ -561,7 +567,7 @@ class edge_partition_dst_property_t { void clear(raft::handle_t const& handle) { property_.clear(handle); } - void fill(T value, rmm::cuda_stream_view stream) { property_.fill(value, stream); } + void fill(raft::handle_t const& handle, T value) { property_.fill(handle, value); } auto key_first() { return property_.key_first(); } auto key_last() { return property_.key_last(); } diff --git a/cpp/include/cugraph/prims/property_op_utils.cuh b/cpp/include/cugraph/prims/property_op_utils.cuh index c50a1fde93f..fc48df64c3c 100644 --- a/cpp/include/cugraph/prims/property_op_utils.cuh +++ b/cpp/include/cugraph/prims/property_op_utils.cuh @@ -191,7 +191,7 @@ struct property_op, Op> private: template - __host__ __device__ constexpr auto sum_impl(T& t1, T& t2, std::index_sequence) + __host__ __device__ constexpr auto binary_op_impl(T& t1, T& t2, std::index_sequence) { return thrust::make_tuple((Op::type>()( thrust::get(t1), thrust::get(t2)))...); @@ -200,7 +200,7 @@ struct property_op, Op> public: __host__ __device__ constexpr auto operator()(const Type& t1, const Type& t2) { - return sum_impl(t1, t2, 
std::make_index_sequence::value>()); + return binary_op_impl(t1, t2, std::make_index_sequence::value>()); } }; diff --git a/cpp/include/cugraph/prims/reduce_v.cuh b/cpp/include/cugraph/prims/reduce_v.cuh index b2981c2693f..fa4e59b7a65 100644 --- a/cpp/include/cugraph/prims/reduce_v.cuh +++ b/cpp/include/cugraph/prims/reduce_v.cuh @@ -68,44 +68,4 @@ T reduce_v(raft::handle_t const& handle, return ret; } -/** - * @brief Reduce the vertex properties. - * - * This version (conceptually) iterates over only a subset of the graph vertices. This function - * actually works as thrust::reduce() on [@p input_first, @p input_last) (followed by - * inter-process reduction in multi-GPU). - * - * @tparam GraphViewType Type of the passed non-owning graph object. - * @tparam InputIterator Type of the iterator for input values. - * @tparam T Type of the initial value. - * @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and - * handles to various CUDA libraries) to run graph algorithms. - * @param graph_view Non-owning graph object. - * @param input_first Iterator pointing to the beginning (inclusive) of the values to be reduced. - * @param input_last Iterator pointing to the end (exclusive) of the values to be reduced. - * @param init Initial value to be added to the reduced input vertex properties. - * @return T Reduction of the input vertex properties. - */ -template -T reduce_v(raft::handle_t const& handle, - GraphViewType const& graph_view, - InputIterator input_first, - InputIterator input_last, - T init = T{}, - raft::comms::op_t op = raft::comms::op_t::SUM) -{ - auto ret = op_dispatch(op, [&handle, &graph_view, input_first, input_last, init](auto op) { - return thrust::reduce( - handle.get_thrust_policy(), - input_first, - input_last, - ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? 
T{} : init, - op); - }); - if constexpr (GraphViewType::is_multi_gpu) { - ret = host_scalar_allreduce(handle.get_comms(), ret, op, handle.get_stream()); - } - return ret; -} - } // namespace cugraph diff --git a/cpp/include/cugraph/prims/transform_reduce_v.cuh b/cpp/include/cugraph/prims/transform_reduce_v.cuh index 20646c9963e..fedc93d783f 100644 --- a/cpp/include/cugraph/prims/transform_reduce_v.cuh +++ b/cpp/include/cugraph/prims/transform_reduce_v.cuh @@ -27,6 +27,22 @@ namespace cugraph { +namespace detail { + +template +struct transform_reduce_call_v_op_t { + vertex_t local_vertex_partition_range_first{}; + VertexValueInputIterator vertex_value_input_first{}; + VertexOp v_op{}; + + __device__ T operator()(vertex_t i) + { + return v_op(local_vertex_partition_range_first + i, *(vertex_value_input_first + i)); + } +}; + +} // namespace detail + /** * @brief Apply an operator to the vertex properties and reduce. * @@ -43,8 +59,9 @@ namespace cugraph { * @param vertex_value_input_first Iterator pointing to the vertex properties for the first * (inclusive) vertex (assigned to this process in multi-GPU). `vertex_value_input_last` (exclusive) * is deduced as @p vertex_value_input_first + @p graph_view.local_vertex_partition_range_size(). - * @param v_op Unary operator takes *(@p vertex_value_input_first + i) (where i is [0, @p - * graph_view.local_vertex_partition_range_size())) and returns a transformed value to be reduced. + * @param v_op Binary operator takes vertex ID and *(@p vertex_value_input_first + i) (where i is + * [0, @p graph_view.local_vertex_partition_range_size())) and returns a transformed value to be + * reduced. * @param init Initial value to be added to the transform-reduced input vertex properties. * @return T Reduction of the @p v_op outputs. 
*/ @@ -56,61 +73,19 @@ T transform_reduce_v(raft::handle_t const& handle, T init, raft::comms::op_t op = raft::comms::op_t::SUM) { - auto id = identity_element(op); - auto ret = - op_dispatch(op, [&handle, &graph_view, vertex_value_input_first, v_op, id, init](auto op) { - return thrust::transform_reduce( - handle.get_thrust_policy(), - vertex_value_input_first, - vertex_value_input_first + graph_view.local_vertex_partition_range_size(), - v_op, - ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? id : init, - op); - }); - if (GraphViewType::is_multi_gpu) { - ret = host_scalar_allreduce(handle.get_comms(), ret, op, handle.get_stream()); - } - return ret; -} + using vertex_t = typename GraphViewType::vertex_type; -/** - * @brief Apply an operator to the vertex properties and reduce. - * - * This version (conceptually) iterates over only a subset of the graph vertices. This function - * actually works as thrust::transform_reduce() on [@p input_first, @p input_last) (followed by - * inter-process reduction in multi-GPU). - * - * @tparam GraphViewType Type of the passed non-owning graph object. - * @tparam InputIterator Type of the iterator for input values. - * @tparam VertexOp - * @tparam T Type of the initial value. - * @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and - * handles to various CUDA libraries) to run graph algorithms. - * @param graph_view Non-owning graph object. - * @param input_first Iterator pointing to the beginning (inclusive) of the values to be passed to - * @p v_op. - * @param input_last Iterator pointing to the end (exclusive) of the values to be passed to @p v_op. - * @param v_op Unary operator takes *(@p input_first + i) (where i is [0, @p input_last - @p - * input_first)) and returns a transformed value to be reduced. - * @param init Initial value to be added to the transform-reduced input vertex properties. - * @return T Reduction of the @p v_op outputs. 
- */ -template -T transform_reduce_v(raft::handle_t const& handle, - GraphViewType const& graph_view, - InputIterator input_first, - InputIterator input_last, - VertexOp v_op, - T init = T{}, - raft::comms::op_t op = raft::comms::op_t::SUM) -{ - auto ret = op_dispatch(op, [&handle, input_first, input_last, v_op, init](auto op) { - return thrust::transform_reduce( + auto id = identity_element(op); + auto it = thrust::make_transform_iterator( + thrust::make_counting_iterator(vertex_t{0}), + detail::transform_reduce_call_v_op_t{ + graph_view.local_vertex_partition_range_first(), vertex_value_input_first, v_op}); + auto ret = op_dispatch(op, [&handle, &graph_view, it, id, init](auto op) { + return thrust::reduce( handle.get_thrust_policy(), - input_first, - input_last, - v_op, - ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? T{} : init, + it, + it + graph_view.local_vertex_partition_range_size(), + ((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() != 0)) ? 
id : init, op); }); if (GraphViewType::is_multi_gpu) { diff --git a/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh b/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh index fe2d632a342..027c2763e0e 100644 --- a/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh +++ b/cpp/include/cugraph/prims/update_frontier_v_push_if_out_nbr.cuh @@ -131,7 +131,7 @@ template -struct call_v_op_t { +struct update_frontier_call_v_op_t { VertexValueInputIterator vertex_value_input_first{}; VertexValueOutputIterator vertex_value_output_first{}; VertexOp v_op{}; @@ -835,7 +835,7 @@ typename GraphViewType::edge_type compute_num_out_nbrs_from_frontier( edge_t ret{0}; - auto const& cur_frontier_bucket = frontier.get_bucket(cur_frontier_bucket_idx); + auto const& cur_frontier_bucket = frontier.bucket(cur_frontier_bucket_idx); vertex_t const* local_frontier_vertex_first{nullptr}; vertex_t const* local_frontier_vertex_last{nullptr}; if constexpr (std::is_same_v) { @@ -952,11 +952,11 @@ typename GraphViewType::edge_type compute_num_out_nbrs_from_frontier( * @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and * handles to various CUDA libraries) to run graph algorithms. * @param graph_view Non-owning graph object. - * @param frontier VertexFrontier class object for vertex frontier managements. This object includes - * multiple bucket objects. - * @param cur_frontier_bucket_idx Index of the VertexFrontier bucket holding vertices for the + * @param frontier VertexFrontierType class object for vertex frontier managements. This object + * includes multiple bucket objects. + * @param cur_frontier_bucket_idx Index of the vertex frontier bucket holding vertices for the * current iteration. 
- * @param next_frontier_bucket_indices Indices of the VertexFrontier buckets to store new frontier + * @param next_frontier_bucket_indices Indices of the vertex frontier buckets to store new frontier * vertices for the next iteration. * @param edge_partition_src_value_input Device-copyable wrapper used to access source input * property values (for the edge sources assigned to this process in multi-GPU). Use either @@ -1031,8 +1031,8 @@ void update_frontier_v_push_if_out_nbr( using key_t = typename VertexFrontierType::key_type; using payload_t = typename ReduceOp::type; - auto frontier_key_first = frontier.get_bucket(cur_frontier_bucket_idx).begin(); - auto frontier_key_last = frontier.get_bucket(cur_frontier_bucket_idx).end(); + auto frontier_key_first = frontier.bucket(cur_frontier_bucket_idx).begin(); + auto frontier_key_last = frontier.bucket(cur_frontier_bucket_idx).end(); // 1. fill the buffer @@ -1334,7 +1334,7 @@ void update_frontier_v_push_if_out_nbr( // 3. update vertex properties and frontier if (num_buffer_elements > 0) { - static_assert(VertexFrontierType::kNumBuckets <= std::numeric_limits::max()); + assert(frontier.num_buckets() <= std::numeric_limits::max()); rmm::device_uvector bucket_indices(num_buffer_elements, handle.get_stream()); auto vertex_partition = vertex_partition_device_view_t( @@ -1376,21 +1376,21 @@ void update_frontier_v_push_if_out_nbr( resize_dataframe_buffer(payload_buffer, size_t{0}, handle.get_stream()); shrink_to_fit_dataframe_buffer(payload_buffer, handle.get_stream()); } else { - thrust::transform( - handle.get_thrust_policy(), - get_dataframe_buffer_begin(key_buffer), - get_dataframe_buffer_begin(key_buffer) + num_buffer_elements, - bucket_indices.begin(), - detail::call_v_op_t{vertex_value_input_first, - vertex_value_output_first, - v_op, - vertex_partition, - VertexFrontierType::kInvalidBucketIdx}); + thrust::transform(handle.get_thrust_policy(), + get_dataframe_buffer_begin(key_buffer), + 
get_dataframe_buffer_begin(key_buffer) + num_buffer_elements, + bucket_indices.begin(), + detail::update_frontier_call_v_op_t{ + vertex_value_input_first, + vertex_value_output_first, + v_op, + vertex_partition, + VertexFrontierType::kInvalidBucketIdx}); } auto bucket_key_pair_first = thrust::make_zip_iterator( diff --git a/cpp/include/cugraph/prims/vertex_frontier.cuh b/cpp/include/cugraph/prims/vertex_frontier.cuh index 86b55ab3f16..54400c2af0c 100644 --- a/cpp/include/cugraph/prims/vertex_frontier.cuh +++ b/cpp/include/cugraph/prims/vertex_frontier.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,7 +40,7 @@ namespace cugraph { // stores unique key objects in the sorted (non-descending) order; key type is either vertex_t // (tag_t == void) or thrust::tuple (tag_t != void) template -class SortedUniqueKeyBucket { +class sorted_unique_key_bucket_t { static_assert(std::is_same_v || std::is_arithmetic_v); using optional_buffer_type = std:: @@ -48,13 +48,13 @@ class SortedUniqueKeyBucket { public: template >* = nullptr> - SortedUniqueKeyBucket(raft::handle_t const& handle) + sorted_unique_key_bucket_t(raft::handle_t const& handle) : handle_ptr_(&handle), vertices_(0, handle.get_stream()), tags_(std::byte{0}) { } template >* = nullptr> - SortedUniqueKeyBucket(raft::handle_t const& handle) + sorted_unique_key_bucket_t(raft::handle_t const& handle) : handle_ptr_(&handle), vertices_(0, handle.get_stream()), tags_(0, handle.get_stream()) { } @@ -277,32 +277,31 @@ class SortedUniqueKeyBucket { optional_buffer_type tags_; }; -template -class VertexFrontier { +template +class vertex_frontier_t { static_assert(std::is_same_v || std::is_arithmetic_v); public: using key_type = std::conditional_t, vertex_t, thrust::tuple>; - static size_t constexpr kNumBuckets = 
num_buckets; static size_t constexpr kInvalidBucketIdx{std::numeric_limits::max()}; - VertexFrontier(raft::handle_t const& handle) : handle_ptr_(&handle) + vertex_frontier_t(raft::handle_t const& handle, size_t num_buckets) : handle_ptr_(&handle) { + buckets_.reserve(num_buckets); for (size_t i = 0; i < num_buckets; ++i) { buckets_.emplace_back(handle); } } - SortedUniqueKeyBucket& get_bucket(size_t bucket_idx) + size_t num_buckets() const { return buckets_.size(); } + + sorted_unique_key_bucket_t& bucket(size_t bucket_idx) { return buckets_[bucket_idx]; } - SortedUniqueKeyBucket const& get_bucket(size_t bucket_idx) const + sorted_unique_key_bucket_t const& bucket(size_t bucket_idx) const { return buckets_[bucket_idx]; } @@ -317,12 +316,12 @@ class VertexFrontier { std::vector const& move_to_bucket_indices, SplitOp split_op) { - auto& this_bucket = get_bucket(this_bucket_idx); + auto& this_bucket = bucket(this_bucket_idx); if (this_bucket.size() == 0) { return; } // 1. apply split_op to each bucket element - static_assert(kNumBuckets <= std::numeric_limits::max()); + assert(buckets_.size() <= std::numeric_limits::max()); rmm::device_uvector bucket_indices(this_bucket.size(), handle_ptr_->get_stream()); thrust::transform( handle_ptr_->get_thrust_policy(), @@ -447,14 +446,14 @@ class VertexFrontier { // 2. 
insert to the target buckets for (size_t i = 0; i < insert_offsets.size(); ++i) { - get_bucket(insert_bucket_indices[i]) + bucket(insert_bucket_indices[i]) .insert(key_first + insert_offsets[i], key_first + (insert_offsets[i] + insert_sizes[i])); } } private: raft::handle_t const* handle_ptr_{nullptr}; - std::vector> buckets_{}; + std::vector> buckets_{}; }; } // namespace cugraph diff --git a/cpp/src/centrality/katz_centrality_impl.cuh b/cpp/src/centrality/katz_centrality_impl.cuh index 1f0900c720f..1fa5c93c4b9 100644 --- a/cpp/src/centrality/katz_centrality_impl.cuh +++ b/cpp/src/centrality/katz_centrality_impl.cuh @@ -72,8 +72,10 @@ void katz_centrality(raft::handle_t const& handle, // FIXME: should I check for betas? if (has_initial_guess) { - auto num_negative_values = count_if_v( - handle, pull_graph_view, katz_centralities, [] __device__(auto val) { return val < 0.0; }); + auto num_negative_values = + count_if_v(handle, pull_graph_view, katz_centralities, [] __device__(auto, auto val) { + return val < 0.0; + }); CUGRAPH_EXPECTS(num_negative_values == 0, "Invalid input argument: initial guess values should be non-negative."); } @@ -132,7 +134,7 @@ void katz_centrality(raft::handle_t const& handle, handle, pull_graph_view, thrust::make_zip_iterator(thrust::make_tuple(new_katz_centralities, old_katz_centralities)), - [] __device__(auto val) { return std::abs(thrust::get<0>(val) - thrust::get<1>(val)); }, + [] __device__(auto, auto val) { return std::abs(thrust::get<0>(val) - thrust::get<1>(val)); }, result_t{0.0}); iter++; @@ -156,7 +158,7 @@ void katz_centrality(raft::handle_t const& handle, handle, pull_graph_view, katz_centralities, - [] __device__(auto val) { return val * val; }, + [] __device__(auto, auto val) { return val * val; }, result_t{0.0}); l2_norm = std::sqrt(l2_norm); CUGRAPH_EXPECTS(l2_norm > 0.0, diff --git a/cpp/src/community/louvain.cuh b/cpp/src/community/louvain.cuh index a558be1e198..a5551d76ae2 100644 --- a/cpp/src/community/louvain.cuh 
+++ b/cpp/src/community/louvain.cuh @@ -25,7 +25,6 @@ #include #include #include -#include #include #include diff --git a/cpp/src/components/weakly_connected_components_impl.cuh b/cpp/src/components/weakly_connected_components_impl.cuh index f4b76670a3f..fd2609eb242 100644 --- a/cpp/src/components/weakly_connected_components_impl.cuh +++ b/cpp/src/components/weakly_connected_components_impl.cuh @@ -179,8 +179,8 @@ struct v_op_t { // FIXME: we can use cuda::atomic instead but currently on a system with x86 + GPU, this requires // placing the atomic barrier on managed memory and this adds additional complication. size_t* num_edge_inserts{}; - size_t next_bucket_idx{}; - size_t conflict_bucket_idx{}; // relevant only if GraphViewType::is_multi_gpu is true + size_t bucket_idx_next{}; + size_t bucket_idx_conflict{}; // relevant only if GraphViewType::is_multi_gpu is true template __device__ std::enable_if_t>> @@ -195,11 +195,11 @@ struct v_op_t { atomicCAS(level_components + v_offset, invalid_component_id::value, tag); if (old != invalid_component_id::value && old != tag) { // conflict return thrust::optional>{ - thrust::make_tuple(conflict_bucket_idx, std::byte{0} /* dummy */)}; + thrust::make_tuple(bucket_idx_conflict, std::byte{0} /* dummy */)}; } else { return (old == invalid_component_id::value) ? thrust::optional>{thrust::make_tuple( - next_bucket_idx, std::byte{0} /* dummy */)} + bucket_idx_next, std::byte{0} /* dummy */)} : thrust::nullopt; } } @@ -209,7 +209,7 @@ struct v_op_t { operator()(thrust::tuple /* tagged_v */, int /* v_val */) const { return thrust::optional>{ - thrust::make_tuple(next_bucket_idx, std::byte{0} /* dummy */)}; + thrust::make_tuple(bucket_idx_next, std::byte{0} /* dummy */)}; } }; @@ -243,12 +243,11 @@ void weakly_connected_components_impl(raft::handle_t const& handle, // 2. 
recursively run multi-root frontier expansion - enum class Bucket { - cur, - next, - conflict /* relevant only if GraphViewType::is_multi_gpu is true */, - num_buckets - }; + constexpr size_t bucket_idx_cur = 0; + constexpr size_t bucket_idx_next = 1; + constexpr size_t bucket_idx_conflict = 2; + constexpr size_t num_buckets = 3; + // tuning parameter to balance work per iteration (should be large enough to be throughput // bounded) vs # conflicts between frontiers with different roots (# conflicts == # edges for the // next level) @@ -449,11 +448,8 @@ void weakly_connected_components_impl(raft::handle_t const& handle, // 2-3. initialize vertex frontier, edge_buffer, and edge_partition_dst_components (if // multi-gpu) - VertexFrontier(Bucket::num_buckets)> - vertex_frontier(handle); + vertex_frontier_t vertex_frontier(handle, + num_buckets); vertex_t next_candidate_offset{0}; edge_t edge_count{0}; @@ -468,8 +464,7 @@ void weakly_connected_components_impl(raft::handle_t const& handle, ? edge_partition_dst_property_t(handle, level_graph_view) : edge_partition_dst_property_t(handle); if constexpr (GraphViewType::is_multi_gpu) { - edge_partition_dst_components.fill(invalid_component_id::value, - handle.get_stream()); + edge_partition_dst_components.fill(handle, invalid_component_id::value); } // 2.4 iterate till every vertex gets visited @@ -502,33 +497,25 @@ void weakly_connected_components_impl(raft::handle_t const& handle, auto pair_first = thrust::make_zip_iterator(thrust::make_tuple(new_roots.begin(), new_roots.begin())); - vertex_frontier.get_bucket(static_cast(Bucket::cur)) - .insert(pair_first, pair_first + new_roots.size()); + vertex_frontier.bucket(bucket_idx_cur).insert(pair_first, pair_first + new_roots.size()); } - if (vertex_frontier.get_bucket(static_cast(Bucket::cur)).aggregate_size() == 0) { - break; - } + if (vertex_frontier.bucket(bucket_idx_cur).aggregate_size() == 0) { break; } if constexpr (GraphViewType::is_multi_gpu) { 
update_edge_partition_dst_property( handle, level_graph_view, - thrust::get<0>(vertex_frontier.get_bucket(static_cast(Bucket::cur)) - .begin() - .get_iterator_tuple()), - thrust::get<0>(vertex_frontier.get_bucket(static_cast(Bucket::cur)) - .end() - .get_iterator_tuple()), + thrust::get<0>(vertex_frontier.bucket(bucket_idx_cur).begin().get_iterator_tuple()), + thrust::get<0>(vertex_frontier.bucket(bucket_idx_cur).end().get_iterator_tuple()), level_components, edge_partition_dst_components); } - auto max_pushes = - GraphViewType::is_multi_gpu - ? compute_num_out_nbrs_from_frontier( - handle, level_graph_view, vertex_frontier, static_cast(Bucket::cur)) - : edge_count; + auto max_pushes = GraphViewType::is_multi_gpu + ? compute_num_out_nbrs_from_frontier( + handle, level_graph_view, vertex_frontier, bucket_idx_cur) + : edge_count; // FIXME: if we use cuco::static_map (no duplicates, ideally we need static_set), edge_buffer // size cannot exceed (# roots)^2 and we can avoid additional sort & unique (but resizing the @@ -540,10 +527,9 @@ void weakly_connected_components_impl(raft::handle_t const& handle, handle, level_graph_view, vertex_frontier, - static_cast(Bucket::cur), - GraphViewType::is_multi_gpu ? std::vector{static_cast(Bucket::next), - static_cast(Bucket::conflict)} - : std::vector{static_cast(Bucket::next)}, + bucket_idx_cur, + GraphViewType::is_multi_gpu ? 
std::vector{bucket_idx_next, bucket_idx_conflict} + : std::vector{bucket_idx_next}, dummy_property_t{}.device_view(), dummy_property_t{}.device_view(), [col_components = @@ -579,12 +565,12 @@ void weakly_connected_components_impl(raft::handle_t const& handle, level_components, get_dataframe_buffer_begin(edge_buffer), num_edge_inserts.data(), - static_cast(Bucket::next), - static_cast(Bucket::conflict)}); + bucket_idx_next, + bucket_idx_conflict}); if (GraphViewType::is_multi_gpu) { auto cur_num_edge_inserts = num_edge_inserts.value(handle.get_stream()); - auto& conflict_bucket = vertex_frontier.get_bucket(static_cast(Bucket::conflict)); + auto& conflict_bucket = vertex_frontier.bucket(bucket_idx_conflict); resize_dataframe_buffer( edge_buffer, cur_num_edge_inserts + conflict_bucket.size(), handle.get_stream()); thrust::for_each( @@ -636,17 +622,13 @@ void weakly_connected_components_impl(raft::handle_t const& handle, num_edge_inserts.set_value_async(num_unique_edges, handle.get_stream()); } - vertex_frontier.get_bucket(static_cast(Bucket::cur)).clear(); - vertex_frontier.get_bucket(static_cast(Bucket::cur)).shrink_to_fit(); - vertex_frontier.swap_buckets(static_cast(Bucket::cur), - static_cast(Bucket::next)); + vertex_frontier.bucket(bucket_idx_cur).clear(); + vertex_frontier.bucket(bucket_idx_cur).shrink_to_fit(); + vertex_frontier.swap_buckets(bucket_idx_cur, bucket_idx_next); edge_count = thrust::transform_reduce( handle.get_thrust_policy(), - thrust::get<0>(vertex_frontier.get_bucket(static_cast(Bucket::cur)) - .begin() - .get_iterator_tuple()), - thrust::get<0>( - vertex_frontier.get_bucket(static_cast(Bucket::cur)).end().get_iterator_tuple()), + thrust::get<0>(vertex_frontier.bucket(bucket_idx_cur).begin().get_iterator_tuple()), + thrust::get<0>(vertex_frontier.bucket(bucket_idx_cur).end().get_iterator_tuple()), [vertex_partition, degrees = degrees.data()] __device__(auto v) { return 
degrees[vertex_partition.local_vertex_partition_offset_from_vertex_nocheck(v)]; }, diff --git a/cpp/src/cores/core_number_impl.cuh b/cpp/src/cores/core_number_impl.cuh index b9d2d75c4fb..6d077e7085c 100644 --- a/cpp/src/cores/core_number_impl.cuh +++ b/cpp/src/cores/core_number_impl.cuh @@ -143,9 +143,11 @@ void core_number(raft::handle_t const& handle, // start iteration - enum class Bucket { cur, next, num_buckets }; - VertexFrontier(Bucket::num_buckets)> - vertex_frontier(handle); + constexpr size_t bucket_idx_cur = 0; + constexpr size_t bucket_idx_next = 1; + constexpr size_t num_buckets = 2; + + vertex_frontier_t vertex_frontier(handle, num_buckets); edge_partition_dst_property_t, edge_t> dst_core_numbers(handle, graph_view); @@ -177,15 +179,14 @@ void core_number(raft::handle_t const& handle, remaining_vertices.end(), [core_numbers, k, v_first = graph_view.local_vertex_partition_range_first()] __device__( auto v) { return core_numbers[v - v_first] >= k; }); - vertex_frontier.get_bucket(static_cast(Bucket::cur)) - .insert(less_than_k_first, remaining_vertices.end()); + vertex_frontier.bucket(bucket_idx_cur).insert(less_than_k_first, remaining_vertices.end()); remaining_vertices.resize(thrust::distance(remaining_vertices.begin(), less_than_k_first), handle.get_stream()); auto delta = (graph_view.is_symmetric() && (degree_type == k_core_degree_type_t::INOUT)) ? 
edge_t{2} : edge_t{1}; - if (vertex_frontier.get_bucket(static_cast(Bucket::cur)).aggregate_size() > 0) { + if (vertex_frontier.bucket(bucket_idx_cur).aggregate_size() > 0) { do { // FIXME: If most vertices have core numbers less than k, (dst_val >= k) will be mostly // false leading to too many unnecessary edge traversals (this is especially problematic if @@ -198,8 +199,8 @@ void core_number(raft::handle_t const& handle, handle, graph_view, vertex_frontier, - static_cast(Bucket::cur), - std::vector{static_cast(Bucket::next)}, + bucket_idx_cur, + std::vector{bucket_idx_next}, dummy_property_t{}.device_view(), dst_core_numbers.device_view(), [k, delta] __device__(vertex_t src, vertex_t dst, auto, auto dst_val) { @@ -219,7 +220,7 @@ void core_number(raft::handle_t const& handle, new_core_number = new_core_number < (k - delta) ? (k - delta) : new_core_number; new_core_number = new_core_number < k_first ? edge_t{0} : new_core_number; return thrust::optional>{ - thrust::make_tuple(static_cast(Bucket::next), new_core_number)}; + thrust::make_tuple(bucket_idx_next, new_core_number)}; }); } @@ -230,33 +231,31 @@ void core_number(raft::handle_t const& handle, CUGRAPH_FAIL("unimplemented."); } - update_edge_partition_dst_property( - handle, - graph_view, - vertex_frontier.get_bucket(static_cast(Bucket::next)).begin(), - vertex_frontier.get_bucket(static_cast(Bucket::next)).end(), - core_numbers, - dst_core_numbers); + update_edge_partition_dst_property(handle, + graph_view, + vertex_frontier.bucket(bucket_idx_next).begin(), + vertex_frontier.bucket(bucket_idx_next).end(), + core_numbers, + dst_core_numbers); - vertex_frontier.get_bucket(static_cast(Bucket::next)) + vertex_frontier.bucket(bucket_idx_next) .resize(static_cast(thrust::distance( - vertex_frontier.get_bucket(static_cast(Bucket::next)).begin(), + vertex_frontier.bucket(bucket_idx_next).begin(), thrust::remove_if( handle.get_thrust_policy(), - vertex_frontier.get_bucket(static_cast(Bucket::next)).begin(), - 
vertex_frontier.get_bucket(static_cast(Bucket::next)).end(), + vertex_frontier.bucket(bucket_idx_next).begin(), + vertex_frontier.bucket(bucket_idx_next).end(), [core_numbers, k, v_first = graph_view.local_vertex_partition_range_first()] __device__(auto v) { return core_numbers[v - v_first] >= k; })))); - vertex_frontier.get_bucket(static_cast(Bucket::next)).shrink_to_fit(); + vertex_frontier.bucket(bucket_idx_next).shrink_to_fit(); - vertex_frontier.get_bucket(static_cast(Bucket::cur)).clear(); - vertex_frontier.get_bucket(static_cast(Bucket::cur)).shrink_to_fit(); - vertex_frontier.swap_buckets(static_cast(Bucket::cur), - static_cast(Bucket::next)); - } while (vertex_frontier.get_bucket(static_cast(Bucket::cur)).aggregate_size() > 0); + vertex_frontier.bucket(bucket_idx_cur).clear(); + vertex_frontier.bucket(bucket_idx_cur).shrink_to_fit(); + vertex_frontier.swap_buckets(bucket_idx_cur, bucket_idx_next); + } while (vertex_frontier.bucket(bucket_idx_cur).aggregate_size() > 0); // FIXME: scanning the remaining vertices can add significant overhead if the number of // distinct core numbers in [k_first, std::min(max_degree, k_last)] is large and there are @@ -280,12 +279,15 @@ void core_number(raft::handle_t const& handle, v_to_core_number_t{core_numbers, graph_view.local_vertex_partition_range_first()}); auto min_core_number = - reduce_v(handle, - graph_view, - remaining_vertex_core_number_first, - remaining_vertex_core_number_first + remaining_vertices.size(), - std::numeric_limits::max(), - raft::comms::op_t::MIN); + thrust::reduce(handle.get_thrust_policy(), + remaining_vertex_core_number_first, + remaining_vertex_core_number_first + remaining_vertices.size(), + std::numeric_limits::max(), + thrust::minimum{}); + if constexpr (multi_gpu) { + min_core_number = host_scalar_allreduce( + handle.get_comms(), min_core_number, raft::comms::op_t::MIN, handle.get_stream()); + } k = std::max(k + delta, static_cast(min_core_number + edge_t{delta})); } } diff --git 
a/cpp/src/link_analysis/hits_impl.cuh b/cpp/src/link_analysis/hits_impl.cuh index 5eb7fa2918c..5422c1d327f 100644 --- a/cpp/src/link_analysis/hits_impl.cuh +++ b/cpp/src/link_analysis/hits_impl.cuh @@ -35,12 +35,7 @@ void normalize(raft::handle_t const& handle, result_t* hubs, raft::comms::op_t op) { - auto hubs_norm = reduce_v(handle, - graph_view, - hubs, - hubs + graph_view.local_vertex_partition_range_size(), - identity_element(op), - op); + auto hubs_norm = reduce_v(handle, graph_view, hubs, identity_element(op), op); CUGRAPH_EXPECTS(hubs_norm > 0, "Norm is required to be a positive value."); thrust::transform(handle.get_thrust_policy(), hubs, @@ -80,7 +75,7 @@ std::tuple hits(raft::handle_t const& handle, // Check validity of initial guess if supplied if (has_initial_hubs_guess && do_expensive_check) { auto num_negative_values = - count_if_v(handle, graph_view, hubs, [] __device__(auto val) { return val < 0.0; }); + count_if_v(handle, graph_view, hubs, [] __device__(auto, auto val) { return val < 0.0; }); CUGRAPH_EXPECTS(num_negative_values == 0, "Invalid input argument: initial guess values should be non-negative."); } @@ -102,7 +97,7 @@ std::tuple hits(raft::handle_t const& handle, if (has_initial_hubs_guess) { update_edge_partition_src_property(handle, graph_view, prev_hubs, prev_src_hubs); } else { - prev_src_hubs.fill(result_t{1.0} / num_vertices, handle.get_stream()); + prev_src_hubs.fill(handle, result_t{1.0} / num_vertices); thrust::fill(handle.get_thrust_policy(), prev_hubs, prev_hubs + graph_view.local_vertex_partition_range_size(), @@ -144,7 +139,7 @@ std::tuple hits(raft::handle_t const& handle, handle, graph_view, thrust::make_zip_iterator(thrust::make_tuple(curr_hubs, prev_hubs)), - [] __device__(auto val) { return std::abs(thrust::get<0>(val) - thrust::get<1>(val)); }, + [] __device__(auto, auto val) { return std::abs(thrust::get<0>(val) - thrust::get<1>(val)); }, result_t{0}); if (diff_sum < epsilon) { final_iteration_count = iter; diff --git 
a/cpp/src/link_analysis/pagerank_impl.cuh b/cpp/src/link_analysis/pagerank_impl.cuh index c262d913f7c..5d4c7bde1ed 100644 --- a/cpp/src/link_analysis/pagerank_impl.cuh +++ b/cpp/src/link_analysis/pagerank_impl.cuh @@ -90,10 +90,11 @@ void pagerank( if (do_expensive_check) { if (precomputed_vertex_out_weight_sums) { - auto num_negative_precomputed_vertex_out_weight_sums = count_if_v( - handle, pull_graph_view, *precomputed_vertex_out_weight_sums, [] __device__(auto val) { - return val < result_t{0.0}; - }); + auto num_negative_precomputed_vertex_out_weight_sums = + count_if_v(handle, + pull_graph_view, + *precomputed_vertex_out_weight_sums, + [] __device__(auto, auto val) { return val < result_t{0.0}; }); CUGRAPH_EXPECTS( num_negative_precomputed_vertex_out_weight_sums == 0, "Invalid input argument: outgoing edge weight sum values should be non-negative."); @@ -112,7 +113,7 @@ void pagerank( if (has_initial_guess) { auto num_negative_values = count_if_v( - handle, pull_graph_view, pageranks, [] __device__(auto val) { return val < 0.0; }); + handle, pull_graph_view, pageranks, [] __device__(auto, auto val) { return val < 0.0; }); CUGRAPH_EXPECTS(num_negative_values == 0, "Invalid input argument: initial guess values should be non-negative."); } @@ -121,21 +122,28 @@ void pagerank( auto vertex_partition = vertex_partition_device_view_t( pull_graph_view.local_vertex_partition_view()); auto num_invalid_vertices = - count_if_v(handle, - pull_graph_view, - *personalization_vertices, - *personalization_vertices + *personalization_vector_size, - [vertex_partition] __device__(auto val) { - return !(vertex_partition.is_valid_vertex(val) && - vertex_partition.in_local_vertex_partition_range_nocheck(val)); - }); + thrust::count_if(handle.get_thrust_policy(), + *personalization_vertices, + *personalization_vertices + *personalization_vector_size, + [vertex_partition] __device__(auto val) { + return !(vertex_partition.is_valid_vertex(val) && + 
vertex_partition.in_local_vertex_partition_range_nocheck(val)); + }); + if constexpr (GraphViewType::is_multi_gpu) { + num_invalid_vertices = host_scalar_allreduce( + handle.get_comms(), num_invalid_vertices, raft::comms::op_t::SUM, handle.get_stream()); + } CUGRAPH_EXPECTS(num_invalid_vertices == 0, "Invalid input argument: peresonalization vertices have invalid vertex IDs."); - auto num_negative_values = count_if_v(handle, - pull_graph_view, - *personalization_values, - *personalization_values + *personalization_vector_size, - [] __device__(auto val) { return val < 0.0; }); + auto num_negative_values = + thrust::count_if(handle.get_thrust_policy(), + *personalization_values, + *personalization_values + *personalization_vector_size, + [] __device__(auto val) { return val < 0.0; }); + if constexpr (GraphViewType::is_multi_gpu) { + num_negative_values = host_scalar_allreduce( + handle.get_comms(), num_negative_values, raft::comms::op_t::SUM, handle.get_stream()); + } CUGRAPH_EXPECTS(num_negative_values == 0, "Invalid input argument: peresonalization values should be non-negative."); } @@ -174,11 +182,14 @@ void pagerank( result_t personalization_sum{0.0}; if (aggregate_personalization_vector_size > 0) { - personalization_sum = reduce_v(handle, - pull_graph_view, - *personalization_values, - *personalization_values + *personalization_vector_size, - result_t{0.0}); + personalization_sum = thrust::reduce(handle.get_thrust_policy(), + *personalization_values, + *personalization_values + *personalization_vector_size, + result_t{0.0}); + if constexpr (GraphViewType::is_multi_gpu) { + personalization_sum = host_scalar_allreduce( + handle.get_comms(), personalization_sum, raft::comms::op_t::SUM, handle.get_stream()); + } CUGRAPH_EXPECTS(personalization_sum > 0.0, "Invalid input argument: sum of personalization valuese " "should be positive."); @@ -205,7 +216,7 @@ void pagerank( handle, pull_graph_view, vertex_val_first, - [] __device__(auto val) { + [] __device__(auto, auto 
val) { auto const pagerank = thrust::get<0>(val); auto const out_weight_sum = thrust::get<1>(val); return out_weight_sum == result_t{0.0} ? pagerank : result_t{0.0}; @@ -266,7 +277,7 @@ void pagerank( handle, pull_graph_view, thrust::make_zip_iterator(thrust::make_tuple(pageranks, old_pageranks.data())), - [] __device__(auto val) { return std::abs(thrust::get<0>(val) - thrust::get<1>(val)); }, + [] __device__(auto, auto val) { return std::abs(thrust::get<0>(val) - thrust::get<1>(val)); }, result_t{0.0}); iter++; diff --git a/cpp/src/traversal/bfs_impl.cuh b/cpp/src/traversal/bfs_impl.cuh index bdc1f8ff602..525328ab563 100644 --- a/cpp/src/traversal/bfs_impl.cuh +++ b/cpp/src/traversal/bfs_impl.cuh @@ -17,7 +17,6 @@ #include #include -#include #include #include #include @@ -127,14 +126,17 @@ void bfs(raft::handle_t const& handle, auto vertex_partition = vertex_partition_device_view_t( push_graph_view.local_vertex_partition_view()); auto num_invalid_vertices = - count_if_v(handle, - push_graph_view, - sources, - sources + n_sources, - [vertex_partition] __device__(auto val) { - return !(vertex_partition.is_valid_vertex(val) && - vertex_partition.in_local_vertex_partition_range_nocheck(val)); - }); + thrust::count_if(handle.get_thrust_policy(), + sources, + sources + n_sources, + [vertex_partition] __device__(auto val) { + return !(vertex_partition.is_valid_vertex(val) && + vertex_partition.in_local_vertex_partition_range_nocheck(val)); + }); + if constexpr (GraphViewType::is_multi_gpu) { + num_invalid_vertices = host_scalar_allreduce( + handle.get_comms(), num_invalid_vertices, raft::comms::op_t::SUM, handle.get_stream()); + } CUGRAPH_EXPECTS(num_invalid_vertices == 0, "Invalid input argument: sources have invalid vertex IDs."); } @@ -166,14 +168,15 @@ void bfs(raft::handle_t const& handle, } // 3. 
initialize BFS frontier - enum class Bucket { cur, next, num_buckets }; - VertexFrontier(Bucket::num_buckets)> - vertex_frontier(handle); - - vertex_frontier.get_bucket(static_cast(Bucket::cur)).insert(sources, sources + n_sources); + + constexpr size_t bucket_idx_cur = 0; + constexpr size_t bucket_idx_next = 1; + constexpr size_t num_buckets = 2; + + vertex_frontier_t vertex_frontier(handle, + num_buckets); + + vertex_frontier.bucket(bucket_idx_cur).insert(sources, sources + n_sources); rmm::device_uvector visited_flags( (push_graph_view.local_vertex_partition_range_size() + (sizeof(uint32_t) * 8 - 1)) / (sizeof(uint32_t) * 8), @@ -188,9 +191,7 @@ void bfs(raft::handle_t const& handle, : edge_partition_dst_property_t( handle); // relevant only if GraphViewType::is_multi_gpu is true - if constexpr (GraphViewType::is_multi_gpu) { - dst_visited_flags.fill(uint8_t{0}, handle.get_stream()); - } + if constexpr (GraphViewType::is_multi_gpu) { dst_visited_flags.fill(handle, uint8_t{0}); } // 4. 
BFS iteration vertex_t depth{0}; @@ -199,13 +200,12 @@ void bfs(raft::handle_t const& handle, CUGRAPH_FAIL("unimplemented."); } else { if (GraphViewType::is_multi_gpu) { - update_edge_partition_dst_property( - handle, - push_graph_view, - vertex_frontier.get_bucket(static_cast(Bucket::cur)).begin(), - vertex_frontier.get_bucket(static_cast(Bucket::cur)).end(), - thrust::make_constant_iterator(uint8_t{1}), - dst_visited_flags); + update_edge_partition_dst_property(handle, + push_graph_view, + vertex_frontier.bucket(bucket_idx_cur).begin(), + vertex_frontier.bucket(bucket_idx_cur).end(), + thrust::make_constant_iterator(uint8_t{1}), + dst_visited_flags); } else { thrust::copy(handle.get_thrust_policy(), visited_flags.begin(), @@ -226,8 +226,8 @@ void bfs(raft::handle_t const& handle, handle, push_graph_view, vertex_frontier, - static_cast(Bucket::cur), - std::vector{static_cast(Bucket::next)}, + bucket_idx_cur, + std::vector{bucket_idx_next}, dummy_property_t{}.device_view(), dummy_property_t{}.device_view(), #if 1 @@ -254,28 +254,19 @@ void bfs(raft::handle_t const& handle, return (v_val == invalid_distance) ? 
thrust::optional< thrust::tuple>>{thrust::make_tuple( - static_cast(Bucket::next), - thrust::make_tuple(depth + 1, pushed_val))} + bucket_idx_next, thrust::make_tuple(depth + 1, pushed_val))} : thrust::nullopt; }); - vertex_frontier.get_bucket(static_cast(Bucket::cur)).clear(); - vertex_frontier.get_bucket(static_cast(Bucket::cur)).shrink_to_fit(); - vertex_frontier.swap_buckets(static_cast(Bucket::cur), - static_cast(Bucket::next)); - if (vertex_frontier.get_bucket(static_cast(Bucket::cur)).aggregate_size() == 0) { - break; - } + vertex_frontier.bucket(bucket_idx_cur).clear(); + vertex_frontier.bucket(bucket_idx_cur).shrink_to_fit(); + vertex_frontier.swap_buckets(bucket_idx_cur, bucket_idx_next); + if (vertex_frontier.bucket(bucket_idx_cur).aggregate_size() == 0) { break; } } depth++; if (depth >= depth_limit) { break; } } - - RAFT_CUDA_TRY(cudaStreamSynchronize( - handle.get_stream())); // this is as necessary vertex_frontier will become out-of-scope once - // this function returns (FIXME: should I stream sync in VertexFrontier - // destructor?) } } // namespace detail diff --git a/cpp/src/traversal/sssp_impl.cuh b/cpp/src/traversal/sssp_impl.cuh index 262a36aed4f..c7f1e8b3748 100644 --- a/cpp/src/traversal/sssp_impl.cuh +++ b/cpp/src/traversal/sssp_impl.cuh @@ -125,12 +125,13 @@ void sssp(raft::handle_t const& handle, // 4. initialize SSSP frontier - enum class Bucket { cur_near, next_near, far, num_buckets }; - VertexFrontier(Bucket::num_buckets)> - vertex_frontier(handle); + constexpr size_t bucket_idx_cur_near = 0; + constexpr size_t bucket_idx_next_near = 1; + constexpr size_t bucket_idx_far = 2; + constexpr size_t num_buckets = 3; + + vertex_frontier_t vertex_frontier(handle, + num_buckets); // 5. SSSP iteration @@ -139,23 +140,22 @@ void sssp(raft::handle_t const& handle, ? 
edge_partition_src_property_t(handle, push_graph_view) : edge_partition_src_property_t(handle); if (GraphViewType::is_multi_gpu) { - edge_partition_src_distances.fill(std::numeric_limits::max(), handle.get_stream()); + edge_partition_src_distances.fill(handle, std::numeric_limits::max()); } if (push_graph_view.in_local_vertex_partition_range_nocheck(source_vertex)) { - vertex_frontier.get_bucket(static_cast(Bucket::cur_near)).insert(source_vertex); + vertex_frontier.bucket(bucket_idx_cur_near).insert(source_vertex); } auto near_far_threshold = delta; while (true) { if (GraphViewType::is_multi_gpu) { - update_edge_partition_src_property( - handle, - push_graph_view, - vertex_frontier.get_bucket(static_cast(Bucket::cur_near)).begin(), - vertex_frontier.get_bucket(static_cast(Bucket::cur_near)).end(), - distances, - edge_partition_src_distances); + update_edge_partition_src_property(handle, + push_graph_view, + vertex_frontier.bucket(bucket_idx_cur_near).begin(), + vertex_frontier.bucket(bucket_idx_cur_near).end(), + distances, + edge_partition_src_distances); } auto vertex_partition = vertex_partition_device_view_t( @@ -165,8 +165,8 @@ void sssp(raft::handle_t const& handle, handle, push_graph_view, vertex_frontier, - static_cast(Bucket::cur_near), - std::vector{static_cast(Bucket::next_near), static_cast(Bucket::far)}, + bucket_idx_cur_near, + std::vector{bucket_idx_next_near, bucket_idx_far}, GraphViewType::is_multi_gpu ? edge_partition_src_distances.device_view() : detail::edge_partition_major_property_device_view_t(distances), @@ -193,23 +193,20 @@ void sssp(raft::handle_t const& handle, [near_far_threshold] __device__(auto v, auto v_val, auto pushed_val) { auto new_dist = thrust::get<0>(pushed_val); auto idx = new_dist < v_val - ? (new_dist < near_far_threshold ? static_cast(Bucket::next_near) - : static_cast(Bucket::far)) - : VertexFrontier::kInvalidBucketIdx; + ? (new_dist < near_far_threshold ? 
bucket_idx_next_near : bucket_idx_far) + : vertex_frontier_t::kInvalidBucketIdx; return new_dist < v_val ? thrust::optional>{thrust::make_tuple( - static_cast(new_dist < near_far_threshold ? Bucket::next_near - : Bucket::far), + new_dist < near_far_threshold ? bucket_idx_next_near : bucket_idx_far, pushed_val)} : thrust::nullopt; }); - vertex_frontier.get_bucket(static_cast(Bucket::cur_near)).clear(); - vertex_frontier.get_bucket(static_cast(Bucket::cur_near)).shrink_to_fit(); - if (vertex_frontier.get_bucket(static_cast(Bucket::next_near)).aggregate_size() > 0) { - vertex_frontier.swap_buckets(static_cast(Bucket::cur_near), - static_cast(Bucket::next_near)); - } else if (vertex_frontier.get_bucket(static_cast(Bucket::far)).aggregate_size() > + vertex_frontier.bucket(bucket_idx_cur_near).clear(); + vertex_frontier.bucket(bucket_idx_cur_near).shrink_to_fit(); + if (vertex_frontier.bucket(bucket_idx_next_near).aggregate_size() > 0) { + vertex_frontier.swap_buckets(bucket_idx_cur_near, bucket_idx_next_near); + } else if (vertex_frontier.bucket(bucket_idx_far).aggregate_size() > 0) { // near queue is empty, split the far queue auto old_near_far_threshold = near_far_threshold; near_far_threshold += delta; @@ -218,20 +215,19 @@ void sssp(raft::handle_t const& handle, size_t far_size{0}; while (true) { vertex_frontier.split_bucket( - static_cast(Bucket::far), - std::vector{static_cast(Bucket::cur_near)}, + bucket_idx_far, + std::vector{bucket_idx_cur_near}, [vertex_partition, distances, old_near_far_threshold, near_far_threshold] __device__( auto v) { auto dist = *(distances + vertex_partition.local_vertex_partition_offset_from_vertex_nocheck(v)); return dist >= old_near_far_threshold - ? thrust::optional{static_cast( - dist < near_far_threshold ? Bucket::cur_near : Bucket::far)} + ? thrust::optional{dist < near_far_threshold ? 
bucket_idx_cur_near + : bucket_idx_far} : thrust::nullopt; }); - near_size = - vertex_frontier.get_bucket(static_cast(Bucket::cur_near)).aggregate_size(); - far_size = vertex_frontier.get_bucket(static_cast(Bucket::far)).aggregate_size(); + near_size = vertex_frontier.bucket(bucket_idx_cur_near).aggregate_size(); + far_size = vertex_frontier.bucket(bucket_idx_far).aggregate_size(); if ((near_size > 0) || (far_size == 0)) { break; } else { @@ -243,11 +239,6 @@ void sssp(raft::handle_t const& handle, break; } } - - RAFT_CUDA_TRY(cudaStreamSynchronize( - handle.get_stream())); // this is as necessary vertex_frontier will become out-of-scope once - // this function returns (FIXME: should I stream sync in VertexFrontier - // destructor?) } } // namespace detail diff --git a/cpp/tests/prims/mg_count_if_v.cu b/cpp/tests/prims/mg_count_if_v.cu index 81512f0b832..de5d0a559c4 100644 --- a/cpp/tests/prims/mg_count_if_v.cu +++ b/cpp/tests/prims/mg_count_if_v.cu @@ -43,7 +43,7 @@ template struct test_predicate { int mod{}; test_predicate(int mod_count) : mod(mod_count) {} - __device__ bool operator()(const vertex_t& val) + __device__ bool operator()(vertex_t, const vertex_t& val) { cuco::detail::MurmurHash3_32 hash_func{}; return (0 == (hash_func(val) % mod)); @@ -137,10 +137,10 @@ class Tests_MG_CountIfV cugraph::test::construct_graph( handle, input_usecase, true, false); auto sg_graph_view = sg_graph.view(); - auto expected_vertex_count = thrust::count_if( - handle.get_thrust_policy(), + auto expected_vertex_count = count_if_v( + handle, + sg_graph_view, thrust::make_counting_iterator(sg_graph_view.local_vertex_partition_range_first()), - thrust::make_counting_iterator(sg_graph_view.local_vertex_partition_range_last()), test_predicate(hash_bin_count)); ASSERT_TRUE(expected_vertex_count == vertex_count); } diff --git a/cpp/tests/prims/mg_reduce_v.cu b/cpp/tests/prims/mg_reduce_v.cu index 2c3109c4e01..16133ce18e2 100644 --- a/cpp/tests/prims/mg_reduce_v.cu +++ 
b/cpp/tests/prims/mg_reduce_v.cu @@ -255,12 +255,7 @@ class Tests_MG_ReduceV hr_clock.start(); } - results[op] = reduce_v(handle, - mg_graph_view, - property_iter, - property_iter + (*d_mg_renumber_map_labels).size(), - property_initial_value, - op); + results[op] = reduce_v(handle, mg_graph_view, property_iter, property_initial_value, op); if (cugraph::test::g_perf) { RAFT_CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement diff --git a/cpp/tests/prims/mg_transform_reduce_v.cu b/cpp/tests/prims/mg_transform_reduce_v.cu index e3d75b2f4e4..4bd8bf5fe60 100644 --- a/cpp/tests/prims/mg_transform_reduce_v.cu +++ b/cpp/tests/prims/mg_transform_reduce_v.cu @@ -43,7 +43,7 @@ template struct property_transform : public thrust::unary_function { int mod{}; property_transform(int mod_count) : mod(mod_count) {} - constexpr __device__ auto operator()(const vertex_t& val) + constexpr __device__ auto operator()(vertex_t, const vertex_t& val) { cuco::detail::MurmurHash3_32 hash_func{}; auto value = hash_func(val) % mod; @@ -56,7 +56,7 @@ struct property_transform> : public thrust::unary_function> { int mod{}; property_transform(int mod_count) : mod(mod_count) {} - constexpr __device__ auto operator()(const vertex_t& val) + constexpr __device__ auto operator()(vertex_t, const vertex_t& val) { cuco::detail::MurmurHash3_32 hash_func{}; auto value = hash_func(val) % mod; @@ -213,7 +213,7 @@ class Tests_MG_TransformReduceV } } - //// 4. compare SG & MG results + // 4. 
compare SG & MG results if (prims_usecase.check_correctness) { cugraph::graph_t sg_graph(handle); @@ -223,16 +223,13 @@ class Tests_MG_TransformReduceV auto sg_graph_view = sg_graph.view(); for (auto op : ops) { - auto expected_result = cugraph::op_dispatch( - op, [&handle, &sg_graph_view, prop, property_initial_value](auto op) { - return thrust::transform_reduce( - handle.get_thrust_policy(), - thrust::make_counting_iterator(sg_graph_view.local_vertex_partition_range_first()), - thrust::make_counting_iterator(sg_graph_view.local_vertex_partition_range_last()), - prop, - property_initial_value, - op); - }); + auto expected_result = transform_reduce_v( + handle, + sg_graph_view, + thrust::make_counting_iterator(sg_graph_view.local_vertex_partition_range_first()), + prop, + property_initial_value, + op); result_compare compare{}; ASSERT_TRUE(compare(expected_result, results[op])); } diff --git a/cpp/tests/prims/mg_update_frontier_v_push_if_out_nbr.cu b/cpp/tests/prims/mg_update_frontier_v_push_if_out_nbr.cu index 4e065c8c119..18e70bd693f 100644 --- a/cpp/tests/prims/mg_update_frontier_v_push_if_out_nbr.cu +++ b/cpp/tests/prims/mg_update_frontier_v_push_if_out_nbr.cu @@ -175,11 +175,13 @@ class Tests_MG_UpdateFrontierVPushIfOutNbr cugraph::get_dataframe_buffer_cbegin(mg_property_buffer), mg_dst_properties); - enum class Bucket { cur, next, num_buckets }; - cugraph::VertexFrontier(Bucket::num_buckets)> - mg_vertex_frontier(handle); - mg_vertex_frontier.get_bucket(static_cast(Bucket::cur)) - .insert(sources.begin(), sources.end()); + constexpr size_t bucket_idx_cur = 0; + constexpr size_t bucket_idx_next = 1; + constexpr size_t num_buckets = 2; + + cugraph::vertex_frontier_t mg_vertex_frontier(handle, + num_buckets); + mg_vertex_frontier.bucket(bucket_idx_cur).insert(sources.begin(), sources.end()); if (cugraph::test::g_perf) { RAFT_CUDA_TRY(cudaDeviceSynchronize()); // for consistent performance measurement @@ -192,8 +194,8 @@ class Tests_MG_UpdateFrontierVPushIfOutNbr 
handle, mg_graph_view, mg_vertex_frontier, - static_cast(Bucket::cur), - std::vector{static_cast(Bucket::next)}, + bucket_idx_cur, + std::vector{bucket_idx_next}, mg_src_properties.device_view(), mg_dst_properties.device_view(), [] __device__(vertex_t src, vertex_t dst, auto src_val, auto dst_val) { @@ -206,7 +208,7 @@ class Tests_MG_UpdateFrontierVPushIfOutNbr thrust::make_discard_iterator() /* dummy */, [] __device__(auto v, auto v_val, auto pushed_val) { return thrust::optional>{ - thrust::make_tuple(static_cast(Bucket::next), std::byte{0} /* dummy */)}; + thrust::make_tuple(bucket_idx_next, std::byte{0} /* dummy */)}; }); if (cugraph::test::g_perf) { @@ -223,7 +225,7 @@ class Tests_MG_UpdateFrontierVPushIfOutNbr auto mg_aggregate_renumber_map_labels = cugraph::test::device_gatherv( handle, (*mg_renumber_map_labels).data(), (*mg_renumber_map_labels).size()); - auto& next_bucket = mg_vertex_frontier.get_bucket(static_cast(Bucket::next)); + auto& next_bucket = mg_vertex_frontier.bucket(bucket_idx_next); auto mg_aggregate_frontier_dsts = cugraph::test::device_gatherv(handle, next_bucket.begin(), next_bucket.size()); @@ -266,18 +268,16 @@ class Tests_MG_UpdateFrontierVPushIfOutNbr sg_graph_view, cugraph::get_dataframe_buffer_cbegin(sg_property_buffer), sg_dst_properties); - cugraph:: - VertexFrontier(Bucket::num_buckets)> - sg_vertex_frontier(handle); - sg_vertex_frontier.get_bucket(static_cast(Bucket::cur)) - .insert(sources.begin(), sources.end()); + cugraph::vertex_frontier_t sg_vertex_frontier(handle, + num_buckets); + sg_vertex_frontier.bucket(bucket_idx_cur).insert(sources.begin(), sources.end()); update_frontier_v_push_if_out_nbr( handle, sg_graph_view, sg_vertex_frontier, - static_cast(Bucket::cur), - std::vector{static_cast(Bucket::next)}, + bucket_idx_cur, + std::vector{bucket_idx_next}, sg_src_properties.device_view(), sg_dst_properties.device_view(), [] __device__(vertex_t src, vertex_t dst, auto src_val, auto dst_val) { @@ -290,17 +290,16 @@ class 
Tests_MG_UpdateFrontierVPushIfOutNbr thrust::make_discard_iterator() /* dummy */, [] __device__(auto v, auto v_val, auto pushed_val) { return thrust::optional>{ - thrust::make_tuple(static_cast(Bucket::next), std::byte{0} /* dummy */)}; + thrust::make_tuple(bucket_idx_next, std::byte{0} /* dummy */)}; }); thrust::sort(handle.get_thrust_policy(), - sg_vertex_frontier.get_bucket(static_cast(Bucket::next)).begin(), - sg_vertex_frontier.get_bucket(static_cast(Bucket::next)).end()); - bool passed = - thrust::equal(handle.get_thrust_policy(), - sg_vertex_frontier.get_bucket(static_cast(Bucket::next)).begin(), - sg_vertex_frontier.get_bucket(static_cast(Bucket::next)).end(), - mg_aggregate_frontier_dsts.begin()); + sg_vertex_frontier.bucket(bucket_idx_next).begin(), + sg_vertex_frontier.bucket(bucket_idx_next).end()); + bool passed = thrust::equal(handle.get_thrust_policy(), + sg_vertex_frontier.bucket(bucket_idx_next).begin(), + sg_vertex_frontier.bucket(bucket_idx_next).end(), + mg_aggregate_frontier_dsts.begin()); ASSERT_TRUE(passed); } }