Commit 656638d: Enable concurrent broadcasts in update_edge_partition_minor_property() (#2413)

Dependent on rapidsai/raft#742

Place multiple broadcast operations between ncclGroupStart and ncclGroupEnd (via the new raft::comms group wrappers) so the independent broadcasts can execute concurrently, improving performance.
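For background, the technique this commit adopts can be sketched in isolation. The standalone NCCL program below is illustrative only (not from this repository): it assumes one process driving all visible GPUs and arbitrary buffer sizes, and it groups one broadcast per root inside a single ncclGroupStart()/ncclGroupEnd() pair so the independent broadcasts can overlap instead of being issued one collective at a time.

// grouped_bcast_demo.cu -- illustrative sketch; build with: nvcc grouped_bcast_demo.cu -lnccl
#include <cuda_runtime.h>
#include <nccl.h>
#include <cstdio>
#include <cstdlib>
#include <vector>

#define NCCL_TRY(cmd)                                                                   \
  do {                                                                                  \
    ncclResult_t r = (cmd);                                                             \
    if (r != ncclSuccess) {                                                             \
      std::fprintf(stderr, "NCCL error: %s\n", ncclGetErrorString(r));                  \
      std::exit(1);                                                                     \
    }                                                                                   \
  } while (0)

int main() {
  int ndev{};
  cudaGetDeviceCount(&ndev);
  std::vector<ncclComm_t> comms(ndev);
  NCCL_TRY(ncclCommInitAll(comms.data(), ndev, nullptr));  // one rank per visible GPU

  size_t const count = size_t{1} << 20;  // arbitrary element count
  std::vector<std::vector<float*>> bufs(ndev, std::vector<float*>(ndev));  // [device][root]
  std::vector<cudaStream_t> streams(ndev);
  for (int d = 0; d < ndev; ++d) {
    cudaSetDevice(d);
    cudaStreamCreate(&streams[d]);
    for (int root = 0; root < ndev; ++root) { cudaMalloc(&bufs[d][root], count * sizeof(float)); }
  }

  // The key pattern: one in-place broadcast per root, all inside a single
  // group, so NCCL may aggregate the independent broadcasts and run them
  // concurrently rather than back to back.
  NCCL_TRY(ncclGroupStart());
  for (int root = 0; root < ndev; ++root) {
    for (int d = 0; d < ndev; ++d) {
      NCCL_TRY(ncclBroadcast(
        bufs[d][root], bufs[d][root], count, ncclFloat, root, comms[d], streams[d]));
    }
  }
  NCCL_TRY(ncclGroupEnd());

  for (int d = 0; d < ndev; ++d) {
    cudaSetDevice(d);
    cudaStreamSynchronize(streams[d]);
    for (int root = 0; root < ndev; ++root) { cudaFree(bufs[d][root]); }
    ncclCommDestroy(comms[d]);
  }
  return 0;
}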

Authors:
  - Seunghwa Kang (https://github.com/seunghwak)

Approvers:
  - Chuck Hastings (https://github.com/ChuckHastings)

URL: #2413
seunghwak authored Jul 21, 2022
1 parent 1ae6133 commit 656638d
Showing 3 changed files with 23 additions and 57 deletions.
4 changes: 4 additions & 0 deletions cpp/include/cugraph/utilities/device_comm.cuh
@@ -1091,4 +1091,8 @@ device_gatherv(raft::comms::comms_t const& comm,
     .run(comm, input_first, output_first, sendcount, recvcounts, displacements, root, stream_view);
 }
 
+inline void device_group_start(raft::comms::comms_t const& comm) { comm.group_start(); }
+
+inline void device_group_end(raft::comms::comms_t const& comm) { comm.group_end(); }
+
 }  // namespace cugraph
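A hedged usage sketch of these new wrappers follows. The helper below is hypothetical (not part of the commit); it assumes the raft::comms group_start()/group_end() calls added in rapidsai/raft#742 and uses the existing cugraph::device_bcast from this header.

#include <cugraph/utilities/device_comm.cuh>

#include <raft/handle.hpp>

#include <vector>

// Hypothetical helper: broadcast one segment per root rank, batching all of
// the broadcasts into a single NCCL group via the new wrappers.
template <typename InputIterator, typename OutputIterator>
void grouped_bcast_all_roots(raft::handle_t const& handle,
                             raft::comms::comms_t const& comm,
                             InputIterator send_first,
                             std::vector<OutputIterator> const& rx_firsts,
                             std::vector<size_t> const& counts)
{
  cugraph::device_group_start(comm);  // forwards to comm.group_start() (ncclGroupStart())
  for (int root = 0; root < comm.get_size(); ++root) {
    cugraph::device_bcast(
      comm, send_first, rx_firsts[root], counts[root], root, handle.get_stream());
  }
  cugraph::device_group_end(comm);  // forwards to comm.group_end() (ncclGroupEnd())
}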
40 changes: 3 additions & 37 deletions cpp/src/prims/update_edge_partition_src_dst_property.cuh
@@ -294,8 +294,6 @@ void update_edge_partition_minor_property(
       key_offsets = *(graph_view.local_sorted_unique_edge_dst_vertex_partition_offsets());
     }
 
-#if 0  // FIXME: should not directly call ncclGroupStart(), ncclGroupEnd(), enable after adding a
-       // raft::comms mechanism to group broadcast operations
     // memory footprint vs parallelism trade-off
     // memory requirement per loop is
     // (V/comm_size) * sizeof(value_t)
@@ -327,10 +325,7 @@
     }
 
     for (size_t round = 0; round < num_rounds; ++round) {
-      // FIXME: better use a higher-level interface than directly invoking NCCL functions.
-      if (ncclGroupStart() != ncclSuccess) {
-        CUGRAPH_FAIL("ncclGroupStart failure.");
-      }
+      device_group_start(row_comm);
       for (size_t i = 0; i < num_concurrent_bcasts; ++i) {
         auto j = num_rounds * i + round;
         if (j < static_cast<size_t>(row_comm_size)) {
@@ -343,10 +338,8 @@
                        handle.get_stream());
         }
       }
-      // FIXME: better use a higher-level interface than directly invoking NCCL functions.
-      if (ncclGroupEnd() != ncclSuccess) {
-        CUGRAPH_FAIL("ncclGroupEnd failure.");
-      }
+      device_group_end(row_comm);
+
       for (size_t i = 0; i < num_concurrent_bcasts; ++i) {
         auto j = num_rounds * i + round;
         if (j < static_cast<size_t>(row_comm_size)) {
@@ -363,33 +356,6 @@
         }
       }
     }
-#else
-    vertex_t max_rx_size{0};
-    for (int i = 0; i < row_comm_size; ++i) {
-      max_rx_size = std::max(
-        max_rx_size, graph_view.vertex_partition_range_size(col_comm_rank * row_comm_size + i));
-    }
-    auto rx_value_buffer = allocate_dataframe_buffer<value_t>(max_rx_size, handle.get_stream());
-    auto rx_value_first  = get_dataframe_buffer_begin(rx_value_buffer);
-    for (int i = 0; i < row_comm_size; ++i) {
-      device_bcast(row_comm,
-                   vertex_property_input_first,
-                   rx_value_first,
-                   graph_view.vertex_partition_range_size(col_comm_rank * row_comm_size + i),
-                   i,
-                   handle.get_stream());
-
-      auto v_offset_first = thrust::make_transform_iterator(
-        (*(edge_partition_minor_property_output.keys())).begin() + key_offsets[i],
-        [v_first = graph_view.vertex_partition_range_first(
-           col_comm_rank * row_comm_size + i)] __device__(auto v) { return v - v_first; });
-      thrust::gather(handle.get_thrust_policy(),
-                     v_offset_first,
-                     v_offset_first + (key_offsets[i + 1] - key_offsets[i]),
-                     rx_value_first,
-                     edge_partition_minor_property_output.value_first() + key_offsets[i]);
-    }
-#endif
   } else {
     std::vector<size_t> rx_counts(row_comm_size, size_t{0});
     std::vector<size_t> displacements(row_comm_size, size_t{0});
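The round structure enabled above trades memory for parallelism: the row_comm_size broadcasts are split into num_rounds batches of at most num_concurrent_bcasts grouped broadcasts each, so peak receive-buffer usage stays near num_concurrent_bcasts * (V/comm_size) * sizeof(value_t) rather than scaling with all of row_comm_size. A standalone sketch of the j = num_rounds * i + round indexing (sizes are illustrative; the real code derives num_concurrent_bcasts from a memory-footprint budget):

#include <cstddef>
#include <cstdio>

int main() {
  // Illustrative values only.
  size_t const row_comm_size         = 8;
  size_t const num_concurrent_bcasts = 3;
  size_t const num_rounds =
    (row_comm_size + num_concurrent_bcasts - 1) / num_concurrent_bcasts;  // ceil-divide

  for (size_t round = 0; round < num_rounds; ++round) {
    std::printf("round %zu:", round);
    for (size_t i = 0; i < num_concurrent_bcasts; ++i) {
      auto j = num_rounds * i + round;  // same indexing as the loop in the diff above
      if (j < row_comm_size) { std::printf("  bcast root %zu", j); }
    }
    std::printf("\n");
  }
  // Prints: round 0 -> roots 0 3 6, round 1 -> roots 1 4 7, round 2 -> roots 2 5.
  // Every root appears exactly once, at most num_concurrent_bcasts per round.
  return 0;
}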
36 changes: 16 additions & 20 deletions cpp/src/structure/graph_impl.cuh
@@ -999,20 +999,18 @@ graph_t<vertex_t, edge_t, weight_t, store_transposed, multi_gpu, std::enable_if_
   for (size_t i = 0; i < edgelists.size(); ++i) {
     auto [major_range_first, major_range_last] = partition_.local_edge_partition_major_range(i);
     auto [minor_range_first, minor_range_last] = partition_.local_edge_partition_minor_range();
-    auto major_hypersparse_first =
+    auto [offsets, indices, weights, dcs_nzd_vertices] = compress_edgelist<store_transposed>(
+      edgelists[i],
+      major_range_first,
       use_dcs ? std::optional<vertex_t>{major_range_first +
                                         (*edge_partition_segment_offsets_)
                                           [(*(meta.segment_offsets)).size() * i +
                                            detail::num_sparse_segments_per_vertex_partition]}
-              : std::nullopt;
-    auto [offsets, indices, weights, dcs_nzd_vertices] =
-      compress_edgelist<store_transposed>(edgelists[i],
-                                          major_range_first,
-                                          major_hypersparse_first,
-                                          major_range_last,
-                                          minor_range_first,
-                                          minor_range_last,
-                                          handle.get_stream());
+              : std::nullopt,
+      major_range_last,
+      minor_range_first,
+      minor_range_last,
+      handle.get_stream());
 
     edge_partition_offsets_.push_back(std::move(offsets));
     edge_partition_indices_.push_back(std::move(indices));
@@ -1149,20 +1147,18 @@ graph_t<vertex_t, edge_t, weight_t, store_transposed, multi_gpu, std::enable_if_
   for (size_t i = 0; i < edgelists.size(); ++i) {
     auto [major_range_first, major_range_last] = partition_.local_edge_partition_major_range(i);
     auto [minor_range_first, minor_range_last] = partition_.local_edge_partition_minor_range();
-    auto major_hypersparse_first =
+    auto [offsets, indices, weights, dcs_nzd_vertices] = compress_edgelist<store_transposed>(
+      edgelists[i],
+      major_range_first,
       use_dcs ? std::optional<vertex_t>{major_range_first +
                                         (*edge_partition_segment_offsets_)
                                           [(*(meta.segment_offsets)).size() * i +
                                            detail::num_sparse_segments_per_vertex_partition]}
-              : std::nullopt;
-    auto [offsets, indices, weights, dcs_nzd_vertices] =
-      compress_edgelist<store_transposed>(edgelists[i],
-                                          major_range_first,
-                                          major_hypersparse_first,
-                                          major_range_last,
-                                          minor_range_first,
-                                          minor_range_last,
-                                          handle.get_stream());
+              : std::nullopt,
+      major_range_last,
+      minor_range_first,
+      minor_range_last,
+      handle.get_stream());
     edgelist_src_partitions[i].resize(0, handle.get_stream());
     edgelist_src_partitions[i].shrink_to_fit(handle.get_stream());
     edgelist_dst_partitions[i].resize(0, handle.get_stream());
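Both graph_impl.cuh hunks are a mechanical refactor: the separately named major_hypersparse_first is folded into the compress_edgelist call as an inline conditional std::optional argument. A tiny standalone sketch of that pattern (all names and values below are fabricated stand-ins, not the cuGraph members):

#include <cstdint>
#include <iostream>
#include <optional>
#include <vector>

using vertex_t = int32_t;

int main() {
  // Fabricated stand-ins for the members used in the real code.
  bool const use_dcs              = true;  // whether hypersparse (DCS) segments exist
  vertex_t const major_range_first = 100;
  std::vector<vertex_t> const segment_offsets{0, 10, 25, 40, 48};
  std::size_t const num_sparse_segments_per_vertex_partition = 3;

  // The commit passes this expression directly as a call argument instead of
  // first binding it to a named major_hypersparse_first variable.
  auto major_hypersparse_first =
    use_dcs ? std::optional<vertex_t>{major_range_first +
                                      segment_offsets[num_sparse_segments_per_vertex_partition]}
            : std::nullopt;

  std::cout << (major_hypersparse_first ? *major_hypersparse_first : -1) << '\n';  // prints 140
  return 0;
}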
