Skip to content

Commit

Permalink
Accelerate adjacency matrix to CSR conversion for DBSCAN (#4803)
Browse files Browse the repository at this point in the history
Fixes issue #2387.

For large data sizes, the batch size of the DBSCAN algorithm is small in order to fit the distance matrix in memory.

This results in a matrix that has dimensions num_points x batch_size, both for the distance and adjacency matrix.

The conversion of the boolean adjacency matrix to CSR format is performed in the 'adjgraph' step. This step was slow when the batch size was small, as described in issue #2387.

In this commit, the adjgraph step is sped up. This is done in two ways:

1. The adjacency matrix is now stored in row-major batch_size x num_points format --- it was transposed before. This required changes    in the vertexdeg step.

2. The csr_row_op kernel has been replaced by the adj_to_csr kernel.    This kernel can divide the work over multiple blocks even when the    number of rows (batch size) is small. It makes optimal use of memory    bandwidth because rows of the matrix are laid out contiguously in memory.

Authors:
  - Allard Hendriksen (https://github.com/ahendriksen)
  - Corey J. Nolet (https://github.com/cjnolet)

Approvers:
  - Corey J. Nolet (https://github.com/cjnolet)
  - Tamas Bela Feher (https://github.com/tfeher)

URL: #4803
  • Loading branch information
Allard Hendriksen authored Jul 22, 2022
1 parent 97941e3 commit 2fad4c0
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 47 deletions.
131 changes: 123 additions & 8 deletions cpp/src/dbscan/adjgraph/algo.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,157 @@

#pragma once

#include <cooperative_groups.h>
#include <thrust/device_ptr.h>
#include <thrust/scan.h>

#include "../common.cuh"
#include "pack.h"

#include <raft/cuda_utils.cuh>
#include <raft/sparse/convert/csr.hpp>
#include <raft/device_atomics.cuh>
#include <raft/handle.hpp>
#include <raft/vectorized.cuh>
#include <rmm/device_uvector.hpp>

namespace ML {
namespace Dbscan {
namespace AdjGraph {
namespace Algo {

static const int TPB_X = 256;
/**
* @brief Convert a boolean adjacency matrix into CSR format.
*
* The adj_to_csr kernel converts a boolean adjacency matrix into CSR format.
* High performance comes at the cost of non-deterministic output: the column
* indices are not guaranteed to be stored in order.
*
* The kernel has been optimized to handle matrices that are non-square, for
* instance subsets of a full adjacency matrix. In practice, these matrices can
* be very wide and not very tall. In principle, each row is assigned to one
* block. If there are more SMs than rows, multiple blocks operate on a single
* row. To enable cooperation between these blocks, each row is provided a
* counter where the current output index can be cooperatively (atomically)
* incremented. As a result, the order of the output indices is not guaranteed
* to be in order.
*
* @param[in] adj: a num_rows x num_cols boolean matrix in contiguous row-major
* format.
*
* @param[in] row_ind: an array of length num_rows that indicates at which index
* a row starts in out_col_ind. Equivalently, it is the
* exclusive scan of the number of non-zeros in each row of
* `adj`.
*
* @param[in] num_rows: number of rows of adj.
* @param[in] num_cols: number of columns of adj.
*
* @param[in,out] row_counters: a temporary zero-initialized array of length num_rows.
*
* @param[out] out_col_ind: an array containing the column indices of the
* non-zero values in `adj`. Size should be at least
* the number of non-zeros in `adj`.
*/
template <typename index_t>
__global__ void adj_to_csr(const bool* adj, // row-major adjacency matrix
const index_t* row_ind, // precomputed row indices
index_t num_rows, // # rows of adj
index_t num_cols, // # cols of adj
index_t* row_counters, // pre-allocated (zeroed) atomic counters
index_t* out_col_ind // output column indices
)
{
typedef raft::TxN_t<bool, 16> bool16;

for (index_t i = blockIdx.y; i < num_rows; i += gridDim.y) {
// Load row information
index_t row_base = row_ind[i];
index_t* row_count = row_counters + i;
const bool* row = adj + i * num_cols;

// Peeling: process the first j0 elements that are not aligned to a 16-byte
// boundary.
index_t j0 = (16 - (((uintptr_t)(const void*)row) % 16)) % 16;
j0 = min(j0, num_cols);
if (threadIdx.x < j0 && blockIdx.x == 0) {
if (row[threadIdx.x]) { out_col_ind[row_base + atomicIncWarp(row_count)] = threadIdx.x; }
}

// Process the rest of the row in 16 byte chunks starting at j0.
// This is a grid-stride loop.
index_t j = j0 + 16 * (blockIdx.x * blockDim.x + threadIdx.x);
for (; j + 15 < num_cols; j += 16 * (blockDim.x * gridDim.x)) {
bool16 chunk;
chunk.load(row, j);

for (int k = 0; k < 16; ++k) {
if (chunk.val.data[k]) { out_col_ind[row_base + atomicIncWarp(row_count)] = j + k; }
}
}

// Remainder: process the last j1 bools in the row individually.
index_t j1 = (num_cols - j0) % 16;
if (threadIdx.x < j1 && blockIdx.x == 0) {
int j = num_cols - j1 + threadIdx.x;
if (row[j]) { out_col_ind[row_base + atomicIncWarp(row_count)] = j; }
}
}
}

/**
* Takes vertex degree array (vd) and CSR row_ind array (ex_scan) to produce the
* CSR row_ind_ptr array (adj_graph)
* @brief Converts a boolean adjacency matrix into CSR format.
*
* @tparam[Index_]: indexing arithmetic type
* @param[in] handle: raft::handle_t
*
* @param[in,out] data: A struct containing the adjacency matrix, its number of
* columns, and the vertex degrees.
*
* @param[in] batch_size: The number of rows of the adjacency matrix data.adj
* @param row_counters: A pre-allocated temporary buffer on the device.
* Must be able to contain at least `batch_size` elements.
* @param[in] stream: CUDA stream
*/
template <typename Index_ = int>
void launcher(const raft::handle_t& handle,
Pack<Index_> data,
Index_ batch_size,
Index_* row_counters,
cudaStream_t stream)
{
using namespace thrust;
Index_ num_rows = batch_size;
Index_ num_cols = data.N;
bool* adj = data.adj; // batch_size x N row-major adjacency matrix

// Compute the exclusive scan of the vertex degrees
using namespace thrust;
device_ptr<Index_> dev_vd = device_pointer_cast(data.vd);
device_ptr<Index_> dev_ex_scan = device_pointer_cast(data.ex_scan);
thrust::exclusive_scan(handle.get_thrust_policy(), dev_vd, dev_vd + batch_size, dev_ex_scan);

// Zero-fill a temporary vector that can be used by the adj_to_csr kernel to
// keep track of the number of entries added to a row.
RAFT_CUDA_TRY(cudaMemsetAsync(row_counters, 0, batch_size * sizeof(Index_), stream));

exclusive_scan(handle.get_thrust_policy(), dev_vd, dev_vd + batch_size, dev_ex_scan);
// Split the grid in the row direction (since each row can be processed
// independently). If the maximum number of active blocks (num_sms *
// occupancy) exceeds the number of rows, assign multiple blocks to a single
// row.
int threads_per_block = 1024;
int dev_id, sm_count, blocks_per_sm;
cudaGetDevice(&dev_id);
cudaDeviceGetAttribute(&sm_count, cudaDevAttrMultiProcessorCount, dev_id);
cudaOccupancyMaxActiveBlocksPerMultiprocessor(
&blocks_per_sm, adj_to_csr<Index_>, threads_per_block, 0);

raft::sparse::convert::csr_adj_graph_batched<Index_>(
data.ex_scan, data.N, data.adjnnz, batch_size, data.adj, data.adj_graph, stream);
Index_ max_active_blocks = sm_count * blocks_per_sm;
Index_ blocks_per_row = raft::ceildiv(max_active_blocks, num_rows);
Index_ grid_rows = raft::ceildiv(max_active_blocks, blocks_per_row);
dim3 block(threads_per_block, 1);
dim3 grid(blocks_per_row, grid_rows);

adj_to_csr<Index_><<<grid, block, 0, stream>>>(
adj, data.ex_scan, num_rows, num_cols, row_counters, data.adj_graph);
RAFT_CUDA_TRY(cudaPeekAtLastError());
}

Expand Down
4 changes: 3 additions & 1 deletion cpp/src/dbscan/adjgraph/runner.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ void run(const raft::handle_t& handle,
Index_ N,
int algo,
Index_ batch_size,
Index_* row_counters,
cudaStream_t stream)
{
Pack<Index_> data = {vd, adj, adj_graph, adjnnz, ex_scan, N};
switch (algo) {
// TODO: deprecate naive runner. cf #3414
case 0: Naive::launcher<Index_>(handle, data, batch_size, stream); break;
case 1: Algo::launcher<Index_>(handle, data, batch_size, stream); break;
case 1: Algo::launcher<Index_>(handle, data, batch_size, row_counters, stream); break;
default: ASSERT(false, "Incorrect algo passed! '%d'", algo);
}
}
Expand Down
20 changes: 17 additions & 3 deletions cpp/src/dbscan/runner.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ std::size_t run(const raft::handle_t& handle,
std::size_t m_size = raft::alignTo<std::size_t>(sizeof(bool), align);
std::size_t vd_size = raft::alignTo<std::size_t>(sizeof(Index_) * (batch_size + 1), align);
std::size_t ex_scan_size = raft::alignTo<std::size_t>(sizeof(Index_) * batch_size, align);
std::size_t row_cnt_size = raft::alignTo<std::size_t>(sizeof(Index_) * batch_size, align);
std::size_t labels_size = raft::alignTo<std::size_t>(sizeof(Index_) * N, align);

Index_ MAX_LABEL = std::numeric_limits<Index_>::max();
Expand All @@ -160,7 +161,8 @@ std::size_t run(const raft::handle_t& handle,
(unsigned long)batch_size);

if (workspace == NULL) {
auto size = adj_size + core_pts_size + m_size + vd_size + ex_scan_size + 2 * labels_size;
auto size =
adj_size + core_pts_size + m_size + vd_size + ex_scan_size + row_cnt_size + 2 * labels_size;
return size;
}

Expand All @@ -179,6 +181,8 @@ std::size_t run(const raft::handle_t& handle,
temp += vd_size;
Index_* ex_scan = (Index_*)temp;
temp += ex_scan_size;
Index_* row_counters = (Index_*)temp;
temp += row_cnt_size;
Index_* labels_temp = (Index_*)temp;
temp += labels_size;
Index_* work_buffer = (Index_*)temp;
Expand Down Expand Up @@ -237,8 +241,18 @@ std::size_t run(const raft::handle_t& handle,
maxadjlen = curradjlen;
adj_graph.resize(maxadjlen, stream);
}
AdjGraph::run<Index_>(
handle, adj, vd, adj_graph.data(), curradjlen, ex_scan, N, algo_adj, n_points, stream);
AdjGraph::run<Index_>(handle,
adj,
vd,
adj_graph.data(),
curradjlen,
ex_scan,
N,
algo_adj,
n_points,
row_counters,
stream);

raft::common::nvtx::pop_range();

CUML_LOG_DEBUG("--> Computing connected components");
Expand Down
40 changes: 32 additions & 8 deletions cpp/src/dbscan/vertexdeg/algo.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@

#pragma once

#include "pack.h"
#include <cuda_runtime.h>
#include <math.h>
#include <raft/device_atomics.cuh>
#include <raft/linalg/coalesced_reduction.hpp>
#include <raft/linalg/matrix_vector_op.hpp>
#include <raft/linalg/norm.cuh>
#include <raft/spatial/knn/epsilon_neighborhood.hpp>
#include <rmm/device_uvector.hpp>

#include "pack.h"

namespace ML {
namespace Dbscan {
namespace VertexDeg {
Expand All @@ -41,7 +42,11 @@ void launcher(const raft::handle_t& handle,
cudaStream_t stream,
raft::distance::DistanceType metric)
{
data.resetArray(stream, batch_size + 1);
// The last position of data.vd is the sum of all elements in this array
// (excluding it). Hence, its length is one more than the number of points
// Initialize it to zero.
index_t* d_nnz = data.vd + batch_size;
RAFT_CUDA_TRY(cudaMemsetAsync(d_nnz, 0, sizeof(index_t), stream));

ASSERT(sizeof(index_t) == 4 || sizeof(index_t) == 8, "index_t should be 4 or 8 bytes");

Expand All @@ -50,6 +55,7 @@ void launcher(const raft::handle_t& handle,
index_t k = data.D;
value_t eps2;

// Compute adjacency matrix `adj` using Cosine or L2 metric.
if (metric == raft::distance::DistanceType::CosineExpanded) {
rmm::device_uvector<value_t> rowNorms(m, stream);

Expand Down Expand Up @@ -79,7 +85,7 @@ void launcher(const raft::handle_t& handle,
eps2 = 2 * data.eps;

raft::spatial::knn::epsUnexpL2SqNeighborhood<value_t, index_t>(
data.adj, data.vd, data.x, data.x + start_vertex_id * k, m, n, k, eps2, stream);
data.adj, nullptr, data.x + start_vertex_id * k, data.x, n, m, k, eps2, stream);

/**
* Restoring the input matrix after normalization.
Expand All @@ -94,14 +100,32 @@ void launcher(const raft::handle_t& handle,
true,
[] __device__(value_t mat_in, value_t vec_in) { return mat_in * vec_in; },
stream);
}

else {
} else {
eps2 = data.eps * data.eps;

// 1. The output matrix adj is now an n x m matrix (row-major order)
// 2. Do not compute the vertex degree in epsUnexpL2SqNeighborhood (pass a
// nullptr)
raft::spatial::knn::epsUnexpL2SqNeighborhood<value_t, index_t>(
data.adj, data.vd, data.x, data.x + start_vertex_id * k, m, n, k, eps2, stream);
data.adj, nullptr, data.x + start_vertex_id * k, data.x, n, m, k, eps2, stream);
}

// Reduction of adj to compute the vertex degrees
raft::linalg::coalescedReduction<bool, index_t, index_t>(
data.vd,
data.adj,
data.N,
batch_size,
(index_t)0,
stream,
false,
[] __device__(bool adj_ij, index_t idx) { return static_cast<index_t>(adj_ij); },
raft::Sum<index_t>(),
[d_nnz] __device__(index_t degree) {
atomicAdd(d_nnz, degree);
return degree;
});
RAFT_CUDA_TRY(cudaPeekAtLastError());
}

} // namespace Algo
Expand Down
47 changes: 20 additions & 27 deletions cpp/src/dbscan/vertexdeg/precomputed.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -51,51 +51,44 @@ void launcher(const raft::handle_t& handle,
index_t batch_size,
cudaStream_t stream)
{
const value_t& eps = data.eps;

// Note: the matrix is symmetric. We take advantage of this to have two
// coalesced kernels:
// - The reduction works on a column-major N*B matrix to compute a B vector
// with a cub-BlockReduce-based primitive.
// The final_op is used to compute the total number of non-zero elements
// - The conversion to a boolean matrix works on a column-major B*N matrix
// (coalesced 2d copy + transform).
//
// If we end up supporting distributed distance matrices for MNMG, we can't
// rely on this trick anymore and need either a transposed kernel or to
// change the output layout.

// Regarding index types, a special index type is used here for indices in
// the distance matrix due to its dimensions (that are independent of the
// batch size)
using long_index_t = long long int;

// Reduction to compute the vertex degrees
// The last position of data.vd is the sum of all elements in this array
// (excluding it). Hence, its length is one more than the number of points
// Initialize it to zero.
index_t* d_nnz = data.vd + batch_size;
RAFT_CUDA_TRY(cudaMemsetAsync(d_nnz, 0, sizeof(index_t), stream));
raft::linalg::coalescedReduction<value_t, index_t, long_index_t>(

long_index_t N = data.N;
long_index_t cur_batch_size = min(data.N - start_vertex_id, batch_size);

const value_t& eps = data.eps;
raft::linalg::unaryOp<value_t>(
data.adj,
data.x + (long_index_t)start_vertex_id * N,
cur_batch_size * N,
[eps] __device__(value_t dist) { return (dist <= eps); },
stream);
RAFT_CUDA_TRY(cudaPeekAtLastError());

// Reduction of adj to compute the vertex degrees
raft::linalg::coalescedReduction<bool, index_t, long_index_t>(
data.vd,
data.x + start_vertex_id * data.N,
data.adj,
data.N,
batch_size,
(index_t)0,
stream,
false,
[eps] __device__(value_t dist, long_index_t idx) { return static_cast<index_t>(dist <= eps); },
[] __device__(bool adj_ij, long_index_t idx) { return static_cast<index_t>(adj_ij); },
raft::Sum<index_t>(),
[d_nnz] __device__(index_t degree) {
atomicAdd(d_nnz, degree);
return degree;
});

// Transform the distance matrix into a neighborhood matrix
dist_to_adj_kernel<<<data.N, std::min(batch_size, (index_t)256), 0, stream>>>(
data.x,
data.adj,
(long_index_t)data.N,
(long_index_t)start_vertex_id,
(long_index_t)batch_size,
data.eps);
RAFT_CUDA_TRY(cudaPeekAtLastError());
}

Expand Down
1 change: 1 addition & 0 deletions cpp/src/dbscan/vertexdeg/runner.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ void run(const raft::handle_t& handle,
{
Pack<Type_f, Index_> data = {vd, adj, x, eps, N, D};
switch (algo) {
// TODO: deprecate naive runner. cf #3414
case 0: Naive::launcher<Type_f, Index_>(data, start_vertex_id, batch_size, stream); break;
case 1:
Algo::launcher<Type_f, Index_>(handle, data, start_vertex_id, batch_size, stream, metric);
Expand Down

0 comments on commit 2fad4c0

Please sign in to comment.