diff --git a/CHANGELOG.md b/CHANGELOG.md
index fc6e2e72173..c647ef78c8a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@
 - PR #840 OPG degree
 
 ## Improvements
+- PR #817 Add native Betweenness Centrality with sources subset
 - PR #764 Updated sssp and bfs with GraphCSR, removed gdf_column, added nullptr weights test for sssp
 - PR #765 Remove gdf_column from connected components
 - PR #780 Remove gdf_column from cuhornet features
diff --git a/cpp/include/algorithms.hpp b/cpp/include/algorithms.hpp
index 7f3b8445679..4791e2d5380 100644
--- a/cpp/include/algorithms.hpp
+++ b/cpp/include/algorithms.hpp
@@ -186,13 +186,18 @@ void overlap_list(experimental::GraphCSRView<VT, ET, WT> const &graph,
                   VT const *second,
                   WT *result);
 
+enum class cugraph_bc_implem_t {
+  CUGRAPH_DEFAULT = 0,  ///> Native cugraph implementation
+  CUGRAPH_GUNROCK       ///> Gunrock implementation
+};
+
 /**
  * @brief Compute betweenness centrality for a graph
  *
  * Betweenness centrality for a vertex is the sum of the fraction of
 * all pairs shortest paths that pass through the vertex.
  *
- * Note that gunrock (current implementation) does not support a weighted graph.
+ * Note that neither the native nor the gunrock implementation supports a
+ * weighted graph.
 *
 * @throws cugraph::logic_error with a custom message when an error
 * occurs.
 *
@@ -202,7 +207,8 @@ void overlap_list(experimental::GraphCSRView<VT, ET, WT> const &graph,
 * @tparam ET Type of edge identifiers. Supported value : int (signed,
 * 32-bit)
 * @tparam WT Type of edge weights. Supported values : float or double.
- * @tparam result_t Type of computed result. Supported values : float
+ * @tparam result_t Type of computed result. Supported values : float or double
+ * (double only supported in default implementation)
 *
 * @param[in] graph cuGRAPH graph descriptor, should contain the connectivity
 * information as a CSR
@@ -212,19 +218,23 @@ void overlap_list(experimental::GraphCSRView<VT, ET, WT> const &graph,
 * @param[in] endpoints If true, include endpoints of paths in score, if false do not
 * @param[in] weight If specified, device array of weights for each edge
 * @param[in] k If specified, number of vertex samples defined in the vertices
- * array
- * @param[in] vertices If specified, device array of sampled vertex ids to estimate
- * betweenness centrality.
+ * array.
+ * @param[in] vertices If specified, host array of vertex ids to estimate betweenness
+ * centrality; these vertices will serve as sources for the traversal algorithm to
+ * obtain shortest path counters.
+ * @param[in] implem Cugraph currently supports 2 implementations: native and
+ * gunrock
 *
 */
 template <typename VT, typename ET, typename WT, typename result_t>
 void betweenness_centrality(experimental::GraphCSRView<VT, ET, WT> const &graph,
                             result_t *result,
-                            bool normalized    = true,
-                            bool endpoints     = false,
-                            WT const *weight   = nullptr,
-                            VT k               = 0,
-                            VT const *vertices = nullptr);
+                            bool normalized            = true,
+                            bool endpoints             = false,
+                            WT const *weight           = nullptr,
+                            VT k                       = 0,
+                            VT const *vertices         = nullptr,
+                            cugraph_bc_implem_t implem = cugraph_bc_implem_t::CUGRAPH_DEFAULT);
 
 enum class cugraph_cc_t {
   CUGRAPH_WEAK = 0,  ///> Weakly Connected Components
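For orientation, a minimal call-site sketch of the declaration above; `csr_view` and the source list are illustrative assumptions, not part of this diff:

    // Hypothetical usage: estimate betweenness centrality from a 3-vertex
    // source subset with the native implementation. csr_view is assumed to
    // be an existing cugraph::experimental::GraphCSRView<int, int, float>.
    std::vector<int> h_sources{0, 5, 9};  // host array, per the docs above
    rmm::device_vector<float> d_result(csr_view.number_of_vertices);
    cugraph::betweenness_centrality(csr_view,
                                    d_result.data().get(),
                                    true,                                 // normalized
                                    false,                                // endpoints (not yet supported)
                                    static_cast<float const *>(nullptr),  // unweighted
                                    static_cast<int>(h_sources.size()),   // k
                                    h_sources.data(),                     // sources for the traversal
                                    cugraph::cugraph_bc_implem_t::CUGRAPH_DEFAULT);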
@@ -435,7 +445,9 @@ void sssp(experimental::GraphCSRView<VT, ET, WT> const &graph,
           VT *predecessors,
           const VT source_vertex);
 
-// TODO: Either distances is in VT or in WT, even if there should be no weights
+// FIXME: Internally distances is of int (signed 32-bit) data type, but the
+// current template uses data from VT, ET, WT from the GraphCSRView even if
+// weights are not considered
 /**
  * @Synopsis Performs a breadth first search traversal of a graph starting from a vertex.
  *
@@ -450,12 +462,15 @@ void sssp(experimental::GraphCSRView<VT, ET, WT> const &graph,
 * @param[in] graph cuGRAPH graph descriptor, should contain the connectivity
 * information as a CSR
 *
- * @param[out] distances If set to a valid column, this is populated by distance of every
- * vertex in the graph from the starting vertex
+ * @param[out] distances If set to a valid pointer, this is populated by distance of
+ * every vertex in the graph from the starting vertex
 *
- * @param[out] predecessors If set to a valid column, this is populated by bfs traversal
+ * @param[out] predecessors If set to a valid pointer, this is populated by bfs traversal
 * predecessor of every vertex
 *
+ * @param[out] sp_counters If set to a valid pointer, this is populated by bfs traversal
+ * shortest_path counter of every vertex
+ *
 * @param[in] start_vertex The starting vertex for breadth first search traversal
 *
 * @param[in] directed Treat the input graph as directed
@@ -466,6 +481,7 @@ template <typename VT, typename ET, typename WT>
 void bfs(experimental::GraphCSRView<VT, ET, WT> const &graph,
          VT *distances,
          VT *predecessors,
+         double *sp_counters,
          const VT start_vertex,
          bool directed = true);
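A sketch of driving the extended bfs entry point to recover per-vertex shortest-path counts; buffer names are illustrative:

    // distances, predecessors and sp_counters are all optional outputs: pass
    // nullptr for any of them to skip that computation. Counters are double
    // because shortest-path counts can grow far beyond integer range.
    rmm::device_vector<int> distances(graph.number_of_vertices);
    rmm::device_vector<int> predecessors(graph.number_of_vertices);
    rmm::device_vector<double> sp_counters(graph.number_of_vertices);
    cugraph::bfs(graph,
                 distances.data().get(),
                 predecessors.data().get(),
                 sp_counters.data().get(),
                 source_vertex,
                 true /* directed */);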
diff --git a/cpp/include/graph.hpp b/cpp/include/graph.hpp
index 96dc4501b24..d7b1a2838ac 100644
--- a/cpp/include/graph.hpp
+++ b/cpp/include/graph.hpp
@@ -30,7 +30,7 @@ enum class PropType { PROP_UNDEF, PROP_FALSE, PROP_TRUE };
 struct GraphProperties {
   bool directed{false};
   bool weighted{false};
-  bool multigraph{multigraph};
+  bool multigraph{false};
   bool bipartite{false};
   bool tree{false};
   PropType has_negative_edges{PropType::PROP_UNDEF};
diff --git a/cpp/src/centrality/betweenness_centrality.cu b/cpp/src/centrality/betweenness_centrality.cu
index 810245bb597..e7bb5a7803d 100644
--- a/cpp/src/centrality/betweenness_centrality.cu
+++ b/cpp/src/centrality/betweenness_centrality.cu
@@ -26,10 +26,281 @@
 #include
 
+#include "betweenness_centrality.cuh"
+
 namespace cugraph {
+namespace detail {
+
+template <typename VT, typename ET, typename WT, typename result_t>
+void BC<VT, ET, WT, result_t>::setup()
+{
+  // --- Set up parameters from graph adjList ---
+  number_of_vertices = graph.number_of_vertices;
+  number_of_edges    = graph.number_of_edges;
+  offsets_ptr        = graph.offsets;
+  indices_ptr        = graph.indices;
+}
+
+template <typename VT, typename ET, typename WT, typename result_t>
+void BC<VT, ET, WT, result_t>::configure(result_t *_betweenness,
+                                         bool _normalized,
+                                         bool _endpoints,
+                                         WT const *_weights,
+                                         VT const *_sources,
+                                         VT _number_of_sources)
+{
+  // --- Bind betweenness output vector to internal ---
+  betweenness       = _betweenness;
+  normalized        = _normalized;
+  endpoints         = _endpoints;
+  sources           = _sources;
+  number_of_sources = _number_of_sources;
+  edge_weights_ptr  = _weights;
+
+  // --- Working data allocation ---
+  distances_vec.resize(number_of_vertices);
+  predecessors_vec.resize(number_of_vertices);
+  sp_counters_vec.resize(number_of_vertices);
+  deltas_vec.resize(number_of_vertices);
+
+  distances    = distances_vec.data().get();
+  predecessors = predecessors_vec.data().get();
+  sp_counters  = sp_counters_vec.data().get();
+  deltas       = deltas_vec.data().get();
+
+  // --- Get Device Information ---
+  CUDA_TRY(cudaGetDevice(&device_id));
+  CUDA_TRY(cudaDeviceGetAttribute(&max_grid_dim_1D, cudaDevAttrMaxGridDimX, device_id));
+  CUDA_TRY(cudaDeviceGetAttribute(&max_block_dim_1D, cudaDevAttrMaxBlockDimX, device_id));
+
+  // --- Confirm that configuration went through ---
+  configured = true;
+}
+
+// Dependency Accumulation: McLaughlin and Bader, 2018
+// NOTE: Accumulation kernel might not scale well, as each thread is handling
+//       all the edges for each node; an approach similar to the traversal
+//       bucket (i.e. BFS / SSSP) system might enable a speed up
+// NOTE: Shortest Path counters can increase extremely fast, thus doubles are
+//       used; however, the user might want to get the result back in float,
+//       so we delay casting the result until dependency accumulation
+template <typename VT, typename ET, typename WT, typename result_t>
+__global__ void accumulation_kernel(result_t *betweenness,
+                                    VT number_vertices,
+                                    VT const *indices,
+                                    ET const *offsets,
+                                    VT *distances,
+                                    double *sp_counters,
+                                    double *deltas,
+                                    VT source,
+                                    VT depth)
+{
+  for (int tid = blockIdx.x * blockDim.x + threadIdx.x; tid < number_vertices;
+       tid += gridDim.x * blockDim.x) {
+    VT w       = tid;
+    double dsw = 0;
+    double sw  = sp_counters[w];
+    if (distances[w] == depth) {  // Process nodes at this depth
+      ET edge_start = offsets[w];
+      ET edge_end   = offsets[w + 1];
+      ET edge_count = edge_end - edge_start;
+      for (ET edge_idx = 0; edge_idx < edge_count; ++edge_idx) {  // Visit neighbors
+        VT v = indices[edge_start + edge_idx];
+        if (distances[v] == distances[w] + 1) {
+          double factor = (static_cast<double>(1) + deltas[v]) / sp_counters[v];
+          dsw += sw * factor;
+        }
+      }
+      deltas[w] = dsw;
+    }
+  }
+}
+
+template <typename VT, typename ET, typename WT, typename result_t>
+void BC<VT, ET, WT, result_t>::accumulate(result_t *betweenness,
+                                          VT *distances,
+                                          double *sp_counters,
+                                          double *deltas,
+                                          VT source,
+                                          VT max_depth)
+{
+  dim3 grid, block;
+  block.x = max_block_dim_1D;
+  grid.x  = min(max_grid_dim_1D, (number_of_edges / block.x + 1));
+  // Step 1) Dependencies (deltas) are initialized to 0 before starting
+  thrust::fill(rmm::exec_policy(stream)->on(stream),
+               deltas,
+               deltas + number_of_vertices,
+               static_cast<double>(0));
+  // Step 2) Process each node, -1 is used to notify unreached nodes in the sssp
+  for (VT depth = max_depth; depth > 0; --depth) {
+    accumulation_kernel<VT, ET, WT, result_t>
+      <<<grid, block, 0, stream>>>(betweenness,
+                                   number_of_vertices,
+                                   graph.indices,
+                                   graph.offsets,
+                                   distances,
+                                   sp_counters,
+                                   deltas,
+                                   source,
+                                   depth);
+  }
+
+  thrust::transform(rmm::exec_policy(stream)->on(stream),
+                    deltas,
+                    deltas + number_of_vertices,
+                    betweenness,
+                    betweenness,
+                    thrust::plus<result_t>());
+}
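For reference, the kernel and wrapper above are a depth-synchronous form of Brandes' dependency accumulation (as revisited by McLaughlin and Bader, 2018): for a vertex w processed at depth d, with sigma = sp_counters and delta = deltas,

    delta(w) = sum over neighbors v with distances[v] == distances[w] + 1 of
               (sigma(w) / sigma(v)) * (1 + delta(v))

which is exactly `dsw += sw * (1 + deltas[v]) / sp_counters[v]`; iterating depths from max_depth down to 1 replaces the per-source vertex stack of the sequential algorithm.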
+
+// We do not verify the graph structure as the new graph structure
+// enforces CSR Format
+
+// FIXME: Having a system that relies on a class might make it harder to
+//        dispatch later
+template <typename VT, typename ET, typename WT, typename result_t>
+void BC<VT, ET, WT, result_t>::compute_single_source(VT source_vertex)
+{
+  // Step 1) Single-source shortest-path problem
+  cugraph::bfs(graph, distances, predecessors, sp_counters, source_vertex, graph.prop.directed);
+
+  // FIXME: Remove that with a BC specific class to gather
+  //        information during traversal
+
+  // Numeric max value is replaced by -1 as we look for the maximal depth of
+  // the traversal; this value is available within the bfs implementation and
+  // there could be a way to access it directly and avoid both the replace and
+  // the max
+  thrust::replace(rmm::exec_policy(stream)->on(stream),
+                  distances,
+                  distances + number_of_vertices,
+                  std::numeric_limits<VT>::max(),
+                  static_cast<VT>(-1));
+  auto current_max_depth = thrust::max_element(
+    rmm::exec_policy(stream)->on(stream), distances, distances + number_of_vertices);
+  VT max_depth = 0;
+  cudaMemcpy(&max_depth, current_max_depth, sizeof(VT), cudaMemcpyDeviceToHost);
+  // Step 2) Dependency accumulation
+  accumulate(betweenness, distances, sp_counters, deltas, source_vertex, max_depth);
+}
+
+template <typename VT, typename ET, typename WT, typename result_t>
+void BC<VT, ET, WT, result_t>::compute()
+{
+  CUGRAPH_EXPECTS(configured, "BC must be configured before computation");
+  // If sources is defined we only process vertices contained in it
+  thrust::fill(rmm::exec_policy(stream)->on(stream),
+               betweenness,
+               betweenness + number_of_vertices,
+               static_cast<result_t>(0));
+  cudaStreamSynchronize(stream);
+  if (sources) {
+    for (VT source_idx = 0; source_idx < number_of_sources; ++source_idx) {
+      VT source_vertex = sources[source_idx];
+      compute_single_source(source_vertex);
+    }
+  } else {  // Otherwise process every vertex
+    // NOTE: Maybe we could still use number_of_sources and set it to number_of_vertices?
+    //       It would imply having a host vector of size |V|,
+    //       but there would be no need for the if / else statement
+    for (VT source_vertex = 0; source_vertex < number_of_vertices; ++source_vertex) {
+      compute_single_source(source_vertex);
+    }
+  }
+  rescale();
+}
+
+template <typename VT, typename ET, typename WT, typename result_t>
+void BC<VT, ET, WT, result_t>::rescale()
+{
+  thrust::device_vector<result_t> normalizer(number_of_vertices);
+  bool modified                      = false;
+  result_t rescale_factor            = static_cast<result_t>(1);
+  result_t casted_number_of_vertices = static_cast<result_t>(number_of_vertices);
+  result_t casted_number_of_sources  = static_cast<result_t>(number_of_sources);
+  if (normalized) {
+    if (number_of_vertices > 2) {
+      rescale_factor /= ((casted_number_of_vertices - 1) * (casted_number_of_vertices - 2));
+      modified = true;
+    }
+  } else {
+    if (!graph.prop.directed) {
+      rescale_factor /= static_cast<result_t>(2);
+      modified = true;
+    }
+  }
+  if (modified) {
+    if (number_of_sources > 0) {
+      rescale_factor *= (casted_number_of_vertices / casted_number_of_sources);
+    }
+  }
+  thrust::fill(normalizer.begin(), normalizer.end(), rescale_factor);
+  thrust::transform(rmm::exec_policy(stream)->on(stream),
+                    betweenness,
+                    betweenness + number_of_vertices,
+                    normalizer.begin(),
+                    betweenness,
+                    thrust::multiplies<result_t>());
+}
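Summarizing the rescaling logic above (n = number_of_vertices, k = number_of_sources):

    betweenness[v] *= 1 / ((n - 1) * (n - 2))   // normalized, n > 2
    betweenness[v] *= 1 / 2                     // unnormalized, undirected graph
    betweenness[v] *= n / k                     // additionally, when k > 0 sources are sampled

so a k-source estimate is extrapolated back to full-graph scale, mirroring the convention networkx uses for sampled betweenness.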
different from 0."); + } + if (endpoints) { CUGRAPH_FAIL("Endpoints option is currently not supported."); } +} +/** + * ---------------------------------------------------------------------------* + * @brief Native betweenness centrality + * + * @file betweenness_centrality.cu + * --------------------------------------------------------------------------*/ +template +void betweenness_centrality(experimental::GraphCSRView const &graph, + result_t *result, + bool normalize, + bool endpoints, + WT const *weight, + VT const number_of_sources, + VT const *sources) +{ + // Current Implementation relies on BFS + // FIXME: For SSSP version + // Brandes Algorithm expects non negative weights for the accumulation + verify_input( + result, normalize, endpoints, weight, number_of_sources, sources); + cugraph::detail::BC bc(graph); + bc.configure(result, normalize, endpoints, weight, sources, number_of_sources); + bc.compute(); +} +} // namespace detail namespace gunrock { +// NOTE: sample_seeds is not really available anymore, as it has been +// replaced by k and vertices parameters, delegating the random +// generation to somewhere else (i.e python's side) template void betweenness_centrality(experimental::GraphCSRView const &graph, result_t *result, @@ -51,7 +322,7 @@ void betweenness_centrality(experimental::GraphCSRView const &graph, // std::vector v_offsets(graph.number_of_vertices + 1); std::vector v_indices(graph.number_of_edges); - std::vector v_result(graph.number_of_vertices); + std::vector v_result(graph.number_of_vertices); std::vector v_sigmas(graph.number_of_vertices); std::vector v_labels(graph.number_of_vertices); @@ -84,29 +355,41 @@ void betweenness_centrality(experimental::GraphCSRView const &graph, CUDA_TRY(cudaMemcpy( result, v_result.data(), sizeof(result_t) * graph.number_of_vertices, cudaMemcpyHostToDevice)); - // normalize result + // Rescale result (Based on normalize and directed/undirected) if (normalize) { - float denominator = (graph.number_of_vertices - 1) * (graph.number_of_vertices - 2); + if (graph.number_of_vertices > 2) { + float denominator = (graph.number_of_vertices - 1) * (graph.number_of_vertices - 2); - thrust::transform(rmm::exec_policy(stream)->on(stream), - result, - result + graph.number_of_vertices, - result, - [denominator] __device__(float f) { return (f * 2) / denominator; }); + thrust::transform(rmm::exec_policy(stream)->on(stream), + result, + result + graph.number_of_vertices, + result, + [denominator] __device__(float f) { return (f * 2) / denominator; }); + } } else { // // gunrock answer needs to be doubled to match networkx // - thrust::transform(rmm::exec_policy(stream)->on(stream), - result, - result + graph.number_of_vertices, - result, - [] __device__(float f) { return (f * 2); }); + if (graph.prop.directed) { + thrust::transform(rmm::exec_policy(stream)->on(stream), + result, + result + graph.number_of_vertices, + result, + [] __device__(float f) { return (f * 2); }); + } } } } // namespace gunrock +/** + * @param[out] result array(number_of_vertices) + * @param[in] normalize bool True -> Apply normalization + * @param[in] endpoints (NIY) bool Include endpoints + * @param[in] weights (NIY) array(number_of_edges) Weights to use + * @param[in] k Number of sources + * @param[in] vertices array(k) Sources for traversal + */ template void betweenness_centrality(experimental::GraphCSRView const &graph, result_t *result, @@ -114,8 +397,16 @@ void betweenness_centrality(experimental::GraphCSRView const &graph, bool endpoints, WT const *weight, VT 
k, - VT const *vertices) + VT const *vertices, + cugraph_bc_implem_t implem) { + // FIXME: Gunrock call returns float and not result_t hence the implementation + // switch + if ((typeid(result_t) == typeid(double)) && (implem == cugraph_bc_implem_t::CUGRAPH_GUNROCK)) { + implem = cugraph_bc_implem_t::CUGRAPH_DEFAULT; + std::cerr << "[WARN] result_t type is 'double', switching to default " + << "implementation" << std::endl; + } // // NOTE: gunrock implementation doesn't yet support the unused parameters: // - endpoints @@ -125,7 +416,15 @@ void betweenness_centrality(experimental::GraphCSRView const &graph, // // These parameters are present in the API to support future features. // - gunrock::betweenness_centrality(graph, result, normalize); + if (implem == cugraph_bc_implem_t::CUGRAPH_DEFAULT) { + detail::betweenness_centrality(graph, result, normalize, endpoints, weight, k, vertices); + } else if (implem == cugraph_bc_implem_t::CUGRAPH_GUNROCK) { + gunrock::betweenness_centrality(graph, result, normalize); + } else { + CUGRAPH_FAIL( + "Invalid Betweenness Centrality implementation, please refer to cugraph_bc_implem_t for " + "valid implementations"); + } } template void betweenness_centrality( @@ -135,6 +434,16 @@ template void betweenness_centrality( bool, float const *, int, - int const *); + int const *, + cugraph_bc_implem_t); +template void betweenness_centrality( + experimental::GraphCSRView const &, + double *, + bool, + bool, + double const *, + int, + int const *, + cugraph_bc_implem_t); } // namespace cugraph diff --git a/cpp/src/centrality/betweenness_centrality.cuh b/cpp/src/centrality/betweenness_centrality.cuh new file mode 100644 index 00000000000..a5030d85437 --- /dev/null +++ b/cpp/src/centrality/betweenness_centrality.cuh @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+// Author: Xavier Cadet xcadet@nvidia.com
+#pragma once
+
+namespace cugraph {
+namespace detail {
+template <typename VT, typename ET, typename WT, typename result_t>
+class BC {
+ private:
+  // --- Information concerning the graph ---
+  const experimental::GraphCSRView<VT, ET, WT> &graph;
+  // --- This information is extracted during setup ---
+  VT number_of_vertices;  // Number of vertices in the graph
+  VT number_of_edges;     // Number of edges in the graph
+  ET const *offsets_ptr;  // Pointer to the offsets
+  VT const *indices_ptr;  // Pointer to the indices
+
+  // --- Information from configuration ---
+  bool configured = false;  // Flag to ensure configuration was called
+  bool normalized = false;  // If True normalize the betweenness
+  // FIXME: For weighted version
+  WT const *edge_weights_ptr = nullptr;  // Pointer to the weights
+  bool endpoints = false;                // If True include endpoints of paths in the score
+  VT const *sources = nullptr;           // Subset of vertices to gather information from
+  VT number_of_sources;                  // Number of vertices in sources
+
+  // --- Output ----
+  // betweenness is set/read by users - using Vectors
+  result_t *betweenness = nullptr;
+
+  // --- Data required to perform computation ----
+  rmm::device_vector<VT> distances_vec;
+  rmm::device_vector<VT> predecessors_vec;
+  rmm::device_vector<double> sp_counters_vec;
+  rmm::device_vector<double> deltas_vec;
+
+  VT *distances    = nullptr;  // array(|V|) stores the distances gathered by the latest SSSP
+  VT *predecessors = nullptr;  // array(|V|) stores the predecessors of the latest SSSP
+  double *sp_counters =
+    nullptr;                 // array(|V|) stores the shortest path counter for the latest SSSP
+  double *deltas = nullptr;  // array(|V|) stores the dependencies for the latest SSSP
+
+  // FIXME: This should be replaced using RAFT handle
+  int device_id        = 0;
+  int max_grid_dim_1D  = 0;
+  int max_block_dim_1D = 0;
+  cudaStream_t stream;
+
+  // -----------------------------------------------------------------------
+  void setup();  // Saves information related to the graph itself
+
+  void accumulate(result_t *betweenness,
+                  VT *distances,
+                  double *sp_counters,
+                  double *deltas,
+                  VT source,
+                  VT max_depth);
+  void compute_single_source(VT source_vertex);
+  void rescale();
+
+ public:
+  virtual ~BC(void) {}
+  BC(experimental::GraphCSRView<VT, ET, WT> const &_graph, cudaStream_t _stream = 0)
+    : graph(_graph), stream(_stream)
+  {
+    setup();
+  }
+  void configure(result_t *betweenness,
+                 bool normalize,
+                 bool endpoints,
+                 WT const *weight,
+                 VT const *sources,
+                 VT const number_of_sources);
+  void compute();
+};
+}  // namespace detail
+}  // namespace cugraph
\ No newline at end of file
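The class is driven in three phases; a condensed sketch of the call sequence used by detail::betweenness_centrality in betweenness_centrality.cu above (the explicit <int, int, float, float> arguments match one of the instantiations this PR provides):

    cugraph::detail::BC<int, int, float, float> bc(graph);  // constructor runs setup()
    bc.configure(result, normalize, endpoints,
                 weight, sources, number_of_sources);       // binds output and workspaces
    bc.compute();  // per-source BFS + accumulate(), then rescale()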
diff --git a/cpp/src/traversal/bfs.cu b/cpp/src/traversal/bfs.cu
index 8d2792bd720..f28f3ec936a 100644
--- a/cpp/src/traversal/bfs.cu
+++ b/cpp/src/traversal/bfs.cu
@@ -42,6 +42,7 @@ void BFS<IndexType>::setup()
   // size of bitmaps for vertices
   vertices_bmap_size = (n / (8 * sizeof(int)) + 1);
   // ith bit of visited_bmap is set <=> ith vertex is visited
+  ALLOC_TRY(&visited_bmap, sizeof(int) * vertices_bmap_size, nullptr);
 
   // ith bit of isolated_bmap is set <=> degree of ith vertex = 0
@@ -71,9 +72,11 @@ void BFS<IndexType>::setup()
   unvisited_queue = buffer_np1_1;
   // size of the "last" unvisited queue : size_last_unvisited_queue
   // refers to the size of unvisited_queue
-  // which may not be up to date (the queue may contains vertices that are now visited)
+  // which may not be up to date (the queue may contain vertices that are now
+  // visited)
 
-  // We may leave vertices unvisited after bottom up main kernels - storing them here
+  // We may leave vertices unvisited after bottom up main kernels - storing them
+  // here
   left_unvisited_queue = buffer_np1_2;
 
   // We use buckets of edges (32 edges per bucket for now, see exact macro in bfs_kernels).
@@ -95,7 +98,8 @@ void BFS<IndexType>::setup()
   d_left_unvisited_cnt = &d_counters_pad[3];
 
   // Lets use this int* for the next 3 lines
-  // Its dereferenced value is not initialized - so we dont care about what we put in it
+  // Its dereferenced value is not initialized - so we dont care about what we
+  // put in it
   IndexType *d_nisolated = d_new_frontier_cnt;
   cudaMemsetAsync(d_nisolated, 0, sizeof(IndexType), stream);
@@ -110,11 +114,15 @@ void BFS<IndexType>::setup()
 }
 
 template <typename IndexType>
-void BFS<IndexType>::configure(IndexType *_distances, IndexType *_predecessors, int *_edge_mask)
+void BFS<IndexType>::configure(IndexType *_distances,
+                               IndexType *_predecessors,
+                               double *_sp_counters,
+                               int *_edge_mask)
 {
   distances    = _distances;
   predecessors = _predecessors;
   edge_mask    = _edge_mask;
+  sp_counters  = _sp_counters;
 
   useEdgeMask      = (edge_mask != NULL);
   computeDistances = (distances != NULL);
@@ -122,6 +130,9 @@ void BFS<IndexType>::configure(IndexType *_distances, IndexType *_predecessors,
   // We need distances to use bottom up
   if (directed && !computeDistances) ALLOC_TRY(&distances, n * sizeof(IndexType), nullptr);
+
+  // In case the shortest path counters are required, previous_bmap has to be allocated
+  if (sp_counters) { ALLOC_TRY(&previous_visited_bmap, sizeof(int) * vertices_bmap_size, nullptr); }
 }
 
 template <typename IndexType>
@@ -153,6 +164,12 @@ void BFS<IndexType>::traverse(IndexType source_vertex)
   // If needed, setting all predecessors to non-existent (-1)
   if (computePredecessors) { cudaMemsetAsync(predecessors, -1, n * sizeof(IndexType), stream); }
 
+  if (sp_counters) {
+    cudaMemsetAsync(sp_counters, 0, n * sizeof(double), stream);
+    double value = 1;
+    cudaMemcpyAsync(sp_counters + source_vertex, &value, sizeof(double), cudaMemcpyHostToDevice);
+  }
+
   //
   // Initial frontier
   //
@@ -162,7 +179,8 @@ void BFS<IndexType>::traverse(IndexType source_vertex)
   if (distances) { cudaMemsetAsync(&distances[source_vertex], 0, sizeof(IndexType), stream); }
 
   // Setting source_vertex as visited
-  // There may be bit already set on that bmap (isolated vertices) - if the graph is undirected
+  // There may be bits already set on that bmap (isolated vertices) - if the
+  // graph is undirected
   int current_visited_bmap_source_vert = 0;
 
   if (!directed) {
@@ -238,7 +256,9 @@ void BFS<IndexType>::traverse(IndexType source_vertex)
   // useDistances : we check if a vertex is a parent using distances in bottom up - distances become
   // working data undirected g : need parents to be in children's neighbors
-  bool can_use_bottom_up = !directed && distances;
+
+  // In case the shortest path counters need to be computed, the bottom_up approach cannot be used
+  bool can_use_bottom_up = (!sp_counters && !directed && distances);
 
   while (nf > 0 && nu > 0) {
     // Each vertices can appear only once in the frontierer array - we know it will fit
@@ -296,6 +316,16 @@ void BFS<IndexType>::traverse(IndexType source_vertex)
 
     switch (algo_state) {
       case TOPDOWN:
+        // This step is only required if sp_counters is not nullptr
+        if (sp_counters) {
+          cudaMemcpyAsync(previous_visited_bmap,
+                          visited_bmap,
+                          vertices_bmap_size * sizeof(int),
+                          cudaMemcpyDeviceToDevice,
+                          stream);
+          // We need to copy the visited_bmap before doing the traversal
+          cudaStreamSynchronize(stream);
+        }
         traversal::compute_bucket_offsets(exclusive_sum_frontier_vertex_degree,
                                           exclusive_sum_frontier_vertex_buckets_offsets,
                                           nf,
@@ -311,9 +341,11 @@ void BFS<IndexType>::traverse(IndexType source_vertex)
                                  d_new_frontier_cnt,
                                  exclusive_sum_frontier_vertex_degree,
exclusive_sum_frontier_vertex_buckets_offsets, + previous_visited_bmap, visited_bmap, distances, predecessors, + sp_counters, edge_mask, isolated_bmap, directed, @@ -442,15 +474,21 @@ void BFS::clean() // In that case, distances is a working data if (directed && !computeDistances) ALLOC_FREE_TRY(distances, nullptr); + + // In that case, previous_visited_bmap has been allocated + if (sp_counters) { ALLOC_FREE_TRY(previous_visited_bmap, nullptr); } } template class BFS; } // namespace detail +// NOTE: SP counter increase extremely fast on large graph +// It can easily reach 1e40~1e70 on GAP-road.mtx template void bfs(experimental::GraphCSRView const &graph, VT *distances, VT *predecessors, + double *sp_counters, const VT start_vertex, bool directed) { @@ -470,14 +508,21 @@ void bfs(experimental::GraphCSRView const &graph, // FIXME: Use VT and ET in the BFS detail cugraph::detail::BFS bfs( number_of_vertices, number_of_edges, offsets_ptr, indices_ptr, directed, alpha, beta); - bfs.configure(distances, predecessors, nullptr); + bfs.configure(distances, predecessors, sp_counters, nullptr); bfs.traverse(start_vertex); } template void bfs(experimental::GraphCSRView const &graph, int *distances, int *predecessors, + double *sp_counters, const int source_vertex, bool directed); +template void bfs(experimental::GraphCSRView const &graph, + int *distances, + int *predecessors, + double *sp_counters, + const int source_vertex, + bool directed); } // namespace cugraph diff --git a/cpp/src/traversal/bfs.cuh b/cpp/src/traversal/bfs.cuh index 80f84407271..0acc8988d3a 100644 --- a/cpp/src/traversal/bfs.cuh +++ b/cpp/src/traversal/bfs.cuh @@ -36,6 +36,7 @@ class BFS { bool computePredecessors; IndexType *distances; IndexType *predecessors; + double *sp_counters = nullptr; int *edge_mask; // Working data @@ -44,7 +45,7 @@ class BFS { IndexType *frontier, *new_frontier; IndexType *original_frontier; IndexType vertices_bmap_size; - int *visited_bmap, *isolated_bmap; + int *visited_bmap, *isolated_bmap, *previous_visited_bmap; IndexType *vertex_degree; IndexType *buffer_np1_1, *buffer_np1_2; IndexType *frontier_vertex_degree; @@ -92,7 +93,10 @@ class BFS { setup(); } - void configure(IndexType *distances, IndexType *predecessors, int *edge_mask); + void configure(IndexType *distances, + IndexType *predecessors, + double *sp_counters, + int *edge_mask); void traverse(IndexType source_vertex); }; diff --git a/cpp/src/traversal/bfs_kernels.cuh b/cpp/src/traversal/bfs_kernels.cuh index c9e6abc183d..4cb97ae5109 100644 --- a/cpp/src/traversal/bfs_kernels.cuh +++ b/cpp/src/traversal/bfs_kernels.cuh @@ -30,10 +30,9 @@ namespace bfs_kernels { // fill_unvisited_queue_kernel // // Finding unvisited vertices in the visited_bmap, and putting them in the queue -// Vertices represented by the same int in the bitmap are adjacent in the queue, and sorted -// For instance, the queue can look like this : -// 34 38 45 58 61 4 18 24 29 71 84 85 90 -// Because they are represented by those ints in the bitmap : +// Vertices represented by the same int in the bitmap are adjacent in the queue, +// and sorted For instance, the queue can look like this : 34 38 45 58 61 4 18 +// 24 29 71 84 85 90 Because they are represented by those ints in the bitmap : // [34 38 45 58 61] [4 18 24 29] [71 84 85 90] // visited_bmap_nints = the visited_bmap is made of that number of ints @@ -48,27 +47,29 @@ __global__ void fill_unvisited_queue_kernel(int *visited_bmap, typedef cub::BlockScan BlockScan; __shared__ typename BlockScan::TempStorage 
scan_temp_storage; - // When filling the "unvisited" queue, we use "unvisited_cnt" to know where to write in the queue - // (equivalent of int off = atomicAddd(unvisited_cnt, 1) ) We will actually do only one atomicAdd - // per block - we first do a scan, then call one atomicAdd, and store the common offset for the - // block in unvisited_common_block_offset + // When filling the "unvisited" queue, we use "unvisited_cnt" to know where to + // write in the queue (equivalent of int off = atomicAddd(unvisited_cnt, 1) ) + // We will actually do only one atomicAdd per block - we first do a scan, then + // call one atomicAdd, and store the common offset for the block in + // unvisited_common_block_offset __shared__ IndexType unvisited_common_block_offset; - // We don't want threads divergence in the loop (we're going to call __syncthreads) - // Using a block-only dependent in the condition of the loop + // We don't want threads divergence in the loop (we're going to call + // __syncthreads) Using a block-only dependent in the condition of the loop for (IndexType block_v_idx = blockIdx.x * blockDim.x; block_v_idx < visited_bmap_nints; block_v_idx += blockDim.x * gridDim.x) { // Index of visited_bmap that this thread will compute IndexType v_idx = block_v_idx + threadIdx.x; - int thread_visited_int = - (v_idx < visited_bmap_nints) - ? visited_bmap[v_idx] - : (~0); // will be neutral in the next lines (virtual vertices all visited) + int thread_visited_int = (v_idx < visited_bmap_nints) + ? visited_bmap[v_idx] + : (~0); // will be neutral in the next lines + // (virtual vertices all visited) // The last int can only be partially valid // If we are indeed taking care of the last visited int in this thread, - // We need to first disable (ie set as "visited") the inactive bits (vertices >= n) + // We need to first disable (ie set as "visited") the inactive bits + // (vertices >= n) if (v_idx == (visited_bmap_nints - 1)) { int active_bits = n - (INT_SIZE * v_idx); int inactive_bits = INT_SIZE - active_bits; @@ -80,13 +81,15 @@ __global__ void fill_unvisited_queue_kernel(int *visited_bmap, int n_unvisited_in_int = __popc(~thread_visited_int); int unvisited_thread_offset; - // We will need to write n_unvisited_in_int unvisited vertices to the unvisited queue - // We ask for that space when computing the block scan, that will tell where to write those - // vertices in the queue, using the common offset of the block (see below) + // We will need to write n_unvisited_in_int unvisited vertices to the + // unvisited queue We ask for that space when computing the block scan, that + // will tell where to write those vertices in the queue, using the common + // offset of the block (see below) BlockScan(scan_temp_storage).ExclusiveSum(n_unvisited_in_int, unvisited_thread_offset); - // Last thread knows how many vertices will be written to the queue by this block - // Asking for that space in the queue using the global count, and saving the common offset + // Last thread knows how many vertices will be written to the queue by this + // block Asking for that space in the queue using the global count, and + // saving the common offset if (threadIdx.x == (FILL_UNVISITED_QUEUE_DIMX - 1)) { IndexType total = unvisited_thread_offset + n_unvisited_in_int; unvisited_common_block_offset = atomicAdd(unvisited_cnt, total); @@ -167,11 +170,11 @@ void fill_unvisited_queue(int *visited_bmap, // // count_unvisited_edges_kernel -// Couting the total number of unvisited edges in the graph - using an potentially unvisited queue -// 
We need the current unvisited vertices to be in the unvisited queue -// But visited vertices can be in the potentially_unvisited queue -// We first check if the vertex is still unvisited before using it -// Useful when switching from "Bottom up" to "Top down" +// Couting the total number of unvisited edges in the graph - using an +// potentially unvisited queue We need the current unvisited vertices to be in +// the unvisited queue But visited vertices can be in the potentially_unvisited +// queue We first check if the vertex is still unvisited before using it Useful +// when switching from "Bottom up" to "Top down" // template @@ -233,9 +236,9 @@ void count_unvisited_edges(const IndexType *potentially_unvisited, // // -// We will use the "vertices represented by the same int in the visited bmap are adjacents and -// sorted in the unvisited queue" property It is used to do a reduction locally and fully build the -// new visited_bmap +// We will use the "vertices represented by the same int in the visited bmap are +// adjacents and sorted in the unvisited queue" property It is used to do a +// reduction locally and fully build the new visited_bmap // template @@ -266,15 +269,16 @@ __global__ void main_bottomup_kernel(const IndexType *unvisited, // frontier_common_block_offset contains the common offset for the block __shared__ IndexType frontier_common_block_offset; - // When building the new visited_bmap, we reduce (using a bitwise and) the visited_bmap ints - // from the vertices represented by the same int (for instance vertices 1, 5, 9, 13, 23) - // vertices represented by the same int will be designed as part of the same "group" - // To detect the deliminations between those groups, we use BlockDiscontinuity - // Then we need to create the new "visited_bmap" within those group. - // We use a warp reduction that takes into account limits between groups to do it - // But a group can be cut in two different warps : in that case, the second warp - // put the result of its local reduction in local_visited_bmap_warp_head - // the first warp will then read it and finish the reduction + // When building the new visited_bmap, we reduce (using a bitwise and) the + // visited_bmap ints from the vertices represented by the same int (for + // instance vertices 1, 5, 9, 13, 23) vertices represented by the same int + // will be designed as part of the same "group" To detect the deliminations + // between those groups, we use BlockDiscontinuity Then we need to create the + // new "visited_bmap" within those group. 
We use a warp reduction that takes + // into account limits between groups to do it But a group can be cut in two + // different warps : in that case, the second warp put the result of its local + // reduction in local_visited_bmap_warp_head the first warp will then read it + // and finish the reduction __shared__ int local_visited_bmap_warp_head[MAIN_BOTTOMUP_NWARPS]; @@ -291,9 +295,10 @@ __global__ void main_bottomup_kernel(const IndexType *unvisited, // in the visited_bmap, it is represented by the int at index // visited_bmap_index = unvisited_vertex/INT_SIZE // it will be used by BlockDiscontinuity - // to flag the separation between groups of vertices (vertices represented by different in in - // visited_bmap) - IndexType visited_bmap_index[1]; // this is an array of size 1 because CUB needs one + // to flag the separation between groups of vertices (vertices represented + // by different in in visited_bmap) + IndexType visited_bmap_index[1]; // this is an array of size 1 because CUB + // needs one visited_bmap_index[0] = -1; IndexType unvisited_vertex = -1; @@ -355,14 +360,14 @@ __global__ void main_bottomup_kernel(const IndexType *unvisited, // // We will separate vertices in group - // Two vertices are in the same group if represented by same int in visited_bmap - // ie u and v in same group <=> u/32 == v/32 + // Two vertices are in the same group if represented by same int in + // visited_bmap ie u and v in same group <=> u/32 == v/32 // // We will now flag the head of those group (first element of each group) // - // 1) All vertices within the same group are adjacent in the queue (cf fill_unvisited_queue) - // 2) A group is of size <= 32, so a warp will contain at least one head, and a group will be - // contained at most by two warps + // 1) All vertices within the same group are adjacent in the queue (cf + // fill_unvisited_queue) 2) A group is of size <= 32, so a warp will contain + // at least one head, and a group will be contained at most by two warps int is_head_a[1]; // CUB need an array BlockDiscontinuity(discontinuity_temp_storage) @@ -370,17 +375,19 @@ __global__ void main_bottomup_kernel(const IndexType *unvisited, int is_head = is_head_a[0]; // Computing the warp reduce within group - // This primitive uses the is_head flags to know where the limits of the groups are - // We use bitwise and as operator, because of the fact that 1 is the default value - // If a vertex is unvisited, we have to explicitly ask for it + // This primitive uses the is_head flags to know where the limits of the + // groups are We use bitwise and as operator, because of the fact that 1 is + // the default value If a vertex is unvisited, we have to explicitly ask for + // it int local_bmap_agg = WarpReduce(reduce_temp_storage) .HeadSegmentedReduce(local_visited_bmap, is_head, traversal::BitwiseAnd()); // We need to take care of the groups cut in two in two different warps - // Saving second part of the reduce here, then applying it on the first part bellow - // Corner case : if the first thread of the warp is a head, then this group is not cut in two - // and then we have to be neutral (for an bitwise and, it's an ~0) + // Saving second part of the reduce here, then applying it on the first part + // bellow Corner case : if the first thread of the warp is a head, then this + // group is not cut in two and then we have to be neutral (for an bitwise + // and, it's an ~0) if (laneid == 0) { local_visited_bmap_warp_head[warpid] = (is_head) ? 
(~0) : local_bmap_agg; } // broadcasting local_visited_bmap_warp_head @@ -388,35 +395,39 @@ __global__ void main_bottomup_kernel(const IndexType *unvisited, int head_ballot = cugraph::detail::utils::ballot(is_head); - // As long as idx < unvisited_size, we know there's at least one head per warp + // As long as idx < unvisited_size, we know there's at least one head per + // warp int laneid_last_head_in_warp = INT_SIZE - 1 - __clz(head_ballot); int is_last_head_in_warp = (laneid == laneid_last_head_in_warp); // if laneid == 0 && is_last_head_in_warp, it's a special case where // a group of size 32 starts exactly at lane 0 - // in that case, nothing to do (this group is not cut by a warp delimitation) - // we also have to make sure that a warp actually exists after this one (this corner case is - // handled after) + // in that case, nothing to do (this group is not cut by a warp + // delimitation) we also have to make sure that a warp actually exists after + // this one (this corner case is handled after) if (laneid != 0 && (is_last_head_in_warp & (warpid + 1) < MAIN_BOTTOMUP_NWARPS)) { local_bmap_agg &= local_visited_bmap_warp_head[warpid + 1]; } // Three cases : - // -> This is the first group of the block - it may be cut in two (with previous block) + // -> This is the first group of the block - it may be cut in two (with + // previous block) // -> This is the last group of the block - same thing // -> This group is completely contained in this block if (warpid == 0 && laneid == 0) { - // The first elt of this group considered in this block is unvisited_vertex - // We know that's the case because elts are sorted in a group, and we are at laneid == 0 - // We will do an atomicOr - we have to be neutral about elts < unvisited_vertex + // The first elt of this group considered in this block is + // unvisited_vertex We know that's the case because elts are sorted in a + // group, and we are at laneid == 0 We will do an atomicOr - we have to be + // neutral about elts < unvisited_vertex int iv = unvisited_vertex % INT_SIZE; // we know that this unvisited_vertex is valid int mask = traversal::getMaskNLeftmostBitSet(INT_SIZE - iv); local_bmap_agg &= mask; // we have to be neutral for elts < unvisited_vertex atomicOr(&visited_bmap[unvisited_vertex / INT_SIZE], local_bmap_agg); } else if (warpid == (MAIN_BOTTOMUP_NWARPS - 1) && - laneid >= laneid_last_head_in_warp && // We need the other ones to go in else case + laneid >= laneid_last_head_in_warp && // We need the other ones + // to go in else case idx < unvisited_size // we could be out ) { // Last head of the block @@ -440,8 +451,8 @@ __global__ void main_bottomup_kernel(const IndexType *unvisited, } else { // group completely in block if (is_head && idx < unvisited_size) { - visited_bmap[unvisited_vertex / INT_SIZE] = - local_bmap_agg; // no atomics needed, we know everything about this int + visited_bmap[unvisited_vertex / INT_SIZE] = local_bmap_agg; // no atomics needed, we know + // everything about this int } } @@ -533,9 +544,9 @@ __global__ void bottom_up_large_degree_kernel(IndexType *left_unvisited, // Used only with symmetric graphs // Parents are included in v's neighbors - IndexType first_i_edge = - row_ptr[v] + MAIN_BOTTOMUP_MAX_EDGES; // we already have checked the first - // MAIN_BOTTOMUP_MAX_EDGES edges in find_unvisited + IndexType first_i_edge = row_ptr[v] + MAIN_BOTTOMUP_MAX_EDGES; // we already have checked the + // first MAIN_BOTTOMUP_MAX_EDGES + // edges in find_unvisited IndexType end_i_edge = row_ptr[v + 1]; @@ -619,29 
+630,32 @@ void bottom_up_large(IndexType *left_unvisited, // Read current frontier and compute new one with top down paradigm // One thread = One edge // To know origin of edge, we have to find where is index_edge in the values of -// frontier_degrees_exclusive_sum (using a binary search, max less or equal than) This index k will -// give us the origin of this edge, which is frontier[k] This thread will then process the -// (linear_idx_thread - frontier_degrees_exclusive_sum[k])-ith edge of vertex frontier[k] +// frontier_degrees_exclusive_sum (using a binary search, max less or equal +// than) This index k will give us the origin of this edge, which is frontier[k] +// This thread will then process the (linear_idx_thread - +// frontier_degrees_exclusive_sum[k])-ith edge of vertex frontier[k] // -// To process blockDim.x = TOP_DOWN_EXPAND_DIMX edges, we need to first load NBUCKETS_PER_BLOCK -// bucket offsets - those will help us do the binary searches We can load up to TOP_DOWN_EXPAND_DIMX -// of those bucket offsets - that way we prepare for the next MAX_ITEMS_PER_THREAD_PER_OFFSETS_LOAD +// To process blockDim.x = TOP_DOWN_EXPAND_DIMX edges, we need to first load +// NBUCKETS_PER_BLOCK bucket offsets - those will help us do the binary searches +// We can load up to TOP_DOWN_EXPAND_DIMX of those bucket offsets - that way we +// prepare for the next MAX_ITEMS_PER_THREAD_PER_OFFSETS_LOAD // * blockDim.x edges // -// Once we have those offsets, we may still need a few values from frontier_degrees_exclusive_sum to -// compute exact index k To be able to do it, we will load the values that we need from -// frontier_degrees_exclusive_sum in shared memory We know that it will fit because we never add -// node with degree == 0 in the frontier, so we have an upper bound on the number of value to load -// (see below) +// Once we have those offsets, we may still need a few values from +// frontier_degrees_exclusive_sum to compute exact index k To be able to do it, +// we will load the values that we need from frontier_degrees_exclusive_sum in +// shared memory We know that it will fit because we never add node with degree +// == 0 in the frontier, so we have an upper bound on the number of value to +// load (see below) // // We will then look which vertices are not visited yet : -// 1) if the unvisited vertex is isolated (=> degree == 0), we mark it as visited, update distances -// and predecessors, and move on 2) if the unvisited vertex has degree > 0, we add it to the -// "frontier_candidates" queue +// 1) if the unvisited vertex is isolated (=> degree == 0), we mark it as +// visited, update distances and predecessors, and move on 2) if the unvisited +// vertex has degree > 0, we add it to the "frontier_candidates" queue // // We then treat the candidates queue using the threadIdx.x < ncandidates -// If we are indeed the first thread to discover that vertex (result of atomicOr(visited)) -// We add it to the new frontier +// If we are indeed the first thread to discover that vertex (result of +// atomicOr(visited)) We add it to the new frontier // template @@ -657,9 +671,11 @@ __global__ void topdown_expand_kernel( IndexType *new_frontier_cnt, const IndexType *frontier_degrees_exclusive_sum, const IndexType *frontier_degrees_exclusive_sum_buckets_offsets, + int *previous_bmap, int *bmap, IndexType *distances, IndexType *predecessors, + double *sp_counters, const int *edge_mask, const int *isolated_bmap, bool directed) @@ -677,8 +693,9 @@ __global__ void topdown_expand_kernel( // // Frontier 
candidates local queue - // We process TOP_DOWN_BATCH_SIZE vertices in parallel, so we need to be able to store everything - // We also save the predecessors here, because we will not be able to retrieve it after + // We process TOP_DOWN_BATCH_SIZE vertices in parallel, so we need to be able + // to store everything We also save the predecessors here, because we will not + // be able to retrieve it after // __shared__ IndexType shared_local_new_frontier_candidates[TOP_DOWN_BATCH_SIZE * TOP_DOWN_EXPAND_DIMX]; @@ -713,23 +730,28 @@ __global__ void topdown_expand_kernel( // // shared_buckets_offsets gives us a range of the possible indexes // for edge of linear_threadx, we are looking for the value k such as - // k is the max value such as frontier_degrees_exclusive_sum[k] <= linear_threadx + // k is the max value such as frontier_degrees_exclusive_sum[k] <= + // linear_threadx // // we have 0 <= k < frontier_size // but we also have : // // frontier_degrees_exclusive_sum_buckets_offsets[linear_threadx/TOP_DOWN_BUCKET_SIZE] // <= k - // <= frontier_degrees_exclusive_sum_buckets_offsets[linear_threadx/TOP_DOWN_BUCKET_SIZE + 1] + // <= + // frontier_degrees_exclusive_sum_buckets_offsets[linear_threadx/TOP_DOWN_BUCKET_SIZE + // + 1] // // To find the exact value in that range, we need a few values from - // frontier_degrees_exclusive_sum (see below) We will load them here We will load as much as we - // can - if it doesn't fit we will make multiple iteration of the next loop Because all vertices - // in frontier have degree > 0, we know it will fits if left + 1 = right (see below) + // frontier_degrees_exclusive_sum (see below) We will load them here We will + // load as much as we can - if it doesn't fit we will make multiple + // iteration of the next loop Because all vertices in frontier have degree > + // 0, we know it will fits if left + 1 = right (see below) - // We're going to load values in frontier_degrees_exclusive_sum for batch [left; right[ - // If it doesn't fit, --right until it does, then loop - // It is excepted to fit on the first try, that's why we start right = nitems_per_thread + // We're going to load values in frontier_degrees_exclusive_sum for batch + // [left; right[ If it doesn't fit, --right until it does, then loop It is + // excepted to fit on the first try, that's why we start right = + // nitems_per_thread IndexType left = 0; IndexType right = nitems_per_thread; @@ -737,14 +759,16 @@ __global__ void topdown_expand_kernel( while (left < nitems_per_thread) { // // Values that are necessary to compute the local binary searches - // We only need those with indexes between extremes indexes of buckets_offsets - // We need the next val for the binary search, hence the +1 + // We only need those with indexes between extremes indexes of + // buckets_offsets We need the next val for the binary search, hence the + // +1 // IndexType nvalues_to_load = shared_buckets_offsets[right * NBUCKETS_PER_BLOCK] - shared_buckets_offsets[left * NBUCKETS_PER_BLOCK] + 1; - // If left = right + 1 we are sure to have nvalues_to_load < TOP_DOWN_EXPAND_DIMX+1 + // If left = right + 1 we are sure to have nvalues_to_load < + // TOP_DOWN_EXPAND_DIMX+1 while (nvalues_to_load > (TOP_DOWN_EXPAND_DIMX + 1)) { --right; @@ -768,22 +792,23 @@ __global__ void topdown_expand_kernel( TOP_DOWN_EXPAND_DIMX]; } - // shared_frontier_degrees_exclusive_sum is in shared mem, we will use it, sync + // shared_frontier_degrees_exclusive_sum is in shared mem, we will use it, + // sync __syncthreads(); // Now we will 
process the edges // Here each thread will process nitems_per_thread_for_this_load for (IndexType item_index = 0; item_index < nitems_per_thread_for_this_load; item_index += TOP_DOWN_BATCH_SIZE) { - // We process TOP_DOWN_BATCH_SIZE edge in parallel (instruction parallism) - // Reduces latency + // We process TOP_DOWN_BATCH_SIZE edge in parallel (instruction + // parallism) Reduces latency IndexType current_max_edge_index = min(block_offset + (left + nitems_per_thread_for_this_load) * blockDim.x, totaldegree); - // We will need vec_u (source of the edge) until the end if we need to save the predecessors - // For others informations, we will reuse pointers on the go (nvcc does not color well the - // registers in that case) + // We will need vec_u (source of the edge) until the end if we need to + // save the predecessors For others informations, we will reuse pointers + // on the go (nvcc does not color well the registers in that case) IndexType vec_u[TOP_DOWN_BATCH_SIZE]; IndexType local_buf1[TOP_DOWN_BATCH_SIZE]; @@ -841,15 +866,20 @@ __global__ void topdown_expand_kernel( // We don't need vec_frontier_degrees_exclusive_sum_index anymore IndexType *vec_v_visited_bmap = vec_frontier_degrees_exclusive_sum_index; + + // Visited bmap need to contain information about the previous + // frontier if we actually process every edge (shortest path counting) + // otherwise we can read and update from the same bmap #pragma unroll for (int iv = 0; iv < TOP_DOWN_BATCH_SIZE; ++iv) { - IndexType v = vec_dest_v[iv]; - vec_v_visited_bmap[iv] = (v != -1) ? bmap[v / INT_SIZE] : (~0); // will look visited + IndexType v = vec_dest_v[iv]; + vec_v_visited_bmap[iv] = + (v != -1) ? previous_bmap[v / INT_SIZE] : (~0); // will look visited } // From now on we will consider v as a frontier candidate - // If for some reason vec_candidate[iv] should be put in the new_frontier - // Then set vec_candidate[iv] = -1 + // If for some reason vec_candidate[iv] should be put in the + // new_frontier Then set vec_candidate[iv] = -1 IndexType *vec_frontier_candidate = vec_dest_v; #pragma unroll @@ -862,9 +892,21 @@ __global__ void topdown_expand_kernel( if (is_visited) vec_frontier_candidate[iv] = -1; } + // Each source should update the destination shortest path counter + // if the destination has not been visited in the *previous* frontier + if (sp_counters) { +#pragma unroll + for (int iv = 0; iv < TOP_DOWN_BATCH_SIZE; ++iv) { + IndexType dst = vec_frontier_candidate[iv]; + if (dst != -1) { + IndexType src = vec_u[iv]; + atomicAdd(&sp_counters[dst], sp_counters[src]); + } + } + } + if (directed) { // vec_v_visited_bmap is available - IndexType *vec_is_isolated_bmap = vec_v_visited_bmap; #pragma unroll @@ -879,12 +921,12 @@ __global__ void topdown_expand_kernel( int m = 1 << (v % INT_SIZE); int is_isolated = vec_is_isolated_bmap[iv] & m; - // If v is isolated, we will not add it to the frontier (it's not a frontier candidate) - // 1st reason : it's useless - // 2nd reason : it will make top down algo fail - // we need each node in frontier to have a degree > 0 - // If it is isolated, we just need to mark it as visited, and save distance and - // predecessor here. Not need to check return value of atomicOr + // If v is isolated, we will not add it to the frontier (it's not a + // frontier candidate) 1st reason : it's useless 2nd reason : it + // will make top down algo fail we need each node in frontier to + // have a degree > 0 If it is isolated, we just need to mark it as + // visited, and save distance and predecessor here. 
Not need to + // check return value of atomicOr if (is_isolated && v != -1) { int m = 1 << (v % INT_SIZE); @@ -908,7 +950,8 @@ __global__ void topdown_expand_kernel( if (v != -1) ++thread_n_frontier_candidates; } - // We need to have all nfrontier_candidates to be ready before doing the scan + // We need to have all nfrontier_candidates to be ready before doing the + // scan __syncthreads(); // We will put the frontier candidates in a local queue @@ -950,9 +993,10 @@ __global__ void topdown_expand_kernel( vec_frontier_accepted_vertex[iv] = -1; if (idx_shared < block_n_frontier_candidates) { - IndexType v = shared_local_new_frontier_candidates[idx_shared]; // popping queue - int m = 1 << (v % INT_SIZE); - int q = atomicOr(&bmap[v / INT_SIZE], m); // atomicOr returns old + IndexType v = shared_local_new_frontier_candidates[idx_shared]; // popping + // queue + int m = 1 << (v % INT_SIZE); + int q = atomicOr(&bmap[v / INT_SIZE], m); // atomicOr returns old if (!(m & q)) { // if this thread was the first to discover this node if (distances) distances[v] = lvl; @@ -977,7 +1021,8 @@ __global__ void topdown_expand_kernel( if (threadIdx.x == (TOP_DOWN_EXPAND_DIMX - 1)) { IndexType inclusive_sum = thread_new_frontier_offset + naccepted_vertices; - // for this thread, thread_new_frontier_offset + has_successor (exclusive sum) + // for this thread, thread_new_frontier_offset + has_successor + // (exclusive sum) if (inclusive_sum) frontier_common_block_offset = atomicAdd(new_frontier_cnt, inclusive_sum); } @@ -1023,9 +1068,11 @@ void frontier_expand(const IndexType *row_ptr, IndexType *new_frontier_cnt, const IndexType *frontier_degrees_exclusive_sum, const IndexType *frontier_degrees_exclusive_sum_buckets_offsets, + int *previous_visited_bmap, int *visited_bmap, IndexType *distances, IndexType *predecessors, + double *sp_counters, const int *edge_mask, const int *isolated_bmap, bool directed, @@ -1043,7 +1090,13 @@ void frontier_expand(const IndexType *row_ptr, grid.x = min((totaldegree + max_items_per_thread * block.x - 1) / (max_items_per_thread * block.x), (IndexType)MAXBLOCKS); + // Shortest Path counting (Betweenness Centrality) + // We need to keep track of the previously visited bmap + // If the coutner of shortest path is nullptr + // The previous_visited_bmap is no longer needed (and should be nullptr on + // the first access), so it can be the same as the current visitedbmap + if (!sp_counters) { previous_visited_bmap = visited_bmap; } topdown_expand_kernel<<>>( row_ptr, col_ind, @@ -1056,9 +1109,11 @@ void frontier_expand(const IndexType *row_ptr, new_frontier_cnt, frontier_degrees_exclusive_sum, frontier_degrees_exclusive_sum_buckets_offsets, + previous_visited_bmap, visited_bmap, distances, predecessors, + sp_counters, edge_mask, isolated_bmap, directed); @@ -1139,7 +1194,8 @@ __global__ void flag_isolated_vertices_kernel(IndexType n, IndexType local_nisolated = __popc(local_isolated_bmap); - // We need local_nisolated and local_isolated_bmap to be ready for next steps + // We need local_nisolated and local_isolated_bmap to be ready for next + // steps __syncthreads(); IndexType total_nisolated = BlockReduce(block_reduce_temp_storage).Sum(local_nisolated); diff --git a/cpp/src/traversal/sssp.cu b/cpp/src/traversal/sssp.cu index 016e9f04629..8e1d0b28238 100644 --- a/cpp/src/traversal/sssp.cu +++ b/cpp/src/traversal/sssp.cu @@ -282,7 +282,7 @@ void sssp(experimental::GraphCSRView const &graph, if (!graph.edge_data) { // Generate unit weights - // TODO: This should fallback to BFS, but for 
now it'll go through the + // FIXME: This should fallback to BFS, but for now it'll go through the // SSSP path since BFS needs the directed flag, which should not be // necessary for the SSSP API. We can pass directed to the BFS call, but // BFS also does only integer distances right now whereas we need float or diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 6db875e5813..b984c47c97b 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -133,6 +133,7 @@ set(KATZ_TEST_SRC # - betweenness centrality tests ------------------------------------------------------------------------- set(BETWEENNESS_TEST_SRC + "${CMAKE_SOURCE_DIR}/../thirdparty/mmio/mmio.c" "${CMAKE_CURRENT_SOURCE_DIR}/centrality/betweenness_centrality_test.cu") ConfigureTest(BETWEENNESS_TEST "${BETWEENNESS_TEST_SRC}" "") diff --git a/cpp/tests/centrality/betweenness_centrality_test.cu b/cpp/tests/centrality/betweenness_centrality_test.cu index 55132ad6c12..b9870daf146 100644 --- a/cpp/tests/centrality/betweenness_centrality_test.cu +++ b/cpp/tests/centrality/betweenness_centrality_test.cu @@ -18,36 +18,581 @@ #include "gtest/gtest.h" #include +#include +#include "test_utils.h" #include #include -struct BetweennessCentralityTest : public ::testing::Test { +#include // C++ Reference Algorithm +#include // C++ Reference Algorithm + +#include // Loads GraphCSR from .mtx +#include + +#ifndef TEST_EPSILON +#define TEST_EPSILON 0.0001 +#endif + +// NOTE: Defines under which values the difference should be discarded when +// considering values are close to zero +// i.e: Do we consider that the difference between 1.3e-9 and 8.e-12 is +// significant +#ifndef TEST_ZERO_THRESHOLD +#define TEST_ZERO_THRESHOLD 1e-10 +#endif + +// ============================================================================= +// C++ Reference Implementation +// ============================================================================= +template +void populate_neighbors(VT *indices, ET *offsets, VT w, std::vector &neighbors) +{ + ET edge_start = offsets[w]; + ET edge_end = offsets[w + 1]; + ET edge_count = edge_end - edge_start; + + neighbors.clear(); // Reset neighbors vector's size + for (ET edge_idx = 0; edge_idx < edge_count; ++edge_idx) { + VT dst = indices[edge_start + edge_idx]; + neighbors.push_back(dst); + } +} + +// FIXME: This should be moved to BFS testing on the c++ side (#778) +// This implements the BFS from (Brandes, 2001) with shortest path counting +template +void ref_bfs(VT *indices, + ET *offsets, + VT const number_of_vertices, + std::queue &Q, + std::stack &S, + std::vector &dist, + std::vector> &pred, + std::vector &sigmas, + VT source) +{ + std::vector neighbors; + for (VT w = 0; w < number_of_vertices; ++w) { + pred[w].clear(); + dist[w] = std::numeric_limits::max(); + sigmas[w] = 0; + } + dist[source] = 0; + sigmas[source] = 1; + Q.push(source); + // b. Traversal + while (!Q.empty()) { + VT v = Q.front(); + Q.pop(); + S.push(v); + populate_neighbors(indices, offsets, v, neighbors); + for (VT w : neighbors) { + // Path Discovery: + // Found for the first time? + if (dist[w] == std::numeric_limits::max()) { + dist[w] = dist[v] + 1; + Q.push(w); + } + // Path counting + // Edge(v, w) on a shortest path? 
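+      // If so, w is exactly one BFS level below v, so each of the sigmas[v]
+      // shortest paths reaching v extends along edge (v, w) into a shortest
+      // path reaching w; v is also recorded as a predecessor of w for the
+      // accumulation phase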
+ if (dist[w] == dist[v] + 1) { + sigmas[w] += sigmas[v]; + pred[w].push_back(v); + } + } + } +} + +template +void ref_accumulation(result_t *result, + VT const number_of_vertices, + std::stack &S, + std::vector> &pred, + std::vector &sigmas, + std::vector &deltas, + VT source) +{ + for (VT v = 0; v < number_of_vertices; ++v) { deltas[v] = 0; } + while (!S.empty()) { + VT w = S.top(); + S.pop(); + for (VT v : pred[w]) { deltas[v] += (sigmas[v] / sigmas[w]) * (1.0 + deltas[w]); } + if (w != source) { result[w] += deltas[w]; } + } +} + +// Algorithm 1: Shortest-path vertex betweenness, (Brandes, 2001) +template +void reference_betweenness_centrality_impl(VT *indices, + ET *offsets, + VT const number_of_vertices, + result_t *result, + VT const *sources, + VT const number_of_sources) +{ + std::queue Q; + std::stack S; + // NOTE: dist is of type VT not WT + std::vector dist(number_of_vertices); + std::vector> pred(number_of_vertices); + std::vector sigmas(number_of_vertices); + std::vector deltas(number_of_vertices); + + std::vector neighbors; + + if (sources) { + for (VT source_idx = 0; source_idx < number_of_sources; ++source_idx) { + VT s = sources[source_idx]; + // Step 1: Single-source shortest-paths problem + // a. Initialization + ref_bfs( + indices, offsets, number_of_vertices, Q, S, dist, pred, sigmas, s); + // Step 2: Accumulation + // Back propagation of dependencies + ref_accumulation( + result, number_of_vertices, S, pred, sigmas, deltas, s); + } + } else { + for (VT s = 0; s < number_of_vertices; ++s) { + // Step 1: Single-source shortest-paths problem + // a. Initialization + ref_bfs( + indices, offsets, number_of_vertices, Q, S, dist, pred, sigmas, s); + // Step 2: Accumulation + // Back propagation of dependencies + ref_accumulation( + result, number_of_vertices, S, pred, sigmas, deltas, s); + } + } +} + +template +void reference_rescale(result_t *result, + bool normalize, + bool directed, + VT const number_of_vertices, + VT const number_of_sources) +{ + bool modified = false; + result_t rescale_factor = static_cast(1); + result_t casted_number_of_sources = static_cast(number_of_sources); + result_t casted_number_of_vertices = static_cast(number_of_vertices); + if (normalize) { + if (number_of_vertices > 2) { + rescale_factor /= ((casted_number_of_vertices - 1) * (casted_number_of_vertices - 2)); + modified = true; + } + } else { + if (!directed) { + rescale_factor /= static_cast(2); + modified = true; + } + } + if (modified) { + if (number_of_sources > 0) { + rescale_factor *= (casted_number_of_vertices / casted_number_of_sources); + } + } + for (auto idx = 0; idx < number_of_vertices; ++idx) { result[idx] *= rescale_factor; } +} + +template +void reference_betweenness_centrality(cugraph::experimental::GraphCSRView const &graph, + result_t *result, + bool normalize, + bool endpoints, // This is not yet implemented + VT const number_of_sources, + VT const *sources) +{ + VT number_of_vertices = graph.number_of_vertices; + ET number_of_edges = graph.number_of_edges; + thrust::host_vector h_indices(number_of_edges); + thrust::host_vector h_offsets(number_of_vertices + 1); + + thrust::device_ptr d_indices((VT *)&graph.indices[0]); + thrust::device_ptr d_offsets((ET *)&graph.offsets[0]); + + thrust::copy(d_indices, d_indices + number_of_edges, h_indices.begin()); + thrust::copy(d_offsets, d_offsets + (number_of_vertices + 1), h_offsets.begin()); + + cudaDeviceSynchronize(); + + reference_betweenness_centrality_impl( + &h_indices[0], &h_offsets[0], number_of_vertices, result, 
sources, number_of_sources);
+  reference_rescale(
+    result, normalize, graph.prop.directed, number_of_vertices, number_of_sources);
+}
+// Explicit declaration
+template void reference_betweenness_centrality(
+  cugraph::experimental::GraphCSRView const &,
+  float *,
+  bool,
+  bool,
+  const int,
+  int const *);
+template void reference_betweenness_centrality(
+  cugraph::experimental::GraphCSRView const &,
+  double *,
+  bool,
+  bool,
+  const int,
+  int const *);
+
+// =============================================================================
+// Utility functions
+// =============================================================================
+// FIXME: This could be useful in other testsuites (SSSP, BFS, ...)
+template
+void generate_graph_csr(CSR_Result_Weighted &csr_result,
+                        VT &m,
+                        VT &nnz,
+                        bool &is_directed,
+                        std::string matrix_file)
+{
+  FILE *fpin = fopen(matrix_file.c_str(), "r");
+  ASSERT_NE(fpin, nullptr) << "fopen (" << matrix_file << ") failure.";
+
+  VT k;
+  MM_typecode mc;
+  ASSERT_EQ(mm_properties(fpin, 1, &mc, &m, &k, &nnz), 0)
+    << "could not read Matrix Market file properties"
+    << "\n";
+  ASSERT_TRUE(mm_is_matrix(mc));
+  ASSERT_TRUE(mm_is_coordinate(mc));
+  ASSERT_FALSE(mm_is_complex(mc));
+  ASSERT_FALSE(mm_is_skew(mc));
+  is_directed = !mm_is_symmetric(mc);
+
+  // Allocate memory on host
+  std::vector cooRowInd(nnz), cooColInd(nnz);
+  std::vector cooVal(nnz);
+
+  // Read
+  ASSERT_EQ((mm_to_coo(fpin, 1, nnz, &cooRowInd[0], &cooColInd[0], &cooVal[0], NULL)), 0)
+    << "could not read matrix data"
+    << "\n";
+  ASSERT_EQ(fclose(fpin), 0);
+
+  ConvertCOOtoCSR_weighted(&cooRowInd[0], &cooColInd[0], &cooVal[0], nnz, csr_result);
+  CUDA_CHECK_LAST();
+}
+
+// Compare while allowing a relative error of epsilon
+// zero_threshold indicates when we should drop the comparison for small numbers
+template
+bool compare_close(const T &a, const T &b, const precision_t epsilon, precision_t zero_threshold)
+{
+  return ((zero_threshold > a && zero_threshold > b)) ||
+         ((a >= b * (1.0 - epsilon)) && (a <= b * (1.0 + epsilon)));
+}
+
+// =============================================================================
+// Test Suite
+// =============================================================================
+// Defines the Betweenness Centrality UseCase
+// SSSP's test suite uses a Graph-type parameter that could be reused here
+// (MTX / RMAT)
+// FIXME: Use VT for number_of_sources?
+typedef struct BC_Usecase_t {
+  std::string config_;     // Path to graph file
+  std::string file_path_;  // Complete path to graph using dataset_root_dir
+  int number_of_sources_;  // Number of sources for the traversals
+  BC_Usecase_t(const std::string &config, int number_of_sources)
+    : config_(config), number_of_sources_(number_of_sources)
+  {
+    // assume relative paths are relative to RAPIDS_DATASET_ROOT_DIR
+    // FIXME: Use platform independent stuff from c++14/17 on compiler update
+    const std::string &rapidsDatasetRootDir = get_rapids_dataset_root_dir();
+    if ((config_ != "") && (config_[0] != '/')) {
+      file_path_ = rapidsDatasetRootDir + "/" + config_;
+    } else {
+      file_path_ = config_;
+    }
+  };
+} BC_Usecase;
+
+class Tests_BC : public ::testing::TestWithParam {
+ public:
+  Tests_BC() {}
+  static void SetupTestCase() {}
+  static void TearDownTestCase() {}
+
+  virtual void SetUp() {}
+  virtual void TearDown() {}
+  // FIXME: Should normalize be part of the configuration instead?
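+  // run_current_test template parameters (normalize and endpoints are
+  // fixed at compile time by each TEST_P instantiation below):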
+ // VT vertex identifier data type + // ET edge identifier data type + // WT edge weight data type + // result_t result data type + // normalize should the result be normalized + // endpoints should the endpoints be included (Not Implemented Yet) + template + void run_current_test(const BC_Usecase &configuration) + { + // Step 1: Construction of the graph based on configuration + VT m; + ET nnz; + CSR_Result_Weighted csr_result; + bool is_directed = false; + generate_graph_csr(csr_result, m, nnz, is_directed, configuration.file_path_); + cudaDeviceSynchronize(); + cugraph::experimental::GraphCSRView G( + csr_result.rowOffsets, csr_result.colIndices, csr_result.edgeWeights, m, nnz); + G.prop.directed = is_directed; + CUDA_CHECK_LAST(); + std::vector result(G.number_of_vertices, 0); + std::vector expected(G.number_of_vertices, 0); + + // Step 2: Generation of sources based on configuration + // if number_of_sources_ is 0 then sources must be nullptr + // Otherwise we only use the first k values + ASSERT_TRUE(configuration.number_of_sources_ >= 0 && + configuration.number_of_sources_ <= G.number_of_vertices) + << "Number number of sources should be >= 0 and" + << " less than the number of vertices in the graph"; + std::vector sources(configuration.number_of_sources_); + thrust::sequence(thrust::host, sources.begin(), sources.end(), 0); + + VT *sources_ptr = nullptr; + if (configuration.number_of_sources_ > 0) { sources_ptr = sources.data(); } + + reference_betweenness_centrality(G, + expected.data(), + normalize, + endpoints, + // FIXME: weights + configuration.number_of_sources_, + sources_ptr); + + sources_ptr = nullptr; + if (configuration.number_of_sources_ > 0) { sources_ptr = sources.data(); } + + thrust::device_vector d_result(G.number_of_vertices); + // FIXME: Remove this once endpoints in handled + if (endpoints) { + ASSERT_THROW(cugraph::betweenness_centrality(G, + d_result.data().get(), + normalize, + endpoints, + static_cast(nullptr), + configuration.number_of_sources_, + sources_ptr, + cugraph::cugraph_bc_implem_t::CUGRAPH_DEFAULT), + cugraph::logic_error); + return; + } else { + cugraph::betweenness_centrality(G, + d_result.data().get(), + normalize, + endpoints, + static_cast(nullptr), + configuration.number_of_sources_, + sources_ptr, + cugraph::cugraph_bc_implem_t::CUGRAPH_DEFAULT); + } + cudaDeviceSynchronize(); + CUDA_TRY(cudaMemcpy(result.data(), + d_result.data().get(), + sizeof(result_t) * G.number_of_vertices, + cudaMemcpyDeviceToHost)); + cudaDeviceSynchronize(); + for (int i = 0; i < G.number_of_vertices; ++i) + EXPECT_TRUE(compare_close(result[i], expected[i], TEST_EPSILON, TEST_ZERO_THRESHOLD)) + << "[MISMATCH] vaid = " << i << ", cugraph = " << result[i] + << " expected = " << expected[i]; + } }; -TEST_F(BetweennessCentralityTest, SimpleGraph) +// BFS: Checking for shortest_path counting correctness +// ----------------------------------------------------------------------------- +// FIXME: This BFS testing is kept here as it only focus on the shortest path +// counting problem that is a core component of Betweennees Centrality, +// This should be moved to a separate file in for #778 dedicated to BFS, +// results verification. 
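For reference, the reference implementation above (ref_bfs followed by ref_accumulation) follows Brandes (2001). A minimal Python sketch of the same two-phase computation is shown below; it is an illustration only, not part of the patch, and brandes_bc and adj are assumed names, with adj mapping each vertex to its list of neighbors:

from collections import deque

def brandes_bc(adj):
    bc = dict.fromkeys(adj, 0.0)
    for s in adj:
        # Phase 1: BFS from s with shortest-path counting (sigma)
        dist = dict.fromkeys(adj, None)
        sigma = dict.fromkeys(adj, 0.0)
        pred = {v: [] for v in adj}
        dist[s] = 0
        sigma[s] = 1.0
        order = []  # vertices in order of non-decreasing distance
        queue = deque([s])
        while queue:
            v = queue.popleft()
            order.append(v)
            for w in adj[v]:
                if dist[w] is None:           # path discovery
                    dist[w] = dist[v] + 1
                    queue.append(w)
                if dist[w] == dist[v] + 1:    # path counting
                    sigma[w] += sigma[v]
                    pred[w].append(v)
        # Phase 2: dependency accumulation in reverse BFS order
        delta = dict.fromkeys(adj, 0.0)
        for w in reversed(order):
            for v in pred[w]:
                delta[v] += sigma[v] / sigma[w] * (1.0 + delta[w])
            if w != s:
                bc[w] += delta[w]
    return bc

# Example: path graph 0 - 1 - 2; vertex 1 lies on both 0->2 and 2->0 paths
print(brandes_bc({0: [1], 1: [0, 2], 2: [1]}))  # {0: 0.0, 1: 2.0, 2: 0.0}

The scores above are unnormalized; reference_rescale earlier in this file applies the normalization and source-subset scaling on top of this computation.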
+typedef struct BFS_Usecase_t { + std::string config_; // Path to graph file + std::string file_path_; // Complete path to graph using dataset_root_dir + int source_; // Starting point from the traversal + BFS_Usecase_t(const std::string &config, int source) : config_(config), source_(source) + { + const std::string &rapidsDatasetRootDir = get_rapids_dataset_root_dir(); + if ((config_ != "") && (config_[0] != '/')) { + file_path_ = rapidsDatasetRootDir + "/" + config_; + } else { + file_path_ = config_; + } + }; +} BFS_Usecase; + +class Tests_BFS : public ::testing::TestWithParam { + public: + Tests_BFS() {} + static void SetupTestCase() {} + static void TearDownTestCase() {} + + virtual void SetUp() {} + virtual void TearDown() {} + template + void run_current_test(const BFS_Usecase &configuration) + { + // Step 1: Construction of the graph based on configuration + VT m; + ET nnz; + CSR_Result_Weighted csr_result; + bool is_directed = false; + generate_graph_csr(csr_result, m, nnz, is_directed, configuration.file_path_); + cudaDeviceSynchronize(); + cugraph::experimental::GraphCSRView G( + csr_result.rowOffsets, csr_result.colIndices, csr_result.edgeWeights, m, nnz); + G.prop.directed = is_directed; + + CUDA_CHECK_LAST(); + std::vector result(G.number_of_vertices, 0); + std::vector expected(G.number_of_vertices, 0); + + ASSERT_TRUE(configuration.source_ >= 0 && configuration.source_ <= G.number_of_vertices) + << "Starting sources should be >= 0 and" + << " less than the number of vertices in the graph"; + + VT source = configuration.source_; + + VT number_of_vertices = G.number_of_vertices; + ET number_of_edges = G.number_of_edges; + std::vector indices(number_of_edges); + std::vector offsets(number_of_vertices + 1); + + CUDA_TRY( + cudaMemcpy(indices.data(), G.indices, sizeof(VT) * indices.size(), cudaMemcpyDeviceToHost)); + CUDA_TRY( + cudaMemcpy(offsets.data(), G.offsets, sizeof(ET) * offsets.size(), cudaMemcpyDeviceToHost)); + cudaDeviceSynchronize(); + std::queue Q; + std::stack S; + std::vector ref_bfs_dist(number_of_vertices); + std::vector> ref_bfs_pred(number_of_vertices); + std::vector ref_bfs_sigmas(number_of_vertices); + + ref_bfs(indices.data(), + offsets.data(), + number_of_vertices, + Q, + S, + ref_bfs_dist, + ref_bfs_pred, + ref_bfs_sigmas, + source); + + // Device data for cugraph_bfs + thrust::device_vector d_cugraph_dist(number_of_vertices); + thrust::device_vector d_cugraph_pred(number_of_vertices); + thrust::device_vector d_cugraph_sigmas(number_of_vertices); + + // This test only checks for sigmas equality + std::vector cugraph_sigmas(number_of_vertices); + + cugraph::bfs(G, + d_cugraph_dist.data().get(), + d_cugraph_pred.data().get(), + d_cugraph_sigmas.data().get(), + source, + G.prop.directed); + CUDA_TRY(cudaMemcpy(cugraph_sigmas.data(), + d_cugraph_sigmas.data().get(), + sizeof(double) * d_cugraph_sigmas.size(), + cudaMemcpyDeviceToHost)); + for (int i = 0; i < number_of_vertices; ++i) { + EXPECT_TRUE( + compare_close(cugraph_sigmas[i], ref_bfs_sigmas[i], TEST_EPSILON, TEST_ZERO_THRESHOLD)) + << "[MISMATCH] vaid = " << i << ", cugraph = " << cugraph_sigmas[i] + << " c++ ref = " << ref_bfs_sigmas[i]; + } + } +}; +//============================================================================== +// Tests +//============================================================================== +// BC +// ----------------------------------------------------------------------------- +// Verifiy Un-Normalized results +// Endpoint parameter is currently not usefull, is for 
later use +TEST_P(Tests_BC, CheckFP32_NO_NORMALIZE_NO_ENDPOINTS) { - std::vector graph_offsets{{0, 1, 2, 5, 7, 10, 12, 14}}; - std::vector graph_indices{{2, 2, 0, 1, 3, 2, 4, 3, 5, 6, 4, 6, 4, 5}}; + run_current_test(GetParam()); +} - std::vector expected{{0.0, 0.0, 0.6, 0.6, 0.5333333, 0.0, 0.0}}; +TEST_P(Tests_BC, CheckFP64_NO_NORMALIZE_NO_ENDPOINTS) +{ + run_current_test(GetParam()); +} - int num_verts = graph_offsets.size() - 1; - int num_edges = graph_indices.size(); +// FIXME: Currently endpoints throws and exception as it is not supported +TEST_P(Tests_BC, CheckFP32_NO_NORMALIZE_ENDPOINTS) +{ + run_current_test(GetParam()); +} - thrust::device_vector d_graph_offsets(graph_offsets); - thrust::device_vector d_graph_indices(graph_indices); - thrust::device_vector d_result(num_verts); +TEST_P(Tests_BC, CheckFP64_NO_NORMALIZE_ENDPOINTS) +{ + run_current_test(GetParam()); +} - std::vector result(num_verts); +// Verifiy Normalized results +TEST_P(Tests_BC, CheckFP32_NORMALIZE_NO_ENPOINTS) +{ + run_current_test(GetParam()); +} - cugraph::experimental::GraphCSRView G( - d_graph_offsets.data().get(), d_graph_indices.data().get(), nullptr, num_verts, num_edges); +TEST_P(Tests_BC, CheckFP64_NORMALIZE_NO_ENPOINTS) +{ + run_current_test(GetParam()); +} - cugraph::betweenness_centrality(G, d_result.data().get()); +// FIXME: Currently endpoints throws and exception as it is not supported +TEST_P(Tests_BC, CheckFP32_NORMALIZE_ENDPOINTS) +{ + run_current_test(GetParam()); +} + +TEST_P(Tests_BC, CheckFP64_NORMALIZE_ENDPOINTS) +{ + run_current_test(GetParam()); +} - cudaMemcpy( - result.data(), d_result.data().get(), sizeof(float) * num_verts, cudaMemcpyDeviceToHost); +// FIXME: There is an InvalidValue on a Memcopy only on tests/datasets/dblp.mtx +INSTANTIATE_TEST_CASE_P(simple_test, + Tests_BC, + ::testing::Values(BC_Usecase("test/datasets/karate.mtx", 0), + BC_Usecase("test/datasets/netscience.mtx", 4), + BC_Usecase("test/datasets/wiki2003.mtx", 4), + BC_Usecase("test/datasets/wiki-Talk.mtx", 4))); - for (int i = 0; i < num_verts; ++i) EXPECT_FLOAT_EQ(result[i], expected[i]); +// BFS +// ----------------------------------------------------------------------------- +// FIXME: This could be reused by Issue #778 +TEST_P(Tests_BFS, CheckFP32) { run_current_test(GetParam()); } + +TEST_P(Tests_BFS, CheckFP64) { run_current_test(GetParam()); } + +INSTANTIATE_TEST_CASE_P(simple_test, + Tests_BFS, + ::testing::Values(BFS_Usecase("test/datasets/karate.mtx", 0), + BFS_Usecase("test/datasets/polbooks.mtx", 0), + BFS_Usecase("test/datasets/netscience.mtx", 0), + BFS_Usecase("test/datasets/netscience.mtx", 100), + BFS_Usecase("test/datasets/wiki2003.mtx", 1000), + BFS_Usecase("test/datasets/wiki-Talk.mtx", 1000))); + +int main(int argc, char **argv) +{ + rmmInitialize(nullptr); + testing::InitGoogleTest(&argc, argv); + int rc = RUN_ALL_TESTS(); + rmmFinalize(); + return rc; } diff --git a/cpp/tests/sssp/sssp_test.cu b/cpp/tests/sssp/sssp_test.cu index 867c9ec42e2..12b7c32888b 100644 --- a/cpp/tests/sssp/sssp_test.cu +++ b/cpp/tests/sssp/sssp_test.cu @@ -193,7 +193,7 @@ class Tests_SSSP : public ::testing::TestWithParam { ASSERT_TRUE((typeid(DistType) == typeid(float)) || (typeid(DistType) == typeid(double))); if (param.type_ == RMAT) { // This is size_t due to grmat_gen which should be fixed there - // TODO rmat is disabled + // FIXME: rmat is disabled return; } else if (param.type_ == MTX) { MaxVType m, k; @@ -395,7 +395,7 @@ TEST_P(Tests_SSSP, CheckFP64_NO_RANDOM_DIST_PREDS) run_current_test(GetParam()); } -// 
TODO: There might be some tests that are done twice (MTX that are not patterns)
+// FIXME: There might be some tests that are done twice (MTX that are not patterns)
 TEST_P(Tests_SSSP, CheckFP32_RANDOM_DIST_NO_PREDS)
 {
   run_current_test(GetParam());
diff --git a/python/cugraph/centrality/betweenness_centrality.pxd b/python/cugraph/centrality/betweenness_centrality.pxd
index 61bc159ae5c..183750ce463 100644
--- a/python/cugraph/centrality/betweenness_centrality.pxd
+++ b/python/cugraph/centrality/betweenness_centrality.pxd
@@ -22,6 +22,10 @@ from libcpp cimport bool

 cdef extern from "algorithms.hpp" namespace "cugraph":

+    ctypedef enum cugraph_bc_implem_t:
+        CUGRAPH_DEFAULT "cugraph::cugraph_bc_implem_t::CUGRAPH_DEFAULT"
+        CUGRAPH_GUNROCK "cugraph::cugraph_bc_implem_t::CUGRAPH_GUNROCK"
+
     cdef void betweenness_centrality[VT,ET,WT,result_t](
         const GraphCSRView[VT,ET,WT] &graph,
         result_t *result,
@@ -29,5 +33,6 @@ cdef extern from "algorithms.hpp" namespace "cugraph":
         bool endpoints,
         const WT *weight,
         VT k,
-        const VT *vertices) except +
+        const VT *vertices,
+        cugraph_bc_implem_t implem) except +

diff --git a/python/cugraph/centrality/betweenness_centrality.py b/python/cugraph/centrality/betweenness_centrality.py
index bd19d76a8b0..4ab2f911333 100644
--- a/python/cugraph/centrality/betweenness_centrality.py
+++ b/python/cugraph/centrality/betweenness_centrality.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2019, NVIDIA CORPORATION.
+# Copyright (c) 2019-2020, NVIDIA CORPORATION.
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at
@@ -11,44 +11,83 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+import random
+import numpy as np
 from cugraph.centrality import betweenness_centrality_wrapper


+# NOTE: result_type=float could be an intuitive way to indicate the result type
 def betweenness_centrality(G, k=None, normalized=True,
-                           weight=None, endpoints=False, seed=None):
+                           weight=None, endpoints=False, implementation=None,
+                           seed=None, result_dtype=np.float64):
     """
-    Compute the betweenness centrality for all nodes of the graph G. cuGraph
-    does not currently support the 'endpoints' and 'weight' parameters
+    Compute the betweenness centrality for all nodes of the graph G from a
+    sample of 'k' sources.
+    CuGraph does not currently support the 'endpoints' and 'weight' parameters
     as seen in the corresponding networkX call.

     Parameters
     ----------
     G : cuGraph.Graph
         cuGraph graph descriptor with connectivity information. The graph can
-        be either directed (DiGraph) or undirected (Graph)
-    k : int, optional
-        Default is None.
+        be either directed (DiGraph) or undirected (Graph).
+        Weights in the graph are ignored; the current implementation uses
+        BFS traversals. Use the weight parameter if weights need to be
+        considered (currently not supported).
+
+    k : int or list or None, optional, default=None
         If k is not None, use k node samples to estimate betweenness. Higher
         values give better approximation
+        If k is a list, use the content of the list for estimation: the list
+        should contain vertex identifiers.
+        Vertices obtained through sampling or defined as a list will be used
+        as sources for traversals inside the algorithm.
+
     normalized : bool, optional
         Default is True.
         If true, the betweenness values are normalized by
-        2/((n-1)(n-2)) for Graphs (undirected), and
-        1 / ((n-1)(n-2)) for DiGraphs (directed graphs)
+        2 / ((n - 1) * (n - 2)) for Graphs (undirected), and
+        1 / ((n - 1) * (n - 2)) for DiGraphs (directed graphs)
         where n is the number of nodes in G.
-    weight : cudf.Series
-        Specifies the weights to be used for each vertex.
-    endpoints : bool, optional
-        If true, include the endpoints in the shortest path counts
+        Normalization ensures that the values are in [0, 1]; the scores are
+        scaled relative to the highest possible value, which is reached when
+        a single node is crossed by every shortest path.
+
+    weight : cudf.DataFrame, optional, default=None
+        Specifies the weights to be used for each edge.
+        Should contain a mapping between
+        edges and weights.
+        (Not Supported)
+
+    endpoints : bool, optional, default=False
+        If true, include the endpoints in the shortest path counts.
+        (Not Supported)
+
+    implementation : string, optional, default=None
+        If implementation is None or "default", uses the native cugraph
+        implementation; if "gunrock", uses the gunrock-based implementation.
+        The default version supports the normalized, k and seed options.
+        "gunrock" might be faster when considering all the sources, but it
+        only returns float results and considers all the vertices as sources.
+
     seed : optional
-        k is specified and seed is not None, use seed to initialize the random
-        number generator
+        If k is specified and k is an integer, use seed to initialize the
+        random number generator.
+        Using None as the seed relies on random.seed() behavior: the current
+        system time is used.
+        If k is either None or a list, the seed parameter is ignored.
+
+    result_dtype : np.float32 or np.float64, optional, default=np.float64
+        Indicates the data type of the betweenness centrality scores.
+        Using double automatically switches the implementation to "default".

     Returns
     -------
     df : cudf.DataFrame
         GPU data frame containing two cudf.Series of size V: the vertex
-        identifiers and the corresponding katz centrality values.
+        identifiers and the corresponding betweenness centrality values.
+        Please note that the resulting 'vertex' column might not be
+        in ascending order.

         df['vertex'] : cudf.Series
             Contains the vertex identifiers
@@ -75,16 +114,63 @@ def betweenness_centrality(G, k=None, normalized=True,
     #    workaround.
    # vertices = None
+    if implementation is None:
+        implementation = "default"
+    if implementation not in ["default", "gunrock"]:
+        raise ValueError("Only two implementations are supported: 'default' "
+                         "and 'gunrock'")
+
     if k is not None:
-        raise Exception("sampling feature of betweenness "
-                        "centrality not currently supported")
+        if implementation == "gunrock":
+            raise ValueError("sampling feature of betweenness "
+                             "centrality not currently supported "
+                             "with gunrock implementation, "
+                             "please use None or 'default'")
+        # In order to compare with pre-set sources,
+        # k can either be a list, an integer, or None
+        #   int: generate a random sample with k elements
+        #   list: k becomes the length of the list, and vertices become the
+        #         content
+        #   None: all the vertices are considered
+        # NOTE: We do not renumber in case k is an int; the sampling does
+        #       not operate on the actual vertex identifiers but on their
+        #       indices:
+        # Example:
+        # - vertex '2' is missing
+        # - vertices '0' '1' '3' '4' exist
+        # - there is a vertex at index 2 (there is no guarantee that it is
+        #   vertex '3')
+        if isinstance(k, int):
+            random.seed(seed)
+            vertices = random.sample(range(G.number_of_vertices()), k)
+        # Using k as a list provides an easier way to compare against
+        # other implementations
+        elif isinstance(k, list):
+            vertices = k
+            k = len(vertices)
+        # We assume that the provided list contains not the indices in the
+        # graph structure but the vertex identifiers in the graph, hence
+        # [1, 2, 10] should proceed to sampling on the vertices that have
+        # 1, 2 and 10 as their identifiers
+        # FIXME: There might be a cleaner way to obtain the inverse mapping
+        if G.renumbered:
+            vertices = [G.edgelist.renumber_map[G.edgelist.renumber_map ==
+                        vert].index[0] for vert in
+                        vertices]
+
+    if endpoints is True:
+        raise NotImplementedError("endpoints accumulation for betweenness "
+                                  "centrality not currently supported")

     if weight is not None:
-        raise Exception("weighted implementation of betweenness "
-                        "centrality not currently supported")
+        raise NotImplementedError("weighted implementation of betweenness "
+                                  "centrality not currently supported")
+
+    if result_dtype not in [np.float32, np.float64]:
+        raise TypeError("result type can only be np.float32 or np.float64")

     df = betweenness_centrality_wrapper.betweenness_centrality(G, normalized,
                                                                endpoints,
                                                                weight,
-                                                               k, vertices)
+                                                               k, vertices,
+                                                               implementation,
+                                                               result_dtype)
     return df
diff --git a/python/cugraph/centrality/betweenness_centrality_wrapper.pyx b/python/cugraph/centrality/betweenness_centrality_wrapper.pyx
index 5515d4c5a95..31fe7db8d4e 100644
--- a/python/cugraph/centrality/betweenness_centrality_wrapper.pyx
+++ b/python/cugraph/centrality/betweenness_centrality_wrapper.pyx
@@ -17,7 +17,9 @@
 # cython: language_level = 3

 from cugraph.centrality.betweenness_centrality cimport betweenness_centrality as c_betweenness_centrality
+from cugraph.centrality.betweenness_centrality cimport cugraph_bc_implem_t
 from cugraph.structure.graph_new cimport *
+import cugraph.structure.graph
 from cugraph.utilities.column_utils cimport *
 from cugraph.utilities.unrenumber import unrenumber
 from libcpp cimport bool
@@ -30,10 +32,23 @@ import numpy as np
 import numpy.ctypeslib as ctypeslib


-def betweenness_centrality(input_graph, normalized, endpoints, weight, k, vertices):
+def betweenness_centrality(input_graph, normalized, endpoints, weight, k,
+                           vertices, implementation, result_dtype):
     """
     Call betweenness centrality
     """
+    # NOTE: This is based on the fact that the 
call to the wrapper already + # checked for the validity of the implementation parameter + cdef cugraph_bc_implem_t bc_implementation = cugraph_bc_implem_t.CUGRAPH_DEFAULT + cdef GraphCSRView[int, int, float] graph_float + cdef GraphCSRView[int, int, double] graph_double + + if (implementation == "default"): # Redundant + bc_implementation = cugraph_bc_implem_t.CUGRAPH_DEFAULT + elif (implementation == "gunrock"): + bc_implementation = cugraph_bc_implem_t.CUGRAPH_GUNROCK + else: + raise ValueError() if not input_graph.adjlist: input_graph.view_adj_list() @@ -45,10 +60,10 @@ def betweenness_centrality(input_graph, normalized, endpoints, weight, k, vertic df = cudf.DataFrame() df['vertex'] = cudf.Series(np.zeros(num_verts, dtype=np.int32)) - df['betweenness_centrality'] = cudf.Series(np.zeros(num_verts, dtype=np.float32)) + df['betweenness_centrality'] = cudf.Series(np.zeros(num_verts, dtype=result_dtype)) - cdef uintptr_t c_identifier = df['vertex'].__cuda_array_interface__['data'][0]; - cdef uintptr_t c_betweenness = df['betweenness_centrality'].__cuda_array_interface__['data'][0]; + cdef uintptr_t c_identifier = df['vertex'].__cuda_array_interface__['data'][0] + cdef uintptr_t c_betweenness = df['betweenness_centrality'].__cuda_array_interface__['data'][0] cdef uintptr_t c_offsets = offsets.__cuda_array_interface__['data'][0] cdef uintptr_t c_indices = indices.__cuda_array_interface__['data'][0] @@ -58,21 +73,57 @@ def betweenness_centrality(input_graph, normalized, endpoints, weight, k, vertic if weight is not None: c_weight = weight.__cuda_array_interface__['data'][0] + #FIXME: We could sample directly from a cudf array in the futur: i.e + # c_vertices = vertices.__cuda_array_interface__['data'][0] if vertices is not None: - c_vertices = vertices.__cuda_array_interface__['data'][0] + c_vertices = np.array(vertices, dtype=np.int32).__array_interface__['data'][0] c_k = 0 if k is not None: c_k = k - cdef GraphCSRView[int,int,float] graph - - graph = GraphCSRView[int,int,float](c_offsets, c_indices, NULL, num_verts, num_edges) - - c_betweenness_centrality[int,int,float,float](graph, c_betweenness, normalized, endpoints, c_weight, c_k, c_vertices) - - graph.get_vertex_identifiers(c_identifier) - + # NOTE: The current implementation only has and + # as explicit template declaration + # The current BFS requires the GraphCSR to be declared + # as or even if weights is null + if result_dtype == np.float32: + graph_float = GraphCSRView[int, int, float]( c_offsets, c_indices, + NULL, num_verts, num_edges) + # FIXME: There might be a way to avoid manually setting the Graph property + graph_float.prop.directed = type(input_graph) is cugraph.structure.graph.DiGraph + + c_betweenness_centrality[int, int, float, float](graph_float, + c_betweenness, + normalized, endpoints, + c_weight, c_k, + c_vertices, + bc_implementation) + graph_float.get_vertex_identifiers(c_identifier) + elif result_dtype == np.float64: + graph_double = GraphCSRView[int, int, double](c_offsets, c_indices, + NULL, num_verts, num_edges) + # FIXME: There might be a way to avoid manually setting the Graph property + graph_double.prop.directed = type(input_graph) is cugraph.structure.graph.DiGraph + + c_betweenness_centrality[int, int, double, double](graph_double, + c_betweenness, + normalized, endpoints, + c_weight, c_k, + c_vertices, + bc_implementation) + graph_double.get_vertex_identifiers(c_identifier) + else: + raise TypeError("result type for betweenness centrality can only be " + "float or double") + + #FIXME: For large graph 
renumbering produces a dataframe organized + # in buckets, i.e, if they are 3 buckets + # 0 + # 8191 + # 16382 + # 1 + # 8192 ... + # Instead of having the sources in ascending order if input_graph.renumbered: df = unrenumber(input_graph.edgelist.renumber_map, df, 'vertex') diff --git a/python/cugraph/tests/test_betweenness_centrality.py b/python/cugraph/tests/test_betweenness_centrality.py index a88ceeae3e6..16f2a425c77 100644 --- a/python/cugraph/tests/test_betweenness_centrality.py +++ b/python/cugraph/tests/test_betweenness_centrality.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019, NVIDIA CORPORATION. +# Copyright (c) 2019-2020, NVIDIA CORPORATION.: # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -17,6 +17,8 @@ import cugraph from cugraph.tests import utils +import random +import numpy as np # Temporarily suppress warnings till networkX fixes deprecation warnings # (Using or importing the ABCs from 'collections' instead of from @@ -28,64 +30,473 @@ warnings.filterwarnings("ignore", category=DeprecationWarning) import networkx as nx +# NOTE: Endpoint parameter is not currently being tested, there could be a test +# to verify that python raise an error if it is used +# ============================================================================= +# Parameters +# ============================================================================= +DIRECTED_GRAPH_OPTIONS = [False, True] +DEFAULT_EPSILON = 0.0001 +IMPLEMENTATION_OPTIONS = ['default', 'gunrock'] -print('Networkx version : {} '.format(nx.__version__)) +TINY_DATASETS = ['../datasets/karate.csv'] +UNRENUMBERED_DATASETS = ['../datasets/karate.csv'] -def calc_betweenness_centrality(graph_file, normalized=True): +SMALL_DATASETS = ['../datasets/netscience.csv'] + +SUBSET_SIZE_OPTIONS = [4] +SUBSET_SEED_OPTIONS = [42] + +# NOTE: The following is not really being exploited in the tests as the +# datasets that are used are too small to compare, but it ensures that both +# path are actually sane +RESULT_DTYPE_OPTIONS = [np.float32, np.float64] + + +# ============================================================================= +# Comparison functions +# ============================================================================= +def build_graphs(graph_file, directed=True): + # cugraph cu_M = utils.read_csv_file(graph_file) - G = cugraph.DiGraph() + G = cugraph.DiGraph() if directed else cugraph.Graph() G.from_cudf_edgelist(cu_M, source='0', destination='1') + G.view_adj_list() # Enforce generation before computation - df = cugraph.betweenness_centrality(G, normalized=normalized) - - NM = utils.read_csv_for_nx(graph_file) - Gnx = nx.from_pandas_edgelist(NM, create_using=nx.DiGraph(), + # networkx + M = utils.read_csv_for_nx(graph_file) + Gnx = nx.from_pandas_edgelist(M, create_using=(nx.DiGraph() if directed + else nx.Graph()), source='0', target='1') - nb = nx.betweenness_centrality(Gnx, normalized=normalized) - pdf = [nb[k] for k in sorted(nb.keys())] - df['nx'] = pdf - df = df.rename({'betweenness_centrality': 'cu'}) - return df + return G, Gnx -DATASETS = ['../datasets/dolphins.csv', - '../datasets/netscience.csv'] +def calc_betweenness_centrality(graph_file, directed=True, normalized=False, + weight=None, endpoints=False, + k=None, seed=None, implementation=None, + result_dtype=np.float32): + """ Generate both cugraph and networkx betweenness centrality + Parameters + ---------- + graph_file : string + Path to COO 
Graph representation in .csv format + + directed : bool, optional, default=True + + normalized : bool + True: Normalize Betweenness Centrality scores + False: Scores are left unnormalized + + k : int or None, optional, default=None + int: Number of sources to sample from + None: All sources are used to compute + + seed : int or None, optional, default=None + Seed for random sampling of the starting point + + implementation : string or None, optional, default=None + There are 2 possibilities 'default' and 'gunrock', if None falls back + into 'default' + + Returns + ------- + cu_bc : dict + Each key is the vertex identifier, each value is the betweenness + centrality score obtained from cugraph betweenness_centrality + nx_bc : dict + Each key is the vertex identifier, each value is the betweenness + centrality score obtained from networkx betweenness_centrality + """ + G, Gnx = build_graphs(graph_file, directed=directed) + calc_func = None + if k is not None and seed is not None: + calc_func = _calc_bc_subset + elif k is not None: + calc_func = _calc_bc_subset_fixed + else: # We processed to a comparison using every sources + calc_func = _calc_bc_full + cu_bc, nx_bc = calc_func(G, Gnx, normalized=normalized, weight=weight, + endpoints=endpoints, k=k, seed=seed, + implementation=implementation, + result_dtype=result_dtype) + + return cu_bc, nx_bc -@pytest.mark.parametrize('graph_file', DATASETS) -def test_betweenness_centrality(graph_file): - gc.collect() - scores = calc_betweenness_centrality(graph_file) +def _calc_bc_subset(G, Gnx, normalized, weight, endpoints, k, seed, + implementation, result_dtype): + # NOTE: Networkx API does not allow passing a list of vertices + # And the sampling is operated on Gnx.nodes() directly + # We first mimic acquisition of the nodes to compare with same sources + random.seed(seed) # It will be called again in nx's call + sources = random.sample(Gnx.nodes(), k) + df = cugraph.betweenness_centrality(G, normalized=normalized, + weight=weight, + endpoints=endpoints, + k=sources, + implementation=implementation, + result_dtype=result_dtype) + nx_bc = nx.betweenness_centrality(Gnx, normalized=normalized, k=k, + seed=seed) + cu_bc = {key: score for key, score in + zip(df['vertex'].to_array(), + df['betweenness_centrality'].to_array())} + return cu_bc, nx_bc - err = 0 - epsilon = 0.0001 - for i in range(len(scores)): - if (scores['cu'][i] < (scores['nx'][i] * (1 - epsilon)) or - scores['cu'][i] > (scores['nx'][i] * (1 + epsilon))): - err = err + 1 - print('ERROR: cu = {}, nx = {}'.format(scores['cu'].iloc[i], - scores['nx'].iloc[i])) +def _calc_bc_subset_fixed(G, Gnx, normalized, weight, endpoints, k, seed, + implementation, result_dtype): + assert isinstance(k, int), "This test is meant for verifying coherence " \ + "when k is given as an int" + # In the fixed set we compare cu_bc against itself as we random.seed(seed) + # on the same seed and then sample on the number of vertices themselves + if seed is None: + seed = 123 # random.seed(None) uses time, but we want same sources + random.seed(seed) # It will be called again in cugraph's call + sources = random.sample(range(G.number_of_vertices()), k) + # The first call is going to proceed to the random sampling in the same + # fashion as the lines above + df = cugraph.betweenness_centrality(G, k=k, normalized=normalized, + weight=weight, + endpoints=endpoints, + implementation=implementation, + seed=seed, + result_dtype=result_dtype) + # The second call is going to process source that were already sampled + # We 
set seed to None as k : int, seed : not none should not be normal + # behavior + df2 = cugraph.betweenness_centrality(G, k=sources, normalized=normalized, + weight=weight, + endpoints=endpoints, + implementation=implementation, + seed=None, + result_dtype=result_dtype) + cu_bc = {key: score for key, score in + zip(df['vertex'].to_array(), + df['betweenness_centrality'].to_array())} + cu_bc2 = {key: score for key, score in + zip(df2['vertex'].to_array(), + df2['betweenness_centrality'].to_array())} - assert err == 0 + return cu_bc, cu_bc2 -@pytest.mark.parametrize('graph_file', DATASETS) -def test_betweenness_centrality_unnormalized(graph_file): +def _calc_bc_full(G, Gnx, normalized, weight, endpoints, implementation, + k, seed, + result_dtype): + df = cugraph.betweenness_centrality(G, normalized=normalized, + weight=weight, + endpoints=endpoints, + implementation=implementation, + result_dtype=result_dtype) + assert df['betweenness_centrality'].dtype == result_dtype, \ + "'betweenness_centrality' column has not the expected type" + nx_bc = nx.betweenness_centrality(Gnx, normalized=normalized, + weight=weight, + endpoints=endpoints) + + cu_bc = {key: score for key, score in + zip(df['vertex'].to_array(), + df['betweenness_centrality'].to_array())} + return cu_bc, nx_bc + + +# ============================================================================= +# Utils +# ============================================================================= +def compare_single_score(result, expected, epsilon): + """ + Compare value in score at given index with relative error + + Parameters + ---------- + scores : DataFrame + contains 'cu' and 'nx' columns which are the values to compare + idx : int + row index of the DataFrame + epsilon : floating point + indicates relative error tolerated + + Returns + ------- + close : bool + True: Result and expected are close to each other + False: Otherwise + """ + close = np.isclose(result, expected, rtol=epsilon) + return close + + +# NOTE: We assume that both cugraph and networkx are generating dicts with +# all the sources, thus we can compare all of them +def compare_scores(cu_bc, ref_bc, epsilon=DEFAULT_EPSILON): + missing_key_error = 0 + score_mismatch_error = 0 + for vertex in ref_bc: + if vertex in cu_bc: + result = cu_bc[vertex] + expected = ref_bc[vertex] + if not compare_single_score(result, expected, epsilon=epsilon): + score_mismatch_error += 1 + print("ERROR: vid = {}, cu = {}, " + "nx = {}".format(vertex, result, expected)) + else: + missing_key_error += 1 + print("[ERROR] Missing vertex {vertex}".format(vertex=vertex)) + assert missing_key_error == 0, "Some vertices were missing" + assert score_mismatch_error == 0, "Some scores were not close enough" + + +def prepare_test(): gc.collect() - scores = calc_betweenness_centrality(graph_file, False) - err = 0 - epsilon = 0.0001 +# ============================================================================= +# Tests +# ============================================================================= +@pytest.mark.parametrize('graph_file', TINY_DATASETS) +@pytest.mark.parametrize('directed', DIRECTED_GRAPH_OPTIONS) +@pytest.mark.parametrize('implementation', IMPLEMENTATION_OPTIONS) +@pytest.mark.parametrize('result_dtype', RESULT_DTYPE_OPTIONS) +def test_betweenness_centrality_normalized_tiny(graph_file, + directed, implementation, + result_dtype): + """Test Normalized Betweenness Centrality""" + prepare_test() + cu_bc, nx_bc = calc_betweenness_centrality(graph_file, directed=directed, + normalized=True, + 
implementation=implementation, + result_dtype=result_dtype) + compare_scores(cu_bc, nx_bc) + + +@pytest.mark.parametrize('graph_file', TINY_DATASETS) +@pytest.mark.parametrize('directed', DIRECTED_GRAPH_OPTIONS) +@pytest.mark.parametrize('implementation', IMPLEMENTATION_OPTIONS) +@pytest.mark.parametrize('result_dtype', RESULT_DTYPE_OPTIONS) +def test_betweenness_centrality_unnormalized_tiny(graph_file, + directed, implementation, + result_dtype): + """Test Unnormalized Betweenness Centrality""" + prepare_test() + cu_bc, nx_bc = calc_betweenness_centrality(graph_file, directed=directed, + normalized=False, + implementation=implementation, + result_dtype=result_dtype) + compare_scores(cu_bc, nx_bc) + + +@pytest.mark.parametrize('graph_file', SMALL_DATASETS) +@pytest.mark.parametrize('directed', DIRECTED_GRAPH_OPTIONS) +@pytest.mark.parametrize('implementation', IMPLEMENTATION_OPTIONS) +@pytest.mark.parametrize('result_dtype', RESULT_DTYPE_OPTIONS) +def test_betweenness_centrality_normalized_small(graph_file, + directed, implementation, + result_dtype): + """Test Unnormalized Betweenness Centrality""" + prepare_test() + cu_bc, nx_bc = calc_betweenness_centrality(graph_file, directed=directed, + normalized=True, + implementation=implementation, + result_dtype=result_dtype) + compare_scores(cu_bc, nx_bc) + + +@pytest.mark.parametrize('graph_file', SMALL_DATASETS) +@pytest.mark.parametrize('directed', DIRECTED_GRAPH_OPTIONS) +@pytest.mark.parametrize('implementation', IMPLEMENTATION_OPTIONS) +@pytest.mark.parametrize('result_dtype', RESULT_DTYPE_OPTIONS) +def test_betweenness_centrality_unnormalized_small(graph_file, + directed, implementation, + result_dtype): + """Test Unnormalized Betweenness Centrality""" + prepare_test() + cu_bc, nx_bc = calc_betweenness_centrality(graph_file, directed=directed, + normalized=False, + implementation=implementation, + result_dtype=result_dtype) + compare_scores(cu_bc, nx_bc) + + +@pytest.mark.parametrize('graph_file', SMALL_DATASETS) +@pytest.mark.parametrize('directed', DIRECTED_GRAPH_OPTIONS) +@pytest.mark.parametrize('subset_size', SUBSET_SIZE_OPTIONS) +@pytest.mark.parametrize('subset_seed', SUBSET_SEED_OPTIONS) +@pytest.mark.parametrize('result_dtype', RESULT_DTYPE_OPTIONS) +def test_betweenness_centrality_normalized_subset_small(graph_file, + directed, + subset_size, + subset_seed, + result_dtype): + """Test Unnormalized Betweenness Centrality using a subset + + Only k sources are considered for an approximate Betweenness Centrality + """ + prepare_test() + cu_bc, nx_bc = calc_betweenness_centrality(graph_file, + directed=directed, + normalized=True, + k=subset_size, + seed=subset_seed, + result_dtype=result_dtype) + compare_scores(cu_bc, nx_bc) + + +# NOTE: This test should only be execute on unrenumbered datasets +# the function operating the comparison inside is first proceeding +# to a random sampling over the number of vertices (thus direct offsets) +# in the graph structure instead of actual vertices identifiers +@pytest.mark.parametrize('graph_file', UNRENUMBERED_DATASETS) +@pytest.mark.parametrize('directed', DIRECTED_GRAPH_OPTIONS) +@pytest.mark.parametrize('subset_size', SUBSET_SIZE_OPTIONS) +@pytest.mark.parametrize('result_dtype', RESULT_DTYPE_OPTIONS) +def test_betweenness_centrality_normalized_fixed_sample(graph_file, + directed, + subset_size, + result_dtype): + """Test Unnormalized Betweenness Centrality using a subset + + Only k sources are considered for an approximate Betweenness Centrality + """ + prepare_test() + cu_bc, nx_bc = 
calc_betweenness_centrality(graph_file, + directed=directed, + normalized=True, + k=subset_size, + seed=None, + result_dtype=result_dtype) + compare_scores(cu_bc, nx_bc) + + +@pytest.mark.parametrize('graph_file', SMALL_DATASETS) +@pytest.mark.parametrize('directed', DIRECTED_GRAPH_OPTIONS) +@pytest.mark.parametrize('subset_size', SUBSET_SIZE_OPTIONS) +@pytest.mark.parametrize('subset_seed', SUBSET_SEED_OPTIONS) +@pytest.mark.parametrize('result_dtype', RESULT_DTYPE_OPTIONS) +def test_betweenness_centrality_unnormalized_subset_small(graph_file, + directed, + subset_size, + subset_seed, + result_dtype): + """Test Unnormalized Betweenness Centrality on Graph on subset + + Only k sources are considered for an approximate Betweenness Centrality + """ + prepare_test() + cu_bc, nx_bc = calc_betweenness_centrality(graph_file, + directed=directed, + normalized=False, + k=subset_size, + seed=subset_seed, + result_dtype=result_dtype) + compare_scores(cu_bc, nx_bc) + + +@pytest.mark.parametrize('graph_file', TINY_DATASETS) +@pytest.mark.parametrize('directed', DIRECTED_GRAPH_OPTIONS) +@pytest.mark.parametrize('result_dtype', RESULT_DTYPE_OPTIONS) +def test_betweenness_centrality_invalid_implementation(graph_file, + directed, + result_dtype): + """Test calls betwenness_centrality with an invalid implementation name""" + prepare_test() + with pytest.raises(ValueError): + cu_bc, nx_bc = calc_betweenness_centrality(graph_file, + directed=directed, + implementation="invalid", + result_dtype=result_dtype) + + +@pytest.mark.parametrize('graph_file', TINY_DATASETS) +@pytest.mark.parametrize('directed', DIRECTED_GRAPH_OPTIONS) +@pytest.mark.parametrize('result_dtype', RESULT_DTYPE_OPTIONS) +def test_betweenness_centrality_gunrock_subset(graph_file, + directed, + result_dtype): + """Test calls betwenness_centrality with subset and gunrock""" + prepare_test() + with pytest.raises(ValueError): + cu_bc, nx_bc = calc_betweenness_centrality(graph_file, + directed=directed, + normalized=False, + k=1, + implementation="gunrock", + result_dtype=result_dtype) + + +@pytest.mark.parametrize('graph_file', TINY_DATASETS) +@pytest.mark.parametrize('directed', DIRECTED_GRAPH_OPTIONS) +@pytest.mark.parametrize('result_dtype', RESULT_DTYPE_OPTIONS) +def test_betweenness_centrality_unnormalized_endpoints_except(graph_file, + directed, + result_dtype): + """Test calls betwenness_centrality unnormalized + endpoints""" + prepare_test() + with pytest.raises(NotImplementedError): + cu_bc, nx_bc = calc_betweenness_centrality(graph_file, + normalized=False, + endpoints=True, + directed=directed, + result_dtype=result_dtype) + + +@pytest.mark.parametrize('graph_file', TINY_DATASETS) +@pytest.mark.parametrize('directed', DIRECTED_GRAPH_OPTIONS) +@pytest.mark.parametrize('result_dtype', RESULT_DTYPE_OPTIONS) +def test_betweenness_centrality_normalized_endpoints_except(graph_file, + directed, + result_dtype): + """Test calls betwenness_centrality normalized + endpoints""" + prepare_test() + with pytest.raises(NotImplementedError): + cu_bc, nx_bc = calc_betweenness_centrality(graph_file, + normalized=True, + endpoints=True, + directed=directed, + result_dtype=result_dtype) + + +@pytest.mark.parametrize('graph_file', TINY_DATASETS) +@pytest.mark.parametrize('directed', DIRECTED_GRAPH_OPTIONS) +@pytest.mark.parametrize('result_dtype', RESULT_DTYPE_OPTIONS) +def test_betweenness_centrality_unnormalized_weight_except(graph_file, + directed, + result_dtype): + """Test calls betwenness_centrality unnormalized + weight""" + prepare_test() + with 
pytest.raises(NotImplementedError): + cu_bc, nx_bc = calc_betweenness_centrality(graph_file, + normalized=False, + weight=True, + directed=directed, + result_dtype=result_dtype) + + +@pytest.mark.parametrize('graph_file', TINY_DATASETS) +@pytest.mark.parametrize('directed', DIRECTED_GRAPH_OPTIONS) +@pytest.mark.parametrize('result_dtype', RESULT_DTYPE_OPTIONS) +def test_betweenness_centrality_normalized_weight_except(graph_file, + directed, + result_dtype): + """Test calls betwenness_centrality normalized + weight""" + prepare_test() + with pytest.raises(NotImplementedError): + cu_bc, nx_bc = calc_betweenness_centrality(graph_file, + normalized=True, + weight=True, + directed=directed, + result_dtype=result_dtype) - for i in range(len(scores)): - if (scores['cu'][i] < (scores['nx'][i] * (1 - epsilon)) or - scores['cu'][i] > (scores['nx'][i] * (1 + epsilon))): - err = err + 1 - print('ERROR: cu = {}, nx = {}'.format(scores['cu'].iloc[i], - scores['nx'].iloc[i])) - assert err == 0 +@pytest.mark.parametrize('graph_file', TINY_DATASETS) +@pytest.mark.parametrize('directed', DIRECTED_GRAPH_OPTIONS) +def test_betweenness_centrality_invalid_dtype(graph_file, directed): + """Test calls betwenness_centrality normalized + weight""" + prepare_test() + with pytest.raises(TypeError): + cu_bc, nx_bc = calc_betweenness_centrality(graph_file, + normalized=True, + result_dtype=str, + directed=directed) diff --git a/python/cugraph/tests/test_bfs.py b/python/cugraph/tests/test_bfs.py index cea555bde05..d3afa5a90b3 100644 --- a/python/cugraph/tests/test_bfs.py +++ b/python/cugraph/tests/test_bfs.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019, NVIDIA CORPORATION. +# Copyright (c) 2019-2020, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -12,89 +12,243 @@ # limitations under the License. import gc -import queue -import time import numpy as np import pytest -import scipy import cugraph from cugraph.tests import utils +import random +# Temporarily suppress warnings till networkX fixes deprecation warnings +# (Using or importing the ABCs from 'collections' instead of from +# 'collections.abc' is deprecated, and in 3.8 it will stop working) for +# python 3.7. Also, this import networkx needs to be relocated in the +# third-party group once this gets fixed. 
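+# NOTE: networkx.algorithms.centrality.betweenness (aliased to nxacb below)
+# provides _single_source_shortest_path_basic, which is used further down as
+# the reference for shortest-path counting in _compare_bfs_spc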
+import warnings +with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=DeprecationWarning) + import networkx as nx + import networkx.algorithms.centrality.betweenness as nxacb -def cugraph_call(cu_M, start_vertex): +# ============================================================================= +# Parameters +# ============================================================================= +DIRECTED_GRAPH_OPTIONS = [True, False] - G = cugraph.DiGraph() - G.from_cudf_edgelist(cu_M, source='0', destination='1', - edge_attr='2') +TINY_DATASETS = ['../datasets/karate.csv', + '../datasets/dolphins.csv', + '../datasets/polbooks.csv'] +SMALL_DATASETS = ['../datasets/netscience.csv', + '../datasets/email-Eu-core.csv'] - t1 = time.time() - df = cugraph.bfs(G, start_vertex) - t2 = time.time() - t1 - print('Time : '+str(t2)) +DATASETS = TINY_DATASETS + SMALL_DATASETS - # Return distances as np.array() - return df['vertex'].to_array(), df['distance'].to_array() +SUBSET_SEED_OPTIONS = [42] +DEFAULT_EPSILON = 1e-6 -def base_call(M, start_vertex): - int_max = 2**31 - 1 - N = max(max(M['0']), max(M['1'])) + 1 - M = scipy.sparse.csr_matrix((M.weight, (M['0'], M['1'])), - shape=(N, N)) - - offsets = M.indptr - indices = M.indices - num_verts = len(offsets) - 1 - dist = np.zeros(num_verts, dtype=np.int32) - vertex = list(range(num_verts)) - - for i in range(num_verts): - dist[i] = int_max - - q = queue.Queue() - q.put(start_vertex) - dist[start_vertex] = 0 - while(not q.empty()): - u = q.get() - for i_col in range(offsets[u], offsets[u + 1]): - v = indices[i_col] - if (dist[v] == int_max): - dist[v] = dist[u] + 1 - q.put(v) - - return vertex, dist +# ============================================================================= +# Utils +# ============================================================================= +def prepare_test(): + gc.collect() -DATASETS = ['../datasets/dolphins.csv', - '../datasets/karate.csv', - '../datasets/polbooks.csv', - '../datasets/netscience.csv', - '../datasets/email-Eu-core.csv'] +# TODO: This is also present in test_betweenness_centrality.py +# And it could probably be used in SSSP also +def build_graphs(graph_file, directed=True): + # cugraph + cu_M = utils.read_csv_file(graph_file) + G = cugraph.DiGraph() if directed else cugraph.Graph() + G.from_cudf_edgelist(cu_M, source='0', destination='1') + G.view_adj_list() # Enforce CSR generation before computation -# Test all combinations of default/managed and pooled/non-pooled allocation + # networkx + M = utils.read_csv_for_nx(graph_file) + Gnx = nx.from_pandas_edgelist(M, create_using=(nx.DiGraph() if directed + else nx.Graph()), + source='0', target='1') + return G, Gnx + + +# ============================================================================= +# Functions for comparison +# ============================================================================= +# NOTE: We need to use relative error, the values of the shortest path +# counters can reach extremely high values 1e+80 and above +def compare_single_sp_counter(result, expected, epsilon=DEFAULT_EPSILON): + return np.isclose(result, expected, rtol=epsilon) + + +def compare_bfs(graph_file, directed=True, return_sp_counter=False, + seed=42): + """ Genereate both cugraph and reference bfs traversal + + Parameters + ----------- + graph_file : string + Path to COO Graph representation in .csv format + + directed : bool, optional, default=True + Indicated whether the graph is directed or not + + return_sp_counter : bool, optional, default=False + 
+        Return shortest path counters from the traversal if True
+
+    seed : int, list of int, or None, optional, default=42
+        Random seed used to pick the starting vertex; a list of explicit
+        source vertices, or None to traverse from every vertex
+
+    Returns
+    -------
+    """
+    G, Gnx = build_graphs(graph_file, directed)
+    # Seed for reproducibility
+    if isinstance(seed, int):
+        random.seed(seed)
+        start_vertex = random.sample(Gnx.nodes(), 1)[0]
+
+        # Select the comparison function based on shortest path counting
+        compare_func = _compare_bfs_spc if return_sp_counter else _compare_bfs
+
+        # NOTE: We need two different paths for verification as the nx
+        #       functions used as reference return dictionaries that might
+        #       not contain all the vertices, while the cugraph version
+        #       returns a cudf.DataFrame with all the vertices; also, some
+        #       verifications become slow due to the data transfer
+        compare_func(G, Gnx, start_vertex)
+    elif isinstance(seed, list):  # For other verifications
+        for start_vertex in seed:
+            compare_func = _compare_bfs_spc if return_sp_counter else \
+                _compare_bfs
+            compare_func(G, Gnx, start_vertex)
+    elif seed is None:  # Same here, this is only used to run full checks
+        for start_vertex in Gnx:
+            compare_func = _compare_bfs_spc if return_sp_counter else \
+                _compare_bfs
+            compare_func(G, Gnx, start_vertex)
+    else:  # Unknown type given to seed
+        raise NotImplementedError("Invalid type for seed")
+
+
+def _compare_bfs(G, Gnx, source):
+    df = cugraph.bfs(G, source, return_sp_counter=False)
+    # This call should only contain 3 columns:
+    # 'vertex', 'distance', 'predecessor'
+    # It also confirms whether or not 'sp_counter' has been created by the
+    # call; 'sp_counter' triggers atomic operations in BFS, thus we want to
+    # make sure that this was not the case
+    # NOTE: 'predecessor' is always returned while the C++ function allows
+    #       passing a nullptr
+    assert len(df.columns) == 3, "The result of the BFS has an invalid " \
+                                 "number of columns"
+    cu_distances = {vertex: dist for vertex, dist in
+                    zip(df['vertex'].to_array(), df['distance'].to_array())}
+    cu_predecessors = {vertex: pred for vertex, pred in
+                       zip(df['vertex'].to_array(),
+                           df['predecessor'].to_array())}
+
+    nx_distances = nx.single_source_shortest_path_length(Gnx, source)
+    # TODO: The following only verifies vertices that were reached
+    #       by cugraph's BFS.
+    # We assume that the distances are given back as integers in BFS
+    # max_val = np.iinfo(df['distance'].dtype).max
+    # Unreached vertices have a distance of max_val
+
+    missing_vertex_error = 0
+    distance_mismatch_error = 0
+    invalid_predecessor_error = 0
+    for vertex in nx_distances:
+        if vertex in cu_distances:
+            result = cu_distances[vertex]
+            expected = nx_distances[vertex]
+            if result != expected:
+                print("[ERR] Mismatch on distances: "
+                      "vid = {}, cugraph = {}, nx = {}".format(vertex,
+                                                               result,
+                                                               expected))
+                distance_mismatch_error += 1
+            if vertex not in cu_predecessors:
+                missing_vertex_error += 1
+            else:
+                pred = cu_predecessors[vertex]
+                if vertex != source and pred not in nx_distances:
+                    invalid_predecessor_error += 1
+                else:
+                    # The graph is unweighted, thus predecessors are 1 away
+                    if (vertex != source and (nx_distances[pred] + 1 !=
+                                              cu_distances[vertex])):
+                        print("[ERR] Invalid predecessor: "
+                              "vid = {}, cugraph = {}".format(vertex, pred))
+                        invalid_predecessor_error += 1
+        else:
+            missing_vertex_error += 1
+    assert missing_vertex_error == 0, "There are missing vertices"
+    assert distance_mismatch_error == 0, "There are invalid distances"
+    assert invalid_predecessor_error == 0, "There are invalid predecessors"
+
+
+def _compare_bfs_spc(G, Gnx, source):
+    df = cugraph.bfs(G, source, return_sp_counter=True)
+    cu_sp_counter = {vertex: count for vertex, count in
+                     zip(df['vertex'].to_array(), df['sp_counter'].to_array())}
+    # This call should contain exactly 4 columns:
+    # 'vertex', 'distance', 'predecessor', 'sp_counter'
+    assert len(df.columns) == 4, "The result of the BFS has an invalid " \
+                                 "number of columns"
+    _, _, nx_sp_counter = nxacb._single_source_shortest_path_basic(Gnx,
+                                                                   source)
+    # We are not checking distances / predecessors here as we assume
+    # that these have been checked in the _compare_bfs tests
+    # We focus solely on shortest path counting
+    # NOTE: (as of 04/29/2020) The networkx implementation generates a dict
+    #       with all the vertices, thus we check all of them
+    missing_vertex_error = 0
+    shortest_path_counter_errors = 0
+    for vertex in nx_sp_counter:
+        if vertex in cu_sp_counter:
+            result = cu_sp_counter[vertex]
+            expected = nx_sp_counter[vertex]
+            if not compare_single_sp_counter(result, expected):
+                print("[ERR] Mismatch on shortest paths: "
+                      "vid = {}, cugraph = {}, nx = {}".format(vertex,
+                                                               result,
+                                                               expected))
+                shortest_path_counter_errors += 1
+        else:
+            missing_vertex_error += 1
+    assert missing_vertex_error == 0, "There are missing vertices"
+    assert shortest_path_counter_errors == 0, "Shortest path counters are " \
+                                              "too different"
+
+
+# =============================================================================
+# Tests
+# =============================================================================
 @pytest.mark.parametrize('graph_file', DATASETS)
-def test_bfs(graph_file):
-    gc.collect()
+@pytest.mark.parametrize('directed', DIRECTED_GRAPH_OPTIONS)
+@pytest.mark.parametrize('seed', SUBSET_SEED_OPTIONS)
+def test_bfs(graph_file, directed, seed):
+    """Test BFS traversal on random source with distance and predecessors"""
+    prepare_test()
+    compare_bfs(graph_file, directed=directed, return_sp_counter=False,
+                seed=seed)
 
-    M = utils.read_csv_for_nx(graph_file)
-    cu_M = utils.read_csv_file(graph_file)
-    base_vid, base_dist = base_call(M, 0)
-    cugraph_vid, cugraph_dist = cugraph_call(cu_M, 0)
-
-    # Calculating mismatch
-    # Currently, vertex order mismatch is not considered as an error
-    cugraph_idx = 0
-    base_idx = 0
-    distance_error_counter = 0
-    while cugraph_idx < len(cugraph_dist):
-        if base_vid[base_idx] == cugraph_vid[cugraph_idx]:
-            # An error is detected when for the same vertex
-            # the distances are different
-            if base_dist[base_idx] != cugraph_dist[cugraph_idx]:
-                distance_error_counter += 1
-            cugraph_idx += 1
-        base_idx += 1
-    assert distance_error_counter == 0
+@pytest.mark.parametrize('graph_file', DATASETS)
+@pytest.mark.parametrize('directed', DIRECTED_GRAPH_OPTIONS)
+@pytest.mark.parametrize('seed', SUBSET_SEED_OPTIONS)
+def test_bfs_spc(graph_file, directed, seed):
+    """Test BFS traversal on random source with shortest path counting"""
+    prepare_test()
+    compare_bfs(graph_file, directed=directed, return_sp_counter=True,
+                seed=seed)
+
+
+@pytest.mark.parametrize('graph_file', TINY_DATASETS)
+@pytest.mark.parametrize('directed', DIRECTED_GRAPH_OPTIONS)
+def test_bfs_spc_full(graph_file, directed):
+    """Test BFS traversal on every vertex with shortest path counting"""
+    prepare_test()
+    compare_bfs(graph_file, directed=directed, return_sp_counter=True,
+                seed=None)
diff --git a/python/cugraph/traversal/bfs.pxd b/python/cugraph/traversal/bfs.pxd
index 83817751cf6..fe1dd5e5965 100644
--- a/python/cugraph/traversal/bfs.pxd
+++ b/python/cugraph/traversal/bfs.pxd
@@ -26,5 +26,6 @@
         const GraphCSRView[VT,ET,WT] &graph,
         VT *distances,
         VT *predecessors,
+        double *sp_counters,
         const VT start_vertex,
         bool directed) except +
diff --git a/python/cugraph/traversal/bfs.py b/python/cugraph/traversal/bfs.py
index 8b88318d0b4..9a1e3caa431 100644
--- a/python/cugraph/traversal/bfs.py
+++ b/python/cugraph/traversal/bfs.py
@@ -15,7 +15,7 @@
 from cugraph.structure.graph import Graph
 
 
-def bfs(G, start):
+def bfs(G, start, return_sp_counter=False):
     """
     Find the distances and predecessors for a breadth first traversal of a
     graph.
@@ -28,6 +28,9 @@ def bfs(G, start):
     start : Integer
         The index of the graph vertex from which the traversal begins
 
+    return_sp_counter : bool, optional, default=False
+        Indicates if shortest path counters should be returned
+
     Returns
     -------
     df : cudf.DataFrame
@@ -39,6 +42,9 @@
         df['predecessor'][i] gives for the i'th vertex the vertex it was
         reached from in the traversal
 
+        df['sp_counter'][i] gives for the i'th vertex the number of shortest
+        paths leading to it during the traversal (only if return_sp_counter
+        is True)
+
     Examples
     --------
     >>> M = cudf.read_csv('datasets/karate.csv', delimiter=' ',
@@ -53,6 +59,6 @@
     else:
         directed = True
 
-    df = bfs_wrapper.bfs(G, start, directed)
+    df = bfs_wrapper.bfs(G, start, directed, return_sp_counter)
 
     return df
diff --git a/python/cugraph/traversal/bfs_wrapper.pyx b/python/cugraph/traversal/bfs_wrapper.pyx
index 92c0c60970c..1b6a508f2ef 100644
--- a/python/cugraph/traversal/bfs_wrapper.pyx
+++ b/python/cugraph/traversal/bfs_wrapper.pyx
@@ -30,7 +30,8 @@
 import cudf
 import rmm
 import numpy as np
 
-def bfs(input_graph, start, directed=True):
+def bfs(input_graph, start, directed=True,
+        return_sp_counter=False):
     """
     Call bfs
     """
@@ -46,6 +47,7 @@
     cdef uintptr_t c_identifier_ptr = <uintptr_t> NULL  # Pointer to the DataFrame 'vertex' Series
     cdef uintptr_t c_distance_ptr = <uintptr_t> NULL  # Pointer to the DataFrame 'distance' Series
     cdef uintptr_t c_predecessor_ptr = <uintptr_t> NULL  # Pointer to the DataFrame 'predecessor' Series
+    cdef uintptr_t c_sp_counter_ptr = <uintptr_t> NULL  # Pointer to the DataFrame 'sp_counter' Series
 
     # Step 2: Verifiy input_graph has the expected format
     if input_graph.adjlist is None:
@@ -75,23 +77,29 @@
     df['vertex'] = cudf.Series(np.zeros(num_verts, dtype=np.int32))
     df['distance'] = cudf.Series(np.zeros(num_verts, dtype=np.int32))
     df['predecessor'] = cudf.Series(np.zeros(num_verts, dtype=np.int32))
+    if return_sp_counter:
+        df['sp_counter'] = cudf.Series(np.zeros(num_verts, dtype=np.double))
 
     # Step 7: Associate to cudf Series
     c_identifier_ptr = df['vertex'].__cuda_array_interface__['data'][0]
     c_distance_ptr = df['distance'].__cuda_array_interface__['data'][0]
     c_predecessor_ptr = df['predecessor'].__cuda_array_interface__['data'][0]
+    if return_sp_counter:
+        c_sp_counter_ptr = df['sp_counter'].__cuda_array_interface__['data'][0]
 
     # Step 8: Proceed to BFS
-    # TODO: [int, int, float] or may add an explicit [int, int, int] in graph.cu?
+    # FIXME: [int, int, float] or may add an explicit [int, int, int] in graph.cu?
     graph_float = GraphCSRView[int, int, float](<int*> c_offsets_ptr,
                                                 <int*> c_indices_ptr,
                                                 <float*> NULL,
                                                 num_verts,
                                                 num_edges)
     graph_float.get_vertex_identifiers(<int*> c_identifier_ptr)
 
+    # c_sp_counter_ptr is left as NULL when shortest path counting is not
+    # requested
     c_bfs.bfs[int, int, float](graph_float,
                                <int*> c_distance_ptr,
                                <int*> c_predecessor_ptr,
+                               <double*> c_sp_counter_ptr,
                                <int> start,
                                <bool> directed)
 
 #FIXME: Update with multicolumn renumbering
diff --git a/python/cugraph/traversal/sssp_wrapper.pyx b/python/cugraph/traversal/sssp_wrapper.pyx
index 9434961611e..785baf9a777 100644
--- a/python/cugraph/traversal/sssp_wrapper.pyx
+++ b/python/cugraph/traversal/sssp_wrapper.pyx
@@ -124,7 +124,7 @@ def sssp(input_graph, source):
         else:  # This case should not happen
             raise NotImplementedError
     else:
-        # TODO: Something might be done here considering WT = float
+        # FIXME: Something might be done here considering WT = float
        graph_float = GraphCSRView[int, int, float](<int*> c_offsets_ptr,
                                                    <int*> c_indices_ptr,
                                                    <float*> NULL,
@@ -134,6 +134,7 @@
     c_bfs.bfs[int, int, float](graph_float,
                                <int*> c_distance_ptr,
                                <int*> c_predecessor_ptr,
+                               NULL,
                                <int> source)
 
 #FIXME: Update with multiple column renumbering
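
A quick way to exercise the new option end to end, for anyone reviewing this patch. This is a minimal sketch, assuming the karate dataset path and the string column names ('0', '1') used by the tests in this diff:

```python
import cudf
import cugraph

# Edge list columns are named '0' and '1', matching build_graphs above;
# the dataset path assumes the tests' working directory.
cu_M = cudf.read_csv('../datasets/karate.csv', delimiter=' ',
                     names=['0', '1', '2'],
                     dtype=['int32', 'int32', 'float32'], header=None)
G = cugraph.DiGraph()
G.from_cudf_edgelist(cu_M, source='0', destination='1')

# Default call: 'vertex', 'distance', 'predecessor' columns only.
df = cugraph.bfs(G, 0)
assert 'sp_counter' not in df.columns

# Opting in adds an 'sp_counter' column of doubles, filled through the
# new sp_counters pointer threaded down to the C++ bfs.
df_spc = cugraph.bfs(G, 0, return_sp_counter=True)
print(df_spc.head())
```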
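Why `compare_single_sp_counter` uses a relative tolerance rather than an absolute one is easy to demonstrate. The magnitudes below are made up, but they are of the order the NOTE in the test file mentions (1e+80 and above):

```python
import numpy as np

# Two counters that agree to ~9 significant digits but differ by ~1e71
# in absolute terms.
result = 1.0e80
expected = 1.0e80 * (1.0 + 1e-9)

# A fixed absolute tolerance rejects them outright...
assert not np.isclose(result, expected, rtol=0.0, atol=1e-6)
# ...while the relative tolerance used by compare_single_sp_counter
# accepts them, scaling the allowed error with the magnitude.
assert np.isclose(result, expected, rtol=1e-6)
```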
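For completeness, a sketch of how the reference counters used by `_compare_bfs_spc` can be obtained on their own. `karate_club_graph` stands in for the CSV datasets purely for illustration, and the three-value unpacking matches the networkx version these tests target:

```python
import networkx as nx
import networkx.algorithms.centrality.betweenness as nxacb

# networkx's Brandes helper returns, among other things, the sigma dict
# mapping each vertex to its number of shortest paths from the source.
Gnx = nx.karate_club_graph()
source = 0
_, _, nx_sp_counter = nxacb._single_source_shortest_path_basic(Gnx, source)

for vertex, count in sorted(nx_sp_counter.items())[:5]:
    print(vertex, count)
```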
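Finally, the predecessor check in `_compare_bfs` boils down to a single invariant of unweighted BFS, restated here as a standalone sketch (the helper name and the toy distances are hypothetical):

```python
def is_valid_bfs_predecessor(vertex, pred, source, distances):
    # In an unweighted graph, every valid BFS predecessor of a reached
    # vertex (other than the source) sits exactly one level closer to
    # the source.
    if vertex == source:
        return True
    return pred in distances and distances[pred] + 1 == distances[vertex]


# Tiny worked example on a path graph 0 - 1 - 2:
distances = {0: 0, 1: 1, 2: 2}
assert is_valid_bfs_predecessor(2, 1, 0, distances)
assert not is_valid_bfs_predecessor(2, 0, 0, distances)
```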