diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 3f421da5e19..91511736225 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -439,6 +439,7 @@ add_library(cugraph SHARED src/experimental/pagerank.cu src/experimental/katz_centrality.cu src/tree/mst.cu + src/utilities/host_barrier.cpp ) target_link_directories(cugraph diff --git a/cpp/include/patterns/copy_to_adj_matrix_row_col.cuh b/cpp/include/patterns/copy_to_adj_matrix_row_col.cuh index ca20b9a1285..26a4eed4213 100644 --- a/cpp/include/patterns/copy_to_adj_matrix_row_col.cuh +++ b/cpp/include/patterns/copy_to_adj_matrix_row_col.cuh @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -60,6 +61,17 @@ void copy_to_matrix_major(raft::handle_t const& handle, auto const col_comm_rank = col_comm.get_rank(); auto const col_comm_size = col_comm.get_size(); + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + std::vector rx_counts(col_comm_size, size_t{0}); std::vector displacements(col_comm_size, size_t{0}); for (int i = 0; i < col_comm_size; ++i) { @@ -72,6 +84,17 @@ void copy_to_matrix_major(raft::handle_t const& handle, rx_counts, displacements, handle.get_stream()); + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (end of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is 
ncclAllReduce +#endif } else { assert(graph_view.get_number_of_local_vertices() == GraphViewType::is_adj_matrix_transposed ? graph_view.get_number_of_local_adj_matrix_partition_cols() @@ -106,6 +129,17 @@ void copy_to_matrix_major(raft::handle_t const& handle, auto const col_comm_rank = col_comm.get_rank(); auto const col_comm_size = col_comm.get_size(); + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + auto rx_counts = host_scalar_allgather(col_comm, static_cast(thrust::distance(vertex_first, vertex_last)), @@ -171,6 +205,17 @@ void copy_to_matrix_major(raft::handle_t const& handle, matrix_major_value_output_first + matrix_partition.get_major_value_start_offset()); } } + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (end of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif } else { assert(graph_view.get_number_of_local_vertices() == GraphViewType::is_adj_matrix_transposed ? 
graph_view.get_number_of_local_adj_matrix_partition_cols() @@ -202,6 +247,17 @@ void copy_to_matrix_minor(raft::handle_t const& handle, auto const col_comm_rank = col_comm.get_rank(); auto const col_comm_size = col_comm.get_size(); + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + std::vector rx_counts(row_comm_size, size_t{0}); std::vector displacements(row_comm_size, size_t{0}); for (int i = 0; i < row_comm_size; ++i) { @@ -214,6 +270,17 @@ void copy_to_matrix_minor(raft::handle_t const& handle, rx_counts, displacements, handle.get_stream()); + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (end of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif } else { assert(graph_view.get_number_of_local_vertices() == GraphViewType::is_adj_matrix_transposed ? 
graph_view.get_number_of_local_adj_matrix_partition_rows() @@ -248,6 +315,17 @@ void copy_to_matrix_minor(raft::handle_t const& handle, auto const col_comm_rank = col_comm.get_rank(); auto const col_comm_size = col_comm.get_size(); + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + auto rx_counts = host_scalar_allgather(row_comm, static_cast(thrust::distance(vertex_first, vertex_last)), @@ -310,6 +388,17 @@ void copy_to_matrix_minor(raft::handle_t const& handle, matrix_minor_value_output_first); } } + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (end of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif } else { assert(graph_view.get_number_of_local_vertices() == graph_view.get_number_of_local_adj_matrix_partition_rows()); diff --git a/cpp/include/patterns/copy_v_transform_reduce_in_out_nbr.cuh b/cpp/include/patterns/copy_v_transform_reduce_in_out_nbr.cuh index 6d828dab513..6aded0eccf0 100644 --- a/cpp/include/patterns/copy_v_transform_reduce_in_out_nbr.cuh +++ b/cpp/include/patterns/copy_v_transform_reduce_in_out_nbr.cuh @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -496,6 +497,7 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle, } if (GraphViewType::is_multi_gpu && update_major) { + auto& comm = 
handle.get_comms(); auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name()); auto const row_comm_rank = row_comm.get_rank(); auto const row_comm_size = row_comm.get_size(); @@ -503,6 +505,17 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle, auto const col_comm_rank = col_comm.get_rank(); auto const col_comm_size = col_comm.get_size(); + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + device_reduce(col_comm, major_buffer_first, vertex_value_output_first, @@ -510,6 +523,17 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle, raft::comms::op_t::SUM, i, handle.get_stream()); + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (end of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif } } @@ -523,6 +547,17 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle, auto const col_comm_rank = col_comm.get_rank(); auto const col_comm_size = col_comm.get_size(); + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); 
+#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + for (int i = 0; i < row_comm_size; ++i) { auto offset = (graph_view.get_vertex_partition_first(col_comm_rank * row_comm_size + i) - graph_view.get_vertex_partition_first(col_comm_rank * row_comm_size)); @@ -535,6 +570,17 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle, i, handle.get_stream()); } + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (end of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif } } diff --git a/cpp/include/patterns/copy_v_transform_reduce_key_aggregated_out_nbr.cuh b/cpp/include/patterns/copy_v_transform_reduce_key_aggregated_out_nbr.cuh index f6eac67e4e7..9a1d9fea24c 100644 --- a/cpp/include/patterns/copy_v_transform_reduce_key_aggregated_out_nbr.cuh +++ b/cpp/include/patterns/copy_v_transform_reduce_key_aggregated_out_nbr.cuh @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -211,10 +212,22 @@ void copy_v_transform_reduce_key_aggregated_out_nbr( auto kv_map_ptr = std::make_unique>( size_t{0}, invalid_vertex_id::value, invalid_vertex_id::value); if (GraphViewType::is_multi_gpu) { + auto& comm = handle.get_comms(); auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name()); auto const row_comm_rank = row_comm.get_rank(); auto const row_comm_size = row_comm.get_size(); + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // 
and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + auto map_counts = host_scalar_allgather(row_comm, static_cast(thrust::distance(map_key_first, map_key_last)), @@ -292,6 +305,21 @@ void copy_v_transform_reduce_key_aggregated_out_nbr( // 2. aggregate each vertex out-going edges based on keys and transform-reduce. + if (GraphViewType::is_multi_gpu) { + auto& comm = handle.get_comms(); + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + } + rmm::device_uvector major_vertices(0, handle.get_stream()); auto e_op_result_buffer = allocate_dataframe_buffer(0, handle.get_stream()); for (size_t i = 0; i < graph_view.get_number_of_local_adj_matrix_partitions(); ++i) { @@ -436,6 +464,9 @@ void copy_v_transform_reduce_key_aggregated_out_nbr( // FIXME: additional optimization is possible if reduce_op is a pure function (and reduce_op // can be mapped to ncclRedOp_t). + // FIXME: a temporary workaround for a NCCL (2.9.6) bug that causes a hang on DGX1 (due to + // remote memory allocation), this barrier is unnecessary otherwise. 
+ col_comm.barrier(); auto rx_sizes = host_scalar_gather(col_comm, tmp_major_vertices.size(), i, handle.get_stream()); std::vector rx_displs{}; @@ -475,6 +506,21 @@ void copy_v_transform_reduce_key_aggregated_out_nbr( } } + if (GraphViewType::is_multi_gpu) { + auto& comm = handle.get_comms(); + + // barrier is necessary here to avoid potential overlap (which can lead to deadlock) between + // two different communicators (end of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + } + thrust::fill(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), vertex_value_output_first, vertex_value_output_first + graph_view.get_number_of_local_vertices(), diff --git a/cpp/include/patterns/update_frontier_v_push_if_out_nbr.cuh b/cpp/include/patterns/update_frontier_v_push_if_out_nbr.cuh index 3d87f19969e..4f3925f7d4c 100644 --- a/cpp/include/patterns/update_frontier_v_push_if_out_nbr.cuh +++ b/cpp/include/patterns/update_frontier_v_push_if_out_nbr.cuh @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -403,6 +404,21 @@ void update_frontier_v_push_if_out_nbr( // 1. 
fill the buffer + if (GraphViewType::is_multi_gpu) { + auto& comm = handle.get_comms(); + + // barrier is necessary here to avoid potential overlap (which can lead to deadlock) between + // two different communicators (beginning of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + } + rmm::device_uvector keys(size_t{0}, handle.get_stream()); auto payload_buffer = allocate_dataframe_buffer(size_t{0}, handle.get_stream()); rmm::device_scalar buffer_idx(size_t{0}, handle.get_stream()); @@ -585,6 +601,21 @@ void update_frontier_v_push_if_out_nbr( } } + if (GraphViewType::is_multi_gpu) { + auto& comm = handle.get_comms(); + + // barrier is necessary here to avoid potential overlap (which can lead to deadlock) between + // two different communicators (end of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + } + // 2. 
reduce the buffer auto num_buffer_elements = @@ -596,13 +627,21 @@ void update_frontier_v_push_if_out_nbr( if (GraphViewType::is_multi_gpu) { // FIXME: this step is unnecessary if row_comm_size== 1 auto& comm = handle.get_comms(); - auto const comm_rank = comm.get_rank(); auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name()); - auto const row_comm_rank = row_comm.get_rank(); auto const row_comm_size = row_comm.get_size(); auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name()); auto const col_comm_rank = col_comm.get_rank(); - auto const col_comm_size = col_comm.get_size(); + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif std::vector h_vertex_lasts(row_comm_size); for (size_t i = 0; i < h_vertex_lasts.size(); ++i) { @@ -649,6 +688,17 @@ void update_frontier_v_push_if_out_nbr( get_dataframe_buffer_begin(payload_buffer), keys.size(), reduce_op); + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (end of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif } // 3. 
update vertex properties @@ -753,7 +803,7 @@ void update_frontier_v_push_if_out_nbr( } } } -} +} // namespace experimental } // namespace experimental } // namespace cugraph diff --git a/cpp/include/utilities/host_barrier.hpp b/cpp/include/utilities/host_barrier.hpp new file mode 100644 index 00000000000..11803a7bde4 --- /dev/null +++ b/cpp/include/utilities/host_barrier.hpp @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +namespace cugraph { +namespace experimental { + +// FIXME: a temporary hack till UCC is integrated into RAFT (so we can use UCC barrier for DASK and +// MPI barrier for MPI) +void host_barrier(raft::comms::comms_t const& comm, rmm::cuda_stream_view stream_view); + +} // namespace experimental +} // namespace cugraph diff --git a/cpp/include/utilities/shuffle_comm.cuh b/cpp/include/utilities/shuffle_comm.cuh index b318009d9bf..b42b9ad06bb 100644 --- a/cpp/include/utilities/shuffle_comm.cuh +++ b/cpp/include/utilities/shuffle_comm.cuh @@ -73,6 +73,10 @@ compute_tx_rx_counts_offsets_ranks(raft::comms::comms_t const &comm, rx_offsets, rx_src_ranks, stream); + // FIXME: temporary unverified work-around for a NCCL (2.9.6) bug that causes a hang on DGX1 (due + // to remote memory allocation), this synchronization is unnecessary otherwise but seems to + // suppress the hang issue. Needs to be revisited once NCCL 2.10 is released. 
+ CUDA_TRY(cudaDeviceSynchronize()); raft::update_host(tx_counts.data(), d_tx_value_counts.data(), comm_size, stream); raft::update_host(rx_counts.data(), d_rx_value_counts.data(), comm_size, stream); @@ -201,8 +205,6 @@ auto shuffle_values(raft::comms::comms_t const &comm, rmm::device_uvector d_tx_value_counts(comm_size, stream); raft::update_device(d_tx_value_counts.data(), tx_value_counts.data(), comm_size, stream); - CUDA_TRY(cudaStreamSynchronize(stream)); // tx_value_counts should be up-to-date - std::vector tx_counts{}; std::vector tx_offsets{}; std::vector tx_dst_ranks{}; diff --git a/cpp/src/experimental/coarsen_graph.cu b/cpp/src/experimental/coarsen_graph.cu index 1eccbd23584..6397f92e336 100644 --- a/cpp/src/experimental/coarsen_graph.cu +++ b/cpp/src/experimental/coarsen_graph.cu @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -269,6 +270,16 @@ coarsen_graph( for (size_t i = 0; i < graph_view.get_number_of_local_adj_matrix_partitions(); ++i) { // 1-1. locally construct coarsened edge list + // barrier is necessary here to avoid potential overlap (which can lead to deadlock) between + // two different communicators (beginning of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif rmm::device_uvector major_labels( store_transposed ? 
graph_view.get_number_of_local_adj_matrix_partition_cols(i) : graph_view.get_number_of_local_adj_matrix_partition_rows(i), @@ -285,6 +296,16 @@ coarsen_graph( major_labels.size(), static_cast(i), handle.get_stream()); + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (end of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif rmm::device_uvector edgelist_major_vertices(0, handle.get_stream()); rmm::device_uvector edgelist_minor_vertices(0, handle.get_stream()); diff --git a/cpp/src/experimental/generate_rmat_edgelist.cu b/cpp/src/experimental/generate_rmat_edgelist.cu index d75a4654a15..f00443a0596 100644 --- a/cpp/src/experimental/generate_rmat_edgelist.cu +++ b/cpp/src/experimental/generate_rmat_edgelist.cu @@ -137,8 +137,9 @@ generate_rmat_edgelists(raft::handle_t const& handle, bool scramble_vertex_ids) { CUGRAPH_EXPECTS(min_scale > 0, "minimum graph scale is 1."); - CUGRAPH_EXPECTS(size_t{1} << max_scale <= std::numeric_limits::max(), - "Invalid input argument: scale too large for vertex_t."); + CUGRAPH_EXPECTS( + size_t{1} << max_scale <= static_cast(std::numeric_limits::max()), + "Invalid input argument: scale too large for vertex_t."); std::vector, rmm::device_uvector>> output{}; output.reserve(n_edgelists); diff --git a/cpp/src/experimental/renumber_edgelist.cu b/cpp/src/experimental/renumber_edgelist.cu index dbf0250b88a..01022e8fa6d 100644 --- a/cpp/src/experimental/renumber_edgelist.cu +++ b/cpp/src/experimental/renumber_edgelist.cu @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -59,6 +60,22 @@ rmm::device_uvector compute_renumber_map( // 1. 
acquire (unique major label, count) pairs + if (multi_gpu) { + auto& comm = handle.get_comms(); + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + ; + comm.barrier(); // currently, this is ncclAllReduce +#endif + } + rmm::device_uvector major_labels(0, handle.get_stream()); rmm::device_uvector major_counts(0, handle.get_stream()); for (size_t i = 0; i < edgelist_major_vertices.size(); ++i) { @@ -71,6 +88,7 @@ rmm::device_uvector compute_renumber_map( edgelist_major_vertices[i], edgelist_major_vertices[i] + edgelist_edge_counts[i], sorted_major_labels.begin()); + // FIXME: better refactor this sort-count_if-reduce_by_key routine for reuse thrust::sort(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), sorted_major_labels.begin(), sorted_major_labels.end()); @@ -98,6 +116,9 @@ rmm::device_uvector compute_renumber_map( rmm::device_uvector rx_major_labels(0, handle.get_stream()); rmm::device_uvector rx_major_counts(0, handle.get_stream()); + // FIXME: a temporary workaround for a NCCL (2.9.6) bug that causes a hang on DGX1 (due to + // remote memory allocation), this barrier is unnecessary otherwise. 
+ col_comm.barrier(); auto rx_sizes = host_scalar_gather( col_comm, tmp_major_labels.size(), static_cast(i), handle.get_stream()); std::vector rx_displs{}; @@ -118,32 +139,39 @@ rmm::device_uvector compute_renumber_map( static_cast(i), handle.get_stream()); if (static_cast(i) == col_comm_rank) { - thrust::sort_by_key(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), - rx_major_labels.begin(), - rx_major_labels.end(), - rx_major_counts.begin()); - major_labels.resize(rx_major_labels.size(), handle.get_stream()); - major_counts.resize(major_labels.size(), handle.get_stream()); - auto pair_it = - thrust::reduce_by_key(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), - rx_major_labels.begin(), - rx_major_labels.end(), - rx_major_counts.begin(), - major_labels.begin(), - major_counts.begin()); - major_labels.resize(thrust::distance(major_labels.begin(), thrust::get<0>(pair_it)), - handle.get_stream()); - major_counts.resize(major_labels.size(), handle.get_stream()); - major_labels.shrink_to_fit(handle.get_stream()); - major_counts.shrink_to_fit(handle.get_stream()); + major_labels = std::move(rx_major_labels); + major_counts = std::move(rx_major_counts); } } else { - tmp_major_labels.shrink_to_fit(handle.get_stream()); - tmp_major_counts.shrink_to_fit(handle.get_stream()); + assert(i == 0); major_labels = std::move(tmp_major_labels); major_counts = std::move(tmp_major_counts); } } + if (multi_gpu) { + // FIXME: better refactor this sort-count_if-reduce_by_key routine for reuse + thrust::sort_by_key(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + major_labels.begin(), + major_labels.end(), + major_counts.begin()); + auto num_unique_labels = + thrust::count_if(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + thrust::make_counting_iterator(size_t{0}), + thrust::make_counting_iterator(major_labels.size()), + [labels = major_labels.data()] __device__(auto i) { + return (i == 0) || (labels[i - 1] != labels[i]); + 
}); + rmm::device_uvector tmp_major_labels(num_unique_labels, handle.get_stream()); + rmm::device_uvector tmp_major_counts(tmp_major_labels.size(), handle.get_stream()); + thrust::reduce_by_key(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + major_labels.begin(), + major_labels.end(), + major_counts.begin(), + tmp_major_labels.begin(), + tmp_major_counts.begin()); + major_labels = std::move(tmp_major_labels); + major_counts = std::move(tmp_major_counts); + } // 2. acquire unique minor labels @@ -168,28 +196,54 @@ rmm::device_uvector compute_renumber_map( minor_labels.end())), handle.get_stream()); if (multi_gpu) { + auto& comm = handle.get_comms(); auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name()); auto const row_comm_size = row_comm.get_size(); - rmm::device_uvector rx_minor_labels(0, handle.get_stream()); - std::tie(rx_minor_labels, std::ignore) = groupby_gpuid_and_shuffle_values( - row_comm, - minor_labels.begin(), - minor_labels.end(), - [key_func = detail::compute_gpu_id_from_vertex_t{row_comm_size}] __device__( - auto val) { return key_func(val); }, - handle.get_stream()); - thrust::sort(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), - rx_minor_labels.begin(), - rx_minor_labels.end()); - rx_minor_labels.resize( - thrust::distance( - rx_minor_labels.begin(), - thrust::unique(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), - rx_minor_labels.begin(), - rx_minor_labels.end())), - handle.get_stream()); - minor_labels = std::move(rx_minor_labels); + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (beginning of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce 
+#endif + + if (row_comm_size > 1) { + rmm::device_uvector rx_minor_labels(0, handle.get_stream()); + std::tie(rx_minor_labels, std::ignore) = groupby_gpuid_and_shuffle_values( + row_comm, + minor_labels.begin(), + minor_labels.end(), + [key_func = detail::compute_gpu_id_from_vertex_t{row_comm_size}] __device__( + auto val) { return key_func(val); }, + handle.get_stream()); + thrust::sort(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + rx_minor_labels.begin(), + rx_minor_labels.end()); + rx_minor_labels.resize( + thrust::distance( + rx_minor_labels.begin(), + thrust::unique(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()), + rx_minor_labels.begin(), + rx_minor_labels.end())), + handle.get_stream()); + minor_labels = std::move(rx_minor_labels); + } + + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between + // two different communicators (end of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK + // and MPI barrier with MPI) + // + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif } minor_labels.shrink_to_fit(handle.get_stream()); @@ -366,6 +420,19 @@ void expensive_check_edgelist( auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name()); auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name()); + // FIXME: this barrier is unnecessary if the above host_scalar_allreduce is a true host + // operation (as it serves as a barrier) barrier is necessary here to avoid potential + // overlap (which can leads to deadlock) between two different communicators (beginning of + // col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with + // DASK and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + 
handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + rmm::device_uvector sorted_major_vertices(0, handle.get_stream()); { auto recvcounts = @@ -385,6 +452,17 @@ void expensive_check_edgelist( sorted_major_vertices.end()); } + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) + // between two different communicators (beginning of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with + // DASK and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + rmm::device_uvector sorted_minor_vertices(0, handle.get_stream()); { auto recvcounts = @@ -404,6 +482,17 @@ void expensive_check_edgelist( sorted_minor_vertices.end()); } + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) + // between two different communicators (end of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with + // DASK and MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + auto edge_first = thrust::make_zip_iterator( thrust::make_tuple(edgelist_major_vertices[i], edgelist_minor_vertices[i])); CUGRAPH_EXPECTS( @@ -509,7 +598,6 @@ renumber_edgelist(raft::handle_t const& handle, edgelist_const_major_vertices, edgelist_const_minor_vertices, edgelist_edge_counts); - // 2. 
initialize partition_t object, number_of_vertices, and number_of_edges for the coarsened // graph @@ -535,6 +623,18 @@ renumber_edgelist(raft::handle_t const& handle, // FIXME: compare this hash based approach with a binary search based approach in both memory // footprint and execution time + // FIXME: this barrier is unnecessary if the above host_scalar_allgather is a true host operation + // (as it serves as a barrier) barrier is necessary here to avoid potential overlap (which can + // leads to deadlock) between two different communicators (beginning of col_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK and + // MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif + for (size_t i = 0; i < edgelist_major_vertices.size(); ++i) { rmm::device_uvector renumber_map_major_labels( col_comm_rank == static_cast(i) ? 
vertex_t{0} @@ -571,6 +671,16 @@ renumber_edgelist(raft::handle_t const& handle, edgelist_major_vertices[i]); } + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between two + // different communicators (beginning of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK and + // MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif { rmm::device_uvector renumber_map_minor_labels( partition.get_matrix_partition_minor_size(), handle.get_stream()); @@ -611,6 +721,16 @@ renumber_edgelist(raft::handle_t const& handle, edgelist_minor_vertices[i]); } } + // barrier is necessary here to avoid potential overlap (which can leads to deadlock) between two + // different communicators (end of row_comm) +#if 1 + // FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK and + // MPI barrier with MPI) + host_barrier(comm, handle.get_stream_view()); +#else + handle.get_stream_view().synchronize(); + comm.barrier(); // currently, this is ncclAllReduce +#endif return std::make_tuple( std::move(renumber_map_labels), partition, number_of_vertices, number_of_edges); diff --git a/cpp/src/utilities/host_barrier.cpp b/cpp/src/utilities/host_barrier.cpp new file mode 100644 index 00000000000..1c018d624ed --- /dev/null +++ b/cpp/src/utilities/host_barrier.cpp @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// NOTE(review): the #include targets and the std::vector element types in this file were
+// missing from the reviewed patch text; they are restored from usage. Confirm the project
+// header path and the request type (assumed raft::comms::request_t) against the repository.
+#include <utilities/host_barrier.hpp>
+
+#include <cstddef>
+#include <vector>
+
+namespace cugraph {
+namespace experimental {
+
+// Host-side barrier over all ranks of `comm`.
+//
+// The function first blocks on `stream_view` (stream_view.synchronize()) and then runs a k-ary
+// tree barrier (k = 2) built from 1-byte point-to-point messages, all sent with tag 0:
+//   - up:   levels are visited with stride mod = 1, k, k^2, ... while mod < comm_size. Only
+//           ranks that are multiples of mod take part in a level. A rank whose level index
+//           (rank / mod) is a multiple of k receives one byte from each of its (at most k - 1)
+//           children; every other participating rank sends one byte to its parent.
+//   - down: the same levels are visited in reverse order and every message direction is
+//           flipped (parents send, children receive).
+// With a single rank neither loop is entered and no message is exchanged.
+//
+// FIXME: a temporary hack till UCC is integrated into RAFT (so we can use UCC barrier for DASK and
+// MPI barrier for MPI)
+void host_barrier(raft::comms::comms_t const& comm, rmm::cuda_stream_view stream_view)
+{
+  stream_view.synchronize();
+
+  auto const comm_size = comm.get_size();
+  auto const comm_rank = comm.get_rank();
+
+  // k-tree barrier
+
+  int constexpr k = 2;
+  static_assert(k >= 2);
+  // one request slot and one dummy payload byte per child; a non-parent rank only uses slot 0
+  std::vector<raft::comms::request_t> requests(k - 1);
+  std::vector<std::byte> dummies(k - 1);
+
+  // up
+
+  int mod = 1;
+  while (mod < comm_size) {
+    if (comm_rank % mod == 0) {
+      auto const level_rank = comm_rank / mod;
+      if (level_rank % k == 0) {
+        // src_rank grows with i, so the posted receives always fill the first num_irecvs slots
+        int num_irecvs = 0;
+        for (int i = 1; i < k; ++i) {
+          auto const src_rank = (level_rank + i) * mod;
+          if (src_rank < comm_size) {
+            comm.irecv(dummies.data() + (i - 1),
+                       sizeof(std::byte),
+                       src_rank,
+                       int{0} /* tag */,
+                       requests.data() + (i - 1));
+            ++num_irecvs;
+          }
+        }
+        comm.waitall(num_irecvs, requests.data());
+      } else {
+        // parent is the closest lower rank at this level whose level index is a multiple of k
+        comm.isend(dummies.data(),
+                   sizeof(std::byte),
+                   (level_rank - (level_rank % k)) * mod,
+                   int{0} /* tag */,
+                   requests.data());
+        comm.waitall(1, requests.data());
+      }
+    }
+    mod *= k;
+  }
+
+  // down
+
+  // mod is now the first power of k that is >= comm_size; step back to the top level that was
+  // actually used on the way up (integer division yields 0 when comm_size == 1, skipping the loop)
+  mod /= k;
+  while (mod >= 1) {
+    if (comm_rank % mod == 0) {
+      auto const level_rank = comm_rank / mod;
+      if (level_rank % k == 0) {
+        // dst_rank grows with i, so the posted sends always fill the first num_isends slots
+        int num_isends = 0;
+        for (int i = 1; i < k; ++i) {
+          auto const dst_rank = (level_rank + i) * mod;
+          if (dst_rank < comm_size) {
+            comm.isend(dummies.data() + (i - 1),
+                       sizeof(std::byte),
+                       dst_rank,
+                       int{0} /* tag */,
+                       requests.data() + (i - 1));
+            ++num_isends;
+          }
+        }
+        comm.waitall(num_isends, requests.data());
+      } else {
+        comm.irecv(dummies.data(),
+                   sizeof(std::byte),
+                   (level_rank - (level_rank % k)) * mod,
+                   int{0} /* tag */,
+                   requests.data());
+        comm.waitall(1, requests.data());
+      }
+    }
+    mod /= k;
+  }
+}
+
+}  // namespace experimental
+}  // namespace cugraph