Miscellaneous tech debts/cleanups (#286)
Miscellaneous updates to address technical debt in RAFT:
- [x] Removal of handle host and device allocators
- [x] Addition of a `get_thrust_policy` method to the handle
- [x] Usage of `get_thrust_policy` where handle is available
- [x] Removal of `rmm::device_vector`
- [x] Use of RMM device allocator in the `raft::allocate` function
- [x] Creation of an allocation + deallocation helper system
- [x] Usage of `rmm::exec_policy` instead of `thrust::cuda::par.on` when no handle is available (see the sketch below)
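
For illustration, a minimal C++ sketch of the two execution-policy patterns adopted here; the function names and buffers are hypothetical, while `handle.get_thrust_policy()` and `rmm::exec_policy` come from this change set:

```cpp
#include <raft/handle.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>
#include <thrust/sort.h>

// With a handle: use the thrust policy it now exposes.
void sort_with_handle(const raft::handle_t &handle,
                      rmm::device_uvector<int> &keys) {
  thrust::sort(handle.get_thrust_policy(), keys.begin(), keys.end());
}

// Without a handle: prefer rmm::exec_policy(stream) over thrust::cuda::par.on(stream).
void sort_with_stream(rmm::device_uvector<int> &keys,
                      rmm::cuda_stream_view stream) {
  thrust::sort(rmm::exec_policy(stream), keys.begin(), keys.end());
}
```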

Authors:
  - Victor Lafargue (https://github.com/viclafargue)

Approvers:
  - Dante Gama Dessavre (https://github.com/dantegd)

URL: #286
viclafargue authored Aug 27, 2021
1 parent aab9b95 commit 1c4e1e6
Showing 123 changed files with 1,114 additions and 1,583 deletions.
2 changes: 1 addition & 1 deletion cpp/cmake/thirdparty/get_rmm.cmake
@@ -44,4 +44,4 @@ endfunction()

set(RAFT_MIN_VERSION_rmm "${RAFT_VERSION_MAJOR}.${RAFT_VERSION_MINOR}.00")

find_and_configure_rmm(${RAFT_MIN_VERSION_rmm})
find_and_configure_rmm(${RAFT_MIN_VERSION_rmm})
4 changes: 2 additions & 2 deletions cpp/include/raft/common/cub_wrappers.cuh
@@ -17,7 +17,7 @@
#pragma once

#include <cub/cub.cuh>
#include <raft/mr/device/buffer.hpp>
#include <rmm/device_uvector.hpp>

namespace raft {

@@ -34,7 +34,7 @@ namespace raft {
* @param stream cuda stream
*/
template <typename KeyT, typename ValueT>
void sortPairs(raft::mr::device::buffer<char> &workspace, const KeyT *inKeys,
void sortPairs(rmm::device_uvector<char> &workspace, const KeyT *inKeys,
KeyT *outKeys, const ValueT *inVals, ValueT *outVals, int len,
cudaStream_t stream) {
size_t worksize;
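
A hedged usage sketch of the updated `sortPairs` wrapper above; the key/value buffers and length are hypothetical, and the workspace is assumed to be grown internally by the wrapper:

```cpp
#include <raft/common/cub_wrappers.cuh>
#include <rmm/device_uvector.hpp>

void sort_pairs_example(const int *in_keys, int *out_keys, const float *in_vals,
                        float *out_vals, int len, cudaStream_t stream) {
  // The temporary CUB workspace is now an rmm::device_uvector<char>
  // instead of a raft::mr::device::buffer<char>.
  rmm::device_uvector<char> workspace(0, stream);
  raft::sortPairs(workspace, in_keys, out_keys, in_vals, out_vals, len, stream);
}
```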
10 changes: 4 additions & 6 deletions cpp/include/raft/comms/helper.hpp
@@ -38,11 +38,10 @@ namespace comms {
*/
void build_comms_nccl_only(handle_t *handle, ncclComm_t nccl_comm,
int num_ranks, int rank) {
auto d_alloc = handle->get_device_allocator();
cudaStream_t stream = handle->get_stream();

auto communicator = std::make_shared<comms_t>(std::unique_ptr<comms_iface>(
new raft::comms::std_comms(nccl_comm, num_ranks, rank, d_alloc, stream)));
new raft::comms::std_comms(nccl_comm, num_ranks, rank, stream)));
handle->set_comms(communicator);
}

@@ -80,12 +79,11 @@ void build_comms_nccl_ucx(handle_t *handle, ncclComm_t nccl_comm,
}
}

auto d_alloc = handle->get_device_allocator();
cudaStream_t stream = handle->get_stream();

auto communicator = std::make_shared<comms_t>(std::unique_ptr<comms_iface>(
new raft::comms::std_comms(nccl_comm, (ucp_worker_h)ucp_worker, eps_sp,
num_ranks, rank, d_alloc, stream)));
auto communicator = std::make_shared<comms_t>(
std::unique_ptr<comms_iface>(new raft::comms::std_comms(
nccl_comm, (ucp_worker_h)ucp_worker, eps_sp, num_ranks, rank, stream)));
handle->set_comms(communicator);
}

40 changes: 15 additions & 25 deletions cpp/include/raft/comms/std_comms.hpp
@@ -20,7 +20,7 @@

#include <raft/comms/ucp_helper.hpp>
#include <raft/handle.hpp>
#include <raft/mr/device/buffer.hpp>
#include <rmm/device_uvector.hpp>

#include <raft/error.hpp>

@@ -64,17 +64,16 @@ class std_comms : public comms_iface {
*/
std_comms(ncclComm_t nccl_comm, ucp_worker_h ucp_worker,
std::shared_ptr<ucp_ep_h *> eps, int num_ranks, int rank,
const std::shared_ptr<mr::device::allocator> device_allocator,
cudaStream_t stream, bool subcomms_ucp = true)
: nccl_comm_(nccl_comm),
stream_(stream),
status_(2, stream),
num_ranks_(num_ranks),
rank_(rank),
subcomms_ucp_(subcomms_ucp),
ucp_worker_(ucp_worker),
ucp_eps_(eps),
next_request_id_(0),
device_allocator_(device_allocator) {
next_request_id_(0) {
initialize();
};

@@ -85,36 +84,28 @@ class std_comms : public comms_iface {
* @param rank rank of the current worker
*/
std_comms(const ncclComm_t nccl_comm, int num_ranks, int rank,
const std::shared_ptr<mr::device::allocator> device_allocator,
cudaStream_t stream)
: nccl_comm_(nccl_comm),
stream_(stream),
status_(2, stream),
num_ranks_(num_ranks),
rank_(rank),
subcomms_ucp_(false),
device_allocator_(device_allocator) {
subcomms_ucp_(false) {
initialize();
};

virtual ~std_comms() {
device_allocator_->deallocate(sendbuff_, sizeof(int), stream_);
device_allocator_->deallocate(recvbuff_, sizeof(int), stream_);
}

void initialize() {
sendbuff_ = reinterpret_cast<int *>(
device_allocator_->allocate(sizeof(int), stream_));
recvbuff_ = reinterpret_cast<int *>(
device_allocator_->allocate(sizeof(int), stream_));
sendbuff_ = status_.data();
recvbuff_ = status_.data() + 1;
}

int get_size() const { return num_ranks_; }

int get_rank() const { return rank_; }

std::unique_ptr<comms_iface> comm_split(int color, int key) const {
mr::device::buffer<int> d_colors(device_allocator_, stream_, get_size());
mr::device::buffer<int> d_keys(device_allocator_, stream_, get_size());
rmm::device_uvector<int> d_colors(get_size(), stream_);
rmm::device_uvector<int> d_keys(get_size(), stream_);

update_device(d_colors.data() + get_rank(), &color, 1, stream_);
update_device(d_keys.data() + get_rank(), &key, 1, stream_);
@@ -167,12 +158,12 @@

if (ucp_worker_ != nullptr && subcomms_ucp_) {
auto eps_sp = std::make_shared<ucp_ep_h *>(new_ucx_ptrs.data());
return std::unique_ptr<comms_iface>(new std_comms(
nccl_comm, (ucp_worker_h)ucp_worker_, eps_sp, subcomm_ranks.size(), key,
device_allocator_, stream_, subcomms_ucp_));
return std::unique_ptr<comms_iface>(
new std_comms(nccl_comm, (ucp_worker_h)ucp_worker_, eps_sp,
subcomm_ranks.size(), key, stream_, subcomms_ucp_));
} else {
return std::unique_ptr<comms_iface>(new std_comms(
nccl_comm, subcomm_ranks.size(), key, device_allocator_, stream_));
return std::unique_ptr<comms_iface>(
new std_comms(nccl_comm, subcomm_ranks.size(), key, stream_));
}
}

@@ -465,6 +456,7 @@ class std_comms : public comms_iface {
cudaStream_t stream_;

int *sendbuff_, *recvbuff_;
rmm::device_uvector<int> status_;

int num_ranks_;
int rank_;
@@ -478,8 +470,6 @@ class std_comms : public comms_iface {
mutable std::unordered_map<request_t, struct ucp_request *>
requests_in_flight_;
mutable std::unordered_set<request_t> free_requests_;

std::shared_ptr<mr::device::allocator> device_allocator_;
};
} // end namespace comms
} // end namespace raft
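
The `status_` member above replaces two manual allocator calls with a single RAII buffer whose two slots back `sendbuff_`/`recvbuff_`. A small illustrative sketch of that pattern (the type and method names are hypothetical):

```cpp
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>

// Two device ints owned by one stream-ordered RAII buffer,
// freed automatically when the object goes out of scope.
struct status_pair {
  explicit status_pair(rmm::cuda_stream_view stream) : status_(2, stream) {}
  int *send() { return status_.data(); }
  int *recv() { return status_.data() + 1; }

 private:
  rmm::device_uvector<int> status_;
};
```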
41 changes: 13 additions & 28 deletions cpp/include/raft/comms/test.hpp
@@ -18,7 +18,6 @@

#include <raft/comms/comms.hpp>
#include <raft/handle.hpp>
#include <raft/mr/device/buffer.hpp>
#include <rmm/device_scalar.hpp>
#include <rmm/device_uvector.hpp>

@@ -44,8 +43,7 @@ bool test_collective_allreduce(const handle_t &handle, int root) {

cudaStream_t stream = handle.get_stream();

raft::mr::device::buffer<int> temp_d(handle.get_device_allocator(), stream);
temp_d.resize(1, stream);
rmm::device_scalar<int> temp_d(stream);
CUDA_CHECK(
cudaMemcpyAsync(temp_d.data(), &send, 1, cudaMemcpyHostToDevice, stream));

@@ -76,8 +74,7 @@ bool test_collective_broadcast(const handle_t &handle, int root) {

cudaStream_t stream = handle.get_stream();

raft::mr::device::buffer<int> temp_d(handle.get_device_allocator(), stream);
temp_d.resize(1, stream);
rmm::device_scalar<int> temp_d(stream);

if (communicator.get_rank() == root)
CUDA_CHECK(cudaMemcpyAsync(temp_d.data(), &send, sizeof(int),
@@ -104,8 +101,7 @@ bool test_collective_reduce(const handle_t &handle, int root) {

cudaStream_t stream = handle.get_stream();

raft::mr::device::buffer<int> temp_d(handle.get_device_allocator(), stream);
temp_d.resize(1, stream);
rmm::device_scalar<int> temp_d(stream);

CUDA_CHECK(cudaMemcpyAsync(temp_d.data(), &send, sizeof(int),
cudaMemcpyHostToDevice, stream));
@@ -134,11 +130,8 @@ bool test_collective_allgather(const handle_t &handle, int root) {

cudaStream_t stream = handle.get_stream();

raft::mr::device::buffer<int> temp_d(handle.get_device_allocator(), stream);
temp_d.resize(1, stream);

raft::mr::device::buffer<int> recv_d(handle.get_device_allocator(), stream,
communicator.get_size());
rmm::device_scalar<int> temp_d(stream);
rmm::device_uvector<int> recv_d(communicator.get_size(), stream);

CUDA_CHECK(cudaMemcpyAsync(temp_d.data(), &send, sizeof(int),
cudaMemcpyHostToDevice, stream));
@@ -169,12 +162,9 @@ bool test_collective_gather(const handle_t &handle, int root) {

cudaStream_t stream = handle.get_stream();

raft::mr::device::buffer<int> temp_d(handle.get_device_allocator(), stream);
temp_d.resize(1, stream);

raft::mr::device::buffer<int> recv_d(
handle.get_device_allocator(), stream,
communicator.get_rank() == root ? communicator.get_size() : 0);
rmm::device_scalar<int> temp_d(stream);
rmm::device_uvector<int> recv_d(
communicator.get_rank() == root ? communicator.get_size() : 0, stream);

CUDA_CHECK(cudaMemcpyAsync(temp_d.data(), &send, sizeof(int),
cudaMemcpyHostToDevice, stream));
@@ -211,12 +201,9 @@ bool test_collective_gatherv(const handle_t &handle, int root) {

cudaStream_t stream = handle.get_stream();

raft::mr::device::buffer<int> temp_d(handle.get_device_allocator(), stream);
temp_d.resize(sends.size(), stream);

raft::mr::device::buffer<int> recv_d(
handle.get_device_allocator(), stream,
communicator.get_rank() == root ? displacements.back() : 0);
rmm::device_uvector<int> temp_d(sends.size(), stream);
rmm::device_uvector<int> recv_d(
communicator.get_rank() == root ? displacements.back() : 0, stream);

CUDA_CHECK(cudaMemcpyAsync(temp_d.data(), sends.data(),
sends.size() * sizeof(int), cudaMemcpyHostToDevice,
@@ -256,10 +243,8 @@ bool test_collective_reducescatter(const handle_t &handle, int root) {

cudaStream_t stream = handle.get_stream();

raft::mr::device::buffer<int> temp_d(handle.get_device_allocator(), stream,
sends.size());
raft::mr::device::buffer<int> recv_d(handle.get_device_allocator(), stream,
1);
rmm::device_uvector<int> temp_d(sends.size(), stream);
rmm::device_scalar<int> recv_d(stream);

CUDA_CHECK(cudaMemcpyAsync(temp_d.data(), sends.data(),
sends.size() * sizeof(int), cudaMemcpyHostToDevice,
46 changes: 38 additions & 8 deletions cpp/include/raft/cudart_utils.h
@@ -17,6 +17,8 @@
#pragma once

#include <raft/error.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/mr/device/per_device_resource.hpp>

#include <cuda_runtime.h>

@@ -25,6 +27,8 @@
#include <cstdio>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <unordered_map>

///@todo: enable once logging has been enabled in raft
//#include "logger.hpp"
@@ -200,7 +204,8 @@ class grid_1d_block_t {
* @param stream cuda stream
*/
template <typename Type>
void copy(Type* dst, const Type* src, size_t len, cudaStream_t stream) {
void copy(Type* dst, const Type* src, size_t len,
rmm::cuda_stream_view stream) {
CUDA_CHECK(
cudaMemcpyAsync(dst, src, len * sizeof(Type), cudaMemcpyDefault, stream));
}
Expand All @@ -214,20 +219,20 @@ void copy(Type* dst, const Type* src, size_t len, cudaStream_t stream) {
/** performs a host to device copy */
template <typename Type>
void update_device(Type* d_ptr, const Type* h_ptr, size_t len,
cudaStream_t stream) {
rmm::cuda_stream_view stream) {
copy(d_ptr, h_ptr, len, stream);
}

/** performs a device to host copy */
template <typename Type>
void update_host(Type* h_ptr, const Type* d_ptr, size_t len,
cudaStream_t stream) {
rmm::cuda_stream_view stream) {
copy(h_ptr, d_ptr, len, stream);
}

template <typename Type>
void copy_async(Type* d_ptr1, const Type* d_ptr2, size_t len,
cudaStream_t stream) {
rmm::cuda_stream_view stream) {
CUDA_CHECK(cudaMemcpyAsync(d_ptr1, d_ptr2, len * sizeof(Type),
cudaMemcpyDeviceToDevice, stream));
}
@@ -259,11 +264,36 @@ void print_device_vector(const char* variable_name, const T* devMem,
}
/** @} */

/** cuda malloc */
static std::mutex mutex_;
static std::unordered_map<void*, size_t> allocations;

template <typename Type>
void allocate(Type*& ptr, size_t len, rmm::cuda_stream_view stream,
bool setZero = false) {
size_t size = len * sizeof(Type);
ptr = (Type*)rmm::mr::get_current_device_resource()->allocate(size, stream);
if (setZero) CUDA_CHECK(cudaMemsetAsync((void*)ptr, 0, size, stream));

std::lock_guard<std::mutex> _(mutex_);
allocations[ptr] = size;
}

template <typename Type>
void allocate(Type*& ptr, size_t len, bool setZero = false) {
CUDA_CHECK(cudaMalloc((void**)&ptr, sizeof(Type) * len));
if (setZero) CUDA_CHECK(cudaMemset(ptr, 0, sizeof(Type) * len));
void deallocate(Type*& ptr, rmm::cuda_stream_view stream) {
std::lock_guard<std::mutex> _(mutex_);
size_t size = allocations[ptr];
allocations.erase(ptr);
rmm::mr::get_current_device_resource()->deallocate((void*)ptr, size, stream);
}

inline void deallocate_all(rmm::cuda_stream_view stream) {
std::lock_guard<std::mutex> _(mutex_);
for (auto& alloc : allocations) {
void* ptr = alloc.first;
size_t size = alloc.second;
rmm::mr::get_current_device_resource()->deallocate(ptr, size, stream);
}
allocations.clear();
}

/** helper method to get max usable shared mem per block parameter */
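
A hedged usage sketch of the `allocate`/`deallocate` helpers added above (the buffer name and size are hypothetical):

```cpp
#include <raft/cudart_utils.h>
#include <rmm/cuda_stream_view.hpp>

void scratch_example(rmm::cuda_stream_view stream) {
  float *scratch = nullptr;
  // Allocates 1024 floats through the current RMM device resource and zeroes them.
  raft::allocate(scratch, 1024, stream, true);

  // ... launch work that uses `scratch` on `stream` ...

  // Returns the memory to the same device resource and drops the tracking entry.
  raft::deallocate(scratch, stream);
  // raft::deallocate_all(stream) would instead release every tracked allocation.
}
```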
7 changes: 3 additions & 4 deletions cpp/include/raft/distance/distance.cuh
@@ -31,7 +31,7 @@
#include <raft/distance/l1.cuh>
#include <raft/distance/minkowski.cuh>
#include <raft/distance/russell_rao.cuh>
#include <raft/mr/device/buffer.hpp>
#include <rmm/device_uvector.hpp>

namespace raft {
namespace distance {
@@ -376,7 +376,7 @@ void distance(const InType *x, const InType *y, OutType *dist, Index_ m,
template <typename Type, typename Index_, raft::distance::DistanceType DistType>
void pairwise_distance_impl(const Type *x, const Type *y, Type *dist, Index_ m,
Index_ n, Index_ k,
raft::mr::device::buffer<char> &workspace,
rmm::device_uvector<char> &workspace,
cudaStream_t stream, bool isRowMajor,
Type metric_arg = 2.0f) {
auto worksize =
@@ -389,8 +389,7 @@ void pairwise_distance_impl(const Type *x, const Type *y, Type *dist, Index_ m,

template <typename Type, typename Index_ = int>
void pairwise_distance(const Type *x, const Type *y, Type *dist, Index_ m,
Index_ n, Index_ k,
raft::mr::device::buffer<char> &workspace,
Index_ n, Index_ k, rmm::device_uvector<char> &workspace,
raft::distance::DistanceType metric, cudaStream_t stream,
bool isRowMajor = true, Type metric_arg = 2.0f) {
switch (metric) {
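
A minimal sketch of calling the updated `pairwise_distance` with an `rmm::device_uvector<char>` workspace; the extents are hypothetical and the workspace is assumed to be resized internally to the required size:

```cpp
#include <raft/distance/distance.cuh>
#include <rmm/device_uvector.hpp>

void pairwise_example(const float *x, const float *y, float *dist, int m, int n,
                      int k, cudaStream_t stream) {
  // Scratch space now travels as an rmm::device_uvector<char>.
  rmm::device_uvector<char> workspace(0, stream);
  raft::distance::pairwise_distance(x, y, dist, m, n, k, workspace,
                                    raft::distance::DistanceType::L2SqrtExpanded,
                                    stream);
}
```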