Streams infra + support in egonet (#1435)
- Stream synchronization behavior: switched to per-thread default stream instead of the legacy default stream
- Update raft tag
- EgoNet upgrade to use `uvector` instead of `device_vector`
- EgoNet upgrade to execute on a different stream for each seed
- Perf analysis timers/app for EgoNet

Concurrency is limited by the number of available blocks on the device. Thrust-based code may request many blocks without a way to control this. In practice, smaller graphs leverage concurrency better than larger ones, where tasks may end up waiting for available resources.
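
As a rough standalone illustration of this behavior (a sketch, not code from this PR; the kernel and buffer names are hypothetical): with CUDA_API_PER_THREAD_DEFAULT_STREAM defined, each host thread's "default" stream is distinct, so launches from different threads may overlap on the device.

// ptds_demo.cu -- minimal per-thread default stream sketch.
// The define must precede cuda_runtime.h (the CMake change below does this globally).
#define CUDA_API_PER_THREAD_DEFAULT_STREAM
#include <cuda_runtime.h>
#include <thread>

__global__ void scale(float *x, int n) {
  int i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < n) x[i] *= 2.0f;
}

int main() {
  int n = 1 << 20;
  float *a, *b;
  cudaMalloc(&a, n * sizeof(float));
  cudaMalloc(&b, n * sizeof(float));
  auto worker = [n](float *p) {
    scale<<<(n + 255) / 256, 256>>>(p, n);    // lands on this thread's default stream
    cudaStreamSynchronize(cudaStreamPerThread);
  };
  std::thread t1(worker, a), t2(worker, b);   // two threads, two streams, may overlap
  t1.join();
  t2.join();
  cudaFree(a);
  cudaFree(b);
  return 0;
}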

We may wait on #1407 before reconciling and merging this.
Close #957

Authors:
  - Alex Fender (@afender)

Approvers:
  - Seunghwa Kang (@seunghwak)
  - Andrei Schaffer (@aschaffer)

URL: #1435
afender authored Mar 5, 2021
1 parent b7e6809 commit e525040
Showing 8 changed files with 276 additions and 46 deletions.
8 changes: 6 additions & 2 deletions cpp/CMakeLists.txt
@@ -113,7 +113,6 @@ set(CMAKE_CUDA_FLAGS "${CMAKE_CUDA_FLAGS} --expt-extended-lambda --expt-relaxed-
set(CMAKE_CUDA_FLAGS "${CMAKE_CUDA_FLAGS} -Werror=cross-execution-space-call -Wno-deprecated-declarations -Xptxas --disable-warnings")
set(CMAKE_CUDA_FLAGS "${CMAKE_CUDA_FLAGS} -Xcompiler -Wall,-Wno-error=sign-compare,-Wno-error=unused-but-set-variable")


# Option to enable line info in CUDA device compilation to allow introspection when profiling /
# memchecking
option(CMAKE_CUDA_LINEINFO "Enable the -lineinfo option for nvcc (useful for cuda-memcheck / profiler)" OFF)
@@ -298,7 +297,8 @@ else(DEFINED ENV{RAFT_PATH})
FetchContent_Declare(
raft
GIT_REPOSITORY https://github.com/rapidsai/raft.git
GIT_TAG a3461b201ea1c9f61571f1927274f739e775d2d2
GIT_TAG b055cf862a599fd45537d21a309edd8a6e06da4c

SOURCE_SUBDIR raft
)

@@ -446,6 +446,10 @@ target_link_directories(cugraph
#
add_dependencies(cugraph gunrock_ext)

# Per-thread default stream option, see https://docs.nvidia.com/cuda/cuda-runtime-api/stream-sync-behavior.html
# The per-thread default stream does not synchronize with other streams
target_compile_definitions(cugraph PUBLIC CUDA_API_PER_THREAD_DEFAULT_STREAM)

###################################################################################################
# - include paths ---------------------------------------------------------------------------------
target_include_directories(cugraph
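Outside of CMake, the same opt-in is available directly on the compile line (assuming the sketch above is saved as ptds_demo.cu):

  nvcc --default-stream per-thread -o ptds_demo ptds_demo.cu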
2 changes: 1 addition & 1 deletion cpp/include/algorithms.hpp
@@ -1167,7 +1167,7 @@ void katz_centrality(raft::handle_t const &handle,
* @tparam multi_gpu Flag indicating whether template instantiation should target single-GPU (false)
* or multi-GPU (true).
* @param handle RAFT handle object to encapsulate resources (e.g. CUDA stream, communicator, and
* handles to various CUDA libraries) to run graph algorithms.
* handles to various CUDA libraries) to run graph algorithms. Must have at least one worker stream.
* @param graph_view Graph view object; we extract induced egonet subgraphs from @p graph_view.
* @param source_vertex Pointer to egonet center vertices (size == @p n_subgraphs).
* @param n_subgraphs Number of induced EgoNet subgraphs to extract (ie. number of elements in @p
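A usage sketch for that requirement (assumes raft's handle_t(n_streams) constructor at the tag above; graph_view, d_sources, n_subgraphs, and radius are placeholders):

// Handle with a main stream plus internal worker streams;
// extract_ego schedules one BFS per seed on the worker pool.
raft::handle_t handle(8);

// d_sources: device array of egonet center vertices, size n_subgraphs.
auto result = cugraph::experimental::extract_ego(
  handle, graph_view, d_sources.data(), n_subgraphs, radius);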
116 changes: 87 additions & 29 deletions cpp/src/community/egonet.cu
@@ -22,6 +22,9 @@
#include <utility>

#include <rmm/thrust_rmm_allocator.h>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/transform.h>
#include <ctime>

@@ -34,6 +37,8 @@
#include <experimental/graph_functions.hpp>
#include <experimental/graph_view.hpp>

#include <utilities/high_res_timer.hpp>

namespace {

/*
@@ -61,58 +66,111 @@ extract(
vertex_t n_subgraphs,
vertex_t radius)
{
auto v = csr_view.get_number_of_vertices();
auto e = csr_view.get_number_of_edges();
auto stream = handle.get_stream();
float avg_degree = e / v;
auto v = csr_view.get_number_of_vertices();
auto e = csr_view.get_number_of_edges();
auto user_stream_view = handle.get_stream_view();
rmm::device_vector<size_t> neighbors_offsets(n_subgraphs + 1);
rmm::device_vector<vertex_t> neighbors;

// It is the right thing to accept device memory for source_vertex
// FIXME consider adding a device API to BFS (ie. accept source on the device)
std::vector<vertex_t> h_source_vertex(n_subgraphs);
raft::update_host(&h_source_vertex[0], source_vertex, n_subgraphs, stream);
std::vector<size_t> h_neighbors_offsets(n_subgraphs + 1);

raft::update_host(&h_source_vertex[0], source_vertex, n_subgraphs, user_stream_view.value());

// Streams will allocate concurrently later
std::vector<rmm::device_uvector<vertex_t>> reached{};
reached.reserve(handle.get_num_internal_streams());

// reserve some reasonable memory, but could grow larger than that
neighbors.reserve(v + avg_degree * n_subgraphs * radius);
neighbors_offsets[0] = 0;
// each source should be done concurrently in the future
// h_source_vertex[i] is used by other streams in the for loop
user_stream_view.synchronize();
#ifdef TIMING
HighResTimer hr_timer;
hr_timer.start("ego_neighbors");
#endif
for (vertex_t i = 0; i < n_subgraphs; i++) {
// get light handle from worker pool
raft::handle_t light_handle(handle, i);
auto worker_stream_view = light_handle.get_stream_view();

// Allocations and operations are attached to the worker stream
rmm::device_uvector<vertex_t> local_reach(v, worker_stream_view);
reached.push_back(std::move(local_reach));

// BFS with cutoff
rmm::device_vector<vertex_t> reached(v);
rmm::device_vector<vertex_t> predecessors(v); // not used
// consider adding a device API to BFS (ie. accept source on the device)
rmm::device_uvector<vertex_t> predecessors(v, worker_stream_view); // not used
bool direction_optimizing = false;
cugraph::experimental::bfs<vertex_t, edge_t, weight_t, false>(handle,
thrust::fill(rmm::exec_policy(worker_stream_view),
reached[i].begin(),
reached[i].end(),
std::numeric_limits<vertex_t>::max());
thrust::fill(
rmm::exec_policy(worker_stream_view), reached[i].begin(), reached[i].begin() + 100, 1.0);

cugraph::experimental::bfs<vertex_t, edge_t, weight_t, false>(light_handle,
csr_view,
reached.data().get(),
predecessors.data().get(),
reached[i].data(),
predecessors.data(),
h_source_vertex[i],
direction_optimizing,
radius);

// identify reached vertex ids from distance array
thrust::transform(rmm::exec_policy(stream)->on(stream),
thrust::transform(rmm::exec_policy(worker_stream_view),
thrust::make_counting_iterator(vertex_t{0}),
thrust::make_counting_iterator(v),
reached.begin(),
reached.begin(),
reached[i].begin(),
reached[i].begin(),
[sentinel = std::numeric_limits<vertex_t>::max()] __device__(
auto id, auto val) { return val < sentinel ? id : sentinel; });

// removes unreached data
auto reached_end = thrust::remove(rmm::exec_policy(stream)->on(stream),
reached.begin(),
reached.end(),
auto reached_end = thrust::remove(rmm::exec_policy(worker_stream_view),
reached[i].begin(),
reached[i].end(),
std::numeric_limits<vertex_t>::max());
// release temp storage
reached[i].resize(thrust::distance(reached[i].begin(), reached_end), worker_stream_view);
reached[i].shrink_to_fit(worker_stream_view);
}

// update extraction input
size_t n_reached = thrust::distance(reached.begin(), reached_end);
neighbors_offsets[i + 1] = neighbors_offsets[i] + n_reached;
if (neighbors_offsets[i + 1] > neighbors.capacity())
neighbors.reserve(neighbors_offsets[i + 1] * 2);
neighbors.insert(neighbors.end(), reached.begin(), reached_end);
// wait on all streams to identify their neighbors before proceeding to concatenation
handle.wait_on_internal_streams();

// Construct neighbors offsets (just a scan on neighborhood vector sizes)
h_neighbors_offsets[0] = 0;
for (vertex_t i = 0; i < n_subgraphs; i++) {
h_neighbors_offsets[i + 1] = h_neighbors_offsets[i] + reached[i].size();
}
raft::update_device(neighbors_offsets.data().get(),
&h_neighbors_offsets[0],
n_subgraphs + 1,
user_stream_view.value());
neighbors.resize(h_neighbors_offsets[n_subgraphs]);
user_stream_view.synchronize();

// Construct the neighbors list concurrently
for (vertex_t i = 0; i < n_subgraphs; i++) {
raft::handle_t light_handle(handle, i);
auto worker_stream_view = light_handle.get_stream_view();
thrust::copy(rmm::exec_policy(worker_stream_view),
reached[i].begin(),
reached[i].end(),
neighbors.begin() + h_neighbors_offsets[i]);

// reached info is not needed anymore
reached[i].resize(0, worker_stream_view);
reached[i].shrink_to_fit(worker_stream_view);
}

// wait on all streams before proceeding to grouped extraction
handle.wait_on_internal_streams();

#ifdef TIMING
hr_timer.stop();
hr_timer.display(std::cout);
#endif

// extract
return cugraph::experimental::extract_induced_subgraphs(
handle, csr_view, neighbors_offsets.data().get(), neighbors.data().get(), n_subgraphs);
@@ -207,4 +265,4 @@ extract_ego(raft::handle_t const &,
int64_t,
int64_t);
} // namespace experimental
} // namespace cugraph
} // namespace cugraph
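
A condensed view of the device_vector -> device_uvector pattern above (v, i, and n_kept are placeholders): every allocation, resize, and shrink_to_fit is ordered on an explicit worker stream, so per-seed work never implicitly synchronizes the whole device.

// Worker i borrows a stream from the handle's internal pool.
raft::handle_t light_handle(handle, i);
auto worker_stream_view = light_handle.get_stream_view();

rmm::device_uvector<vertex_t> buf(v, worker_stream_view);  // stream-ordered allocation
thrust::fill(rmm::exec_policy(worker_stream_view), buf.begin(), buf.end(), vertex_t{0});
buf.resize(n_kept, worker_stream_view);   // keep only the reached prefix
buf.shrink_to_fit(worker_stream_view);    // release slack on the same stream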
11 changes: 10 additions & 1 deletion cpp/src/experimental/induced_subgraph.cu
@@ -32,6 +32,8 @@

#include <tuple>

#include <utilities/high_res_timer.hpp>

namespace cugraph {
namespace experimental {

@@ -52,6 +54,10 @@ extract_induced_subgraphs(
size_t num_subgraphs,
bool do_expensive_check)
{
#ifdef TIMING
HighResTimer hr_timer;
hr_timer.start("extract_induced_subgraphs");
#endif
// FIXME: this code is inefficient for the vertices with their local degrees much larger than the
// number of vertices in the subgraphs (in this case, searching that the subgraph vertices are
// included in the local neighbors is more efficient than searching the local neighbors are
@@ -244,7 +250,10 @@ extract_induced_subgraphs(
subgraph_offsets + (num_subgraphs + 1),
subgraph_vertex_output_offsets.begin(),
subgraph_edge_offsets.begin());

#ifdef TIMING
hr_timer.stop();
hr_timer.display(std::cout);
#endif
return std::make_tuple(std::move(edge_majors),
std::move(edge_minors),
std::move(edge_weights),
4 changes: 3 additions & 1 deletion cpp/src/utilities/high_res_timer.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,8 @@
#include <map>
#include <string>

//#define TIMING

class HighResTimer {
public:
HighResTimer() : timers() {}
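Usage mirrors the TIMING-guarded blocks added in this commit (enable by uncommenting the define above or passing -DTIMING at compile time):

#define TIMING
#include <utilities/high_res_timer.hpp>

#include <iostream>

HighResTimer hr_timer;
hr_timer.start("ego_neighbors");
// ... region to measure ...
hr_timer.stop();
hr_timer.display(std::cout);  // print the accumulated timings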