diff --git a/cpp/include/raft/neighbors/ivf_pq.cuh b/cpp/include/raft/neighbors/ivf_pq.cuh index 207e298947..5b2035fadf 100644 --- a/cpp/include/raft/neighbors/ivf_pq.cuh +++ b/cpp/include/raft/neighbors/ivf_pq.cuh @@ -53,7 +53,7 @@ namespace raft::neighbors::ivf_pq { * * @param handle * @param params configure the index building - * @param[in] dataset a device pointer to a row-major matrix [n_rows, dim] + * @param[in] dataset a device/host pointer to a row-major matrix [n_rows, dim] * @param n_rows the number of samples * @param dim the dimensionality of the data * @@ -91,8 +91,8 @@ inline auto build( * * @param handle * @param orig_index original index - * @param[in] new_vectors a device pointer to a row-major matrix [n_rows, index.dim()] - * @param[in] new_indices a device pointer to a vector of indices [n_rows]. + * @param[in] new_vectors a device/host pointer to a row-major matrix [n_rows, index.dim()] + * @param[in] new_indices a device/host pointer to a vector of indices [n_rows]. * If the original index is empty (`orig_index.size() == 0`), you can pass `nullptr` * here to imply a continuous range `[0...n_rows)`. * @param n_rows the number of samples @@ -118,8 +118,8 @@ inline auto extend(const handle_t& handle, * * @param handle * @param[inout] index - * @param[in] new_vectors a device pointer to a row-major matrix [n_rows, index.dim()] - * @param[in] new_indices a device pointer to a vector of indices [n_rows]. + * @param[in] new_vectors a device/host pointer to a row-major matrix [n_rows, index.dim()] + * @param[in] new_indices a device/host pointer to a vector of indices [n_rows]. * If the original index is empty (`orig_index.size() == 0`), you can pass `nullptr` * here to imply a continuous range `[0...n_rows)`. * @param n_rows the number of samples diff --git a/cpp/include/raft/neighbors/ivf_pq_types.hpp b/cpp/include/raft/neighbors/ivf_pq_types.hpp index 3dbf004e95..afb3eb6cd6 100644 --- a/cpp/include/raft/neighbors/ivf_pq_types.hpp +++ b/cpp/include/raft/neighbors/ivf_pq_types.hpp @@ -23,6 +23,8 @@ #include #include +#include + #include namespace raft::neighbors::ivf_pq { @@ -108,17 +110,30 @@ struct search_params : ann::search_params { */ cudaDataType_t internal_distance_dtype = CUDA_R_32F; /** - * Thread block size of the distance calculation kernel at search time. - * When zero, an optimal block size is selected using a heuristic. + * Preferred fraction of SM's unified memory / L1 cache to be used as shared memory. + * + * Possible values: [0.0 - 1.0] as a fraction of the `sharedMemPerMultiprocessor`. + * + * One wants to increase the carveout to make sure a good GPU occupancy for the main search + * kernel, but not to keep it too high to leave some memory to be used as L1 cache. Note, this + * value is interpreted only as a hint. Moreover, a GPU usually allows only a fixed set of cache + * configurations, so the provided value is rounded up to the nearest configuration. Refer to the + * NVIDIA tuning guide for the target GPU architecture. * - * Possible values: [0, 256, 512, 1024] + * Note, this is a low-level tuning parameter that can have drastic negative effects on the search + * performance if tweaked incorrectly. */ - uint32_t preferred_thread_block_size = 0; + double preferred_shmem_carveout = 1.0; }; static_assert(std::is_aggregate_v); static_assert(std::is_aggregate_v); +/** Size of the interleaved group. */ +constexpr static uint32_t kIndexGroupSize = 32; +/** Stride of the interleaved group for vectorized loads. */ +constexpr static uint32_t kIndexGroupVecLen = 16; + /** * @brief IVF-PQ index. * @@ -170,6 +185,19 @@ struct index : ann::index { "IdxT must be able to represent all values of uint32_t"); public: + /** + * Default value filled in the `indices()` array. + * One may encounter it trying to access a record within a cluster that is outside of the + * `list_sizes()` bound (due to the record alignment `kIndexGroupSize`). + */ + constexpr static IdxT kInvalidRecord = std::numeric_limits::max() - 1; + /** + * Default value returned by `search` when the `n_probes` is too small and top-k is too large. + * One may encounter it if the combined size of probed clusters is smaller than the requested + * number of results per query. + */ + constexpr static IdxT kOutOfBoundsRecord = std::numeric_limits::max(); + /** Total length of the index. */ [[nodiscard]] constexpr inline auto size() const noexcept -> IdxT { return indices_.extent(0); } /** Dimensionality of the input data. */ @@ -247,12 +275,12 @@ struct index : ann::index { pq_dim_(pq_dim == 0 ? calculate_pq_dim(dim) : pq_dim), n_nonempty_lists_(n_nonempty_lists), pq_centers_{make_device_mdarray(handle, make_pq_centers_extents())}, - pq_dataset_{make_device_mdarray( - handle, make_extents(0, this->pq_dim() * this->pq_bits() / 8))}, + pq_dataset_{make_device_mdarray(handle, make_pq_dataset_extents(0))}, indices_{make_device_mdarray(handle, make_extents(0))}, rotation_matrix_{ make_device_mdarray(handle, make_extents(this->rot_dim(), this->dim()))}, list_offsets_{make_device_mdarray(handle, make_extents(this->n_lists() + 1))}, + list_sizes_{make_device_mdarray(handle, make_extents(this->n_lists()))}, centers_{make_device_mdarray( handle, make_extents(this->n_lists(), this->dim_ext()))}, centers_rot_{make_device_mdarray( @@ -283,35 +311,49 @@ struct index : ann::index { */ void allocate(const handle_t& handle, IdxT index_size) { - pq_dataset_ = - make_device_mdarray(handle, make_extents(index_size, pq_dataset_.extent(1))); - indices_ = make_device_mdarray(handle, make_extents(index_size)); + pq_dataset_ = make_device_mdarray(handle, make_pq_dataset_extents(index_size)); + indices_ = make_device_mdarray(handle, make_extents(index_size)); + if (index_size > 0) { + thrust::fill_n( + handle.get_thrust_policy(), indices_.data_handle(), index_size, kInvalidRecord); + } check_consistency(); } + using pq_centers_extents = + std::experimental::extents; /** * PQ cluster centers * - * - codebook_gen::PER_SUBSPACE: [pq_dim , pq_book_size, pq_len] - * - codebook_gen::PER_CLUSTER: [n_lists, pq_book_size, pq_len] + * - codebook_gen::PER_SUBSPACE: [pq_dim , pq_len, pq_book_size] + * - codebook_gen::PER_CLUSTER: [n_lists, pq_len, pq_book_size] */ - inline auto pq_centers() noexcept -> device_mdspan, row_major> + inline auto pq_centers() noexcept -> device_mdspan { return pq_centers_.view(); } [[nodiscard]] inline auto pq_centers() const noexcept - -> device_mdspan, row_major> + -> device_mdspan { return pq_centers_.view(); } - /** PQ-encoded data [size, pq_dim * pq_bits / 8]. */ - inline auto pq_dataset() noexcept -> device_mdspan, row_major> + using pq_dataset_extents = std::experimental:: + extents; + /** PQ-encoded data stored in the interleaved format: + * + * [ ceildiv(size, kIndexGroupSize) + * , ceildiv(pq_dim, (kIndexGroupVecLen * 8u) / pq_bits) + * , kIndexGroupSize + * , kIndexGroupVecLen + * ]. + */ + inline auto pq_dataset() noexcept -> device_mdspan { return pq_dataset_.view(); } [[nodiscard]] inline auto pq_dataset() const noexcept - -> device_mdspan, row_major> + -> device_mdspan { return pq_dataset_.view(); } @@ -352,6 +394,17 @@ struct index : ann::index { return list_offsets_.view(); } + /** Sizes of the lists [n_lists]. */ + inline auto list_sizes() noexcept -> device_mdspan, row_major> + { + return list_sizes_.view(); + } + [[nodiscard]] inline auto list_sizes() const noexcept + -> device_mdspan, row_major> + { + return list_sizes_.view(); + } + /** Cluster centers corresponding to the lists in the original space [n_lists, dim_ext] */ inline auto centers() noexcept -> device_mdspan, row_major> { @@ -374,6 +427,18 @@ struct index : ann::index { return centers_rot_.view(); } + /** A helper function to determine the extents of an array enough to hold a given amount of data. + */ + auto make_pq_dataset_extents(IdxT n_rows) -> pq_dataset_extents + { + // how many elems of pq_dim fit into one kIndexGroupVecLen-byte chunk + auto pq_chunk = (kIndexGroupVecLen * 8u) / pq_bits(); + return make_extents(raft::div_rounding_up_safe(n_rows, kIndexGroupSize), + raft::div_rounding_up_safe(pq_dim(), pq_chunk), + kIndexGroupSize, + kIndexGroupVecLen); + } + private: raft::distance::DistanceType metric_; codebook_gen codebook_kind_; @@ -383,11 +448,12 @@ struct index : ann::index { uint32_t pq_dim_; uint32_t n_nonempty_lists_; - device_mdarray, row_major> pq_centers_; - device_mdarray, row_major> pq_dataset_; + device_mdarray pq_centers_; + device_mdarray pq_dataset_; device_mdarray, row_major> indices_; device_mdarray, row_major> rotation_matrix_; device_mdarray, row_major> list_offsets_; + device_mdarray, row_major> list_sizes_; device_mdarray, row_major> centers_; device_mdarray, row_major> centers_rot_; @@ -404,13 +470,13 @@ struct index : ann::index { pq_bits() * pq_dim()); } - auto make_pq_centers_extents() -> extent_3d + auto make_pq_centers_extents() -> pq_centers_extents { switch (codebook_kind()) { case codebook_gen::PER_SUBSPACE: - return make_extents(pq_dim(), pq_book_size(), pq_len()); + return make_extents(pq_dim(), pq_len(), pq_book_size()); case codebook_gen::PER_CLUSTER: - return make_extents(n_lists(), pq_book_size(), pq_len()); + return make_extents(n_lists(), pq_len(), pq_book_size()); default: RAFT_FAIL("Unreachable code"); } } @@ -420,7 +486,7 @@ struct index : ann::index { // If the dimensionality is large enough, we can reduce it to improve performance if (dim >= 128) { dim /= 2; } // Round it down to 32 to improve performance. - uint32_t r = raft::round_down_safe(dim, 32); + auto r = raft::round_down_safe(dim, 32); if (r > 0) return r; // If the dimensionality is really low, round it to the closest power-of-two r = 1; diff --git a/cpp/include/raft/spatial/knn/detail/ann_utils.cuh b/cpp/include/raft/spatial/knn/detail/ann_utils.cuh index f27aeb24f9..ec03476252 100644 --- a/cpp/include/raft/spatial/knn/detail/ann_utils.cuh +++ b/cpp/include/raft/spatial/knn/detail/ann_utils.cuh @@ -54,8 +54,9 @@ struct pointer_residency_count { cudaPointerAttributes attr; RAFT_CUDA_TRY(cudaPointerGetAttributes(&attr, ptr)); switch (attr.type) { - case cudaMemoryTypeUnregistered: - case cudaMemoryTypeHost: return std::make_tuple(on_device, on_host + 1); + case cudaMemoryTypeUnregistered: return std::make_tuple(on_device, on_host + 1); + case cudaMemoryTypeHost: + return std::make_tuple(on_device + int(attr.devicePointer == ptr), on_host + 1); case cudaMemoryTypeDevice: return std::make_tuple(on_device + 1, on_host); case cudaMemoryTypeManaged: return std::make_tuple(on_device + 1, on_host + 1); default: return std::make_tuple(on_device, on_host); @@ -75,6 +76,58 @@ auto check_pointer_residency(const Types*... ptrs) -> pointer_residency return pointer_residency::mixed; } +/** RAII helper to access the host data from gpu when necessary. */ +template +struct with_mapped_memory_t { + with_mapped_memory_t(PtrT ptr, size_t size, Action action) : action_(action) + { + if (ptr == nullptr) { return; } + switch (utils::check_pointer_residency(ptr)) { + case utils::pointer_residency::device_only: + case utils::pointer_residency::host_and_device: { + dev_ptr_ = (void*)ptr; // NOLINT + } break; + default: { + host_ptr_ = (void*)ptr; // NOLINT + RAFT_CUDA_TRY(cudaHostRegister(host_ptr_, size, choose_flags(ptr))); + RAFT_CUDA_TRY(cudaHostGetDevicePointer(&dev_ptr_, host_ptr_, 0)); + } break; + } + } + + ~with_mapped_memory_t() + { + if (host_ptr_ != nullptr) { cudaHostUnregister(host_ptr_); } + } + + auto operator()() { return action_((PtrT)dev_ptr_); } // NOLINT + + private: + Action action_; + void* host_ptr_ = nullptr; + void* dev_ptr_ = nullptr; + + template + static auto choose_flags(const T*) -> unsigned int + { + int dev_id, readonly_supported; + RAFT_CUDA_TRY(cudaGetDevice(&dev_id)); + RAFT_CUDA_TRY(cudaDeviceGetAttribute( + &readonly_supported, cudaDevAttrHostRegisterReadOnlySupported, dev_id)); + if (readonly_supported) { + return cudaHostRegisterMapped | cudaHostRegisterReadOnly; + } else { + return cudaHostRegisterMapped; + } + } + + template + static auto choose_flags(T*) -> unsigned int + { + return cudaHostRegisterMapped; + } +}; + template struct config { }; diff --git a/cpp/include/raft/spatial/knn/detail/ivf_pq_build.cuh b/cpp/include/raft/spatial/knn/detail/ivf_pq_build.cuh index 0577d24349..de7fa1b2f7 100644 --- a/cpp/include/raft/spatial/knn/detail/ivf_pq_build.cuh +++ b/cpp/include/raft/spatial/knn/detail/ivf_pq_build.cuh @@ -16,15 +16,17 @@ #pragma once -#include "../ivf_pq_types.hpp" #include "ann_kmeans_balanced.cuh" #include "ann_utils.cuh" +#include + #include #include #include #include #include +#include #include #include #include @@ -50,6 +52,14 @@ namespace raft::spatial::knn::ivf_pq::detail { using namespace raft::spatial::knn::detail; // NOLINT +using raft::neighbors::ivf_pq::codebook_gen; +using raft::neighbors::ivf_pq::index; +using raft::neighbors::ivf_pq::index_params; +using raft::neighbors::ivf_pq::kIndexGroupSize; +using raft::neighbors::ivf_pq::kIndexGroupVecLen; + +using pq_codes_exts = extents; + namespace { /** @@ -108,55 +118,72 @@ struct bitfield_view_t { NB: label type is uint32_t although it can only contain values up to `1 << pq_bits`. We keep it this way to not force one more overload for kmeans::predict. */ -template -HDI void ivfpq_encode_core(uint32_t n_rows, uint32_t pq_dim, const uint32_t* label, uint8_t* output) +template +__device__ void ivfpq_encode_core(uint32_t n_rows, + uint32_t pq_dim, + const uint32_t* label, + uint8_t* output) { - bitfield_view_t out{output}; - for (uint32_t j = 0; j < pq_dim; j++, label += n_rows) { - out[j] = static_cast(*label); + constexpr uint32_t kChunkSize = (VecLen * 8u) / PqBits; + TxN_t vec; + for (uint32_t j = 0; j < pq_dim;) { + vec.fill(0); + bitfield_view_t out{vec.val.data}; +#pragma unroll + for (uint32_t k = 0; k < kChunkSize && j < pq_dim; k++, j++, label += n_rows) { + out[k] = static_cast(*label); + } + vec.store(output, 0); + output += VecLen; } } template __launch_bounds__(BlockDim) __global__ - void ivfpq_encode_kernel(uint32_t n_rows, - uint32_t pq_dim, + void ivfpq_encode_kernel(uint32_t pq_dim, const uint32_t* label, // [pq_dim, n_rows] - uint8_t* output // [n_rows, pq_dim] + device_mdspan output // [n_rows, ..] ) { uint32_t i = threadIdx.x + BlockDim * blockIdx.x; - if (i >= n_rows) return; - ivfpq_encode_core(n_rows, pq_dim, label + i, output + (pq_dim * PqBits / 8) * i); + if (i >= output.extent(0)) return; + ivfpq_encode_core( + output.extent(0), + pq_dim, + label + i, + output.data_handle() + output.extent(1) * output.extent(2) * i); } } // namespace -inline void ivfpq_encode(uint32_t n_rows, - uint32_t pq_dim, +/** + * Compress the cluster labels into an encoding with pq_bits bits, and transform it into a form to + * facilitate vectorized loads + */ +inline void ivfpq_encode(uint32_t pq_dim, uint32_t pq_bits, // 4 <= pq_bits <= 8 const uint32_t* label, // [pq_dim, n_rows] - uint8_t* output, // [n_rows, pq_dim] + device_mdspan output, // [n_rows, ..] rmm::cuda_stream_view stream) { constexpr uint32_t kBlockDim = 128; dim3 threads(kBlockDim, 1, 1); - dim3 blocks(raft::ceildiv(n_rows, kBlockDim), 1, 1); + dim3 blocks(raft::ceildiv(output.extent(0), kBlockDim), 1, 1); switch (pq_bits) { case 4: return ivfpq_encode_kernel - <<>>(n_rows, pq_dim, label, output); + <<>>(pq_dim, label, output); case 5: return ivfpq_encode_kernel - <<>>(n_rows, pq_dim, label, output); + <<>>(pq_dim, label, output); case 6: return ivfpq_encode_kernel - <<>>(n_rows, pq_dim, label, output); + <<>>(pq_dim, label, output); case 7: return ivfpq_encode_kernel - <<>>(n_rows, pq_dim, label, output); + <<>>(pq_dim, label, output); case 8: return ivfpq_encode_kernel - <<>>(n_rows, pq_dim, label, output); + <<>>(pq_dim, label, output); default: RAFT_FAIL("Invalid pq_bits (%u), the value must be within [4, 8]", pq_bits); } } @@ -226,7 +253,7 @@ void select_residuals(const handle_t& handle, ) { auto stream = handle.get_stream(); - rmm::device_uvector tmp(n_rows * dim, stream, device_memory); + rmm::device_uvector tmp(size_t(n_rows) * size_t(dim), stream, device_memory); utils::copy_selected( n_rows, (IdxT)dim, dataset, row_ids, (IdxT)dim, tmp.data(), (IdxT)dim, stream); @@ -260,6 +287,7 @@ void select_residuals(const handle_t& handle, } /** + * * @param handle, * @param n_rows * @param data_dim @@ -278,30 +306,35 @@ void select_residuals(const handle_t& handle, * it should be partitioned by the clusters by now. * @param cluster_sizes // [n_clusters] * @param cluster_offsets // [n_clusters + 1] - * @param pq_centers // [...] - * @param pq_dataset // [n_rows, pq_dim * pq_bits / 8] + * @param pq_centers // [...] (see ivf_pq::index::pq_centers() layout) + * @param pq_dataset + * // [n_rows, ceildiv(pq_dim, (kIndexGroupVecLen * 8u) / pq_bits), kIndexGroupVecLen] + * NB: in contrast to the final interleaved layout in ivf_pq::index::pq_dataset(), this function + * produces a non-interleaved data; it gets interleaved later when adding the data to the + * index. * @param device_memory */ template -void compute_pq_codes(const handle_t& handle, - IdxT n_rows, - uint32_t data_dim, - uint32_t rot_dim, - uint32_t pq_dim, - uint32_t pq_len, - uint32_t pq_bits, - uint32_t n_clusters, - codebook_gen codebook_kind, - uint32_t max_cluster_size, - float* cluster_centers, - const float* rotation_matrix, - const T* dataset, - const IdxT* data_indices, - const uint32_t* cluster_sizes, - const IdxT* cluster_offsets, - const float* pq_centers, - uint8_t* pq_dataset, - rmm::mr::device_memory_resource* device_memory) +void compute_pq_codes( + const handle_t& handle, + IdxT n_rows, + uint32_t data_dim, + uint32_t rot_dim, + uint32_t pq_dim, + uint32_t pq_len, + uint32_t pq_bits, + uint32_t n_clusters, + codebook_gen codebook_kind, + uint32_t max_cluster_size, + float* cluster_centers, + const float* rotation_matrix, + const T* dataset, + const IdxT* data_indices, + const uint32_t* cluster_sizes, + const IdxT* cluster_offsets, + device_mdspan::pq_centers_extents, row_major> pq_centers, + device_mdspan pq_dataset, + rmm::mr::device_memory_resource* device_memory) { common::nvtx::range fun_scope( "ivf_pq::compute_pq_codes(n_rows = %zu, data_dim = %u, rot_dim = %u (%u * %u), n_clusters = " @@ -317,15 +350,15 @@ void compute_pq_codes(const handle_t& handle, // // Compute PQ code // - utils::memzero(pq_dataset, n_rows * pq_dim * pq_bits / 8, stream); - rmm::device_uvector rot_vectors(max_cluster_size * rot_dim, stream, device_memory); - rmm::device_uvector sub_vectors(max_cluster_size * pq_dim * pq_len, stream, device_memory); - rmm::device_uvector sub_vector_labels(max_cluster_size * pq_dim, stream, device_memory); - rmm::device_uvector my_pq_dataset( - max_cluster_size * pq_dim * pq_bits / 8 /* NB: pq_dim * bitPQ % 8 == 0 */, - stream, - device_memory); + uint32_t pq_width = 1 << pq_bits; + rmm::device_uvector pq_centers_tmp(pq_len * pq_width, stream, device_memory); + rmm::device_uvector rot_vectors( + size_t(max_cluster_size) * size_t(rot_dim), stream, device_memory); + rmm::device_uvector sub_vectors( + size_t(max_cluster_size) * size_t(pq_dim * pq_len), stream, device_memory); + rmm::device_uvector sub_vector_labels( + size_t(max_cluster_size) * size_t(pq_dim), stream, device_memory); for (uint32_t l = 0; l < n_clusters; l++) { auto cluster_size = cluster_sizes[l]; @@ -339,7 +372,7 @@ void compute_pq_codes(const handle_t& handle, data_dim, rot_dim, rotation_matrix, - cluster_centers + uint64_t(l) * data_dim, + cluster_centers + size_t(l) * size_t(data_dim), dataset, data_indices + cluster_offsets[l], device_memory); @@ -351,37 +384,51 @@ void compute_pq_codes(const handle_t& handle, // output: sub_vectors[pq_dim, cluster_size, pq_len] // for (uint32_t i = 0; i < pq_dim; i++) { - RAFT_CUDA_TRY(cudaMemcpy2DAsync(sub_vectors.data() + i * pq_len * cluster_size, - sizeof(float) * pq_len, - rot_vectors.data() + i * pq_len, - sizeof(float) * rot_dim, - sizeof(float) * pq_len, - cluster_size, - cudaMemcpyDefault, - stream)); + RAFT_CUDA_TRY( + cudaMemcpy2DAsync(sub_vectors.data() + size_t(i) * size_t(pq_len) * size_t(cluster_size), + sizeof(float) * pq_len, + rot_vectors.data() + i * pq_len, + sizeof(float) * rot_dim, + sizeof(float) * pq_len, + cluster_size, + cudaMemcpyDefault, + stream)); + } + + if (codebook_kind == codebook_gen::PER_CLUSTER) { + linalg::writeOnlyUnaryOp( + pq_centers_tmp.data(), + pq_len * pq_width, + [pq_centers, pq_width, pq_len, l] __device__(float* out, uint32_t i) { + auto i0 = i / pq_len; + auto i1 = i % pq_len; + *out = pq_centers(l, i1, i0); + }, + stream); } // // Find a label (cluster ID) for each vector subspace. // for (uint32_t j = 0; j < pq_dim; j++) { - const float* sub_pq_centers = nullptr; - switch (codebook_kind) { - case codebook_gen::PER_SUBSPACE: - sub_pq_centers = pq_centers + ((1 << pq_bits) * pq_len) * j; - break; - case codebook_gen::PER_CLUSTER: - sub_pq_centers = pq_centers + ((1 << pq_bits) * pq_len) * l; - break; - default: RAFT_FAIL("Unreachable code"); + if (codebook_kind == codebook_gen::PER_SUBSPACE) { + linalg::writeOnlyUnaryOp( + pq_centers_tmp.data(), + pq_len * pq_width, + [pq_centers, pq_width, pq_len, j] __device__(float* out, uint32_t i) { + auto i0 = i / pq_len; + auto i1 = i % pq_len; + *out = pq_centers(j, i1, i0); + }, + stream); } kmeans::predict(handle, - sub_pq_centers, - (1 << pq_bits), + pq_centers_tmp.data(), + pq_width, pq_len, - sub_vectors.data() + j * (cluster_size * pq_len), + sub_vectors.data() + size_t(j) * size_t(cluster_size) * size_t(pq_len), cluster_size, - sub_vector_labels.data() + j * cluster_size, + sub_vector_labels.data() + size_t(j) * size_t(cluster_size), raft::distance::DistanceType::L2Expanded, stream, device_memory); @@ -391,11 +438,14 @@ void compute_pq_codes(const handle_t& handle, // PQ encoding // ivfpq_encode( - cluster_size, pq_dim, pq_bits, sub_vector_labels.data(), my_pq_dataset.data(), stream); - copy(pq_dataset + cluster_offsets[l] * uint64_t{pq_dim * pq_bits / 8}, - my_pq_dataset.data(), - cluster_size * pq_dim * pq_bits / 8, - stream); + pq_dim, + pq_bits, + sub_vector_labels.data(), + make_mdspan( + pq_dataset.data_handle() + + size_t(cluster_offsets[l]) * pq_dataset.extent(1) * pq_dataset.extent(2), + make_extents(cluster_size, pq_dataset.extent(1), pq_dataset.static_extent(2))), + stream); } } @@ -405,7 +455,7 @@ __launch_bounds__(BlockDim) __global__ void fill_indices_kernel(IdxT n_rows, IdxT* data_offsets, const uint32_t* labels) { - const auto i = BlockDim * IdxT(blockIdx.x) + IdxT(threadIdx.x); + const auto i = IdxT(BlockDim) * IdxT(blockIdx.x) + IdxT(threadIdx.x); if (i >= n_rows) { return; } data_indices[atomicAdd(data_offsets + labels[i], 1)] = i; } @@ -453,10 +503,36 @@ auto calculate_offsets_and_indices(IdxT n_rows, return max_cluster_size; } +template +void transpose_pq_centers(index& index, + const float* pq_centers_source, + rmm::cuda_stream_view stream) +{ + auto extents = index.pq_centers().extents(); + static_assert(extents.rank() == 3); + auto extents_source = + make_extents(extents.extent(0), extents.extent(2), extents.extent(1)); + auto span_source = + make_mdspan(pq_centers_source, extents_source); + linalg::writeOnlyUnaryOp( + index.pq_centers().data_handle(), + index.pq_centers().size(), + [span_source, extents] __device__(float* out, size_t i) { + uint32_t ii[3]; + for (int r = 2; r > 0; r--) { + ii[r] = i % extents.extent(r); + i /= extents.extent(r); + } + ii[0] = i; + *out = span_source(ii[0], ii[2], ii[1]); + }, + stream); +} + template void train_per_subset(const handle_t& handle, index& index, - IdxT n_rows, + size_t n_rows, const float* trainset, // [n_rows, dim] const uint32_t* labels, // [n_rows] uint32_t kmeans_n_iters, @@ -465,7 +541,8 @@ void train_per_subset(const handle_t& handle, { auto stream = handle.get_stream(); - rmm::device_uvector sub_trainset(n_rows * index.pq_len(), stream, device_memory); + rmm::device_uvector pq_centers_tmp(index.pq_centers().size(), stream, device_memory); + rmm::device_uvector sub_trainset(n_rows * size_t(index.pq_len()), stream, device_memory); rmm::device_uvector sub_labels(n_rows, stream, device_memory); rmm::device_uvector pq_cluster_sizes(index.pq_book_size(), stream, device_memory); @@ -476,14 +553,15 @@ void train_per_subset(const handle_t& handle, // Get the rotated cluster centers for each training vector. // This will be subtracted from the input vectors afterwards. - utils::copy_selected(n_rows, - (IdxT)index.pq_len(), - index.centers_rot().data_handle() + index.pq_len() * j, - labels, - (IdxT)index.rot_dim(), - sub_trainset.data(), - (IdxT)index.pq_len(), - stream); + utils::copy_selected( + n_rows, + index.pq_len(), + index.centers_rot().data_handle() + index.pq_len() * j, + labels, + index.rot_dim(), + sub_trainset.data(), + index.pq_len(), + stream); // sub_trainset is the slice of: rotate(trainset) - centers_rot float alpha = 1.0; @@ -505,26 +583,26 @@ void train_per_subset(const handle_t& handle, stream); // train PQ codebook for this subspace - kmeans::build_clusters( - handle, - kmeans_n_iters, - index.pq_len(), - sub_trainset.data(), - n_rows, - index.pq_book_size(), - index.pq_centers().data_handle() + (index.pq_book_size() * index.pq_len()) * j, - sub_labels.data(), - pq_cluster_sizes.data(), - raft::distance::DistanceType::L2Expanded, - stream, - device_memory); + kmeans::build_clusters(handle, + kmeans_n_iters, + index.pq_len(), + sub_trainset.data(), + n_rows, + index.pq_book_size(), + pq_centers_tmp.data() + index.pq_book_size() * index.pq_len() * j, + sub_labels.data(), + pq_cluster_sizes.data(), + raft::distance::DistanceType::L2Expanded, + stream, + device_memory); } + transpose_pq_centers(index, pq_centers_tmp.data(), stream); } template void train_per_cluster(const handle_t& handle, index& index, - IdxT n_rows, + size_t n_rows, const float* trainset, // [n_rows, dim] const uint32_t* labels, // [n_rows] uint32_t kmeans_n_iters, @@ -532,26 +610,30 @@ void train_per_cluster(const handle_t& handle, rmm::mr::device_memory_resource* device_memory) { auto stream = handle.get_stream(); + + rmm::device_uvector pq_centers_tmp(index.pq_centers().size(), stream, device_memory); rmm::device_uvector cluster_sizes(index.n_lists(), stream, managed_memory); rmm::device_uvector indices_buf(n_rows, stream, device_memory); rmm::device_uvector offsets_buf(index.list_offsets().size(), stream, managed_memory); - raft::stats::histogram(raft::stats::HistTypeAuto, - reinterpret_cast(cluster_sizes.data()), - IdxT(index.n_lists()), - labels, - n_rows, - 1, - stream); + raft::stats::histogram(raft::stats::HistTypeAuto, + reinterpret_cast(cluster_sizes.data()), + index.n_lists(), + labels, + n_rows, + 1, + stream); auto cluster_offsets = offsets_buf.data(); auto indices = indices_buf.data(); uint32_t max_cluster_size = calculate_offsets_and_indices( - n_rows, index.n_lists(), labels, cluster_sizes.data(), cluster_offsets, indices, stream); + IdxT(n_rows), index.n_lists(), labels, cluster_sizes.data(), cluster_offsets, indices, stream); - rmm::device_uvector pq_labels(max_cluster_size * index.pq_dim(), stream, device_memory); + rmm::device_uvector pq_labels( + size_t(max_cluster_size) * size_t(index.pq_dim()), stream, device_memory); rmm::device_uvector pq_cluster_sizes(index.pq_book_size(), stream, device_memory); - rmm::device_uvector rot_vectors(max_cluster_size * index.rot_dim(), stream, device_memory); + rmm::device_uvector rot_vectors( + size_t(max_cluster_size) * size_t(index.rot_dim()), stream, device_memory); handle.sync_stream(); // make sure cluster offsets are up-to-date for (uint32_t l = 0; l < index.n_lists(); l++) { @@ -566,15 +648,15 @@ void train_per_cluster(const handle_t& handle, index.dim(), index.rot_dim(), index.rotation_matrix().data_handle(), - index.centers().data_handle() + uint64_t(l) * index.dim_ext(), + index.centers().data_handle() + size_t(l) * size_t(index.dim_ext()), trainset, indices + cluster_offsets[l], device_memory); // limit the cluster size to bound the training time. // [sic] we interpret the data as pq_len-dimensional - size_t big_enough = 256 * std::max(index.pq_book_size(), index.pq_dim()); - size_t available_rows = cluster_size * index.pq_dim(); + size_t big_enough = 256ul * std::max(index.pq_book_size(), index.pq_dim()); + size_t available_rows = size_t(cluster_size) * size_t(index.pq_dim()); auto pq_n_rows = uint32_t(std::min(big_enough, available_rows)); // train PQ codebook for this cluster kmeans::build_clusters( @@ -584,22 +666,27 @@ void train_per_cluster(const handle_t& handle, rot_vectors.data(), pq_n_rows, index.pq_book_size(), - index.pq_centers().data_handle() + index.pq_book_size() * index.pq_len() * l, + pq_centers_tmp.data() + size_t(index.pq_book_size()) * size_t(index.pq_len()) * size_t(l), pq_labels.data(), pq_cluster_sizes.data(), raft::distance::DistanceType::L2Expanded, stream, device_memory); } + transpose_pq_centers(index, pq_centers_tmp.data(), stream); } -/** See raft::spatial::knn::ivf_pq::extend docs */ +/** + * See raft::spatial::knn::ivf_pq::extend docs. + * + * This version requires `new_vectors` and `new_indices` (if non-null) to be on-device. + */ template -inline auto extend(const handle_t& handle, - const index& orig_index, - const T* new_vectors, - const IdxT* new_indices, - IdxT n_rows) -> index +inline auto extend_device(const handle_t& handle, + const index& orig_index, + const T* new_vectors, + const IdxT* new_indices, + IdxT n_rows) -> index { common::nvtx::range fun_scope( "ivf_pq::extend(%zu, %u)", size_t(n_rows), orig_index.dim()); @@ -611,6 +698,13 @@ inline auto extend(const handle_t& handle, static_assert(std::is_same_v || std::is_same_v || std::is_same_v, "Unsupported data type"); + switch (new_indices != nullptr ? utils::check_pointer_residency(new_vectors, new_indices) + : utils::check_pointer_residency(new_vectors)) { + case utils::pointer_residency::device_only: + case utils::pointer_residency::host_and_device: break; + default: RAFT_FAIL("[ivf_pq::extend_device] The added data must be available on device."); + } + rmm::mr::device_memory_resource* device_memory = nullptr; auto pool_guard = raft::get_pool_memory_resource(device_memory, 1024 * 1024); if (pool_guard) { @@ -629,7 +723,8 @@ inline auto extend(const handle_t& handle, // const auto n_clusters = orig_index.n_lists(); - rmm::device_uvector cluster_centers(n_clusters * orig_index.dim(), stream, device_memory); + rmm::device_uvector cluster_centers( + size_t(n_clusters) * size_t(orig_index.dim()), stream, device_memory); RAFT_CUDA_TRY(cudaMemcpy2DAsync(cluster_centers.data(), sizeof(float) * orig_index.dim(), orig_index.centers().data_handle(), @@ -683,8 +778,9 @@ inline auto extend(const handle_t& handle, // // Compute PQ code for new vectors // - rmm::device_uvector new_pq_codes( - n_rows * orig_index.pq_dim() * orig_index.pq_bits() / 8, stream, device_memory); + pq_codes_exts new_pq_exts = make_extents( + n_rows, orig_index.pq_dataset().extent(1), orig_index.pq_dataset().static_extent(3)); + auto new_pq_codes = make_device_mdarray(handle, device_memory, new_pq_exts); compute_pq_codes(handle, n_rows, orig_index.dim(), @@ -701,8 +797,8 @@ inline auto extend(const handle_t& handle, new_data_indices.data(), new_cluster_sizes, new_cluster_offsets.data(), - orig_index.pq_centers().data_handle(), - new_pq_codes.data(), + orig_index.pq_centers(), + new_pq_codes.view(), device_memory); // Get the combined cluster sizes and sort the clusters in decreasing order @@ -711,31 +807,27 @@ inline auto extend(const handle_t& handle, rmm::device_uvector ext_cluster_sizes_buf(n_clusters, stream, &managed_memory); rmm::device_uvector old_cluster_offsets_buf(n_clusters + 1, stream, &managed_memory); rmm::device_uvector ext_cluster_offsets_buf(n_clusters + 1, stream, &managed_memory); - rmm::device_uvector cluster_ordering(n_clusters, stream, &managed_memory); + rmm::device_uvector cluster_ordering_buf(n_clusters, stream, &managed_memory); auto old_cluster_sizes = old_cluster_sizes_buf.data(); auto ext_cluster_sizes = ext_cluster_sizes_buf.data(); auto old_cluster_offsets = old_cluster_offsets_buf.data(); auto ext_cluster_offsets = ext_cluster_offsets_buf.data(); + auto cluster_ordering = cluster_ordering_buf.data(); copy(old_cluster_offsets, orig_index.list_offsets().data_handle(), orig_index.list_offsets().size(), stream); + copy(old_cluster_sizes, + orig_index.list_sizes().data_handle(), + orig_index.list_sizes().size(), + stream); uint32_t n_nonempty_lists = 0; { rmm::device_uvector ext_cluster_sizes_buf_in(n_clusters, stream, device_memory); rmm::device_uvector cluster_ordering_in(n_clusters, stream, device_memory); auto ext_cluster_sizes_in = ext_cluster_sizes_buf_in.data(); - linalg::writeOnlyUnaryOp( - old_cluster_sizes, - n_clusters, - [ext_cluster_sizes_in, new_cluster_sizes, old_cluster_offsets] __device__(uint32_t * out, - size_t i) { - auto old_size = old_cluster_offsets[i + 1] - old_cluster_offsets[i]; - ext_cluster_sizes_in[i] = old_size + new_cluster_sizes[i]; - *out = old_size; - }, - stream); + linalg::add(ext_cluster_sizes_in, old_cluster_sizes, new_cluster_sizes, n_clusters, stream); thrust::sequence(handle.get_thrust_policy(), cluster_ordering_in.data(), @@ -749,7 +841,7 @@ inline auto extend(const handle_t& handle, ext_cluster_sizes_in, ext_cluster_sizes, cluster_ordering_in.data(), - cluster_ordering.data(), + cluster_ordering, n_clusters, begin_bit, end_bit, @@ -760,7 +852,7 @@ inline auto extend(const handle_t& handle, ext_cluster_sizes_in, ext_cluster_sizes, cluster_ordering_in.data(), - cluster_ordering.data(), + cluster_ordering, n_clusters, begin_bit, end_bit, @@ -775,43 +867,49 @@ inline auto extend(const handle_t& handle, } // Assemble the extended index - ivf_pq::index ext_index(handle, - orig_index.metric(), - orig_index.codebook_kind(), - n_clusters, - orig_index.dim(), - orig_index.pq_bits(), - orig_index.pq_dim(), - n_nonempty_lists); - ext_index.allocate(handle, orig_index.size() + n_rows); - - // Copy the unchanged parts - copy(ext_index.rotation_matrix().data_handle(), - orig_index.rotation_matrix().data_handle(), - orig_index.rotation_matrix().size(), - stream); - + index ext_index(handle, + orig_index.metric(), + orig_index.codebook_kind(), + n_clusters, + orig_index.dim(), + orig_index.pq_bits(), + orig_index.pq_dim(), + n_nonempty_lists); // calculate extended cluster offsets - auto ext_indices = ext_index.indices().data_handle(); { - IdxT zero = 0; - update_device(ext_cluster_offsets, &zero, 1, stream); - thrust::inclusive_scan(handle.get_thrust_policy(), - ext_cluster_sizes, - ext_cluster_sizes + n_clusters, - ext_cluster_offsets + 1, - [] __device__(IdxT s, uint32_t l) { return s + l; }); + using group_align = Pow2; + IdxT size = 0; + update_device(ext_cluster_offsets, &size, 1, stream); + thrust::inclusive_scan( + handle.get_thrust_policy(), + ext_cluster_sizes, + ext_cluster_sizes + n_clusters, + ext_cluster_offsets + 1, + [] __device__(IdxT a, IdxT b) { return group_align::roundUp(a) + group_align::roundUp(b); }); + update_host(&size, ext_cluster_offsets + n_clusters, 1, stream); + handle.sync_stream(); copy(ext_index.list_offsets().data_handle(), ext_cluster_offsets, ext_index.list_offsets().size(), stream); + copy(ext_index.list_sizes().data_handle(), + ext_cluster_sizes, + ext_index.list_sizes().size(), + stream); + ext_index.allocate(handle, size); } + // Copy the unchanged parts + copy(ext_index.rotation_matrix().data_handle(), + orig_index.rotation_matrix().data_handle(), + orig_index.rotation_matrix().size(), + stream); + // copy cluster-ordering-dependent data utils::copy_selected(n_clusters, ext_index.dim_ext(), orig_index.centers().data_handle(), - cluster_ordering.data(), + cluster_ordering, orig_index.dim_ext(), ext_index.centers().data_handle(), ext_index.dim_ext(), @@ -819,7 +917,7 @@ inline auto extend(const handle_t& handle, utils::copy_selected(n_clusters, ext_index.rot_dim(), orig_index.centers_rot().data_handle(), - cluster_ordering.data(), + cluster_ordering, orig_index.rot_dim(), ext_index.centers_rot().data_handle(), ext_index.rot_dim(), @@ -836,7 +934,7 @@ inline auto extend(const handle_t& handle, utils::copy_selected(n_clusters, d, orig_index.pq_centers().data_handle(), - cluster_ordering.data(), + cluster_ordering, d, ext_index.pq_centers().data_handle(), d, @@ -847,8 +945,9 @@ inline auto extend(const handle_t& handle, // Make ext_indices handle.sync_stream(); // make sure cluster sizes are up-to-date + auto ext_indices = ext_index.indices().data_handle(); for (uint32_t l = 0; l < ext_index.n_lists(); l++) { - auto k = cluster_ordering.data()[l]; + auto k = cluster_ordering[l]; auto old_cluster_size = old_cluster_sizes[k]; auto new_cluster_size = new_cluster_sizes[k]; if (old_cluster_size > 0) { @@ -878,27 +977,91 @@ inline auto extend(const handle_t& handle, } /* Extend the pq_dataset */ - auto ext_pq_dataset = ext_index.pq_dataset().data_handle(); - size_t pq_dataset_unit = ext_index.pq_dim() * ext_index.pq_bits() / 8; + // For simplicity and performance, we reinterpret the last dimension of the dataset + // as a single vector element. + using vec_t = TxN_t::io_t; + + auto data_unit = ext_index.pq_dataset().extent(1); + auto ext_pq_dataset = make_mdspan( + reinterpret_cast(ext_index.pq_dataset().data_handle()), + make_extents( + ext_index.pq_dataset().extent(0), data_unit, ext_index.pq_dataset().extent(2))); + for (uint32_t l = 0; l < ext_index.n_lists(); l++) { - auto k = cluster_ordering.data()[l]; + // Extend the data cluster-by-cluster; + // The original/old index stores the data interleaved; + // the new data produced by `compute_pq_codes` is not interleaved. + auto k = cluster_ordering[l]; auto old_cluster_size = old_cluster_sizes[k]; - copy(ext_pq_dataset + pq_dataset_unit * ext_cluster_offsets[l], - orig_index.pq_dataset().data_handle() + pq_dataset_unit * old_cluster_offsets[k], - pq_dataset_unit * old_cluster_size, - stream); - copy(ext_pq_dataset + pq_dataset_unit * (ext_cluster_offsets[l] + old_cluster_size), - new_pq_codes.data() + pq_dataset_unit * new_cluster_offsets.data()[k], - pq_dataset_unit * new_cluster_sizes[k], - stream); + auto old_pq_dataset = make_mdspan( + reinterpret_cast(orig_index.pq_dataset().data_handle()) + + data_unit * old_cluster_offsets[k], + make_extents(div_rounding_up_safe(old_cluster_size, kIndexGroupSize), + data_unit, + ext_pq_dataset.extent(2))); + auto new_pq_data = make_mdspan( + reinterpret_cast(new_pq_codes.data_handle()) + + data_unit * new_cluster_offsets.data()[k], + make_extents(new_cluster_sizes[k], data_unit)); + // Write all cluster data, vec-by-vec + linalg::writeOnlyUnaryOp( + ext_pq_dataset.data_handle() + data_unit * ext_cluster_offsets[l], + data_unit * size_t(ext_cluster_offsets[l + 1] - ext_cluster_offsets[l]), + [old_pq_dataset, new_pq_data, old_cluster_size] __device__(vec_t * out, size_t i_flat) { + // find the proper 3D index from the flat offset + size_t i[3]; + for (int r = 2; r > 0; r--) { + i[r] = i_flat % old_pq_dataset.extent(r); + i_flat /= old_pq_dataset.extent(r); + } + i[0] = i_flat; + auto row_ix = i[0] * old_pq_dataset.extent(2) + i[2]; + if (row_ix < old_cluster_size) { + // First, pack the original/old data + *out = old_pq_dataset(i[0], i[1], i[2]); + } else { + // Then add the new data + row_ix -= old_cluster_size; + if (row_ix < new_pq_data.extent(0)) { + *out = new_pq_data(row_ix, i[1]); + } else { + *out = vec_t{}; + } + } + }, + stream); } return ext_index; } -/** See raft::spatial::knn::ivf_pq::build docs */ +/** See raft::spatial::knn::ivf_pq::extend docs */ template -inline auto build( +inline auto extend(const handle_t& handle, + const index& orig_index, + const T* new_vectors, + const IdxT* new_indices, + IdxT n_rows) -> index +{ + size_t vec_size = sizeof(T) * size_t(n_rows) * size_t(orig_index.dim()); + size_t ind_size = sizeof(IdxT) * size_t(n_rows); + return utils::with_mapped_memory_t{ + new_vectors, vec_size, [&](const T* new_vectors_dev) { + return utils::with_mapped_memory_t{ + new_indices, ind_size, [&](const IdxT* new_indices_dev) { + return extend_device( + handle, orig_index, new_vectors_dev, new_indices_dev, n_rows); + }}(); + }}(); +} + +/** + * See raft::spatial::knn::ivf_pq::build docs. + * + * This version requires `dataset` to be on-device. + */ +template +inline auto build_device( const handle_t& handle, const index_params& params, const T* dataset, IdxT n_rows, uint32_t dim) -> index { @@ -909,14 +1072,22 @@ inline auto build( RAFT_EXPECTS(n_rows > 0 && dim > 0, "empty dataset"); + switch (utils::check_pointer_residency(dataset)) { + case utils::pointer_residency::device_only: + case utils::pointer_residency::host_and_device: break; + default: RAFT_FAIL("[ivf_pq::build_device] The dataset pointer must be available on device."); + } + auto stream = handle.get_stream(); - ivf_pq::index index(handle, params, dim); + index index(handle, params, dim); utils::memzero(index.list_offsets().data_handle(), index.list_offsets().size(), stream); + utils::memzero(index.list_sizes().data_handle(), index.list_sizes().size(), stream); - auto trainset_ratio = std::max( - 1, n_rows / std::max(params.kmeans_trainset_fraction * n_rows, index.n_lists())); - auto n_rows_train = n_rows / trainset_ratio; + auto trainset_ratio = std::max( + 1, + size_t(n_rows) / std::max(params.kmeans_trainset_fraction * n_rows, index.n_lists())); + size_t n_rows_train = n_rows / trainset_ratio; rmm::mr::device_memory_resource* device_memory = nullptr; auto pool_guard = raft::get_pool_memory_resource(device_memory, 1024 * 1024); @@ -929,9 +1100,21 @@ inline auto build( rmm::mr::pool_memory_resource managed_memory( &managed_memory_upstream, 1024 * 1024); + // If the trainset is small enough to comfortably fit into device memory, put it there. + // Otherwise, use the managed memory. + rmm::mr::device_memory_resource* big_memory_resource = &managed_memory; + { + size_t free_mem, total_mem; + constexpr size_t kTolerableRatio = 4; + RAFT_CUDA_TRY(cudaMemGetInfo(&free_mem, &total_mem)); + if (sizeof(float) * n_rows_train * index.dim() * kTolerableRatio < free_mem) { + big_memory_resource = device_memory; + } + } + // Besides just sampling, we transform the input dataset into floats to make it easier // to use gemm operations from cublas. - rmm::device_uvector trainset(n_rows_train * index.dim(), stream, device_memory); + rmm::device_uvector trainset(n_rows_train * index.dim(), stream, big_memory_resource); // TODO: a proper sampling if constexpr (std::is_same_v) { RAFT_CUDA_TRY(cudaMemcpy2DAsync(trainset.data(), @@ -946,10 +1129,10 @@ inline auto build( auto dim = index.dim(); linalg::writeOnlyUnaryOp( trainset.data(), - index.dim() * n_rows_train, + size_t(index.dim()) * n_rows_train, [dataset, trainset_ratio, dim] __device__(float* out, size_t i) { auto col = i % dim; - *out = utils::mapping{}(dataset[(i - col) * trainset_ratio + col]); + *out = utils::mapping{}(dataset[(i - col) * size_t(trainset_ratio) + col]); }, stream); } @@ -971,7 +1154,7 @@ inline auto build( stream); // Trainset labels are needed for training PQ codebooks - rmm::device_uvector labels(n_rows_train, stream, device_memory); + rmm::device_uvector labels(n_rows_train, stream, big_memory_resource); kmeans::predict(handle, cluster_centers, index.n_lists(), @@ -1060,10 +1243,23 @@ inline auto build( // add the data if necessary if (params.add_data_on_build) { - return detail::extend(handle, index, dataset, nullptr, n_rows); + return detail::extend_device(handle, index, dataset, nullptr, n_rows); } else { return index; } } +/** See raft::spatial::knn::ivf_pq::build docs */ +template +inline auto build( + const handle_t& handle, const index_params& params, const T* dataset, IdxT n_rows, uint32_t dim) + -> index +{ + size_t data_size = sizeof(T) * size_t(n_rows) * size_t(dim); + return utils::with_mapped_memory_t{dataset, data_size, [&](const T* dataset_dev) { + return build_device( + handle, params, dataset_dev, n_rows, dim); + }}(); +} + } // namespace raft::spatial::knn::ivf_pq::detail diff --git a/cpp/include/raft/spatial/knn/detail/ivf_pq_search.cuh b/cpp/include/raft/spatial/knn/detail/ivf_pq_search.cuh index a4c7ce1fe5..0ff659ae5d 100644 --- a/cpp/include/raft/spatial/knn/detail/ivf_pq_search.cuh +++ b/cpp/include/raft/spatial/knn/detail/ivf_pq_search.cuh @@ -16,11 +16,12 @@ #pragma once -#include "../ivf_pq_types.hpp" #include "ann_utils.cuh" #include "topk.cuh" #include "topk/warpsort_topk.cuh" +#include + #include #include #include @@ -38,6 +39,7 @@ #include #include +#include #include #include @@ -56,6 +58,12 @@ static_assert((kMaxCapacity >= 32) && !(kMaxCapacity & (kMaxCapacity - 1)), using namespace raft::spatial::knn::detail; // NOLINT +using raft::neighbors::ivf_pq::codebook_gen; +using raft::neighbors::ivf_pq::index; +using raft::neighbors::ivf_pq::kIndexGroupSize; +using raft::neighbors::ivf_pq::kIndexGroupVecLen; +using raft::neighbors::ivf_pq::search_params; + /** 8-bit floating-point storage type. * * This is a custom type for the current IVF-PQ implementation. No arithmetic operations defined @@ -78,6 +86,7 @@ struct fp_8bit { return *this; } HDI explicit operator float() const { return fp_8bit2float(*this); } + HDI explicit operator half() const { return half(fp_8bit2float(*this)); } private: static constexpr float kMin = 1.0f / float(1u << ExpMask); @@ -226,10 +235,10 @@ void select_clusters(const handle_t& handle, * in chunk_indices. Essentially this is a segmented inclusive scan of the cluster sizes. The total * number of samples per query (sum of the cluster sizes that we probe) is returned in n_samples. */ -template +template __launch_bounds__(BlockDim) __global__ void calc_chunk_indices_kernel(uint32_t n_probes, - const IdxT* cluster_offsets, // [n_clusters + 1] + const uint32_t* cluster_sizes, // [n_clusters] const uint32_t* clusters_to_probe, // [n_queries, n_probes] uint32_t* chunk_indices, // [n_queries, n_probes] uint32_t* n_samples // [n_queries] @@ -247,9 +256,7 @@ __launch_bounds__(BlockDim) __global__ uint32_t total = 0; for (uint32_t probe_ix = threadIdx.x; probe_ix < n_probes_aligned; probe_ix += BlockDim) { auto label = probe_ix < n_probes ? clusters_to_probe[probe_ix] : 0u; - auto chunk = probe_ix < n_probes - ? static_cast(cluster_offsets[label + 1] - cluster_offsets[label]) - : 0u; + auto chunk = probe_ix < n_probes ? cluster_sizes[label] : 0u; if (threadIdx.x == 0) { chunk += total; } block_scan(shm).InclusiveSum(chunk, chunk, total); __syncthreads(); @@ -259,7 +266,6 @@ __launch_bounds__(BlockDim) __global__ if (threadIdx.x == 0) { n_samples[blockIdx.x] = total; } } -template struct calc_chunk_indices { public: struct configured { @@ -268,19 +274,19 @@ struct calc_chunk_indices { dim3 grid_dim; uint32_t n_probes; - void operator()(const IdxT* cluster_offsets, - const uint32_t* clusters_to_probe, - uint32_t* chunk_indices, - uint32_t* n_samples, - rmm::cuda_stream_view stream) + inline void operator()(const uint32_t* cluster_sizes, + const uint32_t* clusters_to_probe, + uint32_t* chunk_indices, + uint32_t* n_samples, + rmm::cuda_stream_view stream) { void* args[] = // NOLINT - {&n_probes, &cluster_offsets, &clusters_to_probe, &chunk_indices, &n_samples}; + {&n_probes, &cluster_sizes, &clusters_to_probe, &chunk_indices, &n_samples}; RAFT_CUDA_TRY(cudaLaunchKernel(kernel, grid_dim, block_dim, args, 0, stream)); } }; - static auto configure(uint32_t n_probes, uint32_t n_queries) -> configured + static inline auto configure(uint32_t n_probes, uint32_t n_queries) -> configured { return try_block_dim<1024>(n_probes, n_queries); } @@ -292,7 +298,7 @@ struct calc_chunk_indices { if constexpr (BlockDim >= WarpSize * 2) { if (BlockDim >= n_probes * 2) { return try_block_dim<(BlockDim / 2)>(n_probes, n_queries); } } - return {reinterpret_cast(calc_chunk_indices_kernel), + return {reinterpret_cast(calc_chunk_indices_kernel), dim3(BlockDim, 1, 1), dim3(n_queries, 1, 1), n_probes}; @@ -321,7 +327,7 @@ __device__ auto find_db_row(IdxT& x, // NOLINT uint32_t ix_max = n_probes; do { uint32_t i = (ix_min + ix_max) / 2; - if (IdxT(chunk_indices[i]) < x) { + if (IdxT(chunk_indices[i]) <= x) { ix_min = i + 1; } else { ix_max = i; @@ -359,7 +365,7 @@ __launch_bounds__(BlockDim) __global__ clusters_to_probe + n_probes * query_ix, chunk_indices + n_probes * query_ix); } - neighbors[k] = valid ? db_indices[data_ix] : std::numeric_limits::max(); + neighbors[k] = valid ? db_indices[data_ix] : index::kOutOfBoundsRecord; } /** @@ -444,68 +450,16 @@ void postprocess_distances(float* out, // [n_queries, topk] } } -/** - * @brief Compute the similarity score between a vector from `pq_dataset` and a query vector. - * - * @tparam OpT an unsigned integer type that is used for bit operations on multiple PQ codes - * at once; it's selected to maximize throughput while matching criteria: - * 1. `pq_bits * vec_len % 8 * sizeof(OpT) == 0`. - * 2. `pq_dim % vec_len == 0` - * - * @tparam LutT type of the elements in the lookup table. - * - * @param pq_bits The bit length of an encoded vector element after compression by PQ - * @param vec_len == 8 * sizeof(OpT) / gcd(8 * sizeof(OpT), pq_bits) - * @param pq_dim - * @param[in] pq_code_ptr - * a device pointer to the dataset at the indexed position (`pq_dim * pq_bits` bits-wide) - * @param[in] lut_scores - * a device or shared memory pointer to the lookup table [pq_dim, pq_book_size] - * - * @return the score for the entry `data_ix` in the `pq_dataset`. - */ -template -__device__ auto ivfpq_compute_score( - uint32_t pq_bits, uint32_t vec_len, uint32_t pq_dim, const OpT* pq_head, const LutT* lut_scores) - -> float -{ - float score = 0.0; - constexpr uint32_t kBitsTotal = 8 * sizeof(OpT); - for (; pq_dim > 0; pq_dim -= vec_len) { - OpT pq_code = pq_head[0]; - pq_head++; - auto bits_left = kBitsTotal; - for (uint32_t k = 0; k < vec_len; k++) { - uint8_t code = pq_code; - if (bits_left > pq_bits) { - pq_code >>= pq_bits; - bits_left -= pq_bits; - } else { - if (k < vec_len - 1) { - pq_code = pq_head[0]; - pq_head++; - } - code |= (pq_code << bits_left); - pq_code >>= (pq_bits - bits_left); - bits_left += (kBitsTotal - pq_bits); - } - code &= (1 << pq_bits) - 1; - score += float(lut_scores[code]); - lut_scores += (1 << pq_bits); - } - } - return score; -} - template struct dummy_block_sort_t { - using queue_t = topk::warp_sort_immediate; - __device__ dummy_block_sort_t(int k, uint8_t* smem_buf){}; + using queue_t = topk::warp_sort_distributed; + template + __device__ dummy_block_sort_t(int k, uint8_t* smem_buf, Args...){}; }; template struct pq_block_sort { - using type = topk::block_sort; + using type = topk::block_sort; }; template @@ -516,6 +470,82 @@ struct pq_block_sort<0, T, IdxT> : dummy_block_sort_t { template using block_sort_t = typename pq_block_sort::type; +/* Manually unrolled loop over a chunk of pq_dataset that fits into one VecT. */ +template +__device__ __forceinline__ void ivfpq_compute_chunk(OutT& score /* NOLINT */, + typename VecT::math_t& pq_code, + const VecT& pq_codes, + const LutT*& lut_head, + const LutT*& lut_end) +{ + if constexpr (CheckBounds) { + if (lut_head >= lut_end) { return; } + } + constexpr uint32_t kTotalBits = 8 * sizeof(typename VecT::math_t); + constexpr uint32_t kPqShift = 1u << PqBits; + constexpr uint32_t kPqMask = kPqShift - 1u; + if constexpr (BitsLeft >= PqBits) { + uint8_t code = pq_code & kPqMask; + pq_code >>= PqBits; + score += OutT(lut_head[code]); + lut_head += kPqShift; + return ivfpq_compute_chunk( + score, pq_code, pq_codes, lut_head, lut_end); + } else if constexpr (Ix < VecT::Ratio) { + uint8_t code = pq_code; + pq_code = pq_codes.val.data[Ix]; + constexpr uint32_t kRemBits = PqBits - BitsLeft; + constexpr uint32_t kRemMask = (1u << kRemBits) - 1u; + code |= (pq_code & kRemMask) << BitsLeft; + pq_code >>= kRemBits; + score += OutT(lut_head[code]); + lut_head += kPqShift; + return ivfpq_compute_chunk(score, pq_code, pq_codes, lut_head, lut_end); + } +} + +/* Compute the similarity for one vector in the pq_dataset */ +template +__device__ auto ivfpq_compute_score(uint32_t pq_dim, + const typename VecT::io_t* pq_head, + const LutT* lut_scores, + OutT early_stop_limit) -> OutT +{ + constexpr uint32_t kChunkSize = sizeof(VecT) * 8u / PqBits; + auto lut_head = lut_scores; + auto lut_end = lut_scores + (pq_dim << PqBits); + VecT pq_codes; + OutT score{0}; + for (; pq_dim >= kChunkSize; pq_dim -= kChunkSize) { + *pq_codes.vectorized_data() = *pq_head; + pq_head += kIndexGroupSize; + typename VecT::math_t pq_code = 0; + ivfpq_compute_chunk( + score, pq_code, pq_codes, lut_head, lut_end); + // Early stop when it makes sense (otherwise early_stop_limit is kDummy/infinity). + if (score >= early_stop_limit) { return score; } + } + if (pq_dim > 0) { + *pq_codes.vectorized_data() = *pq_head; + typename VecT::math_t pq_code = 0; + ivfpq_compute_chunk( + score, pq_code, pq_codes, lut_head, lut_end); + } + return score; +} + /** * The main kernel that computes similarity scores across multiple queries and probes. * When `Capacity > 0`, it also selects top K candidates for each query and probe @@ -524,14 +554,15 @@ using block_sort_t = typename pq_block_sort::type; * Each block processes a (query, probe) pair: it calculates the distance between the single query * vector and all the dataset vector in the cluster that we are probing. * - * @tparam OpT is a carrier integer type selected to maximize throughput; - * Used solely in `ivfpq_compute_score`; * @tparam IdxT * The type of data indices * @tparam OutT * The output type - distances. * @tparam LutT * The lookup table element type (lut_scores). + * @tparam PqBits + * The bit length of an encoded vector element after compression by PQ + * (NB: pq_book_size = 1 << PqBits). * @tparam Capacity * Power-of-two; the maximum possible `k` in top-k. Value zero disables fused top-k search. * @tparam PrecompBaseDiff @@ -546,8 +577,6 @@ using block_sort_t = typename pq_block_sort::type; * @param n_rows the number of records in the dataset * @param dim the dimensionality of the data (NB: after rotation transform, i.e. `index.rot_dim()`). * @param n_probes the number of clusters to search for each query - * @param pq_bits the bit length of an encoded vector element after compression by PQ - * (NB: pq_book_size = 1 << pq_bits). * @param pq_dim * The dimensionality of an encoded vector after compression by PQ. * @param n_queries the number of queries. @@ -561,7 +590,7 @@ using block_sort_t = typename pq_block_sort::type; * The device pointer to the cluster centers in the PQ space * [pq_dim, pq_book_size, pq_len] or [n_clusters, pq_book_size, pq_len,]. * @param pq_dataset - * The device pointer to the PQ index (data) [n_rows, pq_dim * pq_bits / 8]. + * The device pointer to the PQ index (data) [n_rows, ...]. * @param cluster_offsets * The device pointer to the cluster offsets [n_clusters + 1]. * @param cluster_labels @@ -574,7 +603,7 @@ using block_sort_t = typename pq_block_sort::type; * An optional device pointer to the enforced order of search [n_queries, n_probes]. * One can pass reordered indices here to try to improve data reading locality. * @param lut_scores - * The device pointer for storing the lookup table globally [gridDim.x, pq_dim << pq_bits]. + * The device pointer for storing the lookup table globally [gridDim.x, pq_dim << PqBits]. * Ignored when `EnableSMemLut == true`. * @param _out_scores * The device pointer to the output scores @@ -583,75 +612,80 @@ using block_sort_t = typename pq_block_sort::type; * The device pointer to the output indices [n_queries, n_probes, topk]. * Ignored when `Capacity == 0`. */ -template -__launch_bounds__(1024) __global__ - void ivfpq_compute_similarity_kernel(uint32_t n_rows, - uint32_t dim, - uint32_t n_probes, - uint32_t pq_bits, - uint32_t pq_dim, - uint32_t n_queries, - distance::DistanceType metric, - codebook_gen codebook_kind, - uint32_t topk, - const float* cluster_centers, - const float* pq_centers, - const uint8_t* pq_dataset, - const IdxT* cluster_offsets, - const uint32_t* cluster_labels, - const uint32_t* _chunk_indices, - const float* queries, - const uint32_t* index_list, - LutT* lut_scores, - OutT* _out_scores, - IdxT* _out_indices) +__global__ void ivfpq_compute_similarity_kernel(uint32_t n_rows, + uint32_t dim, + uint32_t n_probes, + uint32_t pq_dim, + uint32_t n_queries, + distance::DistanceType metric, + codebook_gen codebook_kind, + uint32_t topk, + const float* cluster_centers, + const float* pq_centers, + const uint8_t* pq_dataset, + const IdxT* cluster_offsets, + const uint32_t* cluster_labels, + const uint32_t* _chunk_indices, + const float* queries, + const uint32_t* index_list, + float* query_kths, + LutT* lut_scores, + OutT* _out_scores, + IdxT* _out_indices) { /* Shared memory: - * lut_scores: lookup table (LUT) of size = `pq_dim << pq_bits` (when EnableSMemLut) - * base_diff: size = dim (which is equal to `pq_dim * pq_len`) + * lut_scores: lookup table (LUT) of size = `pq_dim << PqBits` (when EnableSMemLut) + * base_diff: size = dim (which is equal to `pq_dim * pq_len`) or dim*2 * topk::block_sort: some amount of shared memory, but overlaps with the rest: block_sort only needs shared memory for `.done()` operation, which can come very last. */ extern __shared__ __align__(256) uint8_t smem_buf[]; // NOLINT constexpr bool kManageLocalTopK = Capacity > 0; - constexpr uint32_t kOpBits = 8 * sizeof(OpT); - const uint32_t pq_len = dim / pq_dim; - const uint32_t vec_len = kOpBits / gcd(kOpBits, pq_bits); + constexpr uint32_t PqShift = 1u << PqBits; // NOLINT + constexpr uint32_t PqMask = PqShift - 1u; // NOLINT + + const uint32_t pq_len = dim / pq_dim; + const uint32_t lut_size = pq_dim * PqShift; if constexpr (EnableSMemLut) { lut_scores = reinterpret_cast(smem_buf); } else { - lut_scores += (pq_dim << pq_bits) * blockIdx.x; + lut_scores += lut_size * blockIdx.x; } float* base_diff = nullptr; if constexpr (PrecompBaseDiff) { if constexpr (EnableSMemLut) { - base_diff = reinterpret_cast(lut_scores + (pq_dim << pq_bits)); + base_diff = reinterpret_cast(lut_scores + lut_size); } else { base_diff = reinterpret_cast(smem_buf); } } for (int ib = blockIdx.x; ib < n_queries * n_probes; ib += gridDim.x) { + if (ib >= gridDim.x) { + // sync shared memory accesses on the second and further iterations + __syncthreads(); + } uint32_t query_ix; uint32_t probe_ix; if (index_list == nullptr) { query_ix = ib % n_queries; probe_ix = ib / n_queries; } else { - query_ix = index_list[ib] / n_probes; - probe_ix = index_list[ib] % n_probes; + auto ordered_ix = index_list[ib]; + query_ix = ordered_ix / n_probes; + probe_ix = ordered_ix % n_probes; } - if (query_ix >= n_queries || probe_ix >= n_probes) continue; const uint32_t* chunk_indices = _chunk_indices + (n_probes * query_ix); const float* query = queries + (dim * query_ix); @@ -672,103 +706,221 @@ __launch_bounds__(1024) __global__ if (codebook_kind == codebook_gen::PER_SUBSPACE) { pq_center = pq_centers; } else { - pq_center = pq_centers + (pq_len << pq_bits) * label; + pq_center = pq_centers + (pq_len << PqBits) * label; } if constexpr (PrecompBaseDiff) { - // Reduce computational complexity by pre-computing the difference - // between the cluster centroid and the query. - for (uint32_t i = threadIdx.x; i < dim; i += blockDim.x) { - base_diff[i] = query[i] - cluster_center[i]; - } - __syncthreads(); - } - - // Create a lookup table - // For each subspace, the lookup table stores the distance between the actual query vector - // (projected into the subspace) and all possible pq vectors in that subspace. - for (uint32_t i = threadIdx.x; i < (pq_dim << pq_bits); i += blockDim.x) { - uint32_t i_pq = i >> pq_bits; - uint32_t i_code = codebook_kind == codebook_gen::PER_CLUSTER ? i & ((1 << pq_bits) - 1) : i; - float score = 0.0; + // Reduce number of memory reads later by pre-computing parts of the score switch (metric) { case distance::DistanceType::L2Expanded: { - for (uint32_t j = 0; j < pq_len; j++) { - uint32_t k = j + (pq_len * i_pq); - float diff; - if constexpr (PrecompBaseDiff) { - diff = base_diff[k]; - } else { - diff = query[k] - cluster_center[k]; - } - diff -= pq_center[j + pq_len * i_code]; - score += diff * diff; + for (uint32_t i = threadIdx.x; i < dim; i += blockDim.x) { + base_diff[i] = query[i] - cluster_center[i]; } } break; case distance::DistanceType::InnerProduct: { - for (uint32_t j = 0; j < pq_len; j++) { - uint32_t k = j + (pq_len * i_pq); - score += query[k] * (cluster_center[k] + pq_center[j + pq_len * i_code]); + float2 pvals; + for (uint32_t i = threadIdx.x; i < dim; i += blockDim.x) { + pvals.x = query[i]; + pvals.y = cluster_center[i] * pvals.x; + reinterpret_cast(base_diff)[i] = pvals; } } break; + default: __builtin_unreachable(); } - lut_scores[i] = LutT(score); + __syncthreads(); } - uint32_t sample_offset = 0; - if (probe_ix > 0) { sample_offset = chunk_indices[probe_ix - 1]; } - uint32_t n_samples = chunk_indices[probe_ix] - sample_offset; - uint32_t n_samples32 = Pow2<32>::roundUp(n_samples); - IdxT cluster_offset = cluster_offsets[label]; + { + // Create a lookup table + // For each subspace, the lookup table stores the distance between the actual query vector + // (projected into the subspace) and all possible pq vectors in that subspace. + for (uint32_t i = threadIdx.x; i < lut_size; i += blockDim.x) { + const uint32_t i_pq = i >> PqBits; + uint32_t j = i_pq * pq_len; + const uint32_t j_end = pq_len + j; + auto cur_pq_center = pq_center + (i & PqMask) + + (codebook_kind == codebook_gen::PER_SUBSPACE ? j * PqShift : 0u); + float score = 0.0; + do { + float pq_c = *cur_pq_center; + cur_pq_center += PqShift; + switch (metric) { + case distance::DistanceType::L2Expanded: { + float diff; + if constexpr (PrecompBaseDiff) { + diff = base_diff[j]; + } else { + diff = query[j] - cluster_center[j]; + } + diff -= pq_c; + score += diff * diff; + } break; + case distance::DistanceType::InnerProduct: { + // NB: we negate the scores as we hardcoded select-topk to always compute the minimum + float q; + if constexpr (PrecompBaseDiff) { + float2 pvals = reinterpret_cast(base_diff)[j]; + q = pvals.x; + score -= pvals.y; + } else { + q = query[j]; + score -= q * cluster_center[j]; + } + score -= q * pq_c; + } break; + default: __builtin_unreachable(); + } + } while (++j < j_end); + lut_scores[i] = LutT(score); + } + } + // Define helper types for efficient access to the pq_dataset, which is stored in an interleaved + // format. The chunks of PQ data are stored in kIndexGroupVecLen-bytes-long chunks, interleaved + // in groups of kIndexGroupSize elems (which is normally equal to the warp size) for the fastest + // possible access by thread warps. + // + // Consider one record in the pq_dataset is `pq_dim * pq_bits`-bit-long. + // Assuming `kIndexGroupVecLen = 16`, one chunk of data read by a thread at once is 128-bits. + // Then, such a chunk contains `chunk_size = 128 / pq_bits` record elements, and the record + // consists of `ceildiv(pq_dim, chunk_size)` chunks. The chunks are interleaved in groups of 32, + // so that the warp can achieve the best coalesced read throughput. + using group_align = Pow2; + using vec_align = Pow2; using local_topk_t = block_sort_t; - local_topk_t block_topk(topk, smem_buf); + using op_t = uint32_t; + using vec_t = TxN_t; + + uint32_t sample_offset = 0; + if (probe_ix > 0) { sample_offset = chunk_indices[probe_ix - 1]; } + uint32_t n_samples = chunk_indices[probe_ix] - sample_offset; + uint32_t n_samples_aligned = group_align::roundUp(n_samples); + IdxT cluster_offset = cluster_offsets[label]; + constexpr uint32_t kChunkSize = (kIndexGroupVecLen * 8u) / PqBits; + uint32_t pq_line_width = div_rounding_up_unsafe(pq_dim, kChunkSize) * kIndexGroupVecLen; + auto pq_thread_data = + pq_dataset + (size_t(cluster_offset) + group_align::roundDown(threadIdx.x)) * pq_line_width + + group_align::mod(threadIdx.x) * vec_align::Value; + pq_line_width *= blockDim.x; + + constexpr OutT kDummy = upper_bound(); + OutT query_kth = kDummy; + if constexpr (kManageLocalTopK) { query_kth = OutT(query_kths[query_ix]); } + local_topk_t block_topk(topk, smem_buf, query_kth); + OutT early_stop_limit = kDummy; + switch (metric) { + // If the metric is non-negative, we can use the query_kth approximation as an early stop + // threshold to skip some iterations when computing the score. Add such metrics here. + case distance::DistanceType::L2Expanded: { + early_stop_limit = query_kth; + } break; + default: break; + } - // Ensure lut_scores is written by all threads before using it in ivfpq_compute_score + // Ensure lut_scores is written by all threads before using it in ivfpq-compute-score __threadfence_block(); __syncthreads(); // Compute a distance for each sample - const uint32_t pq_line_width = pq_dim * pq_bits / 8; - for (uint32_t i = threadIdx.x; i < n_samples32; i += blockDim.x) { - OutT score = local_topk_t::queue_t::kDummy; - if (i < n_samples) { - auto pq_ptr = - reinterpret_cast(pq_dataset + uint64_t(pq_line_width) * (cluster_offset + i)); - float fscore = ivfpq_compute_score(pq_bits, vec_len, pq_dim, pq_ptr, lut_scores); - switch (metric) { - // For similarity metrics, - // we negate the scores as we hardcoded select-topk to always take the minimum - case distance::DistanceType::InnerProduct: fscore = -fscore; break; - default: break; - } - if (fscore < float(score)) { score = OutT{fscore}; } + for (uint32_t i = threadIdx.x; i < n_samples_aligned; + i += blockDim.x, pq_thread_data += pq_line_width) { + OutT score = kDummy; + bool valid = i < n_samples; + if (valid) { + score = ivfpq_compute_score( + pq_dim, + reinterpret_cast(pq_thread_data), + lut_scores, + early_stop_limit); } if constexpr (kManageLocalTopK) { block_topk.add(score, cluster_offset + i); } else { - if (i < n_samples) { out_scores[i + sample_offset] = score; } + if (valid) { out_scores[i + sample_offset] = score; } } } - __syncthreads(); if constexpr (kManageLocalTopK) { - // sync threads before and after the topk merging operation, because we reuse smem_buf + // sync threads before the topk merging operation, because we reuse smem_buf + __syncthreads(); block_topk.done(); block_topk.store(out_scores, out_indices); - __syncthreads(); + if (threadIdx.x == 0) { atomicMin(query_kths + query_ix, float(out_scores[topk - 1])); } } else { // fill in the rest of the out_scores with dummy values uint32_t max_samples = uint32_t(Pow2<128>::roundUp(cluster_offsets[n_probes])); if (probe_ix + 1 == n_probes) { for (uint32_t i = threadIdx.x + sample_offset + n_samples; i < max_samples; i += blockDim.x) { - out_scores[i] = local_topk_t::queue_t::kDummy; + out_scores[i] = kDummy; } } } } } +/** + * An approximation to the number of times each cluster appears in a batched sample. + * + * If the pairs (probe_ix, query_ix) are sorted by the probe_ix, there is a good chance that + * the same probe_ix (cluster) is processed by several blocks on a single SM. This greatly + * increases the L1 cache hit rate (i.e. increases the data locality). + * + * This function gives an estimate of how many times a specific cluster may appear in the + * batch. Thus, it gives a practical limit to how many blocks should be active on the same SM + * to improve the L1 cache hit rate. + */ +constexpr inline auto expected_probe_coresidency(uint32_t n_clusters, + uint32_t n_probes, + uint32_t n_queries) -> uint32_t +{ + /* + Let say: + n = n_clusters + k = n_probes + m = n_queries + r = # of times a specific block appears in the batched sample. + + Then, r has the Binomial distribution (p = k / n): + P(r) = C(m,r) * k^r * (n - k)^(m - r) / n^m + E[r] = m * k / n + E[r | r > 0] = m * k / n / (1 - (1 - k/n)^m) + + The latter can be approximated by a much simpler formula, assuming (k / n) -> 0: + E[r | r > 0] = 1 + (m - 1) * k / (2 * n) + O( (k/n)^2 ) + */ + return 1 + (n_queries - 1) * n_probes / (2 * n_clusters); +} + +/** + * Estimate a carveout value as expected by `cudaFuncAttributePreferredSharedMemoryCarveout` + * (which does not take into account `reservedSharedMemPerBlock`), + * given by a desired schmem-L1 split and a per-block memory requirement in bytes. + * + * NB: As per the programming guide, the memory carveout setting is just a hint for the driver; it's + * free to choose any shmem-L1 configuration it deems appropriate. For example, if you set the + * carveout to zero, it will choose a non-zero config that will allow to run at least one active + * block per SM. + * + * @param shmem_fraction + * a fraction representing a desired split (shmem / (shmem + L1)) [0, 1]. + * @param shmem_per_block + * a shared memory usage per block (dynamic + static shared memory sizes), in bytes. + * @param dev_props + * device properties. + * @return + * a carveout value in percents [0, 100]. + */ +constexpr inline auto estimate_carveout(double shmem_fraction, + size_t shmem_per_block, + const cudaDeviceProp& dev_props) -> int +{ + using shmem_unit = Pow2<128>; + size_t m = shmem_unit::roundUp(shmem_per_block); + size_t r = dev_props.reservedSharedMemPerBlock; + size_t s = dev_props.sharedMemPerMultiprocessor; + return (size_t(100 * s * m * shmem_fraction) - (m - 1) * r) / (s * (m + r)); +} + /** * This structure selects configurable template parameters (instance) based on * the search/index parameters at runtime. @@ -778,78 +930,68 @@ __launch_bounds__(1024) __global__ */ template struct ivfpq_compute_similarity { - using kernel_t = void (*)(uint32_t, - uint32_t, - uint32_t, - uint32_t, - uint32_t, - uint32_t, - distance::DistanceType, - codebook_gen, - uint32_t, - const float*, - const float*, - const uint8_t*, - const IdxT*, - const uint32_t*, - const uint32_t*, - const float*, - const uint32_t*, - LutT*, - OutT*, - IdxT*); + using kernel_t = decltype(&ivfpq_compute_similarity_kernel); template struct configured { public: - /** - * Select a proper kernel instance based on the runtime parameters. - * - * @param pq_bits - * @param pq_dim - * @param k_max - */ - static auto kernel(uint32_t pq_bits, uint32_t pq_dim, uint32_t k_max) -> kernel_t + /** Select a proper kernel instance based on the runtime parameters. */ + static auto kernel(uint32_t pq_bits, uint32_t k_max) -> kernel_t { - return kernel_base(pq_bits, pq_dim, k_max); + switch (pq_bits) { + case 4: return kernel_try_capacity<4, kMaxCapacity>(k_max); + case 5: return kernel_try_capacity<5, kMaxCapacity>(k_max); + case 6: return kernel_try_capacity<6, kMaxCapacity>(k_max); + case 7: return kernel_try_capacity<7, kMaxCapacity>(k_max); + case 8: return kernel_try_capacity<8, kMaxCapacity>(k_max); + default: RAFT_FAIL("Invalid pq_bits (%u), the value must be within [4, 8]", pq_bits); + } } private: - template + template static auto kernel_try_capacity(uint32_t k_max) -> kernel_t { if constexpr (Capacity > 0) { - if (k_max == 0 || k_max > Capacity) { return kernel_try_capacity(k_max); } + if (k_max == 0 || k_max > Capacity) { return kernel_try_capacity(k_max); } } - if constexpr (Capacity > 32) { - if (k_max * 2 <= Capacity) { return kernel_try_capacity(k_max); } + if constexpr (Capacity > 1) { + if (k_max * 2 <= Capacity) { return kernel_try_capacity(k_max); } } - return ivfpq_compute_similarity_kernel; } + }; + + /** Estimate the occupancy for the given kernel on the given device. */ + struct occupancy_t { + using shmem_unit = Pow2<128>; - static auto kernel_base(uint32_t pq_bits, uint32_t pq_dim, uint32_t k_max) -> kernel_t + int blocks_per_sm = 0; + double occupancy = 0.0; + double shmem_use = 1.0; + + inline occupancy_t() = default; + inline occupancy_t(size_t smem, + uint32_t n_threads, + kernel_t kernel, + const cudaDeviceProp& dev_props) { - switch (gcd(pq_bits * pq_dim, 64)) { - case 64: return kernel_try_capacity(k_max); - case 32: return kernel_try_capacity(k_max); - case 16: return kernel_try_capacity(k_max); - case 8: return kernel_try_capacity(k_max); - default: - RAFT_FAIL("`pq_bits * pq_dim` must be a multiple of 8 (pq_bits = %u, pq_dim = %u).", - pq_bits, - pq_dim); - } + RAFT_CUDA_TRY( + cudaOccupancyMaxActiveBlocksPerMultiprocessor(&blocks_per_sm, kernel, n_threads, smem)); + occupancy = double(blocks_per_sm * n_threads) / double(dev_props.maxThreadsPerMultiProcessor); + shmem_use = double(shmem_unit::roundUp(smem) * blocks_per_sm) / + double(dev_props.sharedMemPerMultiprocessor); } }; struct selected { - void* kernel; + kernel_t kernel; dim3 grid_dim; dim3 block_dim; size_t smem_size; @@ -858,8 +1000,8 @@ struct ivfpq_compute_similarity { template void operator()(rmm::cuda_stream_view stream, Args... args) { - void* xs[] = {&args...}; // NOLINT - RAFT_CUDA_TRY(cudaLaunchKernel(kernel, grid_dim, block_dim, xs, smem_size, stream)); + kernel<<>>(args...); + RAFT_CHECK_CUDA(stream); } }; @@ -873,128 +1015,236 @@ struct ivfpq_compute_similarity { * whether use the fused calculate+select or just calculate the distances for each * query and probed cluster. * + * @param locality_hint + * beyond this limit do not consider increasing the number of active blocks per SM + * would improve locality anymore. */ static inline auto select(bool manage_local_topk, + int locality_hint, + double preferred_shmem_carveout, uint32_t pq_bits, uint32_t pq_dim, - uint32_t rot_dim, - uint32_t preferred_thread_block_size, + uint32_t precomp_data_count, uint32_t n_queries, uint32_t n_probes, uint32_t topk) -> selected { - using conf_fast = configured; - using conf_no_basediff = configured; - using conf_no_smem_lut = configured; - - kernel_t kernel_fast = conf_fast::kernel(pq_bits, pq_dim, manage_local_topk ? topk : 0u); - kernel_t kernel_no_basediff = - conf_no_basediff::kernel(pq_bits, pq_dim, manage_local_topk ? topk : 0u); - kernel_t kernel_no_smem_lut = - conf_no_smem_lut::kernel(pq_bits, pq_dim, manage_local_topk ? topk : 0u); - - const size_t smem_threshold = 48 * 1024; - size_t smem_size = sizeof(LutT) * (pq_dim << pq_bits); - size_t smem_size_base_diff = sizeof(float) * rot_dim; - - uint32_t n_blocks = n_queries * n_probes; - uint32_t n_threads = 1024; - // preferred_thread_block_size == 0 means using auto thread block size calculation mode - if (preferred_thread_block_size == 0) { - const uint32_t thread_min = 256; + cudaDeviceProp dev_props; + { int cur_dev; - cudaDeviceProp dev_props; RAFT_CUDA_TRY(cudaGetDevice(&cur_dev)); RAFT_CUDA_TRY(cudaGetDeviceProperties(&dev_props, cur_dev)); - while (n_threads > thread_min) { - if (n_blocks < uint32_t(getMultiProcessorCount() * (1024 / (n_threads / 2)))) { break; } - if (dev_props.sharedMemPerMultiprocessor * 2 / 3 < smem_size * (1024 / (n_threads / 2))) { - break; + } + // Shared memory for storing the lookup table + size_t lut_mem = sizeof(LutT) * (pq_dim << pq_bits); + // Shared memory for storing pre-computed pieces to speedup the lookup table construction + // (e.g. the distance between a cluster center and the query for L2). + size_t bdf_mem = sizeof(float) * precomp_data_count; + // Shared memory for the fused top-k component; it may overlap with the other uses of shared + // memory and depends on the number of threads. + struct ltk_mem_t { + uint32_t subwarp_size; + uint32_t topk; + bool manage_local_topk; + ltk_mem_t(bool manage_local_topk, uint32_t topk) + : manage_local_topk(manage_local_topk), topk(topk) + { + subwarp_size = WarpSize; + while (topk * 2 <= subwarp_size) { + subwarp_size /= 2; } - n_threads /= 2; } - } else { - n_threads = preferred_thread_block_size; + + [[nodiscard]] auto operator()(uint32_t n_threads) const -> size_t + { + return manage_local_topk ? topk::template calc_smem_size_for_block_wide( + n_threads / subwarp_size, topk) + : 0; + } + } ltk_mem{manage_local_topk, topk}; + + // Total amount of work; should be enough to occupy the GPU. + uint32_t n_blocks = n_queries * n_probes; + + // The minimum block size we may want: + // 1. It's a power-of-two for efficient L1 caching of pq_centers values + // (multiples of `1 << pq_bits`). + // 2. It should be large enough to fully utilize an SM. + uint32_t n_threads_min = WarpSize; + while (dev_props.maxBlocksPerMultiProcessor * int(n_threads_min) < + dev_props.maxThreadsPerMultiProcessor) { + n_threads_min *= 2; } - size_t smem_size_local_topk = - manage_local_topk - ? topk::template calc_smem_size_for_block_wide(n_threads / WarpSize, topk) - : 0; - smem_size = max(smem_size, smem_size_local_topk); - - kernel_t kernel = kernel_no_basediff; - - bool kernel_no_basediff_available = true; - bool use_smem_lut = true; - if (smem_size > smem_threshold) { - cudaError_t cuda_status = cudaFuncSetAttribute( - kernel_no_basediff, cudaFuncAttributeMaxDynamicSharedMemorySize, smem_size); + // Further increase the minimum block size to make sure full device occupancy + // (NB: this may lead to `n_threads_min` being larger than the kernel's maximum) + while (int(n_blocks * n_threads_min) < + dev_props.multiProcessorCount * dev_props.maxThreadsPerMultiProcessor && + int(n_threads_min) < dev_props.maxThreadsPerBlock) { + n_threads_min *= 2; + } + // Even further, increase it to allow less blocks per SM if there not enough queries. + // With this, we reduce the chance of different clusters being processed by two blocks + // on the same SM and thus improve the data locality for L1 caching. + while (int(n_queries * n_threads_min) < dev_props.maxThreadsPerMultiProcessor && + int(n_threads_min) < dev_props.maxThreadsPerBlock) { + n_threads_min *= 2; + } + + // Granularity of changing the number of threads when computing the maximum block size. + // It's good to have it multiple of the PQ book width. + uint32_t n_threads_gty = round_up_safe(1u << pq_bits, WarpSize); + + /* + Shared memory / L1 cache balance is the main limiter of this kernel. + The more blocks per SM we launch, the more shared memory we need. Besides that, we have + three versions of the kernel varying in performance and shmem usage. + + We try the most demanding and the fastest kernel first, trying to maximize occupancy with + the minimum number of blocks (just one, really). Then, we tweak the `n_threads` to further + optimize occupancy and data locality for the L1 cache. + */ + using conf_fast = configured; + using conf_no_basediff = configured; + using conf_no_smem_lut = configured; + auto topk_or_zero = manage_local_topk ? topk : 0u; + std::array candidates{ + std::make_tuple(conf_fast::kernel(pq_bits, topk_or_zero), lut_mem + bdf_mem, true), + std::make_tuple(conf_no_basediff::kernel(pq_bits, topk_or_zero), lut_mem, true), + std::make_tuple(conf_no_smem_lut::kernel(pq_bits, topk_or_zero), bdf_mem, false)}; + + // we may allow slightly lower than 100% occupancy; + constexpr double kTargetOccupancy = 0.75; + // This struct is used to select the better candidate + occupancy_t selected_perf{}; + selected selected_config; + for (auto [kernel, smem_size_const, lut_is_in_shmem] : candidates) { + if (smem_size_const > dev_props.sharedMemPerBlockOptin) { + // Even a single block cannot fit into an SM due to shmem requirements. Skip the candidate. + continue; + } + + // First, we set the carveout hint to the preferred value. The driver will increase this if + // needed to run at least one block per SM. At the same time, if more blocks fit into one SM, + // this carveout value will limit the calculated occupancy. When we're done selecting the best + // launch configuration, we will tighten the carveout once more, based on the final memory + // usage and occupancy. + const int max_carveout = + estimate_carveout(preferred_shmem_carveout, smem_size_const, dev_props); + RAFT_CUDA_TRY( + cudaFuncSetAttribute(kernel, cudaFuncAttributePreferredSharedMemoryCarveout, max_carveout)); + + // Get the theoretical maximum possible number of threads per block + cudaFuncAttributes kernel_attrs; + RAFT_CUDA_TRY(cudaFuncGetAttributes(&kernel_attrs, kernel)); + uint32_t n_threads = + round_down_safe(kernel_attrs.maxThreadsPerBlock, n_threads_gty); + + // Actual required shmem depens on the number of threads + size_t smem_size = max(smem_size_const, ltk_mem(n_threads)); + + // Make sure the kernel can get enough shmem. + cudaError_t cuda_status = + cudaFuncSetAttribute(kernel, cudaFuncAttributeMaxDynamicSharedMemorySize, smem_size); if (cuda_status != cudaSuccess) { RAFT_EXPECTS( cuda_status == cudaGetLastError(), "Tried to reset the expected cuda error code, but it didn't match the expectation"); - kernel_no_basediff_available = false; - - // Use "kernel_no_smem_lut" which just uses small amount of shared memory. - RAFT_LOG_DEBUG( - "Non-shared-mem look-up table kernel is selected, because it wouldn't fit shmem " - "required: " - "%zu bytes)", - smem_size); - kernel = kernel_no_smem_lut; - use_smem_lut = false; - n_threads = 1024; - smem_size_local_topk = - manage_local_topk - ? topk::template calc_smem_size_for_block_wide(n_threads / WarpSize, topk) - : 0; - smem_size = max(smem_size_base_diff, smem_size_local_topk); - n_blocks = getMultiProcessorCount(); + // Failed to request enough shmem for the kernel. Skip the candidate. + continue; } - } - if (kernel_no_basediff_available) { - bool kernel_fast_available = true; - if (smem_size + smem_size_base_diff > smem_threshold) { - cudaError_t cuda_status = cudaFuncSetAttribute(kernel_fast, - cudaFuncAttributeMaxDynamicSharedMemorySize, - smem_size + smem_size_base_diff); - if (cuda_status != cudaSuccess) { - RAFT_EXPECTS( - cuda_status == cudaGetLastError(), - "Tried to reset the expected cuda error code, but it didn't match the expectation"); - kernel_fast_available = false; - RAFT_LOG_DEBUG( - "No-precomputed-basediff kernel is selected, because the basediff wouldn't fit (shmem " - "required: %zu bytes)", - smem_size + smem_size_base_diff); + + occupancy_t cur(smem_size, n_threads, kernel, dev_props); + if (cur.blocks_per_sm <= 0) { + // For some reason, we still cannot make this kernel run. Skip the candidate. + continue; + } + + { + // Try to reduce the number of threads to increase occupancy and data locality + auto n_threads_tmp = n_threads_min; + while (n_threads_tmp * 2 < n_threads) { + n_threads_tmp *= 2; + } + if (n_threads_tmp < n_threads) { + while (n_threads_tmp >= n_threads_min) { + auto smem_size_tmp = max(smem_size_const, ltk_mem(n_threads_tmp)); + occupancy_t tmp(smem_size_tmp, n_threads_tmp, kernel, dev_props); + bool select_it = false; + if (lut_is_in_shmem && locality_hint >= tmp.blocks_per_sm) { + // Normally, the smaller the block the better for L1 cache hit rate. + // Hence, the occupancy should be "just good enough" + select_it = tmp.occupancy >= min(kTargetOccupancy, cur.occupancy); + } else if (lut_is_in_shmem) { + // If we don't have enough repeating probes (locality_hint < tmp.blocks_per_sm), + // the locality is not going to improve with increasing the number of blocks per SM. + // Hence, the only metric here is the occupancy. + select_it = tmp.occupancy > cur.occupancy; + } else { + // If we don't use shared memory for the lookup table, increasing the number of blocks + // is very taxing on the global memory usage. + // In this case, the occupancy must increase a lot to make it worth the cost. + select_it = tmp.occupancy >= min(1.0, cur.occupancy / kTargetOccupancy); + } + if (select_it) { + n_threads = n_threads_tmp; + smem_size = smem_size_tmp; + cur = tmp; + } + n_threads_tmp /= 2; + } } } - if (kernel_fast_available) { - int kernel_no_basediff_n_blocks = 0; - RAFT_CUDA_TRY(cudaOccupancyMaxActiveBlocksPerMultiprocessor( - &kernel_no_basediff_n_blocks, kernel_no_basediff, n_threads, smem_size)); - - int kernel_fast_n_blocks = 0; - RAFT_CUDA_TRY(cudaOccupancyMaxActiveBlocksPerMultiprocessor( - &kernel_fast_n_blocks, kernel_fast, n_threads, smem_size + smem_size_base_diff)); - - // Use "kernel_fast" only if GPU occupancy does not drop - if (kernel_no_basediff_n_blocks == kernel_fast_n_blocks) { - kernel = kernel_fast; - smem_size += smem_size_base_diff; + + { + if (selected_perf.occupancy <= 0.0 // no candidate yet + || (selected_perf.occupancy < cur.occupancy * kTargetOccupancy && + selected_perf.shmem_use >= cur.shmem_use) // much improved occupancy + ) { + selected_perf = cur; + if (lut_is_in_shmem) { + selected_config = { + kernel, dim3(n_blocks, 1, 1), dim3(n_threads, 1, 1), smem_size, size_t(0)}; + } else { + // When the global memory is used for the lookup table, we need to minimize the grid + // size; otherwise, the kernel may quickly run out of memory. + auto n_blocks_min = + std::min(n_blocks, cur.blocks_per_sm * dev_props.multiProcessorCount); + selected_config = {kernel, + dim3(n_blocks_min, 1, 1), + dim3(n_threads, 1, 1), + smem_size, + size_t(n_blocks_min) * size_t(pq_dim << pq_bits)}; + } + // Actual shmem/L1 split wildly rounds up the specified preferred carveout, so we set here + // a rather conservative bar; most likely, the kernel gets more shared memory than this, + // and the occupancy doesn't get hurt. + auto carveout = std::min(max_carveout, std::ceil(100.0 * cur.shmem_use)); + RAFT_CUDA_TRY( + cudaFuncSetAttribute(kernel, cudaFuncAttributePreferredSharedMemoryCarveout, carveout)); + if (cur.occupancy >= kTargetOccupancy) { break; } + } else if (selected_perf.occupancy > 0.0) { + // If we found a reasonable candidate on a previous iteration, and this one is not better, + // then don't try any more candidates because they are much slower anyway. + break; } } } - uint32_t device_lut_size = use_smem_lut ? 0u : n_blocks * (pq_dim << pq_bits); - return {reinterpret_cast(kernel), - dim3(n_blocks, 1, 1), - dim3(n_threads, 1, 1), - smem_size, - device_lut_size}; + RAFT_EXPECTS(selected_perf.occupancy > 0.0, + "Couldn't determine a working kernel launch configuration."); + + return selected_config; } }; +inline auto is_local_topk_feasible(uint32_t k, uint32_t n_probes, uint32_t n_queries) -> bool +{ + if (k > kMaxCapacity) { return false; } // warp_sort not possible + if (n_probes <= 16) { return false; } // too few clusters + if (n_queries * n_probes <= 256) { return false; } // overall amount of work is too small + return true; +} + /** * The "main part" of the search, which assumes that outer-level `search` has already: * @@ -1009,13 +1259,13 @@ void ivfpq_search_worker(const handle_t& handle, uint32_t max_samples, uint32_t n_probes, uint32_t topK, - uint32_t preferred_thread_block_size, uint32_t n_queries, const uint32_t* clusters_to_probe, // [n_queries, n_probes] const float* query, // [n_queries, rot_dim] IdxT* neighbors, // [n_queries, topK] float* distances, // [n_queries, topK] float scaling_factor, + double preferred_shmem_carveout, rmm::mr::device_memory_resource* mr) { auto stream = handle.get_stream(); @@ -1025,13 +1275,10 @@ void ivfpq_search_worker(const handle_t& handle, auto data_indices = index.indices().data_handle(); auto cluster_centers = index.centers_rot().data_handle(); auto cluster_offsets = index.list_offsets().data_handle(); + auto cluster_sizes = index.list_sizes().data_handle(); - bool manage_local_topk = topK <= kMaxCapacity // depth is not too large - && n_probes >= 16 // not too few clusters looked up - && - n_queries * n_probes >= 256 // overall amount of work is not too small - ; - auto topk_len = manage_local_topk ? n_probes * topK : max_samples; + bool manage_local_topk = is_local_topk_feasible(topK, n_probes, n_queries); + auto topk_len = manage_local_topk ? n_probes * topK : max_samples; if (manage_local_topk) { RAFT_LOG_DEBUG("Fused version of the search kernel is selected (manage_local_topk == true)"); } else { @@ -1052,10 +1299,12 @@ void ivfpq_search_worker(const handle_t& handle, neighbors_ptr = neighbors_buf.data(); } - calc_chunk_indices::configure(n_probes, n_queries)( - cluster_offsets, clusters_to_probe, chunk_index.data(), num_samples.data(), stream); + calc_chunk_indices::configure(n_probes, n_queries)( + cluster_sizes, clusters_to_probe, chunk_index.data(), num_samples.data(), stream); + + auto coresidency = expected_probe_coresidency(index.n_lists(), n_probes, n_queries); - if (n_queries * n_probes > 256) { + if (coresidency > 1) { // Sorting index by cluster number (label). // The goal is to incrase the L2 cache hit rate to read the vectors // of a cluster by processing the cluster at the same time as much as @@ -1096,22 +1345,48 @@ void ivfpq_search_worker(const handle_t& handle, } // select and run the main search kernel + uint32_t precomp_data_count = 0; + switch (index.metric()) { + case distance::DistanceType::L2SqrtExpanded: + case distance::DistanceType::L2SqrtUnexpanded: + case distance::DistanceType::L2Unexpanded: + case distance::DistanceType::L2Expanded: { + // stores basediff (query[i] - center[i]) + precomp_data_count = index.rot_dim(); + } break; + case distance::DistanceType::InnerProduct: { + // stores two components (query[i] * center[i], query[i] * center[i]) + precomp_data_count = index.rot_dim() * 2; + } break; + default: { + RAFT_FAIL("Unsupported metric"); + } break; + } + auto search_instance = ivfpq_compute_similarity::select(manage_local_topk, + coresidency, + preferred_shmem_carveout, index.pq_bits(), index.pq_dim(), - index.rot_dim(), - preferred_thread_block_size, + precomp_data_count, n_queries, n_probes, topK); rmm::device_uvector device_lut(search_instance.device_lut_size, stream, mr); + rmm::device_uvector query_kths(0, stream, mr); + if (manage_local_topk) { + query_kths.resize(n_queries, stream); + thrust::fill_n(handle.get_thrust_policy(), + query_kths.data(), + n_queries, + float(dummy_block_sort_t::queue_t::kDummy)); + } search_instance(stream, index.size(), index.rot_dim(), n_probes, - index.pq_bits(), index.pq_dim(), n_queries, index.metric(), @@ -1125,6 +1400,7 @@ void ivfpq_search_worker(const handle_t& handle, chunk_index.data(), query, index_list_sorted, + query_kths.data(), device_lut.data(), distances_buf.data(), neighbors_ptr); @@ -1164,19 +1440,7 @@ void ivfpq_search_worker(const handle_t& handle, template struct ivfpq_search { public: - using fun_t = void (*)(const handle_t&, - const ivf_pq::index&, - uint32_t, - uint32_t, - uint32_t, - uint32_t, - uint32_t, - const uint32_t*, - const float*, - IdxT*, - float*, - float, - rmm::mr::device_memory_resource*); + using fun_t = decltype(&ivfpq_search_worker); /** * Select an instance of the ivf-pq search function based on search tuning parameters, @@ -1226,12 +1490,18 @@ struct ivfpq_search { * A heuristic for bounding the number of queries per batch, to improve GPU utilization. * (based on the number of SMs and the work size). * + * @param k top-k + * @param n_probes number of selected clusters per query * @param n_queries number of queries hoped to be processed at once. * (maximum value for the returned batch size) + * @param max_samples maximum possible number of samples to be processed for the given `n_probes` * * @return maximum recommended batch size. */ -inline auto get_max_batch_size(uint32_t n_queries) -> uint32_t +inline auto get_max_batch_size(uint32_t k, + uint32_t n_probes, + uint32_t n_queries, + uint32_t max_samples) -> uint32_t { uint32_t max_batch_size = n_queries; uint32_t n_ctas_total = getMultiProcessorCount() * 2; @@ -1243,6 +1513,23 @@ inline auto get_max_batch_size(uint32_t n_queries) -> uint32_t float utilization_1 = float(n_ctas_total_per_batch_1 * max_batch_size_1) / n_ctas_total; if (utilization < utilization_1) { max_batch_size = max_batch_size_1; } } + // Check in the tmp distance buffer is not too big + auto ws_size = [k, n_probes, max_samples](uint32_t bs) -> uint64_t { + return uint64_t(is_local_topk_feasible(k, n_probes, bs) ? k * n_probes : max_samples) * bs; + }; + constexpr uint64_t kMaxWsSize = 1024 * 1024 * 1024; + if (ws_size(max_batch_size) > kMaxWsSize) { + uint32_t smaller_batch_size = 1; + // take powers of two for better alignment + while (smaller_batch_size * 2 <= max_batch_size) { + smaller_batch_size <<= 1; + } + // gradually reduce the batch size until we fit into the max size limit. + while (smaller_batch_size > 1 && ws_size(smaller_batch_size) > kMaxWsSize) { + smaller_batch_size >>= 1; + } + return smaller_batch_size; + } return max_batch_size; } @@ -1261,7 +1548,11 @@ inline void search(const handle_t& handle, static_assert(std::is_same_v || std::is_same_v || std::is_same_v, "Unsupported element type."); common::nvtx::range fun_scope( - "ivf_pq::search(k = %u, n_queries = %u, dim = %zu)", k, n_queries, index.dim()); + "ivf_pq::search(n_queries = %u, n_probes = %u, k = %u, dim = %zu)", + n_queries, + params.n_probes, + k, + index.dim()); RAFT_EXPECTS( params.internal_distance_dtype == CUDA_R_16F || params.internal_distance_dtype == CUDA_R_32F, @@ -1269,11 +1560,6 @@ inline void search(const handle_t& handle, RAFT_EXPECTS(params.lut_dtype == CUDA_R_16F || params.lut_dtype == CUDA_R_32F || params.lut_dtype == CUDA_R_8U, "lut_dtype must be CUDA_R_16F, CUDA_R_32F or CUDA_R_8U"); - RAFT_EXPECTS( - params.preferred_thread_block_size == 256 || params.preferred_thread_block_size == 512 || - params.preferred_thread_block_size == 1024 || params.preferred_thread_block_size == 0, - "preferred_thread_block_size must be 0, 256, 512 or 1024, but %u is given.", - params.preferred_thread_block_size); RAFT_EXPECTS(k > 0, "parameter `k` in top-k must be positive."); RAFT_EXPECTS( k <= index.size(), @@ -1326,7 +1612,7 @@ inline void search(const handle_t& handle, // Maximum number of query vectors to search at the same time. const auto max_queries = std::min(std::max(n_queries, 1), 4096); - auto max_batch_size = get_max_batch_size(max_queries); + auto max_batch_size = get_max_batch_size(k, n_probes, max_queries, max_samples); rmm::device_uvector float_queries(max_queries * dim_ext, stream, mr); rmm::device_uvector rot_queries(max_queries * index.rot_dim(), stream, mr); @@ -1380,13 +1666,13 @@ inline void search(const handle_t& handle, max_samples, params.n_probes, k, - params.preferred_thread_block_size, batch_size, clusters_to_probe.data() + uint64_t(params.n_probes) * offset_b, rot_queries.data() + uint64_t(index.rot_dim()) * offset_b, neighbors + uint64_t(k) * (offset_q + offset_b), distances + uint64_t(k) * (offset_q + offset_b), utils::config::kDivisor / utils::config::kDivisor, + params.preferred_shmem_carveout, mr); } } diff --git a/cpp/include/raft/spatial/knn/detail/topk.cuh b/cpp/include/raft/spatial/knn/detail/topk.cuh index 5adf6df472..f4dcb53088 100644 --- a/cpp/include/raft/spatial/knn/detail/topk.cuh +++ b/cpp/include/raft/spatial/knn/detail/topk.cuh @@ -19,6 +19,8 @@ #include "topk/radix_topk.cuh" #include "topk/warpsort_topk.cuh" +#include + #include #include @@ -73,7 +75,11 @@ void select_topk(const T* in, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr = nullptr) { - if (k <= raft::spatial::knn::detail::topk::kMaxCapacity) { + common::nvtx::range fun_scope( + "matrix::select_topk(batch_size = %zu, len = %zu, k = %d)", batch_size, len, k); + // TODO (achirkin): investigate the trade-off for a wider variety of inputs. + const bool radix_faster = batch_size >= 64 && len >= 102400 && k >= 128; + if (k <= raft::spatial::knn::detail::topk::kMaxCapacity && !radix_faster) { topk::warp_sort_topk( in, in_idx, batch_size, len, k, out, out_idx, select_min, stream, mr); } else { diff --git a/cpp/include/raft/spatial/knn/detail/topk/warpsort_topk.cuh b/cpp/include/raft/spatial/knn/detail/topk/warpsort_topk.cuh index 3f67e7b03a..cbe9f36e97 100644 --- a/cpp/include/raft/spatial/knn/detail/topk/warpsort_topk.cuh +++ b/cpp/include/raft/spatial/knn/detail/topk/warpsort_topk.cuh @@ -33,7 +33,7 @@ Three APIs of different scopes are provided: 1. host function: warp_sort_topk() 2. block-wide API: class block_sort - 3. warp-wide API: class warp_sort_filtered and class warp_sort_immediate + 3. warp-wide API: several implementations of warp_sort_* 1. warp_sort_topk() @@ -42,7 +42,7 @@ 2. class block_sort It can be regarded as a fixed size priority queue for a thread block, although the API is not typical. - class warp_sort_filtered and warp_sort_immediate can be used to instantiate block_sort. + one of the classes `warp_sort_*` can be used to instantiate block_sort. It uses dynamic shared memory as an intermediate buffer. So the required shared memory size should be calculated using @@ -70,7 +70,7 @@ kernel<<>>(); - 3. class warp_sort_filtered and class warp_sort_immediate + 3. class warp_sort_* These two classes can be regarded as fixed size priority queue for a warp. Usage is similar to class block_sort. No shared memory is needed. @@ -276,8 +276,8 @@ class warp_sort_filtered : public warp_sort { using warp_sort::kWarpWidth; using warp_sort::k; - __device__ warp_sort_filtered(int k) - : warp_sort(k), buf_len_(0), k_th_(kDummy) + __device__ warp_sort_filtered(int k, T limit) + : warp_sort(k), buf_len_(0), k_th_(limit) { #pragma unroll for (int i = 0; i < kMaxBufLen; i++) { @@ -286,6 +286,11 @@ class warp_sort_filtered : public warp_sort { } } + __device__ __forceinline__ explicit warp_sort_filtered(int k) + : warp_sort_filtered(k, kDummy) + { + } + __device__ void add(T val, IdxT idx) { // comparing for k_th should reduce the total amount of updates: @@ -356,6 +361,108 @@ class warp_sort_filtered : public warp_sort { T k_th_; }; +/** + * This version of warp_sort compares each input element against the current + * estimate of k-th value before adding it to the intermediate sorting buffer. + * In contrast to `warp_sort_filtered`, it keeps one distributed buffer for + * all threads in a warp (independently of the subwarp size), which makes its flushing less often. + */ +template +class warp_sort_distributed : public warp_sort { + public: + using warp_sort::kDummy; + using warp_sort::kWarpWidth; + using warp_sort::k; + + __device__ warp_sort_distributed(int k, T limit) + : warp_sort(k), + buf_val_(kDummy), + buf_idx_(IdxT{}), + buf_len_(0), + k_th_(limit) + { + } + + __device__ __forceinline__ explicit warp_sort_distributed(int k) + : warp_sort_distributed(k, kDummy) + { + } + + __device__ void add(T val, IdxT idx) + { + // mask tells which lanes in the warp have valid items to be added + uint32_t mask = ballot(is_ordered(val, k_th_)); + if (mask == 0) { return; } + // how many elements to be added + uint32_t n_valid = __popc(mask); + // index of the source lane containing the value to put into the current lane. + uint32_t src_ix = 0; + // remove a few smallest set bits from the mask. + for (uint32_t i = std::min(n_valid, Pow2::mod(uint32_t(laneId()) - buf_len_)); i > 0; + i--) { + src_ix = __ffs(mask) - 1; + mask ^= (0x1u << src_ix); + } + // now the least significant bit of the mask corresponds to the lane id we want to get. + // for not-added (invalid) indices, the mask is zeroed by now. + src_ix = __ffs(mask) - 1; + // rearrange the inputs to be ready to put them into the tmp buffer + val = shfl(val, src_ix); + idx = shfl(idx, src_ix); + // for non-valid lanes, src_ix should be uint(-1) + if (mask == 0) { val = kDummy; } + // save the values into the free slots of the warp tmp buffer + if (laneId() >= buf_len_) { + buf_val_ = val; + buf_idx_ = idx; + } + buf_len_ += n_valid; + if (buf_len_ < WarpSize) { return; } + // merge the warp tmp buffer into the queue + merge_buf_(); + buf_len_ -= WarpSize; + // save the inputs that couldn't fit before the merge + if (laneId() < buf_len_) { + buf_val_ = val; + buf_idx_ = idx; + } + } + + __device__ void done() + { + if (buf_len_ != 0) { + merge_buf_(); + buf_len_ = 0; + } + } + + private: + __device__ __forceinline__ void set_k_th_() + { + // NB on using srcLane: it's ok if it is outside the warp size / width; + // the modulo op will be done inside the __shfl_sync. + k_th_ = shfl(val_arr_[kMaxArrLen - 1], k - 1, kWarpWidth); + } + + __device__ __forceinline__ void merge_buf_() + { + topk::bitonic<1>(!Ascending, kWarpWidth).sort(buf_val_, buf_idx_); + this->merge_in<1>(&buf_val_, &buf_idx_); + set_k_th_(); // contains warp sync + buf_val_ = kDummy; + } + + using warp_sort::kMaxArrLen; + using warp_sort::val_arr_; + using warp_sort::idx_arr_; + + T buf_val_; + IdxT buf_idx_; + uint32_t buf_len_; // 0 <= buf_len_ <= WarpSize + + T k_th_; +}; + /** * This version of warp_sort adds every input element into the intermediate sorting * buffer, and thus does the sorting step every `Capacity` input elements. @@ -436,7 +543,8 @@ class block_sort { public: using queue_t = WarpSortWarpWide; - __device__ block_sort(int k, uint8_t* smem_buf) : queue_(k) + template + __device__ block_sort(int k, uint8_t* smem_buf, Args... args) : queue_(k, args...) { val_smem_ = reinterpret_cast(smem_buf); const int num_of_warp = subwarp_align::div(blockDim.x); diff --git a/cpp/include/raft/util/cuda_utils.cuh b/cpp/include/raft/util/cuda_utils.cuh index e5b58718a0..a64fbdb1be 100644 --- a/cpp/include/raft/util/cuda_utils.cuh +++ b/cpp/include/raft/util/cuda_utils.cuh @@ -626,6 +626,16 @@ DI bool all(bool inFlag, uint32_t mask = 0xffffffffu) return inFlag; } +/** For every thread in the warp, set the corresponding bit to the thread's flag value. */ +DI uint32_t ballot(bool inFlag, uint32_t mask = 0xffffffffu) +{ +#if CUDART_VERSION >= 9000 + return __ballot_sync(mask, inFlag); +#else + return __ballot(inFlag); +#endif +} + /** * @brief Shuffle the data inside a warp * @tparam T the data type (currently assumed to be 4B) diff --git a/cpp/test/neighbors/ann_ivf_flat.cu b/cpp/test/neighbors/ann_ivf_flat.cu index c57e8c7548..735d569318 100644 --- a/cpp/test/neighbors/ann_ivf_flat.cu +++ b/cpp/test/neighbors/ann_ivf_flat.cu @@ -20,9 +20,9 @@ #include #include #include +#include #include #include -#include #include #include @@ -41,9 +41,7 @@ #include #include -namespace raft { -namespace spatial { -namespace knn { +namespace raft::neighbors::ivf_flat { template struct AnnIvfFlatInputs { @@ -356,6 +354,4 @@ TEST_P(AnnIVFFlatTestF_int8, AnnIVFFlat) { this->testIVFFlat(); } INSTANTIATE_TEST_CASE_P(AnnIVFFlatTest, AnnIVFFlatTestF_int8, ::testing::ValuesIn(inputs)); -} // namespace knn -} // namespace spatial -} // namespace raft +} // namespace raft::neighbors::ivf_flat diff --git a/cpp/test/neighbors/ann_ivf_pq.cuh b/cpp/test/neighbors/ann_ivf_pq.cuh index a247f0101f..9d6ad11ccb 100644 --- a/cpp/test/neighbors/ann_ivf_pq.cuh +++ b/cpp/test/neighbors/ann_ivf_pq.cuh @@ -20,10 +20,10 @@ #include #include +#include #include -#include #if defined RAFT_NN_COMPILED -#include +#include #else #pragma message("NN specializations are not enabled; expect very long building times.") #endif @@ -35,6 +35,8 @@ #include +#include +#include #include #include @@ -42,15 +44,15 @@ #include #include -namespace raft::spatial::knn { +namespace raft::neighbors::ivf_pq { struct ivf_pq_inputs { uint32_t num_db_vecs = 4096; uint32_t num_queries = 1024; uint32_t dim = 64; uint32_t k = 32; - raft::spatial::knn::ivf_pq::index_params index_params; - raft::spatial::knn::ivf_pq::search_params search_params; + ivf_pq::index_params index_params; + ivf_pq::search_params search_params; // Set some default parameters for tests ivf_pq_inputs() @@ -102,11 +104,22 @@ inline auto operator<<(std::ostream& os, const ivf_pq_inputs& p) -> std::ostream PRINT_DIFF_V(.search_params.lut_dtype, print_dtype{p.search_params.lut_dtype}); PRINT_DIFF_V(.search_params.internal_distance_dtype, print_dtype{p.search_params.internal_distance_dtype}); - PRINT_DIFF(.search_params.preferred_thread_block_size); os << "}"; return os; } +template +auto min_output_size(const handle_t& handle, const ivf_pq::index& index, uint32_t n_probes) + -> IdxT +{ + uint32_t skip = index.n_nonempty_lists() > n_probes ? index.n_nonempty_lists() - n_probes : 0; + auto map_type = [] __device__(uint32_t x) { return IdxT(x); }; + using iter = cub::TransformInputIterator; + iter start(index.list_sizes().data_handle() + skip, map_type); + iter end(index.list_sizes().data_handle() + index.n_nonempty_lists(), map_type); + return thrust::reduce(handle.get_thrust_policy(), start, end); +} + template class ivf_pq_test : public ::testing::TestWithParam { public: @@ -190,7 +203,7 @@ class ivf_pq_test : public ::testing::TestWithParam { } template - auto run(BuildIndex build_index) + void run(BuildIndex build_index) { auto index = build_index(); @@ -229,6 +242,39 @@ class ivf_pq_test : public ::testing::TestWithParam { ps.k, 0.001 / low_precision_factor, min_recall)); + + // Test a few extra invariants + IdxT min_results = min_output_size(handle_, index, ps.search_params.n_probes); + IdxT max_oob = ps.k <= min_results ? 0 : ps.k - min_results; + IdxT found_oob = 0; + for (uint32_t query_ix = 0; query_ix < ps.num_queries; query_ix++) { + for (uint32_t k = 0; k < ps.k; k++) { + auto flat_i = query_ix * ps.k + k; + auto found_ix = indices_ivf_pq[flat_i]; + if (found_ix == ivf_pq::index::kOutOfBoundsRecord) { + found_oob++; + continue; + } + ASSERT_NE(found_ix, ivf_pq::index::kInvalidRecord) + << "got an invalid record at query_ix = " << query_ix << ", k = " << k + << " (distance = " << distances_ivf_pq[flat_i] << ")"; + ASSERT_LT(found_ix, ps.num_db_vecs) + << "got an impossible index = " << found_ix << " at query_ix = " << query_ix + << ", k = " << k << " (distance = " << distances_ivf_pq[flat_i] << ")"; + } + } + ASSERT_LE(found_oob, max_oob) + << "got too many records out-of-bounds (see ivf_pq::index::kOutOfBoundsRecord)."; + if (found_oob > 0) { + RAFT_LOG_WARN( + "Got %zu results out-of-bounds because of large top-k (%zu) and small n_probes (%u) and " + "small DB size/n_lists ratio (%zu / %u)", + size_t(found_oob), + size_t(ps.k), + ps.search_params.n_probes, + size_t(ps.num_db_vecs), + ps.index_params.n_lists); + } } void SetUp() override // NOLINT @@ -365,10 +411,6 @@ inline auto enum_variety() -> test_cases_t ADD_CASE({ x.search_params.internal_distance_dtype = CUDA_R_32F; }); ADD_CASE({ x.search_params.internal_distance_dtype = CUDA_R_16F; }); - ADD_CASE({ x.search_params.preferred_thread_block_size = 256; }); - ADD_CASE({ x.search_params.preferred_thread_block_size = 512; }); - ADD_CASE({ x.search_params.preferred_thread_block_size = 1024; }); - return xs; } @@ -464,6 +506,30 @@ inline auto special_cases() -> test_cases_t x.search_params.n_probes = 50; }); + ADD_CASE({ + x.num_db_vecs = 10000; + x.dim = 16; + x.num_queries = 500; + x.k = 128; + x.index_params.metric = distance::DistanceType::L2Expanded; + x.index_params.codebook_kind = ivf_pq::codebook_gen::PER_SUBSPACE; + x.index_params.pq_bits = 8; + x.index_params.n_lists = 100; + x.search_params.n_probes = 100; + }); + + ADD_CASE({ + x.num_db_vecs = 10000; + x.dim = 16; + x.num_queries = 500; + x.k = 129; + x.index_params.metric = distance::DistanceType::L2Expanded; + x.index_params.codebook_kind = ivf_pq::codebook_gen::PER_SUBSPACE; + x.index_params.pq_bits = 8; + x.index_params.n_lists = 100; + x.search_params.n_probes = 100; + }); + return xs; } @@ -484,4 +550,4 @@ inline auto special_cases() -> test_cases_t #define INSTANTIATE(type, vals) \ INSTANTIATE_TEST_SUITE_P(IvfPq, type, ::testing::ValuesIn(vals)); /* NOLINT */ -} // namespace raft::spatial::knn +} // namespace raft::neighbors::ivf_pq diff --git a/cpp/test/neighbors/ann_ivf_pq/test_float_int64_t.cu b/cpp/test/neighbors/ann_ivf_pq/test_float_int64_t.cu index 30203fd2f0..ecb2faa6a0 100644 --- a/cpp/test/neighbors/ann_ivf_pq/test_float_int64_t.cu +++ b/cpp/test/neighbors/ann_ivf_pq/test_float_int64_t.cu @@ -16,7 +16,7 @@ #include "../ann_ivf_pq.cuh" -namespace raft::spatial::knn { +namespace raft::neighbors::ivf_pq { using f32_f32_i64 = ivf_pq_test; @@ -24,4 +24,4 @@ TEST_BUILD_SEARCH(f32_f32_i64) TEST_BUILD_EXTEND_SEARCH(f32_f32_i64) INSTANTIATE(f32_f32_i64, enum_variety_l2() + enum_variety_ip() + big_dims_small_lut()); -} // namespace raft::spatial::knn +} // namespace raft::neighbors::ivf_pq diff --git a/cpp/test/neighbors/ann_ivf_pq/test_float_uint32_t.cu b/cpp/test/neighbors/ann_ivf_pq/test_float_uint32_t.cu index cf2cf1ac54..57d87f47f6 100644 --- a/cpp/test/neighbors/ann_ivf_pq/test_float_uint32_t.cu +++ b/cpp/test/neighbors/ann_ivf_pq/test_float_uint32_t.cu @@ -16,11 +16,11 @@ #include "../ann_ivf_pq.cuh" -namespace raft::spatial::knn { +namespace raft::neighbors::ivf_pq { using f32_f32_u32 = ivf_pq_test; TEST_BUILD_SEARCH(f32_f32_u32) INSTANTIATE(f32_f32_u32, defaults() + var_n_probes() + var_k() + special_cases()); -} // namespace raft::spatial::knn +} // namespace raft::neighbors::ivf_pq diff --git a/cpp/test/neighbors/ann_ivf_pq/test_float_uint64_t.cu b/cpp/test/neighbors/ann_ivf_pq/test_float_uint64_t.cu index 5321783a32..2c203d12e7 100644 --- a/cpp/test/neighbors/ann_ivf_pq/test_float_uint64_t.cu +++ b/cpp/test/neighbors/ann_ivf_pq/test_float_uint64_t.cu @@ -16,11 +16,11 @@ #include "../ann_ivf_pq.cuh" -namespace raft::spatial::knn { +namespace raft::neighbors::ivf_pq { using f32_f32_u64 = ivf_pq_test; TEST_BUILD_EXTEND_SEARCH(f32_f32_u64) INSTANTIATE(f32_f32_u64, defaults() + small_dims() + big_dims_moderate_lut()); -} // namespace raft::spatial::knn +} // namespace raft::neighbors::ivf_pq diff --git a/cpp/test/neighbors/ann_ivf_pq/test_int8_t_uint64_t.cu b/cpp/test/neighbors/ann_ivf_pq/test_int8_t_uint64_t.cu index e6cafb0ab4..c1029d590c 100644 --- a/cpp/test/neighbors/ann_ivf_pq/test_int8_t_uint64_t.cu +++ b/cpp/test/neighbors/ann_ivf_pq/test_int8_t_uint64_t.cu @@ -16,11 +16,11 @@ #include "../ann_ivf_pq.cuh" -namespace raft::spatial::knn { +namespace raft::neighbors::ivf_pq { using f32_i08_u64 = ivf_pq_test; TEST_BUILD_SEARCH(f32_i08_u64) INSTANTIATE(f32_i08_u64, defaults() + big_dims() + var_k()); -} // namespace raft::spatial::knn +} // namespace raft::neighbors::ivf_pq diff --git a/cpp/test/neighbors/ann_ivf_pq/test_uint8_t_uint64_t.cu b/cpp/test/neighbors/ann_ivf_pq/test_uint8_t_uint64_t.cu index 23a4c87e14..729e99d22c 100644 --- a/cpp/test/neighbors/ann_ivf_pq/test_uint8_t_uint64_t.cu +++ b/cpp/test/neighbors/ann_ivf_pq/test_uint8_t_uint64_t.cu @@ -16,7 +16,7 @@ #include "../ann_ivf_pq.cuh" -namespace raft::spatial::knn { +namespace raft::neighbors::ivf_pq { using f32_u08_u64 = ivf_pq_test; @@ -24,4 +24,4 @@ TEST_BUILD_SEARCH(f32_u08_u64) TEST_BUILD_EXTEND_SEARCH(f32_u08_u64) INSTANTIATE(f32_u08_u64, small_dims_per_cluster() + enum_variety()); -} // namespace raft::spatial::knn +} // namespace raft::neighbors::ivf_pq diff --git a/cpp/test/neighbors/ann_utils.cuh b/cpp/test/neighbors/ann_utils.cuh index faf6fad115..3e3735719f 100644 --- a/cpp/test/neighbors/ann_utils.cuh +++ b/cpp/test/neighbors/ann_utils.cuh @@ -25,7 +25,7 @@ #include #include -namespace raft::spatial::knn { +namespace raft::neighbors { struct print_dtype { cudaDataType_t value; @@ -169,16 +169,17 @@ void naiveBfKnn(EvalT* dist_topk, naive_distance_kernel<<>>( dist.data(), x + offset * dim, y, batch_size, input_len, dim, type); - detail::select_topk(dist.data(), - nullptr, - batch_size, - input_len, - static_cast(k), - dist_topk + offset * k, - indices_topk + offset * k, - type != raft::distance::DistanceType::InnerProduct, - stream, - mr); + spatial::knn::detail::select_topk( + dist.data(), + nullptr, + batch_size, + input_len, + static_cast(k), + dist_topk + offset * k, + indices_topk + offset * k, + type != raft::distance::DistanceType::InnerProduct, + stream, + mr); } RAFT_CUDA_TRY(cudaStreamSynchronize(stream)); } @@ -244,4 +245,4 @@ auto eval_neighbours(const std::vector& expected_idx, return testing::AssertionSuccess(); } -} // namespace raft::spatial::knn +} // namespace raft::neighbors diff --git a/python/pylibraft/pylibraft/test/test_ivf_pq.py b/python/pylibraft/pylibraft/test/test_ivf_pq.py index 367ff6d44a..78a02ad77e 100644 --- a/python/pylibraft/pylibraft/test/test_ivf_pq.py +++ b/python/pylibraft/pylibraft/test/test_ivf_pq.py @@ -131,7 +131,7 @@ def run_ivf_pq_build_search_test( index = ivf_pq.extend(index, dataset_1_device, indices_1_device) index = ivf_pq.extend(index, dataset_2_device, indices_2_device) - assert index.size == n_rows + assert index.size >= n_rows queries = generate_data((n_queries, n_cols), dtype) out_idx = np.zeros((n_queries, k), dtype=np.uint64)