Remove unnecessary host barrier synchronization #1917

Merged
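All three files in this diff lose the same guard pattern: a `host_barrier` on the global communicator placed before and after every section that issues collectives on a row or column sub-communicator, plus the now-unused `host_barrier.hpp` includes. For reference, here is a condensed sketch of the deleted pattern, assembled from the hunks that follow rather than written fresh; `comm` is the global communicator, and the sub-communicator collective in the middle stands in for whatever each section actually performs (`device_reduce`, `host_scalar_allgather`, and so on).

```cpp
// Condensed sketch of the pattern deleted throughout this PR (assembled from
// the hunks below, not new code). The original comments warn that collectives
// issued on two different communicators can overlap; NCCL collectives that are
// not globally ordered across ranks can block one another, hence the
// host-side barrier on the global `comm` around each sub-communicator section.

// barrier before the sub-communicator section (beginning of col_comm/row_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC
// barrier with DASK and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier();  // currently, this is ncclAllReduce
#endif

// ... collectives on the sub-communicator, e.g. device_reduce(col_comm, ...) ...

// matching barrier after the section (end of col_comm/row_comm), also deleted
host_barrier(comm, handle.get_stream_view());
```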
cpp/include/cugraph/prims/copy_to_adj_matrix_row_col.cuh (89 changes: 0 additions & 89 deletions)
@@ -22,7 +22,6 @@
#include <cugraph/utilities/dataframe_buffer.cuh>
#include <cugraph/utilities/device_comm.cuh>
#include <cugraph/utilities/error.hpp>
#include <cugraph/utilities/host_barrier.hpp>
#include <cugraph/utilities/host_scalar_comm.cuh>
#include <cugraph/utilities/thrust_tuple_utils.cuh>
#include <cugraph/vertex_partition_device_view.cuh>
@@ -65,17 +64,6 @@ void copy_to_matrix_major(raft::handle_t const& handle,
auto const col_comm_rank = col_comm.get_rank();
auto const col_comm_size = col_comm.get_size();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of col_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif

if (matrix_major_value_output.key_first()) {
auto key_offsets = GraphViewType::is_adj_matrix_transposed
? *(graph_view.get_local_sorted_unique_edge_col_offsets())
@@ -122,17 +110,6 @@ void copy_to_matrix_major(raft::handle_t const& handle,
displacements,
handle.get_stream());
}

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (end of col_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
} else {
assert(!(matrix_major_value_output.key_first()));
assert(graph_view.get_number_of_local_vertices() == GraphViewType::is_adj_matrix_transposed
@@ -170,17 +147,6 @@ void copy_to_matrix_major(raft::handle_t const& handle,
auto const col_comm_rank = col_comm.get_rank();
auto const col_comm_size = col_comm.get_size();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of col_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif

auto rx_counts =
host_scalar_allgather(col_comm,
static_cast<size_t>(thrust::distance(vertex_first, vertex_last)),
@@ -260,17 +226,6 @@ void copy_to_matrix_major(raft::handle_t const& handle,
matrix_major_value_output.value_data() + matrix_partition.get_major_value_start_offset());
}
}

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (end of col_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
} else {
assert(!(matrix_major_value_output.key_first()));
assert(graph_view.get_number_of_local_vertices() == GraphViewType::is_adj_matrix_transposed
@@ -305,17 +260,6 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
auto const col_comm_rank = col_comm.get_rank();
auto const col_comm_size = col_comm.get_size();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of row_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif

if (matrix_minor_value_output.key_first()) {
auto key_offsets = GraphViewType::is_adj_matrix_transposed
? *(graph_view.get_local_sorted_unique_edge_row_offsets())
@@ -362,17 +306,6 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
displacements,
handle.get_stream());
}

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (end of row_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
} else {
assert(!(matrix_minor_value_output.key_first()));
assert(graph_view.get_number_of_local_vertices() == GraphViewType::is_adj_matrix_transposed
@@ -410,17 +343,6 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
auto const col_comm_rank = col_comm.get_rank();
auto const col_comm_size = col_comm.get_size();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of row_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif

auto rx_counts =
host_scalar_allgather(row_comm,
static_cast<size_t>(thrust::distance(vertex_first, vertex_last)),
@@ -498,17 +420,6 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
matrix_minor_value_output.value_data());
}
}

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (end of row_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
} else {
assert(!(matrix_minor_value_output.key_first()));
assert(graph_view.get_number_of_local_vertices() ==
cpp/include/cugraph/prims/copy_v_transform_reduce_in_out_nbr.cuh

@@ -22,7 +22,6 @@
#include <cugraph/utilities/dataframe_buffer.cuh>
#include <cugraph/utilities/device_comm.cuh>
#include <cugraph/utilities/error.hpp>
#include <cugraph/utilities/host_barrier.hpp>

#include <raft/cudart_utils.h>
#include <raft/handle.hpp>
@@ -603,35 +602,13 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle,
auto const col_comm_rank = col_comm.get_rank();
auto const col_comm_size = col_comm.get_size();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of col_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif

device_reduce(col_comm,
major_buffer_first,
vertex_value_output_first,
matrix_partition.get_major_size(),
raft::comms::op_t::SUM,
i,
handle.get_stream());

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (end of col_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
}
}

@@ -645,17 +622,6 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle,
auto const col_comm_rank = col_comm.get_rank();
auto const col_comm_size = col_comm.get_size();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of row_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif

for (int i = 0; i < row_comm_size; ++i) {
auto offset = (graph_view.get_vertex_partition_first(col_comm_rank * row_comm_size + i) -
graph_view.get_vertex_partition_first(col_comm_rank * row_comm_size));
@@ -668,17 +634,6 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle,
i,
handle.get_stream());
}

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (end of row_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
}
}

cpp/include/cugraph/prims/copy_v_transform_reduce_key_aggregated_out_nbr.cuh

@@ -22,7 +22,6 @@
#include <cugraph/utilities/collect_comm.cuh>
#include <cugraph/utilities/dataframe_buffer.cuh>
#include <cugraph/utilities/error.hpp>
#include <cugraph/utilities/host_barrier.hpp>
#include <cugraph/utilities/host_scalar_comm.cuh>
#include <cugraph/utilities/shuffle_comm.cuh>
#include <cugraph/vertex_partition_device_view.cuh>
@@ -242,17 +241,6 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
auto const row_comm_rank = row_comm.get_rank();
auto const row_comm_size = row_comm.get_size();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of row_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif

auto map_counts = host_scalar_allgather(
row_comm,
static_cast<size_t>(thrust::distance(map_unique_key_first, map_unique_key_last)),
@@ -320,21 +308,6 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(

// 2. aggregate each vertex out-going edges based on keys and transform-reduce.

if (GraphViewType::is_multi_gpu) {
auto& comm = handle.get_comms();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (beginning of col_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
}

rmm::device_uvector<vertex_t> major_vertices(0, handle.get_stream());
auto e_op_result_buffer = allocate_dataframe_buffer<T>(0, handle.get_stream());
for (size_t i = 0; i < graph_view.get_number_of_local_adj_matrix_partitions(); ++i) {
@@ -546,21 +519,6 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
}
}

if (GraphViewType::is_multi_gpu) {
auto& comm = handle.get_comms();

// barrier is necessary here to avoid potential overlap (which can lead to deadlock) between
// two different communicators (end of col_comm)
#if 1
// FIXME: temporary hack till UCC is integrated into RAFT (so we can use UCC barrier with DASK
// and MPI barrier with MPI)
host_barrier(comm, handle.get_stream_view());
#else
handle.get_stream_view().synchronize();
comm.barrier(); // currently, this is ncclAllReduce
#endif
}

auto execution_policy = handle.get_thrust_policy();
thrust::fill(execution_policy,
vertex_value_output_first,