Fix MG similarity issues #4741

Merged
merged 8 commits on Nov 18, 2024
cpp/src/link_prediction/similarity_impl.cuh: 355 changes (182 additions, 173 deletions)
@@ -287,10 +287,9 @@ all_pairs_similarity(raft::handle_t const& handle,
  // computing/updating topk with each batch

  // FIXME: Experiment with this and adjust as necessary
-  // size_t const
-  // MAX_PAIRS_PER_BATCH{static_cast<size_t>(handle.get_device_properties().multiProcessorCount) *
-  // (1 << 15)};
-  size_t const MAX_PAIRS_PER_BATCH{100};
+  // size_t const MAX_PAIRS_PER_BATCH{100};
Contributor:
Better delete the commented out code?

+  size_t const MAX_PAIRS_PER_BATCH{
+    static_cast<size_t>(handle.get_device_properties().multiProcessorCount) * (1 << 15)};

rmm::device_uvector<edge_t> degrees = graph_view.compute_out_degrees(handle);
rmm::device_uvector<size_t> two_hop_degrees(degrees.size() + 1, handle.get_stream());
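As an aside, a minimal standalone sketch of where that constant comes from (assuming handle.get_device_properties() returns the usual cudaDeviceProp): the batch budget scales a per-SM pair count, the 1 << 15 heuristic the FIXME above flags for tuning, by the GPU's SM count.

    #include <cuda_runtime.h>
    #include <cstdio>

    int main()
    {
      cudaDeviceProp prop{};
      cudaGetDeviceProperties(&prop, 0);  // device 0; cugraph reads this via the raft handle

      size_t const max_pairs_per_batch =
        static_cast<size_t>(prop.multiProcessorCount) * (size_t{1} << 15);
      std::printf("multiProcessorCount=%d -> max pairs per batch=%zu\n",
                  prop.multiProcessorCount, max_pairs_per_batch);
      return 0;
    }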
@@ -362,195 +361,205 @@ all_pairs_similarity(raft::handle_t const& handle,
1,
handle.get_stream());

+  handle.sync_stream();

std::tie(batch_offsets, std::ignore) = compute_offset_aligned_element_chunks(
handle,
raft::device_span<size_t const>{two_hop_degree_offsets.data(), two_hop_degree_offsets.size()},
sum_two_hop_degrees,
MAX_PAIRS_PER_BATCH);

Contributor:
In the lines above,

    top_v1.reserve(*topk, handle.get_stream());
    top_v2.reserve(*topk, handle.get_stream());
    top_score.reserve(*topk, handle.get_stream());

Shouldn't reserve here be resize?
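For context, a minimal sketch (hypothetical values, not PR code) of why this matters for rmm::device_uvector: reserve only grows capacity and leaves size() at its old value, so any iteration or copy driven by size(), begin(), and end() sees an empty vector.

    #include <rmm/cuda_stream_view.hpp>
    #include <rmm/device_uvector.hpp>

    void reserve_vs_resize(rmm::cuda_stream_view stream, size_t topk)
    {
      rmm::device_uvector<float> top_score(0, stream);

      top_score.reserve(topk, stream);  // allocates capacity; size() stays 0
      // Here top_score.begin() == top_score.end(): a copy sized by size() copies nothing,
      // even though the allocation is already large enough.

      top_score.resize(topk, stream);   // size() == topk; begin()/end() span topk elements
    }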

Contributor:
    raft::update_host(&sum_two_hop_degrees,
                      two_hop_degree_offsets.data() + two_hop_degree_offsets.size() - 1,
                      1,
                      handle.get_stream());

We are missing handle.sync_stream() after this to ensure that sum_two_hop_degrees is ready to use in the following compute_offset_aligned_element_chunks.

Collaborator Author:
I think this is the big thing that I was tripping over. Added the sync (along with fixing the problem you note below) and the hang appears to be resolved.
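A self-contained CUDA sketch of the race being fixed (raft::update_host is, to my understanding, an asynchronous device-to-host copy on the handle's stream, so the host value is undefined until the stream is synchronized):

    #include <cuda_runtime.h>
    #include <cstdio>

    int main()
    {
      cudaStream_t stream;
      cudaStreamCreate(&stream);

      size_t* d_sum;
      cudaMalloc(&d_sum, sizeof(size_t));
      size_t* h_sum;
      cudaMallocHost(&h_sum, sizeof(size_t));  // pinned, so the copy below is truly async

      size_t const value{42};
      cudaMemcpyAsync(d_sum, &value, sizeof(size_t), cudaMemcpyHostToDevice, stream);
      cudaMemcpyAsync(h_sum, d_sum, sizeof(size_t), cudaMemcpyDeviceToHost, stream);

      // Reading *h_sum here would race with the copy still in flight on `stream`.
      cudaStreamSynchronize(stream);       // the handle.sync_stream() equivalent
      std::printf("sum = %zu\n", *h_sum);  // safe only after the sync

      cudaFreeHost(h_sum);
      cudaFree(d_sum);
      cudaStreamDestroy(stream);
      return 0;
    }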

  // FIXME: compute_offset_aligned_element_chunks can return duplicates.  Should it?  Should
  // explore whether this functionality should be pushed into that function
  batch_offsets.resize(std::distance(batch_offsets.begin(),
                                     std::unique(batch_offsets.begin(), batch_offsets.end())));

-  for (size_t batch_number = 0; batch_number < (batch_offsets.size() - 1); ++batch_number) {
-    if (batch_offsets[batch_number + 1] > batch_offsets[batch_number]) {
-      auto [offsets, v2] =
-        k_hop_nbrs(handle,
-                   graph_view,
-                   raft::device_span<vertex_t const>{
-                     tmp_vertices.data() + batch_offsets[batch_number],
-                     batch_offsets[batch_number + 1] - batch_offsets[batch_number]},
-                   2,
-                   do_expensive_check);
+  size_t num_batches = batch_offsets.size() - 1;
+  if constexpr (multi_gpu) {
+    num_batches = cugraph::host_scalar_allreduce(
+      handle.get_comms(), num_batches, raft::comms::op_t::MAX, handle.get_stream());
+  }
+
+  for (size_t batch_number = 0; batch_number < num_batches; ++batch_number) {
+    raft::device_span<vertex_t const> batch_seeds{tmp_vertices.data(), size_t{0}};
+
+    if (((batch_number + 1) < batch_offsets.size()) &&
+        (batch_offsets[batch_number + 1] > batch_offsets[batch_number])) {
Contributor:
(batch_number + 1) < batch_offsets.size() should always be true here, right? batch_number < num_batches and batch_offsets.size() is num_batches + 1.

Collaborator Author:
This is not the case, and was in fact the bug that triggered my PR.

The vertices can be specified as a parameter. The batches are constructed by looking at the size of the 2-hop neighborhood of the selected vertices. The test case that led me to investigate this was a situation where the number of batches on rank 0 was smaller than the number of batches on rank 1.

The above code, by computing the MAX of the number of batches across all GPUs, ensures that every GPU has the same value for num_batches; but that means that if a particular GPU has no batch for a given iteration, its batch_offsets will not be long enough to do the second half of this computation.

I considered extending batch_offsets and filling it with the last value, but this seemed better since it's the only use of that array.
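To make that pattern concrete, here is a self-contained MPI analogue (hypothetical offsets; the PR uses cugraph::host_scalar_allreduce over its own comms, not raw MPI): every rank runs the same number of iterations, and a rank whose offsets array is exhausted processes an empty batch so the collectives inside the loop stay matched.

    #include <mpi.h>
    #include <cstdio>
    #include <vector>

    int main(int argc, char** argv)
    {
      MPI_Init(&argc, &argv);
      int rank{};
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);

      // Hypothetical per-rank batch offsets: rank 0 has 3 batches, every other rank has 1.
      std::vector<unsigned long> batch_offsets =
        (rank == 0) ? std::vector<unsigned long>{0, 4, 8, 10} : std::vector<unsigned long>{0, 2};

      unsigned long num_batches = batch_offsets.size() - 1;
      MPI_Allreduce(MPI_IN_PLACE, &num_batches, 1, MPI_UNSIGNED_LONG, MPI_MAX, MPI_COMM_WORLD);

      for (unsigned long b = 0; b < num_batches; ++b) {
        // Ranks with no batch left still execute the iteration (with an empty batch)
        // so that any collective called inside stays matched across all ranks.
        unsigned long batch_size =
          ((b + 1) < batch_offsets.size()) ? batch_offsets[b + 1] - batch_offsets[b] : 0ul;
        std::printf("rank %d, batch %lu, size %lu\n", rank, b, batch_size);
      }

      MPI_Finalize();
      return 0;
    }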

+      batch_seeds = raft::device_span<vertex_t const>{
+        tmp_vertices.data() + batch_offsets[batch_number],
+        batch_offsets[batch_number + 1] - batch_offsets[batch_number]};
+    }
+
+    auto [offsets, v2] = k_hop_nbrs(handle, graph_view, batch_seeds, 2, do_expensive_check);

    auto v1 = cugraph::detail::expand_sparse_offsets(
      raft::device_span<size_t const>{offsets.data(), offsets.size()},
      vertex_t{0},
      handle.get_stream());

    cugraph::unrenumber_local_int_vertices(
      handle,
      v1.data(),
      v1.size(),
      tmp_vertices.data() + batch_offsets[batch_number],
      vertex_t{0},
      static_cast<vertex_t>(batch_offsets[batch_number + 1] - batch_offsets[batch_number]),
      do_expensive_check);

    auto new_size = thrust::distance(
      thrust::make_zip_iterator(v1.begin(), v2.begin()),
      thrust::remove_if(
        handle.get_thrust_policy(),
        thrust::make_zip_iterator(v1.begin(), v2.begin()),
        thrust::make_zip_iterator(v1.end(), v2.end()),
        [] __device__(auto tuple) { return thrust::get<0>(tuple) == thrust::get<1>(tuple); }));

    v1.resize(new_size, handle.get_stream());
    v2.resize(new_size, handle.get_stream());

    if constexpr (multi_gpu) {
      // shuffle vertex pairs
      auto vertex_partition_range_lasts = graph_view.vertex_partition_range_lasts();

      std::tie(v1, v2, std::ignore, std::ignore, std::ignore, std::ignore) =
        detail::shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning<vertex_t,
                                                                                       edge_t,
                                                                                       weight_t,
                                                                                       int>(
          handle,
          std::move(v1),
          std::move(v2),
          std::nullopt,
          std::nullopt,
          std::nullopt,
          vertex_partition_range_lasts);
    }

    auto score =
      similarity(handle,
                 graph_view,
                 edge_weight_view,
                 std::make_tuple(raft::device_span<vertex_t const>{v1.data(), v1.size()},
                                 raft::device_span<vertex_t const>{v2.data(), v2.size()}),
                 functor,
                 coeff,
                 do_expensive_check);

    // Add a remove_if to remove items that are less than the last topk element
    new_size = thrust::distance(
      thrust::make_zip_iterator(score.begin(), v1.begin(), v2.begin()),
      thrust::remove_if(handle.get_thrust_policy(),
                        thrust::make_zip_iterator(score.begin(), v1.begin(), v2.begin()),
                        thrust::make_zip_iterator(score.end(), v1.end(), v2.end()),
                        [similarity_threshold] __device__(auto tuple) {
                          return thrust::get<0>(tuple) < similarity_threshold;
                        }));

    score.resize(new_size, handle.get_stream());
    v1.resize(new_size, handle.get_stream());
    v2.resize(new_size, handle.get_stream());

    thrust::sort_by_key(handle.get_thrust_policy(),
                        score.begin(),
                        score.end(),
                        thrust::make_zip_iterator(v1.begin(), v2.begin()),
                        thrust::greater<weight_t>{});

    size_t v1_keep = std::min(*topk, v1.size());

    if (score.size() < (top_v1.size() + v1_keep)) {
      score.resize(top_v1.size() + v1_keep, handle.get_stream());
      v1.resize(score.size(), handle.get_stream());
      v2.resize(score.size(), handle.get_stream());
    }

    thrust::copy(handle.get_thrust_policy(), top_v1.begin(), top_v1.end(), v1.begin() + v1_keep);
    thrust::copy(handle.get_thrust_policy(), top_v2.begin(), top_v2.end(), v2.begin() + v1_keep);
    thrust::copy(
      handle.get_thrust_policy(), top_score.begin(), top_score.end(), score.begin() + v1_keep);

    thrust::sort_by_key(handle.get_thrust_policy(),
                        score.begin(),
                        score.end(),
                        thrust::make_zip_iterator(v1.begin(), v2.begin()),
                        thrust::greater<weight_t>{});

    if (top_v1.size() < std::min(*topk, v1.size())) {
      top_v1.resize(std::min(*topk, v1.size()), handle.get_stream());
      top_v2.resize(top_v1.size(), handle.get_stream());
      top_score.resize(top_v1.size(), handle.get_stream());
    }

    thrust::copy(
      handle.get_thrust_policy(), v1.begin(), v1.begin() + top_v1.size(), top_v1.begin());
    thrust::copy(
      handle.get_thrust_policy(), v2.begin(), v2.begin() + top_v1.size(), top_v2.begin());
    thrust::copy(handle.get_thrust_policy(),
                 score.begin(),
                 score.begin() + top_v1.size(),
                 top_score.begin());
Comment on lines +493 to +500

Contributor:

Make sure top_v1 and top_v2 are properly re-sized here (not just reserved).

Collaborator Author:
I resize them above on line 487.

I did the reserve to make sure we don't have to move anything as the array grows. I am doing the resizing as necessary so I can use the .size(), .begin(), and .end() methods to reflect only what's actually used.
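A hypothetical sketch of that strategy (values invented, not PR code): one up-front reserve pins the allocation, and later resize calls within that capacity only adjust size(), so nothing moves as the top-k arrays grow.

    #include <rmm/cuda_stream_view.hpp>
    #include <rmm/device_uvector.hpp>
    #include <algorithm>

    void grow_within_capacity(rmm::cuda_stream_view stream, size_t topk)
    {
      rmm::device_uvector<int> top_v1(0, stream);
      top_v1.reserve(topk, stream);  // single allocation up front

      for (size_t batch = 0; batch < 3; ++batch) {
        size_t used = std::min(topk, (batch + 1) * 4);  // invented per-batch fill
        top_v1.resize(used, stream);  // within capacity: no reallocation, no copy
        // top_v1.size()/begin()/end() now reflect exactly the elements in use
      }
    }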


    if constexpr (multi_gpu) {
      bool is_root = handle.get_comms().get_rank() == int{0};
      auto rx_sizes = cugraph::host_scalar_gather(
        handle.get_comms(), top_v1.size(), int{0}, handle.get_stream());
      std::vector<size_t> rx_displs;
      size_t gathered_size{0};

      if (is_root) {
        rx_displs.resize(handle.get_comms().get_size());
        rx_displs[0] = 0;
        std::partial_sum(rx_sizes.begin(), rx_sizes.end() - 1, rx_displs.begin() + 1);
        gathered_size = std::reduce(rx_sizes.begin(), rx_sizes.end());
      }

      rmm::device_uvector<vertex_t> gathered_v1(gathered_size, handle.get_stream());
      rmm::device_uvector<vertex_t> gathered_v2(gathered_size, handle.get_stream());
      rmm::device_uvector<weight_t> gathered_score(gathered_size, handle.get_stream());

      cugraph::device_gatherv(
        handle.get_comms(),
        thrust::make_zip_iterator(top_v1.begin(), top_v2.begin(), top_score.begin()),
        thrust::make_zip_iterator(
          gathered_v1.begin(), gathered_v2.begin(), gathered_score.begin()),
        top_v1.size(),
        rx_sizes,
        rx_displs,
        int{0},
        handle.get_stream());

      if (is_root) {
        thrust::sort_by_key(handle.get_thrust_policy(),
                            gathered_score.begin(),
                            gathered_score.end(),
                            thrust::make_zip_iterator(gathered_v1.begin(), gathered_v2.begin()),
                            thrust::greater<weight_t>{});

        if (gathered_v1.size() > *topk) {
          gathered_v1.resize(*topk, handle.get_stream());
          gathered_v2.resize(*topk, handle.get_stream());
          gathered_score.resize(*topk, handle.get_stream());
        }

        top_v1    = std::move(gathered_v1);
        top_v2    = std::move(gathered_v2);
        top_score = std::move(gathered_score);
      } else {
        top_v1.resize(0, handle.get_stream());
        top_v2.resize(0, handle.get_stream());
        top_score.resize(0, handle.get_stream());
      }
    }

-    if (top_score.size() == *topk) {
-      raft::update_host(
-        &similarity_threshold, top_score.data() + *topk - 1, 1, handle.get_stream());
-
-      if constexpr (multi_gpu) {
-        similarity_threshold = host_scalar_bcast(
-          handle.get_comms(), similarity_threshold, int{0}, handle.get_stream());
-      }
-    }
+    if (top_score.size() == *topk) {
Contributor:
Print top_score.size(): it is 10 on rank 0 and 0 on rank 1, so only rank 0 participates in the host_scalar_bcast. This is what causes the hang you see.

Collaborator Author:
I restructured the if statements. The code is structured to keep the topk results on only rank 0 (this makes much of the computation easier). I moved the host_scalar_bcast call outside of this if statement in the next push. Between that and the sync mentioned above, this got me unblocked. I'll push an update to the PR later today.

Thanks for the diagnosis @seunghwak!

+      raft::update_host(
+        &similarity_threshold, top_score.data() + *topk - 1, 1, handle.get_stream());
+    }
+
+    if constexpr (multi_gpu) {
+      similarity_threshold =
+        host_scalar_bcast(handle.get_comms(), similarity_threshold, int{0}, handle.get_stream());
+    }
  }
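The fix generalizes to any collective: every rank must make the matching call. A self-contained MPI analogue of the before and after (raw MPI stands in for cugraph's host_scalar_bcast):

    #include <mpi.h>
    #include <cstdio>

    int main(int argc, char** argv)
    {
      MPI_Init(&argc, &argv);
      int rank{};
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);

      double threshold = 0.0;
      if (rank == 0) { threshold = 0.75; }  // only the root holds the top-k scores

      // WRONG (hangs), guarding the collective the way the old code did:
      //   if (rank == 0) { MPI_Bcast(&threshold, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD); }
      // Rank 0 enters the broadcast; every other rank skips it and never matches the call.

      // RIGHT: every rank calls the collective; non-root ranks receive rank 0's value.
      MPI_Bcast(&threshold, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);

      std::printf("rank %d: threshold = %f\n", rank, threshold);
      MPI_Finalize();
      return 0;
    }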

return std::make_tuple(std::move(top_v1), std::move(top_v2), std::move(top_score));