[REVIEW] Miscellaneous tech debts/cleanups #286

Merged
Changes from 7 commits
Commits (25)
f774b0b
Replace RAFT buffer adapter with RMM buffer
viclafargue Jun 29, 2021
360bebb
Remove references to RAFT allocator adapter
viclafargue Jun 30, 2021
33ea321
Merge branch-21.08
viclafargue Jun 30, 2021
0a5cbc5
Merge branch-21.08
viclafargue Jun 30, 2021
565cf68
RAFT handle update
viclafargue Jul 1, 2021
eb1253a
Use raft::handle_t::get_thrust_policy to create thrust policy
viclafargue Jul 1, 2021
0bc3d2b
Use of rmm::device_scalar
viclafargue Jul 2, 2021
f6fe37a
Use rmm::exec_policy instead of thrust::cuda::par.on + remove rmm::de…
viclafargue Jul 5, 2021
1b5dbfd
Update raft::allocate to use RMM
viclafargue Jul 7, 2021
e571293
RAFT alloc + dealloc helper system
viclafargue Jul 7, 2021
192882a
Update handle exec policy
viclafargue Aug 3, 2021
7725a80
Merge branch-21.10
viclafargue Aug 3, 2021
e13696e
Small updates
viclafargue Aug 4, 2021
afd1a1b
Use of CUDA stream view
viclafargue Aug 4, 2021
e552d7d
Merge branch-21.10
viclafargue Aug 5, 2021
598463f
Apply requested changes
viclafargue Aug 10, 2021
819f698
Completing adoption of rmm::exec_policy
viclafargue Aug 10, 2021
f990cf9
get_rmm.cmake back to default
viclafargue Aug 10, 2021
e24bb50
Merge branch 'branch-21.10' into miscellaneous-raft-updates
viclafargue Aug 10, 2021
5f2b781
Merge changes in #283
viclafargue Aug 16, 2021
be9b5bb
Address requested changes
viclafargue Aug 16, 2021
c9e0fa8
Replace stream.value() occurrences
viclafargue Aug 18, 2021
c2a3fbd
Update RAFT handle
viclafargue Aug 19, 2021
d372e74
Merge branch 'branch-21.10' into miscellaneous-raft-updates
viclafargue Aug 24, 2021
77b7043
Merge branch-21.10
viclafargue Aug 26, 2021
4 changes: 2 additions & 2 deletions cpp/include/raft/common/cub_wrappers.cuh
@@ -17,7 +17,7 @@
#pragma once

#include <cub/cub.cuh>
#include <raft/mr/device/buffer.hpp>
#include <rmm/device_uvector.hpp>

namespace raft {

@@ -34,7 +34,7 @@ namespace raft {
* @param stream cuda stream
*/
template <typename KeyT, typename ValueT>
void sortPairs(raft::mr::device::buffer<char> &workspace, const KeyT *inKeys,
void sortPairs(rmm::device_uvector<char> &workspace, const KeyT *inKeys,
KeyT *outKeys, const ValueT *inVals, ValueT *outVals, int len,
cudaStream_t stream) {
size_t worksize;
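For callers of sortPairs, only the workspace type changes. A minimal caller-side sketch, assuming hypothetical key/value arrays and a length already resident on the device; as in the existing implementation, sortPairs queries CUB for the required scratch size and resizes the workspace on the given stream:

#include <rmm/device_uvector.hpp>

// before: raft::mr::device::buffer<char> workspace(handle.get_device_allocator(), stream);
rmm::device_uvector<char> workspace(0, stream);  // grown internally to CUB's required size
raft::sortPairs(workspace, in_keys, out_keys, in_vals, out_vals, len, stream);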
10 changes: 4 additions & 6 deletions cpp/include/raft/comms/helper.hpp
@@ -38,11 +38,10 @@ namespace comms {
*/
void build_comms_nccl_only(handle_t *handle, ncclComm_t nccl_comm,
int num_ranks, int rank) {
auto d_alloc = handle->get_device_allocator();
cudaStream_t stream = handle->get_stream();

auto communicator = std::make_shared<comms_t>(std::unique_ptr<comms_iface>(
new raft::comms::std_comms(nccl_comm, num_ranks, rank, d_alloc, stream)));
new raft::comms::std_comms(nccl_comm, num_ranks, rank, stream)));
handle->set_comms(communicator);
}

@@ -80,12 +79,11 @@ void build_comms_nccl_ucx(handle_t *handle, ncclComm_t nccl_comm,
}
}

auto d_alloc = handle->get_device_allocator();
cudaStream_t stream = handle->get_stream();

auto communicator = std::make_shared<comms_t>(std::unique_ptr<comms_iface>(
new raft::comms::std_comms(nccl_comm, (ucp_worker_h)ucp_worker, eps_sp,
num_ranks, rank, d_alloc, stream)));
auto communicator = std::make_shared<comms_t>(
std::unique_ptr<comms_iface>(new raft::comms::std_comms(
nccl_comm, (ucp_worker_h)ucp_worker, eps_sp, num_ranks, rank, stream)));
handle->set_comms(communicator);
}

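A hedged sketch of the caller side after this change: the device allocator argument is gone, so only the NCCL communicator, world size, and rank are passed. The NCCL setup itself and the rank values are assumptions for illustration:

raft::handle_t handle;
ncclComm_t nccl_comm;           // assumed already initialized, e.g. via ncclCommInitRank
int num_ranks = 2, rank = 0;    // hypothetical values provided by the launcher
raft::comms::build_comms_nccl_only(&handle, nccl_comm, num_ranks, rank);
const auto& comm = handle.get_comms();  // the handle now owns the comms_t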
40 changes: 15 additions & 25 deletions cpp/include/raft/comms/std_comms.hpp
@@ -20,7 +20,7 @@

#include <raft/comms/ucp_helper.hpp>
#include <raft/handle.hpp>
#include <raft/mr/device/buffer.hpp>
#include <rmm/device_uvector.hpp>

#include <raft/error.hpp>

@@ -64,17 +64,16 @@ class std_comms : public comms_iface {
*/
std_comms(ncclComm_t nccl_comm, ucp_worker_h ucp_worker,
std::shared_ptr<ucp_ep_h *> eps, int num_ranks, int rank,
const std::shared_ptr<mr::device::allocator> device_allocator,
cudaStream_t stream, bool subcomms_ucp = true)
: nccl_comm_(nccl_comm),
stream_(stream),
status_(2, stream),
num_ranks_(num_ranks),
rank_(rank),
subcomms_ucp_(subcomms_ucp),
ucp_worker_(ucp_worker),
ucp_eps_(eps),
next_request_id_(0),
device_allocator_(device_allocator) {
next_request_id_(0) {
initialize();
};

@@ -85,36 +84,28 @@ class std_comms : public comms_iface {
* @param rank rank of the current worker
*/
std_comms(const ncclComm_t nccl_comm, int num_ranks, int rank,
const std::shared_ptr<mr::device::allocator> device_allocator,
cudaStream_t stream)
: nccl_comm_(nccl_comm),
stream_(stream),
status_(2, stream),
num_ranks_(num_ranks),
rank_(rank),
subcomms_ucp_(false),
device_allocator_(device_allocator) {
subcomms_ucp_(false) {
initialize();
};

virtual ~std_comms() {
device_allocator_->deallocate(sendbuff_, sizeof(int), stream_);
device_allocator_->deallocate(recvbuff_, sizeof(int), stream_);
}

void initialize() {
sendbuff_ = reinterpret_cast<int *>(
device_allocator_->allocate(sizeof(int), stream_));
recvbuff_ = reinterpret_cast<int *>(
device_allocator_->allocate(sizeof(int), stream_));
sendbuff_ = status_.data();
recvbuff_ = status_.data() + 1;
}

int get_size() const { return num_ranks_; }

int get_rank() const { return rank_; }

std::unique_ptr<comms_iface> comm_split(int color, int key) const {
mr::device::buffer<int> d_colors(device_allocator_, stream_, get_size());
mr::device::buffer<int> d_keys(device_allocator_, stream_, get_size());
rmm::device_uvector<int> d_colors(get_size(), stream_);
rmm::device_uvector<int> d_keys(get_size(), stream_);

update_device(d_colors.data() + get_rank(), &color, 1, stream_);
update_device(d_keys.data() + get_rank(), &key, 1, stream_);
@@ -167,12 +158,12 @@ class std_comms : public comms_iface {

if (ucp_worker_ != nullptr && subcomms_ucp_) {
auto eps_sp = std::make_shared<ucp_ep_h *>(new_ucx_ptrs.data());
return std::unique_ptr<comms_iface>(new std_comms(
nccl_comm, (ucp_worker_h)ucp_worker_, eps_sp, subcomm_ranks.size(), key,
device_allocator_, stream_, subcomms_ucp_));
return std::unique_ptr<comms_iface>(
new std_comms(nccl_comm, (ucp_worker_h)ucp_worker_, eps_sp,
subcomm_ranks.size(), key, stream_, subcomms_ucp_));
} else {
return std::unique_ptr<comms_iface>(new std_comms(
nccl_comm, subcomm_ranks.size(), key, device_allocator_, stream_));
return std::unique_ptr<comms_iface>(
new std_comms(nccl_comm, subcomm_ranks.size(), key, stream_));
}
}

@@ -465,6 +456,7 @@ class std_comms : public comms_iface {
cudaStream_t stream_;

int *sendbuff_, *recvbuff_;
rmm::device_uvector<int> status_;

int num_ranks_;
int rank_;
@@ -478,8 +470,6 @@ class std_comms : public comms_iface {
mutable std::unordered_map<request_t, struct ucp_request *>
requests_in_flight_;
mutable std::unordered_set<request_t> free_requests_;

std::shared_ptr<mr::device::allocator> device_allocator_;
};
} // end namespace comms
} // end namespace raft
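The two sync flags that were previously raw allocator calls (plus an explicit deallocate in the destructor) become slots in a single RAII-managed status_ vector. The same pattern in isolation, with hypothetical names:

rmm::device_uvector<int> status(2, stream);  // freed automatically, stream-ordered
int* sendbuff = status.data();               // slot 0
int* recvbuff = status.data() + 1;           // slot 1
// no matching deallocate needed in the destructor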
41 changes: 13 additions & 28 deletions cpp/include/raft/comms/test.hpp
@@ -18,7 +18,6 @@

#include <raft/comms/comms.hpp>
#include <raft/handle.hpp>
#include <raft/mr/device/buffer.hpp>
#include <rmm/device_scalar.hpp>
#include <rmm/device_uvector.hpp>

@@ -44,8 +43,7 @@ bool test_collective_allreduce(const handle_t &handle, int root) {

cudaStream_t stream = handle.get_stream();

raft::mr::device::buffer<int> temp_d(handle.get_device_allocator(), stream);
temp_d.resize(1, stream);
rmm::device_uvector<int> temp_d(1, stream);
CUDA_CHECK(
cudaMemcpyAsync(temp_d.data(), &send, 1, cudaMemcpyHostToDevice, stream));

@@ -76,8 +74,7 @@ bool test_collective_broadcast(const handle_t &handle, int root) {

cudaStream_t stream = handle.get_stream();

raft::mr::device::buffer<int> temp_d(handle.get_device_allocator(), stream);
temp_d.resize(1, stream);
rmm::device_uvector<int> temp_d(1, stream);

if (communicator.get_rank() == root)
CUDA_CHECK(cudaMemcpyAsync(temp_d.data(), &send, sizeof(int),
Expand All @@ -104,8 +101,7 @@ bool test_collective_reduce(const handle_t &handle, int root) {

cudaStream_t stream = handle.get_stream();

raft::mr::device::buffer<int> temp_d(handle.get_device_allocator(), stream);
temp_d.resize(1, stream);
rmm::device_uvector<int> temp_d(1, stream);

CUDA_CHECK(cudaMemcpyAsync(temp_d.data(), &send, sizeof(int),
cudaMemcpyHostToDevice, stream));
@@ -134,11 +130,8 @@ bool test_collective_allgather(const handle_t &handle, int root) {

cudaStream_t stream = handle.get_stream();

raft::mr::device::buffer<int> temp_d(handle.get_device_allocator(), stream);
temp_d.resize(1, stream);

raft::mr::device::buffer<int> recv_d(handle.get_device_allocator(), stream,
communicator.get_size());
rmm::device_uvector<int> temp_d(1, stream);
rmm::device_uvector<int> recv_d(communicator.get_size(), stream);

CUDA_CHECK(cudaMemcpyAsync(temp_d.data(), &send, sizeof(int),
cudaMemcpyHostToDevice, stream));
@@ -169,12 +162,9 @@ bool test_collective_gather(const handle_t &handle, int root) {

cudaStream_t stream = handle.get_stream();

raft::mr::device::buffer<int> temp_d(handle.get_device_allocator(), stream);
temp_d.resize(1, stream);

raft::mr::device::buffer<int> recv_d(
handle.get_device_allocator(), stream,
communicator.get_rank() == root ? communicator.get_size() : 0);
rmm::device_uvector<int> temp_d(1, stream);
rmm::device_uvector<int> recv_d(
communicator.get_rank() == root ? communicator.get_size() : 0, stream);

CUDA_CHECK(cudaMemcpyAsync(temp_d.data(), &send, sizeof(int),
cudaMemcpyHostToDevice, stream));
@@ -211,12 +201,9 @@ bool test_collective_gatherv(const handle_t &handle, int root) {

cudaStream_t stream = handle.get_stream();

raft::mr::device::buffer<int> temp_d(handle.get_device_allocator(), stream);
temp_d.resize(sends.size(), stream);

raft::mr::device::buffer<int> recv_d(
handle.get_device_allocator(), stream,
communicator.get_rank() == root ? displacements.back() : 0);
rmm::device_uvector<int> temp_d(sends.size(), stream);
rmm::device_uvector<int> recv_d(
communicator.get_rank() == root ? displacements.back() : 0, stream);

CUDA_CHECK(cudaMemcpyAsync(temp_d.data(), sends.data(),
sends.size() * sizeof(int), cudaMemcpyHostToDevice,
@@ -256,10 +243,8 @@ bool test_collective_reducescatter(const handle_t &handle, int root) {

cudaStream_t stream = handle.get_stream();

raft::mr::device::buffer<int> temp_d(handle.get_device_allocator(), stream,
sends.size());
raft::mr::device::buffer<int> recv_d(handle.get_device_allocator(), stream,
1);
rmm::device_uvector<int> temp_d(sends.size(), stream);
rmm::device_uvector<int> recv_d(1, stream);

CUDA_CHECK(cudaMemcpyAsync(temp_d.data(), sends.data(),
sends.size() * sizeof(int), cudaMemcpyHostToDevice,
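Each of these tests follows the same substitution: a default-constructed buffer followed by resize becomes a device_uvector sized at construction. A minimal sketch of the pattern (the host value is hypothetical):

int send = 1;                                // hypothetical host value
rmm::device_uvector<int> temp_d(1, stream);  // sized up front, no separate resize(1, stream)
CUDA_CHECK(cudaMemcpyAsync(temp_d.data(), &send, sizeof(int),
                           cudaMemcpyHostToDevice, stream));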
7 changes: 3 additions & 4 deletions cpp/include/raft/distance/distance.cuh
@@ -22,7 +22,7 @@
#include <raft/distance/cosine.cuh>
#include <raft/distance/euclidean.cuh>
#include <raft/distance/l1.cuh>
#include <raft/mr/device/buffer.hpp>
#include <rmm/device_uvector.hpp>

namespace raft {
namespace distance {
@@ -243,7 +243,7 @@ void distance(const InType *x, const InType *y, OutType *dist, Index_ m,
template <typename Type, typename Index_, raft::distance::DistanceType DistType>
void pairwise_distance_impl(const Type *x, const Type *y, Type *dist, Index_ m,
Index_ n, Index_ k,
raft::mr::device::buffer<char> &workspace,
rmm::device_uvector<char> &workspace,
cudaStream_t stream, bool isRowMajor) {
auto worksize =
getWorkspaceSize<DistType, Type, Type, Type, Index_>(x, y, m, n, k);
@@ -254,8 +254,7 @@ void pairwise_distance_impl(const Type *x, const Type *y, Type *dist, Index_ m,

template <typename Type, typename Index_ = int>
void pairwise_distance(const Type *x, const Type *y, Type *dist, Index_ m,
Index_ n, Index_ k,
raft::mr::device::buffer<char> &workspace,
Index_ n, Index_ k, rmm::device_uvector<char> &workspace,
raft::distance::DistanceType metric, cudaStream_t stream,
bool isRowMajor = true) {
switch (metric) {
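Caller-side, the only visible change to pairwise_distance is the workspace type. A hedged sketch, with the matrices, sizes, and metric chosen purely for illustration (the implementation still resizes the workspace to getWorkspaceSize as before):

rmm::device_uvector<char> workspace(0, stream);
raft::distance::pairwise_distance(x, y, dist, m, n, k, workspace,
                                  raft::distance::DistanceType::L2Expanded,
                                  stream);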
52 changes: 28 additions & 24 deletions cpp/include/raft/handle.hpp
@@ -36,9 +36,8 @@
#include <raft/linalg/cusolver_wrappers.h>
#include <raft/sparse/cusparse_wrappers.h>
#include <raft/comms/comms.hpp>
#include <raft/mr/device/allocator.hpp>
#include <raft/mr/host/allocator.hpp>
#include <rmm/cuda_stream_pool.hpp>
#include <rmm/exec_policy.hpp>
#include "cudart_utils.h"

namespace raft {
@@ -48,6 +47,9 @@ namespace raft {
* necessary cuda kernels and/or libraries
*/
class handle_t {
using thrust_exec_policy_t = thrust::detail::execute_with_allocator<
rmm::mr::thrust_allocator<char>, thrust::cuda_cub::execute_on_stream_base>;

private:
static constexpr int kNumDefaultWorkerStreams = 0;

@@ -63,9 +65,7 @@ class handle_t {
CUDA_CHECK(cudaGetDevice(&cur_dev));
return cur_dev;
}()),
streams_(n_streams),
device_allocator_(std::make_shared<mr::device::default_allocator>()),
host_allocator_(std::make_shared<mr::host::default_allocator>()) {
streams_(n_streams) {
create_resources();
}

@@ -86,8 +86,6 @@ class handle_t {
"ERROR: the main handle must have at least one worker stream\n");
prop_ = other.get_device_properties();
device_prop_initialized_ = true;
device_allocator_ = other.get_device_allocator();
host_allocator_ = other.get_host_allocator();
create_resources();
set_stream(other.get_internal_stream(stream_id));
}
@@ -97,26 +95,15 @@ class handle_t {

int get_device() const { return dev_id_; }

void set_stream(cudaStream_t stream) { user_stream_ = stream; }
void set_stream(cudaStream_t stream) {
thrust_policy_initialized_ = false;
user_stream_ = stream;
}
cudaStream_t get_stream() const { return user_stream_; }
rmm::cuda_stream_view get_stream_view() const {
return rmm::cuda_stream_view(user_stream_);
}

void set_device_allocator(std::shared_ptr<mr::device::allocator> allocator) {
device_allocator_ = allocator;
}
std::shared_ptr<mr::device::allocator> get_device_allocator() const {
return device_allocator_;
}

void set_host_allocator(std::shared_ptr<mr::host::allocator> allocator) {
host_allocator_ = allocator;
}
std::shared_ptr<mr::host::allocator> get_host_allocator() const {
return host_allocator_;
}

cublasHandle_t get_cublas_handle() const {
std::lock_guard<std::mutex> _(mutex_);
if (!cublas_initialized_) {
@@ -153,6 +140,23 @@ class handle_t {
return cusparse_handle_;
}

thrust_exec_policy_t get_thrust_policy() const {
std::lock_guard<std::mutex> _(mutex_);
if (!thrust_policy_initialized_) {
if (!thrust_policy_) {
thrust_policy_ =
(thrust_exec_policy_t*)malloc(sizeof(thrust_exec_policy_t));
}
*thrust_policy_ = rmm::exec_policy(this->get_stream());
thrust_policy_initialized_ = true;
}
return *thrust_policy_;
}

thrust_exec_policy_t get_thrust_policy(cudaStream_t stream) const {
return rmm::exec_policy(stream);
}

// legacy compatibility for cuML
cudaStream_t get_internal_stream(int sid) const {
return streams_.get_stream(sid).value();
@@ -236,8 +240,8 @@ class handle_t {
mutable bool cusolver_sp_initialized_{false};
mutable cusparseHandle_t cusparse_handle_;
mutable bool cusparse_initialized_{false};
std::shared_ptr<mr::device::allocator> device_allocator_;
std::shared_ptr<mr::host::allocator> host_allocator_;
mutable thrust_exec_policy_t* thrust_policy_{nullptr};
mutable bool thrust_policy_initialized_{false};
cudaStream_t user_stream_{nullptr};
cudaEvent_t event_;
mutable cudaDeviceProp prop_;
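With get_thrust_policy() on the handle, Thrust calls pick up both the handle's stream and RMM's device allocator for temporary storage. A minimal usage sketch; the vector contents and size n are hypothetical:

#include <thrust/sort.h>
#include <rmm/device_uvector.hpp>

rmm::device_uvector<int> keys(n, handle.get_stream());
// ... fill keys on the handle's stream ...
thrust::sort(handle.get_thrust_policy(), keys.begin(), keys.end());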