From bca042ab7d202ca555014aa168b53c888ba61ec2 Mon Sep 17 00:00:00 2001
From: Seunghwa Kang
Date: Tue, 7 Nov 2023 15:05:54 -0800
Subject: [PATCH 1/7] delete dead function declaration

---
 cpp/include/cugraph/algorithms.hpp | 45 ------------------------------
 1 file changed, 45 deletions(-)

diff --git a/cpp/include/cugraph/algorithms.hpp b/cpp/include/cugraph/algorithms.hpp
index 78846bc5766..8501eedce5c 100644
--- a/cpp/include/cugraph/algorithms.hpp
+++ b/cpp/include/cugraph/algorithms.hpp
@@ -464,51 +464,6 @@ k_truss_subgraph(raft::handle_t const& handle,
                  size_t number_of_vertices,
                  int k);
 
-// FIXME: Internally distances is of int (signed 32-bit) data type, but current
-// template uses data from VT, ET, WT from the legacy::GraphCSRView even if weights
-// are not considered
-/**
- * @Synopsis   Performs a breadth first search traversal of a graph starting from a vertex.
- *
- * @throws     cugraph::logic_error with a custom message when an error occurs.
- *
- * @tparam VT               Type of vertex identifiers. Supported value : int (signed,
- *                          32-bit)
- * @tparam ET               Type of edge identifiers. Supported value : int (signed,
- *                          32-bit)
- * @tparam WT               Type of edge weights. Supported values : int (signed, 32-bit)
- *
- * @param[in] handle        Library handle (RAFT). If a communicator is set in the handle,
-                            the multi GPU version will be selected.
- * @param[in] graph         cuGraph graph descriptor, should contain the connectivity
- *                          information as a CSR
- *
- * @param[out] distances    If set to a valid pointer, this is populated by distance of
- *                          every vertex in the graph from the starting vertex
- *
- * @param[out] predecessors If set to a valid pointer, this is populated by bfs traversal
- *                          predecessor of every vertex
- *
- * @param[out] sp_counters  If set to a valid pointer, this is populated by bfs traversal
- *                          shortest_path counter of every vertex
- *
- * @param[in] start_vertex  The starting vertex for breadth first search traversal
- *
- * @param[in] directed      Treat the input graph as directed
- *
- * @param[in] mg_batch      If set to true use SG BFS path when comms are initialized.
- *
- */
-template <typename VT, typename ET, typename WT>
-void bfs(raft::handle_t const& handle,
-         legacy::GraphCSRView<VT, ET, WT> const& graph,
-         VT* distances,
-         VT* predecessors,
-         double* sp_counters,
-         const VT start_vertex,
-         bool directed = true,
-         bool mg_batch = false);
-
 /**
  * @brief Compute Hungarian algorithm on a weighted bipartite graph
  *

From 1169c6e47f894b938faeed94fd5723efd4e3152b Mon Sep 17 00:00:00 2001
From: Seunghwa Kang
Date: Tue, 7 Nov 2023 15:06:19 -0800
Subject: [PATCH 2/7] delete no longer relevant comments

---
 cpp/include/cugraph/utilities/device_comm.hpp  | 6 ------
 cpp/include/cugraph/utilities/shuffle_comm.cuh | 5 -----
 2 files changed, 11 deletions(-)

diff --git a/cpp/include/cugraph/utilities/device_comm.hpp b/cpp/include/cugraph/utilities/device_comm.hpp
index 7087724921a..3613dcde1f8 100644
--- a/cpp/include/cugraph/utilities/device_comm.hpp
+++ b/cpp/include/cugraph/utilities/device_comm.hpp
@@ -806,9 +806,6 @@ device_sendrecv(raft::comms::comms_t const& comm,
   size_t constexpr tuple_size =
     thrust::tuple_size<typename thrust::iterator_traits<InputIterator>::value_type>::value;
 
-  // FIXME: NCCL 2.7 supports only one ncclSend and one ncclRecv for a source rank and destination
-  // rank inside ncclGroupStart/ncclGroupEnd, so we cannot place this inside
-  // ncclGroupStart/ncclGroupEnd, this restriction will be lifted in NCCL 2.8
   detail::device_sendrecv_tuple_iterator_element_impl<
@@ -…,9 +…,6 @@ device_multicast_sendrecv(raft::comms::comms_t const& comm,
   size_t constexpr tuple_size =
     thrust::tuple_size<typename thrust::iterator_traits<InputIterator>::value_type>::value;
 
-  // FIXME: NCCL 2.7 supports only one ncclSend and one ncclRecv for a source rank and destination
-  // rank inside ncclGroupStart/ncclGroupEnd, so we cannot place this inside
-  // ncclGroupStart/ncclGroupEnd, this restriction will be lifted in NCCL 2.8
   detail::device_multicast_sendrecv_tuple_iterator_element_impl<
diff --git a/cpp/include/cugraph/utilities/shuffle_comm.cuh b/cpp/include/cugraph/utilities/shuffle_comm.cuh
@@ -…,7 +…,6 @@ auto shuffle_values(raft::comms::comms_t const& comm,
   rmm::device_uvector<size_t> d_rx_value_counts(comm_size, stream_view);
 
-  // FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released.
   std::vector<size_t> tx_counts(comm_size, size_t{1});
   std::vector<size_t> tx_offsets(comm_size);
   std::iota(tx_offsets.begin(), tx_offsets.end(), size_t{0});
@@ -835,7 +834,6 @@ auto shuffle_values(raft::comms::comms_t const& comm,
     allocate_dataframe_buffer<typename thrust::iterator_traits<…>::value_type>(
       rx_offsets.size() > 0 ? rx_offsets.back() + rx_counts.back() : size_t{0}, stream_view);
 
-  // FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released
   // (if num_tx_dst_ranks == num_rx_src_ranks == comm_size).
   device_multicast_sendrecv(comm,
                             tx_value_first,
@@ -889,7 +887,6 @@ auto groupby_gpu_id_and_shuffle_values(raft::comms::comms_t const& comm,
     allocate_dataframe_buffer<typename thrust::iterator_traits<…>::value_type>(
       rx_offsets.size() > 0 ? rx_offsets.back() + rx_counts.back() : size_t{0}, stream_view);
 
-  // FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released
   // (if num_tx_dst_ranks == num_rx_src_ranks == comm_size).
   device_multicast_sendrecv(comm,
                             tx_value_first,
@@ -946,7 +943,6 @@ auto groupby_gpu_id_and_shuffle_kv_pairs(raft::comms::comms_t const& comm,
     allocate_dataframe_buffer<typename thrust::iterator_traits<…>::value_type>(
       rx_keys.size(), stream_view);
 
-  // FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released
   // (if num_tx_dst_ranks == num_rx_src_ranks == comm_size).
   device_multicast_sendrecv(comm,
                             tx_key_first,
@@ -959,7 +955,6 @@ auto groupby_gpu_id_and_shuffle_kv_pairs(raft::comms::comms_t const& comm,
                             rx_src_ranks,
                             stream_view);
 
-  // FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released
   // (if num_tx_dst_ranks == num_rx_src_ranks == comm_size).
   device_multicast_sendrecv(comm,
                             tx_value_first,
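Note on patch 2: the deleted FIXMEs predated NCCL 2.8, which lifted the NCCL 2.7 restriction of at most one ncclSend and one ncclRecv per peer inside an ncclGroupStart()/ncclGroupEnd() pair, so the per-tuple-element transfers these call sites issue can now legally be fused into a single group. A minimal sketch of the now-legal pattern, assuming NCCL >= 2.8 and that `comm`, `stream`, `peer`, `tuple_size`, and the buffer arrays exist (illustrative only, not cuGraph code):

    // One send and one recv per tuple element, all to/from the same peer,
    // fused into a single NCCL group (illegal under NCCL 2.7, fine since 2.8).
    ncclGroupStart();
    for (size_t i = 0; i < tuple_size; ++i) {
      ncclSend(tx_buffers[i], tx_count, ncclInt32, peer, comm, stream);
      ncclRecv(rx_buffers[i], rx_count, ncclInt32, peer, comm, stream);
    }
    ncclGroupEnd();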
From 3c669471f894b938faeed94fd5723efd4e3152b Mon Sep 17 00:00:00 2001
From: Seunghwa Kang
Date: Tue, 7 Nov 2023 16:41:06 -0800
Subject: [PATCH 3/7] replace fixed-size allgatherv with allgather

---
 .../cugraph/utilities/host_scalar_comm.hpp | 26 +++++--------------
 1 file changed, 7 insertions(+), 19 deletions(-)

diff --git a/cpp/include/cugraph/utilities/host_scalar_comm.hpp b/cpp/include/cugraph/utilities/host_scalar_comm.hpp
index 6d3d772c2a2..967c75dbc5f 100644
--- a/cpp/include/cugraph/utilities/host_scalar_comm.hpp
+++ b/cpp/include/cugraph/utilities/host_scalar_comm.hpp
@@ -254,19 +254,14 @@ template <typename T>
 std::enable_if_t<std::is_arithmetic<T>::value, std::vector<T>> host_scalar_allgather(
   raft::comms::comms_t const& comm, T input, cudaStream_t stream)
 {
-  std::vector<size_t> rx_counts(comm.get_size(), size_t{1});
-  std::vector<size_t> displacements(rx_counts.size(), size_t{0});
-  std::iota(displacements.begin(), displacements.end(), size_t{0});
-  rmm::device_uvector<T> d_outputs(rx_counts.size(), stream);
+  rmm::device_uvector<T> d_outputs(comm.get_size(), stream);
   raft::update_device(d_outputs.data() + comm.get_rank(), &input, 1, stream);
-  // FIXME: better use allgather
-  comm.allgatherv(d_outputs.data() + comm.get_rank(),
+  comm.allgather(d_outputs.data() + comm.get_rank(),
                   d_outputs.data(),
-                  rx_counts.data(),
-                  displacements.data(),
+                  size_t{1},
                   stream);
-  std::vector<T> h_outputs(rx_counts.size());
-  raft::update_host(h_outputs.data(), d_outputs.data(), rx_counts.size(), stream);
+  std::vector<T> h_outputs(d_outputs.size());
+  raft::update_host(h_outputs.data(), d_outputs.data(), d_outputs.size(), stream);
   auto status = comm.sync_stream(stream);
   CUGRAPH_EXPECTS(status == raft::comms::status_t::SUCCESS, "sync_stream() failure.");
   return h_outputs;
@@ -277,11 +272,6 @@ std::enable_if_t<is_thrust_tuple_of_arithmetic<T>::value, std::vector<T>>
 host_scalar_allgather(raft::comms::comms_t const& comm, T input, cudaStream_t stream)
 {
   size_t constexpr tuple_size = thrust::tuple_size<T>::value;
-  std::vector<size_t> rx_counts(comm.get_size(), tuple_size);
-  std::vector<size_t> displacements(rx_counts.size(), size_t{0});
-  for (size_t i = 0; i < displacements.size(); ++i) {
-    displacements[i] = i * tuple_size;
-  }
   std::vector<int64_t> h_tuple_scalar_elements(tuple_size);
   rmm::device_uvector<int64_t> d_allgathered_tuple_scalar_elements(comm.get_size() * tuple_size,
                                                                    stream);
@@ -292,11 +282,9 @@ host_scalar_allgather(raft::comms::comms_t const& comm, T input, cudaStream_t stream)
                       h_tuple_scalar_elements.data(),
                       tuple_size,
                       stream);
-  // FIXME: better use allgather
-  comm.allgatherv(d_allgathered_tuple_scalar_elements.data() + comm.get_rank() * tuple_size,
+  comm.allgather(d_allgathered_tuple_scalar_elements.data() + comm.get_rank() * tuple_size,
                   d_allgathered_tuple_scalar_elements.data(),
-                  rx_counts.data(),
-                  displacements.data(),
+                  tuple_size,
                   stream);
   std::vector<int64_t> h_allgathered_tuple_scalar_elements(comm.get_size() * tuple_size);
   raft::update_host(h_allgathered_tuple_scalar_elements.data(),
                     d_allgathered_tuple_scalar_elements.data(),
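Note on patch 3: the change relies on the identity that an allgatherv in which every rank contributes the same count is exactly an allgather, so the rx_counts/displacements bookkeeping can be dropped. A hedged before/after sketch using the raft::comms calls visible in the diff (`comm`, `d_buf`, and `stream` are assumed to be set up elsewhere; std::iota comes from <numeric>):

    // Before: uniform-count allgatherv needs explicit counts and displacements.
    std::vector<size_t> rx_counts(comm.get_size(), size_t{1});
    std::vector<size_t> displs(rx_counts.size());
    std::iota(displs.begin(), displs.end(), size_t{0});
    comm.allgatherv(
      d_buf.data() + comm.get_rank(), d_buf.data(), rx_counts.data(), displs.data(), stream);

    // After: the same exchange, with the uniform count passed directly.
    comm.allgather(d_buf.data() + comm.get_rank(), d_buf.data(), size_t{1}, stream);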
From f12cdde11bba55d82076767adf3898e17244761a Mon Sep 17 00:00:00 2001
From: Seunghwa Kang
Date: Tue, 7 Nov 2023 16:50:37 -0800
Subject: [PATCH 4/7] copyright year

---
 cpp/include/cugraph/utilities/device_comm.hpp      | 2 +-
 cpp/include/cugraph/utilities/host_scalar_comm.hpp | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/cpp/include/cugraph/utilities/device_comm.hpp b/cpp/include/cugraph/utilities/device_comm.hpp
index 3613dcde1f8..990074e781b 100644
--- a/cpp/include/cugraph/utilities/device_comm.hpp
+++ b/cpp/include/cugraph/utilities/device_comm.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2022, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2023, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/cpp/include/cugraph/utilities/host_scalar_comm.hpp b/cpp/include/cugraph/utilities/host_scalar_comm.hpp
index 967c75dbc5f..dadba1f91db 100644
--- a/cpp/include/cugraph/utilities/host_scalar_comm.hpp
+++ b/cpp/include/cugraph/utilities/host_scalar_comm.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2022, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2023, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.

From 4cbaac04ee4fafca9d52f06aeaab5ef3ae03c1f3 Mon Sep 17 00:00:00 2001
From: Seunghwa Kang
Date: Thu, 9 Nov 2023 11:12:37 -0800
Subject: [PATCH 5/7] checked that Katz works with negative beta values (at
 least NetworkX's Katz implementation accepts them, so this might be
 acceptable input)

---
 cpp/src/centrality/katz_centrality_impl.cuh | 2 --
 1 file changed, 2 deletions(-)

diff --git a/cpp/src/centrality/katz_centrality_impl.cuh b/cpp/src/centrality/katz_centrality_impl.cuh
index 202d00a5771..ac31043d862 100644
--- a/cpp/src/centrality/katz_centrality_impl.cuh
+++ b/cpp/src/centrality/katz_centrality_impl.cuh
@@ -74,8 +74,6 @@ void katz_centrality(
   CUGRAPH_EXPECTS(epsilon >= 0.0, "Invalid input argument: epsilon should be non-negative.");
 
   if (do_expensive_check) {
-    // FIXME: should I check for betas?
-
     if (has_initial_guess) {
       auto num_negative_values = count_if_v(
         handle, pull_graph_view, katz_centralities, [] __device__(auto, auto val) {

From 1161d4a1178b4e322fcdbb35a0f545284668a058 Mon Sep 17 00:00:00 2001
From: Seunghwa Kang
Date: Thu, 9 Nov 2023 17:04:18 -0800
Subject: [PATCH 6/7] replace atomicCAS with atomic_ref.compare_exchange_strong

---
 .../weakly_connected_components_impl.cuh | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/cpp/src/components/weakly_connected_components_impl.cuh b/cpp/src/components/weakly_connected_components_impl.cuh
index 615a50ded54..be74012e3ee 100644
--- a/cpp/src/components/weakly_connected_components_impl.cuh
+++ b/cpp/src/components/weakly_connected_components_impl.cuh
@@ -236,18 +236,16 @@ struct v_op_t {
     auto tag = thrust::get<1>(tagged_v);
     auto v_offset =
       vertex_partition.local_vertex_partition_offset_from_vertex_nocheck(thrust::get<0>(tagged_v));
-    // FIXME: better switch to atomic_ref after
-    // https://github.com/nvidia/libcudacxx/milestone/2
-    auto old =
-      atomicCAS(level_components + v_offset, invalid_component_id<vertex_t>::value, tag);
-    if (old != invalid_component_id<vertex_t>::value && old != tag) {  // conflict
+    cuda::atomic_ref<vertex_t, cuda::thread_scope_device> v_component(*(level_components + v_offset));
+    auto old = invalid_component_id<vertex_t>::value;
+    bool success = v_component.compare_exchange_strong(old, tag, cuda::std::memory_order_relaxed);
+    if (!success && (old != tag)) {  // conflict
       return thrust::make_tuple(thrust::optional<size_t>{bucket_idx_conflict},
                                 thrust::optional<std::byte>{std::byte{0}} /* dummy */);
     } else {
-      auto update = (old == invalid_component_id<vertex_t>::value);
       return thrust::make_tuple(
-        update ? thrust::optional<size_t>{bucket_idx_next} : thrust::nullopt,
-        update ? thrust::optional<std::byte>{std::byte{0}} /* dummy */ : thrust::nullopt);
+        success ? thrust::optional<size_t>{bucket_idx_next} : thrust::nullopt,
+        success ? thrust::optional<std::byte>{std::byte{0}} /* dummy */ : thrust::nullopt);
     }
   }
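Note on patch 6: cuda::atomic_ref from libcu++ wraps an existing memory location, so the CAS can carry an explicit (relaxed) memory order instead of the fixed semantics of raw atomicCAS, and the boolean result replaces the old-value comparison. A self-contained sketch of the claim-a-slot idiom, assuming libcu++ (CUDA 11+); the kernel and names are illustrative, not the cuGraph implementation:

    #include <cuda/atomic>
    #include <cstdint>

    __global__ void claim_slots(int32_t* components, int32_t invalid, int32_t tag, int n)
    {
      int i = blockIdx.x * blockDim.x + threadIdx.x;
      if (i >= n) { return; }
      cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref(components[i]);
      int32_t expected = invalid;
      if (ref.compare_exchange_strong(expected, tag, cuda::std::memory_order_relaxed)) {
        return;  // this thread claimed the slot
      }
      // otherwise `expected` now holds the id written by the winning thread
      // (a conflict if it differs from `tag`)
    }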
From f60b8a2a0e9d67807ae2851e2cb9393d5409c6d5 Mon Sep 17 00:00:00 2001
From: Seunghwa Kang
Date: Mon, 13 Nov 2023 22:18:36 -0800
Subject: [PATCH 7/7] add host_scalar_scatter

---
 .../cugraph/utilities/host_scalar_comm.hpp | 76 +++++++++++++++++--
 .../weakly_connected_components_impl.cuh   | 26 +------
 2 files changed, 71 insertions(+), 31 deletions(-)

diff --git a/cpp/include/cugraph/utilities/host_scalar_comm.hpp b/cpp/include/cugraph/utilities/host_scalar_comm.hpp
index dadba1f91db..4e6ec35b9d5 100644
--- a/cpp/include/cugraph/utilities/host_scalar_comm.hpp
+++ b/cpp/include/cugraph/utilities/host_scalar_comm.hpp
@@ -256,10 +256,7 @@ std::enable_if_t<std::is_arithmetic<T>::value, std::vector<T>> host_scalar_allgather(
 {
   rmm::device_uvector<T> d_outputs(comm.get_size(), stream);
   raft::update_device(d_outputs.data() + comm.get_rank(), &input, 1, stream);
-  comm.allgather(d_outputs.data() + comm.get_rank(),
-                 d_outputs.data(),
-                 size_t{1},
-                 stream);
+  comm.allgather(d_outputs.data() + comm.get_rank(), d_outputs.data(), size_t{1}, stream);
   std::vector<T> h_outputs(d_outputs.size());
   raft::update_host(h_outputs.data(), d_outputs.data(), d_outputs.size(), stream);
   auto status = comm.sync_stream(stream);
@@ -283,9 +280,9 @@ host_scalar_allgather(raft::comms::comms_t const& comm, T input, cudaStream_t stream)
                       tuple_size,
                       stream);
   comm.allgather(d_allgathered_tuple_scalar_elements.data() + comm.get_rank() * tuple_size,
-                  d_allgathered_tuple_scalar_elements.data(),
-                  tuple_size,
-                  stream);
+                 d_allgathered_tuple_scalar_elements.data(),
+                 tuple_size,
+                 stream);
   std::vector<int64_t> h_allgathered_tuple_scalar_elements(comm.get_size() * tuple_size);
   raft::update_host(h_allgathered_tuple_scalar_elements.data(),
                     d_allgathered_tuple_scalar_elements.data(),
@@ -306,6 +303,71 @@ host_scalar_allgather(raft::comms::comms_t const& comm, T input, cudaStream_t stream)
   return ret;
 }
 
+template <typename T>
+std::enable_if_t<std::is_arithmetic<T>::value, T> host_scalar_scatter(
+  raft::comms::comms_t const& comm,
+  std::vector<T> const& inputs,  // relevant only in root
+  int root,
+  cudaStream_t stream)
+{
+  CUGRAPH_EXPECTS(
+    ((comm.get_rank() == root) && (inputs.size() == static_cast<size_t>(comm.get_size()))) ||
+      ((comm.get_rank() != root) && (inputs.size() == 0)),
+    "inputs.size() should match with comm.get_size() in root and should be 0 otherwise.");
+  rmm::device_uvector<T> d_outputs(comm.get_size(), stream);
+  if (comm.get_rank() == root) {
+    raft::update_device(d_outputs.data(), inputs.data(), inputs.size(), stream);
+  }
+  comm.bcast(d_outputs.data(), d_outputs.size(), root, stream);
+  T h_output{};
+  raft::update_host(&h_output, d_outputs.data() + comm.get_rank(), 1, stream);
+  auto status = comm.sync_stream(stream);
+  CUGRAPH_EXPECTS(status == raft::comms::status_t::SUCCESS, "sync_stream() failure.");
+  return h_output;
+}
+
+template <typename T>
+std::enable_if_t<is_thrust_tuple_of_arithmetic<T>::value, T> host_scalar_scatter(
+  raft::comms::comms_t const& comm,
+  std::vector<T> const& inputs,  // relevant only in root
+  int root,
+  cudaStream_t stream)
+{
+  CUGRAPH_EXPECTS(
+    ((comm.get_rank() == root) && (inputs.size() == static_cast<size_t>(comm.get_size()))) ||
+      ((comm.get_rank() != root) && (inputs.size() == 0)),
+    "inputs.size() should match with comm.get_size() in root and should be 0 otherwise.");
+  size_t constexpr tuple_size = thrust::tuple_size<T>::value;
+  rmm::device_uvector<int64_t> d_scatter_tuple_scalar_elements(comm.get_size() * tuple_size,
+                                                               stream);
+  if (comm.get_rank() == root) {
+    for (int i = 0; i < comm.get_size(); ++i) {
+      std::vector<int64_t> h_tuple_scalar_elements(tuple_size);
+      detail::update_vector_of_tuple_scalar_elements_from_tuple_impl<T, size_t{0}, tuple_size>()
+        .update(h_tuple_scalar_elements, inputs[i]);
+      raft::update_device(d_scatter_tuple_scalar_elements.data() + i * tuple_size,
+                          h_tuple_scalar_elements.data(),
+                          tuple_size,
+                          stream);
+    }
+  }
+  comm.bcast(
+    d_scatter_tuple_scalar_elements.data(), d_scatter_tuple_scalar_elements.size(), root, stream);
+  std::vector<int64_t> h_tuple_scalar_elements(tuple_size);
+  raft::update_host(h_tuple_scalar_elements.data(),
+                    d_scatter_tuple_scalar_elements.data() + comm.get_rank() * tuple_size,
+                    tuple_size,
+                    stream);
+  auto status = comm.sync_stream(stream);
+  CUGRAPH_EXPECTS(status == raft::comms::status_t::SUCCESS, "sync_stream() failure.");
+
+  T ret{};
+  detail::update_tuple_from_vector_of_tuple_scalar_elements_impl<T, size_t{0}, tuple_size>().update(
+    ret, h_tuple_scalar_elements);
+
+  return ret;
+}
+
 // Return value is valid only in root (return value may better be std::optional in C++17 or later)
 template <typename T>
 std::enable_if_t<std::is_arithmetic<T>::value, std::vector<T>> host_scalar_gather(
diff --git a/cpp/src/components/weakly_connected_components_impl.cuh b/cpp/src/components/weakly_connected_components_impl.cuh
index be74012e3ee..b7b6e139cfa 100644
--- a/cpp/src/components/weakly_connected_components_impl.cuh
+++ b/cpp/src/components/weakly_connected_components_impl.cuh
@@ -455,33 +455,11 @@ void weakly_connected_components_impl(raft::handle_t const& handle,
                  std::numeric_limits<vertex_t>::max());
     }
 
-    // FIXME: we need to add host_scalar_scatter
-#if 1
-    rmm::device_uvector<size_t> d_counts(comm_size, handle.get_stream());
-    raft::update_device(d_counts.data(),
-                        init_max_new_root_counts.data(),
-                        init_max_new_root_counts.size(),
-                        handle.get_stream());
-    device_bcast(
-      comm, d_counts.data(), d_counts.data(), d_counts.size(), int{0}, handle.get_stream());
-    raft::update_host(
-      &init_max_new_roots, d_counts.data() + comm_rank, size_t{1}, handle.get_stream());
-#else
-    init_max_new_roots =
-      host_scalar_scatter(comm, init_max_new_root_counts.data(), int{0}, handle.get_stream());
-#endif
+    init_max_new_roots =
+      host_scalar_scatter(comm, init_max_new_root_counts, int{0}, handle.get_stream());
   } else {
-    // FIXME: we need to add host_scalar_scatter
-#if 1
-    rmm::device_uvector<size_t> d_counts(comm_size, handle.get_stream());
-    device_bcast(
-      comm, d_counts.data(), d_counts.data(), d_counts.size(), int{0}, handle.get_stream());
-    raft::update_host(
-      &init_max_new_roots, d_counts.data() + comm_rank, size_t{1}, handle.get_stream());
-#else
-    init_max_new_roots =
-      host_scalar_scatter(comm, init_max_new_root_counts.data(), int{0}, handle.get_stream());
-#endif
+    init_max_new_roots =
+      host_scalar_scatter(comm, std::vector<size_t>{}, int{0}, handle.get_stream());
   }
 
   handle.sync_stream();
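Note on patch 7: host_scalar_scatter gives every rank the element of the root's vector at its own rank index, hiding the update_device/bcast/update_host dance that both call sites above previously spelled out. A hedged usage sketch mirroring those call sites (the handle setup and the example values are assumptions, not taken from the diff):

    auto const& comm = handle.get_comms();
    int constexpr root = 0;
    std::vector<size_t> per_rank_limits{};  // stays empty on non-root ranks
    if (comm.get_rank() == root) {
      per_rank_limits.assign(comm.get_size(), size_t{16});  // hypothetical per-rank values
    }
    // Every rank (root included) receives per_rank_limits[comm.get_rank()] from root.
    size_t my_limit = host_scalar_scatter(comm, per_rank_limits, root, handle.get_stream());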