Skip to content

Commit

Permalink
Showing 11 changed files with 57 additions and 71 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -57,6 +57,7 @@
- PR #992 Fix unrenumber of predecessor
- PR #1008 Fix for cudf updates disabling iteration of Series/Columns/Index
- PR #1012 Fix Local build script README
- PR #1017 Fix more mg bugs
- PR #1022 Fix support for using a cudf.DataFrame with a MG graph

# cuGraph 0.14.0 (03 Jun 2020)
28 changes: 24 additions & 4 deletions python/cugraph/comms/comms.py
Original file line number Diff line number Diff line change
@@ -9,16 +9,33 @@

# Intialize Comms. If explicit Comms not provided as arg,
# default Comms are initialized as per client information.
def initialize(arg=None):
def initialize(comms=None, p2p=False):
"""
Intitializes a communicator for multi-node multi-gpu communications.
It is expected to be called right after client initialization for running
mnmg algorithms. It wraps raft comms that manages underlying NCCL and UCX
comms handles across the workers of a Dask cluster.
It is recommended to also call `destroy()` when the comms are no longer
needed so the underlying resources can be cleaned up.
Parameters
----------
comms : raft Comms
A pre-initialized raft communicator. If provided, this is used for mnmg
communications.
p2p : bool
Initialize UCX endpoints
"""

global __instance
if __instance is None:
global __default_handle
__default_handle = None
if arg is None:
__instance = raftComms()
if comms is None:
__instance = raftComms(comms_p2p=p2p)
__instance.init()
else:
__instance = arg
__instance = comms
else:
raise Exception("Communicator is already initialized")

@@ -47,6 +64,9 @@ def get_session_id():

# Destroy Comms
def destroy():
"""
Shuts down initialized comms and cleans up resources.
"""
global __instance
if is_initialized():
__instance.destroy()
18 changes: 9 additions & 9 deletions python/cugraph/dask/common/input_utils.py
Original file line number Diff line number Diff line change
@@ -151,8 +151,13 @@ def calculate_local_data(self, comms, by):
for rank in range(len(_local_data_dict)):
data = _local_data_dict[rank]
local_data_dict['edges'].append(data[0])
local_data_dict['offsets'].append(data[1])
local_data_dict['verts'].append(data[2])
if rank == 0:
local_offset = 0
else:
prev_data = _local_data_dict[rank-1]
local_offset = prev_data[1] + 1
local_data_dict['offsets'].append(local_offset)
local_data_dict['verts'].append(data[1] - local_offset + 1)

