From 124b970a17d131fb857c724f009f2fbc7c5d7dca Mon Sep 17 00:00:00 2001 From: Ishika Roy Date: Wed, 22 Jul 2020 10:17:39 -0500 Subject: [PATCH 1/4] fix local vert and offset calculation --- CHANGELOG.md | 1 + python/cugraph/dask/common/input_utils.py | 14 +++++++++----- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 513e034aa0e..b4694932bbd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ - PR #992 Fix unrenumber of predecessor - PR #1008 Fix for cudf updates disabling iteration of Series/Columns/Index - PR #1012 Fix Local build script README +- PR #1017 Fix more mg bugs # cuGraph 0.14.0 (03 Jun 2020) diff --git a/python/cugraph/dask/common/input_utils.py b/python/cugraph/dask/common/input_utils.py index e3f7a4b1899..ab038507b78 100644 --- a/python/cugraph/dask/common/input_utils.py +++ b/python/cugraph/dask/common/input_utils.py @@ -151,8 +151,13 @@ def calculate_local_data(self, comms, by): for rank in range(len(_local_data_dict)): data = _local_data_dict[rank] local_data_dict['edges'].append(data[0]) - local_data_dict['offsets'].append(data[1]) - local_data_dict['verts'].append(data[2]) + if rank == 0: + local_offset = 0 + else: + prev_data = _local_data_dict[rank-1] + local_offset = prev_data[1] + 1 + local_data_dict['offsets'].append(local_offset) + local_data_dict['verts'].append(data[1] - local_offset + 1) import numpy as np local_data_dict['edges'] = np.array(local_data_dict['edges'], @@ -191,9 +196,8 @@ def get_obj(x): return x[0] if multiple else x def _get_local_data(df, by): df = df[0] num_local_edges = len(df) - local_offset = df[by].min() - num_local_verts = df[by].max() - local_offset + 1 - return num_local_edges, local_offset, num_local_verts + local_max = df[by].iloc[-1] + return num_local_edges, local_max def get_local_data(input_graph, by, load_balance=True): From d23a5dd1f9ab4566e03887de28bbe6c5aaba552d Mon Sep 17 00:00:00 2001 From: Ishika Roy Date: Thu, 23 Jul 2020 00:13:43 -0500 
Subject: [PATCH 2/4] update from_dask_cudf_edgelist api --- python/cugraph/dask/opg_pagerank/pagerank.py | 3 ++- python/cugraph/structure/graph.py | 14 +++++++++++++- python/cugraph/tests/dask/opg_utility_testing.py | 3 ++- python/cugraph/tests/dask/test_opg_comms.py | 4 ++-- python/cugraph/tests/dask/test_opg_degree.py | 2 +- python/cugraph/tests/dask/test_opg_pagerank.py | 2 +- 6 files changed, 21 insertions(+), 7 deletions(-) diff --git a/python/cugraph/dask/opg_pagerank/pagerank.py b/python/cugraph/dask/opg_pagerank/pagerank.py index 90a785251f0..1a04e79aba7 100644 --- a/python/cugraph/dask/opg_pagerank/pagerank.py +++ b/python/cugraph/dask/opg_pagerank/pagerank.py @@ -92,7 +92,8 @@ def pagerank(input_graph, names=['src', 'dst', 'value'], dtype=['int32', 'int32', 'float32']) >>> dg = cugraph.DiGraph() - >>> dg.from_dask_cudf_edgelist(ddf) + >>> dg.from_dask_cudf_edgelist(ddf, source='src', destination='dst', + edge_attr='value') >>> pr = dcg.pagerank(dg) """ diff --git a/python/cugraph/structure/graph.py b/python/cugraph/structure/graph.py index 9ba78e9c266..5a66cb41763 100644 --- a/python/cugraph/structure/graph.py +++ b/python/cugraph/structure/graph.py @@ -233,7 +233,9 @@ def add_edge_list(self, source, destination, value=None): else: self.from_cudf_edgelist(input_df) - def from_dask_cudf_edgelist(self, input_ddf): + def from_dask_cudf_edgelist(self, input_ddf, source='source', + destination='destination', + edge_attr=None, renumber=True): """ Initializes the distributed graph from the dask_cudf.DataFrame edgelist. Renumbering and undirected Graphs are not currently @@ -242,6 +244,12 @@ def from_dask_cudf_edgelist(self, input_ddf): ---------- input_ddf : dask_cudf.DataFrame The edgelist as a dask_cudf.DataFrame + source : str + source argument is source column name + destination : str + destination argument is destination column name. + edge_attr : str + edge_attr argument is the weights column name. 
""" if self.edgelist is not None or self.adjlist is not None: raise Exception('Graph already has values') @@ -250,6 +258,10 @@ def from_dask_cudf_edgelist(self, input_ddf): if isinstance(input_ddf, dask_cudf.DataFrame): self.distributed = True self.local_data = None + rename_map = {source: 'src', destination: 'dst'} + if edge_attr is not None: + rename_map[edge_attr] = 'weights' + input_ddf = input_ddf.rename(columns=rename_map) self.edgelist = self.EdgeList(input_ddf) else: raise Exception('input should be a dask_cudf dataFrame') diff --git a/python/cugraph/tests/dask/opg_utility_testing.py b/python/cugraph/tests/dask/opg_utility_testing.py index aef81fb53f5..2819d3b3253 100644 --- a/python/cugraph/tests/dask/opg_utility_testing.py +++ b/python/cugraph/tests/dask/opg_utility_testing.py @@ -20,7 +20,8 @@ def test_compute_local_data(): dtype=['int32', 'int32', 'float32']) dg = cugraph.DiGraph() - dg.from_dask_cudf_edgelist(ddf) + dg.from_dask_cudf_edgelist(ddf, source='src', destination='dst', + edge_attr='value') # Compute_local_data dg.compute_local_data(by='dst') diff --git a/python/cugraph/tests/dask/test_opg_comms.py b/python/cugraph/tests/dask/test_opg_comms.py index dbb192f8bd8..93d30813343 100644 --- a/python/cugraph/tests/dask/test_opg_comms.py +++ b/python/cugraph/tests/dask/test_opg_comms.py @@ -42,7 +42,7 @@ def test_dask_pagerank(): dtype=['int32', 'int32', 'float32']) dg1 = cugraph.DiGraph() - dg1.from_dask_cudf_edgelist(ddf1) + dg1.from_dask_cudf_edgelist(ddf1, 'src', 'dst') result_pr1 = dcg.pagerank(dg1) ddf2 = dask_cudf.read_csv(input_data_path2, chunksize=chunksize2, @@ -51,7 +51,7 @@ def test_dask_pagerank(): dtype=['int32', 'int32', 'float32']) dg2 = cugraph.DiGraph() - dg2.from_dask_cudf_edgelist(ddf2) + dg2.from_dask_cudf_edgelist(ddf2, 'src', 'dst') result_pr2 = dcg.pagerank(dg2) # Calculate single GPU pagerank for verification of results diff --git a/python/cugraph/tests/dask/test_opg_degree.py b/python/cugraph/tests/dask/test_opg_degree.py 
index 7fb9078feb7..38a6900e290 100644 --- a/python/cugraph/tests/dask/test_opg_degree.py +++ b/python/cugraph/tests/dask/test_opg_degree.py @@ -31,7 +31,7 @@ def test_dask_opg_degree(): dtype=['int32', 'int32', 'float32']) dg = cugraph.DiGraph() - dg.from_dask_cudf_edgelist(ddf) + dg.from_dask_cudf_edgelist(ddf, 'src', 'dst') g = cugraph.DiGraph() g.from_cudf_edgelist(df, 'src', 'dst') diff --git a/python/cugraph/tests/dask/test_opg_pagerank.py b/python/cugraph/tests/dask/test_opg_pagerank.py index 99db0338b56..76862e4de4c 100644 --- a/python/cugraph/tests/dask/test_opg_pagerank.py +++ b/python/cugraph/tests/dask/test_opg_pagerank.py @@ -44,7 +44,7 @@ def test_dask_pagerank(): g.from_cudf_edgelist(df, 'src', 'dst') dg = cugraph.DiGraph() - dg.from_dask_cudf_edgelist(ddf) + dg.from_dask_cudf_edgelist(ddf, 'src', 'dst') # Pre compute local data # dg.compute_local_data(by='dst') From ec2bc43dc0dc4ef7de32419ddf16fc2a85c9d920 Mon Sep 17 00:00:00 2001 From: Ishika Roy Date: Tue, 28 Jul 2020 17:04:44 -0500 Subject: [PATCH 3/4] flake8 --- python/cugraph/dask/common/input_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/cugraph/dask/common/input_utils.py b/python/cugraph/dask/common/input_utils.py index 7d516e9e72a..ab038507b78 100644 --- a/python/cugraph/dask/common/input_utils.py +++ b/python/cugraph/dask/common/input_utils.py @@ -199,6 +199,7 @@ def _get_local_data(df, by): local_max = df[by].iloc[-1] return num_local_edges, local_max + def get_local_data(input_graph, by, load_balance=True): _ddf = input_graph.edgelist.edgelist_df ddf = _ddf.sort_values(by=by, ignore_index=True) From f4fdc7f090343b45c8a87e11489c15c45ce9f98b Mon Sep 17 00:00:00 2001 From: Ishika Roy Date: Tue, 28 Jul 2020 22:54:04 -0500 Subject: [PATCH 4/4] add comms doc --- python/cugraph/comms/comms.py | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/python/cugraph/comms/comms.py b/python/cugraph/comms/comms.py index 
28ce2a3fc1e..642d99440e0 100644 --- a/python/cugraph/comms/comms.py +++ b/python/cugraph/comms/comms.py @@ -9,16 +9,33 @@ # Intialize Comms. If explicit Comms not provided as arg, # default Comms are initialized as per client information. -def initialize(arg=None): +def initialize(comms=None, p2p=False): + """ + Initializes a communicator for multi-node multi-gpu communications. + It is expected to be called right after client initialization for running + mnmg algorithms. It wraps raft comms that manages underlying NCCL and UCX + comms handles across the workers of a Dask cluster. + It is recommended to also call `destroy()` when the comms are no longer + needed so the underlying resources can be cleaned up. + + Parameters + ---------- + comms : raft Comms + A pre-initialized raft communicator. If provided, this is used for mnmg + communications. + p2p : bool + Initialize UCX endpoints + """ + global __instance if __instance is None: global __default_handle __default_handle = None - if arg is None: - __instance = raftComms() + if comms is None: + __instance = raftComms(comms_p2p=p2p) __instance.init() else: - __instance = arg + __instance = comms else: raise Exception("Communicator is already initialized") @@ -47,6 +64,9 @@ def get_session_id(): # Destroy Comms def destroy(): + """ + Shuts down initialized comms and cleans up resources. + """ global __instance if is_initialized(): __instance.destroy()