Fix MG similarity issues #4741
Conversation
@@ -368,187 +368,196 @@ all_pairs_similarity(raft::handle_t const& handle,
    sum_two_hop_degrees,
    MAX_PAIRS_PER_BATCH);
In the lines above:

top_v1.reserve(*topk, handle.get_stream());
top_v2.reserve(*topk, handle.get_stream());
top_score.reserve(*topk, handle.get_stream());

Shouldn't reserve here be resize?
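For context, a minimal sketch of the distinction being raised here, assuming RMM's device_uvector semantics (the function name is mine, for illustration only):

```cpp
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>

// reserve() only allocates capacity; resize() also changes size(), which is
// what .begin()/.end() and span construction are based on.
void reserve_vs_resize(rmm::cuda_stream_view stream)
{
  rmm::device_uvector<int> v(0, stream);

  v.reserve(10, stream);  // capacity() >= 10, but size() is still 0
  // [v.begin(), v.end()) is empty here; algorithms see no elements

  v.resize(10, stream);   // size() == 10; iterators now cover 10 elements
}
```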
raft::update_host(&sum_two_hop_degrees,
                  two_hop_degree_offsets.data() + two_hop_degree_offsets.size() - 1,
                  1,
                  handle.get_stream());

We are missing handle.sync_stream() after this to ensure that sum_two_hop_degrees is ready to use in the following compute_offset_aligned_element_chunks.
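A minimal sketch of the suggested pattern, with names taken from the diff above (assuming raft::update_host enqueues an asynchronous device-to-host copy on the handle's stream):

```cpp
#include <raft/core/handle.hpp>
#include <raft/util/cudart_utils.hpp>
#include <rmm/device_uvector.hpp>

// The host variable is only safe to read after the stream synchronizes.
size_t copy_last_offset_to_host(raft::handle_t const& handle,
                                rmm::device_uvector<size_t> const& two_hop_degree_offsets)
{
  size_t sum_two_hop_degrees{0};
  raft::update_host(&sum_two_hop_degrees,
                    two_hop_degree_offsets.data() + two_hop_degree_offsets.size() - 1,
                    1,
                    handle.get_stream());
  handle.sync_stream();  // without this, the host-side read below may see garbage
  return sum_two_hop_degrees;
}
```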
I think this is the big thing that I was tripping over. Added the sync (along with fixing the problem you note below) and the hang appears to be resolved.
raft::device_span<vertex_t const> batch_seeds{tmp_vertices.data(), size_t{0}};

if (((batch_number + 1) < batch_offsets.size()) &&
    (batch_offsets[batch_number + 1] > batch_offsets[batch_number])) {
(batch_number + 1) < batch_offsets.size() should always be true here, right? batch_number < num_batches and batch_offsets.size() is num_batches + 1.
This is not the case, and was in fact the bug that triggered my PR.

The vertices can be specified as a parameter. The batches are constructed by looking at the size of the 2-hop neighborhood of the selected vertices. The test case that led me to investigate this was a situation where the number of batches on rank 0 was smaller than the number of batches on rank 1.

The above code, by computing the MAX of the number of batches across all GPUs, ensures that every GPU has the same value for num_batches, but that means that if there are no batches on a particular GPU, batch_offsets will not be long enough to do the second half of this computation.

I considered extending batch_offsets and filling it with the last value, but this seemed better since it's the only use of that array.
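To make the guard's role concrete, a hedged sketch of the batching loop as described (the loop structure and surrounding names are assumptions based on the diff, not the exact code):

```cpp
#include <raft/core/device_span.hpp>

#include <cstddef>
#include <vector>

// num_batches is the MAX across all GPUs, so every rank runs the same number
// of iterations and collectives stay matched, but a rank with no local work
// can have batch_offsets.size() < num_batches + 1; hence the bounds check.
template <typename vertex_t>
void process_batches(std::vector<size_t> const& batch_offsets,  // local; may be short
                     size_t num_batches,                        // global max
                     vertex_t const* tmp_vertices)
{
  for (size_t batch_number = 0; batch_number < num_batches; ++batch_number) {
    raft::device_span<vertex_t const> batch_seeds{tmp_vertices, size_t{0}};

    if (((batch_number + 1) < batch_offsets.size()) &&
        (batch_offsets[batch_number + 1] > batch_offsets[batch_number])) {
      // this rank actually has seeds for this batch
      batch_seeds = raft::device_span<vertex_t const>{
        tmp_vertices + batch_offsets[batch_number],
        batch_offsets[batch_number + 1] - batch_offsets[batch_number]};
    }

    // every rank proceeds into this batch's collective work, possibly with an
    // empty batch_seeds
  }
}
```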
if (top_score.size() == *topk) {
  raft::update_host(
    &similarity_threshold, top_score.data() + *topk - 1, 1, handle.get_stream());
Print top_score.size(): it is 10 on rank 0 and 0 on rank 1, so only rank 0 participates in the host_scalar_bcast. This is causing the hang you see.
I restructured the if statements. The code is structured to keep the topk results on only rank 0 (it makes much of the computation easier). I moved the host_scalar_bcast call outside of this if statement in the next push. Between that and the sync mentioned above, this got me unblocked. I'll push an update to the PR later today.

Thanks for the diagnosis @seunghwak!
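A hedged sketch of the restructuring described, under the assumption that host_scalar_bcast (cugraph's helper over raft::comms) is a collective that every rank must enter; the function wrapper is mine, for illustration:

```cpp
#include <cugraph/utilities/host_scalar_comm.hpp>
#include <raft/core/handle.hpp>
#include <raft/util/cudart_utils.hpp>
#include <rmm/device_uvector.hpp>

// Only rank 0 holds a full top-k list, but every rank must call the collective.
template <typename weight_t>
weight_t broadcast_threshold(raft::handle_t const& handle,
                             rmm::device_uvector<weight_t> const& top_score,
                             size_t topk)
{
  weight_t similarity_threshold{0};
  if (top_score.size() == topk) {  // in practice true only on rank 0
    raft::update_host(
      &similarity_threshold, top_score.data() + topk - 1, 1, handle.get_stream());
    handle.sync_stream();
  }
  // the broadcast now sits outside the if, so ranks with an empty top_score
  // still participate and the hang is avoided
  return cugraph::host_scalar_bcast(
    handle.get_comms(), similarity_threshold, int{0} /* root */, handle.get_stream());
}
```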
thrust::copy(
  handle.get_thrust_policy(), v1.begin(), v1.begin() + top_v1.size(), top_v1.begin());
thrust::copy(
  handle.get_thrust_policy(), v2.begin(), v2.begin() + top_v1.size(), top_v2.begin());
thrust::copy(handle.get_thrust_policy(),
             score.begin(),
             score.begin() + top_v1.size(),
             top_score.begin());
Make sure top_v1 and top_v2 are properly re-sized here (not just reserved).
I resize them above on line 487.

I did the reserve to make sure we don't have to move anything as the array grows. I am doing the resizing as necessary so I can use the .size(), .begin(), and .end() methods to reflect only what's actually used.
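A small sketch of the reserve-then-resize pattern described here (names and signature assumed for illustration; not the PR's exact code):

```cpp
#include <raft/core/handle.hpp>
#include <rmm/device_uvector.hpp>

#include <thrust/copy.h>

#include <algorithm>

// Reserve full top-k capacity once so later growth never reallocates, then
// resize to the count actually kept so size()/begin()/end() describe only the
// valid elements that thrust::copy consumes.
template <typename vertex_t>
void keep_top(raft::handle_t const& handle,
              rmm::device_uvector<vertex_t> const& v1,
              rmm::device_uvector<vertex_t>& top_v1,
              size_t topk)
{
  top_v1.reserve(topk, handle.get_stream());                      // capacity only
  top_v1.resize(std::min(topk, v1.size()), handle.get_stream());  // sets size()
  thrust::copy(
    handle.get_thrust_policy(), v1.begin(), v1.begin() + top_v1.size(), top_v1.begin());
}
```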
Will push an update to fix things and clean up the debugging code a bit.
LGTM
// MAX_PAIRS_PER_BATCH{static_cast<size_t>(handle.get_device_properties().multiProcessorCount) *
//   (1 << 15)};
size_t const MAX_PAIRS_PER_BATCH{100};
// size_t const MAX_PAIRS_PER_BATCH{100};
Better to delete the commented-out code?
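For reference, a sketch of what the cleanup might look like, assuming the commented-out SM-count heuristic above is the intended production value (handle is a raft::handle_t in the enclosing scope):

```cpp
// restore the sizing heuristic and drop the debug constant
size_t const MAX_PAIRS_PER_BATCH{
  static_cast<size_t>(handle.get_device_properties().multiProcessorCount) * (1 << 15)};
```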
Looks good to me
I just reviewed the updated Python code and had a few comments.
#
join = df1.merge(df2, left_on=["src1", "dst1"], right_on=["src2", "dst2"])

if len(df1) != len(join):
I'm assuming this if block is just for debugging?
join2 = df1.merge(
    df2, how="left", left_on=["src1", "dst1"], right_on=["src2", "dst2"]
)
pd.set_option("display.max_rows", 500)
It might be good to restore the option afterwards by saving the original value with pd.get_option.
# Check to see if all pairs in the original data frame
# still exist in the new data frame. If we join (merge)
is "original" df1 and "new" df2?
worst_coeff = all_pairs_jaccard_results["jaccard_coeff"].min()
better_than_k = jaccard_results[jaccard_results["jaccard_coeff"] > worst_coeff]

compare(
Since compare will raise an AssertionError, I think it would be better to name it to indicate that: assert_df_results_equal or something like that.
@@ -153,6 +154,54 @@ def networkx_call(M, benchmark_callable=None):
    return src, dst, coeff


# FIXME: This compare is shared across several tests... it should be
# a general utility
def compare(src1, dst1, val1, src2, dst2, val2):
It looks like compare is always being passed values as 3 individual series from a dataframe. Since compare will just re-create these as a dataframe, can compare be written to be called like this: compare(all_pairs_jaccard_results, jaccard_results, ['first', 'second', 'jaccard_coeff'])?

def compare(a, b, names):
    df1 = a[names]
    df2 = b[names]
    join = df1.merge(...)
    ...
I chatted with @ChuckHastings offline; I'll take care of refactoring this test utility in a separate PR since I'm looking at updates like this anyway.
I'm addressing much of this feedback in #4776.

I'm not rewriting as much of the compare utility as I proposed, since I can't tell how general-purpose it was intended to be (e.g., it currently supports passing in arrays; should that still be allowed?).
I would suggest making it as general-purpose as makes sense. I didn't write the original function... I was merely speculating about its potential reuse, at least within the similarity algorithms.

I'm not sure compare is really the right name for the function here either. It's validating that the similarity results are correct by determining that they are a valid subset of the entire result set.

I'm also not 100% certain this was the best way to modify the function to make that computation.
/merge
This PR adds C++ tests for the all-pairs variation of similarity algorithms. Previously the all-pairs variation was only tested in SG mode.

This also addresses an issue where the all-pairs implementation would crash when there was a load imbalance across the GPUs and one of the GPUs ran out of work before the others.

Closes rapidsai#4704

Authors:
- Chuck Hastings (https://github.com/ChuckHastings)

Approvers:
- Seunghwa Kang (https://github.com/seunghwak)
- Joseph Nke (https://github.com/jnke2016)
- Rick Ratzel (https://github.com/rlratzel)

URL: rapidsai#4741