Skip to content

Commit

Permalink
Merge pull request rapidsai#1017 from Iroy30/mg_bug_fixes
Browse files Browse the repository at this point in the history
[REVIEW] fix local vert and offset  calculation
  • Loading branch information
afender authored Jul 29, 2020
2 parents f3f94a6 + f4fdc7f commit 3e67085
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 71 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
- PR #992 Fix unrenumber of predecessor
- PR #1008 Fix for cudf updates disabling iteration of Series/Columns/Index
- PR #1012 Fix Local build script README
- PR #1017 Fix more mg bugs
- PR #1022 Fix support for using a cudf.DataFrame with a MG graph

# cuGraph 0.14.0 (03 Jun 2020)
Expand Down
28 changes: 24 additions & 4 deletions python/cugraph/comms/comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,33 @@

# Initialize Comms. If explicit Comms are not provided as an argument,
# default Comms are initialized as per client information.
def initialize(arg=None):
def initialize(comms=None, p2p=False):
"""
Initializes a communicator for multi-node multi-GPU communications.
It is expected to be called right after client initialization for running
mnmg algorithms. It wraps raft comms that manages underlying NCCL and UCX
comms handles across the workers of a Dask cluster.
It is recommended to also call `destroy()` when the comms are no longer
needed so the underlying resources can be cleaned up.
Parameters
----------
comms : raft Comms
A pre-initialized raft communicator. If provided, this is used for mnmg
communications.
p2p : bool
If True, initialize UCX endpoints. Default is False.
"""

global __instance
if __instance is None:
global __default_handle
__default_handle = None
if arg is None:
__instance = raftComms()
if comms is None:
__instance = raftComms(comms_p2p=p2p)
__instance.init()
else:
__instance = arg
__instance = comms
else:
raise Exception("Communicator is already initialized")

Expand Down Expand Up @@ -47,6 +64,9 @@ def get_session_id():

# Destroy Comms
def destroy():
"""
Shuts down initialized comms and cleans up resources.
"""
global __instance
if is_initialized():
__instance.destroy()
Expand Down
18 changes: 9 additions & 9 deletions python/cugraph/dask/common/input_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,13 @@ def calculate_local_data(self, comms, by):
for rank in range(len(_local_data_dict)):
data = _local_data_dict[rank]
local_data_dict['edges'].append(data[0])
local_data_dict['offsets'].append(data[1])
local_data_dict['verts'].append(data[2])
if rank == 0:
local_offset = 0
else:
prev_data = _local_data_dict[rank-1]
local_offset = prev_data[1] + 1
local_data_dict['offsets'].append(local_offset)
local_data_dict['verts'].append(data[1] - local_offset + 1)

