Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raft Handle Updates to cuGraph #1894

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ci/cpu/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ conda list --show-channel-urls
# FIX Added to deal with Anaconda SSL verification issues during conda builds
conda config --set ssl_verify False

# FIXME: for now, force the building of all packages so they are built on a
# machine with a single CUDA version, then have the gpu/build.sh script simply
# install. This should eliminate a mismatch between different CUDA versions on
# cpu vs. gpu builds that is problematic with CUDA 11.5 Enhanced Compat.
BUILD_LIBCUGRAPH=1
BUILD_CUGRAPH=1

###############################################################################
# BUILD - Conda package builds
###############################################################################
Expand Down
1 change: 1 addition & 0 deletions conda/recipes/pylibcugraph/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ requirements:
- ucx-py 0.24
- ucx-proc=*=gpu
- cudatoolkit {{ cuda_version }}.*
- rmm {{ minor_version }}.*
run:
- python x.x
- libcugraph={{ version }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
handle.get_stream());
}

handle.get_stream_view().synchronize(); // cuco::static_map currently does not take stream
handle.sync_stream(); // cuco::static_map currently does not take stream

kv_map_ptr.reset();

Expand All @@ -284,7 +284,7 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
thrust::make_tuple(map_keys.begin(), get_dataframe_buffer_begin(map_value_buffer)));
kv_map_ptr->insert(pair_first, pair_first + map_keys.size());
} else {
handle.get_stream_view().synchronize(); // cuco::static_map currently does not take stream
handle.sync_stream(); // cuco::static_map currently does not take stream

kv_map_ptr.reset();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,8 +637,7 @@ typename GraphViewType::edge_type compute_num_out_nbrs_from_frontier(
auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name());
auto const col_comm_rank = col_comm.get_rank();

rmm::device_uvector<vertex_t> frontier_vertices(local_frontier_sizes[i],
handle.get_stream_view());
rmm::device_uvector<vertex_t> frontier_vertices(local_frontier_sizes[i], handle.get_stream());
device_bcast(col_comm,
local_frontier_vertex_first,
frontier_vertices.data(),
Expand Down Expand Up @@ -1096,7 +1095,7 @@ void update_frontier_v_push_if_out_nbr(
d_tx_buffer_last_boundaries.data(),
d_tx_buffer_last_boundaries.size(),
handle.get_stream());
handle.get_stream_view().synchronize();
handle.sync_stream();
std::vector<size_t> tx_counts(h_tx_buffer_last_boundaries.size());
std::adjacent_difference(
h_tx_buffer_last_boundaries.begin(), h_tx_buffer_last_boundaries.end(), tx_counts.begin());
Expand Down
2 changes: 1 addition & 1 deletion cpp/include/cugraph/prims/vertex_frontier.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ class VertexFrontier {
h_indices.data(), d_indices.data(), d_indices.size(), handle_ptr_->get_stream());
raft::update_host(
h_counts.data(), d_counts.data(), d_counts.size(), handle_ptr_->get_stream());
handle_ptr_->get_stream_view().synchronize();
handle_ptr_->sync_stream();

size_t offset{0};
for (size_t i = 0; i < h_indices.size(); ++i) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/c_api/array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ extern "C" cugraph_error_code_t cugraph_type_erased_device_array_create(
size_t n_bytes = n_elems * (::data_type_sz[dtype]);

auto ret_value = new cugraph::c_api::cugraph_type_erased_device_array_t(
n_elems, n_bytes, dtype, raft_handle->get_stream_view());
n_elems, n_bytes, dtype, raft_handle->get_stream());

*array = reinterpret_cast<cugraph_type_erased_device_array_t*>(ret_value);
return CUGRAPH_SUCCESS;
Expand Down
10 changes: 4 additions & 6 deletions cpp/src/community/legacy/ecg.cu
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,10 @@ class EcgLouvain : public cugraph::legacy::Louvain<graph_type> {

void initialize_dendrogram_level(vertex_t num_vertices) override
{
this->dendrogram_->add_level(0, num_vertices, this->handle_.get_stream_view());
this->dendrogram_->add_level(0, num_vertices, this->handle_.get_stream());

get_permutation_vector(num_vertices,
seed_,
this->dendrogram_->current_level_begin(),
this->handle_.get_stream_view());
get_permutation_vector(
num_vertices, seed_, this->dendrogram_->current_level_begin(), this->handle_.get_stream());
}

private:
Expand All @@ -147,7 +145,7 @@ void ecg(raft::handle_t const& handle,
"Invalid input argument: clustering is NULL, should be a device pointer to "
"memory for storing the result");

rmm::device_uvector<weight_t> ecg_weights_v(graph.number_of_edges, handle.get_stream_view());
rmm::device_uvector<weight_t> ecg_weights_v(graph.number_of_edges, handle.get_stream());

thrust::copy(handle.get_thrust_policy(),
graph.edge_data,
Expand Down
14 changes: 7 additions & 7 deletions cpp/src/community/legacy/egonet.cu
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ extract(raft::handle_t const& handle,
vertex_t radius)
{
auto v = csr_view.get_number_of_vertices();
auto user_stream_view = handle.get_stream_view();
auto user_stream_view = handle.get_stream();
rmm::device_vector<size_t> neighbors_offsets(n_subgraphs + 1);
rmm::device_vector<vertex_t> neighbors;

Expand All @@ -77,7 +77,7 @@ extract(raft::handle_t const& handle,
reached.reserve(n_subgraphs);
for (vertex_t i = 0; i < n_subgraphs; i++) {
// Allocations and operations are attached to the worker stream
rmm::device_uvector<vertex_t> local_reach(v, handle.get_internal_stream_view(i));
rmm::device_uvector<vertex_t> local_reach(v, handle.get_next_usable_stream(i));
reached.push_back(std::move(local_reach));
}

Expand All @@ -89,8 +89,8 @@ extract(raft::handle_t const& handle,

for (vertex_t i = 0; i < n_subgraphs; i++) {
// get light handle from worker pool
raft::handle_t light_handle(handle, i);
auto worker_stream_view = light_handle.get_stream_view();
raft::handle_t light_handle(handle.get_next_usable_stream(i));
auto worker_stream_view = light_handle.get_stream();

// BFS with cutoff
// consider adding a device API to BFS (ie. accept source on the device)
Expand Down Expand Up @@ -132,7 +132,7 @@ extract(raft::handle_t const& handle,
}

// wait on everyone to identify their neighbors before proceeding to concatenation
handle.wait_on_internal_streams();
handle.sync_stream_pool();

// Construct neighbors offsets (just a scan on neighborhood vector sizes)
h_neighbors_offsets[0] = 0;
Expand All @@ -148,7 +148,7 @@ extract(raft::handle_t const& handle,

// Construct the neighbors list concurrently
for (vertex_t i = 0; i < n_subgraphs; i++) {
auto worker_stream_view = handle.get_internal_stream_view(i);
auto worker_stream_view = handle.get_next_usable_stream(i);
thrust::copy(rmm::exec_policy(worker_stream_view),
reached[i].begin(),
reached[i].end(),
Expand All @@ -160,7 +160,7 @@ extract(raft::handle_t const& handle,
}

// wait on everyone before proceeding to grouped extraction
handle.wait_on_internal_streams();
handle.sync_stream_pool();

#ifdef TIMING
hr_timer.stop();
Expand Down
14 changes: 6 additions & 8 deletions cpp/src/community/legacy/leiden.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,11 @@ class Leiden : public Louvain<graph_type> {
this->timer_start("update_clustering_constrained");

rmm::device_uvector<vertex_t> next_cluster_v(this->dendrogram_->current_level_size(),
this->handle_.get_stream_view());
rmm::device_uvector<weight_t> delta_Q_v(graph.number_of_edges, this->handle_.get_stream_view());
rmm::device_uvector<vertex_t> cluster_hash_v(graph.number_of_edges,
this->handle_.get_stream_view());
this->handle_.get_stream());
rmm::device_uvector<weight_t> delta_Q_v(graph.number_of_edges, this->handle_.get_stream());
rmm::device_uvector<vertex_t> cluster_hash_v(graph.number_of_edges, this->handle_.get_stream());
rmm::device_uvector<weight_t> old_cluster_sum_v(graph.number_of_vertices,
this->handle_.get_stream_view());
this->handle_.get_stream());

vertex_t const* d_src_indices = this->src_indices_v_.data();
vertex_t const* d_dst_indices = graph.indices;
Expand Down Expand Up @@ -105,7 +104,7 @@ class Leiden : public Louvain<graph_type> {
}
}

this->timer_stop(this->handle_.get_stream_view());
this->timer_stop(this->handle_.get_stream());
return cur_Q;
}

Expand Down Expand Up @@ -134,8 +133,7 @@ class Leiden : public Louvain<graph_type> {
//
// Initialize every cluster to reference each vertex to itself
//
this->dendrogram_->add_level(
0, current_graph.number_of_vertices, this->handle_.get_stream_view());
this->dendrogram_->add_level(0, current_graph.number_of_vertices, this->handle_.get_stream());

thrust::sequence(this->handle_.get_thrust_policy(),
this->dendrogram_->current_level_begin(),
Expand Down
66 changes: 31 additions & 35 deletions cpp/src/community/legacy/louvain.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ class Louvain {
// to change the logic to populate this properly
// in generate_superverticies_graph.
//
offsets_v_(graph.number_of_vertices + 1, handle.get_stream_view()),
indices_v_(graph.number_of_edges, handle.get_stream_view()),
weights_v_(graph.number_of_edges, handle.get_stream_view()),
src_indices_v_(graph.number_of_edges, handle.get_stream_view()),
vertex_weights_v_(graph.number_of_vertices, handle.get_stream_view()),
cluster_weights_v_(graph.number_of_vertices, handle.get_stream_view()),
tmp_arr_v_(graph.number_of_vertices, handle.get_stream_view()),
cluster_inverse_v_(graph.number_of_vertices, handle.get_stream_view()),
offsets_v_(graph.number_of_vertices + 1, handle.get_stream()),
indices_v_(graph.number_of_edges, handle.get_stream()),
weights_v_(graph.number_of_edges, handle.get_stream()),
src_indices_v_(graph.number_of_edges, handle.get_stream()),
vertex_weights_v_(graph.number_of_vertices, handle.get_stream()),
cluster_weights_v_(graph.number_of_vertices, handle.get_stream()),
tmp_arr_v_(graph.number_of_vertices, handle.get_stream()),
cluster_inverse_v_(graph.number_of_vertices, handle.get_stream()),
number_of_vertices_(graph.number_of_vertices),
number_of_edges_(graph.number_of_edges)
{
Expand Down Expand Up @@ -90,8 +90,8 @@ class Louvain {
{
vertex_t n_verts = graph.number_of_vertices;

rmm::device_uvector<weight_t> inc(n_verts, handle_.get_stream_view());
rmm::device_uvector<weight_t> deg(n_verts, handle_.get_stream_view());
rmm::device_uvector<weight_t> inc(n_verts, handle_.get_stream());
rmm::device_uvector<weight_t> deg(n_verts, handle_.get_stream());

thrust::fill(handle_.get_thrust_policy(), inc.begin(), inc.end(), weight_t{0.0});
thrust::fill(handle_.get_thrust_policy(), deg.begin(), deg.end(), weight_t{0.0});
Expand Down Expand Up @@ -211,7 +211,7 @@ class Louvain {

virtual void initialize_dendrogram_level(vertex_t num_vertices)
{
dendrogram_->add_level(0, num_vertices, handle_.get_stream_view());
dendrogram_->add_level(0, num_vertices, handle_.get_stream());

thrust::sequence(handle_.get_thrust_policy(),
dendrogram_->current_level_begin(),
Expand Down Expand Up @@ -245,7 +245,7 @@ class Louvain {
d_cluster_weights[src] = sum;
});

timer_stop(handle_.get_stream_view());
timer_stop(handle_.get_stream());
}

virtual weight_t update_clustering(weight_t total_edge_weight,
Expand All @@ -255,11 +255,10 @@ class Louvain {
timer_start("update_clustering");

rmm::device_uvector<vertex_t> next_cluster_v(dendrogram_->current_level_size(),
handle_.get_stream_view());
rmm::device_uvector<weight_t> delta_Q_v(graph.number_of_edges, handle_.get_stream_view());
rmm::device_uvector<vertex_t> cluster_hash_v(graph.number_of_edges, handle_.get_stream_view());
rmm::device_uvector<weight_t> old_cluster_sum_v(graph.number_of_vertices,
handle_.get_stream_view());
handle_.get_stream());
rmm::device_uvector<weight_t> delta_Q_v(graph.number_of_edges, handle_.get_stream());
rmm::device_uvector<vertex_t> cluster_hash_v(graph.number_of_edges, handle_.get_stream());
rmm::device_uvector<weight_t> old_cluster_sum_v(graph.number_of_vertices, handle_.get_stream());

vertex_t* d_cluster = dendrogram_->current_level_begin();
weight_t const* d_vertex_weights = vertex_weights_v_.data();
Expand Down Expand Up @@ -301,7 +300,7 @@ class Louvain {
}
}

timer_stop(handle_.get_stream_view());
timer_stop(handle_.get_stream());
return cur_Q;
}

Expand Down Expand Up @@ -409,12 +408,9 @@ class Louvain {
rmm::device_uvector<weight_t>& delta_Q_v,
bool up_down)
{
rmm::device_uvector<vertex_t> temp_vertices_v(graph.number_of_vertices,
handle_.get_stream_view());
rmm::device_uvector<vertex_t> temp_cluster_v(graph.number_of_vertices,
handle_.get_stream_view());
rmm::device_uvector<weight_t> temp_delta_Q_v(graph.number_of_vertices,
handle_.get_stream_view());
rmm::device_uvector<vertex_t> temp_vertices_v(graph.number_of_vertices, handle_.get_stream());
rmm::device_uvector<vertex_t> temp_cluster_v(graph.number_of_vertices, handle_.get_stream());
rmm::device_uvector<weight_t> temp_delta_Q_v(graph.number_of_vertices, handle_.get_stream());

thrust::fill(
handle_.get_thrust_policy(), temp_cluster_v.begin(), temp_cluster_v.end(), vertex_t{-1});
Expand Down Expand Up @@ -479,12 +475,12 @@ class Louvain {

// renumber the clusters to the range 0..(num_clusters-1)
vertex_t num_clusters = renumber_clusters();
cluster_weights_v_.resize(num_clusters, handle_.get_stream_view());
cluster_weights_v_.resize(num_clusters, handle_.get_stream());

// shrink our graph to represent the graph of supervertices
generate_superverticies_graph(graph, num_clusters);

timer_stop(handle_.get_stream_view());
timer_stop(handle_.get_stream());
}

vertex_t renumber_clusters()
Expand Down Expand Up @@ -526,7 +522,7 @@ class Louvain {
[d_cluster_inverse] __device__(const vertex_t idx) { return d_cluster_inverse[idx] == 1; });

vertex_t new_num_clusters = thrust::distance(tmp_arr_v_.begin(), copy_end);
tmp_arr_v_.resize(new_num_clusters, handle_.get_stream_view());
tmp_arr_v_.resize(new_num_clusters, handle_.get_stream());

//
// Now we can set each value in cluster_inverse of a cluster to its index
Expand All @@ -545,16 +541,16 @@ class Louvain {
d_cluster[i] = d_cluster_inverse[d_cluster[i]];
});

cluster_inverse_v_.resize(new_num_clusters, handle_.get_stream_view());
cluster_inverse_v_.resize(new_num_clusters, handle_.get_stream());

return new_num_clusters;
}

void generate_superverticies_graph(graph_t& graph, vertex_t num_clusters)
{
rmm::device_uvector<vertex_t> new_src_v(graph.number_of_edges, handle_.get_stream_view());
rmm::device_uvector<vertex_t> new_dst_v(graph.number_of_edges, handle_.get_stream_view());
rmm::device_uvector<weight_t> new_weight_v(graph.number_of_edges, handle_.get_stream_view());
rmm::device_uvector<vertex_t> new_src_v(graph.number_of_edges, handle_.get_stream());
rmm::device_uvector<vertex_t> new_dst_v(graph.number_of_edges, handle_.get_stream());
rmm::device_uvector<weight_t> new_weight_v(graph.number_of_edges, handle_.get_stream());

//
// Renumber the COO
Expand Down Expand Up @@ -609,11 +605,11 @@ class Louvain {
graph.offsets,
num_clusters,
graph.number_of_edges,
handle_.get_stream_view());
handle_.get_stream());

src_indices_v_.resize(graph.number_of_edges, handle_.get_stream_view());
indices_v_.resize(graph.number_of_edges, handle_.get_stream_view());
weights_v_.resize(graph.number_of_edges, handle_.get_stream_view());
src_indices_v_.resize(graph.number_of_edges, handle_.get_stream());
indices_v_.resize(graph.number_of_edges, handle_.get_stream());
weights_v_.resize(graph.number_of_edges, handle_.get_stream());
}

protected:
Expand Down
Loading