[REVIEW] BUG Move subcomms init outside of individual algorithm functions #1196

Merged
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -39,8 +39,9 @@
- PR #1166 Fix misspelling of function calls in asserts causing debug build to fail
- PR #1180 BLD Adopt RAFT model for cuhornet dependency
- PR #1181 Fix notebook error handling in CI
- PR #1186 BLD Installing raft headers under cugraph
- PR #1192 Fix benchmark notes and documentation issues in graph.py
- PR #1196 Move subcomms init outside of individual algorithm functions

# cuGraph 0.15.0 (26 Aug 2020)

4 changes: 4 additions & 0 deletions cpp/include/partition_manager.hpp
@@ -54,6 +54,10 @@ struct key_naming_t {
using pair_comms_t =
std::pair<std::shared_ptr<raft::comms::comms_t>, std::shared_ptr<raft::comms::comms_t>>;

// FIXME: This class is a misnomer since the python layer is currently
// responsible for creating and managing partitioning. Consider renaming it or
// refactoring it away.
//
// class responsible for creating 2D partition sub-comms:
// this is instantiated by each worker (processing element, PE)
// for the row/column it belongs to;
15 changes: 13 additions & 2 deletions cpp/include/utilities/cython.hpp
@@ -169,8 +169,6 @@ void populate_graph_container(graph_container_t& graph_container,
size_t num_partition_edges,
size_t num_global_vertices,
size_t num_global_edges,
size_t row_comm_size, // pcols
size_t col_comm_size, // prows
bool sorted_by_degree,
bool transposed,
bool multi_gpu);
@@ -201,5 +199,18 @@ std::pair<size_t, weight_t> call_louvain(raft::handle_t const& handle,
size_t max_level,
weight_t resolution);

// Helper for setting up subcommunicators, typically called as part of the
// user-initiated comms initialization in Python.
//
// raft::handle_t& handle
// Raft handle for which the new subcommunicators will be created. The
// subcommunicators will then be accessible from the handle passed to the
// parallel processes.
//
// size_t row_comm_size
// Number of items in a partition row (i.e., pcols), needed for creating the
// appropriate number of subcommunicator instances.
void init_subcomms(raft::handle_t& handle, size_t row_comm_size);

} // namespace cython
} // namespace cugraph
21 changes: 9 additions & 12 deletions cpp/src/utilities/cython.cu
@@ -111,8 +111,6 @@ void populate_graph_container(graph_container_t& graph_container,
size_t num_partition_edges,
size_t num_global_vertices,
size_t num_global_edges,
size_t row_comm_size, // pcols
size_t col_comm_size, // prows
bool sorted_by_degree,
bool transposed,
bool multi_gpu)
@@ -123,20 +121,12 @@
bool do_expensive_check{false};
bool hypergraph_partitioned{false};

// FIXME: Consider setting up the subcomms right after initializing comms, no
// need to delay to this point.
// Setup the subcommunicators needed for this partition on the handle.
partition_2d::subcomm_factory_t<partition_2d::key_naming_t, int> subcomm_factory(handle,
row_comm_size);
// FIXME: once the subcomms are set up earlier (outside this function), remove
// the row/col_comm_size params and retrieve them from the handle (commented
// out lines below)
auto& row_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().row_name());
auto const row_comm_rank = row_comm.get_rank();
// auto const row_comm_size = row_comm.get_size(); // pcols
auto const row_comm_size = row_comm.get_size(); // pcols
auto& col_comm = handle.get_subcomm(cugraph::partition_2d::key_naming_t().col_name());
auto const col_comm_rank = col_comm.get_rank();
// auto const col_comm_size = col_comm.get_size(); // prows
auto const col_comm_size = col_comm.get_size(); // prows

graph_container.vertex_partition_offsets = vertex_partition_offsets;
graph_container.src_vertices = src_vertices;
@@ -491,5 +481,12 @@ template std::pair<size_t, double> call_louvain(raft::handle_t const& handle,
size_t max_level,
double resolution);

// Helper for setting up subcommunicators
void init_subcomms(raft::handle_t& handle, size_t row_comm_size)
{
partition_2d::subcomm_factory_t<partition_2d::key_naming_t, int> subcomm_factory(handle,
row_comm_size);
}

} // namespace cython
} // namespace cugraph
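The new `init_subcomms` helper hands `row_comm_size` to `subcomm_factory_t`, which splits the global communicator into per-row and per-column subcommunicators over a 2D grid of ranks. As a conceptual sketch only (the exact rank-to-grid mapping inside raft's `subcomm_factory_t` is an assumption here, not taken from this PR), a row-major layout groups ranks like this:

```python
def grid_coords(rank, row_comm_size):
    # Ranks laid out row-major on a 2D grid: ranks sharing the same row
    # index would belong to one row subcommunicator, and ranks sharing
    # the same column index to one column subcommunicator.
    return rank // row_comm_size, rank % row_comm_size

# 6 ranks with row_comm_size (pcols) = 3 -> a 2 x 3 grid
for rank in range(6):
    row, col = grid_coords(rank, 3)
    print(f"rank {rank}: row subcomm {row}, col subcomm {col}")
```

With this layout, each rank participates in exactly one row subcomm of size `row_comm_size` and one column subcomm of size `ngpus // row_comm_size`.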
25 changes: 25 additions & 0 deletions python/cugraph/comms/comms.pxd
@@ -0,0 +1,25 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# cython: profile=False
# distutils: language = c++
# cython: embedsignature = True
# cython: language_level = 3

from cugraph.structure.graph_primtypes cimport handle_t


cdef extern from "utilities/cython.hpp" namespace "cugraph::cython":

cdef void init_subcomms(handle_t &handle,
size_t row_comm_size)
137 changes: 118 additions & 19 deletions python/cugraph/comms/comms.py
@@ -14,75 +14,163 @@
from cugraph.raft.dask.common.comms import Comms as raftComms
from cugraph.raft.dask.common.comms import worker_state
from cugraph.raft.common.handle import Handle
from cugraph.comms.comms_wrapper import init_subcomms as c_init_subcomms
from dask.distributed import default_client
from cugraph.dask.common import read_utils
import math


__instance = None
__default_handle = None
__subcomm = None


# Initialize Comms. If explicit Comms not provided as arg,
# default Comms are initialized as per client information.
def initialize(comms=None, p2p=False):
def __get_2D_div(ngpus):
pcols = int(math.sqrt(ngpus))
while ngpus % pcols != 0:
pcols = pcols - 1
return int(ngpus/pcols), pcols
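The `__get_2D_div` helper above chooses the most balanced factorization of the GPU count: the largest divisor at or below the square root becomes `pcols`. A standalone re-implementation (outside the cugraph module, for illustration) shows the behavior:

```python
import math

def get_2d_div(ngpus):
    # Largest divisor of ngpus that is <= sqrt(ngpus) becomes pcols,
    # so prows * pcols == ngpus with the factors as close as possible.
    pcols = int(math.sqrt(ngpus))
    while ngpus % pcols != 0:
        pcols = pcols - 1
    return int(ngpus / pcols), pcols

print(get_2d_div(8))   # (4, 2)
print(get_2d_div(16))  # (4, 4)
print(get_2d_div(7))   # (7, 1): prime counts fall back to a 1D layout
```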


def subcomm_init(prows, pcols, partition_type):
sID = get_session_id()
ngpus = get_n_workers()
if prows is None and pcols is None:
if partition_type == 1:
pcols, prows = __get_2D_div(ngpus)
else:
prows, pcols = __get_2D_div(ngpus)
else:
if prows is not None and pcols is not None:
if ngpus != prows*pcols:
raise Exception('prows*pcols should be equal to the\
number of processes')
elif prows is not None:
if ngpus % prows != 0:
raise Exception('prows must be a factor of the number\
of processes')
pcols = int(ngpus/prows)
elif pcols is not None:
if ngpus % pcols != 0:
raise Exception('pcols must be a factor of the number\
of processes')
prows = int(ngpus/pcols)

client = default_client()
client.run(_subcomm_init, sID, pcols)
global __subcomm
__subcomm = (prows, pcols, partition_type)
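The validation rules in `subcomm_init` above can be summarized in a self-contained sketch (re-implemented here for illustration; the function and `ValueError` type are not part of the cugraph API, which raises plain `Exception`):

```python
import math

def resolve_grid(ngpus, prows=None, pcols=None, partition_type=1):
    # Mirror subcomm_init: derive the 2D grid shape from optional hints.
    def auto_div(n):
        pcols = int(math.sqrt(n))
        while n % pcols != 0:
            pcols -= 1
        return n // pcols, pcols

    if prows is None and pcols is None:
        if partition_type == 1:
            pcols, prows = auto_div(ngpus)
        else:
            prows, pcols = auto_div(ngpus)
    elif prows is not None and pcols is not None:
        if ngpus != prows * pcols:
            raise ValueError('prows*pcols should equal the number of processes')
    elif prows is not None:
        if ngpus % prows != 0:
            raise ValueError('prows must be a factor of the number of processes')
        pcols = ngpus // prows
    else:
        if ngpus % pcols != 0:
            raise ValueError('pcols must be a factor of the number of processes')
        prows = ngpus // pcols
    return prows, pcols

print(resolve_grid(8))           # (2, 4): auto-derived for partition_type=1
print(resolve_grid(8, prows=4))  # (4, 2)
```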


def _subcomm_init(sID, partition_row_size):
handle = get_handle(sID)
c_init_subcomms(handle, partition_row_size)


def initialize(comms=None,
p2p=False,
prows=None,
pcols=None,
partition_type=1):
"""
Initialize a communicator for multi-node/multi-gpu communications.
It is expected to be called right after client initialization for running
multi-GPU algorithms. It wraps raft comms that manages underlying NCCL and
UCX comms handles across the workers of a Dask cluster.
Initialize a communicator for multi-node/multi-gpu communications. It is
expected to be called right after client initialization for running
multi-GPU algorithms (this wraps raft comms that manages underlying NCCL
and UCX comms handles across the workers of a Dask cluster).

It is recommended to also call `destroy()` when the comms are no longer
needed so the underlying resources can be cleaned up.

Parameters
----------
comms : raft Comms
A pre-initialized raft communicator. If provided, this is used for mnmg
communications.
communications. If not provided, default comms are initialized as per
client information.
p2p : bool
Initialize UCX endpoints
Initialize UCX endpoints if True. Default is False.
prows : int
Specifies the number of rows when performing a 2D partitioning of the
input graph. If specified, this must be a factor of the total number of
parallel processes. When specified with pcols, prows*pcols should be
equal to the total number of parallel processes.
pcols : int
Specifies the number of columns when performing a 2D partitioning of
the input graph. If specified, this must be a factor of the total
number of parallel processes. When specified with prows, prows*pcols
should be equal to the total number of parallel processes.
partition_type : int
Valid values are currently 1 or any int other than 1. A value of 1 (the
default) represents a partitioning resulting in prows*pcols
partitions. A non-1 value currently results in a partitioning of
p*pcols partitions, where p is the number of GPUs.
"""

global __instance
if __instance is None:
global __default_handle
__default_handle = None
if comms is None:
# Initialize communicator
__instance = raftComms(comms_p2p=p2p)
__instance.init()
# Initialize subcommunicator
subcomm_init(prows, pcols, partition_type)
else:
__instance = comms
else:
raise Exception("Communicator is already initialized")


# Check if Comms was initialized.
def is_initialized():
"""
Returns True if comms was initialized, False otherwise.
"""
global __instance
if __instance is not None:
return True
else:
return False


# Get raft Comms
def get_comms():
"""
Returns raft Comms instance
"""
global __instance
return __instance


# Get workers in the Comms
def get_workers():
"""
Returns the workers in the Comms instance, or None if Comms is not
initialized.
"""
if is_initialized():
global __instance
return __instance.worker_addresses


# Get sessionId for finding sessionstate of workers.
def get_session_id():
"""
Returns the sessionId for finding sessionstate of workers, or None if Comms
is not initialized.
"""
if is_initialized():
global __instance
return __instance.sessionId


# Destroy Comms
def get_2D_partition():
"""
Returns a tuple representing the 2D partition information: (prows, pcols,
partition_type)
"""
global __subcomm
if __subcomm is not None:
return __subcomm


def destroy():
"""
Shuts down initialized comms and cleans up resources.
@@ -93,9 +181,10 @@ def destroy():
__instance = None


# Default handle in case Comms is not initialized.
# This does not perform nccl initialization.
def get_default_handle():
"""
Returns the default handle. This does not perform nccl initialization.
"""
global __default_handle
if __default_handle is None:
__default_handle = Handle()
@@ -114,6 +203,16 @@ def get_worker_id(sID):
return sessionstate['wid']


def get_n_workers(sID):
sessionstate = worker_state(sID)
return sessionstate['nworkers']
# FIXME: There are several similar instances of utility functions for getting
# the number of workers, including:
# * get_n_workers() (from cugraph.dask.common.read_utils)
# * len(get_visible_devices())
# * len(numba.cuda.gpus)
# Consider consolidating these or emphasizing why different
# functions/techniques are needed.
def get_n_workers(sID=None):
if sID is None:
return read_utils.get_n_workers()
else:
sessionstate = worker_state(sID)
return sessionstate['nworkers']
9 changes: 9 additions & 0 deletions python/cugraph/comms/comms_wrapper.pyx
@@ -0,0 +1,9 @@

from cugraph.structure.graph_primtypes cimport handle_t
from cugraph.comms.comms cimport init_subcomms as c_init_subcomms


def init_subcomms(handle, row_comm_size):
cdef size_t handle_size_t = <size_t>handle.getHandle()
handle_ = <handle_t*>handle_size_t
c_init_subcomms(handle_[0], row_comm_size)
10 changes: 0 additions & 10 deletions python/cugraph/dask/community/louvain.py
@@ -23,8 +23,6 @@ def call_louvain(sID,
data,
num_verts,
num_edges,
partition_row_size,
partition_col_size,
vertex_partition_offsets,
sorted_by_degree,
max_level,
@@ -36,8 +34,6 @@
return c_mg_louvain.louvain(data[0],
num_verts,
num_edges,
partition_row_size,
partition_col_size,
vertex_partition_offsets,
wid,
handle,
@@ -67,10 +63,6 @@ def louvain(input_graph, max_iter=100, resolution=1.0, load_balance=True):
"""
# FIXME: finish docstring: describe parameters, etc.

# FIXME: import here to prevent circular import: cugraph->louvain
# wrapper->cugraph/structure->cugraph/dask->dask/louvain->cugraph/structure
# from cugraph.structure.graph import Graph

# FIXME: dask methods to populate graphs from edgelists are only present on
# DiGraph classes. Disable the Graph check for now and assume inputs are
# symmetric DiGraphs.
@@ -96,8 +88,6 @@
wf[1],
num_verts,
num_edges,
partition_row_size,
partition_col_size,
vertex_partition_offsets,
sorted_by_degree,
max_iter,
3 changes: 0 additions & 3 deletions python/cugraph/dask/community/louvain_wrapper.pyx
@@ -35,8 +35,6 @@ numberTypeMap = {np.dtype("int32") : <int>numberTypeEnum.int32Type,
def louvain(input_df,
num_global_verts,
num_global_edges,
partition_row_size,
partition_col_size,
vertex_partition_offsets,
rank,
handle,
@@ -96,7 +94,6 @@ def louvain(input_df,
<numberTypeEnum>(<int>(numberTypeMap[weight_t])),
num_partition_edges,
num_global_verts, num_global_edges,
partition_row_size, partition_col_size,
sorted_by_degree,
False, True) # store_transposed, multi_gpu
