random sampling of dataset rows with improved memory utilization (rapidsai#2155)

The random sampling of IVF methods was reverted (rapidsai#2144) due to large memory utilization (rapidsai#2141).

This PR improves the memory consumption of subsampling: it is now O(n_train), where n_train is the number of subsampled rows.

This PR adds the following new APIs (a usage sketch follows this list):
- random::excess_subsample (TODO: possibly rename to sample_without_replacement)
- matrix::sample_rows
- matrix::gather for host input matrices
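
As a usage sketch (hypothetical caller code; the public matrix::sample_rows wrapper is in a file not shown in this excerpt, so the sketch calls the detail:: function whose signature appears in cpp/include/raft/matrix/detail/sample_rows.cuh below):

#include <raft/core/device_mdarray.hpp>
#include <raft/core/device_resources.hpp>
#include <raft/matrix/detail/sample_rows.cuh>
#include <raft/random/rng_state.hpp>

// Subsample n_train rows out of an n_rows x n_dim dataset. The input pointer may
// reference host or device memory; the output always lives on the device, and only
// O(n_train) device memory is allocated.
void subsample_example(raft::device_resources const& res,
                       const float* dataset,
                       int64_t n_rows,
                       int64_t n_dim,
                       int64_t n_train)
{
  raft::random::RngState rng(137ULL);
  auto train_set = raft::make_device_matrix<float, int64_t>(res, n_train, n_dim);
  raft::matrix::detail::sample_rows<float, int64_t>(
    res, rng, dataset, n_rows, train_set.view());
}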

Authors:
  - Tamas Bela Feher (https://github.com/tfeher)

Approvers:
  - Artem M. Chirkin (https://github.com/achirkin)
  - Ben Frederickson (https://github.com/benfred)

URL: rapidsai#2155
tfeher authored Mar 19, 2024
1 parent 413e34e commit 0b9692b
Showing 11 changed files with 786 additions and 5 deletions.
2 changes: 1 addition & 1 deletion cpp/bench/prims/CMakeLists.txt
@@ -128,7 +128,7 @@ if(BUILD_PRIMS_BENCH)

ConfigureBench(
NAME RANDOM_BENCH PATH bench/prims/random/make_blobs.cu bench/prims/random/permute.cu
-bench/prims/random/rng.cu bench/prims/main.cpp
+bench/prims/random/rng.cu bench/prims/random/subsample.cu bench/prims/main.cpp
)

ConfigureBench(NAME SPARSE_BENCH PATH bench/prims/sparse/convert_csr.cu bench/prims/main.cpp)
38 changes: 35 additions & 3 deletions cpp/bench/prims/matrix/gather.cu
@@ -16,34 +16,48 @@

#include <common/benchmark.hpp>

#include <raft/core/device_mdarray.hpp>
#include <raft/core/host_mdarray.hpp>
#include <raft/core/resource/cuda_stream.hpp>
#include <raft/matrix/gather.cuh>
#include <raft/random/rng.cuh>
#include <raft/util/itertools.hpp>

#include <rmm/device_uvector.hpp>
#include <rmm/mr/device/pool_memory_resource.hpp>

namespace raft::bench::matrix {

template <typename IdxT>
struct GatherParams {
IdxT rows, cols, map_length;
bool host;
};

template <typename IdxT>
inline auto operator<<(std::ostream& os, const GatherParams<IdxT>& p) -> std::ostream&
{
-os << p.rows << "#" << p.cols << "#" << p.map_length;
+os << p.rows << "#" << p.cols << "#" << p.map_length << (p.host ? "#host" : "#device");
return os;
}

template <typename T, typename MapT, typename IdxT, bool Conditional = false>
struct Gather : public fixture {
Gather(const GatherParams<IdxT>& p)
-: params(p), matrix(this->handle), map(this->handle), out(this->handle), stencil(this->handle)
+: params(p),
+  old_mr(rmm::mr::get_current_device_resource()),
+  pool_mr(rmm::mr::get_current_device_resource(), 2 * (1ULL << 30)),
+  matrix(this->handle),
+  map(this->handle),
+  out(this->handle),
+  stencil(this->handle),
+  matrix_h(this->handle)
{
rmm::mr::set_current_device_resource(&pool_mr);
}

~Gather() { rmm::mr::set_current_device_resource(old_mr); }

void allocate_data(const ::benchmark::State& state) override
{
matrix = raft::make_device_matrix<T, IdxT>(handle, params.rows, params.cols);
@@ -59,6 +73,11 @@ struct Gather : public fixture {
if constexpr (Conditional) {
raft::random::uniform(handle, rng, stencil.data_handle(), params.map_length, T(-1), T(1));
}

if (params.host) {
matrix_h = raft::make_host_matrix<T, IdxT>(handle, params.rows, params.cols);
raft::copy(matrix_h.data_handle(), matrix.data_handle(), matrix.size(), stream);
}
resource::sync_stream(handle, stream);
}

@@ -77,14 +96,22 @@
raft::matrix::gather_if(
handle, matrix_const_view, out.view(), map_const_view, stencil_const_view, pred_op);
} else {
-raft::matrix::gather(handle, matrix_const_view, map_const_view, out.view());
+if (params.host) {
+  raft::matrix::detail::gather(
+    handle, make_const_mdspan(matrix_h.view()), map_const_view, out.view());
+} else {
+  raft::matrix::gather(handle, matrix_const_view, map_const_view, out.view());
+}
}
});
}

private:
GatherParams<IdxT> params;
rmm::mr::device_memory_resource* old_mr;
rmm::mr::pool_memory_resource<rmm::mr::device_memory_resource> pool_mr;
raft::device_matrix<T, IdxT> matrix, out;
raft::host_matrix<T, IdxT> matrix_h;
raft::device_vector<T, IdxT> stencil;
raft::device_vector<MapT, IdxT> map;
}; // struct Gather
@@ -100,4 +127,9 @@ RAFT_BENCH_REGISTER((Gather<float, uint32_t, int64_t>), "", gather_inputs_i64);
RAFT_BENCH_REGISTER((Gather<double, uint32_t, int64_t>), "", gather_inputs_i64);
RAFT_BENCH_REGISTER((GatherIf<float, uint32_t, int64_t>), "", gather_inputs_i64);
RAFT_BENCH_REGISTER((GatherIf<double, uint32_t, int64_t>), "", gather_inputs_i64);

auto inputs_host = raft::util::itertools::product<GatherParams<int64_t>>(
{10000000}, {100}, {1000, 1000000, 10000000}, {true});
RAFT_BENCH_REGISTER((Gather<float, uint32_t, int64_t>), "Host", inputs_host);

} // namespace raft::bench::matrix
112 changes: 112 additions & 0 deletions cpp/bench/prims/random/subsample.cu
@@ -0,0 +1,112 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <common/benchmark.hpp>

#include <raft/core/device_mdarray.hpp>
#include <raft/core/device_resources.hpp>
#include <raft/core/host_mdarray.hpp>
#include <raft/core/operators.hpp>
#include <raft/random/permute.cuh>
#include <raft/random/rng.cuh>
#include <raft/random/sample_without_replacement.cuh>
#include <raft/spatial/knn/detail/ann_utils.cuh>
#include <raft/util/cudart_utils.hpp>

#include <rmm/device_scalar.hpp>
#include <rmm/mr/device/pool_memory_resource.hpp>

#include <cub/cub.cuh>

namespace raft::bench::random {

struct sample_inputs {
int n_samples;
int n_train;
int method;
}; // struct sample_inputs

inline auto operator<<(std::ostream& os, const sample_inputs& p) -> std::ostream&
{
os << p.n_samples << "#" << p.n_train << "#" << p.method;
return os;
}

// Sample with replacement. We use this as a baseline.
template <typename IdxT>
auto bernoulli_subsample(raft::resources const& res, IdxT n_samples, IdxT n_subsamples, int seed)
-> raft::device_vector<IdxT, IdxT>
{
RAFT_EXPECTS(n_subsamples <= n_samples, "Cannot have more training samples than dataset vectors");

auto indices = raft::make_device_vector<IdxT, IdxT>(res, n_subsamples);
raft::random::RngState state(123456ULL);
raft::random::uniformInt(
res, state, indices.data_handle(), n_subsamples, IdxT(0), IdxT(n_samples));
return indices;
}

template <typename T>
struct sample : public fixture {
sample(const sample_inputs& p)
: params(p),
old_mr(rmm::mr::get_current_device_resource()),
pool_mr(rmm::mr::get_current_device_resource(), 2 * GiB),
in(make_device_vector<T, int64_t>(res, p.n_samples)),
out(make_device_vector<T, int64_t>(res, p.n_train))
{
rmm::mr::set_current_device_resource(&pool_mr);
raft::random::RngState r(123456ULL);
}

~sample() { rmm::mr::set_current_device_resource(old_mr); }
void run_benchmark(::benchmark::State& state) override
{
std::ostringstream label_stream;
label_stream << params;
state.SetLabel(label_stream.str());

raft::random::RngState r(123456ULL);
loop_on_state(state, [this, &r]() {
if (params.method == 1) {
this->out =
bernoulli_subsample<T>(this->res, this->params.n_samples, this->params.n_train, 137);
} else if (params.method == 2) {
this->out = raft::random::excess_subsample<T, int64_t>(
this->res, r, this->params.n_samples, this->params.n_train);
}
});
}

private:
float GiB = 1073741824.0f;
raft::device_resources res;
rmm::mr::device_memory_resource* old_mr;
rmm::mr::pool_memory_resource<rmm::mr::device_memory_resource> pool_mr;
sample_inputs params;
raft::device_vector<T, int64_t> out, in;
}; // struct sample

const std::vector<sample_inputs> input_vecs = {{100000000, 10000000, 1},
{100000000, 50000000, 1},
{100000000, 100000000, 1},
{100000000, 10000000, 2},
{100000000, 50000000, 2},
{100000000, 100000000, 2}};

RAFT_BENCH_REGISTER(sample<int64_t>, "", input_vecs);

} // namespace raft::bench::random
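
For intuition, here is a CPU-side sketch of sampling without replacement in O(n_train) memory, the property method 2 (excess_subsample) is benchmarked for above. This is an illustrative stand-in, not the RAFT implementation; the GPU version presumably draws a small excess of indices in bulk and deduplicates, rather than rejecting one candidate at a time.

#include <cstdint>
#include <random>
#include <unordered_set>
#include <vector>

// Uniform sampling without replacement by rejecting duplicate draws; both the
// set of seen values and the result use O(n_train) memory.
std::vector<int64_t> sample_without_replacement_cpu(int64_t n_samples,
                                                    int64_t n_train,
                                                    uint64_t seed)
{
  std::mt19937_64 gen(seed);
  std::uniform_int_distribution<int64_t> dist(0, n_samples - 1);
  std::unordered_set<int64_t> seen;
  std::vector<int64_t> indices;
  indices.reserve(n_train);
  while (static_cast<int64_t>(indices.size()) < n_train) {
    const int64_t candidate = dist(gen);
    // insert(...).second is true only for the first occurrence of a value.
    if (seen.insert(candidate).second) { indices.push_back(candidate); }
  }
  return indices;
}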
87 changes: 87 additions & 0 deletions cpp/include/raft/matrix/detail/gather.cuh
@@ -16,9 +16,19 @@

#pragma once

#include <raft/common/nvtx.hpp>
#include <raft/core/device_mdarray.hpp>
#include <raft/core/device_mdspan.hpp>
#include <raft/core/host_mdarray.hpp>
#include <raft/core/host_mdspan.hpp>
#include <raft/core/operators.hpp>
#include <raft/core/pinned_mdarray.hpp>
#include <raft/core/pinned_mdspan.hpp>
#include <raft/util/cuda_dev_essentials.cuh>
#include <raft/util/cudart_utils.hpp>

#include <omp.h>

#include <functional>

namespace raft {
@@ -336,6 +346,83 @@ void gather_if(const InputIteratorT in,
gatherImpl(in, D, N, map, stencil, map_length, out, pred_op, transform_op, stream);
}

/**
* Helper function to gather a set of vectors from a (host) dataset.
*/
template <typename T, typename IdxT, typename MatIdxT = int64_t>
void gather_buff(host_matrix_view<const T, MatIdxT> dataset,
host_vector_view<const IdxT, MatIdxT> indices,
MatIdxT offset,
pinned_matrix_view<T, MatIdxT> buff)
{
raft::common::nvtx::range<common::nvtx::domain::raft> fun_scope("gather_host_buff");
IdxT batch_size = std::min<IdxT>(buff.extent(0), indices.extent(0) - offset);

// Note: this is an "orphaned" worksharing loop: it binds to the omp parallel
// region of the caller, so gather_buff must be invoked from inside that region.
#pragma omp for
for (IdxT i = 0; i < batch_size; i++) {
IdxT in_idx = indices(offset + i);
for (IdxT k = 0; k < buff.extent(1); k++) {
buff(i, k) = dataset(in_idx, k);
}
}
}

template <typename T, typename IdxT, typename MatIdxT = int64_t>
void gather(raft::resources const& res,
host_matrix_view<const T, MatIdxT> dataset,
device_vector_view<const IdxT, MatIdxT> indices,
raft::device_matrix_view<T, MatIdxT> output)
{
raft::common::nvtx::range<common::nvtx::domain::raft> fun_scope("gather");
IdxT n_dim = output.extent(1);
IdxT n_train = output.extent(0);
auto indices_host = raft::make_host_vector<IdxT, MatIdxT>(n_train);
raft::copy(
indices_host.data_handle(), indices.data_handle(), n_train, resource::get_cuda_stream(res));
resource::sync_stream(res);

const size_t buffer_size = 32768 * 1024;  // number of elements per host buffer
const size_t max_batch_size =
std::min<size_t>(round_up_safe<size_t>(buffer_size / n_dim, 32), n_train);
RAFT_LOG_DEBUG("Gathering data with batch size %zu", max_batch_size);

// Gather the vector on the host in tmp buffers. We use two buffers to overlap H2D sync
// and gathering the data.
auto out_tmp1 = raft::make_pinned_matrix<T, MatIdxT>(res, max_batch_size, n_dim);
auto out_tmp2 = raft::make_pinned_matrix<T, MatIdxT>(res, max_batch_size, n_dim);

// Usually a limited number of threads provide sufficient bandwidth for gathering data.
int n_threads = std::min(omp_get_max_threads(), 32);

// The gather_buff function has a parallel for loop. We start the omp parallel
// region here, to avoid repeated overhead within the device_offset loop.
#pragma omp parallel num_threads(n_threads)
{
auto view1 = out_tmp1.view();
auto view2 = out_tmp2.view();
gather_buff(dataset, make_const_mdspan(indices_host.view()), (MatIdxT)0, view1);
for (MatIdxT device_offset = 0; device_offset < n_train; device_offset += max_batch_size) {
MatIdxT batch_size = std::min<IdxT>(max_batch_size, n_train - device_offset);

#pragma omp master
raft::copy(output.data_handle() + device_offset * n_dim,
view1.data_handle(),
batch_size * n_dim,
resource::get_cuda_stream(res));
// Start gathering the next batch on the host.
MatIdxT host_offset = device_offset + batch_size;
batch_size = std::min<IdxT>(max_batch_size, n_train - host_offset);
if (batch_size > 0) {
gather_buff(dataset, make_const_mdspan(indices_host.view()), host_offset, view2);
}
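// Make sure the H2D copy out of view1 has completed before the buffers are
// swapped and view1's memory is refilled in the next iteration.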
#pragma omp master
resource::sync_stream(res);
#pragma omp barrier
std::swap(view1, view2);
}
}
}

} // namespace detail
} // namespace matrix
} // namespace raft
57 changes: 57 additions & 0 deletions cpp/include/raft/matrix/detail/sample_rows.cuh
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <raft/core/device_mdarray.hpp>
#include <raft/core/device_mdspan.hpp>
#include <raft/core/logger.hpp>
#include <raft/core/resources.hpp>
#include <raft/matrix/gather.cuh>
#include <raft/random/rng.cuh>
#include <raft/util/cuda_utils.cuh>
#include <raft/util/cudart_utils.hpp>

namespace raft::matrix::detail {

/** Select rows randomly from input and copy to output. */
template <typename T, typename IdxT = int64_t>
void sample_rows(raft::resources const& res,
random::RngState random_state,
const T* input,
IdxT n_rows_input,
raft::device_matrix_view<T, IdxT> output)
{
IdxT n_dim = output.extent(1);
IdxT n_samples = output.extent(0);

raft::device_vector<IdxT, IdxT> train_indices =
raft::random::excess_subsample<IdxT, int64_t>(res, random_state, n_rows_input, n_samples);

cudaPointerAttributes attr;
RAFT_CUDA_TRY(cudaPointerGetAttributes(&attr, input));
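// attr.devicePointer is non-null for device, managed, or host-registered memory
// that the GPU can access directly; it is nullptr for pageable host allocations.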
T* ptr = reinterpret_cast<T*>(attr.devicePointer);
if (ptr != nullptr) {
raft::matrix::gather(res,
raft::make_device_matrix_view<const T, IdxT>(ptr, n_rows_input, n_dim),
raft::make_const_mdspan(train_indices.view()),
output);
} else {
auto dataset = raft::make_host_matrix_view<const T, IdxT>(input, n_rows_input, n_dim);
raft::matrix::detail::gather(res, dataset, make_const_mdspan(train_indices.view()), output);
}
}
} // namespace raft::matrix::detail
