Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REVIEW] dask personalization, fix df query #1237

Merged
merged 16 commits into from
Nov 20, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@
- PR #1271 Add extra check to make SG Louvain deterministic

## Bug Fixes
- PR #1237 update tests for asymmetric graphs, enable personalization pagerank
- PR #1242 Calling gunrock cmake using explicit -D options, re-enabling C++ tests
- PR #1246 Use latest Gunrock, update HITS implementation
- PR #1250 Updated cuco commit hash to latest as of 2020-10-30 and removed unneeded GIT_SHALLOW param
10 changes: 0 additions & 10 deletions datasets/asymmetric_directed__tiny.csv

This file was deleted.

78 changes: 78 additions & 0 deletions datasets/karate-asymmetric.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
1 2 1.0
1 3 1.0
1 4 1.0
1 5 1.0
1 6 1.0
1 7 1.0
1 8 1.0
1 9 1.0
1 11 1.0
1 12 1.0
1 13 1.0
1 14 1.0
1 18 1.0
1 20 1.0
1 22 1.0
1 32 1.0
2 3 1.0
2 4 1.0
2 8 1.0
2 14 1.0
2 18 1.0
2 20 1.0
2 22 1.0
2 31 1.0
3 4 1.0
3 8 1.0
3 9 1.0
3 10 1.0
3 14 1.0
3 28 1.0
3 29 1.0
3 33 1.0
4 8 1.0
4 13 1.0
4 14 1.0
5 7 1.0
5 11 1.0
6 7 1.0
6 11 1.0
6 17 1.0
7 17 1.0
9 31 1.0
9 33 1.0
9 34 1.0
10 34 1.0
14 34 1.0
15 33 1.0
15 34 1.0
16 33 1.0
16 34 1.0
19 33 1.0
19 34 1.0
20 34 1.0
21 33 1.0
21 34 1.0
23 33 1.0
23 34 1.0
24 26 1.0
24 28 1.0
24 30 1.0
24 33 1.0
24 34 1.0
25 26 1.0
25 28 1.0
25 32 1.0
26 32 1.0
27 30 1.0
27 34 1.0
28 34 1.0
29 32 1.0
29 34 1.0
30 33 1.0
30 34 1.0
31 33 1.0
31 34 1.0
32 33 1.0
32 34 1.0
33 34 1.0
2 changes: 1 addition & 1 deletion notebooks/demo/batch_betweenness.ipynb
Original file line number Diff line number Diff line change
@@ -231,7 +231,7 @@
"source": [
"cluster = dask_cuda.LocalCUDACluster()\n",
"client = dask.distributed.Client(cluster)\n",
"Comms.initialize()"
"Comms.initialize(p2p=True)"
]
},
{
4 changes: 2 additions & 2 deletions notebooks/demo/mg_pagerank.ipynb
Original file line number Diff line number Diff line change
@@ -117,7 +117,7 @@
"source": [
"cluster = LocalCUDACluster()\n",
"client = Client(cluster)\n",
"Comms.initialize()"
"Comms.initialize(p2p=True)"
]
},
{
@@ -317,4 +317,4 @@
},
"nbformat": 4,
"nbformat_minor": 4
}
}
6 changes: 5 additions & 1 deletion python/cugraph/cores/k_core.py
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@
from cugraph.cores import k_core_wrapper, core_number_wrapper
from cugraph.utilities import cugraph_to_nx
from cugraph.utilities import check_nx_graph
from cugraph.structure.graph import Graph


def k_core(G, k=None, core_number=None):
@@ -63,6 +64,9 @@ def k_core(G, k=None, core_number=None):
mytype = type(G)
KCoreGraph = mytype()

if mytype is not Graph:
raise Exception("directed graph not supported")

