Skip to content

Commit

Permalink
fix handling of fanout == -1 (rapidsai#2435)
Browse files Browse the repository at this point in the history
Uniform neighbor sampling does not work properly when a fanout value of -1 is specified.

The `partially_decompress_edge_partition_to_fill_edgelist` function did not handle the single-GPU (SG) case properly.

closes rapidsai#2425

Authors:
  - Chuck Hastings (https://github.com/ChuckHastings)

Approvers:
  - Seunghwa Kang (https://github.com/seunghwak)

URL: rapidsai#2435
  • Loading branch information
ChuckHastings authored Jul 22, 2022
1 parent 5bf07fb commit 5788f02
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 116 deletions.
278 changes: 165 additions & 113 deletions cpp/include/cugraph/detail/decompress_edge_partition.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -295,68 +295,179 @@ void partially_decompress_edge_partition_to_fill_edgelist(
// major_hypersparse_first will be part of edge_partition
std::optional<std::vector<vertex_t>> local_edge_partition_segment_offsets)
{
auto execution_policy = handle.get_thrust_policy();
static_assert(detail::num_sparse_segments_per_vertex_partition == 3);
auto& comm = handle.get_comms();
auto const comm_rank = comm.get_rank();
if (segment_offsets[1] - segment_offsets[0] > 0) {
raft::grid_1d_block_t update_grid(segment_offsets[1] - segment_offsets[0],
detail::decompress_edge_partition_block_size,
handle.get_device_properties().maxGridSize[0]);
if constexpr (multi_gpu) {
auto execution_policy = handle.get_thrust_policy();
static_assert(detail::num_sparse_segments_per_vertex_partition == 3);
if (segment_offsets[1] - segment_offsets[0] > 0) {
raft::grid_1d_block_t update_grid(segment_offsets[1] - segment_offsets[0],
detail::decompress_edge_partition_block_size,
handle.get_device_properties().maxGridSize[0]);

detail::partially_decompress_to_edgelist_high_degree<<<update_grid.num_blocks,
update_grid.block_size,
0,
handle.get_stream()>>>(
edge_partition,
input_majors + segment_offsets[0],
input_major_start_offsets,
segment_offsets[1],
majors,
minors,
weights,
property ? thrust::make_optional(thrust::make_tuple(
thrust::get<0>(*property) + segment_offsets[0], thrust::get<1>(*property)))
: thrust::nullopt,
global_edge_index);
}
if (segment_offsets[2] - segment_offsets[1] > 0) {
raft::grid_1d_warp_t update_grid(segment_offsets[2] - segment_offsets[1],
detail::decompress_edge_partition_block_size,
handle.get_device_properties().maxGridSize[0]);
detail::partially_decompress_to_edgelist_high_degree<<<update_grid.num_blocks,
update_grid.block_size,
0,
handle.get_stream()>>>(
edge_partition,
input_majors + segment_offsets[0],
input_major_start_offsets,
segment_offsets[1],
majors,
minors,
weights,
property ? thrust::make_optional(thrust::make_tuple(
thrust::get<0>(*property) + segment_offsets[0], thrust::get<1>(*property)))
: thrust::nullopt,
global_edge_index);
}
if (segment_offsets[2] - segment_offsets[1] > 0) {
raft::grid_1d_warp_t update_grid(segment_offsets[2] - segment_offsets[1],
detail::decompress_edge_partition_block_size,
handle.get_device_properties().maxGridSize[0]);

detail::partially_decompress_to_edgelist_mid_degree<<<update_grid.num_blocks,
update_grid.block_size,
0,
handle.get_stream()>>>(
edge_partition,
input_majors + segment_offsets[1],
input_major_start_offsets + segment_offsets[1] - segment_offsets[0],
segment_offsets[2] - segment_offsets[1],
majors,
minors,
weights,
property ? thrust::make_optional(thrust::make_tuple(
thrust::get<0>(*property) + segment_offsets[1], thrust::get<1>(*property)))
: thrust::nullopt,
global_edge_index);
}
if (segment_offsets[3] - segment_offsets[2] > 0) {
detail::partially_decompress_to_edgelist_mid_degree<<<update_grid.num_blocks,
update_grid.block_size,
0,
handle.get_stream()>>>(
edge_partition,
input_majors + segment_offsets[1],
input_major_start_offsets + segment_offsets[1] - segment_offsets[0],
segment_offsets[2] - segment_offsets[1],
majors,
minors,
weights,
property ? thrust::make_optional(thrust::make_tuple(
thrust::get<0>(*property) + segment_offsets[1], thrust::get<1>(*property)))
: thrust::nullopt,
global_edge_index);
}
if (segment_offsets[3] - segment_offsets[2] > 0) {
thrust::for_each(
execution_policy,
thrust::make_counting_iterator(vertex_t{0}),
thrust::make_counting_iterator(segment_offsets[3] - segment_offsets[2]),
[edge_partition,
input_majors = input_majors + segment_offsets[2],
input_major_start_offsets =
input_major_start_offsets + segment_offsets[2] - segment_offsets[0],
majors,
minors,
output_weights = weights,
property =
property ? thrust::make_optional(thrust::make_tuple(
thrust::get<0>(*property) + segment_offsets[2], thrust::get<1>(*property)))
: thrust::nullopt,
global_edge_index] __device__(auto idx) {
auto major = input_majors[idx];
auto major_offset = input_major_start_offsets[idx];
auto major_partition_offset =
static_cast<size_t>(major - edge_partition.major_range_first());
vertex_t const* indices{nullptr};
thrust::optional<weight_t const*> weights{thrust::nullopt};
edge_t local_degree{};
thrust::tie(indices, weights, local_degree) =
edge_partition.local_edges(major_partition_offset);

// FIXME: This can lead to thread divergence if local_degree varies significantly
// within threads in this warp
thrust::fill(
thrust::seq, majors + major_offset, majors + major_offset + local_degree, major);
thrust::copy(thrust::seq, indices, indices + local_degree, minors + major_offset);
if (output_weights)
thrust::copy(
thrust::seq, *weights, *weights + local_degree, *output_weights + major_offset);

if (property) {
auto major_input_property = thrust::get<0>(*property)[idx];
auto minor_output_property = thrust::get<1>(*property);
thrust::fill(thrust::seq,
minor_output_property + major_offset,
minor_output_property + major_offset + local_degree,
major_input_property);
}
if (global_edge_index) {
auto adjacency_list_offset = thrust::get<0>(*global_edge_index)[major_partition_offset];
auto minor_map = thrust::get<1>(*global_edge_index);
thrust::sequence(thrust::seq,
minor_map + major_offset,
minor_map + major_offset + local_degree,
adjacency_list_offset);
}
});
}
if (edge_partition.dcs_nzd_vertex_count() && (*(edge_partition.dcs_nzd_vertex_count()) > 0)) {
thrust::for_each(
execution_policy,
thrust::make_counting_iterator(vertex_t{0}),
thrust::make_counting_iterator(segment_offsets[4] - segment_offsets[3]),
[edge_partition,
input_majors = input_majors + segment_offsets[3],
input_major_start_offsets =
input_major_start_offsets + segment_offsets[3] - segment_offsets[0],
majors,
minors,
output_weights = weights,
property =
property ? thrust::make_optional(thrust::make_tuple(
thrust::get<0>(*property) + segment_offsets[3], thrust::get<1>(*property)))
: thrust::nullopt,
// FIXME: Once PR 2356 is merged, this parameter could go away because
// major_hypersparse_first will be part of edge_partition
segment_offsets_last = (*local_edge_partition_segment_offsets)
[detail::num_sparse_segments_per_vertex_partition],
global_edge_index] __device__(auto idx) {
auto major = input_majors[idx];
auto major_offset = input_major_start_offsets[idx];
auto major_idx = edge_partition.major_hypersparse_idx_from_major_nocheck(major);
if (major_idx) {
vertex_t const* indices{nullptr};
thrust::optional<weight_t const*> weights{thrust::nullopt};
edge_t local_degree{};
// FIXME: Once PR 2356 is merged, this computation should be changed to use
// major_hypersparse_first which will be part of edge_partition
thrust::tie(indices, weights, local_degree) =
edge_partition.local_edges(segment_offsets_last + *major_idx);
thrust::fill(
thrust::seq, majors + major_offset, majors + major_offset + local_degree, major);
thrust::copy(thrust::seq, indices, indices + local_degree, minors + major_offset);
if (output_weights)
thrust::copy(
thrust::seq, *weights, *weights + local_degree, *output_weights + major_offset);
if (property) {
auto major_input_property = thrust::get<0>(*property)[idx];
auto minor_output_property = thrust::get<1>(*property);
thrust::fill(thrust::seq,
minor_output_property + major_offset,
minor_output_property + major_offset + local_degree,
major_input_property);
}
if (global_edge_index) {
auto major_partition_offset =
static_cast<size_t>(*major_idx - edge_partition.major_range_first());
auto adjacency_list_offset =
thrust::get<0>(*global_edge_index)[major_partition_offset];
auto minor_map = thrust::get<1>(*global_edge_index);
thrust::sequence(thrust::seq,
minor_map + major_offset,
minor_map + major_offset + local_degree,
adjacency_list_offset);
}
}
});
}
} else {
thrust::for_each(
execution_policy,
handle.get_thrust_policy(),
thrust::make_counting_iterator(vertex_t{0}),
thrust::make_counting_iterator(segment_offsets[3] - segment_offsets[2]),
thrust::make_counting_iterator(edge_partition.major_range_size()),
[edge_partition,
input_majors = input_majors + segment_offsets[2],
input_major_start_offsets =
input_major_start_offsets + segment_offsets[2] - segment_offsets[0],
input_majors,
input_major_start_offsets,
majors,
minors,
output_weights = weights,
property = property
? thrust::make_optional(thrust::make_tuple(
thrust::get<0>(*property) + segment_offsets[2], thrust::get<1>(*property)))
: thrust::nullopt,
property = property ? thrust::make_optional(thrust::make_tuple(thrust::get<0>(*property),
thrust::get<1>(*property)))
: thrust::nullopt,
global_edge_index] __device__(auto idx) {
auto major = input_majors[idx];
auto major_offset = input_major_start_offsets[idx];
Expand Down Expand Up @@ -395,65 +506,6 @@ void partially_decompress_edge_partition_to_fill_edgelist(
}
});
}
if (edge_partition.dcs_nzd_vertex_count() && (*(edge_partition.dcs_nzd_vertex_count()) > 0)) {
thrust::for_each(
execution_policy,
thrust::make_counting_iterator(vertex_t{0}),
thrust::make_counting_iterator(segment_offsets[4] - segment_offsets[3]),
[edge_partition,
input_majors = input_majors + segment_offsets[3],
input_major_start_offsets =
input_major_start_offsets + segment_offsets[3] - segment_offsets[0],
majors,
minors,
output_weights = weights,
property = property
? thrust::make_optional(thrust::make_tuple(
thrust::get<0>(*property) + segment_offsets[3], thrust::get<1>(*property)))
: thrust::nullopt,
// FIXME: Once PR 2356 is merged, this parameter could go away because
// major_hypersparse_first will be part of edge_partition
segment_offsets_last =
(*local_edge_partition_segment_offsets)[detail::num_sparse_segments_per_vertex_partition],
global_edge_index] __device__(auto idx) {
auto major = input_majors[idx];
auto major_offset = input_major_start_offsets[idx];
auto major_idx = edge_partition.major_hypersparse_idx_from_major_nocheck(major);
if (major_idx) {
vertex_t const* indices{nullptr};
thrust::optional<weight_t const*> weights{thrust::nullopt};
edge_t local_degree{};
// FIXME: Once PR 2356 is merged, this computation should be changed to use
// major_hypersparse_first which will be part of edge_partition
thrust::tie(indices, weights, local_degree) =
edge_partition.local_edges(segment_offsets_last + *major_idx);
thrust::fill(
thrust::seq, majors + major_offset, majors + major_offset + local_degree, major);
thrust::copy(thrust::seq, indices, indices + local_degree, minors + major_offset);
if (output_weights)
thrust::copy(
thrust::seq, *weights, *weights + local_degree, *output_weights + major_offset);
if (property) {
auto major_input_property = thrust::get<0>(*property)[idx];
auto minor_output_property = thrust::get<1>(*property);
thrust::fill(thrust::seq,
minor_output_property + major_offset,
minor_output_property + major_offset + local_degree,
major_input_property);
}
if (global_edge_index) {
auto major_partition_offset =
static_cast<size_t>(*major_idx - edge_partition.major_range_first());
auto adjacency_list_offset = thrust::get<0>(*global_edge_index)[major_partition_offset];
auto minor_map = thrust::get<1>(*global_edge_index);
thrust::sequence(thrust::seq,
minor_map + major_offset,
minor_map + major_offset + local_degree,
adjacency_list_offset);
}
}
});
}
}

template <typename vertex_t, typename edge_t, typename weight_t, bool multi_gpu>
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/sampling/uniform_neighbor_sampling_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ uniform_nbr_sample_impl(
rmm::device_uvector<vertex_t> d_out_dst(0, handle.get_stream());
auto d_out_indices = std::make_optional(rmm::device_uvector<weight_t>(0, handle.get_stream()));

if (k_level != 0) {
if (k_level > 0) {
// extract out-degs(sources):
auto&& d_out_degs =
get_active_major_global_degrees(handle, graph_view, d_in, global_out_degrees);
Expand Down
32 changes: 30 additions & 2 deletions cpp/tests/c_api/uniform_neighbor_sample_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ int generic_uniform_neighbor_sample_test(vertex_t* h_src,

for (int i = 0; (i < result_size) && (test_ret_value == 0); ++i) {
TEST_ASSERT(test_ret_value,
M[h_srcs[i]][h_dsts[i]] > 0,
M[h_srcs[i]][h_dsts[i]] == h_index[i],
"uniform_neighbor_sample got edge that doesn't exist");
}

Expand All @@ -231,7 +231,7 @@ int test_uniform_neighbor_sample()

vertex_t src[] = {0, 1, 1, 2, 2, 2, 3, 4};
vertex_t dst[] = {1, 3, 4, 0, 1, 3, 5, 5};
edge_t edge_ids[] = {0, 1, 2, 3, 4, 5, 6, 7};
edge_t edge_ids[] = {1, 2, 3, 4, 5, 6, 7, 8};
vertex_t start[] = {2, 2};
int fan_out[] = {1, 2};

Expand All @@ -249,9 +249,37 @@ int test_uniform_neighbor_sample()
FALSE);
}

/*
 * Runs the generic uniform-neighbor-sample test on a small 6-vertex, 8-edge
 * graph with a single start vertex (2) and a single fan-out entry of -1
 * (per the test name, this is expected to request all neighbors).
 *
 * Returns whatever generic_uniform_neighbor_sample_test() returns.
 */
int test_uniform_neighbor_sample_all_neighbors()
{
  vertex_t src[]    = {0, 1, 1, 2, 2, 2, 3, 4};
  vertex_t dst[]    = {1, 3, 4, 0, 1, 3, 5, 5};
  /* NOTE(review): the sibling test uses 1-based edge ids; confirm that the
   * 0-based ids here are intentional for the index check in the generic test. */
  edge_t edge_ids[] = {0, 1, 2, 3, 4, 5, 6, 7};
  vertex_t start[]  = {2};
  int fan_out[]     = {-1};

  size_t num_vertices = 6;

  /* Derive every element count from its array so the sizes handed to the
   * generic test can never exceed what is actually stored.  The previous
   * hard-coded num_starts of 2 did not match the one-element start[] array
   * and would have let the callee read past its end. */
  size_t num_edges    = sizeof(src) / sizeof(src[0]);
  size_t num_starts   = sizeof(start) / sizeof(start[0]);
  size_t fan_out_size = sizeof(fan_out) / sizeof(fan_out[0]);

  return generic_uniform_neighbor_sample_test(src,
                                              dst,
                                              edge_ids,
                                              num_vertices,
                                              num_edges,
                                              start,
                                              num_starts,
                                              fan_out,
                                              fan_out_size,
                                              TRUE,
                                              FALSE,
                                              FALSE);
}

/*
 * Test driver: runs each uniform-neighbor-sample test case through RUN_TEST
 * and returns the bitwise OR of their results, so the process exit status is
 * zero only when every case reported zero.
 */
int main(int argc, char** argv)
{
  int failures = RUN_TEST(test_uniform_neighbor_sample);
  failures |= RUN_TEST(test_uniform_neighbor_sample_all_neighbors);

  return failures;
}

0 comments on commit 5788f02

Please sign in to comment.