Fix multi-GPU hang on graph generation #1572

Merged (10 commits) on May 6, 2021
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
@@ -439,6 +439,7 @@ add_library(cugraph SHARED
src/experimental/pagerank.cu
src/experimental/katz_centrality.cu
src/tree/mst.cu
src/utilities/host_barrier.cpp
)

target_link_directories(cugraph
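
The new host_barrier.cpp registered above is the core of the fix. Below is a minimal sketch of such a host barrier, assuming raft::comms::comms_t's point-to-point isend/irecv/waitall API; it is an illustration of the technique, not the actual contents of the file. Every rank checks in with rank 0 and then waits for a release message, so no rank can pass the barrier before all ranks have reached it.

// Sketch of a host barrier over raft::comms::comms_t (NOT the PR's actual
// host_barrier.cpp; assumes raft's isend/irecv/waitall point-to-point API).
#include <raft/comms/comms.hpp>
#include <rmm/cuda_stream_view.hpp>

#include <cstdint>
#include <vector>

void host_barrier(raft::comms::comms_t const& comm, rmm::cuda_stream_view stream_view)
{
  stream_view.synchronize();  // drain pending device work before synchronizing ranks

  auto const rank = comm.get_rank();
  auto const size = comm.get_size();
  if (size == 1) { return; }  // nothing to synchronize with

  int32_t dummy{0};
  if (rank == 0) {
    std::vector<raft::comms::request_t> requests(size - 1);
    std::vector<int32_t> buf(size - 1);
    // arrival phase: wait until every other rank has checked in
    for (int i = 1; i < size; ++i) {
      comm.irecv(buf.data() + (i - 1), 1, i, 0 /* tag */, requests.data() + (i - 1));
    }
    comm.waitall(size - 1, requests.data());
    // release phase: let every other rank proceed
    for (int i = 1; i < size; ++i) {
      comm.isend(&dummy, 1, i, 0 /* tag */, requests.data() + (i - 1));
    }
    comm.waitall(size - 1, requests.data());
  } else {
    raft::comms::request_t request{};
    comm.isend(&dummy, 1, 0, 0 /* tag */, &request);  // check in with rank 0
    comm.waitall(1, &request);
    comm.irecv(&dummy, 1, 0, 0 /* tag */, &request);  // wait for the release
    comm.waitall(1, &request);
  }
}

A linear barrier like this is O(P) in messages at rank 0; a tree-shaped exchange would scale better, but correctness only requires that no rank leaves the barrier before all ranks have entered it.
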
89 changes: 89 additions & 0 deletions cpp/include/patterns/copy_to_adj_matrix_row_col.cuh
@@ -21,6 +21,7 @@
#include <utilities/dataframe_buffer.cuh>
#include <utilities/device_comm.cuh>
#include <utilities/error.hpp>
#include <utilities/host_barrier.hpp>
#include <utilities/host_scalar_comm.cuh>
#include <utilities/thrust_tuple_utils.cuh>
#include <vertex_partition_device.cuh>
@@ -60,6 +61,17 @@ void copy_to_matrix_major(raft::handle_t const& handle,
auto const col_comm_rank = col_comm.get_rank();
auto const col_comm_size = col_comm.get_size();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of col_comm)
#if 1
// FIXME: temporary hack until UCC is integrated into RAFT (so we can use a UCC barrier with
// DASK and an MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif

std::vector<size_t> rx_counts(col_comm_size, size_t{0});
std::vector<size_t> displacements(col_comm_size, size_t{0});
for (int i = 0; i < col_comm_size; ++i) {
@@ -72,6 +84,17 @@ void copy_to_matrix_major(raft::handle_t const& handle,
rx_counts,
displacements,
handle.get_stream());

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (end of col_comm)
#if 1
// FIXME: temporary hack until UCC is integrated into RAFT (so we can use a UCC barrier with
// DASK and an MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
} else {
assert(graph_view.get_number_of_local_vertices() == GraphViewType::is_adj_matrix_transposed
? graph_view.get_number_of_local_adj_matrix_partition_cols()
@@ -106,6 +129,17 @@ void copy_to_matrix_major(raft::handle_t const& handle,
auto const col_comm_rank = col_comm.get_rank();
auto const col_comm_size = col_comm.get_size();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of col_comm)
#if 1
// FIXME: temporary hack until UCC is integrated into RAFT (so we can use a UCC barrier with
// DASK and an MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif

auto rx_counts =
host_scalar_allgather(col_comm,
static_cast<size_t>(thrust::distance(vertex_first, vertex_last)),
@@ -171,6 +205,17 @@ void copy_to_matrix_major(raft::handle_t const& handle,
matrix_major_value_output_first + matrix_partition.get_major_value_start_offset());
}
}

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (end of col_comm)
#if 1
// FIXME: temporary hack until UCC is integrated into RAFT (so we can use a UCC barrier with
// DASK and an MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
} else {
assert(graph_view.get_number_of_local_vertices() == GraphViewType::is_adj_matrix_transposed
? graph_view.get_number_of_local_adj_matrix_partition_cols()
@@ -202,6 +247,17 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
auto const col_comm_rank = col_comm.get_rank();
auto const col_comm_size = col_comm.get_size();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of row_comm)
#if 1
// FIXME: temporary hack until UCC is integrated into RAFT (so we can use a UCC barrier with
// DASK and an MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif

std::vector<size_t> rx_counts(row_comm_size, size_t{0});
std::vector<size_t> displacements(row_comm_size, size_t{0});
for (int i = 0; i < row_comm_size; ++i) {
@@ -214,6 +270,17 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
rx_counts,
displacements,
handle.get_stream());

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (end of row_comm)
#if 1
// FIXME: temporary hack until UCC is integrated into RAFT (so we can use a UCC barrier with
// DASK and an MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
} else {
assert(graph_view.get_number_of_local_vertices() == GraphViewType::is_adj_matrix_transposed
? graph_view.get_number_of_local_adj_matrix_partition_rows()
@@ -248,6 +315,17 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
auto const col_comm_rank = col_comm.get_rank();
auto const col_comm_size = col_comm.get_size();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of row_comm)
#if 1
// FIXME: temporary hack until UCC is integrated into RAFT (so we can use a UCC barrier with
// DASK and an MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif

auto rx_counts =
host_scalar_allgather(row_comm,
static_cast<size_t>(thrust::distance(vertex_first, vertex_last)),
@@ -310,6 +388,17 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
matrix_minor_value_output_first);
}
}

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (end of row_comm)
#if 1
// FIXME: temporary hack until UCC is integrated into RAFT (so we can use a UCC barrier with
// DASK and an MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
} else {
assert(graph_view.get_number_of_local_vertices() ==
graph_view.get_number_of_local_adj_matrix_partition_rows());
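
Every hunk in this file applies the same recipe: fence on the global communicator before the first col_comm/row_comm collective and again after the last. The hazard, illustrated below on a hypothetical two-GPU timeline (not taken from the PR): NCCL collectives block the device until every participant arrives, so collectives on two communicators that overlap in time on the same GPUs can end up waiting on each other forever.

// Hypothetical timeline of the deadlock the barriers prevent (illustration only):
//
//   GPU 0: allreduce(comm) ... still waiting for GPU 1's contribution ...
//   GPU 1: allreduce(comm) done; allgatherv(col_comm) enqueued immediately
//
// GPU 1's col_comm kernel now occupies its device until GPU 0 joins the
// allgatherv, but GPU 0 cannot leave the comm allreduce until GPU 1's device
// makes progress on it: a circular wait. Fencing removes the overlap:
//
//   host_barrier(comm, handle.get_stream_view());  // every rank is out of comm
//   device_allgatherv(col_comm, /* ... */);        // col_comm-only section
//   host_barrier(comm, handle.get_stream_view());  // every rank is out of col_comm
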
46 changes: 46 additions & 0 deletions cpp/include/patterns/copy_v_transform_reduce_in_out_nbr.cuh
@@ -22,6 +22,7 @@
#include <utilities/dataframe_buffer.cuh>
#include <utilities/device_comm.cuh>
#include <utilities/error.hpp>
#include <utilities/host_barrier.hpp>

#include <raft/cudart_utils.h>
#include <rmm/thrust_rmm_allocator.h>
@@ -496,20 +497,43 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle,
}

if (GraphViewType::is_multi_gpu && update_major) {
auto& comm = handle.get_comms();
auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name());
auto const row_comm_rank = row_comm.get_rank();
auto const row_comm_size = row_comm.get_size();
auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name());
auto const col_comm_rank = col_comm.get_rank();
auto const col_comm_size = col_comm.get_size();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of col_comm)
#if 1
// FIXME: temporary hack until UCC is integrated into RAFT (so we can use a UCC barrier with
// DASK and an MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif

device_reduce(col_comm,
major_buffer_first,
vertex_value_output_first,
matrix_partition.get_major_size(),
raft::comms::op_t::SUM,
i,
handle.get_stream());

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (end of col_comm)
#if 1
// FIXME: temporary hack until UCC is integrated into RAFT (so we can use a UCC barrier with
// DASK and an MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
}
}

@@ -523,6 +547,17 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle,
auto const col_comm_rank = col_comm.get_rank();
auto const col_comm_size = col_comm.get_size();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of row_comm)
#if 1
// FIXME: temporary hack until UCC is integrated into RAFT (so we can use a UCC barrier with
// DASK and an MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif

for (int i = 0; i < row_comm_size; ++i) {
auto offset = (graph_view.get_vertex_partition_first(col_comm_rank * row_comm_size + i) -
graph_view.get_vertex_partition_first(col_comm_rank * row_comm_size));
@@ -535,6 +570,17 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle,
i,
handle.get_stream());
}

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (end of row_comm)
#if 1
// FIXME: temporary hack until UCC is integrated into RAFT (so we can use a UCC barrier with
// DASK and an MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
}
}

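
The same fencing continues in the next file. One detail worth making explicit is how the three communicators relate: cuGraph arranges the GPUs in a 2D grid and splits the global communicator into row and column subcommunicators, and the col_comm_rank * row_comm_size + i indexing in the hunk above implies a row-major rank layout. A sketch of that relationship, with the grid interpretation stated as an assumption:

// Assumed relationship between the communicators used throughout these hunks
// (row-major 2D grid, inferred from the col_comm_rank * row_comm_size + i math):
auto& comm     = handle.get_comms();  // global communicator: all P = R * C GPUs
auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name());
auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name());
// row_comm spans the GPUs in this rank's grid row, col_comm those in its column:
//   comm.get_rank() == col_comm.get_rank() * row_comm.get_size() + row_comm.get_rank()
// host_barrier always takes the global comm because a fence on comm covers both
// subcommunicators at once.
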
@@ -22,6 +22,7 @@
#include <utilities/collect_comm.cuh>
#include <utilities/dataframe_buffer.cuh>
#include <utilities/error.hpp>
#include <utilities/host_barrier.hpp>
#include <utilities/host_scalar_comm.cuh>
#include <utilities/shuffle_comm.cuh>
#include <vertex_partition_device.cuh>
@@ -211,10 +212,22 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
auto kv_map_ptr = std::make_unique<cuco::static_map<vertex_t, value_t>>(
size_t{0}, invalid_vertex_id<vertex_t>::value, invalid_vertex_id<vertex_t>::value);
if (GraphViewType::is_multi_gpu) {
auto& comm = handle.get_comms();
auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name());
auto const row_comm_rank = row_comm.get_rank();
auto const row_comm_size = row_comm.get_size();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of row_comm)
#if 1
// FIXME: temporary hack until UCC is integrated into RAFT (so we can use a UCC barrier with
// DASK and an MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif

auto map_counts =
host_scalar_allgather(row_comm,
static_cast<size_t>(thrust::distance(map_key_first, map_key_last)),
@@ -292,6 +305,21 @@

// 2. aggregate each vertex out-going edges based on keys and transform-reduce.

if (GraphViewType::is_multi_gpu) {
auto& comm = handle.get_comms();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of col_comm)
#if 1
// FIXME: temporary hack until UCC is integrated into RAFT (so we can use a UCC barrier with
// DASK and an MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
}

rmm::device_uvector<vertex_t> major_vertices(0, handle.get_stream());
auto e_op_result_buffer = allocate_dataframe_buffer<T>(0, handle.get_stream());
for (size_t i = 0; i < graph_view.get_number_of_local_adj_matrix_partitions(); ++i) {
@@ -436,6 +464,9 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
// FIXME: additional optimization is possible if reduce_op is a pure function (and reduce_op
// can be mapped to ncclRedOp_t).

// FIXME: a temporary workaround for an NCCL (2.9.6) bug that causes a hang on DGX1 (due to
// remote memory allocation); this barrier is unnecessary otherwise.
col_comm.barrier();
auto rx_sizes =
host_scalar_gather(col_comm, tmp_major_vertices.size(), i, handle.get_stream());
std::vector<size_t> rx_displs{};
@@ -475,6 +506,21 @@
}
}

if (GraphViewType::is_multi_gpu) {
auto& comm = handle.get_comms();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of col_comm)
#if 1
// FIXME: temporary hack until UCC is integrated into RAFT (so we can use a UCC barrier with
// DASK and an MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
}

thrust::fill(rmm::exec_policy(handle.get_stream())->on(handle.get_stream()),
vertex_value_output_first,
vertex_value_output_first + graph_view.get_number_of_local_vertices(),