diff --git a/cpp/include/cugraph/utilities/device_comm.cuh b/cpp/include/cugraph/utilities/device_comm.cuh index 6d342aad06e..27e3512ccf5 100644 --- a/cpp/include/cugraph/utilities/device_comm.cuh +++ b/cpp/include/cugraph/utilities/device_comm.cuh @@ -1091,4 +1091,8 @@ device_gatherv(raft::comms::comms_t const& comm, .run(comm, input_first, output_first, sendcount, recvcounts, displacements, root, stream_view); } +inline void device_group_start(raft::comms::comms_t const& comm) { comm.group_start(); } + +inline void device_group_end(raft::comms::comms_t const& comm) { comm.group_end(); } + } // namespace cugraph diff --git a/cpp/src/prims/update_edge_partition_src_dst_property.cuh b/cpp/src/prims/update_edge_partition_src_dst_property.cuh index 8cee40d3438..ffa1baf37fa 100644 --- a/cpp/src/prims/update_edge_partition_src_dst_property.cuh +++ b/cpp/src/prims/update_edge_partition_src_dst_property.cuh @@ -294,8 +294,6 @@ void update_edge_partition_minor_property( key_offsets = *(graph_view.local_sorted_unique_edge_dst_vertex_partition_offsets()); } -#if 0 // FIXME: should not directly call ncclGroupStart(), ncclGroupEnd(), enable after adding a - // raft::comms mechanism to group broadcast operations // memory footprint vs parallelism trade-off // memory requirement per loop is // (V/comm_size) * sizeof(value_t) @@ -327,10 +325,7 @@ void update_edge_partition_minor_property( } for (size_t round = 0; round < num_rounds; ++round) { - // FIXME: better use a higher-level interface than directly invoking NCCL functions. - if (ncclGroupStart() != ncclSuccess) { - CUGRAPH_FAIL("ncclGroupStart failure."); - } + device_group_start(row_comm); for (size_t i = 0; i < num_concurrent_bcasts; ++i) { auto j = num_rounds * i + round; if (j < static_cast(row_comm_size)) { @@ -343,10 +338,8 @@ void update_edge_partition_minor_property( handle.get_stream()); } } - // FIXME: better use a higher-level interface than directly invoking NCCL functions. - if (ncclGroupEnd() != ncclSuccess) { - CUGRAPH_FAIL("ncclGroupEnd failure."); - } + device_group_end(row_comm); + for (size_t i = 0; i < num_concurrent_bcasts; ++i) { auto j = num_rounds * i + round; if (j < static_cast(row_comm_size)) { @@ -363,33 +356,6 @@ void update_edge_partition_minor_property( } } } -#else - vertex_t max_rx_size{0}; - for (int i = 0; i < row_comm_size; ++i) { - max_rx_size = std::max( - max_rx_size, graph_view.vertex_partition_range_size(col_comm_rank * row_comm_size + i)); - } - auto rx_value_buffer = allocate_dataframe_buffer(max_rx_size, handle.get_stream()); - auto rx_value_first = get_dataframe_buffer_begin(rx_value_buffer); - for (int i = 0; i < row_comm_size; ++i) { - device_bcast(row_comm, - vertex_property_input_first, - rx_value_first, - graph_view.vertex_partition_range_size(col_comm_rank * row_comm_size + i), - i, - handle.get_stream()); - - auto v_offset_first = thrust::make_transform_iterator( - (*(edge_partition_minor_property_output.keys())).begin() + key_offsets[i], - [v_first = graph_view.vertex_partition_range_first( - col_comm_rank * row_comm_size + i)] __device__(auto v) { return v - v_first; }); - thrust::gather(handle.get_thrust_policy(), - v_offset_first, - v_offset_first + (key_offsets[i + 1] - key_offsets[i]), - rx_value_first, - edge_partition_minor_property_output.value_first() + key_offsets[i]); - } -#endif } else { std::vector rx_counts(row_comm_size, size_t{0}); std::vector displacements(row_comm_size, size_t{0}); diff --git a/cpp/src/structure/graph_impl.cuh b/cpp/src/structure/graph_impl.cuh index 12e5a5dc234..1811ae196b7 100644 --- a/cpp/src/structure/graph_impl.cuh +++ b/cpp/src/structure/graph_impl.cuh @@ -999,20 +999,18 @@ graph_t( + edgelists[i], + major_range_first, use_dcs ? std::optional{major_range_first + (*edge_partition_segment_offsets_) [(*(meta.segment_offsets)).size() * i + detail::num_sparse_segments_per_vertex_partition]} - : std::nullopt; - auto [offsets, indices, weights, dcs_nzd_vertices] = - compress_edgelist(edgelists[i], - major_range_first, - major_hypersparse_first, - major_range_last, - minor_range_first, - minor_range_last, - handle.get_stream()); + : std::nullopt, + major_range_last, + minor_range_first, + minor_range_last, + handle.get_stream()); edge_partition_offsets_.push_back(std::move(offsets)); edge_partition_indices_.push_back(std::move(indices)); @@ -1149,20 +1147,18 @@ graph_t( + edgelists[i], + major_range_first, use_dcs ? std::optional{major_range_first + (*edge_partition_segment_offsets_) [(*(meta.segment_offsets)).size() * i + detail::num_sparse_segments_per_vertex_partition]} - : std::nullopt; - auto [offsets, indices, weights, dcs_nzd_vertices] = - compress_edgelist(edgelists[i], - major_range_first, - major_hypersparse_first, - major_range_last, - minor_range_first, - minor_range_last, - handle.get_stream()); + : std::nullopt, + major_range_last, + minor_range_first, + minor_range_last, + handle.get_stream()); edgelist_src_partitions[i].resize(0, handle.get_stream()); edgelist_src_partitions[i].shrink_to_fit(handle.get_stream()); edgelist_dst_partitions[i].resize(0, handle.get_stream());