Skip to content

Commit

Permalink
Raise a warning for certain algorithms (#2756)
Browse files Browse the repository at this point in the history
Certain algorithms (Katz, HITS, PageRank, Eigenvector centrality) require the flag `store_transposed` to be set to `True` for optimal performance. Although the CAPI internally `transposed` the graph at the algo's call if it wasn't at the graph creation, this adds extra overheads. This PR raises an exception if the user doesn't set the flag to `True` at the graph creation

closes #2742

Authors:
  - Joseph Nke (https://github.com/jnke2016)

Approvers:
  - Alex Barghi (https://github.com/alexbarghi-nv)
  - Rick Ratzel (https://github.com/rlratzel)

URL: #2756
  • Loading branch information
jnke2016 authored Oct 3, 2022
1 parent 6a7ea66 commit 2b8395a
Show file tree
Hide file tree
Showing 21 changed files with 294 additions and 25 deletions.
6 changes: 6 additions & 0 deletions python/cugraph/cugraph/centrality/eigenvector_centrality.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
df_score_to_dictionary,
)
import cudf
import warnings


def eigenvector_centrality(
Expand Down Expand Up @@ -77,6 +78,11 @@ def eigenvector_centrality(
raise ValueError(f"'tol' must be a positive float, got: {tol}")

G, isNx = ensure_cugraph_obj_for_nx(G)
if G.store_transposed is False:
warning_msg = ("Eigenvector centrality expects the 'store_transposed' "
"flag to be set to 'True' for optimal performance "
"during the graph creation")
warnings.warn(warning_msg, UserWarning)

vertices, values = \
pylib_eigen(
Expand Down
7 changes: 7 additions & 0 deletions python/cugraph/cugraph/centrality/katz_centrality.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
df_score_to_dictionary,
)
import cudf
import warnings


def katz_centrality(
Expand Down Expand Up @@ -112,6 +113,12 @@ def katz_centrality(
"""
G, isNx = ensure_cugraph_obj_for_nx(G)

if G.store_transposed is False:
warning_msg = ("Katz centrality expects the 'store_transposed' flag "
"to be set to 'True' for optimal performance during "
"the graph creation")
warnings.warn(warning_msg, UserWarning)

if alpha is None:
degree_max = G.degree()['degree'].max()
alpha = 1 / (degree_max)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import cugraph.dask.comms.comms as Comms
import dask_cudf
import cudf
import warnings


def _call_plc_eigenvector_centrality(sID,
Expand Down Expand Up @@ -115,6 +116,12 @@ def eigenvector_centrality(
"""
client = input_graph._client

if input_graph.store_transposed is False:
warning_msg = ("Eigenvector centrality expects the 'store_transposed' "
"flag to be set to 'True' for optimal performance "
"during the graph creation")
warnings.warn(warning_msg, UserWarning)

# FIXME: should we add this parameter as an option?
do_expensive_check = False

Expand Down
7 changes: 7 additions & 0 deletions python/cugraph/cugraph/dask/centrality/katz_centrality.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import cugraph.dask.comms.comms as Comms
import dask_cudf
import cudf
import warnings


def _call_plc_katz_centrality(sID,
Expand Down Expand Up @@ -146,6 +147,12 @@ def katz_centrality(
"""
client = input_graph._client

if input_graph.store_transposed is False:
warning_msg = ("Katz centrality expects the 'store_transposed' flag "
"to be set to 'True' for optimal performance during "
"the graph creation")
warnings.warn(warning_msg, UserWarning)

if alpha is None:
degree_max = input_graph.degree()['degree'].max().compute()
alpha = 1 / (degree_max)
Expand Down
4 changes: 4 additions & 0 deletions python/cugraph/cugraph/dask/common/part_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ def persist_distributed_data(dask_df, client):
async def _extract_partitions(dask_obj, client=None, batch_enabled=False):
client = default_client() if client is None else client
worker_list = Comms.get_workers()

# repartition the 'dask_obj' to get as many partitions as there
# are workers
dask_obj = dask_obj.repartition(npartitions=len(worker_list))
# dask.dataframe or dask.array
if isinstance(dask_obj, (daskDataFrame, daskArray, daskSeries)):
# parts = persist_distributed_data(dask_obj, client)
Expand Down
7 changes: 7 additions & 0 deletions python/cugraph/cugraph/dask/link_analysis/hits.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import cugraph.dask.comms.comms as Comms
import dask_cudf
import cudf
import warnings

from pylibcugraph import (ResourceHandle,
hits as pylibcugraph_hits
Expand Down Expand Up @@ -131,6 +132,12 @@ def hits(input_graph, tol=1.0e-5, max_iter=100, nstart=None, normalized=True):

client = input_graph._client

if input_graph.store_transposed is False:
warning_msg = ("HITS expects the 'store_transposed' flag "
"to be set to 'True' for optimal performance during "
"the graph creation")
warnings.warn(warning_msg, UserWarning)

do_expensive_check = False
initial_hubs_guess_vertices = None
initial_hubs_guess_values = None
Expand Down
6 changes: 6 additions & 0 deletions python/cugraph/cugraph/dask/link_analysis/pagerank.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,12 @@ def pagerank(input_graph,
# Initialize dask client
client = input_graph._client

if input_graph.store_transposed is False:
warning_msg = ("Pagerank expects the 'store_transposed' flag "
"to be set to 'True' for optimal performance during "
"the graph creation")
warnings.warn(warning_msg, UserWarning)

initial_guess_vertices = None
initial_guess_values = None
precomputed_vertex_out_weight_vertices = None
Expand Down
6 changes: 6 additions & 0 deletions python/cugraph/cugraph/link_analysis/hits.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
hits as pylibcugraph_hits
)
import cudf
import warnings


def hits(
Expand Down Expand Up @@ -84,6 +85,11 @@ def hits(
"""

G, isNx = ensure_cugraph_obj_for_nx(G)
if G.store_transposed is False:
warning_msg = ("HITS expects the 'store_transposed' flag "
"to be set to 'True' for optimal performance during "
"the graph creation")
warnings.warn(warning_msg, UserWarning)

do_expensive_check = False
init_hubs_guess_vertices = None
Expand Down
6 changes: 6 additions & 0 deletions python/cugraph/cugraph/link_analysis/pagerank.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ def pagerank(
pre_vtx_o_wgt_sums = None

G, isNx = ensure_cugraph_obj_for_nx(G, weight)
if G.store_transposed is False:
warning_msg = ("Pagerank expects the 'store_transposed' flag "
"to be set to 'True' for optimal performance during "
"the graph creation")
warnings.warn(warning_msg, UserWarning)

do_expensive_check = False

if nstart is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,10 @@ def in_degree(self, vertex_subset=None):
nodes.columns = vertex_col_names

df["degree"] = 1
in_degree = df.groupby(dst_col_name).degree.count().reset_index()

# FIXME: leverage the C++ in_degree for optimal performance
in_degree = df.groupby(dst_col_name).degree.count(
split_out=df.npartitions).reset_index()

# Add vertices with zero in_degree
in_degree = nodes.merge(in_degree, how='outer').fillna(0)
Expand Down Expand Up @@ -494,7 +497,9 @@ def out_degree(self, vertex_subset=None):
nodes.columns = vertex_col_names

df["degree"] = 1
out_degree = df.groupby(src_col_name).degree.count().reset_index()
# leverage the C++ out_degree for optimal performance
out_degree = df.groupby(src_col_name).degree.count(
split_out=df.npartitions).reset_index()

# Add vertices with zero out_degree
out_degree = nodes.merge(out_degree, how='outer').fillna(0)
Expand Down Expand Up @@ -560,8 +565,10 @@ def degree(self, vertex_subset=None):

vertex_in_degree = self.in_degree(vertex_subset)
vertex_out_degree = self.out_degree(vertex_subset)
# FIXME: leverage the C++ degree for optimal performance
vertex_degree = dask_cudf.concat([vertex_in_degree, vertex_out_degree])
vertex_degree = vertex_degree.groupby(['vertex'], as_index=False).sum()
vertex_degree = vertex_degree.groupby(['vertex'], as_index=False).sum(
split_out=self.input_df.npartitions)

return vertex_degree

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def __init__(self, properties):
self.directed = properties.directed
self.renumbered = False
self.self_loop = None
self.store_transposed = False
self.isolated_vertices = None
self.node_count = None
self.edge_count = None
Expand Down Expand Up @@ -184,6 +185,7 @@ def __from_edgelist(

# Renumbering
self.renumber_map = None
self.store_transposed = store_transposed
if renumber:
# FIXME: Should SG do lazy evaluation like MG?
elist, renumber_map = NumberMap.renumber(
Expand Down
7 changes: 2 additions & 5 deletions python/cugraph/cugraph/structure/symmetrize.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from cugraph.structure import graph_classes as csg
import cudf
import dask_cudf
from cugraph.dask.comms import comms as Comms


def symmetrize_df(df, src_name, dst_name,
Expand Down Expand Up @@ -162,8 +161,6 @@ def symmetrize_ddf(ddf, src_name, dst_name,
if weight_name is not None and not isinstance(weight_name, list):
weight_name = [weight_name]

worker_list = Comms.get_workers()
num_partitions = len(worker_list)
if symmetrize:
if weight_name:
ddf2 = ddf[[*dst_name, *src_name, *weight_name]]
Expand All @@ -178,13 +175,13 @@ def symmetrize_ddf(ddf, src_name, dst_name,
# The concat call doubles the number of partitions therefore,
# repartition the result so that the number of partitions equals
# the number of workers
result = result.repartition(npartitions=num_partitions)
result = result.repartition(npartitions=ddf.npartitions)
return result
else:
vertex_col_name = src_name + dst_name
result = result.groupby(
by=[*vertex_col_name]).min(
split_out=num_partitions).reset_index()
split_out=ddf.npartitions).reset_index()

return result

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ def test_dask_eigenvector_centrality(dask_client, directed, input_data_path):
dtype=["int32", "int32", "float32"],
)
dg = cugraph.Graph(directed=True)
dg.from_dask_cudf_edgelist(ddf, "src", "dst", legacy_renum_only=True)
dg.from_dask_cudf_edgelist(
ddf, "src", "dst", legacy_renum_only=True, store_transposed=True)
mg_res = dcg.eigenvector_centrality(dg, tol=1e-6)
mg_res = mg_res.compute()
import networkx as nx
Expand Down Expand Up @@ -84,3 +85,28 @@ def test_dask_eigenvector_centrality(dask_client, directed, input_data_path):
if diff > tol * 1.1:
err = err + 1
assert err == 0


def test_dask_eigenvector_centrality_transposed_false(dask_client):
input_data_path = DATASETS[0]

chunksize = dcg.get_chunksize(input_data_path)

ddf = dask_cudf.read_csv(
input_data_path,
chunksize=chunksize,
delimiter=" ",
names=["src", "dst", "value"],
dtype=["int32", "int32", "float32"],
)

dg = cugraph.Graph(directed=True)
dg.from_dask_cudf_edgelist(
ddf, "src", "dst", legacy_renum_only=True, store_transposed=False)

warning_msg = ("Eigenvector centrality expects the 'store_transposed' "
"flag to be set to 'True' for optimal performance during "
"the graph creation")

with pytest.warns(UserWarning, match=warning_msg):
dcg.eigenvector_centrality(dg)
23 changes: 23 additions & 0 deletions python/cugraph/cugraph/tests/mg/test_mg_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,26 @@ def test_create_graph_with_edge_ids(dask_client, graph_file):
destination='1',
edge_attr=['2', 'id', 'etype']
)


def test_graph_repartition(dask_client):
input_data_path = (utils.RAPIDS_DATASET_ROOT_DIR_PATH /
"karate.csv").as_posix()
print(f"dataset={input_data_path}")
chunksize = dcg.get_chunksize(input_data_path)

num_workers = len(Comms.get_workers())

ddf = dask_cudf.read_csv(
input_data_path,
chunksize=chunksize,
delimiter=" ",
names=["src", "dst", "value"],
dtype=["int32", "int32", "float32"],
)
more_partitions = num_workers * 100
ddf = ddf.repartition(npartitions=more_partitions)
ddf = get_distributed_data(ddf)

num_futures = len(ddf.worker_to_parts.values())
assert num_futures == num_workers
28 changes: 27 additions & 1 deletion python/cugraph/cugraph/tests/mg/test_mg_hits.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def input_expected_output(input_combo):
dg = cugraph.Graph(directed=directed)
dg.from_dask_cudf_edgelist(
ddf, source='src', destination='dst', edge_attr='value',
renumber=True, legacy_renum_only=True)
renumber=True, legacy_renum_only=True, store_transposed=True)

input_combo["MGGraph"] = dg

Expand Down Expand Up @@ -144,3 +144,29 @@ def test_dask_hits(dask_client, benchmark, input_expected_output):
assert len(hubs_diffs2) == 0
assert len(authorities_diffs1) == 0
assert len(authorities_diffs2) == 0


def test_dask_hots_transposed_false(dask_client):
input_data_path = (utils.RAPIDS_DATASET_ROOT_DIR_PATH /
"karate.csv").as_posix()

chunksize = dcg.get_chunksize(input_data_path)

ddf = dask_cudf.read_csv(
input_data_path,
chunksize=chunksize,
delimiter=" ",
names=["src", "dst", "value"],
dtype=["int32", "int32", "float32"],
)

dg = cugraph.Graph(directed=True)
dg.from_dask_cudf_edgelist(
ddf, "src", "dst", legacy_renum_only=True, store_transposed=False)

warning_msg = ("HITS expects the 'store_transposed' "
"flag to be set to 'True' for optimal performance during "
"the graph creation")

with pytest.warns(UserWarning, match=warning_msg):
dcg.hits(dg)
32 changes: 30 additions & 2 deletions python/cugraph/cugraph/tests/mg/test_mg_katz_centrality.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ def test_dask_katz_centrality(dask_client, directed):
)

dg = cugraph.Graph(directed=True)
dg.from_dask_cudf_edgelist(ddf, "src", "dst", legacy_renum_only=True)
dg.from_dask_cudf_edgelist(
ddf, "src", "dst", legacy_renum_only=True, store_transposed=True)

degree_max = dg.degree()['degree'].max().compute()
katz_alpha = 1 / (degree_max)
Expand Down Expand Up @@ -113,7 +114,8 @@ def test_dask_katz_centrality_nstart(dask_client, directed):
)

dg = cugraph.Graph(directed=True)
dg.from_dask_cudf_edgelist(ddf, "src", "dst", legacy_renum_only=True)
dg.from_dask_cudf_edgelist(
ddf, "src", "dst", legacy_renum_only=True, store_transposed=True)

mg_res = dcg.katz_centrality(dg, max_iter=50, tol=1e-6)
mg_res = mg_res.compute()
Expand Down Expand Up @@ -142,3 +144,29 @@ def test_dask_katz_centrality_nstart(dask_client, directed):
if diff > tol * 1.1:
err = err + 1
assert err == 0


def test_dask_katz_centrality_transposed_false(dask_client):
input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH /
"karate.csv").as_posix()

chunksize = dcg.get_chunksize(input_data_path)

ddf = dask_cudf.read_csv(
input_data_path,
chunksize=chunksize,
delimiter=" ",
names=["src", "dst", "value"],
dtype=["int32", "int32", "float32"],
)

dg = cugraph.Graph(directed=True)
dg.from_dask_cudf_edgelist(
ddf, "src", "dst", legacy_renum_only=True, store_transposed=False)

warning_msg = ("Katz centrality expects the 'store_transposed' "
"flag to be set to 'True' for optimal performance during "
"the graph creation")

with pytest.warns(UserWarning, match=warning_msg):
dcg.katz_centrality(dg)
Loading

0 comments on commit 2b8395a

Please sign in to comment.