Skip to content

Commit

Permalink
Raft Handle Updates to cuGraph (#1894)
Browse files Browse the repository at this point in the history
  • Loading branch information
divyegala authored Dec 14, 2021
1 parent 5e41117 commit 7197258
Show file tree
Hide file tree
Showing 56 changed files with 290 additions and 296 deletions.
7 changes: 7 additions & 0 deletions ci/cpu/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ conda list --show-channel-urls
# FIX Added to deal with Anancoda SSL verification issues during conda builds
conda config --set ssl_verify False

# FIXME: for now, force the building of all packages so they are built on a
# machine with a single CUDA version, then have the gpu/build.sh script simply
# install. This should eliminate a mismatch between different CUDA versions on
# cpu vs. gpu builds that is problematic with CUDA 11.5 Enhanced Compat.
BUILD_LIBCUGRAPH=1
BUILD_CUGRAPH=1

###############################################################################
# BUILD - Conda package builds
###############################################################################
Expand Down
1 change: 1 addition & 0 deletions conda/recipes/pylibcugraph/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ requirements:
- ucx-py 0.24
- ucx-proc=*=gpu
- cudatoolkit {{ cuda_version }}.*
- rmm {{ minor_version }}.*
run:
- python x.x
- libcugraph={{ version }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
handle.get_stream());
}

handle.get_stream_view().synchronize(); // cuco::static_map currently does not take stream
handle.sync_stream(); // cuco::static_map currently does not take stream

kv_map_ptr.reset();

