Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a vertex pair intersection primitive #2728

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
897ecc8
rename a prim file
seunghwak Sep 14, 2022
0f3e941
initial prim implementation
seunghwak Sep 14, 2022
fa9b867
Merge branch 'branch-22.10' of github.com:rapidsai/cugraph into fea_r…
seunghwak Sep 14, 2022
06598ae
refactor prim tests
seunghwak Sep 15, 2022
368740c
fix file name
seunghwak Sep 15, 2022
1c24e12
add to_thrust_tuple utility function
seunghwak Sep 19, 2022
193cb0d
fix compile error in vertex_frontier.cuh
seunghwak Sep 19, 2022
313dac9
fix compiler warnings
seunghwak Sep 19, 2022
f57568a
add test suit
seunghwak Sep 19, 2022
41abaf0
fix compile errors
seunghwak Sep 19, 2022
da83d3b
silence spurious may-be-used-uninitialized warnings
seunghwak Sep 23, 2022
d5e415b
bug fixes
seunghwak Sep 23, 2022
fe31099
Merge branch 'branch-22.10' of github.com:rapidsai/cugraph into fea_r…
seunghwak Sep 23, 2022
2a7d0c8
fix clang-format errors
seunghwak Sep 23, 2022
1fb6917
guard a cugraph-ops call inside ifdef
seunghwak Sep 23, 2022
1d73867
first draft implementation
seunghwak Sep 24, 2022
5607a32
Merge branch 'branch-22.10' of github.com:rapidsai/cugraph into fea_v…
seunghwak Sep 26, 2022
e8d3ce9
Merge branch 'branch-22.10' of github.com:rapidsai/cugraph into fea_r…
seunghwak Sep 26, 2022
0c01bc6
Merge branch 'upstream_pr2703' into fea_v_pair_intersection_prim
seunghwak Sep 26, 2022
bbe6fba
rename utility functors
seunghwak Sep 26, 2022
3420364
add a test and fix compile errors
seunghwak Sep 27, 2022
d8e647e
resolve merge conflicts
seunghwak Sep 27, 2022
789f15c
test MG prim test
seunghwak Sep 27, 2022
69f307c
clang-format
seunghwak Sep 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 81 additions & 64 deletions cpp/src/prims/detail/nbr_intersection.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,77 @@ struct gatherv_indices_t {
}
};

