DBSCAN: Speed up adjgraph step
For large data sizes, the DBSCAN algorithm uses a small batch size so
that the distance matrix fits in memory.

As a result, both the distance matrix and the adjacency matrix have
dimensions num_points x batch_size.

The conversion of the boolean adjacency matrix to CSR format is
performed in the 'adjgraph' step. This step was slow when the batch size
was small, as described in issue rapidsai#2387.

This commit speeds up the adjgraph step in two ways:

1. The adjacency matrix is now stored in row-major batch_size x
   num_points format --- it was transposed before. This required changes
   in the vertexdeg step.

2. The csr_row_op kernel has been replaced by the adj_to_csr kernel.
   This kernel can divide the work over multiple blocks even when the
   number of rows (batch size) is small. It makes optimal use of memory
   bandwidth because rows of the matrix are laid out contiguously in
   memory. A sequential sketch of the conversion is shown below.
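
For reference, the conversion can be sketched sequentially as follows
(illustrative only: the function name and signature are made up for
exposition, and unlike the GPU kernel this loop emits each row's column
indices in order):

    #include <vector>

    // Convert a row-major num_rows x num_cols boolean adjacency matrix into
    // CSR column indices, given the exclusive scan of the row degrees.
    std::vector<int> adj_to_csr_reference(const std::vector<char>& adj,     // row-major, 0/1
                                          const std::vector<int>& row_ind,  // exclusive scan of row degrees
                                          int num_rows,
                                          int num_cols,
                                          int nnz)
    {
      std::vector<int> out_col_ind(nnz);
      for (int i = 0; i < num_rows; ++i) {
        int out = row_ind[i];  // start of row i in the CSR column-index array
        for (int j = 0; j < num_cols; ++j) {
          if (adj[i * num_cols + j]) { out_col_ind[out++] = j; }
        }
      }
      return out_col_ind;
    }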
ahendriksen committed Jul 8, 2022
1 parent c8aebc3 commit f68e6fa
Showing 6 changed files with 215 additions and 47 deletions.
150 changes: 142 additions & 8 deletions cpp/src/dbscan/adjgraph/algo.cuh
@@ -16,42 +16,176 @@

#pragma once

#include <cooperative_groups.h>
#include <thrust/device_ptr.h>
#include <thrust/scan.h>

#include "../common.cuh"
#include "pack.h"

#include <raft/cuda_utils.cuh>
#include <raft/sparse/convert/csr.hpp>
#include <raft/device_atomics.cuh>
#include <raft/handle.hpp>
#include <raft/vectorized.cuh>
#include <rmm/device_uvector.hpp>

namespace ML {
namespace Dbscan {
namespace AdjGraph {
namespace Algo {

static const int TPB_X = 256;
// The following function was adapted from:
// https://developer.nvidia.com/blog/cuda-pro-tip-optimized-filtering-warp-aggregated-atomics/
//
// It cooperatively increments an atomic counter using all active threads in a
// warp. The return value is the original value of the counter plus the rank of
// the calling thread.
//
// The use of atomicIncWarp is a performance optimization. It can reduce the
// amount of atomic memory traffic by a factor of 32.
template <typename T = unsigned int>
__device__ T atomicIncWarp(T* ctr)
{
namespace cg = cooperative_groups;
auto g = cg::coalesced_threads();
T warp_res;
if (g.thread_rank() == 0) { warp_res = atomicAdd(ctr, static_cast<T>(g.size())); }
return g.shfl(warp_res, 0) + g.thread_rank();
}
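
// In the adj_to_csr kernel below, every thread that finds a non-zero element
// reserves a unique slot in its row's output segment with
//
//   out_col_ind[row_base + atomicIncWarp(row_count)] = column_index;
//
// so a fully active warp issues a single atomicAdd instead of up to 32.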

/**
* Takes vertex degree array (vd) and CSR row_ind array (ex_scan) to produce the
* CSR row_ind_ptr array (adj_graph)
* @brief Convert a boolean adjacency matrix into CSR format.
*
* The adj_to_csr kernel converts a boolean adjacency matrix into CSR format.
* High performance comes at the cost of non-deterministic output: the column
* indices are not guaranteed to be stored in order.
*
* The kernel has been optimized to handle matrices that are non-square, for
* instance subsets of a full adjacency matrix. In practice, these matrices can
* be very wide and not very tall. In principle, each row is assigned to one
* block. If there are more SMs than rows, multiple blocks operate on a single
* row. To enable cooperation between these blocks, each row is provided a
* counter where the current output index can be cooperatively (atomically)
* incremented. As a result, the output column indices are not guaranteed to
* be in order.
*
* @param[in] adj: a num_rows x num_cols boolean matrix in contiguous row-major
* format.
*
* @param[in] row_ind: an array of length num_rows that indicates at which index
* a row starts in out_col_ind. Equivalently, it is the
* exclusive scan of the number of non-zeros in each row of
* `adj`.
*
* @param[in] num_rows: number of rows of adj.
* @param[in] num_cols: number of columns of adj.
*
* @param[in,out] row_counters: a temporary zero-initialized array of length num_rows.
*
* @param[out] out_col_ind: an array containing the column indices of the
* non-zero values in `adj`. Size should be at least
* the number of non-zeros in `adj`.
*/
template <typename index_t>
__global__ void adj_to_csr(const bool* adj, // row-major adjacency matrix
const index_t* row_ind, // precomputed row indices
index_t num_rows, // # rows of adj
index_t num_cols, // # cols of adj
index_t* row_counters, // pre-allocated (zeroed) atomic counters
index_t* out_col_ind // output column indices
)
{
typedef raft::TxN_t<bool, 16> bool16;

for (index_t i = blockIdx.y; i < num_rows; i += gridDim.y) {
// Load row information
index_t row_base = row_ind[i];
index_t* row_count = row_counters + i;
const bool* row = adj + i * num_cols;

// Peeling: process the first j0 elements that are not aligned to a 16-byte
// boundary.
index_t j0 = (16 - (((uintptr_t)(const void*)row) % 16)) % 16;
j0 = min(j0, num_cols);
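// For example, if `row` starts 6 bytes past a 16-byte boundary, then
// j0 = (16 - 6) % 16 = 10: the first 10 booleans are handled here and the
// vectorized loop below starts at an aligned address.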
if (threadIdx.x < j0 && blockIdx.x == 0) {
if (row[threadIdx.x]) { out_col_ind[row_base + atomicIncWarp(row_count)] = threadIdx.x; }
}

// Process the rest of the row in 16 byte chunks starting at j0.
// This is a grid-stride loop.
index_t j = j0 + 16 * (blockIdx.x * blockDim.x + threadIdx.x);
for (; j + 15 < num_cols; j += 16 * (blockDim.x * gridDim.x)) {
bool16 chunk;
chunk.load(row, j);

for (int k = 0; k < 16; ++k) {
if (chunk.val.data[k]) { out_col_ind[row_base + atomicIncWarp(row_count)] = j + k; }
}
}

// Remainder: process the last j1 bools in the row individually.
index_t j1 = (num_cols - j0) % 16;
if (threadIdx.x < j1 && blockIdx.x == 0) {
int j = num_cols - j1 + threadIdx.x;
if (row[j]) { out_col_ind[row_base + atomicIncWarp(row_count)] = j; }
}
}
}
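
// Example of the non-determinism documented above: a row with non-zeros at
// columns {2, 40, 41} may be emitted as {40, 41, 2}, depending on which block
// and warp claims an output slot first. Only the ordering within a row varies;
// the set of column indices per row is always the same.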

/**
* @brief Converts a boolean adjacency matrix into CSR format.
*
* @tparam Index_: indexing arithmetic type
* @param[in] handle: raft::handle_t
*
* @param[in,out] data: A struct containing the adjacency matrix, its number of
* columns, and the vertex degrees.
*
* @param[in] batch_size: The number of rows of the adjacency matrix data.adj
* @param row_counters: A pre-allocated temporary buffer on the device.
* Must be able to contain at least `batch_size` elements.
* @param[in] stream: CUDA stream
*/
template <typename Index_ = int>
void launcher(const raft::handle_t& handle,
Pack<Index_> data,
Index_ batch_size,
Index_* row_counters,
cudaStream_t stream)
{
using namespace thrust;
Index_ num_rows = batch_size;
Index_ num_cols = data.N;
bool* adj = data.adj; // batch_size x N row-major adjacency matrix

// Compute the exclusive scan of the vertex degrees
using namespace thrust;
device_ptr<Index_> dev_vd = device_pointer_cast(data.vd);
device_ptr<Index_> dev_ex_scan = device_pointer_cast(data.ex_scan);
thrust::exclusive_scan(handle.get_thrust_policy(), dev_vd, dev_vd + batch_size, dev_ex_scan);

// Zero-fill a temporary vector that can be used by the adj_to_csr kernel to
// keep track of the number of entries added to a row.
RAFT_CUDA_TRY(cudaMemsetAsync(row_counters, 0, batch_size * sizeof(Index_), stream));

exclusive_scan(handle.get_thrust_policy(), dev_vd, dev_vd + batch_size, dev_ex_scan);
// Split the grid in the row direction (since each row can be processed
// independently). If the maximum number of active blocks (num_sms *
// occupancy) exceeds the number of rows, assign multiple blocks to a single
// row.
int threads_per_block = 1024;
int dev_id, sm_count, blocks_per_sm;
cudaGetDevice(&dev_id);
cudaDeviceGetAttribute(&sm_count, cudaDevAttrMultiProcessorCount, dev_id);
cudaOccupancyMaxActiveBlocksPerMultiprocessor(
&blocks_per_sm, adj_to_csr<Index_>, threads_per_block, 0);

raft::sparse::convert::csr_adj_graph_batched<Index_>(
data.ex_scan, data.N, data.adjnnz, batch_size, data.adj, data.adj_graph, stream);
Index_ max_active_blocks = sm_count * blocks_per_sm;
Index_ blocks_per_row = raft::ceildiv(max_active_blocks, num_rows);
Index_ grid_rows = raft::ceildiv(max_active_blocks, blocks_per_row);
dim3 block(threads_per_block, 1);
dim3 grid(blocks_per_row, grid_rows);
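
// Worked example (hypothetical device): with 108 SMs and 2 resident blocks per
// SM at 1024 threads per block, max_active_blocks = 216. For num_rows = 32,
// blocks_per_row = ceildiv(216, 32) = 7 and grid_rows = ceildiv(216, 7) = 31,
// so the kernel is launched on a 7 x 31 grid of 1024-thread blocks.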

adj_to_csr<Index_><<<grid, block, 0, stream>>>(
adj, data.ex_scan, num_rows, num_cols, row_counters, data.adj_graph);
RAFT_CUDA_TRY(cudaPeekAtLastError());
}

4 changes: 3 additions & 1 deletion cpp/src/dbscan/adjgraph/runner.cuh
@@ -34,12 +34,14 @@ void run(const raft::handle_t& handle,
Index_ N,
int algo,
Index_ batch_size,
Index_* row_counters,
cudaStream_t stream)
{
Pack<Index_> data = {vd, adj, adj_graph, adjnnz, ex_scan, N};
switch (algo) {
// TODO: deprecate naive runner. cf #3414
case 0: Naive::launcher<Index_>(handle, data, batch_size, stream); break;
case 1: Algo::launcher<Index_>(handle, data, batch_size, stream); break;
case 1: Algo::launcher<Index_>(handle, data, batch_size, row_counters, stream); break;
default: ASSERT(false, "Incorrect algo passed! '%d'", algo);
}
}
20 changes: 17 additions & 3 deletions cpp/src/dbscan/runner.cuh
@@ -148,6 +148,7 @@ std::size_t run(const raft::handle_t& handle,
std::size_t m_size = raft::alignTo<std::size_t>(sizeof(bool), align);
std::size_t vd_size = raft::alignTo<std::size_t>(sizeof(Index_) * (batch_size + 1), align);
std::size_t ex_scan_size = raft::alignTo<std::size_t>(sizeof(Index_) * batch_size, align);
std::size_t row_cnt_size = raft::alignTo<std::size_t>(sizeof(Index_) * batch_size, align);
std::size_t labels_size = raft::alignTo<std::size_t>(sizeof(Index_) * N, align);
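// For example (hypothetical values): with Index_ = int and batch_size = 1024,
// the new row_counters buffer adds row_cnt_size = alignTo(4 * 1024, align)
// bytes, i.e. 4096 bytes rounded up to a multiple of `align`.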

Index_ MAX_LABEL = std::numeric_limits<Index_>::max();
@@ -160,7 +161,8 @@
(unsigned long)batch_size);

if (workspace == NULL) {
auto size = adj_size + core_pts_size + m_size + vd_size + ex_scan_size + 2 * labels_size;
auto size =
adj_size + core_pts_size + m_size + vd_size + ex_scan_size + row_cnt_size + 2 * labels_size;
return size;
}

@@ -179,6 +181,8 @@
temp += vd_size;
Index_* ex_scan = (Index_*)temp;
temp += ex_scan_size;
Index_* row_counters = (Index_*)temp;
temp += row_cnt_size;
Index_* labels_temp = (Index_*)temp;
temp += labels_size;
Index_* work_buffer = (Index_*)temp;
@@ -237,8 +241,18 @@
maxadjlen = curradjlen;
adj_graph.resize(maxadjlen, stream);
}
AdjGraph::run<Index_>(
handle, adj, vd, adj_graph.data(), curradjlen, ex_scan, N, algo_adj, n_points, stream);
AdjGraph::run<Index_>(handle,
adj,
vd,
adj_graph.data(),
curradjlen,
ex_scan,
N,
algo_adj,
n_points,
row_counters,
stream);

raft::common::nvtx::pop_range();

CUML_LOG_DEBUG("--> Computing connected components");
40 changes: 32 additions & 8 deletions cpp/src/dbscan/vertexdeg/algo.cuh
@@ -16,15 +16,16 @@

#pragma once

#include "pack.h"
#include <cuda_runtime.h>
#include <math.h>
#include <raft/device_atomics.cuh>
#include <raft/linalg/coalesced_reduction.hpp>
#include <raft/linalg/matrix_vector_op.hpp>
#include <raft/linalg/norm.cuh>
#include <raft/spatial/knn/epsilon_neighborhood.hpp>
#include <rmm/device_uvector.hpp>

#include "pack.h"

namespace ML {
namespace Dbscan {
namespace VertexDeg {
@@ -41,7 +42,11 @@
cudaStream_t stream,
raft::distance::DistanceType metric)
{
data.resetArray(stream, batch_size + 1);
// The last position of data.vd holds the sum of all other elements in the
// array; hence its length is one more than the number of points in the batch.
// Initialize this last position to zero.
index_t* d_nnz = data.vd + batch_size;
RAFT_CUDA_TRY(cudaMemsetAsync(d_nnz, 0, sizeof(index_t), stream));

ASSERT(sizeof(index_t) == 4 || sizeof(index_t) == 8, "index_t should be 4 or 8 bytes");

@@ -50,6 +55,7 @@
index_t k = data.D;
value_t eps2;

// Compute adjacency matrix `adj` using Cosine or L2 metric.
if (metric == raft::distance::DistanceType::CosineExpanded) {
rmm::device_uvector<value_t> rowNorms(m, stream);

@@ -79,7 +85,7 @@
eps2 = 2 * data.eps;

raft::spatial::knn::epsUnexpL2SqNeighborhood<value_t, index_t>(
data.adj, data.vd, data.x, data.x + start_vertex_id * k, m, n, k, eps2, stream);
data.adj, nullptr, data.x + start_vertex_id * k, data.x, n, m, k, eps2, stream);

/**
* Restoring the input matrix after normalization.
@@ -94,14 +100,32 @@
true,
[] __device__(value_t mat_in, value_t vec_in) { return mat_in * vec_in; },
stream);
}

else {
} else {
eps2 = data.eps * data.eps;

// 1. The output matrix adj is now an n x m matrix (row-major order)
// 2. Do not compute the vertex degree in epsUnexpL2SqNeighborhood (pass a
// nullptr)
raft::spatial::knn::epsUnexpL2SqNeighborhood<value_t, index_t>(
data.adj, data.vd, data.x, data.x + start_vertex_id * k, m, n, k, eps2, stream);
data.adj, nullptr, data.x + start_vertex_id * k, data.x, n, m, k, eps2, stream);
}

// Reduction of adj to compute the vertex degrees
raft::linalg::coalescedReduction<bool, index_t, index_t>(
data.vd,
data.adj,
data.N,
batch_size,
(index_t)0,
stream,
false,
[] __device__(bool adj_ij, index_t idx) { return static_cast<index_t>(adj_ij); },
raft::Sum<index_t>(),
[d_nnz] __device__(index_t degree) {
atomicAdd(d_nnz, degree);
return degree;
});
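// Each row i of the batch_size x N boolean adjacency matrix is summed to give
// the vertex degree data.vd[i]. The final lambda also accumulates every row
// sum into d_nnz (= data.vd + batch_size), so after the reduction the last
// element of data.vd holds the total number of non-zeros in the batch.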
RAFT_CUDA_TRY(cudaPeekAtLastError());
}

} // namespace Algo