Skip to content

Commit

Permalink
Refactored SG hits and MG katz_centrality (rapidsai#2276)
Browse files Browse the repository at this point in the history
This PR:
1. Refactors SG `hits` with the updated pylibcugraph implementation
2. Refactors MG `katz_centrality` with the updated pylibcugraph implementation, adding support for multiple arguments 
3. Improves support within `test_doctests.py` to ignore certain docstring examples based on the build architecture (such as ktruss in CUDA 11.4)
4. Passing MG `katz_centrality` testing

This PR also closes rapidsai#2025.

Authors:
  - https://github.com/betochimas

Approvers:
  - Alex Barghi (https://github.com/alexbarghi-nv)
  - Joseph Nke (https://github.com/jnke2016)
  - Rick Ratzel (https://github.com/rlratzel)

URL: rapidsai#2276
  • Loading branch information
betochimas authored Jun 1, 2022
1 parent 5da5be7 commit 83f9f0c
Show file tree
Hide file tree
Showing 15 changed files with 316 additions and 247 deletions.
7 changes: 7 additions & 0 deletions docs/cugraph/source/api_docs/pylibcugraph.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,12 @@ Methods
.. autosummary::
:toctree: api/

pylibcugraph.eigenvector_centrality
pylibcugraph.katz_centrality
pylibcugraph.strongly_connected_components
pylibcugraph.weakly_connected_components
pylibcugraph.pagerank
pylibcugraph.hits
pylibcugraph.node2vec
pylibcugraph.bfs
pylibcugraph.sssp
2 changes: 2 additions & 0 deletions python/cugraph/cugraph/community/ktruss_subgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def k_truss(G, k):
Examples
--------
>>> import cudf # k_truss does not run on CUDA 11.5
>>> gdf = cudf.read_csv(datasets_path / 'karate.csv', delimiter=' ',
... dtype=['int32', 'int32', 'float32'], header=None)
>>> G = cugraph.Graph()
Expand Down Expand Up @@ -149,6 +150,7 @@ def ktruss_subgraph(G, k, use_weights=True):
Examples
--------
>>> import cudf # ktruss_subgraph does not run on CUDA 11.5
>>> gdf = cudf.read_csv(datasets_path / 'karate.csv', delimiter=' ',
... dtype=['int32', 'int32', 'float32'], header=None)
>>> G = cugraph.Graph()
Expand Down
185 changes: 122 additions & 63 deletions python/cugraph/cugraph/dask/centrality/katz_centrality.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2022, NVIDIA CORPORATION.
# Copyright (c) 2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -16,14 +16,22 @@
from dask.distributed import wait, default_client
from cugraph.dask.common.input_utils import (get_distributed_data,
get_vertex_partition_offsets)
from cugraph.dask.centrality import\
mg_katz_centrality_wrapper as mg_katz_centrality
from pylibcugraph import (ResourceHandle,
GraphProperties,
MGGraph,
katz_centrality as pylibcugraph_katz
)
import cugraph.dask.comms.comms as Comms
import dask_cudf
import cudf
import cupy


def call_katz_centrality(sID,
data,
graph_properties,
store_transposed,
do_expensive_check,
src_col_name,
dst_col_name,
num_verts,
Expand All @@ -34,37 +42,52 @@ def call_katz_centrality(sID,
beta,
max_iter,
tol,
nstart,
initial_hubs_guess_values,
normalized):
wid = Comms.get_worker_id(sID)
handle = Comms.get_handle(sID)
local_size = len(aggregate_segment_offsets) // Comms.get_n_workers(sID)
segment_offsets = \
aggregate_segment_offsets[local_size * wid: local_size * (wid + 1)]
return mg_katz_centrality.mg_katz_centrality(data[0],
src_col_name,
dst_col_name,
num_verts,
num_edges,
vertex_partition_offsets,
wid,
handle,
segment_offsets,
alpha,
beta,
max_iter,
tol,
nstart,
normalized)


def katz_centrality(input_graph,
alpha=None,
beta=None,
max_iter=100,
tol=1.0e-5,
nstart=None,
normalized=True):
h = ResourceHandle(handle.getHandle())
srcs = data[0][src_col_name]
dsts = data[0][dst_col_name]
weights = cudf.Series(cupy.ones(srcs.size, dtype="float32"))

if "value" in data[0].columns:
weights = data[0]['value']

mg = MGGraph(h,
graph_properties,
srcs,
dsts,
weights,
store_transposed,
num_edges,
do_expensive_check)

result = pylibcugraph_katz(h,
mg,
initial_hubs_guess_values,
alpha,
beta,
tol,
max_iter,
do_expensive_check)
return result


def convert_to_cudf(cp_arrays):
"""
create a cudf DataFrame from cupy arrays
"""
cupy_vertices, cupy_values = cp_arrays
df = cudf.DataFrame()
df["vertex"] = cupy_vertices
df["katz_centrality"] = cupy_values
return df


def katz_centrality(
input_graph, alpha=None, beta=1.0, max_iter=100, tol=1.0e-6,
nstart=None, normalized=True
):
"""
Compute the Katz centrality for the nodes of the graph G.
Expand All @@ -89,8 +112,9 @@ def katz_centrality(input_graph,
guarantee that it will never exceed alpha_max thus in turn
fulfilling the requirement for convergence.
beta : None
A weight scalar - currently Not Supported
beta : float, optional (default=None)
Weight scalar added to each vertex's new Katz Centrality score in every
iteration. If beta is not specified then it is set as 1.0.
max_iter : int, optional (default=100)
The maximum number of iterations before an answer is returned. This can
Expand All @@ -109,7 +133,8 @@ def katz_centrality(input_graph,
acceptable.
nstart : dask_cudf.Dataframe, optional (default=None)
GPU Dataframe containing the initial guess for katz centrality
Distributed GPU Dataframe containing the initial guess for katz
centrality.
nstart['vertex'] : dask_cudf.Series
Contains the vertex identifiers
Expand All @@ -122,8 +147,8 @@ def katz_centrality(input_graph,
Returns
-------
katz_centrality : dask_cudf.DataFrame
GPU data frame containing two dask_cudf.Series of size V: the
vertex identifiers and the corresponding katz centrality values.
GPU distributed data frame containing two dask_cudf.Series of size V:
the vertex identifiers and the corresponding katz centrality values.
ddf['vertex'] : dask_cudf.Series
Contains the vertex identifiers
Expand All @@ -147,39 +172,73 @@ def katz_centrality(input_graph,
>>> pr = dcg.katz_centrality(dg)
"""
nstart = None

client = default_client()

input_graph.compute_renumber_edge_list(transposed=True)
graph_properties = GraphProperties(
is_multigraph=False)

store_transposed = False
do_expensive_check = False

src_col_name = input_graph.renumber_map.renumbered_src_col_name
dst_col_name = input_graph.renumber_map.renumbered_dst_col_name

ddf = input_graph.edgelist.edgelist_df
vertex_partition_offsets = get_vertex_partition_offsets(input_graph)
num_verts = vertex_partition_offsets.iloc[-1]

num_edges = len(ddf)
data = get_distributed_data(ddf)

src_col_name = input_graph.renumber_map.renumbered_src_col_name
dst_col_name = input_graph.renumber_map.renumbered_dst_col_name
# FIXME: Incorporate legacy_renum_only=True to only trigger the python
# renumbering when more support is added in the C/C++ API
input_graph.compute_renumber_edge_list(transposed=True,
legacy_renum_only=False)
vertex_partition_offsets = get_vertex_partition_offsets(input_graph)
num_verts = vertex_partition_offsets.iloc[-1]

result = [client.submit(call_katz_centrality,
Comms.get_session_id(),
wf[1],
src_col_name,
dst_col_name,
num_verts,
num_edges,
vertex_partition_offsets,
input_graph.aggregate_segment_offsets,
alpha,
beta,
max_iter,
tol,
nstart,
normalized,
workers=[wf[0]])
for idx, wf in enumerate(data.worker_to_parts.items())]
wait(result)
ddf = dask_cudf.from_delayed(result)
initial_hubs_guess_values = None
if nstart:
if input_graph.renumbered:
if len(input_graph.renumber_map.implementation.col_names) > 1:
cols = nstart.columns[:-1].to_list()
else:
cols = 'vertex'
nstart = input_graph.add_internal_vertex_id(nstart, 'vertex', cols)
initial_hubs_guess_values = nstart[nstart.columns[0]].compute()
else:
initial_hubs_guess_values = nstart["values"].compute()

cupy_result = [client.submit(call_katz_centrality,
Comms.get_session_id(),
wf[1],
graph_properties,
store_transposed,
do_expensive_check,
src_col_name,
dst_col_name,
num_verts,
num_edges,
vertex_partition_offsets,
input_graph.aggregate_segment_offsets,
alpha,
beta,
max_iter,
tol,
initial_hubs_guess_values,
normalized,
workers=[wf[0]])
for idx, wf in enumerate(data.worker_to_parts.items())]

wait(cupy_result)

cudf_result = [client.submit(convert_to_cudf,
cp_arrays,
workers=client.who_has(
cp_arrays)[cp_arrays.key])
for cp_arrays in cupy_result]

wait(cudf_result)

ddf = dask_cudf.from_delayed(cudf_result)
if input_graph.renumbered:
return input_graph.unrenumber(ddf, 'vertex')

Expand Down
10 changes: 5 additions & 5 deletions python/cugraph/cugraph/dask/link_analysis/hits.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import dask_cudf
import cudf

from pylibcugraph.experimental import (ResourceHandle,
GraphProperties,
MGGraph,
hits as pylibcugraph_hits
)
from pylibcugraph import (ResourceHandle,
GraphProperties,
MGGraph,
hits as pylibcugraph_hits
)


def call_hits(sID,
Expand Down
2 changes: 1 addition & 1 deletion python/cugraph/cugraph/link_analysis/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2021, NVIDIA CORPORATION.
# Copyright (c) 2019-2022, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down
29 changes: 0 additions & 29 deletions python/cugraph/cugraph/link_analysis/hits.pxd

This file was deleted.

Loading

0 comments on commit 83f9f0c

Please sign in to comment.