Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REVIEW] fix local vert and offset calculation #1017

Merged
merged 5 commits into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
- PR #992 Fix unrenumber of predecessor
- PR #1008 Fix for cudf updates disabling iteration of Series/Columns/Index
- PR #1012 Fix Local build script README
- PR #1017 Fix more mg bugs

# cuGraph 0.14.0 (03 Jun 2020)

Expand Down
14 changes: 9 additions & 5 deletions python/cugraph/dask/common/input_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,13 @@ def calculate_local_data(self, comms, by):
for rank in range(len(_local_data_dict)):
data = _local_data_dict[rank]
local_data_dict['edges'].append(data[0])
local_data_dict['offsets'].append(data[1])
local_data_dict['verts'].append(data[2])
if rank == 0:
local_offset = 0
else:
prev_data = _local_data_dict[rank-1]
local_offset = prev_data[1] + 1
local_data_dict['offsets'].append(local_offset)
local_data_dict['verts'].append(data[1] - local_offset + 1)

import numpy as np
local_data_dict['edges'] = np.array(local_data_dict['edges'],
Expand Down Expand Up @@ -191,9 +196,8 @@ def get_obj(x): return x[0] if multiple else x
def _get_local_data(df, by):
df = df[0]
num_local_edges = len(df)
local_offset = df[by].min()
num_local_verts = df[by].max() - local_offset + 1
return num_local_edges, local_offset, num_local_verts
local_max = df[by].iloc[-1]
return num_local_edges, local_max


def get_local_data(input_graph, by, load_balance=True):
Expand Down
3 changes: 2 additions & 1 deletion python/cugraph/dask/opg_pagerank/pagerank.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ def pagerank(input_graph,
names=['src', 'dst', 'value'],
dtype=['int32', 'int32', 'float32'])
>>> dg = cugraph.DiGraph()
>>> dg.from_dask_cudf_edgelist(ddf)
>>> dg.from_dask_cudf_edgelist(ddf, source='src', destination='dst',
edge_attr='value')
>>> pr = dcg.pagerank(dg)
"""

Expand Down
14 changes: 13 additions & 1 deletion python/cugraph/structure/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,9 @@ def add_edge_list(self, source, destination, value=None):
else:
self.from_cudf_edgelist(input_df)

def from_dask_cudf_edgelist(self, input_ddf):
def from_dask_cudf_edgelist(self, input_ddf, source='source',
destination='destination',
edge_attr=None, renumber=True):
"""
Initializes the distributed graph from the dask_cudf.DataFrame
edgelist. Renumbering and undirected Graphs are not currently
Expand All @@ -242,6 +244,12 @@ def from_dask_cudf_edgelist(self, input_ddf):
----------
input_ddf : dask_cudf.DataFrame
The edgelist as a dask_cudf.DataFrame
source : str
source argument is source column name
destination : str
destination argument is destination column name.
edge_attr : str
edge_attr argument is the weights column name.
"""
if self.edgelist is not None or self.adjlist is not None:
raise Exception('Graph already has values')
Expand All @@ -250,6 +258,10 @@ def from_dask_cudf_edgelist(self, input_ddf):
if isinstance(input_ddf, dask_cudf.DataFrame):
self.distributed = True
self.local_data = None
rename_map = {source: 'src', destination: 'dst'}
if edge_attr is not None:
rename_map[edge_attr] = 'weights'
input_ddf = input_ddf.rename(columns=rename_map)
self.edgelist = self.EdgeList(input_ddf)
else:
raise Exception('input should be a dask_cudf dataFrame')
Expand Down
3 changes: 2 additions & 1 deletion python/cugraph/tests/dask/opg_utility_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ def test_compute_local_data():
dtype=['int32', 'int32', 'float32'])

dg = cugraph.DiGraph()
dg.from_dask_cudf_edgelist(ddf)
dg.from_dask_cudf_edgelist(ddf, source='src', destination='dst',
edge_attr='value')

# Compute_local_data
dg.compute_local_data(by='dst')
Expand Down
4 changes: 2 additions & 2 deletions python/cugraph/tests/dask/test_opg_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def test_dask_pagerank():
dtype=['int32', 'int32', 'float32'])

dg1 = cugraph.DiGraph()
dg1.from_dask_cudf_edgelist(ddf1)
dg1.from_dask_cudf_edgelist(ddf1, 'src', 'dst')
result_pr1 = dcg.pagerank(dg1)

ddf2 = dask_cudf.read_csv(input_data_path2, chunksize=chunksize2,
Expand All @@ -51,7 +51,7 @@ def test_dask_pagerank():
dtype=['int32', 'int32', 'float32'])

dg2 = cugraph.DiGraph()
dg2.from_dask_cudf_edgelist(ddf2)
dg2.from_dask_cudf_edgelist(ddf2, 'src', 'dst')
result_pr2 = dcg.pagerank(dg2)

# Calculate single GPU pagerank for verification of results
Expand Down
2 changes: 1 addition & 1 deletion python/cugraph/tests/dask/test_opg_degree.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def test_dask_opg_degree():
dtype=['int32', 'int32', 'float32'])

dg = cugraph.DiGraph()
dg.from_dask_cudf_edgelist(ddf)
dg.from_dask_cudf_edgelist(ddf, 'src', 'dst')

g = cugraph.DiGraph()
g.from_cudf_edgelist(df, 'src', 'dst')
Expand Down
2 changes: 1 addition & 1 deletion python/cugraph/tests/dask/test_opg_pagerank.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def test_dask_pagerank():
g.from_cudf_edgelist(df, 'src', 'dst')

dg = cugraph.DiGraph()
dg.from_dask_cudf_edgelist(ddf)
dg.from_dask_cudf_edgelist(ddf, 'src', 'dst')

# Pre compute local data
# dg.compute_local_data(by='dst')
Expand Down