From 8ddc7d41de44d93d8102938f92762114373f1d3d Mon Sep 17 00:00:00 2001 From: Seunghwa Kang <45857425+seunghwak@users.noreply.github.com> Date: Thu, 14 Jul 2022 11:46:35 -0700 Subject: [PATCH] Skip reduction for zero (in|out-)degree vertices. (#2365) * Added a zero major degree segment when segmenting a vertex partition based on major degrees. * Added use_dcs() utility function to graph_view_t. * Added major_hypersparse_first() utility function to edge_partition_device_view_t. * Skip reduction for zero degree partitions in per_v_transform_reduce_incoming_e (if major == dst) or per_v_transform_reduce_outgoing_e (if major == src). Authors: - Seunghwa Kang (https://github.com/seunghwak) Approvers: - Chuck Hastings (https://github.com/ChuckHastings) - Kumar Aatish (https://github.com/kaatish) - Joseph Nke (https://github.com/jnke2016) URL: https://github.com/rapidsai/cugraph/pull/2365 --- .../cugraph/edge_partition_device_view.cuh | 29 +++- cpp/include/cugraph/edge_partition_view.hpp | 9 +- cpp/include/cugraph/graph.hpp | 2 +- cpp/include/cugraph/graph_view.hpp | 20 +++ .../cugraph/utilities/device_functors.cuh | 14 ++ cpp/src/detail/utility_wrappers.cu | 2 +- cpp/src/link_analysis/pagerank_impl.cuh | 8 +- cpp/src/prims/detail/nbr_intersection.cuh | 130 +++++++------- .../prims/edge_partition_src_dst_property.cuh | 2 +- ...v_transform_reduce_incoming_outgoing_e.cuh | 160 +++++++++++++++--- cpp/src/prims/transform_reduce_e.cuh | 10 +- .../transform_reduce_e_by_src_dst_key.cuh | 10 +- ...rm_reduce_v_frontier_outgoing_e_by_dst.cuh | 111 ++++++------ ...update_edge_partition_src_dst_property.cuh | 79 ++++++++- .../sampling/detail/sampling_utils_impl.cuh | 68 ++++---- cpp/src/structure/graph_impl.cuh | 14 +- cpp/src/structure/graph_view_impl.cuh | 36 ++-- cpp/src/structure/renumber_edgelist_impl.cuh | 23 ++- cpp/tests/link_analysis/mg_hits_test.cpp | 2 +- cpp/tests/link_analysis/mg_pagerank_test.cpp | 2 +- ..._count_self_loops_and_multi_edges_test.cpp | 23 +-- 21 files changed, 476 insertions(+), 278 deletions(-) diff --git a/cpp/include/cugraph/edge_partition_device_view.cuh b/cpp/include/cugraph/edge_partition_device_view.cuh index c6088e92575..cee524ce29c 100644 --- a/cpp/include/cugraph/edge_partition_device_view.cuh +++ b/cpp/include/cugraph/edge_partition_device_view.cuh @@ -112,6 +112,9 @@ class edge_partition_device_view_t{*(view.dcs_nzd_vertex_count())} : thrust::nullopt), + major_hypersparse_first_(view.major_hypersparse_first() + ? thrust::optional{*(view.major_hypersparse_first())} + : thrust::nullopt), major_range_first_(view.major_range_first()), major_range_last_(view.major_range_last()), minor_range_first_(view.minor_range_first()), @@ -120,6 +123,16 @@ class edge_partition_device_view_t major_hypersparse_first() const noexcept + { + return major_hypersparse_first_; + } + __host__ __device__ vertex_t major_range_first() const noexcept { return major_range_first_; } __host__ __device__ vertex_t major_range_last() const noexcept { return major_range_last_; } @@ -186,11 +199,6 @@ class edge_partition_device_view_t dcs_nzd_vertices() const { return dcs_nzd_vertices_; @@ -203,8 +211,9 @@ class edge_partition_device_view_t dcs_nzd_vertices_{nullptr}; - thrust::optional dcs_nzd_vertex_count_{0}; + thrust::optional dcs_nzd_vertices_{thrust::nullopt}; + thrust::optional dcs_nzd_vertex_count_{thrust::nullopt}; + thrust::optional major_hypersparse_first_{thrust::nullopt}; vertex_t major_range_first_{0}; vertex_t major_range_last_{0}; @@ -232,6 +241,12 @@ class edge_partition_device_view_t major_hypersparse_first() const noexcept + { + assert(false); + return thrust::nullopt; + } + __host__ __device__ constexpr vertex_t major_range_first() const noexcept { return vertex_t{0}; } __host__ __device__ vertex_t major_range_last() const noexcept { return number_of_vertices_; } diff --git a/cpp/include/cugraph/edge_partition_view.hpp b/cpp/include/cugraph/edge_partition_view.hpp index 4b41415fdef..965ded72b05 100644 --- a/cpp/include/cugraph/edge_partition_view.hpp +++ b/cpp/include/cugraph/edge_partition_view.hpp @@ -65,6 +65,7 @@ class edge_partition_view_t weights, std::optional dcs_nzd_vertices, std::optional dcs_nzd_vertex_count, + std::optional major_hypersparse_first, edge_t number_of_edge_partition_edges, vertex_t major_range_first, vertex_t major_range_last, @@ -75,6 +76,7 @@ class edge_partition_view_t dcs_nzd_vertices() const { return dcs_nzd_vertices_; } std::optional dcs_nzd_vertex_count() const { return dcs_nzd_vertex_count_; } + std::optional major_hypersparse_first() const { return major_hypersparse_first_; } vertex_t major_range_first() const { return major_range_first_; } vertex_t major_range_last() const { return major_range_last_; } @@ -95,8 +98,9 @@ class edge_partition_view_t dcs_nzd_vertices_{}; - std::optional dcs_nzd_vertex_count_{}; + std::optional dcs_nzd_vertices_{std::nullopt}; + std::optional dcs_nzd_vertex_count_{std::nullopt}; + std::optional major_hypersparse_first_{std::nullopt}; vertex_t major_range_first_{0}; vertex_t major_range_last_{0}; @@ -124,6 +128,7 @@ class edge_partition_view_t dcs_nzd_vertices() const { return std::nullopt; } std::optional dcs_nzd_vertex_count() const { return std::nullopt; } + std::optional major_hypersparse_first() const { return std::nullopt; } vertex_t major_range_first() const { return vertex_t{0}; } vertex_t major_range_last() const { return number_of_vertices_; } diff --git a/cpp/include/cugraph/graph.hpp b/cpp/include/cugraph/graph.hpp index 7ab539c0cb8..4ea1c3e4365 100644 --- a/cpp/include/cugraph/graph.hpp +++ b/cpp/include/cugraph/graph.hpp @@ -248,7 +248,7 @@ class graph_t>( (*local_sorted_unique_edge_dst_chunk_start_offsets_).size()); - for (size_t i = 0; (*local_sorted_unique_edge_dsts).size(); ++i) { + for (size_t i = 0; i < (*local_sorted_unique_edge_dsts).size(); ++i) { (*local_sorted_unique_edge_dsts)[i] = raft::device_span((*local_sorted_unique_edge_dsts_)[i].begin(), (*local_sorted_unique_edge_dsts_)[i].end()); diff --git a/cpp/include/cugraph/graph_view.hpp b/cpp/include/cugraph/graph_view.hpp index 00283f48106..ec3e231ef96 100644 --- a/cpp/include/cugraph/graph_view.hpp +++ b/cpp/include/cugraph/graph_view.hpp @@ -582,6 +582,17 @@ class graph_view_t (detail::num_sparse_segments_per_vertex_partition + size_t{2}); + } else { + return false; + } + } + std::optional> local_edge_partition_segment_offsets( size_t partition_idx) const { @@ -626,6 +637,12 @@ class graph_view_tlocal_edge_partition_src_value_start_offset(partition_idx); } + std::optional major_hypersparse_first{std::nullopt}; + if (this->use_dcs()) { + major_hypersparse_first = + major_range_first + (*(this->local_edge_partition_segment_offsets( + partition_idx)))[detail::num_sparse_segments_per_vertex_partition]; + } return edge_partition_view_t( edge_partition_offsets_[partition_idx], edge_partition_indices_[partition_idx], @@ -638,6 +655,7 @@ class graph_view_t{(*edge_partition_dcs_nzd_vertex_counts_)[partition_idx]} : std::nullopt, + major_hypersparse_first, edge_partition_number_of_edges_[partition_idx], major_range_first, major_range_last, @@ -926,6 +944,8 @@ class graph_view_t> local_edge_partition_segment_offsets( size_t partition_idx = 0) const { diff --git a/cpp/include/cugraph/utilities/device_functors.cuh b/cpp/include/cugraph/utilities/device_functors.cuh index faa238bc6ab..d525b9f3b52 100644 --- a/cpp/include/cugraph/utilities/device_functors.cuh +++ b/cpp/include/cugraph/utilities/device_functors.cuh @@ -67,6 +67,20 @@ struct check_bit_set_t { } }; +template +struct shift_left_t { + T offset{}; + + __device__ T operator()(T input) const { return input - offset; } +}; + +template +struct shift_right_t { + T offset{}; + + __device__ T operator()(T input) const { return input + offset; } +}; + template struct multiplier_t { T multiplier{}; diff --git a/cpp/src/detail/utility_wrappers.cu b/cpp/src/detail/utility_wrappers.cu index c012658eef7..a2d172f9744 100644 --- a/cpp/src/detail/utility_wrappers.cu +++ b/cpp/src/detail/utility_wrappers.cu @@ -115,7 +115,7 @@ std::tuple, rmm::device_uvector> filter_de auto zip_iter = thrust::make_zip_iterator(thrust::make_tuple(d_vertices.begin(), d_out_degs.begin())); - CUGRAPH_EXPECTS(d_vertices.size() < std::numeric_limits::max(), + CUGRAPH_EXPECTS(d_vertices.size() < static_cast(std::numeric_limits::max()), "remove_if will fail, d_vertices.size() is too large"); // FIXME: remove_if has a 32-bit overflow issue (https://github.com/NVIDIA/thrust/issues/1302) diff --git a/cpp/src/link_analysis/pagerank_impl.cuh b/cpp/src/link_analysis/pagerank_impl.cuh index fce8b6743e2..4cb002abe9b 100644 --- a/cpp/src/link_analysis/pagerank_impl.cuh +++ b/cpp/src/link_analysis/pagerank_impl.cuh @@ -105,14 +105,14 @@ void pagerank( } if (pull_graph_view.is_weighted()) { - auto num_nonpositive_edge_weights = + auto num_negative_edge_weights = count_if_e(handle, pull_graph_view, dummy_property_t{}.device_view(), dummy_property_t{}.device_view(), - [] __device__(vertex_t, vertex_t, weight_t w, auto, auto) { return w <= 0.0; }); - CUGRAPH_EXPECTS(num_nonpositive_edge_weights == 0, - "Invalid input argument: input graph should have postive edge weights."); + [] __device__(vertex_t, vertex_t, weight_t w, auto, auto) { return w < 0.0; }); + CUGRAPH_EXPECTS(num_negative_edge_weights == 0, + "Invalid input argument: input graph should have non-negative edge weights."); } if (has_initial_guess) { diff --git a/cpp/src/prims/detail/nbr_intersection.cuh b/cpp/src/prims/detail/nbr_intersection.cuh index a2797c32fae..fe956407bbb 100644 --- a/cpp/src/prims/detail/nbr_intersection.cuh +++ b/cpp/src/prims/detail/nbr_intersection.cuh @@ -124,7 +124,6 @@ struct update_rx_major_local_degree_t { int col_comm_size{}; edge_partition_device_view_t edge_partition{}; - thrust::optional major_hypersparse_first{}; size_t reordered_idx_first{}; size_t local_partition_idx{}; @@ -146,12 +145,13 @@ struct update_rx_major_local_degree_t { auto major = rx_majors[rx_group_firsts[row_comm_rank * col_comm_size + local_partition_idx] + offset_in_local_edge_partition]; edge_t local_degree{}; - if (multi_gpu && (major_hypersparse_first && (major >= *major_hypersparse_first))) { + if (multi_gpu && (edge_partition.major_hypersparse_first() && + (major >= *(edge_partition.major_hypersparse_first())))) { auto major_hypersparse_idx = edge_partition.major_hypersparse_idx_from_major_nocheck(major); local_degree = major_hypersparse_idx - ? edge_partition.local_degree( - (*major_hypersparse_first - edge_partition.major_range_first()) + - *major_hypersparse_idx) + ? edge_partition.local_degree((*(edge_partition.major_hypersparse_first()) - + edge_partition.major_range_first()) + + *major_hypersparse_idx) : edge_t{0}; } else { local_degree = @@ -169,7 +169,6 @@ struct update_rx_major_local_nbrs_t { int col_comm_size{}; edge_partition_device_view_t edge_partition{}; - thrust::optional major_hypersparse_first{}; size_t reordered_idx_first{}; size_t local_partition_idx{}; @@ -194,11 +193,13 @@ struct update_rx_major_local_nbrs_t { vertex_t const* indices{nullptr}; [[maybe_unused]] thrust::optional weights{thrust::nullopt}; edge_t local_degree{0}; - if (multi_gpu && (major_hypersparse_first && (major >= *major_hypersparse_first))) { + if (multi_gpu && (edge_partition.major_hypersparse_first() && + (major >= *(edge_partition.major_hypersparse_first())))) { auto major_hypersparse_idx = edge_partition.major_hypersparse_idx_from_major_nocheck(major); if (major_hypersparse_idx) { thrust::tie(indices, weights, local_degree) = edge_partition.local_edges( - (*major_hypersparse_first - edge_partition.major_range_first()) + *major_hypersparse_idx); + (*(edge_partition.major_hypersparse_first()) - edge_partition.major_range_first()) + + *major_hypersparse_idx); } } else { thrust::tie(indices, weights, local_degree) = @@ -242,7 +243,6 @@ struct pick_min_degree_t { size_t const* second_element_offsets{nullptr}; edge_partition_device_view_t edge_partition{}; - thrust::optional major_hypersparse_first{}; __device__ edge_t operator()(thrust::tuple pair) const { @@ -250,14 +250,16 @@ struct pick_min_degree_t { vertex_t major0 = thrust::get<0>(pair); if constexpr (std::is_same_v) { if constexpr (multi_gpu) { - if (major_hypersparse_first && (major0 >= *major_hypersparse_first)) { + if (edge_partition.major_hypersparse_first() && + (major0 >= *(edge_partition.major_hypersparse_first()))) { auto major_hypersparse_idx = edge_partition.major_hypersparse_idx_from_major_nocheck(major0); - local_degree0 = major_hypersparse_idx - ? edge_partition.local_degree( - (*major_hypersparse_first - edge_partition.major_range_first()) + - *major_hypersparse_idx) - : edge_t{0}; + local_degree0 = + major_hypersparse_idx + ? edge_partition.local_degree((*(edge_partition.major_hypersparse_first()) - + edge_partition.major_range_first()) + + *major_hypersparse_idx) + : edge_t{0}; } else { local_degree0 = edge_partition.local_degree(edge_partition.major_offset_from_major_nocheck(major0)); @@ -279,14 +281,16 @@ struct pick_min_degree_t { vertex_t major1 = thrust::get<1>(pair); if constexpr (std::is_same_v) { if constexpr (multi_gpu) { - if (major_hypersparse_first && (major1 >= *major_hypersparse_first)) { + if (edge_partition.major_hypersparse_first() && + (major1 >= *(edge_partition.major_hypersparse_first()))) { auto major_hypersparse_idx = edge_partition.major_hypersparse_idx_from_major_nocheck(major1); - local_degree1 = major_hypersparse_idx - ? edge_partition.local_degree( - (*major_hypersparse_first - edge_partition.major_range_first()) + - *major_hypersparse_idx) - : edge_t{0}; + local_degree1 = + major_hypersparse_idx + ? edge_partition.local_degree((*(edge_partition.major_hypersparse_first()) - + edge_partition.major_range_first()) + + *major_hypersparse_idx) + : edge_t{0}; } else { local_degree1 = edge_partition.local_degree(edge_partition.major_offset_from_major_nocheck(major1)); @@ -325,7 +329,6 @@ struct copy_intersecting_nbrs_and_update_intersection_size_t { vertex_t const* second_element_indices{nullptr}; edge_partition_device_view_t edge_partition{}; - thrust::optional major_hypersparse_first{}; VertexPairIterator vertex_pair_first; size_t const* nbr_intersection_offsets{nullptr}; @@ -343,12 +346,13 @@ struct copy_intersecting_nbrs_and_update_intersection_size_t { if constexpr (std::is_same_v) { vertex_t major = thrust::get<0>(pair); if constexpr (multi_gpu) { - if (major_hypersparse_first && (major >= *major_hypersparse_first)) { + if (edge_partition.major_hypersparse_first() && + (major >= *(edge_partition.major_hypersparse_first()))) { auto major_hypersparse_idx = edge_partition.major_hypersparse_idx_from_major_nocheck(major); if (major_hypersparse_idx) { thrust::tie(indices0, weights0, local_degree0) = edge_partition.local_edges( - (*major_hypersparse_first - edge_partition.major_range_first()) + + (*(edge_partition.major_hypersparse_first()) - edge_partition.major_range_first()) + *major_hypersparse_idx); } } else { @@ -376,12 +380,13 @@ struct copy_intersecting_nbrs_and_update_intersection_size_t { if constexpr (std::is_same_v) { vertex_t major = thrust::get<1>(pair); if constexpr (multi_gpu) { - if (major_hypersparse_first && (major >= *major_hypersparse_first)) { + if (edge_partition.major_hypersparse_first() && + (major >= *(edge_partition.major_hypersparse_first()))) { auto major_hypersparse_idx = edge_partition.major_hypersparse_idx_from_major_nocheck(major); if (major_hypersparse_idx) { thrust::tie(indices1, weights1, local_degree1) = edge_partition.local_edges( - (*major_hypersparse_first - edge_partition.major_range_first()) + + (*(edge_partition.major_hypersparse_first()) - edge_partition.major_range_first()) + *major_hypersparse_idx); } } else { @@ -752,10 +757,6 @@ nbr_intersection(raft::handle_t const& handle, row_comm_size, col_comm_size, edge_partition, - edge_partition.dcs_nzd_vertex_count() - ? thrust::optional{edge_partition.major_range_first() + - (*segment_offsets)[3]} - : thrust::nullopt, reordered_idx_first, i, d_rx_reordered_group_lasts.data() + i * row_comm_size, @@ -793,10 +794,6 @@ nbr_intersection(raft::handle_t const& handle, row_comm_size, col_comm_size, edge_partition, - edge_partition.dcs_nzd_vertex_count() - ? thrust::optional{edge_partition.major_range_first() + - (*segment_offsets)[3]} - : thrust::nullopt, reordered_idx_first, i, d_rx_reordered_group_lasts.data() + i * row_comm_size, @@ -979,16 +976,11 @@ nbr_intersection(raft::handle_t const& handle, vertex_t, edge_t, weight_t, - true>{ - nullptr, - nullptr, - second_element_to_idx_map, - (*major_nbr_offsets).data(), - edge_partition, - edge_partition.dcs_nzd_vertex_count() - ? thrust::optional{edge_partition.major_range_first() + - (*segment_offsets)[3]} - : thrust::nullopt}); + true>{nullptr, + nullptr, + second_element_to_idx_map, + (*major_nbr_offsets).data(), + edge_partition}); } else { CUGRAPH_FAIL("unimplemented."); } @@ -1007,32 +999,27 @@ nbr_intersection(raft::handle_t const& handle, handle.get_stream()); if (intersect_minor_nbr[0] && intersect_minor_nbr[1]) { auto second_element_to_idx_map = (*major_to_idx_map_ptr)->get_device_view(); - thrust::tabulate( - handle.get_thrust_policy(), - rx_v_pair_nbr_intersection_sizes.begin(), - rx_v_pair_nbr_intersection_sizes.end(), - copy_intersecting_nbrs_and_update_intersection_size_t< - void*, - decltype(second_element_to_idx_map), - decltype(get_dataframe_buffer_begin(vertex_pair_buffer)), - vertex_t, - edge_t, - weight_t, - true>{nullptr, - nullptr, - nullptr, - second_element_to_idx_map, - (*major_nbr_offsets).data(), - (*major_nbr_indices).data(), - edge_partition, - edge_partition.dcs_nzd_vertex_count() - ? thrust::optional{edge_partition.major_range_first() + - (*segment_offsets)[3]} - : thrust::nullopt, - get_dataframe_buffer_begin(vertex_pair_buffer), - rx_v_pair_nbr_intersection_offsets.data(), - rx_v_pair_nbr_intersection_indices.data(), - invalid_vertex_id::value}); + thrust::tabulate(handle.get_thrust_policy(), + rx_v_pair_nbr_intersection_sizes.begin(), + rx_v_pair_nbr_intersection_sizes.end(), + copy_intersecting_nbrs_and_update_intersection_size_t< + void*, + decltype(second_element_to_idx_map), + decltype(get_dataframe_buffer_begin(vertex_pair_buffer)), + vertex_t, + edge_t, + weight_t, + true>{nullptr, + nullptr, + nullptr, + second_element_to_idx_map, + (*major_nbr_offsets).data(), + (*major_nbr_indices).data(), + edge_partition, + get_dataframe_buffer_begin(vertex_pair_buffer), + rx_v_pair_nbr_intersection_offsets.data(), + rx_v_pair_nbr_intersection_indices.data(), + invalid_vertex_id::value}); } else { CUGRAPH_FAIL("unimplemented."); } @@ -1252,7 +1239,7 @@ nbr_intersection(raft::handle_t const& handle, vertex_pair_first + input_size, nbr_intersection_sizes.begin(), pick_min_degree_t{ - nullptr, nullptr, nullptr, nullptr, edge_partition, thrust::nullopt}); + nullptr, nullptr, nullptr, nullptr, edge_partition}); } else { CUGRAPH_FAIL("unimplemented."); } @@ -1287,7 +1274,6 @@ nbr_intersection(raft::handle_t const& handle, nullptr, nullptr, edge_partition, - thrust::nullopt, vertex_pair_first, nbr_intersection_offsets.data(), nbr_intersection_indices.data(), diff --git a/cpp/src/prims/edge_partition_src_dst_property.cuh b/cpp/src/prims/edge_partition_src_dst_property.cuh index 3091a68889c..ac6a5fc0bad 100644 --- a/cpp/src/prims/edge_partition_src_dst_property.cuh +++ b/cpp/src/prims/edge_partition_src_dst_property.cuh @@ -94,7 +94,7 @@ class edge_partition_major_property_device_view_t { } assert((partition_idx == size_t{0}) || edge_partition_major_value_start_offsets_); - assert((partition_idx == size_t{0}) || edge_partition_value_firsts_); + assert((partition_idx == size_t{0}) || edge_partition_major_range_firsts_); this_edge_partition_value_first_ = value_first_ + (edge_partition_major_value_start_offsets_ ? (*edge_partition_major_value_start_offsets_)[partition_idx] diff --git a/cpp/src/prims/per_v_transform_reduce_incoming_outgoing_e.cuh b/cpp/src/prims/per_v_transform_reduce_incoming_outgoing_e.cuh index 5d19cc5f01c..0eb766fe04d 100644 --- a/cpp/src/prims/per_v_transform_reduce_incoming_outgoing_e.cuh +++ b/cpp/src/prims/per_v_transform_reduce_incoming_outgoing_e.cuh @@ -25,9 +25,11 @@ #include #include #include +#include #include #include +#include #include #include @@ -54,6 +56,18 @@ namespace detail { int32_t constexpr per_v_transform_reduce_e_kernel_block_size = 512; +template +struct scatter_reduce_t { + OutputValueIterator output_value_first{}; + ReduceOp reduce_op{}; + + __device__ void operator()(thrust::tuple pair) const + { + *(output_value_first + thrust::get<0>(pair)) = + reduce_op(*(output_value_first + thrust::get<0>(pair)), thrust::get<1>(pair)); + } +}; + template edge_partition, - typename GraphViewType::vertex_type major_hypersparse_first, EdgePartitionSrcValueInputWrapper edge_partition_src_value_input, EdgePartitionDstValueInputWrapper edge_partition_dst_value_input, ResultValueOutputIteratorOrWrapper result_value_output, @@ -80,10 +93,10 @@ __global__ void per_v_transform_reduce_e_hypersparse( using edge_t = typename GraphViewType::edge_type; using weight_t = typename GraphViewType::weight_type; - auto const tid = threadIdx.x + blockIdx.x * blockDim.x; - auto major_start_offset = - static_cast(major_hypersparse_first - edge_partition.major_range_first()); - auto idx = static_cast(tid); + auto const tid = threadIdx.x + blockIdx.x * blockDim.x; + auto major_start_offset = static_cast(*(edge_partition.major_hypersparse_first()) - + edge_partition.major_range_first()); + auto idx = static_cast(tid); auto dcs_nzd_vertex_count = *(edge_partition.dcs_nzd_vertex_count()); @@ -130,7 +143,7 @@ __global__ void per_v_transform_reduce_e_hypersparse( }; if constexpr (update_major) { - *(result_value_output + (major - major_hypersparse_first)) = + *(result_value_output + (major - *(edge_partition.major_hypersparse_first()))) = thrust::transform_reduce(thrust::seq, thrust::make_counting_iterator(edge_t{0}), thrust::make_counting_iterator(local_degree), @@ -501,7 +514,21 @@ void per_v_transform_reduce_e(raft::handle_t const& handle, } } - if constexpr (!update_major) { + if constexpr (update_major) { + size_t partition_idx = 0; + if constexpr (GraphViewType::is_multi_gpu) { + auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name()); + auto const col_comm_rank = col_comm.get_rank(); + partition_idx = static_cast(col_comm_rank); + } + auto segment_offsets = graph_view.local_edge_partition_segment_offsets(partition_idx); + if (segment_offsets) { // no vertices in the zero degree segment are visited + thrust::fill(handle.get_thrust_policy(), + vertex_value_output_first + *((*segment_offsets).rbegin() + 1), + vertex_value_output_first + *((*segment_offsets).rbegin()), + init); + } + } else { auto minor_init = init; if constexpr (GraphViewType::is_multi_gpu) { auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name()); @@ -509,11 +536,10 @@ void per_v_transform_reduce_e(raft::handle_t const& handle, minor_init = (row_comm_rank == 0) ? init : T{}; } - auto execution_policy = handle.get_thrust_policy(); if constexpr (GraphViewType::is_multi_gpu) { minor_tmp_buffer.fill(handle, minor_init); } else { - thrust::fill(execution_policy, + thrust::fill(handle.get_thrust_policy(), vertex_value_output_first, vertex_value_output_first + graph_view.local_vertex_partition_range_size(), minor_init); @@ -536,8 +562,9 @@ void per_v_transform_reduce_e(raft::handle_t const& handle, // update_major ? V / comm_size * sizeof(T) : 0 // and limit memory requirement to (E / comm_size) * sizeof(vertex_t) - size_t num_streams = std::min(static_cast(col_comm_size) * max_segments, - (handle.get_stream_pool_size() / max_segments) * max_segments); + size_t num_streams = + std::min(static_cast(col_comm_size) * max_segments, + raft::round_down_safe(handle.get_stream_pool_size(), max_segments)); if constexpr (update_major) { size_t value_size{0}; if constexpr (is_thrust_tuple_of_arithmetic::value) { @@ -573,10 +600,16 @@ void per_v_transform_reduce_e(raft::handle_t const& handle, std::vector major_tmp_buffer_sizes(graph_view.number_of_local_edge_partitions(), size_t{0}); for (size_t i = 0; i < graph_view.number_of_local_edge_partitions(); ++i) { - if constexpr (GraphViewType::is_storage_transposed) { - major_tmp_buffer_sizes[i] = graph_view.local_edge_partition_dst_range_size(i); + auto segment_offsets = graph_view.local_edge_partition_segment_offsets(i); + if (segment_offsets) { + major_tmp_buffer_sizes[i] = + *((*segment_offsets).rbegin() + 1); // exclude the zero degree segment } else { - major_tmp_buffer_sizes[i] = graph_view.local_edge_partition_src_range_size(i); + if constexpr (GraphViewType::is_storage_transposed) { + major_tmp_buffer_sizes[i] = graph_view.local_edge_partition_dst_range_size(i); + } else { + major_tmp_buffer_sizes[i] = graph_view.local_edge_partition_src_range_size(i); + } } } if (stream_pool_indices) { @@ -658,14 +691,15 @@ void per_v_transform_reduce_e(raft::handle_t const& handle, stream_pool_indices ? handle.get_stream_from_stream_pool((i * max_segments) % (*stream_pool_indices).size()) : handle.get_stream(); + if constexpr (update_major) { // this is necessary as we don't visit every vertex in the - // hypersparse segment in - // per_v_transform_reduce_e_hypersparse + // hypersparse segment thrust::fill(rmm::exec_policy(exec_stream), output_buffer + (*segment_offsets)[3], output_buffer + (*segment_offsets)[4], major_init); } + if (*(edge_partition.dcs_nzd_vertex_count()) > 0) { raft::grid_1d_thread_t update_grid(*(edge_partition.dcs_nzd_vertex_count()), detail::per_v_transform_reduce_e_kernel_block_size, @@ -675,7 +709,6 @@ void per_v_transform_reduce_e(raft::handle_t const& handle, detail::per_v_transform_reduce_e_hypersparse <<>>( edge_partition, - edge_partition.major_range_first() + (*segment_offsets)[3], edge_partition_src_value_input_copy, edge_partition_dst_value_input_copy, segment_output_buffer, @@ -772,15 +805,89 @@ void per_v_transform_reduce_e(raft::handle_t const& handle, auto const col_comm_size = col_comm.get_size(); if (segment_offsets && stream_pool_indices) { - if ((*segment_offsets).back() - (*segment_offsets)[3] > 0) { + if (edge_partition.dcs_nzd_vertex_count()) { +#if 0 // FIXME: P2P when expected local degree << col_comm_size, haven't confirmed this actually + // improves performance + auto exec_stream = handle.get_stream_from_stream_pool((i * max_segments) % + (*stream_pool_indices).size()); + + auto tx_size = (col_comm_rank == static_cast(i)) + ? size_t{0} + : static_cast(*(edge_partition.dcs_nzd_vertex_count())); + auto tx_value_buffer = allocate_dataframe_buffer(tx_size, exec_stream); + + if (tx_size > size_t{0}) { + auto map_first = thrust::make_transform_iterator( + *(edge_partition.dcs_nzd_vertices()), + shift_left_t{edge_partition.major_range_first()}); + thrust::gather(rmm::exec_policy(exec_stream), + map_first, + map_first + tx_size, + major_buffer_first, + get_dataframe_buffer_begin(tx_value_buffer)); + } + + auto rx_counts = + host_scalar_gather(col_comm, tx_size, static_cast(i), exec_stream); + std::vector rx_displs(rx_counts.size()); + std::exclusive_scan(rx_counts.begin(), rx_counts.end(), rx_displs.begin(), size_t{0}); + + // we may do this in multiple rounds if allocating the rx buffer becomes the memory + // bottleneck. + auto rx_size = (col_comm_rank == static_cast(i)) + ? rx_displs.back() + rx_counts.back() + : size_t{0}; + + rmm::device_uvector rx_vertices(rx_size, exec_stream); + device_gatherv(col_comm, + *(edge_partition.dcs_nzd_vertices()), + rx_vertices.begin(), + tx_size, + rx_counts, + rx_displs, + static_cast(i), + exec_stream); + auto rx_value_buffer = allocate_dataframe_buffer(rx_size, exec_stream); + device_gatherv(col_comm, + get_dataframe_buffer_begin(tx_value_buffer), + get_dataframe_buffer_begin(rx_value_buffer), + tx_size, + rx_counts, + rx_displs, + static_cast(i), + exec_stream); + + if (rx_size > size_t{0}) { + auto pair_first = thrust::make_zip_iterator(thrust::make_tuple( + thrust::make_transform_iterator( + rx_vertices.begin(), shift_left_t{edge_partition.major_range_first()}), + get_dataframe_buffer_begin(rx_value_buffer))); + thrust::for_each(rmm::exec_policy(exec_stream), + pair_first, + pair_first + rx_size, + scatter_reduce_t>{ + major_buffer_first, property_op{}}); + } + + if (col_comm_rank == static_cast(i)) { + thrust::copy(rmm::exec_policy(exec_stream), + major_buffer_first + (*segment_offsets)[3], + major_buffer_first + (*segment_offsets)[4], + vertex_value_output_first + (*segment_offsets)[3]); + } +#else device_reduce( col_comm, major_buffer_first + (*segment_offsets)[3], vertex_value_output_first + (*segment_offsets)[3], - (*segment_offsets).back() - (*segment_offsets)[3], + (*segment_offsets)[4] - (*segment_offsets)[3], raft::comms::op_t::SUM, - i, + static_cast(i), handle.get_stream_from_stream_pool((i * max_segments) % (*stream_pool_indices).size())); +#endif } if ((*segment_offsets)[3] - (*segment_offsets)[2] > 0) { device_reduce(col_comm, @@ -788,7 +895,7 @@ void per_v_transform_reduce_e(raft::handle_t const& handle, vertex_value_output_first + (*segment_offsets)[2], (*segment_offsets)[3] - (*segment_offsets)[2], raft::comms::op_t::SUM, - i, + static_cast(i), handle.get_stream_from_stream_pool((i * max_segments + 1) % (*stream_pool_indices).size())); } @@ -798,7 +905,7 @@ void per_v_transform_reduce_e(raft::handle_t const& handle, vertex_value_output_first + (*segment_offsets)[1], (*segment_offsets)[2] - (*segment_offsets)[1], raft::comms::op_t::SUM, - i, + static_cast(i), handle.get_stream_from_stream_pool((i * max_segments + 2) % (*stream_pool_indices).size())); } @@ -808,17 +915,20 @@ void per_v_transform_reduce_e(raft::handle_t const& handle, vertex_value_output_first, (*segment_offsets)[1], raft::comms::op_t::SUM, - i, + static_cast(i), handle.get_stream_from_stream_pool((i * max_segments + 3) % (*stream_pool_indices).size())); } } else { + size_t reduction_size = static_cast( + segment_offsets ? *((*segment_offsets).rbegin() + 1) /* exclude the zero degree segment */ + : edge_partition.major_range_size()); device_reduce(col_comm, major_buffer_first, vertex_value_output_first, - edge_partition.major_range_size(), + reduction_size, raft::comms::op_t::SUM, - i, + static_cast(i), handle.get_stream()); } } diff --git a/cpp/src/prims/transform_reduce_e.cuh b/cpp/src/prims/transform_reduce_e.cuh index d3ec9d238c0..faad93d9f22 100644 --- a/cpp/src/prims/transform_reduce_e.cuh +++ b/cpp/src/prims/transform_reduce_e.cuh @@ -57,7 +57,6 @@ __global__ void trasnform_reduce_e_hypersparse( typename GraphViewType::edge_type, typename GraphViewType::weight_type, GraphViewType::is_multi_gpu> edge_partition, - typename GraphViewType::vertex_type major_hypersparse_first, EdgePartitionSrcValueInputWrapper edge_partition_src_value_input, EdgePartitionDstValueInputWrapper edge_partition_dst_value_input, ResultIterator result_iter /* size 1 */, @@ -68,10 +67,10 @@ __global__ void trasnform_reduce_e_hypersparse( using weight_t = typename GraphViewType::weight_type; using e_op_result_t = typename std::iterator_traits::value_type; - auto const tid = threadIdx.x + blockIdx.x * blockDim.x; - auto major_start_offset = - static_cast(major_hypersparse_first - edge_partition.major_range_first()); - size_t idx = static_cast(tid); + auto const tid = threadIdx.x + blockIdx.x * blockDim.x; + auto major_start_offset = static_cast(*(edge_partition.major_hypersparse_first()) - + edge_partition.major_range_first()); + size_t idx = static_cast(tid); auto dcs_nzd_vertex_count = *(edge_partition.dcs_nzd_vertex_count()); @@ -494,7 +493,6 @@ T transform_reduce_e(raft::handle_t const& handle, detail::trasnform_reduce_e_hypersparse <<>>( edge_partition, - edge_partition.major_range_first() + (*segment_offsets)[3], edge_partition_src_value_input_copy, edge_partition_dst_value_input_copy, get_dataframe_buffer_begin(result_buffer), diff --git a/cpp/src/prims/transform_reduce_e_by_src_dst_key.cuh b/cpp/src/prims/transform_reduce_e_by_src_dst_key.cuh index b099bc07cc8..ca754798490 100644 --- a/cpp/src/prims/transform_reduce_e_by_src_dst_key.cuh +++ b/cpp/src/prims/transform_reduce_e_by_src_dst_key.cuh @@ -102,7 +102,6 @@ __global__ void transform_reduce_by_src_dst_key_hypersparse( typename GraphViewType::edge_type, typename GraphViewType::weight_type, GraphViewType::is_multi_gpu> edge_partition, - typename GraphViewType::vertex_type major_hypersparse_first, EdgePartitionSrcValueInputWrapper edge_partition_src_value_input, EdgePartitionDstValueInputWrapper edge_partition_dst_value_input, EdgePartitionSrcDstKeyInputWrapper edge_partition_src_dst_key_input, @@ -114,10 +113,10 @@ __global__ void transform_reduce_by_src_dst_key_hypersparse( using edge_t = typename GraphViewType::edge_type; using weight_t = typename GraphViewType::weight_type; - auto const tid = threadIdx.x + blockIdx.x * blockDim.x; - auto major_start_offset = - static_cast(major_hypersparse_first - edge_partition.major_range_first()); - auto idx = static_cast(tid); + auto const tid = threadIdx.x + blockIdx.x * blockDim.x; + auto major_start_offset = static_cast(*(edge_partition.major_hypersparse_first()) - + edge_partition.major_range_first()); + auto idx = static_cast(tid); auto dcs_nzd_vertex_count = *(edge_partition.dcs_nzd_vertex_count()); @@ -480,7 +479,6 @@ transform_reduce_e_by_src_dst_key( detail::transform_reduce_by_src_dst_key_hypersparse <<>>( edge_partition, - edge_partition.major_range_first() + (*segment_offsets)[3], edge_partition_src_value_input_copy, edge_partition_dst_value_input_copy, edge_partition_src_dst_key_input_copy, diff --git a/cpp/src/prims/transform_reduce_v_frontier_outgoing_e_by_dst.cuh b/cpp/src/prims/transform_reduce_v_frontier_outgoing_e_by_dst.cuh index f9eda71cb2d..8632787db91 100644 --- a/cpp/src/prims/transform_reduce_v_frontier_outgoing_e_by_dst.cuh +++ b/cpp/src/prims/transform_reduce_v_frontier_outgoing_e_by_dst.cuh @@ -190,7 +190,6 @@ __global__ void update_v_frontier_from_outgoing_e_hypersparse( typename GraphViewType::edge_type, typename GraphViewType::weight_type, GraphViewType::is_multi_gpu> edge_partition, - typename GraphViewType::vertex_type major_hypersparse_first, KeyIterator key_first, KeyIterator key_last, EdgePartitionSrcValueInputWrapper edge_partition_src_value_input, @@ -217,12 +216,12 @@ __global__ void update_v_frontier_from_outgoing_e_hypersparse( static_assert(!GraphViewType::is_storage_transposed, "GraphViewType should support the push model."); - auto const tid = threadIdx.x + blockIdx.x * blockDim.x; - auto const warp_id = threadIdx.x / raft::warp_size(); - auto const lane_id = tid % raft::warp_size(); - auto src_start_offset = - static_cast(major_hypersparse_first - edge_partition.major_range_first()); - auto idx = static_cast(tid); + auto const tid = threadIdx.x + blockIdx.x * blockDim.x; + auto const warp_id = threadIdx.x / raft::warp_size(); + auto const lane_id = tid % raft::warp_size(); + auto src_start_offset = static_cast(*(edge_partition.major_hypersparse_first()) - + edge_partition.major_range_first()); + auto idx = static_cast(tid); __shared__ edge_t warp_local_degree_inclusive_sums[update_v_frontier_from_outgoing_e_kernel_block_size]; @@ -830,7 +829,6 @@ typename GraphViewType::edge_type compute_num_out_nbrs_from_frontier( edge_partition_device_view_t( graph_view.local_edge_partition_view(i)); - auto execution_policy = handle.get_thrust_policy(); if constexpr (GraphViewType::is_multi_gpu) { auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name()); auto const col_comm_rank = col_comm.get_rank(); @@ -844,21 +842,14 @@ typename GraphViewType::edge_type compute_num_out_nbrs_from_frontier( handle.get_stream()); auto segment_offsets = graph_view.local_edge_partition_segment_offsets(i); - auto use_dcs = - segment_offsets - ? ((*segment_offsets).size() > (detail::num_sparse_segments_per_vertex_partition + 1)) - : false; - - ret += use_dcs + ret += graph_view.use_dcs() ? thrust::transform_reduce( - execution_policy, + handle.get_thrust_policy(), frontier_vertices.begin(), frontier_vertices.end(), [edge_partition, major_hypersparse_first = - edge_partition.major_range_first() + - (*segment_offsets) - [detail::num_sparse_segments_per_vertex_partition]] __device__(auto major) { + *(edge_partition.major_hypersparse_first())] __device__(auto major) { if (major < major_hypersparse_first) { auto major_offset = edge_partition.major_offset_from_major_nocheck(major); return edge_partition.local_degree(major_offset); @@ -876,7 +867,7 @@ typename GraphViewType::edge_type compute_num_out_nbrs_from_frontier( edge_t{0}, thrust::plus()) : thrust::transform_reduce( - execution_policy, + handle.get_thrust_policy(), frontier_vertices.begin(), frontier_vertices.end(), [edge_partition] __device__(auto major) { @@ -888,7 +879,7 @@ typename GraphViewType::edge_type compute_num_out_nbrs_from_frontier( } else { assert(i == 0); ret += thrust::transform_reduce( - execution_policy, + handle.get_thrust_policy(), local_frontier_vertex_first, local_frontier_vertex_first + cur_frontier_bucket.size(), [edge_partition] __device__(auto major) { @@ -1060,47 +1051,40 @@ transform_reduce_v_frontier_outgoing_e_by_dst( } auto segment_offsets = graph_view.local_edge_partition_segment_offsets(i); - auto use_dcs = - segment_offsets - ? ((*segment_offsets).size() > (detail::num_sparse_segments_per_vertex_partition + 1)) - : false; - - auto execution_policy = handle.get_thrust_policy(); auto max_pushes = - use_dcs ? thrust::transform_reduce( - execution_policy, - edge_partition_frontier_src_first, - edge_partition_frontier_src_last, - [edge_partition, - major_hypersparse_first = - edge_partition.major_range_first() + - (*segment_offsets) - [detail::num_sparse_segments_per_vertex_partition]] __device__(auto src) { - if (src < major_hypersparse_first) { - auto src_offset = edge_partition.major_offset_from_major_nocheck(src); - return edge_partition.local_degree(src_offset); - } else { - auto src_hypersparse_idx = - edge_partition.major_hypersparse_idx_from_major_nocheck(src); - return src_hypersparse_idx ? edge_partition.local_degree( - edge_partition.major_offset_from_major_nocheck( - major_hypersparse_first) + - *src_hypersparse_idx) - : edge_t{0}; - } - }, - edge_t{0}, - thrust::plus()) - : thrust::transform_reduce( - execution_policy, - edge_partition_frontier_src_first, - edge_partition_frontier_src_last, - [edge_partition] __device__(auto src) { - auto src_offset = edge_partition.major_offset_from_major_nocheck(src); - return edge_partition.local_degree(src_offset); - }, - edge_t{0}, - thrust::plus()); + graph_view.use_dcs() + ? thrust::transform_reduce( + handle.get_thrust_policy(), + edge_partition_frontier_src_first, + edge_partition_frontier_src_last, + [edge_partition, + major_hypersparse_first = + *(edge_partition.major_hypersparse_first())] __device__(auto src) { + if (src < major_hypersparse_first) { + auto src_offset = edge_partition.major_offset_from_major_nocheck(src); + return edge_partition.local_degree(src_offset); + } else { + auto src_hypersparse_idx = + edge_partition.major_hypersparse_idx_from_major_nocheck(src); + return src_hypersparse_idx ? edge_partition.local_degree( + edge_partition.major_offset_from_major_nocheck( + major_hypersparse_first) + + *src_hypersparse_idx) + : edge_t{0}; + } + }, + edge_t{0}, + thrust::plus()) + : thrust::transform_reduce( + handle.get_thrust_policy(), + edge_partition_frontier_src_first, + edge_partition_frontier_src_last, + [edge_partition] __device__(auto src) { + auto src_offset = edge_partition.major_offset_from_major_nocheck(src); + return edge_partition.local_degree(src_offset); + }, + edge_t{0}, + thrust::plus()); auto new_buffer_size = buffer_idx.value(handle.get_stream()) + max_pushes; resize_dataframe_buffer(key_buffer, new_buffer_size, handle.get_stream()); @@ -1115,10 +1099,12 @@ transform_reduce_v_frontier_outgoing_e_by_dst( if (segment_offsets) { static_assert(detail::num_sparse_segments_per_vertex_partition == 3); std::vector h_thresholds(detail::num_sparse_segments_per_vertex_partition + - (use_dcs ? 1 : 0) - 1); + (graph_view.use_dcs() ? 1 : 0) - 1); h_thresholds[0] = edge_partition.major_range_first() + (*segment_offsets)[1]; h_thresholds[1] = edge_partition.major_range_first() + (*segment_offsets)[2]; - if (use_dcs) { h_thresholds[2] = edge_partition.major_range_first() + (*segment_offsets)[3]; } + if (graph_view.use_dcs()) { + h_thresholds[2] = edge_partition.major_range_first() + (*segment_offsets)[3]; + } rmm::device_uvector d_thresholds(h_thresholds.size(), handle.get_stream()); raft::update_device( d_thresholds.data(), h_thresholds.data(), h_thresholds.size(), handle.get_stream()); @@ -1195,7 +1181,6 @@ transform_reduce_v_frontier_outgoing_e_by_dst( detail::update_v_frontier_from_outgoing_e_hypersparse <<>>( edge_partition, - edge_partition.major_range_first() + (*segment_offsets)[3], get_dataframe_buffer_begin(edge_partition_frontier_key_buffer) + h_offsets[2], get_dataframe_buffer_begin(edge_partition_frontier_key_buffer) + h_offsets[3], edge_partition_src_value_input_copy, diff --git a/cpp/src/prims/update_edge_partition_src_dst_property.cuh b/cpp/src/prims/update_edge_partition_src_dst_property.cuh index 3b13bf192b5..8cee40d3438 100644 --- a/cpp/src/prims/update_edge_partition_src_dst_property.cuh +++ b/cpp/src/prims/update_edge_partition_src_dst_property.cuh @@ -274,9 +274,11 @@ void update_edge_partition_minor_property( { if constexpr (GraphViewType::is_multi_gpu) { using vertex_t = typename GraphViewType::vertex_type; + using value_t = typename thrust::iterator_traits::value_type; auto& comm = handle.get_comms(); auto const comm_rank = comm.get_rank(); + auto const comm_size = comm.get_size(); auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name()); auto const row_comm_rank = row_comm.get_rank(); auto const row_comm_size = row_comm.get_size(); @@ -292,15 +294,83 @@ void update_edge_partition_minor_property( key_offsets = *(graph_view.local_sorted_unique_edge_dst_vertex_partition_offsets()); } +#if 0 // FIXME: should not directly call ncclGroupStart(), ncclGroupEnd(), enable after adding a + // raft::comms mechanism to group broadcast operations + // memory footprint vs parallelism trade-off + // memory requirement per loop is + // (V/comm_size) * sizeof(value_t) + // and limit memory requirement to (E / comm_size) * sizeof(vertex_t) + auto num_concurrent_bcasts = + (static_cast(graph_view.number_of_edges() / comm_size) * sizeof(vertex_t)) / + std::max(static_cast(graph_view.number_of_vertices() / comm_size) * sizeof(value_t), + size_t{1}); + num_concurrent_bcasts = std::max(num_concurrent_bcasts, size_t{1}); + num_concurrent_bcasts = std::min(num_concurrent_bcasts, static_cast(row_comm_size)); + auto num_rounds = (static_cast(row_comm_size) + num_concurrent_bcasts - size_t{1}) / + num_concurrent_bcasts; + + std::vector(size_t{0}, handle.get_stream()))> + rx_value_buffers{}; + rx_value_buffers.reserve(num_concurrent_bcasts); + for (size_t i = 0; i < num_concurrent_bcasts; ++i) { + size_t max_size{0}; + for (size_t round = 0; round < num_rounds; ++round) { + auto j = num_rounds * i + round; + if (j < static_cast(row_comm_size)) { + max_size = std::max(max_size, + static_cast(graph_view.vertex_partition_range_size( + col_comm_rank * row_comm_size + j))); + } + } + rx_value_buffers.push_back( + allocate_dataframe_buffer(max_size, handle.get_stream())); + } + + for (size_t round = 0; round < num_rounds; ++round) { + // FIXME: better use a higher-level interface than directly invoking NCCL functions. + if (ncclGroupStart() != ncclSuccess) { + CUGRAPH_FAIL("ncclGroupStart failure."); + } + for (size_t i = 0; i < num_concurrent_bcasts; ++i) { + auto j = num_rounds * i + round; + if (j < static_cast(row_comm_size)) { + auto rx_value_first = get_dataframe_buffer_begin(rx_value_buffers[i]); + device_bcast(row_comm, + vertex_property_input_first, + rx_value_first, + graph_view.vertex_partition_range_size(col_comm_rank * row_comm_size + j), + j, + handle.get_stream()); + } + } + // FIXME: better use a higher-level interface than directly invoking NCCL functions. + if (ncclGroupEnd() != ncclSuccess) { + CUGRAPH_FAIL("ncclGroupEnd failure."); + } + for (size_t i = 0; i < num_concurrent_bcasts; ++i) { + auto j = num_rounds * i + round; + if (j < static_cast(row_comm_size)) { + auto rx_value_first = get_dataframe_buffer_begin(rx_value_buffers[i]); + auto v_offset_first = thrust::make_transform_iterator( + (*(edge_partition_minor_property_output.keys())).begin() + key_offsets[j], + [v_first = graph_view.vertex_partition_range_first( + col_comm_rank * row_comm_size + j)] __device__(auto v) { return v - v_first; }); + thrust::gather(handle.get_thrust_policy(), + v_offset_first, + v_offset_first + (key_offsets[j + 1] - key_offsets[j]), + rx_value_first, + edge_partition_minor_property_output.value_first() + key_offsets[j]); + } + } + } +#else vertex_t max_rx_size{0}; for (int i = 0; i < row_comm_size; ++i) { max_rx_size = std::max( max_rx_size, graph_view.vertex_partition_range_size(col_comm_rank * row_comm_size + i)); } - auto rx_value_buffer = allocate_dataframe_buffer< - typename std::iterator_traits::value_type>( - max_rx_size, handle.get_stream()); - auto rx_value_first = get_dataframe_buffer_begin(rx_value_buffer); + auto rx_value_buffer = allocate_dataframe_buffer(max_rx_size, handle.get_stream()); + auto rx_value_first = get_dataframe_buffer_begin(rx_value_buffer); for (int i = 0; i < row_comm_size; ++i) { device_bcast(row_comm, vertex_property_input_first, @@ -319,6 +389,7 @@ void update_edge_partition_minor_property( rx_value_first, edge_partition_minor_property_output.value_first() + key_offsets[i]); } +#endif } else { std::vector rx_counts(row_comm_size, size_t{0}); std::vector displacements(row_comm_size, size_t{0}); diff --git a/cpp/src/sampling/detail/sampling_utils_impl.cuh b/cpp/src/sampling/detail/sampling_utils_impl.cuh index 793df64a8d6..8a88b274c94 100644 --- a/cpp/src/sampling/detail/sampling_utils_impl.cuh +++ b/cpp/src/sampling/detail/sampling_utils_impl.cuh @@ -80,14 +80,11 @@ rmm::device_uvector compute_local_major_degre graph_view.local_edge_partition_view(i)); // Check if hypersparse segment is present in the partition - auto segment_offsets = graph_view.local_edge_partition_segment_offsets(i); - auto use_dcs = segment_offsets - ? ((*segment_offsets).size() > (num_sparse_segments_per_vertex_partition + 1)) - : false; + if (graph_view.use_dcs()) { + auto segment_offsets = graph_view.local_edge_partition_segment_offsets(i); - if (use_dcs) { auto num_sparse_vertices = (*segment_offsets)[num_sparse_segments_per_vertex_partition]; - auto major_hypersparse_first = edge_partition.major_range_first() + num_sparse_vertices; + auto major_hypersparse_first = *(edge_partition.major_hypersparse_first()); // Calculate degrees in sparse region auto sparse_begin = local_degrees.begin() + partial_offset; @@ -332,13 +329,9 @@ partition_information(raft::handle_t const& handle, GraphViewType const& graph_v id_begin.push_back(edge_partition.major_range_first()); id_end.push_back(edge_partition.major_range_last()); - auto segment_offsets = graph_view.local_edge_partition_segment_offsets(i); - auto use_dcs = segment_offsets - ? ((*segment_offsets).size() > (num_sparse_segments_per_vertex_partition + 1)) - : false; - if (use_dcs) { - auto major_hypersparse_first = edge_partition.major_range_first() + - (*segment_offsets)[num_sparse_segments_per_vertex_partition]; + if (graph_view.use_dcs()) { + auto segment_offsets = graph_view.local_edge_partition_segment_offsets(i); + auto major_hypersparse_first = *(edge_partition.major_hypersparse_first()); hypersparse_begin.push_back(major_hypersparse_first); } else { hypersparse_begin.push_back(edge_partition.major_range_last()); @@ -548,7 +541,7 @@ gather_local_edges( auto input_iter = thrust::make_zip_iterator( thrust::make_tuple(majors.begin(), minors.begin(), weights->begin())); - CUGRAPH_EXPECTS(minors.size() < std::numeric_limits::max(), + CUGRAPH_EXPECTS(minors.size() < static_cast(std::numeric_limits::max()), "remove_if will fail, minors.size() is too large"); // FIXME: remove_if has a 32-bit overflow issue (https://github.com/NVIDIA/thrust/issues/1302) @@ -569,7 +562,7 @@ gather_local_edges( } else { auto input_iter = thrust::make_zip_iterator(thrust::make_tuple(majors.begin(), minors.begin())); - CUGRAPH_EXPECTS(minors.size() < std::numeric_limits::max(), + CUGRAPH_EXPECTS(minors.size() < static_cast(std::numeric_limits::max()), "remove_if will fail, minors.size() is too large"); auto compacted_length = thrust::distance( @@ -666,7 +659,7 @@ std::vector get_active_major_segments(raft::handle_t const& handle, template void local_major_degree( raft::handle_t const& handle, - edge_partition_device_view_t partition, + edge_partition_device_view_t edge_partition, rmm::device_uvector const& active_majors, std::vector const& majors_segments, std::vector const& partition_segments, @@ -679,30 +672,31 @@ void local_major_degree( active_majors.cbegin() + majors_segments[0], active_majors.cbegin() + majors_segments[3], out_degrees, - [partition] __device__(auto major) { - auto major_offset = partition.major_offset_from_major_nocheck(major); - return partition.local_degree(major_offset); + [edge_partition] __device__(auto major) { + auto major_offset = edge_partition.major_offset_from_major_nocheck(major); + return edge_partition.local_degree(major_offset); }); } // Hypersparse region - if (majors_segments[4] - majors_segments[3] > 0) { - auto major_hypersparse_first = - partition.major_range_first() + - partition_segments[detail::num_sparse_segments_per_vertex_partition]; - auto major_offset = - static_cast(major_hypersparse_first - partition.major_range_first()); - thrust::transform(handle.get_thrust_policy(), - active_majors.cbegin() + majors_segments[3], - active_majors.cbegin() + majors_segments[4], - out_degrees + majors_segments[3] - majors_segments[0], - [partition, major_offset] __device__(auto major) { - auto major_idx = partition.major_hypersparse_idx_from_major_nocheck(major); - if (major_idx) { - return partition.local_degree(major_offset + *major_idx); - } else { - return edge_t{0}; - } - }); + if (edge_partition.dcs_nzd_vertex_count()) { + if (majors_segments[4] - majors_segments[3] > 0) { + auto major_hypersparse_first = *(edge_partition.major_hypersparse_first()); + auto major_offset = + static_cast(major_hypersparse_first - edge_partition.major_range_first()); + thrust::transform(handle.get_thrust_policy(), + active_majors.cbegin() + majors_segments[3], + active_majors.cbegin() + majors_segments[4], + out_degrees + majors_segments[3] - majors_segments[0], + [edge_partition, major_offset] __device__(auto major) { + auto major_idx = + edge_partition.major_hypersparse_idx_from_major_nocheck(major); + if (major_idx) { + return edge_partition.local_degree(major_offset + *major_idx); + } else { + return edge_t{0}; + } + }); + } } } diff --git a/cpp/src/structure/graph_impl.cuh b/cpp/src/structure/graph_impl.cuh index 271aece43c0..12e5a5dc234 100644 --- a/cpp/src/structure/graph_impl.cuh +++ b/cpp/src/structure/graph_impl.cuh @@ -304,8 +304,8 @@ std::enable_if_t check_graph_constructor_input_arguments( CUGRAPH_EXPECTS( !(meta.segment_offsets).has_value() || ((*(meta.segment_offsets)).size() == - (detail::num_sparse_segments_per_vertex_partition + 1)) || - ((*(meta.segment_offsets)).size() == (detail::num_sparse_segments_per_vertex_partition + 2)), + (detail::num_sparse_segments_per_vertex_partition + 2)) || + ((*(meta.segment_offsets)).size() == (detail::num_sparse_segments_per_vertex_partition + 3)), "Invalid input argument: (*(meta.segment_offsets)).size() returns an invalid value."); auto is_weighted = edgelists[0].p_edge_weights.has_value(); @@ -401,7 +401,7 @@ std::enable_if_t check_graph_constructor_input_arguments( CUGRAPH_EXPECTS( !meta.segment_offsets.has_value() || - ((*(meta.segment_offsets)).size() == (detail::num_sparse_segments_per_vertex_partition + 1)), + ((*(meta.segment_offsets)).size() == (detail::num_sparse_segments_per_vertex_partition + 2)), "Invalid input argument: (*(meta.segment_offsets)).size() returns an invalid value."); // optional expensive checks @@ -491,7 +491,7 @@ update_local_sorted_unique_edge_majors_minors( auto use_dcs = meta.segment_offsets - ? ((*(meta.segment_offsets)).size() > (detail::num_sparse_segments_per_vertex_partition + 1)) + ? ((*(meta.segment_offsets)).size() > (detail::num_sparse_segments_per_vertex_partition + 2)) : false; std::optional>> local_sorted_unique_edge_majors{ @@ -823,7 +823,7 @@ compress_edgelist(edgelist_t const& edgelist, thrust::make_tuple((*dcs_nzd_vertices).begin(), offsets.begin() + (*major_hypersparse_first - major_range_first))); CUGRAPH_EXPECTS( - (*dcs_nzd_vertices).size() < std::numeric_limits::max(), + (*dcs_nzd_vertices).size() < static_cast(std::numeric_limits::max()), "remove_if will fail (https://github.com/NVIDIA/thrust/issues/1302), work-around required."); (*dcs_nzd_vertices) .resize(thrust::distance(pair_first, @@ -972,7 +972,7 @@ graph_t (detail::num_sparse_segments_per_vertex_partition + 1)) + ? ((*(meta.segment_offsets)).size() > (detail::num_sparse_segments_per_vertex_partition + 2)) : false; check_graph_constructor_input_arguments( @@ -1111,7 +1111,7 @@ graph_t (detail::num_sparse_segments_per_vertex_partition + 1)) + ? ((*(meta.segment_offsets)).size() > (detail::num_sparse_segments_per_vertex_partition + 2)) : false; std::vector> edgelists(edgelist_src_partitions.size()); diff --git a/cpp/src/structure/graph_view_impl.cuh b/cpp/src/structure/graph_view_impl.cuh index fcb58a77ec2..ce562e009db 100644 --- a/cpp/src/structure/graph_view_impl.cuh +++ b/cpp/src/structure/graph_view_impl.cuh @@ -80,15 +80,18 @@ std::vector update_edge_partition_edge_counts( auto use_dcs = edge_partition_dcs_nzd_vertex_counts.has_value(); for (size_t i = 0; i < edge_partition_offsets.size(); ++i) { auto [major_range_first, major_range_last] = partition.local_edge_partition_major_range(i); - raft::update_host(&(edge_partition_edge_counts[i]), - edge_partition_offsets[i] + - (use_dcs ? ((*edge_partition_segment_offsets) - [(detail::num_sparse_segments_per_vertex_partition + 2) * i + - detail::num_sparse_segments_per_vertex_partition] + - (*edge_partition_dcs_nzd_vertex_counts)[i]) - : (major_range_last - major_range_first)), - 1, - stream); + auto segment_offset_size_per_partition = + (*edge_partition_segment_offsets).size() / edge_partition_offsets.size(); + raft::update_host( + &(edge_partition_edge_counts[i]), + edge_partition_offsets[i] + + (use_dcs + ? ((*edge_partition_segment_offsets)[segment_offset_size_per_partition * i + + detail::num_sparse_segments_per_vertex_partition] + + (*edge_partition_dcs_nzd_vertex_counts)[i]) + : (major_range_last - major_range_first)), + 1, + stream); } RAFT_CUDA_TRY(cudaStreamSynchronize(stream)); return edge_partition_edge_counts; @@ -132,11 +135,14 @@ rmm::device_uvector compute_major_degrees( std::tie(major_range_first, major_range_last) = partition.vertex_partition_range(vertex_partition_idx); auto p_offsets = edge_partition_offsets[i]; + auto segment_offset_size_per_partition = + (*edge_partition_segment_offsets).size() / static_cast(col_comm_size); auto major_hypersparse_first = - use_dcs ? major_range_first + (*edge_partition_segment_offsets) - [(detail::num_sparse_segments_per_vertex_partition + 2) * i + - detail::num_sparse_segments_per_vertex_partition] - : major_range_last; + use_dcs + ? major_range_first + + (*edge_partition_segment_offsets)[segment_offset_size_per_partition * i + + detail::num_sparse_segments_per_vertex_partition] + : major_range_last; auto execution_policy = handle.get_thrust_policy(); thrust::transform(execution_policy, thrust::make_counting_iterator(vertex_t{0}), @@ -512,7 +518,7 @@ graph_view_t, std::vector> compute_renumbe static_assert(detail::num_sparse_segments_per_vertex_partition == 3); static_assert((detail::low_degree_threshold <= detail::mid_degree_threshold) && (detail::mid_degree_threshold <= std::numeric_limits::max())); + static_assert(detail::low_degree_threshold >= 1); + static_assert((detail::hypersparse_threshold_ratio >= 0.0) && + (detail::hypersparse_threshold_ratio <= 1.0)); size_t mid_degree_threshold{detail::mid_degree_threshold}; size_t low_degree_threshold{detail::low_degree_threshold}; size_t hypersparse_degree_threshold{0}; @@ -407,24 +410,26 @@ std::tuple, std::vector> compute_renumbe } auto num_segments_per_vertex_partition = detail::num_sparse_segments_per_vertex_partition + - (hypersparse_degree_threshold > 0 ? size_t{1} : size_t{0}); + (hypersparse_degree_threshold > 0 ? size_t{2} : size_t{1}); // last is 0-degree segment rmm::device_uvector d_thresholds(num_segments_per_vertex_partition - 1, handle.get_stream()); - auto h_thresholds = hypersparse_degree_threshold > 0 - ? std::vector{static_cast(mid_degree_threshold), - static_cast(low_degree_threshold), - static_cast(hypersparse_degree_threshold)} - : std::vector{static_cast(mid_degree_threshold), - static_cast(low_degree_threshold)}; + auto h_thresholds = + hypersparse_degree_threshold > 0 + ? std::vector{static_cast(mid_degree_threshold), + static_cast(low_degree_threshold), + static_cast(hypersparse_degree_threshold), + std::min(static_cast(hypersparse_degree_threshold), edge_t{1})} + : std::vector{static_cast(mid_degree_threshold), + static_cast(low_degree_threshold), + edge_t{1}}; raft::update_device( d_thresholds.data(), h_thresholds.data(), h_thresholds.size(), handle.get_stream()); rmm::device_uvector d_segment_offsets(num_segments_per_vertex_partition + 1, handle.get_stream()); - auto zero_vertex = vertex_t{0}; auto vertex_count = static_cast(sorted_local_vertices.size()); - d_segment_offsets.set_element_async(0, zero_vertex, handle.get_stream()); + d_segment_offsets.set_element_to_zero_async(0, handle.get_stream()); d_segment_offsets.set_element_async( num_segments_per_vertex_partition, vertex_count, handle.get_stream()); diff --git a/cpp/tests/link_analysis/mg_hits_test.cpp b/cpp/tests/link_analysis/mg_hits_test.cpp index 6facb48f242..acd12a4015c 100644 --- a/cpp/tests/link_analysis/mg_hits_test.cpp +++ b/cpp/tests/link_analysis/mg_hits_test.cpp @@ -289,7 +289,7 @@ INSTANTIATE_TEST_SUITE_P( Tests_MGHits_Rmat, ::testing::Combine( // disable correctness checks for large graphs - ::testing::Values(Hits_Usecase{false, false}, Hits_Usecase{true, false}), + ::testing::Values(Hits_Usecase{false, false}), ::testing::Values( cugraph::test::Rmat_Usecase(20, 32, 0.57, 0.19, 0.19, 0, false, false, 0, true)))); diff --git a/cpp/tests/link_analysis/mg_pagerank_test.cpp b/cpp/tests/link_analysis/mg_pagerank_test.cpp index 27f608b9f8b..75dba1202ce 100644 --- a/cpp/tests/link_analysis/mg_pagerank_test.cpp +++ b/cpp/tests/link_analysis/mg_pagerank_test.cpp @@ -95,7 +95,7 @@ class Tests_MGPageRank handle.get_comms().barrier(); double elapsed_time{0.0}; hr_clock.stop(&elapsed_time); - std::cout << "MG construct_graph took " << elapsed_time * 1e-6 << " s.\n"; + std::cout << "MG construct_graph took " << elapsed_time * 1e-6 << " s." << std::endl; } auto mg_graph_view = mg_graph.view(); diff --git a/cpp/tests/structure/mg_count_self_loops_and_multi_edges_test.cpp b/cpp/tests/structure/mg_count_self_loops_and_multi_edges_test.cpp index 4a56b4c0946..df0183b61f6 100644 --- a/cpp/tests/structure/mg_count_self_loops_and_multi_edges_test.cpp +++ b/cpp/tests/structure/mg_count_self_loops_and_multi_edges_test.cpp @@ -171,21 +171,21 @@ TEST_P(Tests_MGCountSelfLoopsAndMultiEdges_File, CheckInt32Int32FloatTransposeFa run_current_test(std::get<0>(param), std::get<1>(param)); } -TEST_P(Tests_MGCountSelfLoopsAndMultiEdges_Rmat, CheckInt32Int32FloaTransposeFalse) +TEST_P(Tests_MGCountSelfLoopsAndMultiEdges_Rmat, CheckInt32Int32FloatTransposeFalse) { auto param = GetParam(); run_current_test( std::get<0>(param), override_Rmat_Usecase_with_cmd_line_arguments(std::get<1>(param))); } -TEST_P(Tests_MGCountSelfLoopsAndMultiEdges_Rmat, CheckInt32Int64FloaTransposeFalse) +TEST_P(Tests_MGCountSelfLoopsAndMultiEdges_Rmat, CheckInt32Int64FloatTransposeFalse) { auto param = GetParam(); run_current_test( std::get<0>(param), override_Rmat_Usecase_with_cmd_line_arguments(std::get<1>(param))); } -TEST_P(Tests_MGCountSelfLoopsAndMultiEdges_Rmat, CheckInt64Int64FloaTransposeFalse) +TEST_P(Tests_MGCountSelfLoopsAndMultiEdges_Rmat, CheckInt64Int64FloatTransposeFalse) { auto param = GetParam(); run_current_test( @@ -198,7 +198,7 @@ TEST_P(Tests_MGCountSelfLoopsAndMultiEdges_File, CheckInt32Int32FloatTransposeTr run_current_test(std::get<0>(param), std::get<1>(param)); } -TEST_P(Tests_MGCountSelfLoopsAndMultiEdges_Rmat, CheckInt32Int32FloaTransposeTrue) +TEST_P(Tests_MGCountSelfLoopsAndMultiEdges_Rmat, CheckInt32Int32FloatTransposeTrue) { auto param = GetParam(); run_current_test( @@ -210,20 +210,14 @@ INSTANTIATE_TEST_SUITE_P( Tests_MGCountSelfLoopsAndMultiEdges_File, ::testing::Combine( // enable correctness checks - ::testing::Values(CountSelfLoopsAndMultiEdges_Usecase{}, - CountSelfLoopsAndMultiEdges_Usecase{}, - CountSelfLoopsAndMultiEdges_Usecase{}, - CountSelfLoopsAndMultiEdges_Usecase{}), + ::testing::Values(CountSelfLoopsAndMultiEdges_Usecase{}), ::testing::Values(cugraph::test::File_Usecase("test/datasets/karate.mtx"), cugraph::test::File_Usecase("test/datasets/webbase-1M.mtx")))); INSTANTIATE_TEST_SUITE_P( rmat_small_tests, Tests_MGCountSelfLoopsAndMultiEdges_Rmat, - ::testing::Combine(::testing::Values(CountSelfLoopsAndMultiEdges_Usecase{}, - CountSelfLoopsAndMultiEdges_Usecase{}, - CountSelfLoopsAndMultiEdges_Usecase{}, - CountSelfLoopsAndMultiEdges_Usecase{}), + ::testing::Combine(::testing::Values(CountSelfLoopsAndMultiEdges_Usecase{}), ::testing::Values(cugraph::test::Rmat_Usecase( 10, 16, 0.57, 0.19, 0.19, 0, false, false, 0, true)))); @@ -234,10 +228,7 @@ INSTANTIATE_TEST_SUITE_P( include more than one Rmat_Usecase that differ only in scale or edge factor (to avoid running same benchmarks more than once) */ Tests_MGCountSelfLoopsAndMultiEdges_Rmat, - ::testing::Combine(::testing::Values(CountSelfLoopsAndMultiEdges_Usecase{}, - CountSelfLoopsAndMultiEdges_Usecase{}, - CountSelfLoopsAndMultiEdges_Usecase{}, - CountSelfLoopsAndMultiEdges_Usecase{}), + ::testing::Combine(::testing::Values(CountSelfLoopsAndMultiEdges_Usecase{false}), ::testing::Values(cugraph::test::Rmat_Usecase( 20, 32, 0.57, 0.19, 0.19, 0, false, false, 0, true))));