diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d8bd4a7e99..22f00f7e393 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## New Features - PR #822 Added new functions in python graph class, similar to networkx +- PR #840 OPG degree ## Improvements - PR #764 Updated sssp and bfs with GraphCSR, removed gdf_column, added nullptr weights test for sssp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 51475343cd4..974d4f705cf 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -290,6 +290,7 @@ link_directories( "${CMAKE_CUDA_IMPLICIT_LINK_DIRECTORIES}") add_library(cugraph SHARED + src/comms/mpi/comms_mpi.cpp src/ktruss/ktruss.cu src/db/db_object.cu src/db/db_parser_integration_test.cu @@ -352,6 +353,10 @@ add_library(cugraph SHARED # add_dependencies(cugraph cugunrock) +if (BUILD_MPI) + add_compile_definitions(ENABLE_OPG=1) +endif (BUILD_MPI) + ################################################################################################### # - include paths --------------------------------------------------------------------------------- target_include_directories(cugraph diff --git a/cpp/include/comms_mpi.hpp b/cpp/include/comms_mpi.hpp new file mode 100644 index 00000000000..7a17bdfea4c --- /dev/null +++ b/cpp/include/comms_mpi.hpp @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#if ENABLE_OPG +#include +#include +#endif +#include +namespace cugraph { +namespace experimental { + +enum class ReduceOp { SUM, MAX, MIN }; + +// basic info about the snmg env setup +class Comm { + private: + int _p{0}; + int _rank{0}; + bool _finalize_mpi{false}; + bool _finalize_nccl{false}; + + int _device_id{0}; + int _device_count{0}; + + int _sm_count_per_device{0}; + int _max_grid_dim_1D{0}; + int _max_block_dim_1D{0}; + int _l2_cache_size{0}; + int _shared_memory_size_per_sm{0}; + +#if ENABLE_OPG + MPI_Comm _mpi_comm{}; + ncclComm_t _nccl_comm{}; +#endif + + public: + Comm(){}; + Comm(int p); +#if ENABLE_OPG + Comm(ncclComm_t comm, int size, int rank); +#endif + ~Comm(); + int get_rank() const { return _rank; } + int get_p() const { return _p; } + int get_dev() const { return _device_id; } + int get_dev_count() const { return _device_count; } + int get_sm_count() const { return _sm_count_per_device; } + bool is_master() const { return (_rank == 0) ? true : false; } + + void barrier(); + + template + void allgather(size_t size, value_t *sendbuff, value_t *recvbuff) const; + + template + void allreduce(size_t size, value_t *sendbuff, value_t *recvbuff, ReduceOp reduce_op) const; +}; + +} // namespace experimental +} // namespace cugraph diff --git a/cpp/include/graph.hpp b/cpp/include/graph.hpp index 63d188e149b..8870d413584 100644 --- a/cpp/include/graph.hpp +++ b/cpp/include/graph.hpp @@ -14,7 +14,7 @@ * limitations under the License. */ #pragma once - +#include namespace cugraph { namespace experimental { @@ -47,6 +47,7 @@ enum class DegreeDirection { template class GraphBase { public: + Comm comm; WT *edge_data; ///< edge weight GraphProperties prop; @@ -57,12 +58,16 @@ class GraphBase { /** * @brief Fill the identifiers array with the vertex identifiers. * - * @param[out] identifier Pointer to device memory to store the vertex identifiers + * @param[out] identifier Pointer to device memory to store the vertex + * identifiers */ void get_vertex_identifiers(VT *identifiers) const; + void set_communicator(Comm &comm_) { comm = comm_; } + GraphBase(WT *edge_data_, VT number_of_vertices_, ET number_of_edges_) : edge_data(edge_data_), + comm(), prop(), number_of_vertices(number_of_vertices_), number_of_edges(number_of_edges_) @@ -102,8 +107,8 @@ class GraphCOO : public GraphBase { /** * @brief Wrap existing arrays representing an edge list in a Graph. * - * GraphCOO does not own the memory used to represent this graph. This - * function does not allocate memory. + * GraphCOO does not own the memory used to represent this graph. + * This function does not allocate memory. * * @param source_indices This array of size E (number of edges) contains the index of the * source for each edge. Indices must be in the range [0, V-1]. @@ -138,9 +143,11 @@ class GraphCompressedSparseBase : public GraphBase { VT *indices{nullptr}; ///< CSR indices /** - * @brief Fill the identifiers in the array with the source vertex identifiers + * @brief Fill the identifiers in the array with the source vertex + * identifiers * - * @param[out] src_indices Pointer to device memory to store the source vertex identifiers + * @param[out] src_indices Pointer to device memory to store the + * source vertex identifiers */ void get_source_indices(VT *src_indices) const; @@ -160,8 +167,8 @@ class GraphCompressedSparseBase : public GraphBase { /** * @brief Wrap existing arrays representing adjacency lists in a Graph. - * GraphCSR does not own the memory used to represent this graph. This - * function does not allocate memory. + * GraphCSR does not own the memory used to represent this graph. + * This function does not allocate memory. * * @param offsets This array of size V+1 (V is number of vertices) contains the * offset of adjacency lists of every vertex. Offsets must be in the range [0, E] (number of @@ -199,8 +206,8 @@ class GraphCSR : public GraphCompressedSparseBase { /** * @brief Wrap existing arrays representing adjacency lists in a Graph. - * GraphCSR does not own the memory used to represent this graph. This - * function does not allocate memory. + * GraphCSR does not own the memory used to represent this graph. + * This function does not allocate memory. * * @param offsets This array of size V+1 (V is number of vertices) contains the * offset of adjacency lists of every vertex. Offsets must be in the range [0, E] (number of @@ -235,9 +242,9 @@ class GraphCSC : public GraphCompressedSparseBase { GraphCSC() : GraphCompressedSparseBase(nullptr, nullptr, nullptr, 0, 0) {} /** - * @brief Wrap existing arrays representing transposed adjacency lists in a Graph. - * GraphCSC does not own the memory used to represent this graph. This - * function does not allocate memory. + * @brief Wrap existing arrays representing transposed adjacency lists in + * a Graph. GraphCSC does not own the memory used to represent this graph. + * This function does not allocate memory. * * @param offsets This array of size V+1 (V is number of vertices) contains the * offset of adjacency lists of every vertex. Offsets must be in the range [0, E] (number of diff --git a/cpp/src/comms/mpi/comms_mpi.cpp b/cpp/src/comms/mpi/comms_mpi.cpp new file mode 100644 index 00000000000..f473c0a1939 --- /dev/null +++ b/cpp/src/comms/mpi/comms_mpi.cpp @@ -0,0 +1,279 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include "utilities/error_utils.h" + +namespace cugraph { +namespace experimental { +#if ENABLE_OPG + +/**---------------------------------------------------------------------------* + * @brief Exception thrown when a NCCL error is encountered. + * + *---------------------------------------------------------------------------**/ +struct nccl_error : public std::runtime_error { + nccl_error(std::string const &message) : std::runtime_error(message) {} +}; + +inline void throw_nccl_error(ncclResult_t error, const char *file, unsigned int line) +{ + throw nccl_error(std::string{"NCCL error encountered at: " + std::string{file} + ":" + + std::to_string(line) + ": " + ncclGetErrorString(error)}); +} + +#define NCCL_TRY(call) \ + { \ + ncclResult_t nccl_status = (call); \ + if (nccl_status != ncclSuccess) { throw_nccl_error(nccl_status, __FILE__, __LINE__); } \ + } +// MPI errors are expected to be fatal before reaching this. +// FIXME : improve when adding raft comms +#define MPI_TRY(cmd) \ + { \ + int e = cmd; \ + if (e != MPI_SUCCESS) { CUGRAPH_FAIL("Failed: MPI error"); } \ + } + +template +constexpr MPI_Datatype get_mpi_type() +{ + if (std::is_integral::value) { + if (std::is_signed::value) { + if (sizeof(value_t) == 1) { + return MPI_INT8_T; + } else if (sizeof(value_t) == 2) { + return MPI_INT16_T; + } else if (sizeof(value_t) == 4) { + return MPI_INT32_T; + } else if (sizeof(value_t) == 8) { + return MPI_INT64_T; + } else { + CUGRAPH_FAIL("unsupported type"); + } + } else { + if (sizeof(value_t) == 1) { + return MPI_UINT8_T; + } else if (sizeof(value_t) == 2) { + return MPI_UINT16_T; + } else if (sizeof(value_t) == 4) { + return MPI_UINT32_T; + } else if (sizeof(value_t) == 8) { + return MPI_UINT64_T; + } else { + CUGRAPH_FAIL("unsupported type"); + } + } + } else if (std::is_same::value) { + return MPI_FLOAT; + } else if (std::is_same::value) { + return MPI_DOUBLE; + } else { + CUGRAPH_FAIL("unsupported type"); + } +} + +template +constexpr ncclDataType_t get_nccl_type() +{ + if (std::is_integral::value) { + if (std::is_signed::value) { + if (sizeof(value_t) == 1) { + return ncclInt8; + } else if (sizeof(value_t) == 4) { + return ncclInt32; + } else if (sizeof(value_t) == 8) { + return ncclInt64; + } else { + CUGRAPH_FAIL("unsupported type"); + } + } else { + if (sizeof(value_t) == 1) { + return ncclUint8; + } else if (sizeof(value_t) == 4) { + return ncclUint32; + } else if (sizeof(value_t) == 8) { + return ncclUint64; + } else { + CUGRAPH_FAIL("unsupported type"); + } + } + } else if (std::is_same::value) { + return ncclFloat32; + } else if (std::is_same::value) { + return ncclFloat64; + } else { + CUGRAPH_FAIL("unsupported type"); + } +} + +constexpr MPI_Op get_mpi_reduce_op(ReduceOp reduce_op) +{ + if (reduce_op == ReduceOp::SUM) { + return MPI_SUM; + } else if (reduce_op == ReduceOp::MAX) { + return MPI_MAX; + } else if (reduce_op == ReduceOp::MIN) { + return MPI_MIN; + } else { + CUGRAPH_FAIL("unsupported type"); + } +} + +constexpr ncclRedOp_t get_nccl_reduce_op(ReduceOp reduce_op) +{ + if (reduce_op == ReduceOp::SUM) { + return ncclSum; + } else if (reduce_op == ReduceOp::MAX) { + return ncclMax; + } else if (reduce_op == ReduceOp::MIN) { + return ncclMin; + } else { + CUGRAPH_FAIL("unsupported type"); + } +} +#endif + +Comm::Comm(int p) : _p{p} +{ +#if ENABLE_OPG + // MPI + int flag{}, mpi_world_size; + + MPI_TRY(MPI_Initialized(&flag)); + + if (flag == false) { + int provided{}; + MPI_TRY(MPI_Init_thread(nullptr, nullptr, MPI_THREAD_MULTIPLE, &provided)); + if (provided != MPI_THREAD_MULTIPLE) { MPI_TRY(MPI_ERR_OTHER); } + _finalize_mpi = true; + } + + MPI_TRY(MPI_Comm_rank(MPI_COMM_WORLD, &_rank)); + MPI_TRY(MPI_Comm_size(MPI_COMM_WORLD, &mpi_world_size)); + CUGRAPH_EXPECTS((_p == mpi_world_size), + "Invalid input arguments: p should match the number of MPI processes."); + + _mpi_comm = MPI_COMM_WORLD; + + // CUDA + + CUDA_TRY(cudaGetDeviceCount(&_device_count)); + _device_id = _rank % _device_count; // FIXME : assumes each node has the same number of GPUs + CUDA_TRY(cudaSetDevice(_device_id)); + + CUDA_TRY( + cudaDeviceGetAttribute(&_sm_count_per_device, cudaDevAttrMultiProcessorCount, _device_id)); + CUDA_TRY(cudaDeviceGetAttribute(&_max_grid_dim_1D, cudaDevAttrMaxGridDimX, _device_id)); + CUDA_TRY(cudaDeviceGetAttribute(&_max_block_dim_1D, cudaDevAttrMaxBlockDimX, _device_id)); + CUDA_TRY(cudaDeviceGetAttribute(&_l2_cache_size, cudaDevAttrL2CacheSize, _device_id)); + CUDA_TRY(cudaDeviceGetAttribute( + &_shared_memory_size_per_sm, cudaDevAttrMaxSharedMemoryPerMultiprocessor, _device_id)); + + // NCCL + + ncclUniqueId nccl_unique_id_p{}; + if (get_rank() == 0) { NCCL_TRY(ncclGetUniqueId(&nccl_unique_id_p)); } + MPI_TRY(MPI_Bcast(&nccl_unique_id_p, sizeof(ncclUniqueId), MPI_BYTE, 0, _mpi_comm)); + NCCL_TRY(ncclCommInitRank(&_nccl_comm, get_p(), nccl_unique_id_p, get_rank())); + _finalize_nccl = true; +#endif +} + +#if ENABLE_OPG +Comm::Comm(ncclComm_t comm, int size, int rank) : _nccl_comm(comm), _p(size), _rank(rank) +{ + // CUDA + CUDA_TRY(cudaGetDeviceCount(&_device_count)); + _device_id = _rank % _device_count; // FIXME : assumes each node has the same number of GPUs + CUDA_TRY(cudaSetDevice(_device_id)); // FIXME : check if this is needed or if + // python takes care of this + + CUDA_TRY( + cudaDeviceGetAttribute(&_sm_count_per_device, cudaDevAttrMultiProcessorCount, _device_id)); + CUDA_TRY(cudaDeviceGetAttribute(&_max_grid_dim_1D, cudaDevAttrMaxGridDimX, _device_id)); + CUDA_TRY(cudaDeviceGetAttribute(&_max_block_dim_1D, cudaDevAttrMaxBlockDimX, _device_id)); + CUDA_TRY(cudaDeviceGetAttribute(&_l2_cache_size, cudaDevAttrL2CacheSize, _device_id)); + CUDA_TRY(cudaDeviceGetAttribute( + &_shared_memory_size_per_sm, cudaDevAttrMaxSharedMemoryPerMultiprocessor, _device_id)); +} +#endif + +Comm::~Comm() +{ +#if ENABLE_OPG + // NCCL + if (_finalize_nccl) ncclCommDestroy(_nccl_comm); + + if (_finalize_mpi) { MPI_Finalize(); } +#endif +} + +void Comm::barrier() +{ +#if ENABLE_OPG + MPI_Barrier(MPI_COMM_WORLD); +#endif +} + +template +void Comm::allgather(size_t size, value_t *sendbuff, value_t *recvbuff) const +{ +#if ENABLE_OPG + NCCL_TRY(ncclAllGather((const void *)sendbuff, + (void *)recvbuff, + size, + get_nccl_type(), + _nccl_comm, + cudaStreamDefault)); +#endif +} + +template +void Comm::allreduce(size_t size, value_t *sendbuff, value_t *recvbuff, ReduceOp reduce_op) const +{ +#if ENABLE_OPG + NCCL_TRY(ncclAllReduce((const void *)sendbuff, + (void *)recvbuff, + size, + get_nccl_type(), + get_nccl_reduce_op(reduce_op), + _nccl_comm, + cudaStreamDefault)); +#endif +} + +// explicit +template void Comm::allgather(size_t size, int *sendbuff, int *recvbuff) const; +template void Comm::allgather(size_t size, float *sendbuff, float *recvbuff) const; +template void Comm::allgather(size_t size, double *sendbuff, double *recvbuff) const; +template void Comm::allreduce(size_t size, + int *sendbuff, + int *recvbuff, + ReduceOp reduce_op) const; +template void Comm::allreduce(size_t size, + float *sendbuff, + float *recvbuff, + ReduceOp reduce_op) const; +template void Comm::allreduce(size_t size, + double *sendbuff, + double *recvbuff, + ReduceOp reduce_op) const; + +} // namespace experimental +} // namespace cugraph diff --git a/cpp/src/structure/graph.cu b/cpp/src/structure/graph.cu index a099a16d7ba..1d329e69321 100644 --- a/cpp/src/structure/graph.cu +++ b/cpp/src/structure/graph.cu @@ -1,12 +1,17 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved. + * Copyright (c) 2020, NVIDIA CORPORATION. * - * NVIDIA CORPORATION and its licensors retain all intellectual property - * and proprietary rights in and to this software, related documentation - * and any modifications thereto. Any use, reproduction, disclosure or - * distribution of this software and related documentation without an express - * license agreement from NVIDIA CORPORATION is strictly prohibited. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ #include @@ -31,7 +36,9 @@ void degree_from_offsets(vertex_t number_of_vertices, } template -void degree_from_vertex_ids(edge_t number_of_edges, +void degree_from_vertex_ids(const cugraph::experimental::Comm &comm, + vertex_t number_of_vertices, + edge_t number_of_edges, vertex_t const *indices, edge_t *degree, cudaStream_t stream) @@ -41,6 +48,7 @@ void degree_from_vertex_ids(edge_t number_of_edges, thrust::make_counting_iterator(0), thrust::make_counting_iterator(number_of_edges), [indices, degree] __device__(edge_t e) { cugraph::atomicAdd(degree + indices[e], 1); }); + comm.allreduce(number_of_vertices, degree, degree, cugraph::experimental::ReduceOp::SUM); } } // namespace @@ -74,11 +82,24 @@ void GraphCOO::degree(ET *degree, DegreeDirection direction) const cudaStream_t stream{nullptr}; if (direction != DegreeDirection::IN) { - degree_from_vertex_ids(GraphBase::number_of_edges, src_indices, degree, stream); + if (GraphBase::comm.get_p()) // FIXME retrieve global source + // indexing for the allreduce work + CUGRAPH_FAIL("OPG degree not implemented for OUT degree"); + degree_from_vertex_ids(GraphBase::comm, + GraphBase::number_of_vertices, + GraphBase::number_of_edges, + src_indices, + degree, + stream); } if (direction != DegreeDirection::OUT) { - degree_from_vertex_ids(GraphBase::number_of_edges, dst_indices, degree, stream); + degree_from_vertex_ids(GraphBase::comm, + GraphBase::number_of_vertices, + GraphBase::number_of_edges, + dst_indices, + degree, + stream); } } @@ -94,11 +115,20 @@ void GraphCompressedSparseBase::degree(ET *degree, DegreeDirection d cudaStream_t stream{nullptr}; if (direction != DegreeDirection::IN) { + if (GraphBase::comm.get_p()) + CUGRAPH_FAIL("OPG degree not implemented for OUT degree"); // FIXME retrieve global + // source indexing for + // the allreduce to work degree_from_offsets(GraphBase::number_of_vertices, offsets, degree, stream); } if (direction != DegreeDirection::OUT) { - degree_from_vertex_ids(GraphBase::number_of_edges, indices, degree, stream); + degree_from_vertex_ids(GraphBase::comm, + GraphBase::number_of_vertices, + GraphBase::number_of_edges, + indices, + degree, + stream); } } diff --git a/cpp/src/utilities/cuda_utils.cuh b/cpp/src/utilities/cuda_utils.cuh index e13d6295981..8f7874dad93 100644 --- a/cpp/src/utilities/cuda_utils.cuh +++ b/cpp/src/utilities/cuda_utils.cuh @@ -63,4 +63,4 @@ __device__ static __forceinline__ int32_t atomicAdd(int32_t *addr, int32_t val) return ::atomicAdd(addr, val); } -} // namespace cugraph +} // namespace cugraph \ No newline at end of file diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index c0632a2f3f8..75d9c49e1a5 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -249,6 +249,11 @@ if (BUILD_MPI) "${CMAKE_CURRENT_SOURCE_DIR}/nccl/nccl_test.cu") ConfigureTest(NCCL_TEST "${NCCL_TEST_SRC}" "") + + set(NCCL_DEGREE_TEST_SRC + "${CMAKE_CURRENT_SOURCE_DIR}/nccl/degree_test.cu") + + ConfigureTest(NCCL_DEGREE_TEST "${NCCL_DEGREE_TEST_SRC}" "") endif(BUILD_MPI) ################################################################################################### diff --git a/cpp/tests/nccl/degree_test.cu b/cpp/tests/nccl/degree_test.cu new file mode 100644 index 00000000000..7683874939c --- /dev/null +++ b/cpp/tests/nccl/degree_test.cu @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include "gtest/gtest.h" +#include "test_utils.h" + +// ref Degree on the host +template +void ref_degree_h(std::vector &ind_h, std::vector °ree) +{ + for (size_t i = 0; i < degree.size(); i++) degree[i] = 0; + for (size_t i = 0; i < ind_h.size(); i++) degree[ind_h[i]] += 1; +} + +// global to local offsets by shifting all offsets by the first offset value +template +void shift_by_front(std::vector &v) +{ + auto start = v.front(); + for (auto i = size_t{0}; i < v.size(); ++i) v[i] -= start; +} + +// 1D partitioning such as each GPU has about the same number of edges +template +void opg_edge_partioning( + int r, int p, std::vector &ind_h, std::vector &part_offset, size_t &e_loc) +{ + // set first and last partition offsets + part_offset[0] = 0; + part_offset[p] = ind_h.size(); + // part_offset[p] = *(std::max_element(ind_h.begin(), ind_h.end())); + auto loc_nnz = ind_h.size() / p; + for (int i = 1; i < p; i++) { + // get the first vertex ID of each partition + auto start_nnz = i * loc_nnz; + auto start_v = 0; + for (auto j = size_t{0}; j < ind_h.size(); ++j) { + if (j >= start_nnz) { + start_v = j; + break; + } + } + part_offset[i] = start_v; + } + e_loc = part_offset[r + 1] - part_offset[r]; +} +TEST(degree, success) +{ + int v = 6; + + // host + std::vector src_h = {0, 0, 2, 2, 2, 3, 3, 4, 4, 5, 5}, + dest_h = {1, 2, 0, 1, 4, 4, 5, 3, 5, 3, 1}; + std::vector degree_h(v, 0.0), degree_ref(v, 0.0); + + // MG + int p; + MPICHECK(MPI_Comm_size(MPI_COMM_WORLD, &p)); + cugraph::experimental::Comm comm(p); + std::vector part_offset(p + 1); + auto i = comm.get_rank(); + size_t e_loc; + + opg_edge_partioning(i, p, src_h, part_offset, e_loc); +#ifdef OPG_VERBOSE + sleep(i); + for (auto j = part_offset.begin(); j != part_offset.end(); ++j) std::cout << *j << ' '; + std::cout << std::endl; + std::cout << "eloc: " << e_loc << std::endl; +#endif + std::vector src_loc_h(src_h.begin() + part_offset[i], + src_h.begin() + part_offset[i] + e_loc), + dest_loc_h(dest_h.begin() + part_offset[i], dest_h.begin() + part_offset[i] + e_loc); + shift_by_front(src_loc_h); + + // print mg info + printf("# Rank %2d - Pid %6d - device %2d\n", comm.get_rank(), getpid(), comm.get_dev()); + + // local device + thrust::device_vector src_d(src_loc_h.begin(), src_loc_h.end()); + thrust::device_vector dest_d(dest_loc_h.begin(), dest_loc_h.end()); + thrust::device_vector degree_d(v); + + // load local chunck to cugraph + cugraph::experimental::GraphCOO G(thrust::raw_pointer_cast(src_d.data()), + thrust::raw_pointer_cast(dest_d.data()), + nullptr, + degree_h.size(), + e_loc); + G.set_communicator(comm); + + // OUT degree + G.degree(thrust::raw_pointer_cast(degree_d.data()), cugraph::experimental::DegreeDirection::IN); + thrust::copy(degree_d.begin(), degree_d.end(), degree_h.begin()); + ref_degree_h(dest_h, degree_ref); + // sleep(i); + for (size_t j = 0; j < degree_h.size(); ++j) EXPECT_EQ(degree_ref[j], degree_h[j]); + std::cout << "Rank " << i << " done checking." << std::endl; +} + +int main(int argc, char **argv) +{ + testing::InitGoogleTest(&argc, argv); + MPI_Init(&argc, &argv); + rmmInitialize(nullptr); + int rc = RUN_ALL_TESTS(); + rmmFinalize(); + MPI_Finalize(); + return rc; +}