[WIP] Batching haversine to improve concurrency #191

Closed
5 changes: 0 additions & 5 deletions cpp/include/raft/sparse/selection/knn.cuh
@@ -323,11 +323,6 @@ class sparse_knn_t {
value_idx batch_rows = query_batcher.batch_rows(),
batch_cols = idx_batcher.batch_rows();

// build translation buffer to shift resulting indices by the batch
std::vector<value_idx> id_ranges;
id_ranges.push_back(0);
id_ranges.push_back(idx_batcher.batch_start());

// in the case where the number of idx rows in the batch is < k, we
// want to adjust k.
value_idx n_neighbors = min(k, batch_cols);
49 changes: 24 additions & 25 deletions cpp/include/raft/sparse/selection/selection.cuh
@@ -19,11 +19,6 @@
#include <raft/cudart_utils.h>
#include <raft/sparse/cusparse_wrappers.h>
#include <raft/cuda_utils.cuh>
#include <raft/matrix/matrix.cuh>

#include <raft/sparse/coo.cuh>
#include <raft/sparse/csr.cuh>
#include <raft/sparse/distance/distance.cuh>

#include <faiss/gpu/GpuDistance.h>
#include <faiss/gpu/GpuIndexFlat.h>
@@ -43,9 +38,12 @@ template <typename K, typename IndexType, bool select_min, int warp_q,
int thread_q, int tpb>
__global__ void select_k_kernel(K *inK, IndexType *inV, size_t n_rows,
size_t n_cols, K *outK, IndexType *outV,
K initK, IndexType initV, int k) {
K initK, IndexType initV, int k,
int k_out = -1) {
constexpr int kNumWarps = tpb / faiss::gpu::kWarpSize;

k_out = k_out > 0 ? k_out : k;

__shared__ K smemK[kNumWarps * warp_q];
__shared__ IndexType smemV[kNumWarps * warp_q];

@@ -81,16 +79,17 @@ __global__ void select_k_kernel(K *inK, IndexType *inV, size_t n_rows,
heap.reduce();

for (int i = threadIdx.x; i < k; i += tpb) {
outK[row * k + i] = smemK[i];
outV[row * k + i] = smemV[i];
outK[row * k_out + i] = smemK[i];
outV[row * k_out + i] = smemV[i];
}
}

template <typename value_idx = int, typename value_t = float, int warp_q,
int thread_q>
inline void select_k_impl(value_t *inK, value_idx *inV, size_t n_rows,
size_t n_cols, value_t *outK, value_idx *outV,
bool select_min, int k, cudaStream_t stream) {
bool select_min, int k, cudaStream_t stream,
int k_out = -1) {
auto grid = dim3(n_rows);

constexpr int n_threads = (warp_q <= 1024) ? 128 : 64;
@@ -102,11 +101,11 @@ inline void select_k_impl(value_t *inK, value_idx *inV, size_t n_rows,
if (select_min) {
select_k_kernel<value_t, value_idx, false, warp_q, thread_q, n_threads>
<<<grid, block, 0, stream>>>(inK, inV, n_rows, n_cols, outK, outV, kInit,
vInit, k);
vInit, k, k_out);
} else {
select_k_kernel<value_t, value_idx, true, warp_q, thread_q, n_threads>
<<<grid, block, 0, stream>>>(inK, inV, n_rows, n_cols, outK, outV, kInit,
vInit, k);
vInit, k, k_out);
}
CUDA_CHECK(cudaGetLastError());
}
@@ -128,28 +127,28 @@ inline void select_k_impl(value_t *inK, value_idx *inV, size_t n_rows,
template <typename value_idx = int, typename value_t = float>
inline void select_k(value_t *inK, value_idx *inV, size_t n_rows, size_t n_cols,
value_t *outK, value_idx *outV, bool select_min, int k,
cudaStream_t stream) {
cudaStream_t stream, int k_out = -1) {
if (k == 1)
select_k_impl<value_idx, value_t, 1, 1>(inK, inV, n_rows, n_cols, outK,
outV, select_min, k, stream);
outV, select_min, k, stream, k_out);
else if (k <= 32)
select_k_impl<value_idx, value_t, 32, 2>(inK, inV, n_rows, n_cols, outK,
outV, select_min, k, stream);
select_k_impl<value_idx, value_t, 32, 2>(
inK, inV, n_rows, n_cols, outK, outV, select_min, k, stream, k_out);
else if (k <= 64)
select_k_impl<value_idx, value_t, 64, 3>(inK, inV, n_rows, n_cols, outK,
outV, select_min, k, stream);
select_k_impl<value_idx, value_t, 64, 3>(
inK, inV, n_rows, n_cols, outK, outV, select_min, k, stream, k_out);
else if (k <= 128)
select_k_impl<value_idx, value_t, 128, 3>(inK, inV, n_rows, n_cols, outK,
outV, select_min, k, stream);
select_k_impl<value_idx, value_t, 128, 3>(
inK, inV, n_rows, n_cols, outK, outV, select_min, k, stream, k_out);
else if (k <= 256)
select_k_impl<value_idx, value_t, 256, 4>(inK, inV, n_rows, n_cols, outK,
outV, select_min, k, stream);
select_k_impl<value_idx, value_t, 256, 4>(
inK, inV, n_rows, n_cols, outK, outV, select_min, k, stream, k_out);
else if (k <= 512)
select_k_impl<value_idx, value_t, 512, 8>(inK, inV, n_rows, n_cols, outK,
outV, select_min, k, stream);
select_k_impl<value_idx, value_t, 512, 8>(
inK, inV, n_rows, n_cols, outK, outV, select_min, k, stream, k_out);
else if (k <= 1024)
select_k_impl<value_idx, value_t, 1024, 8>(inK, inV, n_rows, n_cols, outK,
outV, select_min, k, stream);
select_k_impl<value_idx, value_t, 1024, 8>(
inK, inV, n_rows, n_cols, outK, outV, select_min, k, stream, k_out);
}

}; // namespace selection
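
The new trailing `k_out` parameter lets the selected values be written with an output row stride wider than `k`, which is what allows per-batch results to be packed side by side in one output buffer. A minimal usage sketch, with hypothetical buffer names and sizes (not taken from this PR), assuming it is called from within the `selection` namespace above:

```cpp
// Select the k smallest distances per row, but write each output row with a
// wider stride k_out so a later batch can fill the remaining columns of the
// same rows (e.g. starting at d_out_dists + k) without an extra copy.
int    k     = 32;
int    k_out = 64;                // output row stride (>= k)
size_t n_rows = 1024, n_cols = 4096;

float *d_batch_dists;             // [n_rows, n_cols] candidate distances (device, allocated elsewhere)
int   *d_batch_inds;              // [n_rows, n_cols] candidate indices   (device, allocated elsewhere)
float *d_out_dists;               // [n_rows, k_out] packed output        (device, allocated elsewhere)
int   *d_out_inds;                // [n_rows, k_out] packed output        (device, allocated elsewhere)
cudaStream_t stream;              // created elsewhere

select_k(d_batch_dists, d_batch_inds, n_rows, n_cols, d_out_dists, d_out_inds,
         /*select_min=*/true, k, stream, k_out);
```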
129 changes: 2 additions & 127 deletions cpp/include/raft/spatial/knn/detail/brute_force_knn.cuh
@@ -33,140 +33,15 @@
#include <raft/handle.hpp>
#include <set>

#include "haversine_distance.cuh"
#include "haversine_knn.cuh"
#include "processing.hpp"
#include "selection.cuh"

namespace raft {
namespace spatial {
namespace knn {
namespace detail {

template <typename value_idx = int64_t, typename value_t = float, int warp_q,
int thread_q, int tpb>
__global__ void knn_merge_parts_kernel(value_t *inK, value_idx *inV,
value_t *outK, value_idx *outV,
size_t n_samples, int n_parts,
value_t initK, value_idx initV, int k,
value_idx *translations) {
constexpr int kNumWarps = tpb / faiss::gpu::kWarpSize;

__shared__ value_t smemK[kNumWarps * warp_q];
__shared__ value_idx smemV[kNumWarps * warp_q];

/**
* Uses shared memory
*/
faiss::gpu::BlockSelect<value_t, value_idx, false,
faiss::gpu::Comparator<value_t>, warp_q, thread_q,
tpb>
heap(initK, initV, smemK, smemV, k);

// Grid is exactly sized to rows available
int row = blockIdx.x;
int total_k = k * n_parts;

int i = threadIdx.x;

// Get starting pointers for cols in current thread
int part = i / k;
size_t row_idx = (row * k) + (part * n_samples * k);

int col = i % k;

value_t *inKStart = inK + (row_idx + col);
value_idx *inVStart = inV + (row_idx + col);

int limit = faiss::gpu::utils::roundDown(total_k, faiss::gpu::kWarpSize);
value_idx translation = 0;

for (; i < limit; i += tpb) {
translation = translations[part];
heap.add(*inKStart, (*inVStart) + translation);

part = (i + tpb) / k;
row_idx = (row * k) + (part * n_samples * k);

col = (i + tpb) % k;

inKStart = inK + (row_idx + col);
inVStart = inV + (row_idx + col);
}

// Handle last remainder fraction of a warp of elements
if (i < total_k) {
translation = translations[part];
heap.addThreadQ(*inKStart, (*inVStart) + translation);
}

heap.reduce();

for (int i = threadIdx.x; i < k; i += tpb) {
outK[row * k + i] = smemK[i];
outV[row * k + i] = smemV[i];
}
}
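
To make the stacked-partition input layout concrete, here is a small worked example of the offset arithmetic above; all sizes are hypothetical:

```cpp
// Hypothetical sizes: 2 partitions, k = 3, n_samples = 4 query rows.
// Partition p's result for (row, col) lives at row * k + p * n_samples * k + col,
// i.e. each partition's [n_samples, k] block is stacked after the previous one.
int n_samples = 4, k = 3;
int row = 1;                                   // query row handled by this block
int i   = 4;                                   // flat element index for a thread
int part = i / k;                              // 1 -> second partition
int col  = i % k;                              // 1 -> its second neighbor
size_t row_idx = (size_t)row * k + (size_t)part * n_samples * k;  // 3 + 12 = 15
size_t offset  = row_idx + col;                // element 16 of inK / inV
```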

template <typename value_idx = int64_t, typename value_t = float, int warp_q,
int thread_q>
inline void knn_merge_parts_impl(value_t *inK, value_idx *inV, value_t *outK,
value_idx *outV, size_t n_samples, int n_parts,
int k, cudaStream_t stream,
value_idx *translations) {
auto grid = dim3(n_samples);

constexpr int n_threads = (warp_q <= 1024) ? 128 : 64;
auto block = dim3(n_threads);

auto kInit = faiss::gpu::Limits<value_t>::getMax();
auto vInit = -1;
knn_merge_parts_kernel<value_idx, value_t, warp_q, thread_q, n_threads>
<<<grid, block, 0, stream>>>(inK, inV, outK, outV, n_samples, n_parts,
kInit, vInit, k, translations);
CUDA_CHECK(cudaPeekAtLastError());
}

/**
* @brief Merge knn distances and index matrix, which have been partitioned
* by row, into a single matrix with only the k-nearest neighbors.
*
* @param inK partitioned knn distance matrix
* @param inV partitioned knn index matrix
* @param outK merged knn distance matrix
* @param outV merged knn index matrix
* @param n_samples number of samples per partition
* @param n_parts number of partitions
* @param k number of neighbors per partition (also number of merged neighbors)
* @param stream CUDA stream to use
* @param translations mapping of index offsets for each partition
*/
template <typename value_idx = int64_t, typename value_t = float>
inline void knn_merge_parts(value_t *inK, value_idx *inV, value_t *outK,
value_idx *outV, size_t n_samples, int n_parts,
int k, cudaStream_t stream,
value_idx *translations) {
if (k == 1)
knn_merge_parts_impl<value_idx, value_t, 1, 1>(
inK, inV, outK, outV, n_samples, n_parts, k, stream, translations);
else if (k <= 32)
knn_merge_parts_impl<value_idx, value_t, 32, 2>(
inK, inV, outK, outV, n_samples, n_parts, k, stream, translations);
else if (k <= 64)
knn_merge_parts_impl<value_idx, value_t, 64, 3>(
inK, inV, outK, outV, n_samples, n_parts, k, stream, translations);
else if (k <= 128)
knn_merge_parts_impl<value_idx, value_t, 128, 3>(
inK, inV, outK, outV, n_samples, n_parts, k, stream, translations);
else if (k <= 256)
knn_merge_parts_impl<value_idx, value_t, 256, 4>(
inK, inV, outK, outV, n_samples, n_parts, k, stream, translations);
else if (k <= 512)
knn_merge_parts_impl<value_idx, value_t, 512, 8>(
inK, inV, outK, outV, n_samples, n_parts, k, stream, translations);
else if (k <= 1024)
knn_merge_parts_impl<value_idx, value_t, 1024, 8>(
inK, inV, outK, outV, n_samples, n_parts, k, stream, translations);
}
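
For reference, a hedged sketch of how this merge step is typically driven; the buffer names and translation values below are illustrative, not taken from this PR:

```cpp
#include <cstdint>
#include <vector>

// Two index partitions (e.g. of 50k and 70k database rows) were searched
// separately; their per-partition results are stacked in d_part_dists /
// d_part_inds as [n_parts * n_samples, k] row-major blocks. The translations
// shift each partition's local indices back into the global index space.
int     n_parts   = 2;
int     k         = 16;
size_t  n_samples = 10000;        // query rows

float   *d_part_dists;            // [n_parts * n_samples, k] stacked distances (device, allocated elsewhere)
int64_t *d_part_inds;             // [n_parts * n_samples, k] stacked local indices (device, allocated elsewhere)
float   *d_merged_dists;          // [n_samples, k] merged output (device, allocated elsewhere)
int64_t *d_merged_inds;           // [n_samples, k] merged output (device, allocated elsewhere)

std::vector<int64_t> h_translations = {0, 50000};  // partition 0 starts at global row 0,
                                                   // partition 1 at global row 50000
int64_t *d_translations;          // h_translations copied to device elsewhere
cudaStream_t stream;              // created elsewhere

knn_merge_parts(d_part_dists, d_part_inds, d_merged_dists, d_merged_inds,
                n_samples, n_parts, k, stream, d_translations);
```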

inline faiss::MetricType build_faiss_metric(
raft::distance::DistanceType metric) {
switch (metric) {
140 changes: 0 additions & 140 deletions cpp/include/raft/spatial/knn/detail/haversine_distance.cuh

This file was deleted.
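
For orientation only: the header is named after the standard haversine great-circle distance, which looks like the sketch below. This is the textbook formula, not necessarily the exact code that was removed.

```cpp
#include <cmath>

// Haversine distance between two points given as (latitude, longitude) in
// radians; the result is in radians of arc, so multiply by the sphere's
// radius for an absolute distance.
template <typename value_t>
__host__ __device__ value_t haversine_dist(value_t lat1, value_t lon1,
                                           value_t lat2, value_t lon2) {
  value_t sin_dlat = sin((lat2 - lat1) / 2);
  value_t sin_dlon = sin((lon2 - lon1) / 2);
  value_t a = sin_dlat * sin_dlat +
              cos(lat1) * cos(lat2) * sin_dlon * sin_dlon;
  return 2 * asin(sqrt(a));
}
```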
