From 808ffa6c06e7b682c5b6c9543de57fd2d1639c71 Mon Sep 17 00:00:00 2001
From: Chuck Hastings <45364586+ChuckHastings@users.noreply.github.com>
Date: Tue, 21 Jun 2022 15:17:43 -0400
Subject: [PATCH] Some legacy BFS cleanup (#2347)

Legacy BFS is still used by Betweenness Centrality, but it is no longer
called from Python. Remove the `call_bfs` functions from cython.cu and
remove a few legacy SSSP files that were missed by an earlier PR. Also
remove `call_hits`, which is not currently used either.

Authors:
  - Chuck Hastings (https://github.com/ChuckHastings)

Approvers:
  - Seunghwa Kang (https://github.com/seunghwak)

URL: https://github.com/rapidsai/cugraph/pull/2347
---
 cpp/src/traversal/legacy/sssp.cu          | 306 ------------
 cpp/src/traversal/legacy/sssp.cuh         |  85 ----
 cpp/src/traversal/legacy/sssp_kernels.cuh | 554 ----------------------
 cpp/src/utilities/cython.cu               | 217 ---------
 4 files changed, 1162 deletions(-)
 delete mode 100644 cpp/src/traversal/legacy/sssp.cu
 delete mode 100644 cpp/src/traversal/legacy/sssp.cuh
 delete mode 100644 cpp/src/traversal/legacy/sssp_kernels.cuh

diff --git a/cpp/src/traversal/legacy/sssp.cu b/cpp/src/traversal/legacy/sssp.cu
deleted file mode 100644
index 91927143d8e..00000000000
--- a/cpp/src/traversal/legacy/sssp.cu
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Copyright (c) 2019-2022, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -// Author: Prasun Gera pgera@nvidia.com - -#include -#include -#include - -#include - -#include "sssp.cuh" -#include "sssp_kernels.cuh" -#include "traversal_common.cuh" - -namespace cugraph { -namespace detail { - -template -void SSSP::setup() -{ - // Working data - // Each vertex can be in the frontier at most once - frontier.resize(n); - new_frontier.resize(n); - - // size of bitmaps for vertices - vertices_bmap_size = (n / (8 * sizeof(int)) + 1); - - // size of bitmaps for edges - edges_bmap_size = (nnz / (8 * sizeof(int)) + 1); - - // ith bit of isolated_bmap is set <=> degree of ith vertex = 0 - isolated_bmap.resize(vertices_bmap_size); - - // Allocate buffer for data that need to be reset every iteration - iter_buffer_size = sizeof(int) * (edges_bmap_size + vertices_bmap_size) + sizeof(IndexType); - iter_buffer.resize(iter_buffer_size, stream); - // ith bit of relaxed_edges_bmap <=> ith edge was relaxed - relaxed_edges_bmap = static_cast(iter_buffer.data()); - // ith bit of next_frontier_bmap <=> vertex is active in the next frontier - next_frontier_bmap = static_cast(iter_buffer.data()) + edges_bmap_size; - // num vertices in the next frontier - d_new_frontier_cnt = next_frontier_bmap + vertices_bmap_size; - - // vertices_degree[i] = degree of vertex i - vertex_degree.resize(n); - - // frontier_vertex_degree[i] is the degree of vertex frontier[i] - frontier_vertex_degree.resize(n); - - // exclusive sum of frontier_vertex_degree - exclusive_sum_frontier_vertex_degree.resize(n + 1); - - // We use buckets of edges (32 edges per bucket for now, see exact macro in - // sssp_kernels). frontier_vertex_degree_buckets_offsets[i] is the index k - // such as frontier[k] is the source of the first edge of the bucket - // See top down kernels for more details - size_t bucket_off_size = - ((nnz / TOP_DOWN_EXPAND_DIMX + 1) * NBUCKETS_PER_BLOCK + 2) * sizeof(IndexType); - exclusive_sum_frontier_vertex_buckets_offsets.resize(bucket_off_size); - - // Repurpose d_new_frontier_cnt temporarily - IndexType* d_nisolated = d_new_frontier_cnt; - cudaMemsetAsync(d_nisolated, 0, sizeof(IndexType), stream); - - // Computing isolated_bmap - // Only dependent on graph - not source vertex - done once - traversal::flag_isolated_vertices( - n, isolated_bmap.data().get(), row_offsets, vertex_degree.data().get(), d_nisolated, stream); - - cudaMemcpyAsync(&nisolated, d_nisolated, sizeof(IndexType), cudaMemcpyDeviceToHost, stream); - - // We need nisolated to be ready to use - // nisolated is the number of isolated (zero out-degree) vertices - cudaStreamSynchronize(stream); -} - -template -void SSSP::configure(DistType* _distances, - IndexType* _predecessors, - int* _edge_mask) -{ - distances = _distances; - predecessors = _predecessors; - edge_mask = _edge_mask; - - useEdgeMask = (edge_mask != NULL); - computeDistances = (distances != NULL); - computePredecessors = (predecessors != NULL); - - // We need distances for SSSP even if the caller doesn't need them - if (!computeDistances) { - distances_vals.resize(n); - distances = distances_vals.data().get(); - } - // Need next_distances in either case - next_distances_vals.resize(n); - next_distances = next_distances_vals.data().get(); -} - -template -void SSSP::traverse(IndexType source_vertex) -{ - // Init distances to infinities - traversal::fill_vec(distances, n, traversal::vec_t::max, stream); - traversal::fill_vec(next_distances, n, traversal::vec_t::max, stream); - - // If needed, set all predecessors to non-existent (-1) - if (computePredecessors) { 
cudaMemsetAsync(predecessors, -1, n * sizeof(IndexType), stream); } - - // - // Initial frontier - // - - cudaMemsetAsync(&distances[source_vertex], 0, sizeof(DistType), stream); - cudaMemsetAsync(&next_distances[source_vertex], 0, sizeof(DistType), stream); - - int current_isolated_bmap_source_vert = 0; - - cudaMemcpyAsync(¤t_isolated_bmap_source_vert, - isolated_bmap.data().get() + (source_vertex / INT_SIZE), - sizeof(int), - cudaMemcpyDeviceToHost); - - // We need current_isolated_bmap_source_vert - cudaStreamSynchronize(stream); - - int m = (1 << (source_vertex % INT_SIZE)); - - // If source is isolated (zero outdegree), we are done - if ((m & current_isolated_bmap_source_vert)) { - // Init distances and predecessors are done; stream is synchronized - } - - // Adding source_vertex to init frontier - cudaMemcpyAsync( - frontier.data().get(), &source_vertex, sizeof(IndexType), cudaMemcpyHostToDevice, stream); - - // Number of vertices in the frontier and number of out-edges from the - // frontier - IndexType mf, nf; - nf = 1; - int iters = 0; - - while (nf > 0) { - // Typical pre-top down workflow. set_frontier_degree + exclusive-scan - traversal::set_frontier_degree(frontier_vertex_degree.data().get(), - frontier.data().get(), - vertex_degree.data().get(), - nf, - stream); - - traversal::exclusive_sum(frontier_vertex_degree.data().get(), - exclusive_sum_frontier_vertex_degree.data().get(), - nf + 1, - stream); - - cudaMemcpyAsync(&mf, - exclusive_sum_frontier_vertex_degree.data().get() + nf, - sizeof(IndexType), - cudaMemcpyDeviceToHost, - stream); - - // We need mf to know the next kernel's launch dims - cudaStreamSynchronize(stream); - - traversal::compute_bucket_offsets(exclusive_sum_frontier_vertex_degree.data().get(), - exclusive_sum_frontier_vertex_buckets_offsets.data().get(), - nf, - mf, - stream); - - // Reset the transient structures to 0 - cudaMemsetAsync(iter_buffer.data(), 0, iter_buffer_size, stream); - - sssp_kernels::frontier_expand(row_offsets, - col_indices, - edge_weights, - frontier.data().get(), - nf, - mf, - new_frontier.data().get(), - d_new_frontier_cnt, - exclusive_sum_frontier_vertex_degree.data().get(), - exclusive_sum_frontier_vertex_buckets_offsets.data().get(), - distances, - next_distances, - predecessors, - edge_mask, - next_frontier_bmap, - relaxed_edges_bmap, - isolated_bmap.data().get(), - stream); - - cudaMemcpyAsync(&nf, d_new_frontier_cnt, sizeof(IndexType), cudaMemcpyDeviceToHost, stream); - - // Copy next_distances to distances - cudaMemcpyAsync( - distances, next_distances, n * sizeof(DistType), cudaMemcpyDeviceToDevice, stream); - - // We need nf for the loop - RAFT_CUDA_TRY(cudaStreamSynchronize(stream)); - - // Swap frontiers - // IndexType *tmp = frontier; - // frontier = new_frontier; - // new_frontier = tmp; - new_frontier.swap(frontier); - iters++; - - if (iters > n) { - // Bail out. Got a graph with a negative cycle - CUGRAPH_FAIL("ERROR: Max iterations exceeded. 
Check the graph for negative weight cycles"); - } - } -} - -template -void SSSP::clean() -{ -} - -} // namespace detail - -/** - * ---------------------------------------------------------------------------* - * @brief Native sssp with predecessors - * - * @file sssp.cu - * --------------------------------------------------------------------------*/ -template -void sssp(legacy::GraphCSRView const& graph, - WT* distances, - VT* predecessors, - const VT source_vertex) -{ - CUGRAPH_EXPECTS(distances || predecessors, "Invalid input argument, both outputs are nullptr"); - - if (typeid(VT) != typeid(int)) CUGRAPH_FAIL("Unsupported vertex id data type, please use int"); - if (typeid(ET) != typeid(int)) CUGRAPH_FAIL("Unsupported edge id data type, please use int"); - if (typeid(WT) != typeid(float) && typeid(WT) != typeid(double)) - CUGRAPH_FAIL("Unsupported weight data type, please use float or double"); - - int num_vertices = graph.number_of_vertices; - int num_edges = graph.number_of_edges; - - const ET* offsets_ptr = graph.offsets; - const VT* indices_ptr = graph.indices; - const WT* edge_weights_ptr = nullptr; - - // Both if / else branch operate own calls due to - // thrust::device_vector lifetime - if (!graph.edge_data) { - // Generate unit weights - - // FIXME: This should fallback to BFS, but for now it'll go through the - // SSSP path since BFS needs the directed flag, which should not be - // necessary for the SSSP API. We can pass directed to the BFS call, but - // BFS also does only integer distances right now whereas we need float or - // double - - rmm::device_vector d_edge_weights(num_edges, static_cast(1)); - edge_weights_ptr = thrust::raw_pointer_cast(&d_edge_weights.front()); - cugraph::detail::SSSP sssp( - num_vertices, num_edges, offsets_ptr, indices_ptr, edge_weights_ptr); - sssp.configure(distances, predecessors, nullptr); - sssp.traverse(source_vertex); - } else { - // SSSP is not defined for graphs with negative weight cycles - // Warn user about any negative edges - if (graph.prop.has_negative_edges == legacy::PropType::PROP_TRUE) - std::cerr << "WARN: The graph has negative weight edges. SSSP will not " - "converge if the graph has negative weight cycles\n"; - edge_weights_ptr = graph.edge_data; - cugraph::detail::SSSP sssp( - num_vertices, num_edges, offsets_ptr, indices_ptr, edge_weights_ptr); - sssp.configure(distances, predecessors, nullptr); - sssp.traverse(source_vertex); - } -} - -// explicit instantiation -template void sssp(legacy::GraphCSRView const& graph, - float* distances, - int* predecessors, - const int source_vertex); -template void sssp(legacy::GraphCSRView const& graph, - double* distances, - int* predecessors, - const int source_vertex); - -} // namespace cugraph diff --git a/cpp/src/traversal/legacy/sssp.cuh b/cpp/src/traversal/legacy/sssp.cuh deleted file mode 100644 index c14f1f33708..00000000000 --- a/cpp/src/traversal/legacy/sssp.cuh +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// Author: Prasun Gera pgera@nvidia.com - -#pragma once -#include - -namespace cugraph { -namespace detail { -template -class SSSP { - private: - IndexType n, nnz; - const IndexType* row_offsets; - const IndexType* col_indices; - const DistType* edge_weights; - - // edgemask, distances, predecessors are set/read by users - using Vectors - bool useEdgeMask; - bool computeDistances; - bool computePredecessors; - DistType* distances; - DistType* next_distances; - rmm::device_vector distances_vals; - rmm::device_vector next_distances_vals; - IndexType* predecessors; - int* edge_mask; - - // Working data - IndexType nisolated; - rmm::device_vector frontier, new_frontier; - IndexType vertices_bmap_size, edges_bmap_size; - int *relaxed_edges_bmap, *next_frontier_bmap; - rmm::device_vector isolated_bmap; - rmm::device_vector vertex_degree; - rmm::device_buffer iter_buffer; - size_t iter_buffer_size; - rmm::device_vector frontier_vertex_degree; - rmm::device_vector exclusive_sum_frontier_vertex_degree; - rmm::device_vector exclusive_sum_frontier_vertex_buckets_offsets; - IndexType* d_new_frontier_cnt; - - cudaStream_t stream; - - void setup(); - void clean(); - - public: - virtual ~SSSP(void) { clean(); } - - SSSP(IndexType _n, - IndexType _nnz, - const IndexType* _row_offsets, - const IndexType* _col_indices, - const DistType* _edge_weights, - cudaStream_t _stream = 0) - : n(_n), - nnz(_nnz), - row_offsets(_row_offsets), - edge_weights(_edge_weights), - col_indices(_col_indices), - stream(_stream) - { - setup(); - } - - void configure(DistType* distances, IndexType* predecessors, int* edge_mask); - void traverse(IndexType source_vertex); -}; -} // namespace detail -} // namespace cugraph diff --git a/cpp/src/traversal/legacy/sssp_kernels.cuh b/cpp/src/traversal/legacy/sssp_kernels.cuh deleted file mode 100644 index 86f6b4a88c4..00000000000 --- a/cpp/src/traversal/legacy/sssp_kernels.cuh +++ /dev/null @@ -1,554 +0,0 @@ -/* - * Copyright (c) 2019-2022, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -// Author: Prasun Gera pgera@nvidia.com - -#include - -#include "traversal_common.cuh" -#include -#include -namespace cugraph { -namespace detail { -namespace sssp_kernels { - -// This is the second pass after relax_edges that sets the active frontier -// nodes and predecessors -template -__global__ void populate_frontier_and_preds( - const IndexType* row_ptr, - const IndexType* col_ind, - const DistType* edge_weights, - const IndexType* frontier, - const IndexType frontier_size, - const IndexType totaldegree, - const IndexType max_items_per_thread, - IndexType* new_frontier, - IndexType* new_frontier_cnt, - const IndexType* frontier_degrees_exclusive_sum, - const IndexType* frontier_degrees_exclusive_sum_buckets_offsets, - int* next_frontier_bmap, - const int* relaxed_edges_bmap, - const int* isolated_bmap, - DistType* distances, - DistType* next_distances, - IndexType* predecessors, - const int* edge_mask) -{ - // BlockScan - typedef cub::BlockScan BlockScan; - __shared__ typename BlockScan::TempStorage scan_storage; - - // We will do a scan to know where to write in frontier - // This will contain the common offset of the block - __shared__ IndexType frontier_common_block_offset; - - __shared__ IndexType shared_buckets_offsets[TOP_DOWN_EXPAND_DIMX - NBUCKETS_PER_BLOCK + 1]; - __shared__ IndexType shared_frontier_degrees_exclusive_sum[TOP_DOWN_EXPAND_DIMX + 1]; - - IndexType block_offset = (blockDim.x * blockIdx.x) * max_items_per_thread; - IndexType n_items_per_thread_left = - (totaldegree - block_offset + TOP_DOWN_EXPAND_DIMX - 1) / TOP_DOWN_EXPAND_DIMX; - - n_items_per_thread_left = min(max_items_per_thread, n_items_per_thread_left); - - for (; (n_items_per_thread_left > 0) && (block_offset < totaldegree); - - block_offset += MAX_ITEMS_PER_THREAD_PER_OFFSETS_LOAD * blockDim.x, - n_items_per_thread_left -= MAX_ITEMS_PER_THREAD_PER_OFFSETS_LOAD) { - // In this loop, we will process batch_set_size batches - IndexType nitems_per_thread = - min(n_items_per_thread_left, (IndexType)MAX_ITEMS_PER_THREAD_PER_OFFSETS_LOAD); - - // Loading buckets offset (see compute_bucket_offsets_kernel) - - if (threadIdx.x < (nitems_per_thread * NBUCKETS_PER_BLOCK + 1)) - shared_buckets_offsets[threadIdx.x] = - frontier_degrees_exclusive_sum_buckets_offsets[block_offset / TOP_DOWN_BUCKET_SIZE + - threadIdx.x]; - - // We will use shared_buckets_offsets - __syncthreads(); - - // - // shared_buckets_offsets gives us a range of the possible indexes - // for edge of linear_threadx, we are looking for the value k such as - // k is the max value such as frontier_degrees_exclusive_sum[k] <= - // linear_threadx - // - // we have 0 <= k < frontier_size - // but we also have : - // - // frontier_degrees_exclusive_sum_buckets_offsets[linear_threadx/TOP_DOWN_BUCKET_SIZE] - // <= k - // <= - // frontier_degrees_exclusive_sum_buckets_offsets[linear_threadx/TOP_DOWN_BUCKET_SIZE - // + 1] - // - // To find the exact value in that range, we need a few values from - // frontier_degrees_exclusive_sum (see below) - // We will load them here - // We will load as much as we can - if it doesn't fit we will make multiple - // iteration of the next loop - // Because all vertices in frontier have degree > 0, we know it will fits - // if left + 1 = right (see below) - - // We're going to load values in frontier_degrees_exclusive_sum for batch - // [left; right[ - // If it doesn't fit, --right until it does, then loop - // It is excepted to fit on the first try, that's why we start right = - // nitems_per_thread - - IndexType 
left = 0; - IndexType right = nitems_per_thread; - - while (left < nitems_per_thread) { - // - // Values that are necessary to compute the local binary searches - // We only need those with indexes between extremes indexes of - // buckets_offsets - // We need the next val for the binary search, hence the +1 - // - - IndexType nvalues_to_load = shared_buckets_offsets[right * NBUCKETS_PER_BLOCK] - - shared_buckets_offsets[left * NBUCKETS_PER_BLOCK] + 1; - - // If left = right + 1 we are sure to have nvalues_to_load < - // TOP_DOWN_EXPAND_DIMX+1 - while (nvalues_to_load > (TOP_DOWN_EXPAND_DIMX + 1)) { - --right; - - nvalues_to_load = shared_buckets_offsets[right * NBUCKETS_PER_BLOCK] - - shared_buckets_offsets[left * NBUCKETS_PER_BLOCK] + 1; - } - - IndexType nitems_per_thread_for_this_load = right - left; - - IndexType frontier_degrees_exclusive_sum_block_offset = - shared_buckets_offsets[left * NBUCKETS_PER_BLOCK]; - - if (threadIdx.x < nvalues_to_load) { - shared_frontier_degrees_exclusive_sum[threadIdx.x] = - frontier_degrees_exclusive_sum[frontier_degrees_exclusive_sum_block_offset + threadIdx.x]; - } - - if (nvalues_to_load == (TOP_DOWN_EXPAND_DIMX + 1) && threadIdx.x == 0) { - shared_frontier_degrees_exclusive_sum[TOP_DOWN_EXPAND_DIMX] = - frontier_degrees_exclusive_sum[frontier_degrees_exclusive_sum_block_offset + - TOP_DOWN_EXPAND_DIMX]; - } - - // shared_frontier_degrees_exclusive_sum is in shared mem, we will use - // it, sync - __syncthreads(); - - // Now we will process the edges - // Here each thread will process nitems_per_thread_for_this_load - for (IndexType item_index = 0; item_index < nitems_per_thread_for_this_load; - item_index += TOP_DOWN_BATCH_SIZE) { - // We process TOP_DOWN_BATCH_SIZE edge in parallel (instruction - // parallism) - // Reduces latency - - IndexType current_max_edge_index = - min(block_offset + (left + nitems_per_thread_for_this_load) * blockDim.x, totaldegree); - - IndexType naccepted_vertices = 0; - IndexType vec_frontier_candidate[TOP_DOWN_BATCH_SIZE]; - -#pragma unroll - for (IndexType iv = 0; iv < TOP_DOWN_BATCH_SIZE; ++iv) { - IndexType ibatch = left + item_index + iv; - IndexType gid = block_offset + ibatch * blockDim.x + threadIdx.x; - vec_frontier_candidate[iv] = -1; - - if (gid < current_max_edge_index) { - IndexType start_off_idx = (ibatch * blockDim.x + threadIdx.x) / TOP_DOWN_BUCKET_SIZE; - IndexType bucket_start = - shared_buckets_offsets[start_off_idx] - frontier_degrees_exclusive_sum_block_offset; - IndexType bucket_end = shared_buckets_offsets[start_off_idx + 1] - - frontier_degrees_exclusive_sum_block_offset; - - IndexType k = traversal::binsearch_maxle( - shared_frontier_degrees_exclusive_sum, gid, bucket_start, bucket_end) + - frontier_degrees_exclusive_sum_block_offset; - - IndexType src_id = frontier[k]; // origin of this edge - IndexType edge = row_ptr[src_id] + gid - frontier_degrees_exclusive_sum[k]; - - bool was_edge_relaxed = relaxed_edges_bmap[gid / INT_SIZE] & (1 << (gid % INT_SIZE)); - // Check if this edge was relaxed in relax_edges earlier - if (was_edge_relaxed) { - IndexType dst_id = col_ind[edge]; - DistType dst_val = next_distances[dst_id]; - DistType expected_val = distances[src_id] + edge_weights[edge]; - - if (expected_val == dst_val) { - // Our relaxation was the last one (not necessarily unique) - // Try to become the parent in the SSSP tree atomically to - // break potential ties - // Set bit in next_frontier_bmap to 1 and check for old value - // to check for success - - int old_val = - 
atomicOr(&next_frontier_bmap[dst_id / INT_SIZE], 1 << (dst_id % INT_SIZE)); - - bool fail = (old_val >> (dst_id % INT_SIZE)) & 1; - - if (!fail) { - // Add dst_id to frontier if dst is not isolated - // (Can't have zero degree verts in frontier for the - // bucket/prefix-sum logic to work) - bool is_isolated = (isolated_bmap[dst_id / INT_SIZE] >> (dst_id % INT_SIZE)) & 1; - - if (!is_isolated) { - vec_frontier_candidate[iv] = dst_id; - ++naccepted_vertices; - } - - // Add src_id to predecessor in either case if needed - if (predecessors) { predecessors[dst_id] = src_id; } - } - // else lost the tie - } - // else somebody else relaxed it to a lower value after us in the - // previous kernel - } - } - } - - // We need to have all nfrontier_candidates to be ready before doing - // the scan - __syncthreads(); - - // Computing block offsets - IndexType thread_new_frontier_offset = 0; // offset inside block - BlockScan(scan_storage).ExclusiveSum(naccepted_vertices, thread_new_frontier_offset); - - if (threadIdx.x == (TOP_DOWN_EXPAND_DIMX - 1)) { - IndexType inclusive_sum = thread_new_frontier_offset + naccepted_vertices; - // for this thread, thread_new_frontier_offset + has_successor - // (exclusive sum) - if (inclusive_sum) - frontier_common_block_offset = atomicAdd(new_frontier_cnt, inclusive_sum); - } - - // Broadcasting frontier_common_block_offset - __syncthreads(); - - // Write to global memory - for (int iv = 0; iv < TOP_DOWN_BATCH_SIZE; ++iv) { - IndexType frontier_candidate = vec_frontier_candidate[iv]; - - if (frontier_candidate != -1) { - IndexType off = frontier_common_block_offset + thread_new_frontier_offset++; - new_frontier[off] = frontier_candidate; - } - } - } - - // We need to keep shared_frontier_degrees_exclusive_sum coherent - __syncthreads(); - - // Preparing for next load - left = right; - right = nitems_per_thread; - } - - // we need to keep shared_buckets_offsets coherent - __syncthreads(); - } -} - -template -__global__ void relax_edges(const IndexType* row_ptr, - const IndexType* col_ind, - const DistType* edge_weights, - const IndexType* frontier, - const IndexType frontier_size, - const IndexType totaldegree, - const IndexType max_items_per_thread, - const IndexType* frontier_degrees_exclusive_sum, - const IndexType* frontier_degrees_exclusive_sum_buckets_offsets, - int* relaxed_edges_bmap, - DistType* distances, - DistType* next_distances, - const int* edge_mask) -{ - __shared__ IndexType shared_buckets_offsets[TOP_DOWN_EXPAND_DIMX - NBUCKETS_PER_BLOCK + 1]; - __shared__ IndexType shared_frontier_degrees_exclusive_sum[TOP_DOWN_EXPAND_DIMX + 1]; - - IndexType block_offset = (blockDim.x * blockIdx.x) * max_items_per_thread; - IndexType n_items_per_thread_left = - (totaldegree - block_offset + TOP_DOWN_EXPAND_DIMX - 1) / TOP_DOWN_EXPAND_DIMX; - - n_items_per_thread_left = min(max_items_per_thread, n_items_per_thread_left); - - for (; (n_items_per_thread_left > 0) && (block_offset < totaldegree); - - block_offset += MAX_ITEMS_PER_THREAD_PER_OFFSETS_LOAD * blockDim.x, - n_items_per_thread_left -= MAX_ITEMS_PER_THREAD_PER_OFFSETS_LOAD) { - // In this loop, we will process batch_set_size batches - IndexType nitems_per_thread = - min(n_items_per_thread_left, (IndexType)MAX_ITEMS_PER_THREAD_PER_OFFSETS_LOAD); - - // Loading buckets offset (see compute_bucket_offsets_kernel) - - if (threadIdx.x < (nitems_per_thread * NBUCKETS_PER_BLOCK + 1)) - shared_buckets_offsets[threadIdx.x] = - frontier_degrees_exclusive_sum_buckets_offsets[block_offset / TOP_DOWN_BUCKET_SIZE + - 
threadIdx.x]; - - // We will use shared_buckets_offsets - __syncthreads(); - - // - // shared_buckets_offsets gives us a range of the possible indexes - // for edge of linear_threadx, we are looking for the value k such as - // k is the max value such as frontier_degrees_exclusive_sum[k] <= - // linear_threadx - // - // we have 0 <= k < frontier_size - // but we also have : - // - // frontier_degrees_exclusive_sum_buckets_offsets[linear_threadx/TOP_DOWN_BUCKET_SIZE] - // <= k - // <= - // frontier_degrees_exclusive_sum_buckets_offsets[linear_threadx/TOP_DOWN_BUCKET_SIZE - // + 1] - // - // To find the exact value in that range, we need a few values from - // frontier_degrees_exclusive_sum (see below) - // We will load them here - // We will load as much as we can - if it doesn't fit we will make multiple - // iteration of the next loop - // Because all vertices in frontier have degree > 0, we know it will fits - // if left + 1 = right (see below) - - // We're going to load values in frontier_degrees_exclusive_sum for batch - // [left; right[ - // If it doesn't fit, --right until it does, then loop - // It is excepted to fit on the first try, that's why we start right = - // nitems_per_thread - - IndexType left = 0; - IndexType right = nitems_per_thread; - - while (left < nitems_per_thread) { - // - // Values that are necessary to compute the local binary searches - // We only need those with indexes between extremes indexes of - // buckets_offsets - // We need the next val for the binary search, hence the +1 - // - - IndexType nvalues_to_load = shared_buckets_offsets[right * NBUCKETS_PER_BLOCK] - - shared_buckets_offsets[left * NBUCKETS_PER_BLOCK] + 1; - - // If left = right + 1 we are sure to have nvalues_to_load < - // TOP_DOWN_EXPAND_DIMX+1 - while (nvalues_to_load > (TOP_DOWN_EXPAND_DIMX + 1)) { - --right; - - nvalues_to_load = shared_buckets_offsets[right * NBUCKETS_PER_BLOCK] - - shared_buckets_offsets[left * NBUCKETS_PER_BLOCK] + 1; - } - - IndexType nitems_per_thread_for_this_load = right - left; - - IndexType frontier_degrees_exclusive_sum_block_offset = - shared_buckets_offsets[left * NBUCKETS_PER_BLOCK]; - - if (threadIdx.x < nvalues_to_load) { - shared_frontier_degrees_exclusive_sum[threadIdx.x] = - frontier_degrees_exclusive_sum[frontier_degrees_exclusive_sum_block_offset + threadIdx.x]; - } - - if (nvalues_to_load == (TOP_DOWN_EXPAND_DIMX + 1) && threadIdx.x == 0) { - shared_frontier_degrees_exclusive_sum[TOP_DOWN_EXPAND_DIMX] = - frontier_degrees_exclusive_sum[frontier_degrees_exclusive_sum_block_offset + - TOP_DOWN_EXPAND_DIMX]; - } - - // shared_frontier_degrees_exclusive_sum is in shared mem, we will use - // it, sync - __syncthreads(); - - // Now we will process the edges - // Here each thread will process nitems_per_thread_for_this_load - for (IndexType item_index = 0; item_index < nitems_per_thread_for_this_load; - item_index += TOP_DOWN_BATCH_SIZE) { - // We process TOP_DOWN_BATCH_SIZE edge in parallel (instruction - // parallism) - // Reduces latency - - IndexType current_max_edge_index = - min(block_offset + (left + nitems_per_thread_for_this_load) * blockDim.x, totaldegree); - -#pragma unroll - for (IndexType iv = 0; iv < TOP_DOWN_BATCH_SIZE; ++iv) { - IndexType ibatch = left + item_index + iv; - IndexType gid = block_offset + ibatch * blockDim.x + threadIdx.x; - - if (gid < current_max_edge_index) { - IndexType start_off_idx = (ibatch * blockDim.x + threadIdx.x) / TOP_DOWN_BUCKET_SIZE; - IndexType bucket_start = - shared_buckets_offsets[start_off_idx] - 
frontier_degrees_exclusive_sum_block_offset; - IndexType bucket_end = shared_buckets_offsets[start_off_idx + 1] - - frontier_degrees_exclusive_sum_block_offset; - - IndexType k = traversal::binsearch_maxle( - shared_frontier_degrees_exclusive_sum, gid, bucket_start, bucket_end) + - frontier_degrees_exclusive_sum_block_offset; - - IndexType src_id = frontier[k]; - IndexType edge = row_ptr[frontier[k]] + gid - frontier_degrees_exclusive_sum[k]; - IndexType dst_id = col_ind[edge]; - - // Try to relax non-masked edges - if (!edge_mask || edge_mask[edge]) { - DistType* update_addr = &next_distances[dst_id]; - DistType old_val = distances[dst_id]; - DistType new_val = distances[src_id] + edge_weights[edge]; - if (new_val < old_val) { - // This edge can be relaxed - - // OPTION1 - // Add it to local candidates. Create shared candidates and - // then call atomic on the candidates queue - // More storage and work, but may have better performance since - // the atomics will be packed in contiguous lanes - // Not pursued - - // OPTION2 - // Try to relax with atomicmin directly. Easier, but may have - // worse performance - old_val = traversal::atomicMin(update_addr, new_val); - - if (old_val > new_val) { - // OPTION1: - // Add to frontier candidates - // Increment thread_frontier_count - // We'll sort/reduce and remove dupes in a different kernel - // Needs extra O(E) storage - // Not pursued - - // OPTION2: - // Mark the bits in the edge bitmap. Still needs O(E) bitmap, - // but smaller constant - // We'll do a second pass for frontier and preds - - int m = 1 << (gid % INT_SIZE); - atomicOr(&relaxed_edges_bmap[gid / INT_SIZE], m); - - // OPTION3: - // Mark the ends (i.e., src and dst) in two bitmap. O(V) - // bitmap overhead, but more memory accesses in the second - // pass due to indexing col_ind - } - // else somebody else relaxed the dst distance to a lower value - } - } - } - } - } - - // We need to keep shared_frontier_degrees_exclusive_sum coherent - __syncthreads(); - - // Preparing for next load - left = right; - right = nitems_per_thread; - } - - // we need to keep shared_buckets_offsets coherent - __syncthreads(); - } -} - -template -void frontier_expand(const IndexType* row_ptr, - const IndexType* col_ind, - const DistType* edge_weights, - const IndexType* frontier, - const IndexType frontier_size, - const IndexType totaldegree, - IndexType* new_frontier, - IndexType* new_frontier_cnt, - const IndexType* frontier_degrees_exclusive_sum, - const IndexType* frontier_degrees_exclusive_sum_buckets_offsets, - DistType* distances, - DistType* next_distances, - IndexType* predecessors, - const int* edge_mask, - int* next_frontier_bmap, - int* relaxed_edges_bmap, - const int* isolated_bmap, - cudaStream_t m_stream) -{ - if (!totaldegree) return; - - dim3 block; - block.x = TOP_DOWN_EXPAND_DIMX; - - IndexType max_items_per_thread = (totaldegree + MAXBLOCKS * block.x - 1) / (MAXBLOCKS * block.x); - - dim3 grid; - grid.x = - min((totaldegree + max_items_per_thread * block.x - 1) / (max_items_per_thread * block.x), - (IndexType)MAXBLOCKS); - - // Relax edges going out from the current frontier - relax_edges<<>>(row_ptr, - col_ind, - edge_weights, - frontier, - frontier_size, - totaldegree, - max_items_per_thread, - frontier_degrees_exclusive_sum, - frontier_degrees_exclusive_sum_buckets_offsets, - relaxed_edges_bmap, - distances, - next_distances, - edge_mask); - - // Revisit relaxed edges and update the next frontier and preds - populate_frontier_and_preds<<>>( - row_ptr, - col_ind, - edge_weights, - 
frontier, - frontier_size, - totaldegree, - max_items_per_thread, - new_frontier, - new_frontier_cnt, - frontier_degrees_exclusive_sum, - frontier_degrees_exclusive_sum_buckets_offsets, - next_frontier_bmap, - relaxed_edges_bmap, - isolated_bmap, - distances, - next_distances, - predecessors, - edge_mask); - - RAFT_CHECK_CUDA(m_stream); -} -} // namespace sssp_kernels -} // namespace detail -} // namespace cugraph diff --git a/cpp/src/utilities/cython.cu b/cpp/src/utilities/cython.cu index 17d060c1900..d24d38039b5 100644 --- a/cpp/src/utilities/cython.cu +++ b/cpp/src/utilities/cython.cu @@ -676,69 +676,6 @@ void call_pagerank(raft::handle_t const& handle, } } -// Wrapper for calling BFS through a graph container -template -void call_bfs(raft::handle_t const& handle, - graph_container_t const& graph_container, - vertex_t* identifiers, - vertex_t* distances, - vertex_t* predecessors, - vertex_t depth_limit, - vertex_t* sources, - size_t n_sources, - bool direction_optimizing) -{ - if (graph_container.is_multi_gpu) { - if (graph_container.edgeType == numberTypeEnum::int32Type) { - auto graph = - detail::create_graph(handle, graph_container); - cugraph::bfs(handle, - graph->view(), - reinterpret_cast(distances), - reinterpret_cast(predecessors), - reinterpret_cast(sources), - static_cast(n_sources), - direction_optimizing, - static_cast(depth_limit)); - } else if (graph_container.edgeType == numberTypeEnum::int64Type) { - auto graph = - detail::create_graph(handle, graph_container); - cugraph::bfs(handle, - graph->view(), - reinterpret_cast(distances), - reinterpret_cast(predecessors), - reinterpret_cast(sources), - static_cast(n_sources), - direction_optimizing, - static_cast(depth_limit)); - } - } else { - if (graph_container.edgeType == numberTypeEnum::int32Type) { - auto graph = - detail::create_graph(handle, graph_container); - cugraph::bfs(handle, - graph->view(), - reinterpret_cast(distances), - reinterpret_cast(predecessors), - reinterpret_cast(sources), - static_cast(n_sources), - direction_optimizing, - static_cast(depth_limit)); - } else if (graph_container.edgeType == numberTypeEnum::int64Type) { - auto graph = - detail::create_graph(handle, graph_container); - cugraph::bfs(handle, - graph->view(), - reinterpret_cast(distances), - reinterpret_cast(predecessors), - reinterpret_cast(sources), - static_cast(n_sources), - direction_optimizing, - static_cast(depth_limit)); - } - } -} - // Wrapper for calling extract_egonet through a graph container // FIXME : this should not be a legacy COO and it is not clear how to handle C++ api return type as // is.graph_container Need to figure out how to return edge lists @@ -992,84 +929,6 @@ void call_wcc(raft::handle_t const& handle, } } -// wrapper for HITS: -// -template -void call_hits(raft::handle_t const& handle, - graph_container_t const& graph_container, - weight_t* hubs, - weight_t* authorities, - size_t max_iter, - weight_t tolerance, - const weight_t* starting_value, - bool normalized) -{ - constexpr bool has_initial_hubs_guess{false}; - constexpr bool normalize{true}; - constexpr bool do_expensive_check{false}; - constexpr bool transposed{true}; - - // FIXME: most of these branches are not currently executed: MG support is not - // yet in the python API, and only int32_t edge types are being used. Consider - // removing these until actually needed. 
- - if (graph_container.is_multi_gpu) { - constexpr bool multi_gpu{true}; - if (graph_container.edgeType == numberTypeEnum::int32Type) { - auto graph = detail::create_graph( - handle, graph_container); - cugraph::hits(handle, - graph->view(), - reinterpret_cast(hubs), - reinterpret_cast(authorities), - tolerance, - max_iter, - has_initial_hubs_guess, - normalize, - do_expensive_check); - } else if (graph_container.edgeType == numberTypeEnum::int64Type) { - auto graph = detail::create_graph( - handle, graph_container); - cugraph::hits(handle, - graph->view(), - reinterpret_cast(hubs), - reinterpret_cast(authorities), - tolerance, - max_iter, - has_initial_hubs_guess, - normalize, - do_expensive_check); - } - } else { - constexpr bool multi_gpu{false}; - if (graph_container.edgeType == numberTypeEnum::int32Type) { - auto graph = detail::create_graph( - handle, graph_container); - cugraph::hits(handle, - graph->view(), - reinterpret_cast(hubs), - reinterpret_cast(authorities), - tolerance, - max_iter, - has_initial_hubs_guess, - normalize, - do_expensive_check); - } else if (graph_container.edgeType == numberTypeEnum::int64Type) { - auto graph = detail::create_graph( - handle, graph_container); - cugraph::hits(handle, - graph->view(), - reinterpret_cast(hubs), - reinterpret_cast(authorities), - tolerance, - max_iter, - has_initial_hubs_guess, - normalize, - do_expensive_check); - } - } -} - // wrapper for shuffling: // template @@ -1298,46 +1157,6 @@ template void call_pagerank(raft::handle_t const& handle, int64_t max_iter, bool has_guess); -template void call_bfs(raft::handle_t const& handle, - graph_container_t const& graph_container, - int32_t* identifiers, - int32_t* distances, - int32_t* predecessors, - int32_t depth_limit, - int32_t* sources, - size_t n_sources, - bool direction_optimizing); - -template void call_bfs(raft::handle_t const& handle, - graph_container_t const& graph_container, - int32_t* identifiers, - int32_t* distances, - int32_t* predecessors, - int32_t depth_limit, - int32_t* sources, - size_t n_sources, - bool direction_optimizing); - -template void call_bfs(raft::handle_t const& handle, - graph_container_t const& graph_container, - int64_t* identifiers, - int64_t* distances, - int64_t* predecessors, - int64_t depth_limit, - int64_t* sources, - size_t n_sources, - bool direction_optimizing); - -template void call_bfs(raft::handle_t const& handle, - graph_container_t const& graph_container, - int64_t* identifiers, - int64_t* distances, - int64_t* predecessors, - int64_t depth_limit, - int64_t* sources, - size_t n_sources, - bool direction_optimizing); - template std::unique_ptr call_egonet( raft::handle_t const& handle, graph_container_t const& graph_container, @@ -1421,42 +1240,6 @@ template void call_wcc(raft::handle_t const& handle, graph_container_t const& graph_container, int64_t* components); -template void call_hits(raft::handle_t const& handle, - graph_container_t const& graph_container, - float* hubs, - float* authorities, - size_t max_iter, - float tolerance, - const float* starting_value, - bool normalized); - -template void call_hits(raft::handle_t const& handle, - graph_container_t const& graph_container, - double* hubs, - double* authorities, - size_t max_iter, - double tolerance, - const double* starting_value, - bool normalized); - -template void call_hits(raft::handle_t const& handle, - graph_container_t const& graph_container, - float* hubs, - float* authorities, - size_t max_iter, - float tolerance, - const float* starting_value, - bool normalized); 
- -template void call_hits(raft::handle_t const& handle, - graph_container_t const& graph_container, - double* hubs, - double* authorities, - size_t max_iter, - double tolerance, - const double* starting_value, - bool normalized); - template std::unique_ptr> call_shuffle( raft::handle_t const& handle, int32_t* edgelist_major_vertices,
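
For reference, callers that previously went through `call_bfs` now invoke the non-legacy `cugraph::bfs` C++ API directly, which is exactly the call the removed wrapper was forwarding to. Below is a minimal sketch reconstructed from the argument list visible in the deleted wrapper; the `graph_view_t` template parameters, the header paths, and the helper name `bfs_without_wrapper` are assumptions for illustration and are not taken from this patch.

    // Sketch only: mirrors the forwarding call inside the removed call_bfs wrapper.
    // graph_view_t template parameters and header paths are assumed, not confirmed by this diff.
    #include <cugraph/algorithms.hpp>
    #include <cugraph/graph_view.hpp>

    #include <raft/handle.hpp>

    template <typename vertex_t, typename edge_t, typename weight_t>
    void bfs_without_wrapper(
      raft::handle_t const& handle,
      cugraph::graph_view_t<vertex_t, edge_t, weight_t, false, false> const&
        graph_view,                  // single-GPU, non-transposed view (assumed)
      vertex_t* distances,           // device array, one entry per vertex
      vertex_t* predecessors,        // device array, one entry per vertex
      vertex_t const* sources,       // device array of source vertices
      size_t n_sources,
      vertex_t depth_limit)
    {
      // Same argument order the deleted cython.cu wrapper used, minus the reinterpret_casts.
      cugraph::bfs(handle,
                   graph_view,
                   distances,
                   predecessors,
                   sources,
                   n_sources,
                   false /* direction_optimizing */,
                   depth_limit);
    }

The removed `call_hits` wrapper maps onto `cugraph::hits` in the same way, as its forwarding call in the deleted code above shows.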