Streams infra + support in egonet #1435

Merged
merged 17 commits into from Mar 5, 2021
8 changes: 6 additions & 2 deletions cpp/CMakeLists.txt
@@ -113,7 +113,6 @@ set(CMAKE_CUDA_FLAGS "${CMAKE_CUDA_FLAGS} --expt-extended-lambda --expt-relaxed-
set(CMAKE_CUDA_FLAGS "${CMAKE_CUDA_FLAGS} -Werror=cross-execution-space-call -Wno-deprecated-declarations -Xptxas --disable-warnings")
set(CMAKE_CUDA_FLAGS "${CMAKE_CUDA_FLAGS} -Xcompiler -Wall,-Wno-error=sign-compare,-Wno-error=unused-but-set-variable")


# Option to enable line info in CUDA device compilation to allow introspection when profiling /
# memchecking
option(CMAKE_CUDA_LINEINFO "Enable the -lineinfo option for nvcc (useful for cuda-memcheck / profiler)" OFF)
@@ -298,7 +297,8 @@ else(DEFINED ENV{RAFT_PATH})
FetchContent_Declare(
raft
GIT_REPOSITORY https://github.com/rapidsai/raft.git
GIT_TAG a3461b201ea1c9f61571f1927274f739e775d2d2
GIT_TAG b055cf862a599fd45537d21a309edd8a6e06da4c

SOURCE_SUBDIR raft
)

@@ -446,6 +446,10 @@ target_link_directories(cugraph
#
add_dependencies(cugraph gunrock_ext)

# Per-thread default stream option; see https://docs.nvidia.com/cuda/cuda-runtime-api/stream-sync-behavior.html
# The per-thread default stream does not synchronize with other streams
target_compile_definitions(cugraph PUBLIC CUDA_API_PER_THREAD_DEFAULT_STREAM)

###################################################################################################
# - include paths ---------------------------------------------------------------------------------
target_include_directories(cugraph
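For context: compiling with CUDA_API_PER_THREAD_DEFAULT_STREAM gives each host thread its own default stream that does not synchronize with other threads' default streams, which is what lets the per-egonet work further down overlap. A minimal standalone sketch of the effect (the busy_wait kernel is hypothetical, not part of this PR; build with nvcc --default-stream per-thread, which is equivalent to defining the macro):

#include <cuda_runtime.h>
#include <thread>
#include <vector>

__global__ void busy_wait(long long cycles)
{
  long long start = clock64();
  while (clock64() - start < cycles) { /* spin to keep the SM busy */ }
}

int main()
{
  // Under per-thread default streams, each std::thread submits to its own
  // default stream, so these kernels can run concurrently instead of
  // serializing on the single legacy default stream.
  std::vector<std::thread> workers;
  for (int i = 0; i < 4; ++i) {
    workers.emplace_back([] { busy_wait<<<1, 1>>>(1000000LL); });
  }
  for (auto &t : workers) { t.join(); }
  cudaDeviceSynchronize();
  return 0;
}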
2 changes: 1 addition & 1 deletion cpp/include/algorithms.hpp
@@ -1167,7 +1167,7 @@ void katz_centrality(raft::handle_t const &handle,
* @tparam multi_gpu Flag indicating whether template instantiation should target single-GPU (false)
* or multi-GPU (true).
* @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and
* handles to various CUDA libraries) to run graph algorithms.
* handles to various CUDA libraries) to run graph algorithms. Must have at least one worker stream.
* @param graph_view Graph view object; induced egonet subgraphs are extracted from @p graph_view.
* @param source_vertex Pointer to egonet center vertices (size == @p n_subgraphs).
* @param n_subgraphs Number of induced EgoNet subgraphs to extract (i.e. number of elements in @p
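The new sentence in the @p handle docs reflects that extract_ego now parallelizes across a pool of worker streams owned by the handle. A hedged sketch of satisfying that requirement at the call site (constructor argument per the raft::handle_t stream-pool API this PR relies on; graph_view, d_sources, n_subgraphs, and radius are assumed to exist):

#include <raft/handle.hpp>

// Create a handle with 8 internal worker streams so up to 8 egonet
// extractions can proceed concurrently.
raft::handle_t handle(8);
// auto result =
//   cugraph::experimental::extract_ego(handle, graph_view, d_sources, n_subgraphs, radius);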
107 changes: 81 additions & 26 deletions cpp/src/community/egonet.cu
@@ -22,6 +22,9 @@
#include <utility>

#include <rmm/thrust_rmm_allocator.h>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/transform.h>
#include <ctime>

@@ -34,6 +37,8 @@
#include <experimental/graph_functions.hpp>
#include <experimental/graph_view.hpp>

#include <utilities/high_res_timer.hpp>

namespace {

/*
@@ -63,56 +68,106 @@ extract(
{
auto v = csr_view.get_number_of_vertices();
auto e = csr_view.get_number_of_edges();
auto stream = handle.get_stream();
auto user_stream = handle.get_stream_view();
float avg_degree = static_cast<float>(e) / v;
rmm::device_vector<size_t> neighbors_offsets(n_subgraphs + 1);
rmm::device_vector<vertex_t> neighbors;

// It is the right thing to accept device memory for source_vertex
// FIXME: consider adding a device API to BFS (i.e. accept source on the device)
std::vector<vertex_t> h_source_vertex(n_subgraphs);
raft::update_host(&h_source_vertex[0], source_vertex, n_subgraphs, stream);
std::vector<size_t> neighbors_offsets_h(n_subgraphs + 1);

raft::update_host(&h_source_vertex[0], source_vertex, n_subgraphs, user_stream.value());

// Streams will allocate concurrently later
std::vector<rmm::device_uvector<vertex_t>> reached{};
reached.reserve(handle.get_num_internal_streams());

#ifdef TIMING
HighResTimer hr_timer;
hr_timer.start("ego_neighbors");
#endif

// reserve some reasonable memory, but could grow larger than that
neighbors.reserve(v + avg_degree * n_subgraphs * radius);
neighbors_offsets[0] = 0;
// each source should be processed concurrently in the future
for (vertex_t i = 0; i < n_subgraphs; i++) {
// get light handle from worker pool
raft::handle_t light_handle(handle, i);
auto worker_stream = light_handle.get_stream_view();

// Allocations and operations are attached to the worker stream
rmm::device_uvector<vertex_t> local_reach(v, worker_stream);
reached.push_back(std::move(local_reach));

// BFS with cutoff
rmm::device_vector<vertex_t> reached(v);
rmm::device_vector<vertex_t> predecessors(v); // not used
// consider adding a device API to BFS (i.e. accept source on the device)
rmm::device_uvector<vertex_t> predecessors(v, worker_stream); // not used
bool direction_optimizing = false;
cugraph::experimental::bfs<vertex_t, edge_t, weight_t, false>(handle,
thrust::fill(rmm::exec_policy(worker_stream),
reached[i].begin(),
reached[i].end(),
std::numeric_limits<vertex_t>::max());
thrust::fill(
rmm::exec_policy(worker_stream), reached[i].begin(), reached[i].begin() + 100, 1.0);

cugraph::experimental::bfs<vertex_t, edge_t, weight_t, false>(light_handle,
csr_view,
reached.data().get(),
predecessors.data().get(),
reached[i].data(),
predecessors.data(),
h_source_vertex[i],
direction_optimizing,
radius);

// identify reached vertex ids from distance array
thrust::transform(rmm::exec_policy(stream)->on(stream),
thrust::transform(rmm::exec_policy(worker_stream),
thrust::make_counting_iterator(vertex_t{0}),
thrust::make_counting_iterator(v),
reached.begin(),
reached.begin(),
reached[i].begin(),
reached[i].begin(),
[sentinel = std::numeric_limits<vertex_t>::max()] __device__(
auto id, auto val) { return val < sentinel ? id : sentinel; });

// removes unreached data
auto reached_end = thrust::remove(rmm::exec_policy(stream)->on(stream),
reached.begin(),
reached.end(),
auto reached_end = thrust::remove(rmm::exec_policy(worker_stream),
reached[i].begin(),
reached[i].end(),
std::numeric_limits<vertex_t>::max());
// release temp storage
reached[i].resize(thrust::distance(reached[i].begin(), reached_end), worker_stream);
reached[i].shrink_to_fit(worker_stream);
}

// update extraction input
size_t n_reached = thrust::distance(reached.begin(), reached_end);
neighbors_offsets[i + 1] = neighbors_offsets[i] + n_reached;
if (neighbors_offsets[i + 1] > neighbors.capacity())
neighbors.reserve(neighbors_offsets[i + 1] * 2);
neighbors.insert(neighbors.end(), reached.begin(), reached_end);
// wait for every worker to identify its neighbors before proceeding to concatenation
handle.wait_on_internal_streams();

// Construct neighbors offsets (just a scan over neighborhood vector sizes)
neighbors_offsets_h[0] = 0;
for (vertex_t i = 0; i < n_subgraphs; i++) {
neighbors_offsets_h[i + 1] = neighbors_offsets_h[i] + reached[i].size();
}
raft::update_device(
neighbors_offsets.data().get(), &neighbors_offsets_h[0], n_subgraphs + 1, user_stream.value());
neighbors.resize(neighbors_offsets_h[n_subgraphs]);

// Construct the neighbors list concurrently
for (vertex_t i = 0; i < n_subgraphs; i++) {
raft::handle_t light_handle(handle, i);
auto worker_stream = light_handle.get_stream_view();
thrust::copy(rmm::exec_policy(worker_stream),
reached[i].begin(),
reached[i].end(),
neighbors.begin() + neighbors_offsets_h[i]);

// reached info is not needed anymore
reached[i].resize(0, worker_stream);
reached[i].shrink_to_fit(worker_stream);
}

// wait for every worker before proceeding to grouped extraction
handle.wait_on_internal_streams();

#ifdef TIMING
hr_timer.stop();
hr_timer.display(std::cout);
#endif

// extract
return cugraph::experimental::extract_induced_subgraphs(
handle, csr_view, neighbors_offsets.data().get(), neighbors.data().get(), n_subgraphs);
@@ -207,4 +262,4 @@ extract_ego(raft::handle_t const &,
int64_t,
int64_t);
} // namespace experimental
} // namespace cugraph
} // namespace cugraph
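Taken together, the egonet changes follow a fork-join stream pattern: fork per-subgraph work onto worker streams (allocation, fill, BFS, compaction all ordered on the worker stream), join on the main handle before any cross-stream read, then fork again for the concurrent copies. A condensed sketch of that pattern under the same raft/rmm APIs used above (the function, sizes, and fill values are illustrative, not from this PR):

#include <raft/handle.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>
#include <thrust/fill.h>
#include <vector>

void fork_join_sketch(raft::handle_t const &handle, int n_tasks, int n)
{
  std::vector<rmm::device_uvector<int>> results;
  results.reserve(n_tasks);
  // Fork: task i runs on worker stream i (modulo the pool size).
  for (int i = 0; i < n_tasks; ++i) {
    raft::handle_t light(handle, i);
    auto stream = light.get_stream_view();
    rmm::device_uvector<int> buf(n, stream);  // allocation ordered on the worker stream
    thrust::fill(rmm::exec_policy(stream), buf.begin(), buf.end(), i);
    results.push_back(std::move(buf));
  }
  // Join: nothing may read `results` on another stream until every
  // worker stream has finished producing them.
  handle.wait_on_internal_streams();
  // ... safe to concatenate `results` on handle.get_stream_view() here ...
}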
11 changes: 10 additions & 1 deletion cpp/src/experimental/induced_subgraph.cu
@@ -32,6 +32,8 @@

#include <tuple>

#include <utilities/high_res_timer.hpp>

namespace cugraph {
namespace experimental {

@@ -52,6 +54,10 @@ extract_induced_subgraphs(
size_t num_subgraphs,
bool do_expensive_check)
{
#ifdef TIMING
HighResTimer hr_timer;
hr_timer.start("extract_induced_subgraphs");
#endif
// FIXME: this code is inefficient for the vertices with their local degrees much larger than the
// number of vertices in the subgraphs (in this case, searching that the subgraph vertices are
// included in the local neighbors is more efficient than searching the local neighbors are
@@ -244,7 +250,10 @@ extract_induced_subgraphs(
subgraph_offsets + (num_subgraphs + 1),
subgraph_vertex_output_offsets.begin(),
subgraph_edge_offsets.begin());

#ifdef TIMING
hr_timer.stop();
hr_timer.display(std::cout);
#endif
return std::make_tuple(std::move(edge_majors),
std::move(edge_minors),
std::move(edge_weights),
4 changes: 3 additions & 1 deletion cpp/src/utilities/high_res_timer.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,8 @@
#include <map>
#include <string>

//#define TIMING

class HighResTimer {
public:
HighResTimer() : timers() {}
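The commented-out #define TIMING above is the global toggle for the instrumentation added in egonet.cu and induced_subgraph.cu; uncommenting it (or passing -DTIMING to the compiler) compiles the timers in. A self-contained sketch of the idiom as used in this diff (the defined macro is shown inline here only for completeness; normally the build supplies it):

#include <iostream>
#define TIMING  // normally supplied by the build
#include <utilities/high_res_timer.hpp>

void timed_section()
{
#ifdef TIMING
  HighResTimer hr_timer;
  hr_timer.start("timed_section");
#endif
  // ... work to measure ...
#ifdef TIMING
  hr_timer.stop();
  hr_timer.display(std::cout);
#endif
}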