[REVIEW] Miscellaneous tech debts/cleanups #286

Merged
25 commits
f774b0b
Replace RAFT buffer adapter with RMM buffer
viclafargue Jun 29, 2021
360bebb
Remove references to RAFT allocator adapter
viclafargue Jun 30, 2021
33ea321
Merge branch-21.08
viclafargue Jun 30, 2021
0a5cbc5
Merge branch-21.08
viclafargue Jun 30, 2021
565cf68
RAFT handle update
viclafargue Jul 1, 2021
eb1253a
Use raft::handle_t::get_thrust_policy to create thrust policy
viclafargue Jul 1, 2021
0bc3d2b
Use of rmm::device_scalar
viclafargue Jul 2, 2021
f6fe37a
Use rmm::exec_policy instead of thrust::cuda::par.on + remove rmm::de…
viclafargue Jul 5, 2021
1b5dbfd
Update raft::allocate to use RMM
viclafargue Jul 7, 2021
e571293
RAFT alloc + dealloc helper system
viclafargue Jul 7, 2021
192882a
Update handle exec policy
viclafargue Aug 3, 2021
7725a80
Merge branch-21.10
viclafargue Aug 3, 2021
e13696e
Small updates
viclafargue Aug 4, 2021
afd1a1b
Use of CUDA stream view
viclafargue Aug 4, 2021
e552d7d
Merge branch-21.10
viclafargue Aug 5, 2021
598463f
Apply requested changes
viclafargue Aug 10, 2021
819f698
Completing adoption of rmm::exec_policy
viclafargue Aug 10, 2021
f990cf9
get_rmm.cmake back to default
viclafargue Aug 10, 2021
e24bb50
Merge branch 'branch-21.10' into miscellaneous-raft-updates
viclafargue Aug 10, 2021
5f2b781
Merge changes in #283
viclafargue Aug 16, 2021
be9b5bb
Answer requested changes
viclafargue Aug 16, 2021
c9e0fa8
Replace stream.value() occurrences
viclafargue Aug 18, 2021
c2a3fbd
Update RAFT handle
viclafargue Aug 19, 2021
d372e74
Merge branch 'branch-21.10' into miscellaneous-raft-updates
viclafargue Aug 24, 2021
77b7043
Merge branch-21.10
viclafargue Aug 26, 2021
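
Several of the commits above move thrust usage from thrust::cuda::par.on(stream) to rmm::exec_policy (and the new raft::handle_t::get_thrust_policy), so thrust's temporary allocations go through RMM. A minimal sketch of what a call site looks like after that change; sort_keys and its arguments are illustrative and not part of this PR:

#include <raft/handle.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>
#include <thrust/sort.h>

// Sort device-resident keys; thrust's scratch memory now comes from the
// current RMM device resource instead of a RAFT allocator adapter.
void sort_keys(const raft::handle_t &handle, rmm::device_uvector<int> &keys) {
  // Before: thrust::sort(thrust::cuda::par.on(handle.get_stream()), ...);
  // Per the commit above, handle.get_thrust_policy() can supply the same policy.
  thrust::sort(rmm::exec_policy(handle.get_stream()), keys.begin(), keys.end());
}
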
2 changes: 1 addition & 1 deletion cpp/cmake/thirdparty/get_rmm.cmake
@@ -44,4 +44,4 @@ endfunction()

set(RAFT_MIN_VERSION_rmm "${RAFT_VERSION_MAJOR}.${RAFT_VERSION_MINOR}.00")

find_and_configure_rmm(${RAFT_MIN_VERSION_rmm})
find_and_configure_rmm(${RAFT_MIN_VERSION_rmm})
4 changes: 2 additions & 2 deletions cpp/include/raft/common/cub_wrappers.cuh
@@ -17,7 +17,7 @@
#pragma once

#include <cub/cub.cuh>
#include <raft/mr/device/buffer.hpp>
#include <rmm/device_uvector.hpp>

namespace raft {

@@ -34,7 +34,7 @@ namespace raft {
* @param stream cuda stream
*/
template <typename KeyT, typename ValueT>
void sortPairs(raft::mr::device::buffer<char> &workspace, const KeyT *inKeys,
void sortPairs(rmm::device_uvector<char> &workspace, const KeyT *inKeys,
KeyT *outKeys, const ValueT *inVals, ValueT *outVals, int len,
cudaStream_t stream) {
size_t worksize;
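
With the signature above, callers pass an rmm::device_uvector<char> workspace instead of a raft::mr::device::buffer<char>. A hedged sketch of a call site, assuming the wrapper still resizes the workspace to cub's required size as the buffer-based version did; the function and pointer names are illustrative:

#include <raft/common/cub_wrappers.cuh>
#include <rmm/device_uvector.hpp>

// The workspace starts empty; sortPairs is expected to grow it as needed.
void sort_pairs_example(const int *in_keys, int *out_keys, const float *in_vals,
                        float *out_vals, int len, cudaStream_t stream) {
  rmm::device_uvector<char> workspace(0, stream);
  raft::sortPairs(workspace, in_keys, out_keys, in_vals, out_vals, len, stream);
}
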
10 changes: 4 additions & 6 deletions cpp/include/raft/comms/helper.hpp
@@ -38,11 +38,10 @@ namespace comms {
*/
void build_comms_nccl_only(handle_t *handle, ncclComm_t nccl_comm,
int num_ranks, int rank) {
auto d_alloc = handle->get_device_allocator();
cudaStream_t stream = handle->get_stream();

auto communicator = std::make_shared<comms_t>(std::unique_ptr<comms_iface>(
new raft::comms::std_comms(nccl_comm, num_ranks, rank, d_alloc, stream)));
new raft::comms::std_comms(nccl_comm, num_ranks, rank, stream)));
handle->set_comms(communicator);
}

@@ -80,12 +79,11 @@ void build_comms_nccl_ucx(handle_t *handle, ncclComm_t nccl_comm,
}
}

auto d_alloc = handle->get_device_allocator();
cudaStream_t stream = handle->get_stream();

auto communicator = std::make_shared<comms_t>(std::unique_ptr<comms_iface>(
new raft::comms::std_comms(nccl_comm, (ucp_worker_h)ucp_worker, eps_sp,
num_ranks, rank, d_alloc, stream)));
auto communicator = std::make_shared<comms_t>(
std::unique_ptr<comms_iface>(new raft::comms::std_comms(
nccl_comm, (ucp_worker_h)ucp_worker, eps_sp, num_ranks, rank, stream)));
handle->set_comms(communicator);
}

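
Since std_comms no longer takes a device allocator, building a NCCL-only communicator needs just the handle, the NCCL comm, and the rank information. An illustrative sketch (init_comms is not part of this PR, and nccl_comm is assumed to be already initialized and shared by all ranks):

#include <raft/comms/helper.hpp>
#include <raft/handle.hpp>

// Internal device buffers now come from RMM, so no allocator is passed.
void init_comms(raft::handle_t &handle, ncclComm_t nccl_comm, int num_ranks,
                int rank) {
  raft::comms::build_comms_nccl_only(&handle, nccl_comm, num_ranks, rank);
}
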
40 changes: 15 additions & 25 deletions cpp/include/raft/comms/std_comms.hpp
@@ -20,7 +20,7 @@

#include <raft/comms/ucp_helper.hpp>
#include <raft/handle.hpp>
#include <raft/mr/device/buffer.hpp>
#include <rmm/device_uvector.hpp>

#include <raft/error.hpp>

@@ -64,17 +64,16 @@ class std_comms : public comms_iface {
*/
std_comms(ncclComm_t nccl_comm, ucp_worker_h ucp_worker,
std::shared_ptr<ucp_ep_h *> eps, int num_ranks, int rank,
const std::shared_ptr<mr::device::allocator> device_allocator,
cudaStream_t stream, bool subcomms_ucp = true)
: nccl_comm_(nccl_comm),
stream_(stream),
status_(2, stream),
num_ranks_(num_ranks),
rank_(rank),
subcomms_ucp_(subcomms_ucp),
ucp_worker_(ucp_worker),
ucp_eps_(eps),
next_request_id_(0),
device_allocator_(device_allocator) {
next_request_id_(0) {
initialize();
};

@@ -85,36 +84,28 @@ class std_comms : public comms_iface {
* @param rank rank of the current worker
*/
std_comms(const ncclComm_t nccl_comm, int num_ranks, int rank,
const std::shared_ptr<mr::device::allocator> device_allocator,
cudaStream_t stream)
: nccl_comm_(nccl_comm),
stream_(stream),
status_(2, stream),
num_ranks_(num_ranks),
rank_(rank),
subcomms_ucp_(false),
device_allocator_(device_allocator) {
subcomms_ucp_(false) {
initialize();
};

virtual ~std_comms() {
device_allocator_->deallocate(sendbuff_, sizeof(int), stream_);
device_allocator_->deallocate(recvbuff_, sizeof(int), stream_);
}

void initialize() {
sendbuff_ = reinterpret_cast<int *>(
device_allocator_->allocate(sizeof(int), stream_));
recvbuff_ = reinterpret_cast<int *>(
device_allocator_->allocate(sizeof(int), stream_));
sendbuff_ = status_.data();
recvbuff_ = status_.data() + 1;
}

int get_size() const { return num_ranks_; }

int get_rank() const { return rank_; }

std::unique_ptr<comms_iface> comm_split(int color, int key) const {
mr::device::buffer<int> d_colors(device_allocator_, stream_, get_size());
mr::device::buffer<int> d_keys(device_allocator_, stream_, get_size());
rmm::device_uvector<int> d_colors(get_size(), stream_);
rmm::device_uvector<int> d_keys(get_size(), stream_);

update_device(d_colors.data() + get_rank(), &color, 1, stream_);
update_device(d_keys.data() + get_rank(), &key, 1, stream_);
@@ -167,12 +158,12 @@ class std_comms : public comms_iface {

if (ucp_worker_ != nullptr && subcomms_ucp_) {
auto eps_sp = std::make_shared<ucp_ep_h *>(new_ucx_ptrs.data());
return std::unique_ptr<comms_iface>(new std_comms(
nccl_comm, (ucp_worker_h)ucp_worker_, eps_sp, subcomm_ranks.size(), key,
device_allocator_, stream_, subcomms_ucp_));
return std::unique_ptr<comms_iface>(
new std_comms(nccl_comm, (ucp_worker_h)ucp_worker_, eps_sp,
subcomm_ranks.size(), key, stream_, subcomms_ucp_));
} else {
return std::unique_ptr<comms_iface>(new std_comms(
nccl_comm, subcomm_ranks.size(), key, device_allocator_, stream_));
return std::unique_ptr<comms_iface>(
new std_comms(nccl_comm, subcomm_ranks.size(), key, stream_));
}
}

@@ -465,6 +456,7 @@ class std_comms : public comms_iface {
cudaStream_t stream_;

int *sendbuff_, *recvbuff_;
rmm::device_uvector<int> status_;

int num_ranks_;
int rank_;
@@ -478,8 +470,6 @@ class std_comms : public comms_iface {
mutable std::unordered_map<request_t, struct ucp_request *>
requests_in_flight_;
mutable std::unordered_set<request_t> free_requests_;

std::shared_ptr<mr::device::allocator> device_allocator_;
};
} // end namespace comms
} // end namespace raft
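
The raw sendbuff_/recvbuff_ allocations are replaced by a two-element rmm::device_uvector<int> (status_), which also removes the manual deallocation in the destructor. A standalone sketch of that ownership pattern, illustrative only and not the actual std_comms class:

#include <rmm/device_uvector.hpp>

// A two-element device_uvector owns the send/recv status slots, so no explicit
// allocate or deallocate calls (and no custom destructor) are needed.
class status_buffers {
 public:
  explicit status_buffers(cudaStream_t stream) : status_(2, stream) {}
  int *send() { return status_.data(); }      // element 0
  int *recv() { return status_.data() + 1; }  // element 1

 private:
  rmm::device_uvector<int> status_;
};
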
41 changes: 13 additions & 28 deletions cpp/include/raft/comms/test.hpp
@@ -18,7 +18,6 @@

#include <raft/comms/comms.hpp>
#include <raft/handle.hpp>
#include <raft/mr/device/buffer.hpp>
#include <rmm/device_scalar.hpp>
#include <rmm/device_uvector.hpp>

@@ -44,8 +43,7 @@ bool test_collective_allreduce(const handle_t &handle, int root) {

cudaStream_t stream = handle.get_stream();

raft::mr::device::buffer<int> temp_d(handle.get_device_allocator(), stream);
temp_d.resize(1, stream);
rmm::device_scalar<int> temp_d(stream);
CUDA_CHECK(
cudaMemcpyAsync(temp_d.data(), &send, 1, cudaMemcpyHostToDevice, stream));

Expand Down Expand Up @@ -76,8 +74,7 @@ bool test_collective_broadcast(const handle_t &handle, int root) {

cudaStream_t stream = handle.get_stream();

raft::mr::device::buffer<int> temp_d(handle.get_device_allocator(), stream);
temp_d.resize(1, stream);
rmm::device_scalar<int> temp_d(stream);

if (communicator.get_rank() == root)
CUDA_CHECK(cudaMemcpyAsync(temp_d.data(), &send, sizeof(int),
@@ -104,8 +101,7 @@ bool test_collective_reduce(const handle_t &handle, int root) {

cudaStream_t stream = handle.get_stream();

raft::mr::device::buffer<int> temp_d(handle.get_device_allocator(), stream);
temp_d.resize(1, stream);
rmm::device_scalar<int> temp_d(stream);

CUDA_CHECK(cudaMemcpyAsync(temp_d.data(), &send, sizeof(int),
cudaMemcpyHostToDevice, stream));
@@ -134,11 +130,8 @@ bool test_collective_allgather(const handle_t &handle, int root) {

cudaStream_t stream = handle.get_stream();

raft::mr::device::buffer<int> temp_d(handle.get_device_allocator(), stream);
temp_d.resize(1, stream);

raft::mr::device::buffer<int> recv_d(handle.get_device_allocator(), stream,
communicator.get_size());
rmm::device_scalar<int> temp_d(stream);
rmm::device_uvector<int> recv_d(communicator.get_size(), stream);

CUDA_CHECK(cudaMemcpyAsync(temp_d.data(), &send, sizeof(int),
cudaMemcpyHostToDevice, stream));
@@ -169,12 +162,9 @@ bool test_collective_gather(const handle_t &handle, int root) {

cudaStream_t stream = handle.get_stream();

raft::mr::device::buffer<int> temp_d(handle.get_device_allocator(), stream);
temp_d.resize(1, stream);

raft::mr::device::buffer<int> recv_d(
handle.get_device_allocator(), stream,
communicator.get_rank() == root ? communicator.get_size() : 0);
rmm::device_scalar<int> temp_d(stream);
rmm::device_uvector<int> recv_d(
communicator.get_rank() == root ? communicator.get_size() : 0, stream);

CUDA_CHECK(cudaMemcpyAsync(temp_d.data(), &send, sizeof(int),
cudaMemcpyHostToDevice, stream));
@@ -211,12 +201,9 @@ bool test_collective_gatherv(const handle_t &handle, int root) {

cudaStream_t stream = handle.get_stream();

raft::mr::device::buffer<int> temp_d(handle.get_device_allocator(), stream);
temp_d.resize(sends.size(), stream);

raft::mr::device::buffer<int> recv_d(
handle.get_device_allocator(), stream,
communicator.get_rank() == root ? displacements.back() : 0);
rmm::device_uvector<int> temp_d(sends.size(), stream);
rmm::device_uvector<int> recv_d(
communicator.get_rank() == root ? displacements.back() : 0, stream);

CUDA_CHECK(cudaMemcpyAsync(temp_d.data(), sends.data(),
sends.size() * sizeof(int), cudaMemcpyHostToDevice,
@@ -256,10 +243,8 @@ bool test_collective_reducescatter(const handle_t &handle, int root) {

cudaStream_t stream = handle.get_stream();

raft::mr::device::buffer<int> temp_d(handle.get_device_allocator(), stream,
sends.size());
raft::mr::device::buffer<int> recv_d(handle.get_device_allocator(), stream,
1);
rmm::device_uvector<int> temp_d(sends.size(), stream);
rmm::device_scalar<int> recv_d(stream);

CUDA_CHECK(cudaMemcpyAsync(temp_d.data(), sends.data(),
sends.size() * sizeof(int), cudaMemcpyHostToDevice,
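
In the tests above, single-element scratch buffers become rmm::device_scalar<int> and sized buffers become rmm::device_uvector<int>. A minimal sketch of the device_scalar round trip, assuming a valid handle; roundtrip_example and the commented collective call are illustrative only:

#include <raft/handle.hpp>
#include <rmm/device_scalar.hpp>

// Copies `send` to the device, leaves room for a collective to operate on
// temp_d.data(), then reads the result back; value(stream) synchronizes.
int roundtrip_example(const raft::handle_t &handle, int send) {
  cudaStream_t stream = handle.get_stream();
  rmm::device_scalar<int> temp_d(send, stream);  // one int, initialized on device
  // ... e.g. communicator.allreduce(temp_d.data(), temp_d.data(), 1, ...) ...
  return temp_d.value(stream);
}
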
46 changes: 38 additions & 8 deletions cpp/include/raft/cudart_utils.h
@@ -17,6 +17,8 @@
#pragma once

#include <raft/error.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/mr/device/per_device_resource.hpp>

#include <cuda_runtime.h>

@@ -25,6 +27,8 @@
#include <cstdio>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <unordered_map>

///@todo: enable once logging has been enabled in raft
//#include "logger.hpp"
@@ -200,7 +204,8 @@ class grid_1d_block_t {
* @param stream cuda stream
*/
template <typename Type>
void copy(Type* dst, const Type* src, size_t len, cudaStream_t stream) {
void copy(Type* dst, const Type* src, size_t len,
rmm::cuda_stream_view stream) {
CUDA_CHECK(
cudaMemcpyAsync(dst, src, len * sizeof(Type), cudaMemcpyDefault, stream));
}
@@ -214,20 +219,20 @@ void copy(Type* dst, const Type* src, size_t len, cudaStream_t stream) {
/** performs a host to device copy */
template <typename Type>
void update_device(Type* d_ptr, const Type* h_ptr, size_t len,
cudaStream_t stream) {
rmm::cuda_stream_view stream) {
copy(d_ptr, h_ptr, len, stream);
}

/** performs a device to host copy */
template <typename Type>
void update_host(Type* h_ptr, const Type* d_ptr, size_t len,
cudaStream_t stream) {
rmm::cuda_stream_view stream) {
copy(h_ptr, d_ptr, len, stream);
}

template <typename Type>
void copy_async(Type* d_ptr1, const Type* d_ptr2, size_t len,
cudaStream_t stream) {
rmm::cuda_stream_view stream) {
CUDA_CHECK(cudaMemcpyAsync(d_ptr1, d_ptr2, len * sizeof(Type),
cudaMemcpyDeviceToDevice, stream));
}
@@ -259,11 +264,36 @@ void print_device_vector(const char* variable_name, const T* devMem,
}
/** @} */

/** cuda malloc */
static std::mutex mutex_;
static std::unordered_map<void*, size_t> allocations;

template <typename Type>
void allocate(Type*& ptr, size_t len, rmm::cuda_stream_view stream,
bool setZero = false) {
size_t size = len * sizeof(Type);
ptr = (Type*)rmm::mr::get_current_device_resource()->allocate(size, stream);
if (setZero) CUDA_CHECK(cudaMemsetAsync((void*)ptr, 0, size, stream));

std::lock_guard<std::mutex> _(mutex_);
allocations[ptr] = size;
}

template <typename Type>
void allocate(Type*& ptr, size_t len, bool setZero = false) {
CUDA_CHECK(cudaMalloc((void**)&ptr, sizeof(Type) * len));
if (setZero) CUDA_CHECK(cudaMemset(ptr, 0, sizeof(Type) * len));
void deallocate(Type*& ptr, rmm::cuda_stream_view stream) {
std::lock_guard<std::mutex> _(mutex_);
size_t size = allocations[ptr];
allocations.erase(ptr);
rmm::mr::get_current_device_resource()->deallocate((void*)ptr, size, stream);
}

inline void deallocate_all(rmm::cuda_stream_view stream) {
std::lock_guard<std::mutex> _(mutex_);
for (auto& alloc : allocations) {
void* ptr = alloc.first;
size_t size = alloc.second;
rmm::mr::get_current_device_resource()->deallocate(ptr, size, stream);
}
allocations.clear();
}

/** helper method to get max usable shared mem per block parameter */
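
The new allocate helper draws from the current RMM device resource and records each allocation's size so deallocate and deallocate_all can release it later. A usage sketch, assuming these helpers sit in namespace raft like the rest of this header; the names helper_example, ptr, and n are illustrative:

#include <cstddef>
#include <raft/cudart_utils.h>
#include <rmm/cuda_stream_view.hpp>

void helper_example(rmm::cuda_stream_view stream, std::size_t n) {
  float *ptr = nullptr;
  raft::allocate(ptr, n, stream, true);  // n floats, zero-initialized
  // ... launch kernels that use ptr on `stream` ...
  raft::deallocate(ptr, stream);         // frees and removes the size record
  // raft::deallocate_all(stream);       // or release every tracked allocation
}
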
7 changes: 3 additions & 4 deletions cpp/include/raft/distance/distance.cuh
@@ -31,7 +31,7 @@
#include <raft/distance/l1.cuh>
#include <raft/distance/minkowski.cuh>
#include <raft/distance/russell_rao.cuh>
#include <raft/mr/device/buffer.hpp>
#include <rmm/device_uvector.hpp>

namespace raft {
namespace distance {
@@ -376,7 +376,7 @@ void distance(const InType *x, const InType *y, OutType *dist, Index_ m,
template <typename Type, typename Index_, raft::distance::DistanceType DistType>
void pairwise_distance_impl(const Type *x, const Type *y, Type *dist, Index_ m,
Index_ n, Index_ k,
raft::mr::device::buffer<char> &workspace,
rmm::device_uvector<char> &workspace,
cudaStream_t stream, bool isRowMajor,
Type metric_arg = 2.0f) {
auto worksize =
@@ -389,8 +389,7 @@ void pairwise_distance_impl(const Type *x, const Type *y, Type *dist, Index_ m,

template <typename Type, typename Index_ = int>
void pairwise_distance(const Type *x, const Type *y, Type *dist, Index_ m,
Index_ n, Index_ k,
raft::mr::device::buffer<char> &workspace,
Index_ n, Index_ k, rmm::device_uvector<char> &workspace,
raft::distance::DistanceType metric, cudaStream_t stream,
bool isRowMajor = true, Type metric_arg = 2.0f) {
switch (metric) {
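
Callers of pairwise_distance now provide an rmm::device_uvector<char> workspace rather than a RAFT device buffer. A hedged example call, assuming the helper resizes the workspace internally as the buffer-based version did; the function name and metric choice are illustrative:

#include <raft/distance/distance.cuh>
#include <rmm/device_uvector.hpp>

// x, y, dist are device pointers with m*k, n*k and m*n elements respectively.
void pairwise_l2_example(const float *x, const float *y, float *dist, int m,
                         int n, int k, cudaStream_t stream) {
  rmm::device_uvector<char> workspace(0, stream);
  raft::distance::pairwise_distance(x, y, dist, m, n, k, workspace,
                                    raft::distance::DistanceType::L2Unexpanded,
                                    stream);
}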