Skip to content

Commit

Permalink
Revert "Apply modifications to account for RAFT changes (rapidsai#1707)"
Browse files Browse the repository at this point in the history
This reverts commit 65ca876.
  • Loading branch information
dantegd committed Aug 28, 2021
1 parent 65ca876 commit 9d606fa
Show file tree
Hide file tree
Showing 75 changed files with 598 additions and 499 deletions.
15 changes: 7 additions & 8 deletions cpp/include/cugraph/detail/graph_utils.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,19 @@ rmm::device_uvector<edge_t> compute_major_degrees(
[(detail::num_sparse_segments_per_vertex_partition + 2) * i +
detail::num_sparse_segments_per_vertex_partition]
: major_last;
auto execution_policy = handle.get_thrust_policy();
thrust::transform(execution_policy,
thrust::transform(rmm::exec_policy(handle.get_stream()),
thrust::make_counting_iterator(vertex_t{0}),
thrust::make_counting_iterator(major_hypersparse_first - major_first),
local_degrees.begin(),
[p_offsets] __device__(auto i) { return p_offsets[i + 1] - p_offsets[i]; });
if (use_dcs) {
auto p_dcs_nzd_vertices = (*adj_matrix_partition_dcs_nzd_vertices)[i];
auto dcs_nzd_vertex_count = (*adj_matrix_partition_dcs_nzd_vertex_counts)[i];
thrust::fill(execution_policy,
thrust::fill(rmm::exec_policy(handle.get_stream()),
local_degrees.begin() + (major_hypersparse_first - major_first),
local_degrees.begin() + (major_last - major_first),
edge_t{0});
thrust::for_each(execution_policy,
thrust::for_each(rmm::exec_policy(handle.get_stream()),
thrust::make_counting_iterator(vertex_t{0}),
thrust::make_counting_iterator(dcs_nzd_vertex_count),
[p_offsets,
Expand Down Expand Up @@ -124,10 +123,10 @@ rmm::device_uvector<edge_t> compute_major_degrees(raft::handle_t const& handle,
vertex_t number_of_vertices)
{
rmm::device_uvector<edge_t> degrees(number_of_vertices, handle.get_stream());
thrust::tabulate(
handle.get_thrust_policy(), degrees.begin(), degrees.end(), [offsets] __device__(auto i) {
return offsets[i + 1] - offsets[i];
});
thrust::tabulate(rmm::exec_policy(handle.get_stream()),
degrees.begin(),
degrees.end(),
[offsets] __device__(auto i) { return offsets[i + 1] - offsets[i]; });
return degrees;
}

Expand Down
20 changes: 10 additions & 10 deletions cpp/include/cugraph/prims/copy_to_adj_matrix_row_col.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ void copy_to_matrix_major(raft::handle_t const& handle,
assert(graph_view.get_number_of_local_vertices() == GraphViewType::is_adj_matrix_transposed
? graph_view.get_number_of_local_adj_matrix_partition_cols()
: graph_view.get_number_of_local_adj_matrix_partition_rows());
thrust::copy(handle.get_thrust_policy(),
thrust::copy(rmm::exec_policy(handle.get_stream()),
vertex_value_input_first,
vertex_value_input_first + graph_view.get_number_of_local_vertices(),
matrix_major_value_output_first);
Expand Down Expand Up @@ -169,7 +169,7 @@ void copy_to_matrix_major(raft::handle_t const& handle,
});
// FIXME: this gather (and temporary buffer) is unnecessary if NCCL directly takes a
// permutation iterator (and directly gathers to the internal buffer)
thrust::gather(handle.get_thrust_policy(),
thrust::gather(rmm::exec_policy(handle.get_stream()),
map_first,
map_first + thrust::distance(vertex_first, vertex_last),
vertex_value_input_first,
Expand All @@ -190,7 +190,7 @@ void copy_to_matrix_major(raft::handle_t const& handle,
// FIXME: this scatter is unnecessary if NCCL directly takes a permutation iterator (and
// directly scatters from the internal buffer)
thrust::scatter(
handle.get_thrust_policy(),
rmm::exec_policy(handle.get_stream()),
rx_value_first,
rx_value_first + rx_counts[i],
map_first,
Expand All @@ -203,7 +203,7 @@ void copy_to_matrix_major(raft::handle_t const& handle,
// FIXME: this scatter is unnecessary if NCCL directly takes a permutation iterator (and
// directly scatters from the internal buffer)
thrust::scatter(
handle.get_thrust_policy(),
rmm::exec_policy(handle.get_stream()),
rx_value_first,
rx_value_first + rx_counts[i],
map_first,
Expand All @@ -226,7 +226,7 @@ void copy_to_matrix_major(raft::handle_t const& handle,
? graph_view.get_number_of_local_adj_matrix_partition_cols()
: graph_view.get_number_of_local_adj_matrix_partition_rows());
auto val_first = thrust::make_permutation_iterator(vertex_value_input_first, vertex_first);
thrust::scatter(handle.get_thrust_policy(),
thrust::scatter(rmm::exec_policy(handle.get_stream()),
val_first,
val_first + thrust::distance(vertex_first, vertex_last),
vertex_first,
Expand Down Expand Up @@ -290,7 +290,7 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
assert(graph_view.get_number_of_local_vertices() == GraphViewType::is_adj_matrix_transposed
? graph_view.get_number_of_local_adj_matrix_partition_rows()
: graph_view.get_number_of_local_adj_matrix_partition_cols());
thrust::copy(handle.get_thrust_policy(),
thrust::copy(rmm::exec_policy(handle.get_stream()),
vertex_value_input_first,
vertex_value_input_first + graph_view.get_number_of_local_vertices(),
matrix_minor_value_output_first);
Expand Down Expand Up @@ -360,7 +360,7 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
});
// FIXME: this gather (and temporary buffer) is unnecessary if NCCL directly takes a
// permutation iterator (and directly gathers to the internal buffer)
thrust::gather(handle.get_thrust_policy(),
thrust::gather(rmm::exec_policy(handle.get_stream()),
map_first,
map_first + thrust::distance(vertex_first, vertex_last),
vertex_value_input_first,
Expand All @@ -380,7 +380,7 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
});
// FIXME: this scatter is unnecessary if NCCL directly takes a permutation iterator (and
// directly scatters from the internal buffer)
thrust::scatter(handle.get_thrust_policy(),
thrust::scatter(rmm::exec_policy(handle.get_stream()),
rx_value_first,
rx_value_first + rx_counts[i],
map_first,
Expand All @@ -392,7 +392,7 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
});
// FIXME: this scatter is unnecessary if NCCL directly takes a permutation iterator (and
// directly scatters from the internal buffer)
thrust::scatter(handle.get_thrust_policy(),
thrust::scatter(rmm::exec_policy(handle.get_stream()),
rx_value_first,
rx_value_first + rx_counts[i],
map_first,
Expand All @@ -414,7 +414,7 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
assert(graph_view.get_number_of_local_vertices() ==
graph_view.get_number_of_local_adj_matrix_partition_rows());
auto val_first = thrust::make_permutation_iterator(vertex_value_input_first, vertex_first);
thrust::scatter(handle.get_thrust_policy(),
thrust::scatter(rmm::exec_policy(handle.get_stream()),
val_first,
val_first + thrust::distance(vertex_first, vertex_last),
vertex_first,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,14 +438,13 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle,
minor_init = (row_comm_rank == 0) ? init : T{};
}

auto execution_policy = handle.get_thrust_policy();
if (GraphViewType::is_multi_gpu) {
thrust::fill(execution_policy,
thrust::fill(rmm::exec_policy(handle.get_stream()),
minor_buffer_first,
minor_buffer_first + minor_tmp_buffer_size,
minor_init);
} else {
thrust::fill(execution_policy,
thrust::fill(rmm::exec_policy(handle.get_stream()),
vertex_value_output_first,
vertex_value_output_first + graph_view.get_number_of_local_vertices(),
minor_init);
Expand Down Expand Up @@ -547,7 +546,7 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle,
if constexpr (update_major) { // this is necessary as we don't visit every vertex in the
// hypersparse segment in
// for_all_major_for_all_nbr_hypersparse
thrust::fill(handle.get_thrust_policy(),
thrust::fill(rmm::exec_policy(handle.get_stream()),
output_buffer_first + (*segment_offsets)[3],
output_buffer_first + (*segment_offsets)[4],
major_init);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ void decompress_matrix_partition_to_fill_edgelist_majors(
vertex_t* majors,
std::optional<std::vector<vertex_t>> const& segment_offsets)
{
auto execution_policy = handle.get_thrust_policy();
if (segment_offsets) {
// FIXME: we may further improve performance by 1) concurrently running kernels on different
// segments; 2) individually tuning block sizes for different segments; and 3) adding one more
Expand Down Expand Up @@ -154,7 +153,7 @@ void decompress_matrix_partition_to_fill_edgelist_majors(
}
if ((*segment_offsets)[3] - (*segment_offsets)[2] > 0) {
thrust::for_each(
execution_policy,
rmm::exec_policy(handle.get_stream()),
thrust::make_counting_iterator(matrix_partition.get_major_first()) + (*segment_offsets)[2],
thrust::make_counting_iterator(matrix_partition.get_major_first()) + (*segment_offsets)[3],
[matrix_partition, majors] __device__(auto major) {
Expand All @@ -168,7 +167,7 @@ void decompress_matrix_partition_to_fill_edgelist_majors(
if (matrix_partition.get_dcs_nzd_vertex_count() &&
(*(matrix_partition.get_dcs_nzd_vertex_count()) > 0)) {
thrust::for_each(
execution_policy,
rmm::exec_policy(handle.get_stream()),
thrust::make_counting_iterator(vertex_t{0}),
thrust::make_counting_iterator(*(matrix_partition.get_dcs_nzd_vertex_count())),
[matrix_partition, major_start_offset = (*segment_offsets)[3], majors] __device__(
Expand All @@ -184,7 +183,7 @@ void decompress_matrix_partition_to_fill_edgelist_majors(
}
} else {
thrust::for_each(
execution_policy,
rmm::exec_policy(handle.get_stream()),
thrust::make_counting_iterator(matrix_partition.get_major_first()),
thrust::make_counting_iterator(matrix_partition.get_major_first()) +
matrix_partition.get_major_size(),
Expand Down Expand Up @@ -341,13 +340,12 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
}
// FIXME: these copies are unnecessary, better fix RAFT comm's bcast to take separate input &
// output pointers
auto execution_policy = handle.get_thrust_policy();
thrust::copy(execution_policy,
thrust::copy(rmm::exec_policy(handle.get_stream()),
map_key_first,
map_key_last,
map_keys.begin() + map_displacements[row_comm_rank]);
thrust::copy(
execution_policy,
rmm::exec_policy(handle.get_stream()),
map_value_first,
map_value_first + thrust::distance(map_key_first, map_key_last),
get_dataframe_buffer_begin<value_t>(map_value_buffer) + map_displacements[row_comm_rank]);
Expand Down Expand Up @@ -422,13 +420,12 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
matrix_partition.get_indices(),
detail::minor_to_key_t<VertexIterator0>{adj_matrix_col_key_first,
matrix_partition.get_minor_first()});
auto execution_policy = handle.get_thrust_policy();
thrust::copy(execution_policy,
thrust::copy(rmm::exec_policy(handle.get_stream()),
minor_key_first,
minor_key_first + matrix_partition.get_number_of_edges(),
tmp_minor_keys.begin());
if (graph_view.is_weighted()) {
thrust::copy(execution_policy,
thrust::copy(rmm::exec_policy(handle.get_stream()),
*(matrix_partition.get_weights()),
*(matrix_partition.get_weights()) + matrix_partition.get_number_of_edges(),
tmp_key_aggregated_edge_weights.begin());
Expand All @@ -451,24 +448,25 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
auto output_key_first = thrust::make_zip_iterator(
thrust::make_tuple(reduced_major_vertices.begin(), reduced_minor_keys.begin()));
if (graph_view.is_weighted()) {
thrust::sort_by_key(execution_policy,
thrust::sort_by_key(rmm::exec_policy(handle.get_stream()),
input_key_first,
input_key_first + tmp_major_vertices.size(),
tmp_key_aggregated_edge_weights.begin());
reduced_size = thrust::distance(
output_key_first,
thrust::get<0>(thrust::reduce_by_key(execution_policy,
thrust::get<0>(thrust::reduce_by_key(rmm::exec_policy(handle.get_stream()),
input_key_first,
input_key_first + tmp_major_vertices.size(),
tmp_key_aggregated_edge_weights.begin(),
output_key_first,
reduced_key_aggregated_edge_weights.begin())));
} else {
thrust::sort(
execution_policy, input_key_first, input_key_first + tmp_major_vertices.size());
thrust::sort(rmm::exec_policy(handle.get_stream()),
input_key_first,
input_key_first + tmp_major_vertices.size());
reduced_size = thrust::distance(
output_key_first,
thrust::get<0>(thrust::reduce_by_key(execution_policy,
thrust::get<0>(thrust::reduce_by_key(rmm::exec_policy(handle.get_stream()),
input_key_first,
input_key_first + tmp_major_vertices.size(),
thrust::make_constant_iterator(weight_t{1.0}),
Expand Down Expand Up @@ -517,15 +515,14 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(

auto pair_first = thrust::make_zip_iterator(
thrust::make_tuple(rx_major_vertices.begin(), rx_minor_keys.begin()));
auto execution_policy = handle.get_thrust_policy();
thrust::sort_by_key(execution_policy,
thrust::sort_by_key(rmm::exec_policy(handle.get_stream()),
pair_first,
pair_first + rx_major_vertices.size(),
rx_key_aggregated_edge_weights.begin());
tmp_major_vertices.resize(rx_major_vertices.size(), handle.get_stream());
tmp_minor_keys.resize(tmp_major_vertices.size(), handle.get_stream());
tmp_key_aggregated_edge_weights.resize(tmp_major_vertices.size(), handle.get_stream());
auto pair_it = thrust::reduce_by_key(execution_policy,
auto pair_it = thrust::reduce_by_key(rmm::exec_policy(handle.get_stream()),
pair_first,
pair_first + rx_major_vertices.size(),
rx_key_aggregated_edge_weights.begin(),
Expand All @@ -549,7 +546,7 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
auto triplet_first = thrust::make_zip_iterator(thrust::make_tuple(
tmp_major_vertices.begin(), tmp_minor_keys.begin(), tmp_key_aggregated_edge_weights.begin()));
thrust::transform(
handle.get_thrust_policy(),
rmm::exec_policy(handle.get_stream()),
triplet_first,
triplet_first + tmp_major_vertices.size(),
tmp_e_op_result_buffer_first,
Expand Down Expand Up @@ -635,18 +632,17 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
#endif
}

auto execution_policy = handle.get_thrust_policy();
thrust::fill(execution_policy,
thrust::fill(rmm::exec_policy(handle.get_stream()),
vertex_value_output_first,
vertex_value_output_first + graph_view.get_number_of_local_vertices(),
T{});
thrust::sort_by_key(execution_policy,
thrust::sort_by_key(rmm::exec_policy(handle.get_stream()),
major_vertices.begin(),
major_vertices.end(),
get_dataframe_buffer_begin<T>(e_op_result_buffer));

auto num_uniques = thrust::count_if(
execution_policy,
rmm::exec_policy(handle.get_stream()),
thrust::make_counting_iterator(size_t{0}),
thrust::make_counting_iterator(major_vertices.size()),
[major_vertices = major_vertices.data()] __device__(auto i) {
Expand All @@ -662,13 +658,13 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
: invalid_vertex_id<vertex_t>::value;
});
thrust::copy_if(
execution_policy,
rmm::exec_policy(handle.get_stream()),
major_vertex_first,
major_vertex_first + major_vertices.size(),
unique_major_vertices.begin(),
[] __device__(auto major) { return major != invalid_vertex_id<vertex_t>::value; });
thrust::reduce_by_key(
execution_policy,
rmm::exec_policy(handle.get_stream()),
major_vertices.begin(),
major_vertices.end(),
get_dataframe_buffer_begin<T>(e_op_result_buffer),
Expand All @@ -684,7 +680,7 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
thrust::equal_to<vertex_t>{},
reduce_op);

thrust::transform(execution_policy,
thrust::transform(rmm::exec_policy(handle.get_stream()),
vertex_value_output_first,
vertex_value_output_first + graph_view.get_number_of_local_vertices(),
vertex_value_output_first,
Expand Down
5 changes: 3 additions & 2 deletions cpp/include/cugraph/prims/count_if_v.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ typename GraphViewType::vertex_type count_if_v(raft::handle_t const& handle,
VertexOp v_op)
{
auto count =
thrust::count_if(handle.get_thrust_policy(),
thrust::count_if(rmm::exec_policy(handle.get_stream()),
vertex_value_input_first,
vertex_value_input_first + graph_view.get_number_of_local_vertices(),
v_op);
Expand Down Expand Up @@ -92,7 +92,8 @@ typename GraphViewType::vertex_type count_if_v(raft::handle_t const& handle,
InputIterator input_last,
VertexOp v_op)
{
auto count = thrust::count_if(handle.get_thrust_policy(), input_first, input_last, v_op);
auto count =
thrust::count_if(rmm::exec_policy(handle.get_stream()), input_first, input_last, v_op);
if (GraphViewType::is_multi_gpu) {
count = host_scalar_allreduce(handle.get_comms(), count, handle.get_stream());
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/include/cugraph/prims/reduce_v.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ T reduce_v(raft::handle_t const& handle,
T init)
{
auto ret = thrust::reduce(
handle.get_thrust_policy(),
rmm::exec_policy(handle.get_stream()),
vertex_value_input_first,
vertex_value_input_first + graph_view.get_number_of_local_vertices(),
((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() == 0)) ? init : T{},
Expand Down Expand Up @@ -89,7 +89,7 @@ T reduce_v(raft::handle_t const& handle,
T init)
{
auto ret = thrust::reduce(
handle.get_thrust_policy(),
rmm::exec_policy(handle.get_stream()),
input_first,
input_last,
((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() == 0)) ? init : T{},
Expand Down
Loading

0 comments on commit 9d606fa

Please sign in to comment.