import numpy as np
local_data_dict['edges'] = np.array(local_data_dict['edges'],
@@ -191,13 +196,8 @@ def get_obj(x): return x[0] if multiple else x
def _get_local_data(df, by):
df = df[0]
num_local_edges = len(df)
if num_local_edges == 0:
local_offset = 0
num_local_verts = 0
else:
local_offset = df[by].min()
num_local_verts = df[by].max() - local_offset + 1
return num_local_edges, local_offset, num_local_verts
local_max = df[by].iloc[-1]
return num_local_edges, local_max


def get_local_data(input_graph, by, load_balance=True):
5 changes: 3 additions & 2 deletions python/cugraph/dask/mg_pagerank/pagerank.py
Original file line number Diff line number Diff line change
@@ -100,7 +100,8 @@ def pagerank(input_graph,
names=['src', 'dst', 'value'],
dtype=['int32', 'int32', 'float32'])
>>> dg = cugraph.DiGraph()
>>> dg.from_dask_cudf_edgelist(ddf)
>>> dg.from_dask_cudf_edgelist(ddf, source='src', destination='dst',
edge_attr='value')
>>> pr = dcg.pagerank(dg)
"""
from cugraph.structure.graph import null_check
@@ -121,7 +122,7 @@ def pagerank(input_graph,
if input_graph.renumbered is True:
personalization = input_graph.add_internal_vertex_id(
personalization, "vertex", "vertex"
)
).compute()

result = dict([(data.worker_info[wf[0]]["rank"],
client.submit(
1 change: 0 additions & 1 deletion python/cugraph/dask/pagerank/__init__.py

This file was deleted.

45 changes: 0 additions & 45 deletions python/cugraph/dask/pagerank/pagerank.py

This file was deleted.

17 changes: 13 additions & 4 deletions python/cugraph/structure/graph.py
Original file line number Diff line number Diff line change
@@ -330,7 +330,9 @@ def add_edge_list(self, source, destination, value=None):
else:
self.from_cudf_edgelist(input_df)

def from_dask_cudf_edgelist(self, input_ddf, renumber=True):
def from_dask_cudf_edgelist(self, input_ddf, source='source',
destination='destination',
edge_attr=None, renumber=True):
"""
Initializes the distributed graph from the dask_cudf.DataFrame
edgelist. Undirected Graphs are not currently supported.
@@ -345,9 +347,12 @@ def from_dask_cudf_edgelist(self, input_ddf, renumber=True):
----------
input_ddf : dask_cudf.DataFrame
The edgelist as a dask_cudf.DataFrame
Source vertices are in a column named 'src', Destination
vertices are in a column named 'dst'
source : str
source argument is source column name
destination : str
destination argument is destination column name.
edge_attr : str
edge_attr argument is the weights column name.
renumber : bool
If source and destination indices are not in range 0 to V where V
is number of vertices, renumber argument should be True.
@@ -359,6 +364,10 @@ def from_dask_cudf_edgelist(self, input_ddf, renumber=True):
if isinstance(input_ddf, dask_cudf.DataFrame):
self.distributed = True
self.local_data = None
rename_map = {source: 'src', destination: 'dst'}
if edge_attr is not None:
rename_map[edge_attr] = 'weights'
input_ddf = input_ddf.rename(columns=rename_map)
if renumber:
renumbered_ddf, number_map = NumberMap.renumber(
input_ddf, "src", "dst"
3 changes: 2 additions & 1 deletion python/cugraph/tests/dask/mg_utility_testing.py
Original file line number Diff line number Diff line change
@@ -20,7 +20,8 @@ def test_compute_local_data():
dtype=['int32', 'int32', 'float32'])

dg = cugraph.DiGraph()
dg.from_dask_cudf_edgelist(ddf)
dg.from_dask_cudf_edgelist(ddf, source='src', destination='dst',
edge_attr='value')

# Compute_local_data
dg.compute_local_data(by='dst')
4 changes: 2 additions & 2 deletions python/cugraph/tests/dask/test_mg_comms.py
Original file line number Diff line number Diff line change
@@ -53,7 +53,7 @@ def test_dask_pagerank(client_connection):
dtype=['int32', 'int32', 'float32'])

dg1 = cugraph.DiGraph()
dg1.from_dask_cudf_edgelist(ddf1)
dg1.from_dask_cudf_edgelist(ddf1, 'src', 'dst')
result_pr1 = dcg.pagerank(dg1)

ddf2 = dask_cudf.read_csv(input_data_path2, chunksize=chunksize2,
@@ -62,7 +62,7 @@ def test_dask_pagerank(client_connection):
dtype=['int32', 'int32', 'float32'])

dg2 = cugraph.DiGraph()
dg2.from_dask_cudf_edgelist(ddf2, renumber=False)
dg2.from_dask_cudf_edgelist(ddf2, 'src', 'dst')
result_pr2 = dcg.pagerank(dg2)

# Calculate single GPU pagerank for verification of results
2 changes: 1 addition & 1 deletion python/cugraph/tests/dask/test_mg_degree.py
Original file line number Diff line number Diff line change
@@ -41,7 +41,7 @@ def test_dask_mg_degree(client_connection):
dtype=['int32', 'int32', 'float32'])

dg = cugraph.DiGraph()
dg.from_dask_cudf_edgelist(ddf)
dg.from_dask_cudf_edgelist(ddf, 'src', 'dst')

g = cugraph.DiGraph()
g.from_cudf_edgelist(df, 'src', 'dst')
4 changes: 2 additions & 2 deletions python/cugraph/tests/dask/test_mg_pagerank.py
Original file line number Diff line number Diff line change
@@ -81,10 +81,10 @@ def test_dask_pagerank(client_connection, personalization_perc):
dtype=['int32', 'int32', 'float32'])

g = cugraph.DiGraph()
g.from_cudf_edgelist(df, 'src', 'dst', renumber=False)
g.from_cudf_edgelist(df, 'src', 'dst')

dg = cugraph.DiGraph()
dg.from_dask_cudf_edgelist(ddf, renumber=False)
dg.from_dask_cudf_edgelist(ddf)

# Pre compute local data and personalize
personalization = None

0 comments on commit 3e67085

Please sign in to comment.