Skip to content

Commit

Permalink
Update Graph to store a Pylibcugraph Graph (SG/MG Graph) (rapidsai#…
Browse files Browse the repository at this point in the history
  • Loading branch information
alexbarghi-nv authored Jul 22, 2022
1 parent e9e48c8 commit 5d0744c
Show file tree
Hide file tree
Showing 12 changed files with 460 additions and 274 deletions.
140 changes: 55 additions & 85 deletions python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,63 +13,19 @@
# limitations under the License.

import numpy
from dask.distributed import wait, default_client
from dask.distributed import wait

import dask_cudf
import cudf

from pylibcugraph import (ResourceHandle,
GraphProperties,
MGGraph
)
from pylibcugraph import ResourceHandle

from pylibcugraph import \
uniform_neighbor_sample as pylibcugraph_uniform_neighbor_sample

from cugraph.dask.common.input_utils import get_distributed_data
from cugraph.dask.comms import comms as Comms


def call_nbr_sampling(sID,
data,
src_col_name,
dst_col_name,
num_edges,
do_expensive_check,
start_list,
h_fan_out,
with_replacement):

# Preparation for graph creation
handle = Comms.get_handle(sID)
handle = ResourceHandle(handle.getHandle())
graph_properties = GraphProperties(is_symmetric=False, is_multigraph=False)
srcs = data[0][src_col_name]
dsts = data[0][dst_col_name]
weights = None
if "value" in data[0].columns:
weights = data[0]['value']

store_transposed = False

mg = MGGraph(handle,
graph_properties,
srcs,
dsts,
weights,
store_transposed,
num_edges,
do_expensive_check)

ret_val = pylibcugraph_uniform_neighbor_sample(handle,
mg,
start_list,
h_fan_out,
with_replacement,
do_expensive_check)
return ret_val


def convert_to_cudf(cp_arrays, weight_t):
"""
Creates a cudf DataFrame from cupy arrays from pylibcugraph wrapper
Expand All @@ -89,6 +45,24 @@ def convert_to_cudf(cp_arrays, weight_t):
return df


def _call_plc_uniform_neighbor_sample(sID,
mg_graph_x,
st_x,
fanout_vals,
with_replacement):
return pylibcugraph_uniform_neighbor_sample(
resource_handle=ResourceHandle(
Comms.get_handle(sID).getHandle()
),
input_graph=mg_graph_x,
start_list=st_x.to_cupy(),
h_fan_out=fanout_vals,
with_replacement=with_replacement,
# FIXME: should we add this parameter as an option?
do_expensive_check=True
)


def uniform_neighbor_sample(input_graph,
start_list,
fanout_vals,
Expand All @@ -97,6 +71,9 @@ def uniform_neighbor_sample(input_graph,
Does neighborhood sampling, which samples nodes from a graph based on the
current node's neighbors, with a corresponding fanout value at each hop.
Note: This is a pylibcugraph-enabled algorithm, which requires that the
graph was created with legacy_renum_only=True.
Parameters
----------
input_graph : cugraph.Graph
Expand Down Expand Up @@ -127,23 +104,16 @@ def uniform_neighbor_sample(input_graph,
Contains the indices from the sampling result for path
reconstruction
"""
# Initialize dask client
client = default_client()
# FIXME: 'legacy_renum_only' will not trigger the C++ renumbering
# In the future, once all the algos follow the C/Pylibcugraph path,
# compute_renumber_edge_list will only be used for multicolumn and
# string vertices since the renumbering will be done in pylibcugraph
input_graph.compute_renumber_edge_list(
transposed=False, legacy_renum_only=True)

if isinstance(start_list, int):
start_list = [start_list]

if isinstance(start_list, list):
start_list = cudf.Series(start_list)
if start_list.dtype != "int32":
raise ValueError(f"'start_list' must have int32 values, "
f"got: {start_list.dtype}")
start_list = cudf.Series(start_list, dtype='int32')

if start_list.dtype != "int32":
raise ValueError(f"'start_list' must have int32 values, "
f"got: {start_list.dtype}")

# fanout_vals must be a host array!
# FIXME: ensure other sequence types (eg. cudf Series) can be handled.
Expand All @@ -153,39 +123,38 @@ def uniform_neighbor_sample(input_graph,
raise TypeError("fanout_vals must be a list, "
f"got: {type(fanout_vals)}")

ddf = input_graph.edgelist.edgelist_df
src_col_name = input_graph.renumber_map.renumbered_src_col_name
dst_col_name = input_graph.renumber_map.renumbered_dst_col_name

weight_t = ddf["value"].dtype
if weight_t == "int32":
ddf = ddf.astype({"value": "float32"})
elif weight_t == "int64":
ddf = ddf.astype({"value": "float64"})

num_edges = len(ddf)
data = get_distributed_data(ddf)
weight_t = input_graph.edgelist.edgelist_df["value"].dtype

# start_list uses "external" vertex IDs, but if the graph has been
# renumbered, the start vertex IDs must also be renumbered.
if input_graph.renumbered:
start_list = input_graph.lookup_internal_vertex_id(
start_list).compute()
# FIXME: should we add this parameter as an option?
do_expensive_check = False

result = [client.submit(call_nbr_sampling,
Comms.get_session_id(),
wf[1],
src_col_name,
dst_col_name,
num_edges,
do_expensive_check,
start_list,
fanout_vals,
with_replacement,
workers=[wf[0]])
for idx, wf in enumerate(data.worker_to_parts.items())]

'''
FIXME update the API to scatter the start list as shown below.
start_list = dask_cudf.from_cudf(
start_list,
npartitions=input_graph._npartitions
)
start_list = get_distributed_data(start_list)
wait(start_list)
'''

client = input_graph._client

result = [
client.submit(
_call_plc_uniform_neighbor_sample,
Comms.get_session_id(),
input_graph._plc_graph[w],
start_list,
fanout_vals,
with_replacement,
workers=[w],
)
for w in Comms.get_workers()
]

wait(result)

Expand All @@ -196,6 +165,7 @@ def uniform_neighbor_sample(input_graph,
wait(cudf_result)

ddf = dask_cudf.from_delayed(cudf_result)

if input_graph.renumbered:
ddf = input_graph.unrenumber(ddf, "sources", preserve_order=True)
ddf = input_graph.unrenumber(ddf, "destinations", preserve_order=True)
Expand Down
121 changes: 37 additions & 84 deletions python/cugraph/cugraph/dask/traversal/bfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,65 +13,17 @@
# limitations under the License.
#

from pylibcugraph import (MGGraph,
ResourceHandle,
GraphProperties,
from pylibcugraph import (ResourceHandle,
bfs as pylibcugraph_bfs
)

from dask.distributed import wait, default_client
from dask.distributed import wait
from cugraph.dask.common.input_utils import get_distributed_data
import cugraph.dask.comms.comms as Comms
import cudf
import dask_cudf


def _call_plc_mg_bfs(
sID,
data,
sources,
depth_limit,
src_col_name,
dst_col_name,
graph_properties,
num_edges,
direction_optimizing=False,
do_expensive_check=False,
return_predecessors=True):
comms_handle = Comms.get_handle(sID)
resource_handle = ResourceHandle(comms_handle.getHandle())
sources = sources[0]
srcs = cudf.Series(data[0][src_col_name], dtype='int32')
dsts = cudf.Series(data[0][dst_col_name], dtype='int32')
weights = cudf.Series(data[0]['value'], dtype='float32') \
if 'value' in data[0].columns \
else cudf.Series((srcs + 1) / (srcs + 1), dtype='float32')

mg = MGGraph(
resource_handle=resource_handle,
graph_properties=graph_properties,
src_array=srcs,
dst_array=dsts,
weight_array=weights,
store_transposed=False,
num_edges=num_edges,
do_expensive_check=do_expensive_check
)

res = \
pylibcugraph_bfs(
resource_handle,
mg,
cudf.Series(sources, dtype='int32'),
direction_optimizing,
depth_limit if depth_limit is not None else 0,
return_predecessors,
True
)

return res


def convert_to_cudf(cp_arrays):
"""
create a cudf DataFrame from cupy arrays
Expand All @@ -84,17 +36,36 @@ def convert_to_cudf(cp_arrays):
return df


def _call_plc_bfs(sID,
mg_graph_x,
st_x,
depth_limit=None,
return_distances=True):
return pylibcugraph_bfs(
ResourceHandle(Comms.get_handle(sID).getHandle()),
mg_graph_x,
cudf.Series(st_x, dtype='int32'),
False,
depth_limit if depth_limit is not None else 0,
return_distances,
True
)


def bfs(input_graph,
start,
depth_limit=None,
return_distances=True,
check_start=True):
"""
Find the distances and predecessors for a breadth first traversal of a
Find the distances and predecessors for a breadth-first traversal of a
graph.
The input graph must contain edge list as dask-cudf dataframe with
The input graph must contain edge list as a dask-cudf dataframe with
one partition per GPU.
Note: This is a pylibcugraph-enabled algorithm, which requires that the
graph was created with legacy_renum_only=True.
Parameters
----------
input_graph : cugraph.Graph
Expand Down Expand Up @@ -147,27 +118,14 @@ def bfs(input_graph,
"""

client = default_client()

input_graph.compute_renumber_edge_list(
transposed=False, legacy_renum_only=True)
ddf = input_graph.edgelist.edgelist_df

graph_properties = GraphProperties(
is_multigraph=False)

num_edges = len(ddf)
data = get_distributed_data(ddf)

src_col_name = input_graph.renumber_map.renumbered_src_col_name
dst_col_name = input_graph.renumber_map.renumbered_dst_col_name
client = input_graph._client

if not isinstance(start, (dask_cudf.DataFrame, dask_cudf.Series)):
if not isinstance(start, (cudf.DataFrame, cudf.Series)):
start = cudf.Series(start)
if isinstance(start, (cudf.DataFrame, cudf.Series)):
# convert into a dask_cudf
start = dask_cudf.from_cudf(start, ddf.npartitions)
start = dask_cudf.from_cudf(start, input_graph._npartitions)

def check_valid_vertex(G, start):
is_valid_vertex = G.has_node(start)
Expand All @@ -190,23 +148,18 @@ def check_valid_vertex(G, start):

data_start = get_distributed_data(start)

cupy_result = [client.submit(
_call_plc_mg_bfs,
Comms.get_session_id(),
wf[1],
wf_start[1],
depth_limit,
src_col_name,
dst_col_name,
graph_properties,
num_edges,
False,
True,
return_distances,
workers=[wf[0]])
for idx, (wf, wf_start) in enumerate(
zip(data.worker_to_parts.items(),
data_start.worker_to_parts.items()))]
cupy_result = [
client.submit(
_call_plc_bfs,
Comms.get_session_id(),
input_graph._plc_graph[w],
st[0],
depth_limit,
return_distances,
workers=[w]
)
for w, st in data_start.worker_to_parts.items()
]

wait(cupy_result)

Expand Down
3 changes: 3 additions & 0 deletions python/cugraph/cugraph/sampling/node2vec.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ def node2vec(G,
Computes random walks for each node in 'start_vertices', under the
node2vec sampling framework.
Note: This is a pylibcugraph-enabled algorithm, which requires that the
graph was created with legacy_renum_only=True.
References
----------
Expand Down
Loading

0 comments on commit 5d0744c

Please sign in to comment.