import numpy as np
local_data_dict['edges'] = np.array(local_data_dict['edges'],
Expand Down Expand Up @@ -191,13 +196,8 @@ def get_obj(x): return x[0] if multiple else x
def _get_local_data(df, by):
df = df[0]
num_local_edges = len(df)
if num_local_edges == 0:
local_offset = 0
num_local_verts = 0
else:
local_offset = df[by].min()
num_local_verts = df[by].max() - local_offset + 1
return num_local_edges, local_offset, num_local_verts
local_max = df[by].iloc[-1]
return num_local_edges, local_max


def get_local_data(input_graph, by, load_balance=True):
Expand Down
5 changes: 3 additions & 2 deletions python/cugraph/dask/mg_pagerank/pagerank.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ def pagerank(input_graph,
names=['src', 'dst', 'value'],
dtype=['int32', 'int32', 'float32'])
>>> dg = cugraph.DiGraph()
>>> dg.from_dask_cudf_edgelist(ddf)
>>> dg.from_dask_cudf_edgelist(ddf, source='src', destination='dst',
edge_attr='value')
>>> pr = dcg.pagerank(dg)
"""
from cugraph.structure.graph import null_check
Expand All @@ -121,7 +122,7 @@ def pagerank(input_graph,
if input_graph.renumbered is True:
personalization = input_graph.add_internal_vertex_id(
personalization, "vertex", "vertex"
)
).compute()

result = dict([(data.worker_info[wf[0]]["rank"],
client.submit(
Expand Down
1 change: 0 additions & 1 deletion python/cugraph/dask/pagerank/__init__.py

This file was deleted.

45 changes: 0 additions & 45 deletions python/cugraph/dask/pagerank/pagerank.py

This file was deleted.

17 changes: 13 additions & 4 deletions python/cugraph/structure/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,9 @@ def add_edge_list(self, source, destination, value=None):
else:
self.from_cudf_edgelist(input_df)

def from_dask_cudf_edgelist(self, input_ddf, renumber=True):
def from_dask_cudf_edgelist(self, input_ddf, source='source',
destination='destination',
edge_attr=None, renumber=True):
"""
Initializes the distributed graph from the dask_cudf.DataFrame
edgelist. Undirected Graphs are not currently supported.
Expand All @@ -345,9 +347,12 @@ def from_dask_cudf_edgelist(self, input_ddf, renumber=True):
----------
input_ddf : dask_cudf.DataFrame
The edgelist as a dask_cudf.DataFrame
Source vertices are in a column named 'src', Destination
vertices are in a column named 'dst'
source : str
Name of the column in input_ddf that contains the source vertices.
destination : str
Name of the column in input_ddf that contains the destination vertices.
edge_attr : str, optional
Name of the column in input_ddf that contains the edge weights.
renumber : bool
If source and destination indices are not in range 0 to V where V
is number of vertices, renumber argument should be True.
Expand All @@ -359,6 +364,10 @@ def from_dask_cudf_edgelist(self, input_ddf, renumber=True):
if isinstance(input_ddf, dask_cudf.DataFrame):
self.distributed = True
self.local_data = None
rename_map = {source: 'src', destination: 'dst'}
if edge_attr is not None:
rename_map[edge_attr] = 'weights'
input_ddf = input_ddf.rename(columns=rename_map)
if renumber:
renumbered_ddf, number_map = NumberMap.renumber(
input_ddf, "src", "dst"
Expand Down
3 changes: 2 additions & 1 deletion python/cugraph/tests/dask/mg_utility_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ def test_compute_local_data():
dtype=['int32', 'int32', 'float32'])

dg = cugraph.DiGraph()
dg.from_dask_cudf_edgelist(ddf)
dg.from_dask_cudf_edgelist(ddf, source='src', destination='dst',
edge_attr='value')

# Compute_local_data
dg.compute_local_data(by='dst')
Expand Down
4 changes: 2 additions & 2 deletions python/cugraph/tests/dask/test_mg_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def test_dask_pagerank(client_connection):
dtype=['int32', 'int32', 'float32'])

dg1 = cugraph.DiGraph()
dg1.from_dask_cudf_edgelist(ddf1)
dg1.from_dask_cudf_edgelist(ddf1, 'src', 'dst')
result_pr1 = dcg.pagerank(dg1)

ddf2 = dask_cudf.read_csv(input_data_path2, chunksize=chunksize2,
Expand All @@ -62,7 +62,7 @@ def test_dask_pagerank(client_connection):
dtype=['int32', 'int32', 'float32'])

dg2 = cugraph.DiGraph()
dg2.from_dask_cudf_edgelist(ddf2, renumber=False)
dg2.from_dask_cudf_edgelist(ddf2, 'src', 'dst')
result_pr2 = dcg.pagerank(dg2)

# Calculate single GPU pagerank for verification of results
Expand Down
2 changes: 1 addition & 1 deletion python/cugraph/tests/dask/test_mg_degree.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def test_dask_mg_degree(client_connection):
dtype=['int32', 'int32', 'float32'])

dg = cugraph.DiGraph()
dg.from_dask_cudf_edgelist(ddf)
dg.from_dask_cudf_edgelist(ddf, 'src', 'dst')

g = cugraph.DiGraph()
g.from_cudf_edgelist(df, 'src', 'dst')
Expand Down
4 changes: 2 additions & 2 deletions python/cugraph/tests/dask/test_mg_pagerank.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ def test_dask_pagerank(client_connection, personalization_perc):
dtype=['int32', 'int32', 'float32'])

g = cugraph.DiGraph()
g.from_cudf_edgelist(df, 'src', 'dst', renumber=False)
g.from_cudf_edgelist(df, 'src', 'dst')

dg = cugraph.DiGraph()
dg.from_dask_cudf_edgelist(ddf, renumber=False)
dg.from_dask_cudf_edgelist(ddf)

# Pre compute local data and personalize
personalization = None
Expand Down

0 comments on commit 3e67085

Please sign in to comment.