// Count the vertex pairs in [vertex_pair_first, vertex_pair_last) that is_invalid_input_vertex_pair_t
// flags as invalid. The predicate is given the total number of vertices, the per-partition major
// ranges of the local edge partitions, and the local minor range. In multi-GPU, the local counts
// are summed over handle.get_comms(), so the global count is returned.
template <typename GraphViewType, typename VertexPairIterator>
size_t count_invalid_vertex_pairs(raft::handle_t const& handle,
                                  GraphViewType const& graph_view,
                                  VertexPairIterator vertex_pair_first,
                                  VertexPairIterator vertex_pair_last)
{
  using vertex_t = typename GraphViewType::vertex_type;

  auto stream = handle.get_stream();

  // Host-side staging: one [first, last) major range per local edge partition plus a single minor
  // range.
  std::vector<vertex_t> major_firsts_h(graph_view.number_of_local_edge_partitions());
  std::vector<vertex_t> major_lasts_h(major_firsts_h.size());
  vertex_t minor_first{};
  vertex_t minor_last{};

  if constexpr (!GraphViewType::is_multi_gpu) {
    // Single-GPU: both the major and the minor range are [0, number_of_vertices).
    major_firsts_h[0] = vertex_t{0};
    major_lasts_h[0]  = graph_view.number_of_vertices();
    minor_first       = vertex_t{0};
    minor_last        = graph_view.number_of_vertices();
  } else if constexpr (GraphViewType::is_storage_transposed) {
    // Transposed storage: destinations are majors, sources are minors.
    for (size_t p = 0; p < major_firsts_h.size(); ++p) {
      major_firsts_h[p] = graph_view.local_edge_partition_dst_range_first(p);
      major_lasts_h[p]  = graph_view.local_edge_partition_dst_range_last(p);
    }
    minor_first = graph_view.local_edge_partition_src_range_first();
    minor_last  = graph_view.local_edge_partition_src_range_last();
  } else {
    // Non-transposed storage: sources are majors, destinations are minors.
    for (size_t p = 0; p < major_firsts_h.size(); ++p) {
      major_firsts_h[p] = graph_view.local_edge_partition_src_range_first(p);
      major_lasts_h[p]  = graph_view.local_edge_partition_src_range_last(p);
    }
    minor_first = graph_view.local_edge_partition_dst_range_first();
    minor_last  = graph_view.local_edge_partition_dst_range_last();
  }

  // Mirror the major range boundaries on the device so that the predicate can read them.
  rmm::device_uvector<vertex_t> major_firsts_d(major_firsts_h.size(), stream);
  rmm::device_uvector<vertex_t> major_lasts_d(major_lasts_h.size(), stream);
  raft::update_device(
    major_firsts_d.data(), major_firsts_h.data(), major_firsts_h.size(), stream);
  raft::update_device(major_lasts_d.data(), major_lasts_h.data(), major_lasts_h.size(), stream);

  auto invalid_pair_pred = is_invalid_input_vertex_pair_t<vertex_t>{
    graph_view.number_of_vertices(),
    raft::device_span<vertex_t const>(major_firsts_d.begin(), major_firsts_d.end()),
    raft::device_span<vertex_t const>(major_lasts_d.begin(), major_lasts_d.end()),
    minor_first,
    minor_last};

  auto num_invalid_pairs = thrust::count_if(
    handle.get_thrust_policy(), vertex_pair_first, vertex_pair_last, invalid_pair_pred);

  if constexpr (GraphViewType::is_multi_gpu) {
    // Aggregate the per-process counts.
    num_invalid_pairs = host_scalar_allreduce(
      handle.get_comms(), num_invalid_pairs, raft::comms::op_t::SUM, stream);
  }

  return num_invalid_pairs;
}

