Convert device_memory_resource* to device_async_resource_ref (#4365)
Closes #4333

For reviewers:
Many of the changes are a simple textual replacement of `rmm::mr::device_memory_resource*` with `rmm::device_async_resource_ref`.
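
For context, the mechanical pattern looks like the sketch below (`allocate_thing` is a made-up name for illustration, not a cuGraph API). `rmm::device_async_resource_ref` comes from `<rmm/resource_ref.hpp>`, and the `rmm::mr::get_current_device_resource()` default still works because the returned `device_memory_resource*` converts implicitly to a resource ref:

```cpp
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
#include <rmm/mr/device/per_device_resource.hpp>
#include <rmm/resource_ref.hpp>

#include <cstddef>

// Before: raw pointer to the polymorphic base class.
rmm::device_buffer allocate_thing(
  std::size_t size,
  rmm::cuda_stream_view stream,
  rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

// After: non-owning, type-erased reference; same default, same call sites.
rmm::device_buffer allocate_thing(
  std::size_t size,
  rmm::cuda_stream_view stream,
  rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());
```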

However, I found that the `concurrent_unordered_map` used in cuGraph was not up to date with the
latest changes to the version in libcudf, so I copied those over as well. This gets rid of
`hash_allocator.cuh` and uses `rmm::mr::polymorphic_allocator` instead of the custom allocator
previously used, which obviates the need to update any `device_memory_resource*` used in the
old custom allocator.
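
For a sense of what the allocator swap means at a call site, here is a minimal sketch (the `sketch` function and the `int32_t` key/value types are just for illustration). `rmm::mr::polymorphic_allocator` default-constructs over the current device resource and allocates in stream order, matching how the updated map calls `allocate`/`deallocate`:

```cpp
#include <rmm/cuda_stream_view.hpp>
#include <rmm/mr/device/polymorphic_allocator.hpp>

#include <thrust/pair.h>

#include <cstdint>

using pair_type      = thrust::pair<int32_t, int32_t>;
using allocator_type = rmm::mr::polymorphic_allocator<pair_type>;

void sketch(rmm::cuda_stream_view stream)
{
  // Default construction wraps the current device resource, so no raw
  // device_memory_resource* needs to be stored or threaded through.
  allocator_type alloc{};

  // Stream-ordered allocate/deallocate, mirroring the map's internal usage
  // (e.g. m_allocator.deallocate(m_hashtbl_values, m_capacity, stream)).
  pair_type* slots = alloc.allocate(128, stream);
  alloc.deallocate(slots, 128, stream);
}
```

The copied-over map also picks up libcudf's switch from raw `atomicCAS` to `cuda::atomic_ref<...>::compare_exchange_strong` in `attempt_insert`, which accounts for most of the non-mechanical diff below.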

Authors:
  - Mark Harris (https://github.com/harrism)

Approvers:
  - Chuck Hastings (https://github.com/ChuckHastings)

URL: #4365
harrism authored Apr 24, 2024
1 parent 450f987 commit 3393b06
Showing 11 changed files with 102 additions and 178 deletions.
4 changes: 3 additions & 1 deletion cpp/include/cugraph/algorithms.hpp
@@ -23,6 +23,8 @@
#include <cugraph/legacy/graph.hpp>
#include <cugraph/legacy/internals.hpp>

#include <rmm/resource_ref.hpp>

#ifndef NO_CUGRAPH_OPS
#include <cugraph-ops/graph/sampling.hpp>
#endif
@@ -830,7 +832,7 @@ template <typename vertex_t, typename edge_t, typename weight_t>
std::unique_ptr<legacy::GraphCOO<vertex_t, edge_t, weight_t>> minimum_spanning_tree(
raft::handle_t const& handle,
legacy::GraphCSRView<vertex_t, edge_t, weight_t> const& graph,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

namespace subgraph {
/**
5 changes: 3 additions & 2 deletions cpp/include/cugraph/dendrogram.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
#pragma once

#include <rmm/device_uvector.hpp>
#include <rmm/resource_ref.hpp>

#include <memory>
#include <vector>
@@ -28,7 +29,7 @@ class Dendrogram {
void add_level(vertex_t first_index,
vertex_t num_verts,
rmm::cuda_stream_view stream_view,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource())
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource())
{
level_ptr_.push_back(
std::make_unique<rmm::device_uvector<vertex_t>>(num_verts, stream_view, mr));
3 changes: 2 additions & 1 deletion cpp/include/cugraph/legacy/functions.hpp
@@ -20,6 +20,7 @@
#include <raft/core/handle.hpp>

#include <rmm/device_buffer.hpp>
#include <rmm/resource_ref.hpp>

namespace cugraph {

@@ -43,7 +44,7 @@ namespace cugraph {
template <typename VT, typename ET, typename WT>
std::unique_ptr<legacy::GraphCSR<VT, ET, WT>> coo_to_csr(
legacy::GraphCOOView<VT, ET, WT> const& graph,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

/**
* @brief Broadcast using handle communicator
19 changes: 10 additions & 9 deletions cpp/include/cugraph/legacy/graph.hpp
@@ -17,6 +17,7 @@
#include <raft/core/handle.hpp>

#include <rmm/device_buffer.hpp>
#include <rmm/resource_ref.hpp>

#include <unistd.h>

@@ -349,9 +350,9 @@ class GraphCOO {
*/
GraphCOO(vertex_t number_of_vertices,
edge_t number_of_edges,
bool has_data = false,
cudaStream_t stream = nullptr,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource())
bool has_data = false,
cudaStream_t stream = nullptr,
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource())
: number_of_vertices_p(number_of_vertices),
number_of_edges_p(number_of_edges),
src_indices_p(sizeof(vertex_t) * number_of_edges, stream, mr),
@@ -361,8 +362,8 @@ class GraphCOO {
}

GraphCOO(GraphCOOView<vertex_t, edge_t, weight_t> const& graph,
cudaStream_t stream = nullptr,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource())
cudaStream_t stream = nullptr,
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource())
: number_of_vertices_p(graph.number_of_vertices),
number_of_edges_p(graph.number_of_edges),
src_indices_p(graph.src_indices, graph.number_of_edges * sizeof(vertex_t), stream, mr),
@@ -457,7 +458,7 @@ class GraphCompressedSparseBase {
edge_t number_of_edges,
bool has_data,
cudaStream_t stream,
rmm::mr::device_memory_resource* mr)
rmm::device_async_resource_ref mr)
: number_of_vertices_p(number_of_vertices),
number_of_edges_p(number_of_edges),
offsets_p(sizeof(edge_t) * (number_of_vertices + 1), stream, mr),
@@ -525,9 +526,9 @@ class GraphCSR : public GraphCompressedSparseBase<vertex_t, edge_t, weight_t> {
*/
GraphCSR(vertex_t number_of_vertices_,
edge_t number_of_edges_,
bool has_data_ = false,
cudaStream_t stream = nullptr,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource())
bool has_data_ = false,
cudaStream_t stream = nullptr,
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource())
: GraphCompressedSparseBase<vertex_t, edge_t, weight_t>(
number_of_vertices_, number_of_edges_, has_data_, stream, mr)
{
101 changes: 56 additions & 45 deletions cpp/libcugraph_etl/include/hash/concurrent_unordered_map.cuh
@@ -21,19 +21,19 @@
*/
#pragma once

#include <cudf/detail/utilities/device_atomics.cuh>
#include <cudf/hashing/detail/default_hash.cuh>
#include <cudf/hashing/detail/hash_functions.cuh>
#include <cudf/utilities/error.hpp>
#include <raft/util/cuda_rt_essentials.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/mr/device/polymorphic_allocator.hpp>

#include <cuda/atomic>
#include <thrust/pair.h>

#include <hash/hash_allocator.cuh>
#include <hash/helper_functions.cuh>
#include <hash/managed.cuh>

#include <functional>
#include <iostream>
#include <iterator>
#include <limits>
@@ -78,8 +78,8 @@ template <typename pair_type,
typename value_type = typename pair_type::second_type>
constexpr bool is_packable()
{
return std::is_integral<key_type>::value and std::is_integral<value_type>::value and
not std::is_void<packed_t<pair_type>>::value and
return std::is_integral_v<key_type> and std::is_integral_v<value_type> and
not std::is_void_v<packed_t<pair_type>> and
std::has_unique_object_representations_v<pair_type>;
}

@@ -95,8 +95,8 @@ union pair_packer;
template <typename pair_type>
union pair_packer<pair_type, std::enable_if_t<is_packable<pair_type>()>> {
using packed_type = packed_t<pair_type>;
packed_type const packed;
pair_type const pair;
packed_type packed;
pair_type pair;

__device__ pair_packer(pair_type _pair) : pair{_pair} {}

@@ -120,7 +120,7 @@ template <typename Key,
typename Element,
typename Hasher = cudf::hashing::detail::default_hash<Key>,
typename Equality = equal_to<Key>,
typename Allocator = default_allocator<thrust::pair<Key, Element>>>
typename Allocator = rmm::mr::polymorphic_allocator<thrust::pair<Key, Element>>>
class concurrent_unordered_map {
public:
using size_type = size_t;
@@ -131,7 +131,7 @@ class concurrent_unordered_map {
using mapped_type = Element;
using value_type = thrust::pair<Key, Element>;
using iterator = cycle_iterator_adapter<value_type*>;
using const_iterator = const cycle_iterator_adapter<value_type*>;
using const_iterator = cycle_iterator_adapter<value_type*> const;

public:
/**
@@ -163,12 +163,12 @@ class concurrent_unordered_map {
* storage
*/
static auto create(size_type capacity,
rmm::cuda_stream_view stream = rmm::cuda_stream_default,
const mapped_type unused_element = std::numeric_limits<mapped_type>::max(),
const key_type unused_key = std::numeric_limits<key_type>::max(),
const Hasher& hash_function = hasher(),
const Equality& equal = key_equal(),
const allocator_type& allocator = allocator_type())
rmm::cuda_stream_view stream,
mapped_type const unused_element = std::numeric_limits<mapped_type>::max(),
key_type const unused_key = std::numeric_limits<key_type>::max(),
Hasher const& hash_function = hasher(),
Equality const& equal = key_equal(),
allocator_type const& allocator = allocator_type())
{
using Self = concurrent_unordered_map<Key, Element, Hasher, Equality, Allocator>;

@@ -246,7 +246,7 @@ class concurrent_unordered_map {

__host__ __device__ mapped_type get_unused_element() const { return m_unused_element; }

__host__ __device__ size_type capacity() const { return m_capacity; }
[[nodiscard]] __host__ __device__ size_type capacity() const { return m_capacity; }

private:
/**
@@ -271,16 +271,21 @@ class concurrent_unordered_map {
__device__ std::enable_if_t<is_packable<pair_type>(), insert_result> attempt_insert(
value_type* const __restrict__ insert_location, value_type const& insert_pair)
{
pair_packer<pair_type> const unused{thrust::make_pair(m_unused_key, m_unused_element)};
pair_packer<pair_type> const new_pair{insert_pair};
pair_packer<pair_type> const old{
atomicCAS(reinterpret_cast<typename pair_packer<pair_type>::packed_type*>(insert_location),
unused.packed,
new_pair.packed)};
pair_packer<pair_type> expected{thrust::make_pair(m_unused_key, m_unused_element)};
pair_packer<pair_type> desired{insert_pair};

if (old.packed == unused.packed) { return insert_result::SUCCESS; }
using packed_type = typename pair_packer<pair_type>::packed_type;

if (m_equal(old.pair.first, insert_pair.first)) { return insert_result::DUPLICATE; }
auto* insert_ptr = reinterpret_cast<packed_type*>(insert_location);
cuda::atomic_ref<packed_type, cuda::thread_scope_device> ref{*insert_ptr};
auto const success =
ref.compare_exchange_strong(expected.packed, desired.packed, cuda::std::memory_order_relaxed);

if (success) {
return insert_result::SUCCESS;
} else if (m_equal(expected.pair.first, insert_pair.first)) {
return insert_result::DUPLICATE;
}
return insert_result::CONTINUE;
}

@@ -295,16 +300,20 @@ class concurrent_unordered_map {
__device__ std::enable_if_t<not is_packable<pair_type>(), insert_result> attempt_insert(
value_type* const __restrict__ insert_location, value_type const& insert_pair)
{
key_type const old_key{atomicCAS(&(insert_location->first), m_unused_key, insert_pair.first)};
auto expected = m_unused_key;
cuda::atomic_ref<key_type, cuda::thread_scope_device> ref{insert_location->first};
auto const key_success =
ref.compare_exchange_strong(expected, insert_pair.first, cuda::std::memory_order_relaxed);

// Hash bucket empty
if (m_unused_key == old_key) {
if (key_success) {
insert_location->second = insert_pair.second;
return insert_result::SUCCESS;
}

// Key already exists
if (m_equal(old_key, insert_pair.first)) { return insert_result::DUPLICATE; }
else if (m_equal(expected, insert_pair.first)) {
return insert_result::DUPLICATE;
}

return insert_result::CONTINUE;
}
@@ -330,7 +339,7 @@ class concurrent_unordered_map {
*/
__device__ thrust::pair<iterator, bool> insert(value_type const& insert_pair)
{
const size_type key_hash{m_hf(insert_pair.first)};
size_type const key_hash{m_hf(insert_pair.first)};
size_type index{key_hash % m_capacity};

insert_result status{insert_result::CONTINUE};
@@ -343,7 +352,7 @@ class concurrent_unordered_map {
index = (index + 1) % m_capacity;
}

bool const insert_success = (status == insert_result::SUCCESS) ? true : false;
bool const insert_success = status == insert_result::SUCCESS;

return thrust::make_pair(
iterator(m_hashtbl_values, m_hashtbl_values + m_capacity, current_bucket), insert_success);
@@ -424,8 +433,7 @@ class concurrent_unordered_map {
}
}

void assign_async(const concurrent_unordered_map& other,
rmm::cuda_stream_view stream = rmm::cuda_stream_default)
void assign_async(concurrent_unordered_map const& other, rmm::cuda_stream_view stream)
{
if (other.m_capacity <= m_capacity) {
m_capacity = other.m_capacity;
@@ -443,7 +451,7 @@ class concurrent_unordered_map {
stream.value()));
}

void clear_async(rmm::cuda_stream_view stream = rmm::cuda_stream_default)
void clear_async(rmm::cuda_stream_view stream)
{
constexpr int block_size = 128;
init_hashtbl<<<((m_capacity - 1) / block_size) + 1, block_size, 0, stream.value()>>>(
@@ -458,7 +466,7 @@ class concurrent_unordered_map {
}
}

void prefetch(const int dev_id, rmm::cuda_stream_view stream = rmm::cuda_stream_default)
void prefetch(int const dev_id, rmm::cuda_stream_view stream)
{
cudaPointerAttributes hashtbl_values_ptr_attributes;
cudaError_t status = cudaPointerGetAttributes(&hashtbl_values_ptr_attributes, m_hashtbl_values);
@@ -478,7 +486,7 @@ class concurrent_unordered_map {
*
* @param stream CUDA stream used for device memory operations and kernel launches.
*/
void destroy(rmm::cuda_stream_view stream = rmm::cuda_stream_default)
void destroy(rmm::cuda_stream_view stream)
{
m_allocator.deallocate(m_hashtbl_values, m_capacity, stream);
delete this;
@@ -514,12 +522,12 @@ class concurrent_unordered_map {
* @param stream CUDA stream used for device memory operations and kernel launches.
*/
concurrent_unordered_map(size_type capacity,
const mapped_type unused_element,
const key_type unused_key,
const Hasher& hash_function,
const Equality& equal,
const allocator_type& allocator,
rmm::cuda_stream_view stream = rmm::cuda_stream_default)
mapped_type const unused_element,
key_type const unused_key,
Hasher const& hash_function,
Equality const& equal,
allocator_type const& allocator,
rmm::cuda_stream_view stream)
: m_hf(hash_function),
m_equal(equal),
m_allocator(allocator),
@@ -542,8 +550,11 @@ class concurrent_unordered_map {
}
}

init_hashtbl<<<((m_capacity - 1) / block_size) + 1, block_size, 0, stream.value()>>>(
m_hashtbl_values, m_capacity, m_unused_key, m_unused_element);
RAFT_CUDA_TRY(cudaGetLastError());
if (m_capacity > 0) {
init_hashtbl<<<((m_capacity - 1) / block_size) + 1, block_size, 0, stream.value()>>>(
m_hashtbl_values, m_capacity, m_unused_key, m_unused_element);
}

CUDF_CHECK_CUDA(stream.value());
}
};
