-
Notifications
You must be signed in to change notification settings - Fork 311
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix MG similarity issues #4741
Fix MG similarity issues #4741
Changes from 2 commits
969ec7d
1e5b5ce
bbdaf66
192bae0
52a57e1
f00d4cf
eed6afd
f2386a4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -368,187 +368,196 @@ all_pairs_similarity(raft::handle_t const& handle, | |
sum_two_hop_degrees, | ||
MAX_PAIRS_PER_BATCH); | ||
|
||
for (size_t batch_number = 0; batch_number < (batch_offsets.size() - 1); ++batch_number) { | ||
if (batch_offsets[batch_number + 1] > batch_offsets[batch_number]) { | ||
auto [offsets, v2] = | ||
k_hop_nbrs(handle, | ||
graph_view, | ||
raft::device_span<vertex_t const>{ | ||
tmp_vertices.data() + batch_offsets[batch_number], | ||
batch_offsets[batch_number + 1] - batch_offsets[batch_number]}, | ||
2, | ||
do_expensive_check); | ||
|
||
auto v1 = cugraph::detail::expand_sparse_offsets( | ||
raft::device_span<size_t const>{offsets.data(), offsets.size()}, | ||
vertex_t{0}, | ||
handle.get_stream()); | ||
// FIXME: compute_offset_aligned_element_chunks can return duplicates. Should it? Should | ||
// explore | ||
// whether this functionality should be pushed into that function | ||
batch_offsets.resize(std::distance(batch_offsets.begin(), | ||
std::unique(batch_offsets.begin(), batch_offsets.end()))); | ||
|
||
cugraph::unrenumber_local_int_vertices( | ||
handle, | ||
v1.data(), | ||
v1.size(), | ||
size_t num_batches = batch_offsets.size() - 1; | ||
if constexpr (multi_gpu) { | ||
num_batches = cugraph::host_scalar_allreduce( | ||
handle.get_comms(), num_batches, raft::comms::op_t::MAX, handle.get_stream()); | ||
} | ||
|
||
for (size_t batch_number = 0; batch_number < num_batches; ++batch_number) { | ||
raft::device_span<vertex_t const> batch_seeds{tmp_vertices.data(), size_t{0}}; | ||
|
||
if (((batch_number + 1) < batch_offsets.size()) && | ||
(batch_offsets[batch_number + 1] > batch_offsets[batch_number])) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not the case, and was in fact the bug that triggered my PR. The vertices can be specified as a parameter. The batches are constructed by looking at the size of the 2-hop neighborhood of selected vertices. The test case that triggered me investigating this was a situation where the number of batches on rank 0 was smaller than the number of batches on rank 1. The above code, by computing the MAX value of the number of batches across all GPUs ensures that every vertex has the same number for I considered extending |
||
batch_seeds = raft::device_span<vertex_t const>{ | ||
tmp_vertices.data() + batch_offsets[batch_number], | ||
vertex_t{0}, | ||
static_cast<vertex_t>(batch_offsets[batch_number + 1] - batch_offsets[batch_number]), | ||
do_expensive_check); | ||
batch_offsets[batch_number + 1] - batch_offsets[batch_number]}; | ||
} | ||
|
||
auto [offsets, v2] = k_hop_nbrs(handle, graph_view, batch_seeds, 2, do_expensive_check); | ||
|
||
auto new_size = thrust::distance( | ||
auto v1 = cugraph::detail::expand_sparse_offsets( | ||
raft::device_span<size_t const>{offsets.data(), offsets.size()}, | ||
vertex_t{0}, | ||
handle.get_stream()); | ||
|
||
cugraph::unrenumber_local_int_vertices( | ||
handle, | ||
v1.data(), | ||
v1.size(), | ||
tmp_vertices.data() + batch_offsets[batch_number], | ||
vertex_t{0}, | ||
static_cast<vertex_t>(batch_offsets[batch_number + 1] - batch_offsets[batch_number]), | ||
do_expensive_check); | ||
|
||
auto new_size = thrust::distance( | ||
thrust::make_zip_iterator(v1.begin(), v2.begin()), | ||
thrust::remove_if( | ||
handle.get_thrust_policy(), | ||
thrust::make_zip_iterator(v1.begin(), v2.begin()), | ||
thrust::remove_if( | ||
handle.get_thrust_policy(), | ||
thrust::make_zip_iterator(v1.begin(), v2.begin()), | ||
thrust::make_zip_iterator(v1.end(), v2.end()), | ||
[] __device__(auto tuple) { return thrust::get<0>(tuple) == thrust::get<1>(tuple); })); | ||
thrust::make_zip_iterator(v1.end(), v2.end()), | ||
[] __device__(auto tuple) { return thrust::get<0>(tuple) == thrust::get<1>(tuple); })); | ||
|
||
v1.resize(new_size, handle.get_stream()); | ||
v2.resize(new_size, handle.get_stream()); | ||
v1.resize(new_size, handle.get_stream()); | ||
v2.resize(new_size, handle.get_stream()); | ||
|
||
if constexpr (multi_gpu) { | ||
// shuffle vertex pairs | ||
auto vertex_partition_range_lasts = graph_view.vertex_partition_range_lasts(); | ||
|
||
std::tie(v1, v2, std::ignore, std::ignore, std::ignore, std::ignore) = | ||
detail::shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning<vertex_t, | ||
edge_t, | ||
weight_t, | ||
int>( | ||
handle, | ||
std::move(v1), | ||
std::move(v2), | ||
std::nullopt, | ||
std::nullopt, | ||
std::nullopt, | ||
vertex_partition_range_lasts); | ||
} | ||
if constexpr (multi_gpu) { | ||
// shuffle vertex pairs | ||
auto vertex_partition_range_lasts = graph_view.vertex_partition_range_lasts(); | ||
|
||
std::tie(v1, v2, std::ignore, std::ignore, std::ignore, std::ignore) = | ||
detail::shuffle_int_vertex_pairs_with_values_to_local_gpu_by_edge_partitioning<vertex_t, | ||
edge_t, | ||
weight_t, | ||
int>( | ||
handle, | ||
std::move(v1), | ||
std::move(v2), | ||
std::nullopt, | ||
std::nullopt, | ||
std::nullopt, | ||
vertex_partition_range_lasts); | ||
} | ||
|
||
auto score = | ||
similarity(handle, | ||
graph_view, | ||
edge_weight_view, | ||
std::make_tuple(raft::device_span<vertex_t const>{v1.data(), v1.size()}, | ||
raft::device_span<vertex_t const>{v2.data(), v2.size()}), | ||
functor, | ||
coeff, | ||
do_expensive_check); | ||
|
||
// Add a remove_if to remove items that are less than the last topk element | ||
new_size = thrust::distance( | ||
thrust::make_zip_iterator(score.begin(), v1.begin(), v2.begin()), | ||
thrust::remove_if(handle.get_thrust_policy(), | ||
thrust::make_zip_iterator(score.begin(), v1.begin(), v2.begin()), | ||
thrust::make_zip_iterator(score.end(), v1.end(), v2.end()), | ||
[similarity_threshold] __device__(auto tuple) { | ||
return thrust::get<0>(tuple) < similarity_threshold; | ||
})); | ||
|
||
score.resize(new_size, handle.get_stream()); | ||
v1.resize(new_size, handle.get_stream()); | ||
v2.resize(new_size, handle.get_stream()); | ||
|
||
thrust::sort_by_key(handle.get_thrust_policy(), | ||
score.begin(), | ||
score.end(), | ||
thrust::make_zip_iterator(v1.begin(), v2.begin()), | ||
thrust::greater<weight_t>{}); | ||
|
||
size_t v1_keep = std::min(*topk, v1.size()); | ||
|
||
if (score.size() < (top_v1.size() + v1_keep)) { | ||
score.resize(top_v1.size() + v1_keep, handle.get_stream()); | ||
v1.resize(score.size(), handle.get_stream()); | ||
v2.resize(score.size(), handle.get_stream()); | ||
} | ||
auto score = | ||
similarity(handle, | ||
graph_view, | ||
edge_weight_view, | ||
std::make_tuple(raft::device_span<vertex_t const>{v1.data(), v1.size()}, | ||
raft::device_span<vertex_t const>{v2.data(), v2.size()}), | ||
functor, | ||
coeff, | ||
do_expensive_check); | ||
|
||
// Add a remove_if to remove items that are less than the last topk element | ||
new_size = thrust::distance( | ||
thrust::make_zip_iterator(score.begin(), v1.begin(), v2.begin()), | ||
thrust::remove_if(handle.get_thrust_policy(), | ||
thrust::make_zip_iterator(score.begin(), v1.begin(), v2.begin()), | ||
thrust::make_zip_iterator(score.end(), v1.end(), v2.end()), | ||
[similarity_threshold] __device__(auto tuple) { | ||
return thrust::get<0>(tuple) < similarity_threshold; | ||
})); | ||
|
||
score.resize(new_size, handle.get_stream()); | ||
v1.resize(new_size, handle.get_stream()); | ||
v2.resize(new_size, handle.get_stream()); | ||
|
||
thrust::sort_by_key(handle.get_thrust_policy(), | ||
score.begin(), | ||
score.end(), | ||
thrust::make_zip_iterator(v1.begin(), v2.begin()), | ||
thrust::greater<weight_t>{}); | ||
|
||
size_t v1_keep = std::min(*topk, v1.size()); | ||
|
||
if (score.size() < (top_v1.size() + v1_keep)) { | ||
score.resize(top_v1.size() + v1_keep, handle.get_stream()); | ||
v1.resize(score.size(), handle.get_stream()); | ||
v2.resize(score.size(), handle.get_stream()); | ||
} | ||
|
||
thrust::copy(handle.get_thrust_policy(), top_v1.begin(), top_v1.end(), v1.begin() + v1_keep); | ||
thrust::copy(handle.get_thrust_policy(), top_v2.begin(), top_v2.end(), v2.begin() + v1_keep); | ||
thrust::copy( | ||
handle.get_thrust_policy(), top_score.begin(), top_score.end(), score.begin() + v1_keep); | ||
|
||
thrust::sort_by_key(handle.get_thrust_policy(), | ||
score.begin(), | ||
score.end(), | ||
thrust::make_zip_iterator(v1.begin(), v2.begin()), | ||
thrust::greater<weight_t>{}); | ||
|
||
if (top_v1.size() < std::min(*topk, v1.size())) { | ||
top_v1.resize(std::min(*topk, v1.size()), handle.get_stream()); | ||
top_v2.resize(top_v1.size(), handle.get_stream()); | ||
top_score.resize(top_v1.size(), handle.get_stream()); | ||
} | ||
|
||
thrust::copy( | ||
handle.get_thrust_policy(), v1.begin(), v1.begin() + top_v1.size(), top_v1.begin()); | ||
thrust::copy( | ||
handle.get_thrust_policy(), v2.begin(), v2.begin() + top_v1.size(), top_v2.begin()); | ||
thrust::copy(handle.get_thrust_policy(), | ||
score.begin(), | ||
score.begin() + top_v1.size(), | ||
top_score.begin()); | ||
Comment on lines
+493
to
+500
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make sure top_v1 and top_v2 are properly re-sized here (not just reserved). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I resize them above on line 487. I did the reserve to make sure we don't have to move anything as the array grows. I am doing the resizing as necessary so I can use the |
||
|
||
thrust::copy( | ||
handle.get_thrust_policy(), top_v1.begin(), top_v1.end(), v1.begin() + v1_keep); | ||
thrust::copy( | ||
handle.get_thrust_policy(), top_v2.begin(), top_v2.end(), v2.begin() + v1_keep); | ||
thrust::copy( | ||
handle.get_thrust_policy(), top_score.begin(), top_score.end(), score.begin() + v1_keep); | ||
|
||
thrust::sort_by_key(handle.get_thrust_policy(), | ||
score.begin(), | ||
score.end(), | ||
thrust::make_zip_iterator(v1.begin(), v2.begin()), | ||
thrust::greater<weight_t>{}); | ||
|
||
if (top_v1.size() < std::min(*topk, v1.size())) { | ||
top_v1.resize(std::min(*topk, v1.size()), handle.get_stream()); | ||
top_v2.resize(top_v1.size(), handle.get_stream()); | ||
top_score.resize(top_v1.size(), handle.get_stream()); | ||
if constexpr (multi_gpu) { | ||
bool is_root = handle.get_comms().get_rank() == int{0}; | ||
auto rx_sizes = cugraph::host_scalar_gather( | ||
handle.get_comms(), top_v1.size(), int{0}, handle.get_stream()); | ||
std::vector<size_t> rx_displs; | ||
size_t gathered_size{0}; | ||
|
||
if (is_root) { | ||
rx_displs.resize(handle.get_comms().get_size()); | ||
rx_displs[0] = 0; | ||
std::partial_sum(rx_sizes.begin(), rx_sizes.end() - 1, rx_displs.begin() + 1); | ||
gathered_size = std::reduce(rx_sizes.begin(), rx_sizes.end()); | ||
} | ||
|
||
thrust::copy( | ||
handle.get_thrust_policy(), v1.begin(), v1.begin() + top_v1.size(), top_v1.begin()); | ||
thrust::copy( | ||
handle.get_thrust_policy(), v2.begin(), v2.begin() + top_v1.size(), top_v2.begin()); | ||
thrust::copy(handle.get_thrust_policy(), | ||
score.begin(), | ||
score.begin() + top_v1.size(), | ||
top_score.begin()); | ||
rmm::device_uvector<vertex_t> gathered_v1(gathered_size, handle.get_stream()); | ||
rmm::device_uvector<vertex_t> gathered_v2(gathered_size, handle.get_stream()); | ||
rmm::device_uvector<weight_t> gathered_score(gathered_size, handle.get_stream()); | ||
|
||
cugraph::device_gatherv( | ||
handle.get_comms(), | ||
thrust::make_zip_iterator(top_v1.begin(), top_v2.begin(), top_score.begin()), | ||
thrust::make_zip_iterator( | ||
gathered_v1.begin(), gathered_v2.begin(), gathered_score.begin()), | ||
top_v1.size(), | ||
rx_sizes, | ||
rx_displs, | ||
int{0}, | ||
handle.get_stream()); | ||
|
||
if constexpr (multi_gpu) { | ||
bool is_root = handle.get_comms().get_rank() == int{0}; | ||
auto rx_sizes = cugraph::host_scalar_gather( | ||
handle.get_comms(), top_v1.size(), int{0}, handle.get_stream()); | ||
std::vector<size_t> rx_displs; | ||
size_t gathered_size{0}; | ||
|
||
if (is_root) { | ||
rx_displs.resize(handle.get_comms().get_size()); | ||
rx_displs[0] = 0; | ||
std::partial_sum(rx_sizes.begin(), rx_sizes.end() - 1, rx_displs.begin() + 1); | ||
gathered_size = std::reduce(rx_sizes.begin(), rx_sizes.end()); | ||
if (is_root) { | ||
thrust::sort_by_key(handle.get_thrust_policy(), | ||
gathered_score.begin(), | ||
gathered_score.end(), | ||
thrust::make_zip_iterator(gathered_v1.begin(), gathered_v2.begin()), | ||
thrust::greater<weight_t>{}); | ||
|
||
if (gathered_v1.size() > *topk) { | ||
gathered_v1.resize(*topk, handle.get_stream()); | ||
gathered_v2.resize(*topk, handle.get_stream()); | ||
gathered_score.resize(*topk, handle.get_stream()); | ||
} | ||
|
||
rmm::device_uvector<vertex_t> gathered_v1(gathered_size, handle.get_stream()); | ||
rmm::device_uvector<vertex_t> gathered_v2(gathered_size, handle.get_stream()); | ||
rmm::device_uvector<weight_t> gathered_score(gathered_size, handle.get_stream()); | ||
|
||
cugraph::device_gatherv( | ||
handle.get_comms(), | ||
thrust::make_zip_iterator(top_v1.begin(), top_v2.begin(), top_score.begin()), | ||
thrust::make_zip_iterator( | ||
gathered_v1.begin(), gathered_v2.begin(), gathered_score.begin()), | ||
|
||
top_v1.size(), | ||
rx_sizes, | ||
rx_displs, | ||
int{0}, | ||
handle.get_stream()); | ||
|
||
if (is_root) { | ||
thrust::sort_by_key(handle.get_thrust_policy(), | ||
gathered_score.begin(), | ||
gathered_score.end(), | ||
thrust::make_zip_iterator(gathered_v1.begin(), gathered_v2.begin()), | ||
thrust::greater<weight_t>{}); | ||
|
||
if (gathered_v1.size() > *topk) { | ||
gathered_v1.resize(*topk, handle.get_stream()); | ||
gathered_v2.resize(*topk, handle.get_stream()); | ||
gathered_score.resize(*topk, handle.get_stream()); | ||
} | ||
|
||
top_v1 = std::move(gathered_v1); | ||
top_v2 = std::move(gathered_v2); | ||
top_score = std::move(gathered_score); | ||
} else { | ||
top_v1.resize(0, handle.get_stream()); | ||
top_v2.resize(0, handle.get_stream()); | ||
top_score.resize(0, handle.get_stream()); | ||
} | ||
top_v1 = std::move(gathered_v1); | ||
top_v2 = std::move(gathered_v2); | ||
top_score = std::move(gathered_score); | ||
} else { | ||
top_v1.resize(0, handle.get_stream()); | ||
top_v2.resize(0, handle.get_stream()); | ||
top_score.resize(0, handle.get_stream()); | ||
} | ||
} | ||
|
||
if (top_score.size() == *topk) { | ||
raft::update_host( | ||
&similarity_threshold, top_score.data() + *topk - 1, 1, handle.get_stream()); | ||
if (top_score.size() == *topk) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Print There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I restructured the if statements. The code is structured to keep the Thanks for the diagnosis @seunghwak! |
||
raft::update_host( | ||
&similarity_threshold, top_score.data() + *topk - 1, 1, handle.get_stream()); | ||
|
||
if constexpr (multi_gpu) { | ||
similarity_threshold = host_scalar_bcast( | ||
handle.get_comms(), similarity_threshold, int{0}, handle.get_stream()); | ||
} | ||
if constexpr (multi_gpu) { | ||
similarity_threshold = host_scalar_bcast( | ||
handle.get_comms(), similarity_threshold, int{0}, handle.get_stream()); | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the lines above,
Shouldn't
reserve
here beresize
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are missing
handle.sync_stream()
after this to ensure thatsum_two_hop_degrees
is ready to use in the followingcompute_offset_aligned_element_chunks
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is the big thing that I was tripping over. Added the sync (along with fixing the problem you note below) and the hang appears to be resolved.