diff --git a/cpp/libcugraph_etl/CMakeLists.txt b/cpp/libcugraph_etl/CMakeLists.txt index 1950032a8b9..41d758b7c5b 100644 --- a/cpp/libcugraph_etl/CMakeLists.txt +++ b/cpp/libcugraph_etl/CMakeLists.txt @@ -1,5 +1,5 @@ #============================================================================= -# Copyright (c) 2021, NVIDIA CORPORATION. +# Copyright (c) 2021-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -119,6 +119,10 @@ target_compile_options(cugraph_etl PRIVATE "$<$:${CUGRAPH_ETL_CXX_FLAGS}>" ) +target_compile_options(cugraph_etl PRIVATE $<$: + --expt-relaxed-constexpr + --expt-extended-lambda + >) ################################################################################ # - ETL include paths -------------------------------------------------------- target_include_directories(cugraph_etl diff --git a/cpp/libcugraph_etl/include/cugraph_etl/functions.hpp b/cpp/libcugraph_etl/include/cugraph_etl/functions.hpp index 19d50f197c7..111d472741e 100644 --- a/cpp/libcugraph_etl/include/cugraph_etl/functions.hpp +++ b/cpp/libcugraph_etl/include/cugraph_etl/functions.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,8 @@ #include #include +#include + namespace cugraph { namespace etl { @@ -47,7 +49,8 @@ namespace etl { */ std:: tuple, std::unique_ptr, std::unique_ptr> - renumber_cudf_tables(cudf::table_view const& src_table, + renumber_cudf_tables(raft::handle_t const& handle, + cudf::table_view const& src_table, cudf::table_view const& dst_table, cudf::type_id dtype); diff --git a/cpp/libcugraph_etl/include/hash/concurrent_unordered_map.cuh b/cpp/libcugraph_etl/include/hash/concurrent_unordered_map.cuh new file mode 100644 index 00000000000..1f99c9bce2f --- /dev/null +++ b/cpp/libcugraph_etl/include/hash/concurrent_unordered_map.cuh @@ -0,0 +1,550 @@ +/* + * Copyright (c) 2017-2022, NVIDIA CORPORATION. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + /* + * FIXME: This file is copied from cudf because CuCollections doesnt support concurrent + * insert/find for 8 byte key-value pair size. The plan is to migrate to + * using the cuco when the feature is supported. At that point this file can be deleted. + */ +#pragma once + +#include +#include +#include + +#include +#include +#include +#include + +#include + +#include + +#include +#include +#include +#include + +namespace { +template +struct packed { + using type = void; +}; +template <> +struct packed { + using type = uint64_t; +}; +template <> +struct packed { + using type = uint32_t; +}; +template +using packed_t = typename packed::type; + +/** + * @brief Indicates if a pair type can be packed. 
+ * + * When the size of the key,value pair being inserted into the hash table is + * equal in size to a type where atomicCAS is natively supported, it is more + * efficient to "pack" the pair and insert it with a single atomicCAS. + * + * Only integral key and value types may be packed because we use + * bitwise equality comparison, which may not be valid for non-integral + * types. + * + * Also, the `pair_type` must not contain any padding bits otherwise + * accessing the packed value would be undefined. + * + * @tparam pair_type The pair type that will be packed + * @return true If the pair type can be packed + * @return false If the pair type cannot be packed + */ +template +constexpr bool is_packable() +{ + return std::is_integral::value and std::is_integral::value and + not std::is_void>::value and + std::has_unique_object_representations_v; +} + +/** + * @brief Allows viewing a pair in a packed representation + * + * Used as an optimization for inserting when a pair can be inserted with a + * single atomicCAS + */ +template +union pair_packer; + +template +union pair_packer()>> { + using packed_type = packed_t; + packed_type const packed; + pair_type const pair; + + __device__ pair_packer(pair_type _pair) : pair{_pair} {} + + __device__ pair_packer(packed_type _packed) : packed{_packed} {} +}; +} // namespace + +/** + * Supports concurrent insert, but not concurrent insert and find. + * + * @note The user is responsible for the following stream semantics: + * - Either the same stream should be used to create the map as is used by the kernels that access + * it, or + * - the stream used to create the map should be synchronized before it is accessed from a different + * stream or from host code. + * + * TODO: + * - add constructor that takes pointer to hash_table to avoid allocations + */ +template , + typename Equality = equal_to, + typename Allocator = default_allocator>> +class concurrent_unordered_map { + public: + using size_type = size_t; + using hasher = Hasher; + using key_equal = Equality; + using allocator_type = Allocator; + using key_type = Key; + using mapped_type = Element; + using value_type = thrust::pair; + using iterator = cycle_iterator_adapter; + using const_iterator = const cycle_iterator_adapter; + + public: + /** + * @brief Factory to construct a new concurrent unordered map. + * + * Returns a `std::unique_ptr` to a new concurrent unordered map object. The + * map is non-owning and trivially copyable and should be passed by value into + * kernels. The `unique_ptr` contains a custom deleter that will free the + * map's contents. + * + * @note The implementation of this unordered_map uses sentinel values to + * indicate an entry in the hash table that is empty, i.e., if a hash bucket + * is empty, the pair residing there will be equal to (unused_key, + * unused_element). As a result, attempting to insert a key equal to + *`unused_key` results in undefined behavior. + * + * @note All allocations, kernels and copies in the constructor take place + * on stream but the constructor does not synchronize the stream. It is the user's + * responsibility to synchronize or use the same stream to access the map. + * + * @param capacity The maximum number of pairs the map may hold + * @param stream CUDA stream used for device memory operations and kernel launches. 
+ * @param unused_element The sentinel value to use for an empty value + * @param unused_key The sentinel value to use for an empty key + * @param hash_function The hash function to use for hashing keys + * @param equal The equality comparison function for comparing if two keys are + * equal + * @param allocator The allocator to use for allocation the hash table's + * storage + */ + static auto create(size_type capacity, + rmm::cuda_stream_view stream = rmm::cuda_stream_default, + const mapped_type unused_element = std::numeric_limits::max(), + const key_type unused_key = std::numeric_limits::max(), + const Hasher& hash_function = hasher(), + const Equality& equal = key_equal(), + const allocator_type& allocator = allocator_type()) + { + CUDF_FUNC_RANGE(); + using Self = concurrent_unordered_map; + + // Note: need `(*p).destroy` instead of `p->destroy` here + // due to compiler bug: https://github.com/rapidsai/cudf/pull/5692 + auto deleter = [stream](Self* p) { (*p).destroy(stream); }; + + return std::unique_ptr>{ + new Self(capacity, unused_element, unused_key, hash_function, equal, allocator, stream), + deleter}; + } + + /** + * @brief Returns an iterator to the first element in the map + * + * @note `__device__` code that calls this function should either run in the + * same stream as `create()`, or the accessing stream either be running on the + * same stream as create(), or the accessing stream should be appropriately + * synchronized with the creating stream. + * + * @returns iterator to the first element in the map. + */ + __device__ iterator begin() + { + return iterator(m_hashtbl_values, m_hashtbl_values + m_capacity, m_hashtbl_values); + } + + /** + * @brief Returns a constant iterator to the first element in the map + * + * @note `__device__` code that calls this function should either run in the + * same stream as `create()`, or the accessing stream either be running on the + * same stream as create(), or the accessing stream should be appropriately + * synchronized with the creating stream. + * + * @returns constant iterator to the first element in the map. + */ + __device__ const_iterator begin() const + { + return const_iterator(m_hashtbl_values, m_hashtbl_values + m_capacity, m_hashtbl_values); + } + + /** + * @brief Returns an iterator to the one past the last element in the map + * + * @note `__device__` code that calls this function should either run in the + * same stream as `create()`, or the accessing stream either be running on the + * same stream as create(), or the accessing stream should be appropriately + * synchronized with the creating stream. + * + * @returns iterator to the one past the last element in the map. + */ + __device__ iterator end() + { + return iterator(m_hashtbl_values, m_hashtbl_values + m_capacity, m_hashtbl_values + m_capacity); + } + + /** + * @brief Returns a constant iterator to the one past the last element in the map + * + * @note When called in a device code, user should make sure that it should + * either be running on the same stream as create(), or the accessing stream + * should be appropriately synchronized with the creating stream. + * + * @returns constant iterator to the one past the last element in the map. 
+ */ + __device__ const_iterator end() const + { + return const_iterator( + m_hashtbl_values, m_hashtbl_values + m_capacity, m_hashtbl_values + m_capacity); + } + __host__ __device__ value_type* data() const { return m_hashtbl_values; } + + __host__ __device__ key_type get_unused_key() const { return m_unused_key; } + + __host__ __device__ mapped_type get_unused_element() const { return m_unused_element; } + + __host__ __device__ size_type capacity() const { return m_capacity; } + + private: + /** + * @brief Enumeration of the possible results of attempting to insert into + *a hash bucket + */ + enum class insert_result { + CONTINUE, ///< Insert did not succeed, continue trying to insert + ///< (collision) + SUCCESS, ///< New pair inserted successfully + DUPLICATE ///< Insert did not succeed, key is already present + }; + + /** + * @brief Specialization for value types that can be packed. + * + * When the size of the key,value pair being inserted is equal in size to + *a type where atomicCAS is natively supported, this optimization path + *will insert the pair in a single atomicCAS operation. + */ + template + __device__ std::enable_if_t(), insert_result> attempt_insert( + value_type* const __restrict__ insert_location, value_type const& insert_pair) + { + pair_packer const unused{thrust::make_pair(m_unused_key, m_unused_element)}; + pair_packer const new_pair{insert_pair}; + pair_packer const old{ + atomicCAS(reinterpret_cast::packed_type*>(insert_location), + unused.packed, + new_pair.packed)}; + + if (old.packed == unused.packed) { return insert_result::SUCCESS; } + + if (m_equal(old.pair.first, insert_pair.first)) { return insert_result::DUPLICATE; } + return insert_result::CONTINUE; + } + + /** + * @brief Attempts to insert a key,value pair at the specified hash bucket. + * + * @param[in] insert_location Pointer to hash bucket to attempt insert + * @param[in] insert_pair The pair to insert + * @return Enum indicating result of insert attempt. + */ + template + __device__ std::enable_if_t(), insert_result> attempt_insert( + value_type* const __restrict__ insert_location, value_type const& insert_pair) + { + key_type const old_key{atomicCAS(&(insert_location->first), m_unused_key, insert_pair.first)}; + + // Hash bucket empty + if (m_unused_key == old_key) { + insert_location->second = insert_pair.second; + return insert_result::SUCCESS; + } + + // Key already exists + if (m_equal(old_key, insert_pair.first)) { return insert_result::DUPLICATE; } + + return insert_result::CONTINUE; + } + + public: + /** + * @brief Attempts to insert a key, value pair into the map. + * + * Returns an iterator, boolean pair. + * + * If the new key already present in the map, the iterator points to + * the location of the existing key and the boolean is `false` indicating + * that the insert did not succeed. + * + * If the new key was not present, the iterator points to the location + * where the insert occurred and the boolean is `true` indicating that the + *insert succeeded. + * + * @param insert_pair The key and value pair to insert + * @return Iterator, Boolean pair. Iterator is to the location of the + *newly inserted pair, or the existing pair that prevented the insert. + *Boolean indicates insert success. 
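+ *
+ * @note Collisions are resolved by open addressing with linear probing: the
+ * initial bucket is `m_hf(insert_pair.first) % m_capacity`, and on a collision
+ * the probe advances to `(index + 1) % m_capacity` until an empty bucket or a
+ * duplicate key is found. The table should therefore be sized well below full
+ * occupancy (see `compute_hash_table_size`); a completely full table with no
+ * matching key would probe indefinitely.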
+ */ + __device__ thrust::pair insert(value_type const& insert_pair) + { + const size_type key_hash{m_hf(insert_pair.first)}; + size_type index{key_hash % m_capacity}; + + insert_result status{insert_result::CONTINUE}; + + value_type* current_bucket{nullptr}; + + while (status == insert_result::CONTINUE) { + current_bucket = &m_hashtbl_values[index]; + status = attempt_insert(current_bucket, insert_pair); + index = (index + 1) % m_capacity; + } + + bool const insert_success = (status == insert_result::SUCCESS) ? true : false; + + return thrust::make_pair( + iterator(m_hashtbl_values, m_hashtbl_values + m_capacity, current_bucket), insert_success); + } + + /** + * @brief Searches the map for the specified key. + * + * @note `find` is not threadsafe with `insert`. I.e., it is not safe to + *do concurrent `insert` and `find` operations. + * + * @param k The key to search for + * @return An iterator to the key if it exists, else map.end() + */ + __device__ const_iterator find(key_type const& k) const + { + size_type const key_hash = m_hf(k); + size_type index = key_hash % m_capacity; + + value_type* current_bucket = &m_hashtbl_values[index]; + + while (true) { + key_type const existing_key = current_bucket->first; + + if (m_unused_key == existing_key) { return this->end(); } + + if (m_equal(k, existing_key)) { + return const_iterator(m_hashtbl_values, m_hashtbl_values + m_capacity, current_bucket); + } + + index = (index + 1) % m_capacity; + current_bucket = &m_hashtbl_values[index]; + } + } + + /** + * @brief Searches the map for the specified key. + * + * This version of the find function specifies a hashing function and an + * equality comparison. This allows the caller to use different functions + * for insert and find (for example, when you want to insert keys from + * one table and use find to match keys from a different table with the + * keys from the first table). + * + * @note `find` is not threadsafe with `insert`. I.e., it is not safe to + * do concurrent `insert` and `find` operations. 
+ * + * @tparam find_hasher Type of hashing function + * @tparam find_key_equal Type of equality comparison + * + * @param k The key to search for + * @param f_hash The hashing function to use to hash this key + * @param f_equal The equality function to use to compare this key with the + * contents of the hash table + * @return An iterator to the key if it exists, else map.end() + */ + template + __device__ const_iterator find(key_type const& k, + find_hasher f_hash, + find_key_equal f_equal) const + { + size_type const key_hash = f_hash(k); + size_type index = key_hash % m_capacity; + + value_type* current_bucket = &m_hashtbl_values[index]; + + while (true) { + key_type const existing_key = current_bucket->first; + + if (m_unused_key == existing_key) { return this->end(); } + + if (f_equal(k, existing_key)) { + return const_iterator(m_hashtbl_values, m_hashtbl_values + m_capacity, current_bucket); + } + + index = (index + 1) % m_capacity; + current_bucket = &m_hashtbl_values[index]; + } + } + + void assign_async(const concurrent_unordered_map& other, + rmm::cuda_stream_view stream = rmm::cuda_stream_default) + { + if (other.m_capacity <= m_capacity) { + m_capacity = other.m_capacity; + } else { + m_allocator.deallocate(m_hashtbl_values, m_capacity, stream); + m_capacity = other.m_capacity; + m_capacity = other.m_capacity; + + m_hashtbl_values = m_allocator.allocate(m_capacity, stream); + } + CUDA_TRY(cudaMemcpyAsync(m_hashtbl_values, + other.m_hashtbl_values, + m_capacity * sizeof(value_type), + cudaMemcpyDefault, + stream.value())); + } + + void clear_async(rmm::cuda_stream_view stream = rmm::cuda_stream_default) + { + constexpr int block_size = 128; + init_hashtbl<<<((m_capacity - 1) / block_size) + 1, block_size, 0, stream.value()>>>( + m_hashtbl_values, m_capacity, m_unused_key, m_unused_element); + } + + void print() + { + for (size_type i = 0; i < m_capacity; ++i) { + std::cout << i << ": " << m_hashtbl_values[i].first << "," << m_hashtbl_values[i].second + << std::endl; + } + } + + void prefetch(const int dev_id, rmm::cuda_stream_view stream = rmm::cuda_stream_default) + { + cudaPointerAttributes hashtbl_values_ptr_attributes; + cudaError_t status = cudaPointerGetAttributes(&hashtbl_values_ptr_attributes, m_hashtbl_values); + + if (cudaSuccess == status && isPtrManaged(hashtbl_values_ptr_attributes)) { + CUDA_TRY(cudaMemPrefetchAsync( + m_hashtbl_values, m_capacity * sizeof(value_type), dev_id, stream.value())); + } + CUDA_TRY(cudaMemPrefetchAsync(this, sizeof(*this), dev_id, stream.value())); + } + + /** + * @brief Frees the contents of the map and destroys the map object. + * + * This function is invoked as the deleter of the `std::unique_ptr` returned + * from the `create()` factory function. + * + * @param stream CUDA stream used for device memory operations and kernel launches. 
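+ *
+ * @note `destroy()` frees the bucket storage and then calls `delete this`, so it
+ * must only be invoked on the heap-allocated map owned by the `unique_ptr`
+ * returned from `create()`; it must not be called on the by-value copies handed
+ * to kernels.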
+ */ + void destroy(rmm::cuda_stream_view stream = rmm::cuda_stream_default) + { + m_allocator.deallocate(m_hashtbl_values, m_capacity, stream); + delete this; + } + + concurrent_unordered_map() = delete; + concurrent_unordered_map(concurrent_unordered_map const&) = default; + concurrent_unordered_map(concurrent_unordered_map&&) = default; + concurrent_unordered_map& operator=(concurrent_unordered_map const&) = default; + concurrent_unordered_map& operator=(concurrent_unordered_map&&) = default; + ~concurrent_unordered_map() = default; + + private: + hasher m_hf; + key_equal m_equal; + mapped_type m_unused_element; + key_type m_unused_key; + allocator_type m_allocator; + size_type m_capacity; + value_type* m_hashtbl_values; + + /** + * @brief Private constructor used by `create` factory function. + * + * @param capacity The desired m_capacity of the hash table + * @param unused_element The sentinel value to use for an empty value + * @param unused_key The sentinel value to use for an empty key + * @param hash_function The hash function to use for hashing keys + * @param equal The equality comparison function for comparing if two keys + *are equal + * @param allocator The allocator to use for allocation the hash table's + * storage + * @param stream CUDA stream used for device memory operations and kernel launches. + */ + concurrent_unordered_map(size_type capacity, + const mapped_type unused_element, + const key_type unused_key, + const Hasher& hash_function, + const Equality& equal, + const allocator_type& allocator, + rmm::cuda_stream_view stream = rmm::cuda_stream_default) + : m_hf(hash_function), + m_equal(equal), + m_allocator(allocator), + m_capacity(capacity), + m_unused_element(unused_element), + m_unused_key(unused_key) + { + m_hashtbl_values = m_allocator.allocate(m_capacity, stream); + constexpr int block_size = 128; + { + cudaPointerAttributes hashtbl_values_ptr_attributes; + cudaError_t status = + cudaPointerGetAttributes(&hashtbl_values_ptr_attributes, m_hashtbl_values); + + if (cudaSuccess == status && isPtrManaged(hashtbl_values_ptr_attributes)) { + int dev_id = 0; + CUDA_TRY(cudaGetDevice(&dev_id)); + CUDA_TRY(cudaMemPrefetchAsync( + m_hashtbl_values, m_capacity * sizeof(value_type), dev_id, stream.value())); + } + } + + init_hashtbl<<<((m_capacity - 1) / block_size) + 1, block_size, 0, stream.value()>>>( + m_hashtbl_values, m_capacity, m_unused_key, m_unused_element); + CUDA_TRY(cudaGetLastError()); + } +}; diff --git a/cpp/libcugraph_etl/include/hash/hash_allocator.cuh b/cpp/libcugraph_etl/include/hash/hash_allocator.cuh new file mode 100644 index 00000000000..bbad086dae6 --- /dev/null +++ b/cpp/libcugraph_etl/include/hash/hash_allocator.cuh @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2017-2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef HASH_ALLOCATOR_CUH +#define HASH_ALLOCATOR_CUH + +#include + +#include +#include +#include +#include + +template +struct managed_allocator { + typedef T value_type; + rmm::mr::device_memory_resource* mr = new rmm::mr::managed_memory_resource; + + managed_allocator() = default; + + template + constexpr managed_allocator(const managed_allocator&) noexcept + { + } + + T* allocate(std::size_t n, rmm::cuda_stream_view stream = rmm::cuda_stream_default) const + { + return static_cast(mr->allocate(n * sizeof(T), stream)); + } + + void deallocate(T* p, + std::size_t n, + rmm::cuda_stream_view stream = rmm::cuda_stream_default) const + { + mr->deallocate(p, n * sizeof(T), stream); + } +}; + +template +bool operator==(const managed_allocator&, const managed_allocator&) +{ + return true; +} +template +bool operator!=(const managed_allocator&, const managed_allocator&) +{ + return false; +} + +template +struct default_allocator { + typedef T value_type; + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource(); + + default_allocator() = default; + + template + constexpr default_allocator(const default_allocator&) noexcept + { + } + + T* allocate(std::size_t n, rmm::cuda_stream_view stream = rmm::cuda_stream_default) const + { + return static_cast(mr->allocate(n * sizeof(T), stream)); + } + + void deallocate(T* p, + std::size_t n, + rmm::cuda_stream_view stream = rmm::cuda_stream_default) const + { + mr->deallocate(p, n * sizeof(T), stream); + } +}; + +template +bool operator==(const default_allocator&, const default_allocator&) +{ + return true; +} +template +bool operator!=(const default_allocator&, const default_allocator&) +{ + return false; +} + +#endif diff --git a/cpp/libcugraph_etl/include/hash/helper_functions.cuh b/cpp/libcugraph_etl/include/hash/helper_functions.cuh new file mode 100644 index 00000000000..db377f938d2 --- /dev/null +++ b/cpp/libcugraph_etl/include/hash/helper_functions.cuh @@ -0,0 +1,246 @@ +/* + * Copyright (c) 2017-2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef HELPER_FUNCTIONS_CUH +#define HELPER_FUNCTIONS_CUH + +#include + +#include + +constexpr int64_t DEFAULT_HASH_TABLE_OCCUPANCY = 50; + +/** + * @brief Compute requisite size of hash table. + * + * Computes the number of entries required in a hash table to satisfy + * inserting a specified number of keys to achieve the specified hash table + * occupancy. 
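+ *
+ * For example, with the default 50% occupancy the grow factor is 100 / 50 = 2.0,
+ * so inserting 1,000,000 keys produces a table of 2,000,000 slots.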
+ * + * @param num_keys_to_insert The number of keys that will be inserted + * @param desired_occupancy The desired occupancy percentage, e.g., 50 implies a + * 50% occupancy + * @return size_t The size of the hash table that will satisfy the desired + * occupancy for the specified number of insertions + */ +inline size_t compute_hash_table_size(cudf::size_type num_keys_to_insert, + uint32_t desired_occupancy = DEFAULT_HASH_TABLE_OCCUPANCY) +{ + assert(desired_occupancy != 0); + assert(desired_occupancy <= 100); + double const grow_factor{100.0 / desired_occupancy}; + + // Calculate size of hash map based on the desired occupancy + size_t hash_table_size{static_cast(std::ceil(num_keys_to_insert * grow_factor))}; + + return hash_table_size; +} + +template +__forceinline__ __device__ pair_type load_pair_vectorized(const pair_type* __restrict__ const ptr) +{ + if (sizeof(uint4) == sizeof(pair_type)) { + union pair_type2vec_type { + uint4 vec_val; + pair_type pair_val; + }; + pair_type2vec_type converter = {0, 0, 0, 0}; + converter.vec_val = *reinterpret_cast(ptr); + return converter.pair_val; + } else if (sizeof(uint2) == sizeof(pair_type)) { + union pair_type2vec_type { + uint2 vec_val; + pair_type pair_val; + }; + pair_type2vec_type converter = {0, 0}; + converter.vec_val = *reinterpret_cast(ptr); + return converter.pair_val; + } else if (sizeof(int) == sizeof(pair_type)) { + union pair_type2vec_type { + int vec_val; + pair_type pair_val; + }; + pair_type2vec_type converter = {0}; + converter.vec_val = *reinterpret_cast(ptr); + return converter.pair_val; + } else if (sizeof(short) == sizeof(pair_type)) { + union pair_type2vec_type { + short vec_val; + pair_type pair_val; + }; + pair_type2vec_type converter = {0}; + converter.vec_val = *reinterpret_cast(ptr); + return converter.pair_val; + } else { + return *ptr; + } +} + +template +__forceinline__ __device__ void store_pair_vectorized(pair_type* __restrict__ const ptr, + const pair_type val) +{ + if (sizeof(uint4) == sizeof(pair_type)) { + union pair_type2vec_type { + uint4 vec_val; + pair_type pair_val; + }; + pair_type2vec_type converter = {0, 0, 0, 0}; + converter.pair_val = val; + *reinterpret_cast(ptr) = converter.vec_val; + } else if (sizeof(uint2) == sizeof(pair_type)) { + union pair_type2vec_type { + uint2 vec_val; + pair_type pair_val; + }; + pair_type2vec_type converter = {0, 0}; + converter.pair_val = val; + *reinterpret_cast(ptr) = converter.vec_val; + } else if (sizeof(int) == sizeof(pair_type)) { + union pair_type2vec_type { + int vec_val; + pair_type pair_val; + }; + pair_type2vec_type converter = {0}; + converter.pair_val = val; + *reinterpret_cast(ptr) = converter.vec_val; + } else if (sizeof(short) == sizeof(pair_type)) { + union pair_type2vec_type { + short vec_val; + pair_type pair_val; + }; + pair_type2vec_type converter = {0}; + converter.pair_val = val; + *reinterpret_cast(ptr) = converter.vec_val; + } else { + *ptr = val; + } +} + +template +__global__ void init_hashtbl(value_type* __restrict__ const hashtbl_values, + const size_type n, + const key_type key_val, + const elem_type elem_val) +{ + const size_type idx = blockIdx.x * blockDim.x + threadIdx.x; + if (idx < n) { + store_pair_vectorized(hashtbl_values + idx, thrust::make_pair(key_val, elem_val)); + } +} + +template +struct equal_to { + using result_type = bool; + using first_argument_type = T; + using second_argument_type = T; + __forceinline__ __host__ __device__ constexpr bool operator()( + const first_argument_type& lhs, const second_argument_type& rhs) 
const + { + return lhs == rhs; + } +}; + +template +class cycle_iterator_adapter { + public: + using value_type = typename std::iterator_traits::value_type; + using difference_type = typename std::iterator_traits::difference_type; + using pointer = typename std::iterator_traits::pointer; + using reference = typename std::iterator_traits::reference; + using iterator_type = Iterator; + + cycle_iterator_adapter() = delete; + + __host__ __device__ explicit cycle_iterator_adapter(const iterator_type& begin, + const iterator_type& end, + const iterator_type& current) + : m_begin(begin), m_end(end), m_current(current) + { + } + + __host__ __device__ cycle_iterator_adapter& operator++() + { + if (m_end == (m_current + 1)) + m_current = m_begin; + else + ++m_current; + return *this; + } + + __host__ __device__ const cycle_iterator_adapter& operator++() const + { + if (m_end == (m_current + 1)) + m_current = m_begin; + else + ++m_current; + return *this; + } + + __host__ __device__ cycle_iterator_adapter& operator++(int) + { + cycle_iterator_adapter old(m_begin, m_end, m_current); + if (m_end == (m_current + 1)) + m_current = m_begin; + else + ++m_current; + return old; + } + + __host__ __device__ const cycle_iterator_adapter& operator++(int) const + { + cycle_iterator_adapter old(m_begin, m_end, m_current); + if (m_end == (m_current + 1)) + m_current = m_begin; + else + ++m_current; + return old; + } + + __host__ __device__ bool equal(const cycle_iterator_adapter& other) const + { + return m_current == other.m_current && m_begin == other.m_begin && m_end == other.m_end; + } + + __host__ __device__ reference& operator*() { return *m_current; } + + __host__ __device__ const reference& operator*() const { return *m_current; } + + __host__ __device__ const pointer operator->() const { return m_current.operator->(); } + + __host__ __device__ pointer operator->() { return m_current; } + + private: + iterator_type m_current; + iterator_type m_begin; + iterator_type m_end; +}; + +template +__host__ __device__ bool operator==(const cycle_iterator_adapter& lhs, + const cycle_iterator_adapter& rhs) +{ + return lhs.equal(rhs); +} + +template +__host__ __device__ bool operator!=(const cycle_iterator_adapter& lhs, + const cycle_iterator_adapter& rhs) +{ + return !lhs.equal(rhs); +} + +#endif // HELPER_FUNCTIONS_CUH diff --git a/cpp/libcugraph_etl/include/hash/managed.cuh b/cpp/libcugraph_etl/include/hash/managed.cuh new file mode 100644 index 00000000000..04c94e4a09c --- /dev/null +++ b/cpp/libcugraph_etl/include/hash/managed.cuh @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2017-2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef MANAGED_CUH +#define MANAGED_CUH + +#include + +struct managed { + static void* operator new(size_t n) + { + void* ptr = 0; + cudaError_t result = cudaMallocManaged(&ptr, n); + if (cudaSuccess != result || 0 == ptr) throw std::bad_alloc(); + return ptr; + } + + static void operator delete(void* ptr) noexcept + { + auto const free_result = cudaFree(ptr); + assert(free_result == cudaSuccess); + } +}; + +inline bool isPtrManaged(cudaPointerAttributes attr) +{ +#if CUDART_VERSION >= 10000 + return (attr.type == cudaMemoryTypeManaged); +#else + return attr.isManaged; +#endif +} + +#endif // MANAGED_CUH diff --git a/cpp/libcugraph_etl/src/renumbering.cu b/cpp/libcugraph_etl/src/renumbering.cu index 510d42df041..d622c46cede 100644 --- a/cpp/libcugraph_etl/src/renumbering.cu +++ b/cpp/libcugraph_etl/src/renumbering.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,16 +17,1056 @@ #include +#include +#include + +#include +#include +#include +#include +#include + +#include + +#include + +#include + +#include +#include +#include + namespace cugraph { namespace etl { +using size_type = cudf::size_type; // size_type is int32 +using accum_type = uint32_t; + +constexpr uint32_t hash_inc_constant = 9999; + +typedef struct str_hash_value { + __host__ __device__ str_hash_value(){}; + + __host__ __device__ str_hash_value(size_type row, accum_type count, int32_t col) + { + row_ = row; + count_ = count; + col_ = col; + }; + + size_type row_{std::numeric_limits::max()}; + accum_type count_{std::numeric_limits::max()}; + int32_t col_{std::numeric_limits::max()}; // 0 or 1 based on src or dst vertex + +} str_hash_value; + +// key is uint32 hash value +using cudf_map_type = concurrent_unordered_map; + +__device__ uint32_t rotl32(uint32_t x, int8_t r) { return (x << r) | (x >> (32 - r)); } + +__device__ inline uint32_t fmix32(uint32_t h) +{ + h ^= h >> 16; + h *= 0x85ebca6b; + h ^= h >> 13; + h *= 0xc2b2ae35; + h ^= h >> 16; + return h; +} + +// get 4 bytes of char +__device__ uint32_t MurmurHash3_32_head(uint32_t val, uint32_t hash_begin = 32435354) +{ + uint32_t h1 = hash_begin; // seeds + constexpr uint32_t c1 = 0xcc9e2d51; + constexpr uint32_t c2 = 0x1b873593; + + uint32_t k1 = val; + k1 *= c1; + k1 = rotl32(k1, 15); + k1 *= c2; + h1 ^= k1; + h1 = rotl32(h1, 13); + h1 = h1 * 5 + 0xe6546b64; + + return h1; +} + +__device__ uint32_t MurmurHash3_32_tail(uint8_t* data, + int len_byte_tail, + int len_total, + uint32_t hash_begin = 32435354) +{ + uint32_t h1 = hash_begin; + uint32_t k1 = 0; + constexpr uint32_t c1 = 0xcc9e2d51; + constexpr uint32_t c2 = 0x1b873593; + switch (len_total & 3) { + case 3: k1 ^= data[2] << 16; + case 2: k1 ^= data[1] << 8; + case 1: + k1 ^= data[0]; + k1 *= c1; + k1 = rotl32(k1, 15); + k1 *= c2; + h1 ^= k1; + }; + //---------- + // finalization + h1 ^= len_total; + h1 = fmix32(h1); + return h1; +} + +__device__ uint32_t calc_murmur32_hash(int8_t* loc_1, + int8_t* loc_2, + int32_t len_str1, + int32_t len_str2) +{ + uint32_t hashed_str_val = 32435354; // seed + + for (int i = 0; i < len_str1 / 4; i++) { + int8_t* loc = &(loc_1[i * 4]); + uint32_t data = loc[0] | (loc[1] << 8) | (loc[2] << 16) | (loc[3] << 24); + hashed_str_val = MurmurHash3_32_head(data, hashed_str_val); + } + uint32_t trailing_chars = (len_str1 & 3); + uint32_t data = 0; + switch (len_str1 & 3) { + case 3: 
data |= loc_1[len_str1 - 3] << 16; + case 2: data |= loc_1[len_str1 - 2] << 8; + case 1: data |= loc_1[len_str1 - 1]; + } + + // this assumes no empty data in col2 + // either have read misalignment or dtype read align problem + for (int i = 0; i < len_str2; i++) { + data |= loc_2[i] << (trailing_chars * 8); + trailing_chars++; + + if (trailing_chars == 4) { + hashed_str_val = MurmurHash3_32_head(data, hashed_str_val); + trailing_chars = 0; + data = 0; + } + } + if (trailing_chars != 0) { + hashed_str_val = MurmurHash3_32_tail( + reinterpret_cast(&data), trailing_chars, len_str1 + len_str2, hashed_str_val); + } + return hashed_str_val; +} + +__device__ __inline__ bool compare_string(size_type src_idx, + size_type match_idx, + const int8_t* col_1, + const int32_t* offset_1, + const int8_t* col_2, + const int32_t* offset_2) +{ + // match length + int32_t start_a_0 = offset_1[src_idx]; + int32_t length_a_0 = offset_1[src_idx + 1] - start_a_0; + int32_t start_a_1 = offset_2[src_idx]; + int32_t length_a_1 = offset_2[src_idx + 1] - start_a_1; + + int32_t start_b_0 = offset_1[match_idx]; + int32_t length_b_0 = offset_1[match_idx + 1] - start_b_0; + int32_t start_b_1 = offset_2[match_idx]; + int32_t length_b_1 = offset_2[match_idx + 1] - start_b_1; + if ((length_a_0 == length_b_0) && (length_a_1 == length_b_1)) { + // match first part + while (length_a_0 > 0) { + if (col_1[start_a_0++] != col_1[start_b_0++]) { return false; } + length_a_0--; + } + + // match second part + while (length_a_1 > 0) { + if (col_2[start_a_1++] != col_2[start_b_1++]) { return false; } + length_a_1--; + } + + } else { + return false; + } + + // match chars + return true; +} + +__device__ __inline__ bool compare_string_2(size_type src_idx, + size_type match_idx, + int8_t* base_col_1, + int32_t* base_offset_1, + int8_t* base_col_2, + int32_t* base_offset_2, + int8_t* col_1, + int32_t* offset_1, + int8_t* col_2, + int32_t* offset_2) +{ + // match length + int32_t start_a_0 = base_offset_1[src_idx]; + int32_t length_a_0 = base_offset_1[src_idx + 1] - start_a_0; + int32_t start_a_1 = base_offset_2[src_idx]; + int32_t length_a_1 = base_offset_2[src_idx + 1] - start_a_1; + + int32_t start_b_0 = offset_1[match_idx]; + int32_t length_b_0 = offset_1[match_idx + 1] - start_b_0; + int32_t start_b_1 = offset_2[match_idx]; + int32_t length_b_1 = offset_2[match_idx + 1] - start_b_1; + if ((length_a_0 == length_b_0) && (length_a_1 == length_b_1)) { + // match first part + while (length_a_0 > 0) { + if (base_col_1[start_a_0++] != col_1[start_b_0++]) { return false; } + length_a_0--; + } + + // match second part + while (length_a_1 > 0) { + if (base_col_2[start_a_1++] != col_2[start_b_1++]) { return false; } + length_a_1--; + } + + } else { + // printf("not equal length\n"); + return false; + } + + // match chars + // printf("%d matched\n", threadIdx.x); + return true; +} + +__device__ __inline__ size_type validate_ht_row_insert(volatile size_type* ptr) +{ + size_type row = ptr[0]; + int32_t sleep = 133; + while (row == std::numeric_limits::max()) { +#if (__CUDA_ARCH__ >= 700) + __nanosleep(sleep); +#endif + sleep = sleep * 2; + row = ptr[0]; + } + return row; +} + +__device__ __inline__ int32_t validate_ht_col_insert(volatile int32_t* ptr_col) +{ + volatile int32_t col = ptr_col[0]; + int32_t sleep = 133; + + while (col == std::numeric_limits::max()) { +#if (__CUDA_ARCH__ >= 700) + __nanosleep(sleep); +#endif + sleep = sleep * 2; + col = ptr_col[0]; + } + return col; +} + +__global__ void concat_and_create_histogram(int8_t* col_1, + 
int32_t* offset_1, + int8_t* col_2, + int32_t* offset_2, + size_type num_rows, + cudf_map_type hash_map, + accum_type* sysmem_insert_counter) +{ + extern __shared__ int8_t smem_[]; + int32_t* smem_col_1_offsets = reinterpret_cast(smem_); + int32_t* smem_col_2_offsets = + reinterpret_cast(smem_ + ((blockDim.x + 1) * sizeof(int32_t))); + accum_type* insert_counter = + reinterpret_cast(smem_ + ((blockDim.x + 1) * 2 * sizeof(int32_t))); + + int warp_accum_idx = threadIdx.x / warpSize; + + if ((threadIdx.x % warpSize) == 0) insert_counter[warp_accum_idx] = 0; + __syncwarp(); + + size_type start_idx = threadIdx.x + blockIdx.x * blockDim.x; // size_type is int32_t + + if (start_idx < num_rows) { + smem_col_1_offsets[threadIdx.x] = offset_1[start_idx]; + smem_col_2_offsets[threadIdx.x] = offset_2[start_idx]; + } + + if (threadIdx.x == 0) { + if ((start_idx + blockDim.x) <= num_rows) { + smem_col_1_offsets[blockDim.x] = offset_1[start_idx + blockDim.x]; + smem_col_2_offsets[blockDim.x] = offset_2[start_idx + blockDim.x]; + } else { + int32_t last_offset_idx = num_rows - start_idx; + smem_col_1_offsets[last_offset_idx] = offset_1[num_rows]; + smem_col_2_offsets[last_offset_idx] = offset_2[num_rows]; + } + } + __syncthreads(); + + if (start_idx < num_rows) { + int32_t len_str1 = smem_col_1_offsets[threadIdx.x + 1] - smem_col_1_offsets[threadIdx.x]; + int32_t len_str2 = smem_col_2_offsets[threadIdx.x + 1] - smem_col_2_offsets[threadIdx.x]; + + int8_t* loc_1 = &(col_1[smem_col_1_offsets[threadIdx.x]]); + int8_t* loc_2 = &(col_2[smem_col_2_offsets[threadIdx.x]]); + uint32_t hashed_str_val = calc_murmur32_hash(loc_1, loc_2, len_str1, len_str2); + + // concurrent_unordered_map + // key : hashed_val, val: {idx, count} + auto insert_pair = + hash_map.insert(thrust::make_pair(hashed_str_val, str_hash_value{start_idx, 0, 0})); + + if (!insert_pair.second) { + size_type row__ = validate_ht_row_insert(&(insert_pair.first->second.row_)); + + while (!compare_string(row__, start_idx, col_1, offset_1, col_2, offset_2)) { + // else loop over +1 count of hash value and insert again + hashed_str_val += hash_inc_constant; + insert_pair = + hash_map.insert(thrust::make_pair(hashed_str_val, str_hash_value{start_idx, 0, 0})); + if (insert_pair.second) { + atomicAdd(&(insert_counter[warp_accum_idx]), 1); + break; + } + row__ = validate_ht_row_insert(&(insert_pair.first->second.row_)); + } + atomicAdd((accum_type*)&(insert_pair.first->second.count_), 1); + } else { + atomicAdd((accum_type*)&(insert_pair.first->second.count_), 1); + // // smem atomic counter before global aggregation + atomicAdd(&(insert_counter[warp_accum_idx]), 1); + } + } + __syncwarp(); + if ((threadIdx.x % warpSize) == 0) { + atomicAdd(sysmem_insert_counter, insert_counter[warp_accum_idx]); + } +} + +__global__ void concat_and_create_histogram_2(int8_t* col_1, + int32_t* offset_1, + int8_t* col_2, + int32_t* offset_2, + int8_t* match_col_1, + int32_t* match_offset_1, + int8_t* match_col_2, + int32_t* match_offset_2, + size_type num_rows, + cudf_map_type hash_map, + accum_type* sysmem_insert_counter) +{ + extern __shared__ int8_t smem_[]; + int32_t* smem_col_1_offsets = reinterpret_cast(smem_); + int32_t* smem_col_2_offsets = + reinterpret_cast(smem_ + ((blockDim.x + 1) * sizeof(int32_t))); + accum_type* insert_counter = + reinterpret_cast(smem_ + ((blockDim.x + 1) * 2 * sizeof(int32_t))); + + int warp_accum_idx = threadIdx.x / warpSize; + + if ((threadIdx.x % warpSize) == 0) insert_counter[warp_accum_idx] = 0; + __syncwarp(); + + size_type start_idx = 
threadIdx.x + blockIdx.x * blockDim.x; // size_type is int32_t + + if (start_idx < num_rows) { + smem_col_1_offsets[threadIdx.x] = offset_1[start_idx]; + smem_col_2_offsets[threadIdx.x] = offset_2[start_idx]; + } + + if (threadIdx.x == 0) { + if ((start_idx + blockDim.x) <= num_rows) { + smem_col_1_offsets[blockDim.x] = offset_1[start_idx + blockDim.x]; + smem_col_2_offsets[blockDim.x] = offset_2[start_idx + blockDim.x]; + } else { + int32_t last_offset_idx = num_rows - start_idx; + smem_col_1_offsets[last_offset_idx] = offset_1[num_rows]; + smem_col_2_offsets[last_offset_idx] = offset_2[num_rows]; + } + } + __syncthreads(); + + if (start_idx < num_rows) { + int32_t len_str1 = smem_col_1_offsets[threadIdx.x + 1] - smem_col_1_offsets[threadIdx.x]; + int32_t len_str2 = smem_col_2_offsets[threadIdx.x + 1] - smem_col_2_offsets[threadIdx.x]; + + int8_t* loc_1 = &(col_1[smem_col_1_offsets[threadIdx.x]]); + int8_t* loc_2 = &(col_2[smem_col_2_offsets[threadIdx.x]]); + uint32_t hashed_str_val = calc_murmur32_hash(loc_1, loc_2, len_str1, len_str2); + + // concurrent_unordered_map + // key : hashed_val, val: {idx, count} + + auto insert_pair = + hash_map.insert(thrust::make_pair(hashed_str_val, str_hash_value{start_idx, 0, 1})); + + if (!insert_pair.second) { + size_type row__ = validate_ht_row_insert(&(insert_pair.first->second.row_)); + int32_t col__ = validate_ht_col_insert(&(insert_pair.first->second.col_)); + + while (1) { + if (col__ == 0) { + if (compare_string_2(row__, + start_idx, + match_col_1, + match_offset_1, + match_col_2, + match_offset_2, + col_1, + offset_1, + col_2, + offset_2)) + break; + } else if (col__ == 1) { + if (compare_string(row__, start_idx, col_1, offset_1, col_2, offset_2)) break; + } + // else loop over +1 count of hash value and insert again + hashed_str_val += hash_inc_constant; + // printf("new insert\n"); + insert_pair = + hash_map.insert(thrust::make_pair(hashed_str_val, str_hash_value{start_idx, 0, 1})); + if (insert_pair.second) { + atomicAdd(&(insert_counter[warp_accum_idx]), 1); + break; + } + row__ = validate_ht_row_insert(&(insert_pair.first->second.row_)); + col__ = validate_ht_col_insert(&(insert_pair.first->second.col_)); + } + // atomicAdd((unsigned int *)&(insert_pair.first->second.count_), 1); + } else { + // atomicAdd((unsigned int *)&(insert_pair.first->second.count_), 1); + // smem atomic counter before global aggregation + atomicAdd(&(insert_counter[warp_accum_idx]), 1); + } + } + __syncwarp(); + if ((threadIdx.x % warpSize) == 0) { + atomicAdd(sysmem_insert_counter, insert_counter[warp_accum_idx]); + } +} + +template +__global__ void set_src_vertex_idx(int8_t* col_1, + int32_t* offset_1, + int8_t* col_2, + int32_t* offset_2, + size_type num_rows, + cudf_map_type lookup_table, + T* out_vertex_mapping) +{ + extern __shared__ int8_t smem_[]; + int32_t* smem_col_1_offsets = reinterpret_cast(smem_); + int32_t* smem_col_2_offsets = + reinterpret_cast(smem_ + ((blockDim.x + 1) * sizeof(int32_t))); + + size_type start_idx = threadIdx.x + blockIdx.x * blockDim.x; // size_type is int32_t + + if (start_idx < num_rows) { + smem_col_1_offsets[threadIdx.x] = offset_1[start_idx]; + smem_col_2_offsets[threadIdx.x] = offset_2[start_idx]; + } + + if (threadIdx.x == 0) { + if ((start_idx + blockDim.x) <= num_rows) { + smem_col_1_offsets[blockDim.x] = offset_1[start_idx + blockDim.x]; + smem_col_2_offsets[blockDim.x] = offset_2[start_idx + blockDim.x]; + } else { + int32_t last_offset_idx = num_rows - start_idx; + smem_col_1_offsets[last_offset_idx] = 
offset_1[num_rows]; + smem_col_2_offsets[last_offset_idx] = offset_2[num_rows]; + } + } + __syncthreads(); + + if (start_idx < num_rows) { + int32_t len_str1 = smem_col_1_offsets[threadIdx.x + 1] - smem_col_1_offsets[threadIdx.x]; + int32_t len_str2 = smem_col_2_offsets[threadIdx.x + 1] - smem_col_2_offsets[threadIdx.x]; + + int8_t* loc_1 = &(col_1[smem_col_1_offsets[threadIdx.x]]); + int8_t* loc_2 = &(col_2[smem_col_2_offsets[threadIdx.x]]); + uint32_t hashed_str_val = calc_murmur32_hash(loc_1, loc_2, len_str1, len_str2); + + // concurrent_unordered_map + // key : hashed_val, val: {idx, count} + + auto it = lookup_table.find(hashed_str_val); + // match string, if not match hash+1 find again + while (it != lookup_table.end()) { + if (compare_string(it->second.row_, start_idx, col_1, offset_1, col_2, offset_2)) { + out_vertex_mapping[start_idx] = (T)it->second.count_; + break; + } + hashed_str_val += hash_inc_constant; + it = lookup_table.find(hashed_str_val); + } + } +} + +template +__global__ void set_dst_vertex_idx(int8_t* col_1, + int32_t* offset_1, + int8_t* col_2, + int32_t* offset_2, + int8_t* match_col_1, + int32_t* match_offset_1, + int8_t* match_col_2, + int32_t* match_offset_2, + size_type num_rows, + cudf_map_type lookup_table, + T* out_vertex_mapping) +{ + extern __shared__ int8_t smem_[]; + int32_t* smem_col_1_offsets = reinterpret_cast(smem_); + int32_t* smem_col_2_offsets = + reinterpret_cast(smem_ + ((blockDim.x + 1) * sizeof(int32_t))); + + size_type start_idx = threadIdx.x + blockIdx.x * blockDim.x; // size_type is int32_t + + if (start_idx < num_rows) { + smem_col_1_offsets[threadIdx.x] = offset_1[start_idx]; + smem_col_2_offsets[threadIdx.x] = offset_2[start_idx]; + } + + if (threadIdx.x == 0) { + if ((start_idx + blockDim.x) <= num_rows) { + smem_col_1_offsets[blockDim.x] = offset_1[start_idx + blockDim.x]; + smem_col_2_offsets[blockDim.x] = offset_2[start_idx + blockDim.x]; + } else { + int32_t last_offset_idx = num_rows - start_idx; + smem_col_1_offsets[last_offset_idx] = offset_1[num_rows]; + smem_col_2_offsets[last_offset_idx] = offset_2[num_rows]; + } + } + __syncthreads(); + + if (start_idx < num_rows) { + int32_t len_str1 = smem_col_1_offsets[threadIdx.x + 1] - smem_col_1_offsets[threadIdx.x]; + int32_t len_str2 = smem_col_2_offsets[threadIdx.x + 1] - smem_col_2_offsets[threadIdx.x]; + + int8_t* loc_1 = &(col_1[smem_col_1_offsets[threadIdx.x]]); + int8_t* loc_2 = &(col_2[smem_col_2_offsets[threadIdx.x]]); + uint32_t hashed_str_val = calc_murmur32_hash(loc_1, loc_2, len_str1, len_str2); + + // concurrent_unordered_map + // key : hashed_val, val: {idx, count} + + auto it = lookup_table.find(hashed_str_val); + // match string, if not match hash+1 find again + while (it != lookup_table.end()) { + if (it->second.col_ == 0) { + if (compare_string_2(it->second.row_, + start_idx, + match_col_1, + match_offset_1, + match_col_2, + match_offset_2, + col_1, + offset_1, + col_2, + offset_2)) { + out_vertex_mapping[start_idx] = (T)it->second.count_; + break; + } + } else if (it->second.col_ == 1) { + if (compare_string(it->second.row_, start_idx, col_1, offset_1, col_2, offset_2)) { + out_vertex_mapping[start_idx] = (T)it->second.count_; + break; + } + } + hashed_str_val += hash_inc_constant; + it = lookup_table.find(hashed_str_val); + } + } +} + +__global__ void create_mapping_histogram(uint32_t* hash_value, + str_hash_value* payload, + cudf_map_type hash_map, + accum_type count) +{ + accum_type idx = threadIdx.x + blockIdx.x * blockDim.x; + + if (idx < count) { auto it = 
hash_map.insert(thrust::make_pair(hash_value[idx], payload[idx])); } +} + +__global__ void assign_histogram_idx(cudf_map_type cuda_map_obj, + size_t slot_count, + str_hash_value* key, + uint32_t* value, + size_type* counter) +{ + if (threadIdx.x == 0 && blockIdx.x == 0) { counter[0] = 0; } + __threadfence(); + size_t tid = threadIdx.x + blockIdx.x * blockDim.x; + + auto it = cuda_map_obj.data(); + for (size_t idx = tid; idx < slot_count; idx += (gridDim.x * blockDim.x)) { + auto hash_itr = it + idx; + + if ((hash_itr->second.row_ != cuda_map_obj.get_unused_element().row_) && + (hash_itr->second.count_ != cuda_map_obj.get_unused_element().count_) && + (hash_itr->first != cuda_map_obj.get_unused_key())) { + size_type count = atomicAdd((size_type*)counter, 1); + value[count] = hash_itr->first; + key[count].row_ = hash_itr->second.row_; + key[count].count_ = hash_itr->second.count_; + key[count].col_ = hash_itr->second.col_; + } + } +} + +__global__ void set_vertex_indices(str_hash_value* ht_value_payload, accum_type count) +{ + accum_type tid = threadIdx.x + blockIdx.x * blockDim.x; + // change count_ to renumber_idx + for (accum_type idx = tid; idx < count; idx += (gridDim.x * blockDim.x)) { + ht_value_payload[idx].count_ = idx; + } +} + +__global__ void set_output_col_offsets(str_hash_value* row_col_pair, + int32_t* out_col1_offset, + int32_t* out_col2_offset, + int dst_pair_match, + int32_t* in_col1_offset, + int32_t* in_col2_offset, + accum_type total_elements) +{ + int32_t start_idx = threadIdx.x + blockIdx.x * blockDim.x; + + for (int32_t idx = start_idx; idx < total_elements; idx += (gridDim.x * blockDim.x)) { + if (dst_pair_match == row_col_pair[idx].col_ && idx == row_col_pair[idx].count_) { + // get row + int32_t row = row_col_pair[idx].row_; + out_col1_offset[idx] = in_col1_offset[row + 1] - in_col1_offset[row]; + out_col2_offset[idx] = in_col2_offset[row + 1] - in_col2_offset[row]; + } else { + out_col1_offset[idx] = 0; + out_col2_offset[idx] = 0; + } + } +} + +__global__ void offset_buffer_size_comp(int32_t* out_col1_length, + int32_t* out_col2_length, + int32_t* out_col1_offsets, + int32_t* out_col2_offsets, + accum_type total_elem, + accum_type* out_sum) +{ + int32_t idx = threadIdx.x + blockIdx.x * blockDim.x; + + if (idx == 0) { + accum_type sum = accum_type(out_col1_offsets[total_elem - 1] + out_col1_length[total_elem - 1]); + out_col1_offsets[total_elem] = sum; + out_sum[0] = sum; + } else if (idx == 1) { + accum_type sum = accum_type(out_col2_offsets[total_elem - 1] + out_col2_length[total_elem - 1]); + out_col2_offsets[total_elem] = sum; + out_sum[1] = sum; + } +} + +__global__ void select_unrenumber_string(str_hash_value* idx_to_col_row, + int32_t total_elements, + int8_t* src_col1, + int8_t* src_col2, + int32_t* src_col1_offsets, + int32_t* src_col2_offsets, + int8_t* dst_col1, + int8_t* dst_col2, + int32_t* dst_col1_offsets, + int32_t* dst_col2_offsets, + int8_t* col1_out, + int8_t* col2_out, + int32_t* col1_out_offsets, + int32_t* col2_out_offsets) +{ + size_type start_idx = threadIdx.x + blockIdx.x * blockDim.x; + + for (size_type idx = start_idx; idx < total_elements; idx += (gridDim.x * blockDim.x)) { + int32_t row = idx_to_col_row[idx].row_; + int32_t col = idx_to_col_row[idx].col_; + + if (col == 0) { + int32_t col1_src_str_start = src_col1_offsets[row]; + int32_t col1_src_str_length = src_col1_offsets[row + 1] - col1_src_str_start; + int32_t col1_out_offset = col1_out_offsets[idx]; + + for (int32_t i = 0; i < col1_src_str_length; i++) { + 
col1_out[col1_out_offset + i] = src_col1[col1_src_str_start + i]; + } + + int32_t col2_src_str_start = src_col2_offsets[row]; + int32_t col2_src_str_length = src_col2_offsets[row + 1] - col2_src_str_start; + int32_t col2_out_offset = col2_out_offsets[idx]; + + for (int32_t i = 0; i < col2_src_str_length; i++) { + col2_out[col2_out_offset + i] = src_col2[col2_src_str_start + i]; + } + + } else if (col == 1) { + int32_t col1_dst_str_start = dst_col1_offsets[row]; + int32_t col1_dst_str_length = dst_col1_offsets[row + 1] - col1_dst_str_start; + int32_t col1_out_offset = col1_out_offsets[idx]; + + for (int32_t i = 0; i < col1_dst_str_length; i++) { + col1_out[col1_out_offset + i] = src_col1[col1_dst_str_start + i]; + } + + int32_t col2_dst_str_start = dst_col2_offsets[row]; + int32_t col2_dst_str_length = dst_col2_offsets[row + 1] - col2_dst_str_start; + int32_t col2_out_offset = col2_out_offsets[idx]; + + for (int32_t i = 0; i < col2_dst_str_length; i++) { + col2_out[col2_out_offset + i] = src_col2[col2_dst_str_start + i]; + } + } + } +} + +struct struct_sort_descending { + __host__ __device__ bool operator()(str_hash_value& a, str_hash_value& b) + { + return (a.count_ > b.count_); + } +}; + +struct renumber_functor { + template ::value>* = nullptr> + std::tuple, + std::unique_ptr, + std::unique_ptr> + operator()(raft::handle_t const& handle, + cudf::table_view const& src_view, + cudf::table_view const& dst_view) + { + return std::make_tuple( + std::unique_ptr(new cudf::column( + cudf::data_type(cudf::type_id::INT32), 0, rmm::device_buffer{0, cudaStream_t{0}})), + std::unique_ptr(new cudf::column( + cudf::data_type(cudf::type_id::INT32), 0, rmm::device_buffer{0, cudaStream_t{0}})), + std::make_unique(std::vector>{})); + } + + template ::value>* = nullptr> + std::tuple, + std::unique_ptr, + std::unique_ptr> + operator()(raft::handle_t const& handle, + cudf::table_view const& src_view, + cudf::table_view const& dst_view) + { + assert(src_view.num_columns() == 2); + assert(dst_view.num_columns() == 2); + + size_type num_rows = src_view.num_rows(); + using char_type = int8_t; + using str_offset_type = int32_t; // kernels init'd int32 only + + std::vector src_vertex_chars_ptrs; + std::vector src_vertex_offset_ptrs; + std::vector dst_vertex_chars_ptrs; + std::vector dst_vertex_offset_ptrs; + + for (int i = 0; i < src_view.num_columns(); i++) { + auto str_col_view = cudf::strings_column_view(src_view.column(i)); + src_vertex_chars_ptrs.push_back( + const_cast(str_col_view.chars().data())); + src_vertex_offset_ptrs.push_back( + const_cast(str_col_view.offsets().data())); + } + + for (int i = 0; i < dst_view.num_columns(); i++) { + auto str_col_view = cudf::strings_column_view(dst_view.column(i)); + dst_vertex_chars_ptrs.push_back( + const_cast(str_col_view.chars().data())); + dst_vertex_offset_ptrs.push_back( + const_cast(str_col_view.offsets().data())); + } + + cudaStream_t exec_strm = handle.get_stream(); + + auto alloc = std::make_shared(); + raft::mr::host::buffer buff(alloc, exec_strm); + buff.resize(sizeof(accum_type) * 32, exec_strm); + accum_type* hist_insert_counter = buff.data(); + *hist_insert_counter = 0; + + float load_factor = 0.7; + + rmm::device_uvector atomic_agg(32, exec_strm); // just padded to 32 + CHECK_CUDA(cudaMemsetAsync(atomic_agg.data(), 0, sizeof(accum_type), exec_strm)); + + auto cuda_map_obj = cudf_map_type::create( + std::max(static_cast(static_cast(num_rows) / load_factor), + (size_t)num_rows + 1), + exec_strm, + str_hash_value{}) + .release(); + dim3 block(512, 1, 1); 
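+    // Sizing/launch note: with load_factor = 0.7 the map above is created with
+    // roughly num_rows / 0.7 slots (e.g. ~14.3M slots for 10M input rows, and
+    // never fewer than num_rows + 1), which keeps the open-addressing probes in
+    // the histogram kernels short. Each 512-thread block below covers 512
+    // consecutive rows and stages the (blockDim.x + 1) offsets of each of its two
+    // string columns in shared memory before hashing the concatenated strings.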
+ dim3 grid((num_rows - 1) / block.x + 1, 1, 1); + + int32_t num_multiprocessors = 80; // get from cuda properties + + // assumes warp_size is 32 + size_t warp_size = 32; + size_t smem_size = + (block.x + 1) * 2 * sizeof(int32_t) + (block.x / warp_size) * sizeof(accum_type); + + concat_and_create_histogram<<>>(src_vertex_chars_ptrs[0], + src_vertex_offset_ptrs[0], + src_vertex_chars_ptrs[1], + src_vertex_offset_ptrs[1], + num_rows, + *cuda_map_obj, + atomic_agg.data()); + + concat_and_create_histogram_2<<>>(dst_vertex_chars_ptrs[0], + dst_vertex_offset_ptrs[0], + dst_vertex_chars_ptrs[1], + dst_vertex_offset_ptrs[1], + src_vertex_chars_ptrs[0], + src_vertex_offset_ptrs[0], + src_vertex_chars_ptrs[1], + src_vertex_offset_ptrs[1], + num_rows, + *cuda_map_obj, + atomic_agg.data()); + + CHECK_CUDA(cudaMemcpy( + hist_insert_counter, atomic_agg.data(), sizeof(accum_type), cudaMemcpyDeviceToHost)); + CHECK_CUDA(cudaStreamSynchronize(exec_strm)); + + accum_type key_value_count = hist_insert_counter[0]; + // {row, count} pairs, sortDesecending on count w/ custom comparator + rmm::device_uvector sort_key(key_value_count, exec_strm); + rmm::device_uvector sort_value(key_value_count, exec_strm); // string hash values + rmm::device_uvector atomic_idx(32, exec_strm); // just padded to 32 + + int32_t num_blocks = 0; + cudaOccupancyMaxActiveBlocksPerMultiprocessor(&num_blocks, assign_histogram_idx, block.x, 0); + grid.x = num_multiprocessors * num_blocks; + assign_histogram_idx<<>>(*cuda_map_obj, + cuda_map_obj->capacity(), + sort_key.data(), + sort_value.data(), + atomic_idx.data()); + + // can release original histogram memory here + + // FIXME: cub doesnt have custom comparator sort + // new cub release will have sort with custom comparator + thrust::sort_by_key(rmm::exec_policy(exec_strm), + sort_key.begin(), + sort_key.end(), + sort_value.begin(), + struct_sort_descending()); + + cudaOccupancyMaxActiveBlocksPerMultiprocessor(&num_blocks, set_vertex_indices, block.x, 0); + grid.x = num_multiprocessors * num_blocks; + set_vertex_indices<<>>(sort_key.data(), hist_insert_counter[0]); + + // can extract unrenumber table here + // get separate src and dst idxs. 
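+
+    // At this point sort_key holds one {row, count, col} entry per distinct
+    // concatenated string pair, ordered by descending frequency, and
+    // set_vertex_indices has overwritten each entry's count_ with its rank:
+    // the most frequent vertex string gets id 0, the next id 1, and so on.
+    // The remaining steps materialize the unrenumber (id -> string) table from
+    // these entries and then translate every input row to its vertex id.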
+
+    rmm::device_uvector<str_offset_type> out_col1_length(key_value_count, exec_strm);
+    rmm::device_uvector<str_offset_type> out_col2_length(key_value_count, exec_strm);
+
+    cudaOccupancyMaxActiveBlocksPerMultiprocessor(&num_blocks, set_output_col_offsets, block.x, 0);
+    grid.x = num_multiprocessors * num_blocks;
+    // k-v count pairs, output length ptrs, input column offset ptrs (to measure string lengths)
+    set_output_col_offsets<<<grid, block, 0, exec_strm>>>(sort_key.data(),
+                                                          out_col1_length.data(),
+                                                          out_col2_length.data(),
+                                                          0,
+                                                          src_vertex_offset_ptrs[0],
+                                                          src_vertex_offset_ptrs[1],
+                                                          key_value_count);
+
+    set_output_col_offsets<<<grid, block, 0, exec_strm>>>(sort_key.data(),
+                                                          out_col1_length.data(),
+                                                          out_col2_length.data(),
+                                                          1,
+                                                          dst_vertex_offset_ptrs[0],
+                                                          dst_vertex_offset_ptrs[1],
+                                                          key_value_count);
+
+    // prefix sum to extract column offsets
+    rmm::device_uvector<str_offset_type> out_col1_offsets(key_value_count + 1, exec_strm);
+    rmm::device_uvector<str_offset_type> out_col2_offsets(key_value_count + 1, exec_strm);
+
+    size_t tmp_storage_bytes = 0;
+    cub::DeviceScan::ExclusiveSum(nullptr,
+                                  tmp_storage_bytes,
+                                  out_col1_length.data(),
+                                  out_col1_offsets.data(),
+                                  key_value_count,
+                                  exec_strm);
+    rmm::device_buffer tmp_storage(tmp_storage_bytes, exec_strm);
+    cub::DeviceScan::ExclusiveSum(tmp_storage.data(),
+                                  tmp_storage_bytes,
+                                  out_col1_length.data(),
+                                  out_col1_offsets.data(),
+                                  key_value_count,
+                                  exec_strm);
+    cub::DeviceScan::ExclusiveSum(tmp_storage.data(),
+                                  tmp_storage_bytes,
+                                  out_col2_length.data(),
+                                  out_col2_offsets.data(),
+                                  key_value_count,
+                                  exec_strm);
+
+    // reduce to get size of column allocations
+    // just reusing exscan output instead of using cub::DeviceReduce::Sum() again
+    // also sets the last value of the offset buffer that exscan didn't set
+    offset_buffer_size_comp<<<1, 32, 0, exec_strm>>>(out_col1_length.data(),
+                                                     out_col2_length.data(),
+                                                     out_col1_offsets.data(),
+                                                     out_col2_offsets.data(),
+                                                     key_value_count,
+                                                     hist_insert_counter);
+
+    CHECK_CUDA(cudaStreamSynchronize(exec_strm));
+    // allocate output column buffers
+    rmm::device_buffer unrenumber_col1_chars(hist_insert_counter[0], exec_strm);
+    rmm::device_buffer unrenumber_col2_chars(hist_insert_counter[1], exec_strm);
+
+    // select string kernel
+    cudaOccupancyMaxActiveBlocksPerMultiprocessor(
+      &num_blocks, select_unrenumber_string, block.x, 0);
+    grid.x = num_multiprocessors * num_blocks;
+    select_unrenumber_string<<<grid, block, 0, exec_strm>>>(
+      sort_key.data(),
+      key_value_count,
+      src_vertex_chars_ptrs[0],
+      src_vertex_chars_ptrs[1],
+      src_vertex_offset_ptrs[0],
+      src_vertex_offset_ptrs[1],
+      dst_vertex_chars_ptrs[0],
+      dst_vertex_chars_ptrs[1],
+      dst_vertex_offset_ptrs[0],
+      dst_vertex_offset_ptrs[1],
+      reinterpret_cast<char_type*>(unrenumber_col1_chars.data()),
+      reinterpret_cast<char_type*>(unrenumber_col2_chars.data()),
+      out_col1_offsets.data(),
+      out_col2_offsets.data());
+    CHECK_CUDA(cudaStreamSynchronize(exec_strm));  // do we need sync here??
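+    // Assemble the offsets/chars pairs into two cudf strings columns (the unrenumber
+    // table), then build a second hash map from the sorted (hash, entry) pairs so that
+    // set_src_vertex_idx / set_dst_vertex_idx can translate every input row into its
+    // new vertex id.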
+
+    std::vector<std::unique_ptr<cudf::column>> renumber_table_vectors;
+
+    auto offset_col_1 =
+      std::unique_ptr<cudf::column>(new cudf::column(cudf::data_type(cudf::type_id::INT32),
+                                                     key_value_count + 1,
+                                                     std::move(out_col1_offsets.release())));
+
+    auto str_col_1 =
+      std::unique_ptr<cudf::column>(new cudf::column(cudf::data_type(cudf::type_id::INT8),
+                                                     hist_insert_counter[0],
+                                                     std::move(unrenumber_col1_chars)));
+
+    renumber_table_vectors.push_back(
+      cudf::make_strings_column(size_type(key_value_count),
+                                std::move(offset_col_1),
+                                std::move(str_col_1),
+                                0,
+                                rmm::device_buffer(size_type(0), exec_strm)));
+
+    auto offset_col_2 =
+      std::unique_ptr<cudf::column>(new cudf::column(cudf::data_type(cudf::type_id::INT32),
+                                                     key_value_count + 1,
+                                                     std::move(out_col2_offsets.release())));
+
+    auto str_col_2 =
+      std::unique_ptr<cudf::column>(new cudf::column(cudf::data_type(cudf::type_id::INT8),
+                                                     hist_insert_counter[1],
+                                                     std::move(unrenumber_col2_chars)));
+
+    renumber_table_vectors.push_back(
+      cudf::make_strings_column(size_type(key_value_count),
+                                std::move(offset_col_2),
+                                std::move(str_col_2),
+                                0,
+                                rmm::device_buffer(size_type(0), exec_strm)));
+
+    // the renumber table is assembled from these string columns at the end
+
+    // new hash table: just insert the K-V pairs
+    auto cuda_map_obj_mapping =
+      cudf_map_type::create(static_cast<size_t>(static_cast<float>(key_value_count) / load_factor),
+                            exec_strm,
+                            str_hash_value{})
+        .release();
+
+    grid.x = (key_value_count - 1) / block.x + 1;
+    create_mapping_histogram<<<grid, block, 0, exec_strm>>>(
+      sort_value.data(), sort_key.data(), *cuda_map_obj_mapping, key_value_count);
+    CHECK_CUDA(cudaStreamSynchronize(exec_strm));
+
+    rmm::device_buffer src_buffer(sizeof(Dtype) * num_rows, exec_strm);
+    rmm::device_buffer dst_buffer(sizeof(Dtype) * num_rows, exec_strm);
+
+    // iterate over the input, look up the hash map, match the string, set the vertex idx in the buffer
+    grid.x = (num_rows - 1) / block.x + 1;
+    smem_size = (block.x + 1) * 2 * sizeof(str_offset_type);
+    set_src_vertex_idx<<<grid, block, smem_size, exec_strm>>>(
+      src_vertex_chars_ptrs[0],
+      src_vertex_offset_ptrs[0],
+      src_vertex_chars_ptrs[1],
+      src_vertex_offset_ptrs[1],
+      num_rows,
+      *cuda_map_obj_mapping,
+      reinterpret_cast<Dtype*>(src_buffer.data()));
+    CHECK_CUDA(cudaStreamSynchronize(exec_strm));
+    set_dst_vertex_idx<<<grid, block, smem_size, exec_strm>>>(
+      dst_vertex_chars_ptrs[0],
+      dst_vertex_offset_ptrs[0],
+      dst_vertex_chars_ptrs[1],
+      dst_vertex_offset_ptrs[1],
+      src_vertex_chars_ptrs[0],
+      src_vertex_offset_ptrs[0],
+      src_vertex_chars_ptrs[1],
+      src_vertex_offset_ptrs[1],
+      num_rows,
+      *cuda_map_obj_mapping,
+      reinterpret_cast<Dtype*>(dst_buffer.data()));
+
+    std::vector<std::unique_ptr<cudf::column>> cols_vector;
+    cols_vector.push_back(std::unique_ptr<cudf::column>(
+      new cudf::column(cudf::data_type(cudf::type_id::INT32), num_rows, std::move(src_buffer))));
+
+    cols_vector.push_back(std::unique_ptr<cudf::column>(
+      new cudf::column(cudf::data_type(cudf::type_id::INT32), num_rows, std::move(dst_buffer))));
+
+    CHECK_CUDA(cudaDeviceSynchronize());
+
+    return std::make_tuple(
+      std::move(cols_vector[0]),
+      std::move(cols_vector[1]),
+      std::move(std::make_unique<cudf::table>(std::move(renumber_table_vectors))));
+  }
+};
 std::
   tuple<std::unique_ptr<cudf::column>, std::unique_ptr<cudf::column>, std::unique_ptr<cudf::table>>
-  renumber_cudf_tables(cudf::table_view const& src_table,
+  renumber_cudf_tables(raft::handle_t const& handle,
+                       cudf::table_view const& src_table,
                        cudf::table_view const& dst_table,
                        cudf::type_id dtype)
 {
-  CUGRAPH_FAIL("not implemented yet");
+  CUGRAPH_EXPECTS(src_table.num_columns() == 2,
+                  "Src col: only two string column vertex are supported");
+  CUGRAPH_EXPECTS(dst_table.num_columns() == 2,
+                  "Dst col: only two string column vertex are supported");
+
+  size_type num_rows_ = src_table.num_rows();
+
+  auto x =
+    cudf::type_dispatcher(cudf::data_type{dtype}, renumber_functor{}, handle, src_table, dst_table);
+  return x;
 }

}  // namespace etl