Fix multi-GPU hang on graph generation (#1572)
Two bug fixes for multi-GPU graph creation.

- Add host barriers to avoid overlap between operations issued on different communicators (see the sketch below)
- Work around an NCCL (2.9.6) bug that causes a hang on DGX1
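
The host_barrier() calls threaded through the hunks below come from a new helper whose header, cpp/include/utilities/host_barrier.hpp, is among the files not rendered in this view. A minimal sketch of its presumed declaration, inferred from the call sites in this diff (the namespace and exact signature are assumptions):

// Presumed shape of cpp/include/utilities/host_barrier.hpp; inferred from the
// call sites below, not copied from the rendered diff.
#pragma once

#include <raft/comms/comms.hpp>
#include <rmm/cuda_stream_view.hpp>

namespace cugraph {
namespace experimental {

// Synchronize all ranks of `comm` on the host; used to separate collectives
// issued on different (sub-)communicators so they are never in flight at once.
void host_barrier(raft::comms::comms_t const& comm, rmm::cuda_stream_view stream_view);

}  // namespace experimental
}  // namespace cugraph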

Authors:
  - Seunghwa Kang (https://github.com/seunghwak)

Approvers:
  - Rick Ratzel (https://github.com/rlratzel)
  - Andrei Schaffer (https://github.com/aschaffer)
  - Brad Rees (https://github.com/BradReesWork)

URL: #1572
seunghwak authored May 6, 2021
1 parent 50b43f7 commit e4f58eb
Showing 11 changed files with 559 additions and 48 deletions (the first four files are rendered below).
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
@@ -438,6 +438,7 @@ add_library(cugraph SHARED
src/experimental/pagerank.cu
src/experimental/katz_centrality.cu
src/tree/mst.cu
src/utilities/host_barrier.cpp
)

target_link_directories(cugraph
89 changes: 89 additions & 0 deletions cpp/include/patterns/copy_to_adj_matrix_row_col.cuh
@@ -21,6 +21,7 @@
#include <utilities/dataframe_buffer.cuh>
#include <utilities/device_comm.cuh>
#include <utilities/error.hpp>
#include <utilities/host_barrier.hpp>
#include <utilities/host_scalar_comm.cuh>
#include <utilities/thrust_tuple_utils.cuh>
#include <vertex_partition_device.cuh>
@@ -60,6 +61,17 @@ void copy_to_matrix_major(raft::handle_t const& handle,
auto const col_comm_rank = col_comm.get_rank();
auto const col_comm_size = col_comm.get_size();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of col_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif

std::vector<size_t> rx_counts(col_comm_size, size_t{0});
std::vector<size_t> displacements(col_comm_size, size_t{0});
for (int i = 0; i < col_comm_size; ++i) {
@@ -72,6 +84,17 @@
rx_counts,
displacements,
handle.get_stream());

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (end of col_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
} else {
assert(graph_view.get_number_of_local_vertices() == GraphViewType::is_adj_matrix_transposed
? graph_view.get_number_of_local_adj_matrix_partition_cols()
@@ -106,6 +129,17 @@ void copy_to_matrix_major(raft::handle_t const& handle,
auto const col_comm_rank = col_comm.get_rank();
auto const col_comm_size = col_comm.get_size();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of col_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif

auto rx_counts =
host_scalar_allgather(col_comm,
static_cast<size_t>(thrust::distance(vertex_first, vertex_last)),
@@ -171,6 +205,17 @@ void copy_to_matrix_major(raft::handle_t const& handle,
matrix_major_value_output_first + matrix_partition.get_major_value_start_offset());
}
}

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (end of col_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
} else {
assert(graph_view.get_number_of_local_vertices() == GraphViewType::is_adj_matrix_transposed
? graph_view.get_number_of_local_adj_matrix_partition_cols()
@@ -202,6 +247,17 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
auto const col_comm_rank = col_comm.get_rank();
auto const col_comm_size = col_comm.get_size();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of row_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif

std::vector<size_t> rx_counts(row_comm_size, size_t{0});
std::vector<size_t> displacements(row_comm_size, size_t{0});
for (int i = 0; i < row_comm_size; ++i) {
@@ -214,6 +270,17 @@
rx_counts,
displacements,
handle.get_stream());

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (end of row_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
} else {
assert(graph_view.get_number_of_local_vertices() == GraphViewType::is_adj_matrix_transposed
? graph_view.get_number_of_local_adj_matrix_partition_rows()
@@ -248,6 +315,17 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
auto const col_comm_rank = col_comm.get_rank();
auto const col_comm_size = col_comm.get_size();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of row_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif

auto rx_counts =
host_scalar_allgather(row_comm,
static_cast<size_t>(thrust::distance(vertex_first, vertex_last)),
@@ -310,6 +388,17 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
matrix_minor_value_output_first);
}
}

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (end of row_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
} else {
assert(graph_view.get_number_of_local_vertices() ==
graph_view.get_number_of_local_adj_matrix_partition_rows());
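
Every hunk above follows the same discipline: a host barrier on the parent communicator at both the beginning and the end of each col_comm/row_comm phase, so collectives issued on different communicators are never in flight simultaneously. Purely as an illustration of the pairing (this helper is not part of the commit), the two barriers could be captured in a scope guard:

// Hypothetical RAII guard, assuming the host_barrier() declaration sketched
// earlier; the commit itself writes out both barriers at each call site.
#include <raft/handle.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <utilities/host_barrier.hpp>

namespace cugraph {
namespace experimental {

class comm_phase_guard_t {
 public:
  explicit comm_phase_guard_t(raft::handle_t const& handle)
    : comm_(handle.get_comms()), stream_(handle.get_stream_view())
  {
    host_barrier(comm_, stream_);  // beginning-of-phase barrier
  }
  ~comm_phase_guard_t() { host_barrier(comm_, stream_); }  // end-of-phase barrier

 private:
  raft::comms::comms_t const& comm_;
  rmm::cuda_stream_view stream_;
};

}  // namespace experimental
}  // namespace cugraph

A sub-communicator phase would then open with comm_phase_guard_t guard(handle); and the end-of-phase barrier would be supplied by the destructor when the block exits.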
46 changes: 46 additions & 0 deletions cpp/include/patterns/copy_v_transform_reduce_in_out_nbr.cuh
@@ -22,6 +22,7 @@
#include <utilities/dataframe_buffer.cuh>
#include <utilities/device_comm.cuh>
#include <utilities/error.hpp>
#include <utilities/host_barrier.hpp>

#include <raft/cudart_utils.h>
#include <rmm/thrust_rmm_allocator.h>
@@ -496,20 +497,43 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle,
}

if (GraphViewType::is_multi_gpu && update_major) {
auto& comm = handle.get_comms();
auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name());
auto const row_comm_rank = row_comm.get_rank();
auto const row_comm_size = row_comm.get_size();
auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name());
auto const col_comm_rank = col_comm.get_rank();
auto const col_comm_size = col_comm.get_size();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of col_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif

device_reduce(col_comm,
major_buffer_first,
vertex_value_output_first,
matrix_partition.get_major_size(),
raft::comms::op_t::SUM,
i,
handle.get_stream());

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (end of col_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
}
}

@@ -523,6 +547,17 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle,
auto const col_comm_rank = col_comm.get_rank();
auto const col_comm_size = col_comm.get_size();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of row_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif

for (int i = 0; i < row_comm_size; ++i) {
auto offset = (graph_view.get_vertex_partition_first(col_comm_rank * row_comm_size + i) -
graph_view.get_vertex_partition_first(col_comm_rank * row_comm_size));
@@ -535,6 +570,17 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle,
i,
handle.get_stream());
}

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (end of row_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
}
}

56 changes: 56 additions & 0 deletions cpp/include/patterns/copy_v_transform_reduce_key_aggregated_out_nbr.cuh
@@ -22,6 +22,7 @@
#include <utilities/collect_comm.cuh>
#include <utilities/dataframe_buffer.cuh>
#include <utilities/error.hpp>
#include <utilities/host_barrier.hpp>
#include <utilities/host_scalar_comm.cuh>
#include <utilities/shuffle_comm.cuh>
#include <vertex_partition_device.cuh>
@@ -211,10 +212,22 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
auto kv_map_ptr = std::make_unique<cuco::static_map<vertex_t, value_t>>(
size_t{0}, invalid_vertex_id<vertex_t>::value, invalid_vertex_id<vertex_t>::value);
if (GraphViewType::is_multi_gpu) {
auto& comm = handle.get_comms();
auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name());
auto const row_comm_rank = row_comm.get_rank();
auto const row_comm_size = row_comm.get_size();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of row_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif

auto map_counts =
host_scalar_allgather(row_comm,
static_cast<size_t>(thrust::distance(map_key_first, map_key_last)),
@@ -292,6 +305,21 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(

// 2. aggregate each vertex out-going edges based on keys and transform-reduce.

if (GraphViewType::is_multi_gpu) {
auto& comm = handle.get_comms();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of col_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
}

rmm::device_uvector<vertex_t> major_vertices(0, handle.get_stream());
auto e_op_result_buffer = allocate_dataframe_buffer<T>(0, handle.get_stream());
for (size_t i = 0; i < graph_view.get_number_of_local_adj_matrix_partitions(); ++i) {
@@ -436,6 +464,9 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
// FIXME: additional optimization is possible if reduce_op is a pure function (and reduce_op
// can be mapped to ncclRedOp_t).

// FIXME: a temporary workaround for a NCCL (2.9.6) bug that causes a hang on DGX1 (due to
// remote memory allocation), this barrier is unnecessary otherwise.
col_comm.barrier();
auto rx_sizes =
host_scalar_gather(col_comm, tmp_major_vertices.size(), i, handle.get_stream());
std::vector<size_t> rx_displs{};
@@ -475,6 +506,21 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
}
}

if (GraphViewType::is_multi_gpu) {
auto& comm = handle.get_comms();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (end of col_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
}

thrust::fill(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()),
vertex_value_output_first,
vertex_value_output_first + graph_view.get_number_of_local_vertices(),
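
The definition of host_barrier() lives in the new cpp/src/utilities/host_barrier.cpp, which is also not rendered in this view. One plausible shape, sketched under the assumption that raft::comms::comms_t exposes isend/irecv/waitall point-to-point primitives (the committed implementation may be organized differently, e.g. as a tree-based exchange):

// Sketch only: a fan-in/fan-out barrier that completes on the host. The dummy
// payload is ignored; only the message ordering matters.
#include <utilities/host_barrier.hpp>

#include <raft/comms/comms.hpp>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>

#include <vector>

namespace cugraph {
namespace experimental {

void host_barrier(raft::comms::comms_t const& comm, rmm::cuda_stream_view stream_view)
{
  // Drain outstanding device work first, so no earlier collective is still in flight.
  stream_view.synchronize();

  auto const comm_rank = comm.get_rank();
  auto const comm_size = comm.get_size();
  if (comm_size == 1) { return; }

  rmm::device_uvector<int32_t> dummy(1, stream_view);

  if (comm_rank == 0) {
    // Fan-in: rank 0 waits until every other rank has arrived.
    std::vector<raft::comms::request_t> requests(comm_size - 1);
    for (int src = 1; src < comm_size; ++src) {
      comm.irecv(dummy.data(), 1, src, 0 /* tag */, &requests[src - 1]);
    }
    comm.waitall(static_cast<int>(requests.size()), requests.data());
    // Fan-out: release all ranks.
    for (int dst = 1; dst < comm_size; ++dst) {
      comm.isend(dummy.data(), 1, dst, 0 /* tag */, &requests[dst - 1]);
    }
    comm.waitall(static_cast<int>(requests.size()), requests.data());
  } else {
    raft::comms::request_t request{};
    comm.isend(dummy.data(), 1, 0, 0 /* tag */, &request);
    comm.waitall(1, &request);
    comm.irecv(dummy.data(), 1, 0, 0 /* tag */, &request);
    comm.waitall(1, &request);
  }
}

}  // namespace experimental
}  // namespace cugraph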