Expand All @@ -284,7 +284,7 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
thrust::make_tuple(map_keys.begin(), get_dataframe_buffer_begin(map_value_buffer)));
kv_map_ptr->insert(pair_first, pair_first + map_keys.size());
} else {
handle.get_stream_view().synchronize(); // cuco::static_map currently does not take stream
handle.sync_stream(); // cuco::static_map currently does not take stream

kv_map_ptr.reset();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,8 +637,7 @@ typename GraphViewType::edge_type compute_num_out_nbrs_from_frontier(
auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name());
auto const col_comm_rank = col_comm.get_rank();

rmm::device_uvector<vertex_t> frontier_vertices(local_frontier_sizes[i],
handle.get_stream_view());
rmm::device_uvector<vertex_t> frontier_vertices(local_frontier_sizes[i], handle.get_stream());
device_bcast(col_comm,
local_frontier_vertex_first,
frontier_vertices.data(),
Expand Down Expand Up @@ -1096,7 +1095,7 @@ void update_frontier_v_push_if_out_nbr(
d_tx_buffer_last_boundaries.data(),
d_tx_buffer_last_boundaries.size(),
handle.get_stream());
handle.get_stream_view().synchronize();
handle.sync_stream();
std::vector<size_t> tx_counts(h_tx_buffer_last_boundaries.size());
std::adjacent_difference(
h_tx_buffer_last_boundaries.begin(), h_tx_buffer_last_boundaries.end(), tx_counts.begin());
Expand Down
2 changes: 1 addition & 1 deletion cpp/include/cugraph/prims/vertex_frontier.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ class VertexFrontier {
h_indices.data(), d_indices.data(), d_indices.size(), handle_ptr_->get_stream());
raft::update_host(
h_counts.data(), d_counts.data(), d_counts.size(), handle_ptr_->get_stream());
handle_ptr_->get_stream_view().synchronize();
handle_ptr_->sync_stream();

size_t offset{0};
for (size_t i = 0; i < h_indices.size(); ++i) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/c_api/array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ extern "C" cugraph_error_code_t cugraph_type_erased_device_array_create(
size_t n_bytes = n_elems * (::data_type_sz[dtype]);

auto ret_value = new cugraph::c_api::cugraph_type_erased_device_array_t(
n_elems, n_bytes, dtype, raft_handle->get_stream_view());
n_elems, n_bytes, dtype, raft_handle->get_stream());

*array = reinterpret_cast<cugraph_type_erased_device_array_t*>(ret_value);
return CUGRAPH_SUCCESS;
Expand Down
10 changes: 4 additions & 6 deletions cpp/src/community/legacy/ecg.cu
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,10 @@ class EcgLouvain : public cugraph::legacy::Louvain<graph_type> {

void initialize_dendrogram_level(vertex_t num_vertices) override
{
this->dendrogram_->add_level(0, num_vertices, this->handle_.get_stream_view());
this->dendrogram_->add_level(0, num_vertices, this->handle_.get_stream());

get_permutation_vector(num_vertices,
seed_,
this->dendrogram_->current_level_begin(),
this->handle_.get_stream_view());
get_permutation_vector(
num_vertices, seed_, this->dendrogram_->current_level_begin(), this->handle_.get_stream());
}

private:
Expand All @@ -147,7 +145,7 @@ void ecg(raft::handle_t const& handle,
"Invalid input argument: clustering is NULL, should be a device pointer to "
"memory for storing the result");

rmm::device_uvector<weight_t> ecg_weights_v(graph.number_of_edges, handle.get_stream_view());
rmm::device_uvector<weight_t> ecg_weights_v(graph.number_of_edges, handle.get_stream());

thrust::copy(handle.get_thrust_policy(),
graph.edge_data,
Expand Down
14 changes: 7 additions & 7 deletions cpp/src/community/legacy/egonet.cu
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ extract(raft::handle_t const& handle,
vertex_t radius)
{
auto v = csr_view.get_number_of_vertices();
auto user_stream_view = handle.get_stream_view();
auto user_stream_view = handle.get_stream();
rmm::device_vector<size_t> neighbors_offsets(n_subgraphs + 1);
rmm::device_vector<vertex_t> neighbors;

Expand All @@ -77,7 +77,7 @@ extract(raft::handle_t const& handle,
reached.reserve(n_subgraphs);
for (vertex_t i = 0; i < n_subgraphs; i++) {
// Allocations and operations are attached to the worker stream
rmm::device_uvector<vertex_t> local_reach(v, handle.get_internal_stream_view(i));
rmm::device_uvector<vertex_t> local_reach(v, handle.get_next_usable_stream(i));
reached.push_back(std::move(local_reach));
}

Expand All @@ -89,8 +89,8 @@ extract(raft::handle_t const& handle,

for (vertex_t i = 0; i < n_subgraphs; i++) {
// get light handle from worker pool
raft::handle_t light_handle(handle, i);
auto worker_stream_view = light_handle.get_stream_view();
raft::handle_t light_handle(handle.get_next_usable_stream(i));
auto worker_stream_view = light_handle.get_stream();

// BFS with cutoff
// consider adding a device API to BFS (ie. accept source on the device)
Expand Down Expand Up @@ -132,7 +132,7 @@ extract(raft::handle_t const& handle,
}

// wait on every one to identify their neighboors before proceeding to concatenation
handle.wait_on_internal_streams();
handle.sync_stream_pool();

// Construct neighboors offsets (just a scan on neighborhod vector sizes)
h_neighbors_offsets[0] = 0;
Expand All @@ -148,7 +148,7 @@ extract(raft::handle_t const& handle,

// Construct the neighboors list concurrently
for (vertex_t i = 0; i < n_subgraphs; i++) {
auto worker_stream_view = handle.get_internal_stream_view(i);
auto worker_stream_view = handle.get_next_usable_stream(i);
thrust::copy(rmm::exec_policy(worker_stream_view),
reached[i].begin(),
reached[i].end(),
Expand All @@ -160,7 +160,7 @@ extract(raft::handle_t const& handle,
}

// wait on every one before proceeding to grouped extraction
handle.wait_on_internal_streams();
handle.sync_stream_pool();

#ifdef TIMING
hr_timer.stop();
Expand Down
14 changes: 6 additions & 8 deletions cpp/src/community/legacy/leiden.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,11 @@ class Leiden : public Louvain<graph_type> {
this->timer_start("update_clustering_constrained");

rmm::device_uvector<vertex_t> next_cluster_v(this->dendrogram_->current_level_size(),
this->handle_.get_stream_view());
rmm::device_uvector<weight_t> delta_Q_v(graph.number_of_edges, this->handle_.get_stream_view());
rmm::device_uvector<vertex_t> cluster_hash_v(graph.number_of_edges,
this->handle_.get_stream_view());
this->handle_.get_stream());
rmm::device_uvector<weight_t> delta_Q_v(graph.number_of_edges, this->handle_.get_stream());
rmm::device_uvector<vertex_t> cluster_hash_v(graph.number_of_edges, this->handle_.get_stream());
rmm::device_uvector<weight_t> old_cluster_sum_v(graph.number_of_vertices,
this->handle_.get_stream_view());
this->handle_.get_stream());

vertex_t const* d_src_indices = this->src_indices_v_.data();
vertex_t const* d_dst_indices = graph.indices;
Expand Down Expand Up @@ -105,7 +104,7 @@ class Leiden : public Louvain<graph_type> {
}
}

this->timer_stop(this->handle_.get_stream_view());
this->timer_stop(this->handle_.get_stream());
return cur_Q;
}

Expand Down Expand Up @@ -134,8 +133,7 @@ class Leiden : public Louvain<graph_type> {
//
// Initialize every cluster to reference each vertex to itself
//
this->dendrogram_->add_level(
0, current_graph.number_of_vertices, this->handle_.get_stream_view());
this->dendrogram_->add_level(0, current_graph.number_of_vertices, this->handle_.get_stream());

thrust::sequence(this->handle_.get_thrust_policy(),
this->dendrogram_->current_level_begin(),
Expand Down
66 changes: 31 additions & 35 deletions cpp/src/community/legacy/louvain.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ class Louvain {
// to change the logic to populate this properly
// in generate_superverticies_graph.
//
offsets_v_(graph.number_of_vertices + 1, handle.get_stream_view()),
indices_v_(graph.number_of_edges, handle.get_stream_view()),
weights_v_(graph.number_of_edges, handle.get_stream_view()),
src_indices_v_(graph.number_of_edges, handle.get_stream_view()),
vertex_weights_v_(graph.number_of_vertices, handle.get_stream_view()),
cluster_weights_v_(graph.number_of_vertices, handle.get_stream_view()),
tmp_arr_v_(graph.number_of_vertices, handle.get_stream_view()),
cluster_inverse_v_(graph.number_of_vertices, handle.get_stream_view()),
offsets_v_(graph.number_of_vertices + 1, handle.get_stream()),
indices_v_(graph.number_of_edges, handle.get_stream()),
weights_v_(graph.number_of_edges, handle.get_stream()),
src_indices_v_(graph.number_of_edges, handle.get_stream()),
vertex_weights_v_(graph.number_of_vertices, handle.get_stream()),
cluster_weights_v_(graph.number_of_vertices, handle.get_stream()),
tmp_arr_v_(graph.number_of_vertices, handle.get_stream()),
cluster_inverse_v_(graph.number_of_vertices, handle.get_stream()),
number_of_vertices_(graph.number_of_vertices),
number_of_edges_(graph.number_of_edges)
{
Expand Down Expand Up @@ -90,8 +90,8 @@ class Louvain {
{
vertex_t n_verts = graph.number_of_vertices;

rmm::device_uvector<weight_t> inc(n_verts, handle_.get_stream_view());
rmm::device_uvector<weight_t> deg(n_verts, handle_.get_stream_view());
rmm::device_uvector<weight_t> inc(n_verts, handle_.get_stream());
rmm::device_uvector<weight_t> deg(n_verts, handle_.get_stream());

thrust::fill(handle_.get_thrust_policy(), inc.begin(), inc.end(), weight_t{0.0});
thrust::fill(handle_.get_thrust_policy(), deg.begin(), deg.end(), weight_t{0.0});
Expand Down Expand Up @@ -211,7 +211,7 @@ class Louvain {

virtual void initialize_dendrogram_level(vertex_t num_vertices)
{
dendrogram_->add_level(0, num_vertices, handle_.get_stream_view());
dendrogram_->add_level(0, num_vertices, handle_.get_stream());

thrust::sequence(handle_.get_thrust_policy(),
dendrogram_->current_level_begin(),
Expand Down Expand Up @@ -245,7 +245,7 @@ class Louvain {
d_cluster_weights[src] = sum;
});

timer_stop(handle_.get_stream_view());
timer_stop(handle_.get_stream());
}

virtual weight_t update_clustering(weight_t total_edge_weight,
Expand All @@ -255,11 +255,10 @@ class Louvain {
timer_start("update_clustering");

rmm::device_uvector<vertex_t> next_cluster_v(dendrogram_->current_level_size(),
handle_.get_stream_view());
rmm::device_uvector<weight_t> delta_Q_v(graph.number_of_edges, handle_.get_stream_view());
rmm::device_uvector<vertex_t> cluster_hash_v(graph.number_of_edges, handle_.get_stream_view());
rmm::device_uvector<weight_t> old_cluster_sum_v(graph.number_of_vertices,
handle_.get_stream_view());
handle_.get_stream());
rmm::device_uvector<weight_t> delta_Q_v(graph.number_of_edges, handle_.get_stream());
rmm::device_uvector<vertex_t> cluster_hash_v(graph.number_of_edges, handle_.get_stream());
rmm::device_uvector<weight_t> old_cluster_sum_v(graph.number_of_vertices, handle_.get_stream());

vertex_t* d_cluster = dendrogram_->current_level_begin();
weight_t const* d_vertex_weights = vertex_weights_v_.data();
Expand Down Expand Up @@ -301,7 +300,7 @@ class Louvain {
}
}

timer_stop(handle_.get_stream_view());
timer_stop(handle_.get_stream());
return cur_Q;
}

Expand Down Expand Up @@ -409,12 +408,9 @@ class Louvain {
rmm::device_uvector<weight_t>& delta_Q_v,
bool up_down)
{
rmm::device_uvector<vertex_t> temp_vertices_v(graph.number_of_vertices,
handle_.get_stream_view());
rmm::device_uvector<vertex_t> temp_cluster_v(graph.number_of_vertices,
handle_.get_stream_view());
rmm::device_uvector<weight_t> temp_delta_Q_v(graph.number_of_vertices,
handle_.get_stream_view());
rmm::device_uvector<vertex_t> temp_vertices_v(graph.number_of_vertices, handle_.get_stream());
rmm::device_uvector<vertex_t> temp_cluster_v(graph.number_of_vertices, handle_.get_stream());
rmm::device_uvector<weight_t> temp_delta_Q_v(graph.number_of_vertices, handle_.get_stream());

thrust::fill(
handle_.get_thrust_policy(), temp_cluster_v.begin(), temp_cluster_v.end(), vertex_t{-1});
Expand Down Expand Up @@ -479,12 +475,12 @@ class Louvain {

// renumber the clusters to the range 0..(num_clusters-1)
vertex_t num_clusters = renumber_clusters();
cluster_weights_v_.resize(num_clusters, handle_.get_stream_view());
cluster_weights_v_.resize(num_clusters, handle_.get_stream());

// shrink our graph to represent the graph of supervertices
generate_superverticies_graph(graph, num_clusters);

timer_stop(handle_.get_stream_view());
timer_stop(handle_.get_stream());
}

vertex_t renumber_clusters()
Expand Down Expand Up @@ -526,7 +522,7 @@ class Louvain {
[d_cluster_inverse] __device__(const vertex_t idx) { return d_cluster_inverse[idx] == 1; });

vertex_t new_num_clusters = thrust::distance(tmp_arr_v_.begin(), copy_end);
tmp_arr_v_.resize(new_num_clusters, handle_.get_stream_view());
tmp_arr_v_.resize(new_num_clusters, handle_.get_stream());

//
// Now we can set each value in cluster_inverse of a cluster to its index
Expand All @@ -545,16 +541,16 @@ class Louvain {
d_cluster[i] = d_cluster_inverse[d_cluster[i]];
});

cluster_inverse_v_.resize(new_num_clusters, handle_.get_stream_view());
cluster_inverse_v_.resize(new_num_clusters, handle_.get_stream());

return new_num_clusters;
}

void generate_superverticies_graph(graph_t& graph, vertex_t num_clusters)
{
rmm::device_uvector<vertex_t> new_src_v(graph.number_of_edges, handle_.get_stream_view());
rmm::device_uvector<vertex_t> new_dst_v(graph.number_of_edges, handle_.get_stream_view());
rmm::device_uvector<weight_t> new_weight_v(graph.number_of_edges, handle_.get_stream_view());
rmm::device_uvector<vertex_t> new_src_v(graph.number_of_edges, handle_.get_stream());
rmm::device_uvector<vertex_t> new_dst_v(graph.number_of_edges, handle_.get_stream());
rmm::device_uvector<weight_t> new_weight_v(graph.number_of_edges, handle_.get_stream());

//
// Renumber the COO
Expand Down Expand Up @@ -609,11 +605,11 @@ class Louvain {
graph.offsets,
num_clusters,
graph.number_of_edges,
handle_.get_stream_view());
handle_.get_stream());

src_indices_v_.resize(graph.number_of_edges, handle_.get_stream_view());
indices_v_.resize(graph.number_of_edges, handle_.get_stream_view());
weights_v_.resize(graph.number_of_edges, handle_.get_stream_view());
src_indices_v_.resize(graph.number_of_edges, handle_.get_stream());
indices_v_.resize(graph.number_of_edges, handle_.get_stream());
weights_v_.resize(graph.number_of_edges, handle_.get_stream());
}

protected:
Expand Down
Loading

0 comments on commit 7197258

Please sign in to comment.