// In multi-GPU, the first element of every vertex pair in [vertex_pair_first, vertex_pair_last) should
// be within the valid edge partition major range assigned to this process and the second element
// should be within the valid edge partition minor range assigned to this process.
Expand All @@ -483,7 +554,7 @@ struct gatherv_indices_t {
// one can limit the number of unique vertices (aggregated over column communicator in multi-GPU) to
// build neighbor list; we need to build neighbor lists for the first element of every input vertex
// pair if intersect_dst_nbr[0] == GraphViewType::is_storage_transposed and build neighbor lists for
// the second element of every input vertex pair if single-GPU and intersect_dst_nbr[0] ==
// the second element of every input vertex pair if single-GPU and intersect_dst_nbr[1] ==
// GraphViewType::is_storage_transposed or multi-GPU. For load balancing,
// thrust::distance(vertex_pair_first, vertex_pair_last) should be comparable across the global
// communicator. If we need to build the neighbor lists, grouping based on applying "vertex ID %
Expand Down Expand Up @@ -517,79 +588,22 @@ nbr_intersection(raft::handle_t const& handle,
if (do_expensive_check) {
auto is_sorted =
thrust::is_sorted(handle.get_thrust_policy(), vertex_pair_first, vertex_pair_last);

std::vector<vertex_t> h_edge_partition_major_range_firsts(
graph_view.number_of_local_edge_partitions());
std::vector<vertex_t> h_edge_partition_major_range_lasts(
h_edge_partition_major_range_firsts.size());
vertex_t edge_partition_minor_range_first{};
vertex_t edge_partition_minor_range_last{};
if constexpr (GraphViewType::is_multi_gpu) {
for (size_t i = 0; i < graph_view.number_of_local_edge_partitions(); i++) {
if constexpr (GraphViewType::is_storage_transposed) {
h_edge_partition_major_range_firsts[i] =
graph_view.local_edge_partition_dst_range_first(i);
h_edge_partition_major_range_lasts[i] = graph_view.local_edge_partition_dst_range_last(i);
} else {
h_edge_partition_major_range_firsts[i] =
graph_view.local_edge_partition_src_range_first(i);
h_edge_partition_major_range_lasts[i] = graph_view.local_edge_partition_src_range_last(i);
}
}
if constexpr (GraphViewType::is_storage_transposed) {
edge_partition_minor_range_first = graph_view.local_edge_partition_src_range_first();
edge_partition_minor_range_last = graph_view.local_edge_partition_src_range_last();
} else {
edge_partition_minor_range_first = graph_view.local_edge_partition_dst_range_first();
edge_partition_minor_range_last = graph_view.local_edge_partition_dst_range_last();
}
} else {
h_edge_partition_major_range_firsts[0] = vertex_t{0};
h_edge_partition_major_range_lasts[0] = graph_view.number_of_vertices();
edge_partition_minor_range_first = vertex_t{0};
edge_partition_minor_range_last = graph_view.number_of_vertices();
}
rmm::device_uvector<vertex_t> d_edge_partition_major_range_firsts(
h_edge_partition_major_range_firsts.size(), handle.get_stream());
rmm::device_uvector<vertex_t> d_edge_partition_major_range_lasts(
h_edge_partition_major_range_lasts.size(), handle.get_stream());
raft::update_device(d_edge_partition_major_range_firsts.data(),
h_edge_partition_major_range_firsts.data(),
h_edge_partition_major_range_firsts.size(),
handle.get_stream());
raft::update_device(d_edge_partition_major_range_lasts.data(),
h_edge_partition_major_range_lasts.data(),
h_edge_partition_major_range_lasts.size(),
handle.get_stream());

auto num_invalid_pairs = thrust::count_if(
handle.get_thrust_policy(),
vertex_pair_first,
vertex_pair_last,
is_invalid_input_vertex_pair_t<vertex_t>{
graph_view.number_of_vertices(),
raft::device_span<vertex_t const>(d_edge_partition_major_range_firsts.begin(),
d_edge_partition_major_range_firsts.end()),
raft::device_span<vertex_t const>(d_edge_partition_major_range_lasts.begin(),
d_edge_partition_major_range_lasts.end()),
edge_partition_minor_range_first,
edge_partition_minor_range_last});
if constexpr (GraphViewType::is_multi_gpu) {
auto& comm = handle.get_comms();

is_sorted = static_cast<bool>(host_scalar_allreduce(
is_sorted = static_cast<bool>(host_scalar_allreduce(
comm, static_cast<int>(is_sorted), raft::comms::op_t::MIN, handle.get_stream()));
num_invalid_pairs =
host_scalar_allreduce(comm, num_invalid_pairs, raft::comms::op_t::SUM, handle.get_stream());
}

CUGRAPH_EXPECTS(is_sorted, "Invalid input arguments: input vertex pairs should be sorted.");

auto num_invalid_pairs =
count_invalid_vertex_pairs(handle, graph_view, vertex_pair_first, vertex_pair_last);
CUGRAPH_EXPECTS(num_invalid_pairs == 0,
"Invalid input arguments: there are invalid input vertex pairs.");
}

// 2. Collect neighbor lists for unique second pair elements (for the neighbors within the minor
// range for this GPU)
// range for this GPU). Note that there is no need to collect them for first pair elements, as
// those already reside locally.

auto poly_alloc = rmm::mr::polymorphic_allocator<char>(rmm::mr::get_current_device_resource());
[[maybe_unused]] auto stream_adapter =
Expand Down Expand Up @@ -634,6 +648,9 @@ nbr_intersection(raft::handle_t const& handle,
unique_majors.shrink_to_fit(handle.get_stream());

if (col_comm_size > 1) {
// FIXME: We may refactor this code to improve scalability. We may call multiple gatherv
// calls, perform local sort and unique, and call multiple broadcasts rather than
// performing sort and unique for the entire range in every GPU in col_comm.
auto rx_counts =
host_scalar_allgather(col_comm, unique_majors.size(), handle.get_stream());
std::vector<size_t> rx_displacements(rx_counts.size());
Expand Down
Loading