if core_number is not None:
if G.renumbered is True:
core_number = G.add_internal_vertex_id(
@@ -73,7 +77,7 @@ def k_core(G, k=None, core_number=None):
core_number = core_number.rename(
columns={"core_number": "values"}, copy=False
)

print(core_number)
if k is None:
k = core_number["values"].max()

4 changes: 2 additions & 2 deletions python/cugraph/dask/link_analysis/mg_pagerank_wrapper.pyx
Original file line number Diff line number Diff line change
@@ -99,8 +99,8 @@ def mg_pagerank(input_df,

if personalization is not None:
sz = personalization['vertex'].shape[0]
personalization['vertex'] = personalization['vertex'].astype(np.int32)
personalization['values'] = personalization['values'].astype(df['pagerank'].dtype)
personalization['vertex'] = personalization['vertex'].astype(vertex_t)
personalization['values'] = personalization['values'].astype(weight_t)
c_pers_vtx = personalization['vertex'].__cuda_array_interface__['data'][0]
c_pers_val = personalization['values'].__cuda_array_interface__['data'][0]

48 changes: 30 additions & 18 deletions python/cugraph/dask/link_analysis/pagerank.py
Original file line number Diff line number Diff line change
@@ -119,9 +119,6 @@ def pagerank(input_graph,
"""
from cugraph.structure.graph import null_check

if personalization is not None:
raise Exception("Personalization not supported")

nstart = None

client = default_client()
@@ -141,21 +138,36 @@ def pagerank(input_graph,
if input_graph.renumbered is True:
personalization = input_graph.add_internal_vertex_id(
personalization, "vertex", "vertex"
).compute()

result = [client.submit(call_pagerank,
Comms.get_session_id(),
wf[1],
num_verts,
num_edges,
vertex_partition_offsets,
alpha,
max_iter,
tol,
personalization,
nstart,
workers=[wf[0]])
for idx, wf in enumerate(data.worker_to_parts.items())]
)
p_data = get_distributed_data(personalization)

result = [client.submit(call_pagerank,
Comms.get_session_id(),
wf[1],
num_verts,
num_edges,
vertex_partition_offsets,
alpha,
max_iter,
tol,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to discuss more about tol.

The code below is what NetworkX does (compare err with N * tol). This requires setting tol to a smaller value if N gets large (tol here basically means tolerance for a single PageRank value not the entire set of PageRank vector).

https://github.com/networkx/networkx/blob/master/networkx/algorithms/link_analysis/pagerank_alg.py#L155

err = sum([abs(x[n] - xlast[n]) for n in x])
        if err < N * tol:
            return x

The new PageRank algorithm adopts this NetworkX logic to determine convergence, so should we follow the NetworkX logic (and set tol to a smaller value with a larger N) or better remove N * from the new PageRank code?

p_data.worker_to_parts[wf[0]][0],
nstart,
workers=[wf[0]])
for idx, wf in enumerate(data.worker_to_parts.items())]
else:
result = [client.submit(call_pagerank,
Comms.get_session_id(),
wf[1],
num_verts,
num_edges,
vertex_partition_offsets,
alpha,
max_iter,
tol,
personalization,
nstart,
workers=[wf[0]])
for idx, wf in enumerate(data.worker_to_parts.items())]
wait(result)
ddf = dask_cudf.from_delayed(result)
if input_graph.renumbered:
4 changes: 2 additions & 2 deletions python/cugraph/structure/graph.py
Original file line number Diff line number Diff line change
@@ -1212,7 +1212,7 @@ def degrees(self, vertex_subset=None):
df = self.unrenumber(df, "vertex")

if vertex_subset is not None:
df = df.query("`vertex` in @vertex_subset")
df = df[df['vertex'].isin(vertex_subset)]

return df

@@ -1226,7 +1226,7 @@ def _degree(self, vertex_subset, x=0):
df = self.unrenumber(df, "vertex")

if vertex_subset is not None:
df = df.query("`vertex` in @vertex_subset")
df = df[df['vertex'].isin(vertex_subset)]

return df

14 changes: 6 additions & 8 deletions python/cugraph/tests/dask/test_mg_pagerank.py
Original file line number Diff line number Diff line change
@@ -26,11 +26,11 @@
# and randomly assigns them personalization values


def personalize(v, personalization_perc):
def personalize(vertices, personalization_perc):
personalization = None
if personalization_perc != 0:
personalization = {}
nnz_vtx = np.arange(0, v)
nnz_vtx = vertices.values_host
personalization_count = int(
(nnz_vtx.size * personalization_perc) / 100.0
)
@@ -46,10 +46,10 @@ def personalize(v, personalization_perc):
v = np.fromiter(personalization.values(), dtype="float32")
cu_personalization = cudf.DataFrame({"vertex": k, "values": v})

return cu_personalization
return cu_personalization, personalization


PERSONALIZATION_PERC = [0]
PERSONALIZATION_PERC = [0, 10, 50]


@pytest.fixture
@@ -96,12 +96,10 @@ def test_dask_pagerank(client_connection, personalization_perc):
dg = cugraph.DiGraph()
dg.from_dask_cudf_edgelist(ddf, "src", "dst")

# Pre compute local data and personalize
personalization = None
if personalization_perc != 0:
dg.compute_local_data(by="dst")
personalization = personalize(
dg.number_of_vertices(), personalization_perc
personalization, p = personalize(
g.nodes(), personalization_perc
)

expected_pr = cugraph.pagerank(
4 changes: 3 additions & 1 deletion python/cugraph/tests/test_jaccard.py
Original file line number Diff line number Diff line change
@@ -63,6 +63,8 @@ def networkx_call(M):
edges = []
for i in range(len(M)):
edges.append((sources[i], destinations[i]))
edges.append((destinations[i], sources[i]))
edges = list(dict.fromkeys(edges))
edges = sorted(edges)
# in NVGRAPH tests we read as CSR and feed as CSC, so here we are doing this
# explicitly
@@ -71,6 +73,7 @@ def networkx_call(M):
Gnx = nx.from_pandas_edgelist(
M, source="0", target="1", edge_attr="weight", create_using=nx.Graph()
)

# Networkx Jaccard Call
print("Solving... ")
t1 = time.time()
@@ -94,7 +97,6 @@ def test_jaccard(graph_file):

M = utils.read_csv_for_nx(graph_file)
cu_M = utils.read_csv_file(graph_file)

cu_src, cu_dst, cu_coeff = cugraph_call(cu_M)
nx_src, nx_dst, nx_coeff = networkx_call(M)

17 changes: 0 additions & 17 deletions python/cugraph/tests/test_k_core.py
Original file line number Diff line number Diff line change
@@ -65,23 +65,6 @@ def compare_edges(cg, nxg):
return True


# FIXME: the default set of datasets includes an asymmetric directed graph
# (email-EU-core.csv), which currently produces different results between
# cugraph and Nx and fails that test. Investigate, resolve, and use
# utils.DATASETS instead.
#
# https://github.com/rapidsai/cugraph/issues/1046
#
# @pytest.mark.parametrize("graph_file", utils.DATASETS)
@pytest.mark.parametrize("graph_file", utils.DATASETS_UNDIRECTED)
def test_core_number_DiGraph(graph_file):
gc.collect()

cu_kcore, nx_kcore = calc_k_cores(graph_file)

assert compare_edges(cu_kcore, nx_kcore)


@pytest.mark.parametrize("graph_file", utils.DATASETS_UNDIRECTED)
def test_core_number_Graph(graph_file):
gc.collect()
16 changes: 7 additions & 9 deletions python/cugraph/tests/test_pagerank.py
Original file line number Diff line number Diff line change
@@ -97,22 +97,19 @@ def networkx_call(Gnx, max_iter, tol, alpha, personalization_perc, nnz_vtx):
personalization = None
if personalization_perc != 0:
personalization = {}
# print(nnz_vtx)
personalization_count = int(
(nnz_vtx.size * personalization_perc) / 100.0
)
print(personalization_count)
nnz_vtx = np.random.choice(
nnz_vtx, min(nnz_vtx.size, personalization_count), replace=False
)
# print(nnz_vtx)

nnz_val = np.random.random(nnz_vtx.size)
nnz_val = nnz_val / sum(nnz_val)
# print(nnz_val)
for vtx, val in zip(nnz_vtx, nnz_val):
personalization[vtx] = val

z = {k: 1.0 / Gnx.number_of_nodes() for k in range(Gnx.number_of_nodes())}
z = {k: 1.0 / Gnx.number_of_nodes() for k in Gnx.nodes()}

# Networkx Pagerank Call
t1 = time.time()
@@ -147,7 +144,7 @@ def networkx_call(Gnx, max_iter, tol, alpha, personalization_perc, nnz_vtx):
# https://github.com/rapidsai/cugraph/issues/533
#
# @pytest.mark.parametrize("graph_file", utils.DATASETS)
@pytest.mark.parametrize("graph_file", utils.DATASETS_UNDIRECTED)
@pytest.mark.parametrize("graph_file", utils.DATASETS)
@pytest.mark.parametrize("max_iter", MAX_ITERATIONS)
@pytest.mark.parametrize("tol", TOLERANCE)
@pytest.mark.parametrize("alpha", ALPHA)
@@ -160,7 +157,7 @@ def test_pagerank(

# NetworkX PageRank
M = utils.read_csv_for_nx(graph_file)
nnz_vtx = np.unique(M)
nnz_vtx = np.unique(M[['0', '1']])
Gnx = nx.from_pandas_edgelist(
M, source="0", target="1", create_using=nx.DiGraph()
)
@@ -196,7 +193,7 @@ def test_pagerank(
assert err < (0.01 * len(cugraph_pr))


@pytest.mark.parametrize("graph_file", utils.DATASETS_UNDIRECTED)
@pytest.mark.parametrize("graph_file", utils.DATASETS)
@pytest.mark.parametrize("max_iter", MAX_ITERATIONS)
@pytest.mark.parametrize("tol", TOLERANCE)
@pytest.mark.parametrize("alpha", ALPHA)
@@ -209,7 +206,7 @@ def test_pagerank_nx(

# NetworkX PageRank
M = utils.read_csv_for_nx(graph_file)
nnz_vtx = np.unique(M)
nnz_vtx = np.unique(M[['0', '1']])
Gnx = nx.from_pandas_edgelist(
M, source="0", target="1", create_using=nx.DiGraph()
)
@@ -232,6 +229,7 @@ def test_pagerank_nx(
cugraph_pr = sorted(cugraph_pr.items(), key=lambda x: x[0])
err = 0
assert len(cugraph_pr) == len(networkx_pr)

for i in range(len(cugraph_pr)):
if (
abs(cugraph_pr[i][1] - networkx_pr[i][1]) > tol * 1.1
2 changes: 2 additions & 0 deletions python/cugraph/tests/test_wjaccard.py
Original file line number Diff line number Diff line change
@@ -62,6 +62,8 @@ def networkx_call(M):
edges = []
for i in range(len(sources)):
edges.append((sources[i], destinations[i]))
edges.append((destinations[i], sources[i]))
edges = list(dict.fromkeys(edges))
edges = sorted(edges)
# in NVGRAPH tests we read as CSR and feed as CSC, so here we are doing this
# explicitly