Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Graph to store a Pylibcugraph Graph (SG/MG Graph) #2394

Merged
merged 29 commits into from
Jul 22, 2022
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 55 additions & 85 deletions python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,63 +13,19 @@
# limitations under the License.

import numpy
from dask.distributed import wait, default_client
from dask.distributed import wait

import dask_cudf
import cudf

from pylibcugraph import (ResourceHandle,
GraphProperties,
MGGraph
)
from pylibcugraph import ResourceHandle

from pylibcugraph import \
uniform_neighbor_sample as pylibcugraph_uniform_neighbor_sample

from cugraph.dask.common.input_utils import get_distributed_data
from cugraph.dask.comms import comms as Comms


def call_nbr_sampling(sID,
data,
src_col_name,
dst_col_name,
num_edges,
do_expensive_check,
start_list,
h_fan_out,
with_replacement):

# Preparation for graph creation
handle = Comms.get_handle(sID)
handle = ResourceHandle(handle.getHandle())
graph_properties = GraphProperties(is_symmetric=False, is_multigraph=False)
srcs = data[0][src_col_name]
dsts = data[0][dst_col_name]
weights = None
if "value" in data[0].columns:
weights = data[0]['value']

store_transposed = False

mg = MGGraph(handle,
graph_properties,
srcs,
dsts,
weights,
store_transposed,
num_edges,
do_expensive_check)

ret_val = pylibcugraph_uniform_neighbor_sample(handle,
mg,
start_list,
h_fan_out,
with_replacement,
do_expensive_check)
return ret_val


def convert_to_cudf(cp_arrays, weight_t):
"""
Creates a cudf DataFrame from cupy arrays from pylibcugraph wrapper
Expand All @@ -89,6 +45,24 @@ def convert_to_cudf(cp_arrays, weight_t):
return df


def _call_plc_uniform_neighbor_sample(sID,
rlratzel marked this conversation as resolved.
Show resolved Hide resolved
mg_graph_x,
st_x,
fanout_vals,
with_replacement):
return pylibcugraph_uniform_neighbor_sample(
resource_handle=ResourceHandle(
Comms.get_handle(sID).getHandle()
),
input_graph=mg_graph_x,
start_list=st_x.to_cupy(),
h_fan_out=fanout_vals,
with_replacement=with_replacement,
# FIXME: should we add this parameter as an option?
do_expensive_check=True
)


def uniform_neighbor_sample(input_graph,
start_list,
fanout_vals,
Expand All @@ -97,6 +71,9 @@ def uniform_neighbor_sample(input_graph,
Does neighborhood sampling, which samples nodes from a graph based on the
current node's neighbors, with a corresponding fanout value at each hop.

Note: This is a pylibcugraph-enabled algorithm, which requires that the
graph was created with legacy_renum_only=True.

Parameters
----------
input_graph : cugraph.Graph
Expand Down Expand Up @@ -127,23 +104,16 @@ def uniform_neighbor_sample(input_graph,
Contains the indices from the sampling result for path
reconstruction
"""
# Initialize dask client
client = default_client()
# FIXME: 'legacy_renum_only' will not trigger the C++ renumbering
# In the future, once all the algos follow the C/Pylibcugraph path,
# compute_renumber_edge_list will only be used for multicolumn and
# string vertices since the renumbering will be done in pylibcugraph
input_graph.compute_renumber_edge_list(
transposed=False, legacy_renum_only=True)

if isinstance(start_list, int):
start_list = [start_list]

if isinstance(start_list, list):
start_list = cudf.Series(start_list)
if start_list.dtype != "int32":
raise ValueError(f"'start_list' must have int32 values, "
f"got: {start_list.dtype}")
start_list = cudf.Series(start_list, dtype='int32')

if start_list.dtype != "int32":
raise ValueError(f"'start_list' must have int32 values, "
f"got: {start_list.dtype}")

# fanout_vals must be a host array!
# FIXME: ensure other sequence types (eg. cudf Series) can be handled.
Expand All @@ -153,39 +123,38 @@ def uniform_neighbor_sample(input_graph,
raise TypeError("fanout_vals must be a list, "
f"got: {type(fanout_vals)}")

ddf = input_graph.edgelist.edgelist_df
src_col_name = input_graph.renumber_map.renumbered_src_col_name
dst_col_name = input_graph.renumber_map.renumbered_dst_col_name

weight_t = ddf["value"].dtype
if weight_t == "int32":
ddf = ddf.astype({"value": "float32"})
elif weight_t == "int64":
ddf = ddf.astype({"value": "float64"})

num_edges = len(ddf)
data = get_distributed_data(ddf)
weight_t = input_graph.edgelist.edgelist_df["value"].dtype

# start_list uses "external" vertex IDs, but if the graph has been
# renumbered, the start vertex IDs must also be renumbered.
if input_graph.renumbered:
start_list = input_graph.lookup_internal_vertex_id(
start_list).compute()
# FIXME: should we add this parameter as an option?
do_expensive_check = False

result = [client.submit(call_nbr_sampling,
Comms.get_session_id(),
wf[1],
src_col_name,
dst_col_name,
num_edges,
do_expensive_check,
start_list,
fanout_vals,
with_replacement,
workers=[wf[0]])
for idx, wf in enumerate(data.worker_to_parts.items())]

'''
FIXME update the API to scatter the start list as shown below.
start_list = dask_cudf.from_cudf(
start_list,
npartitions=input_graph._npartitions
)
start_list = get_distributed_data(start_list)
wait(start_list)
'''
alexbarghi-nv marked this conversation as resolved.
Show resolved Hide resolved

client = input_graph._client
rlratzel marked this conversation as resolved.
Show resolved Hide resolved

result = [
client.submit(
_call_plc_uniform_neighbor_sample,
Comms.get_session_id(),
input_graph._plc_graph[w],
start_list,
fanout_vals,
with_replacement,
workers=[w],
)
for w in Comms.get_workers()
]

wait(result)

Expand All @@ -196,6 +165,7 @@ def uniform_neighbor_sample(input_graph,
wait(cudf_result)

ddf = dask_cudf.from_delayed(cudf_result)

if input_graph.renumbered:
ddf = input_graph.unrenumber(ddf, "sources", preserve_order=True)
ddf = input_graph.unrenumber(ddf, "destinations", preserve_order=True)
Expand Down
121 changes: 37 additions & 84 deletions python/cugraph/cugraph/dask/traversal/bfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,65 +13,17 @@
# limitations under the License.
#

from pylibcugraph import (MGGraph,
ResourceHandle,
GraphProperties,
from pylibcugraph import (ResourceHandle,
bfs as pylibcugraph_bfs
)

from dask.distributed import wait, default_client
from dask.distributed import wait
from cugraph.dask.common.input_utils import get_distributed_data
import cugraph.dask.comms.comms as Comms
import cudf
import dask_cudf


def _call_plc_mg_bfs(
sID,
data,
sources,
depth_limit,
src_col_name,
dst_col_name,
graph_properties,
num_edges,
direction_optimizing=False,
do_expensive_check=False,
return_predecessors=True):
comms_handle = Comms.get_handle(sID)
resource_handle = ResourceHandle(comms_handle.getHandle())
sources = sources[0]
srcs = cudf.Series(data[0][src_col_name], dtype='int32')
dsts = cudf.Series(data[0][dst_col_name], dtype='int32')
weights = cudf.Series(data[0]['value'], dtype='float32') \
if 'value' in data[0].columns \
else cudf.Series((srcs + 1) / (srcs + 1), dtype='float32')

mg = MGGraph(
resource_handle=resource_handle,
graph_properties=graph_properties,
src_array=srcs,
dst_array=dsts,
weight_array=weights,
store_transposed=False,
num_edges=num_edges,
do_expensive_check=do_expensive_check
)

res = \
pylibcugraph_bfs(
resource_handle,
mg,
cudf.Series(sources, dtype='int32'),
direction_optimizing,
depth_limit if depth_limit is not None else 0,
return_predecessors,
True
)

return res


def convert_to_cudf(cp_arrays):
"""
create a cudf DataFrame from cupy arrays
Expand All @@ -84,17 +36,36 @@ def convert_to_cudf(cp_arrays):
return df


def _call_plc_bfs(sID,
rlratzel marked this conversation as resolved.
Show resolved Hide resolved
mg_graph_x,
st_x,
depth_limit=None,
return_distances=True):
return pylibcugraph_bfs(
ResourceHandle(Comms.get_handle(sID).getHandle()),
mg_graph_x,
cudf.Series(st_x, dtype='int32'),
False,
depth_limit if depth_limit is not None else 0,
return_distances,
True
)


def bfs(input_graph,
start,
depth_limit=None,
return_distances=True,
check_start=True):
"""
Find the distances and predecessors for a breadth first traversal of a
Find the distances and predecessors for a breadth-first traversal of a
graph.
The input graph must contain edge list as dask-cudf dataframe with
The input graph must contain edge list as a dask-cudf dataframe with
one partition per GPU.

Note: This is a pylibcugraph-enabled algorithm, which requires that the
graph was created with legacy_renum_only=True.

Parameters
----------
input_graph : cugraph.Graph
Expand Down Expand Up @@ -147,27 +118,14 @@ def bfs(input_graph,

"""

client = default_client()

input_graph.compute_renumber_edge_list(
transposed=False, legacy_renum_only=True)
ddf = input_graph.edgelist.edgelist_df

graph_properties = GraphProperties(
is_multigraph=False)

num_edges = len(ddf)
data = get_distributed_data(ddf)

src_col_name = input_graph.renumber_map.renumbered_src_col_name
dst_col_name = input_graph.renumber_map.renumbered_dst_col_name
client = input_graph._client

if not isinstance(start, (dask_cudf.DataFrame, dask_cudf.Series)):
if not isinstance(start, (cudf.DataFrame, cudf.Series)):
start = cudf.Series(start)
if isinstance(start, (cudf.DataFrame, cudf.Series)):
# convert into a dask_cudf
start = dask_cudf.from_cudf(start, ddf.npartitions)
start = dask_cudf.from_cudf(start, input_graph._npartitions)

def check_valid_vertex(G, start):
is_valid_vertex = G.has_node(start)
Expand All @@ -190,23 +148,18 @@ def check_valid_vertex(G, start):

data_start = get_distributed_data(start)

cupy_result = [client.submit(
_call_plc_mg_bfs,
Comms.get_session_id(),
wf[1],
wf_start[1],
depth_limit,
src_col_name,
dst_col_name,
graph_properties,
num_edges,
False,
True,
return_distances,
workers=[wf[0]])
for idx, (wf, wf_start) in enumerate(
zip(data.worker_to_parts.items(),
data_start.worker_to_parts.items()))]
cupy_result = [
client.submit(
_call_plc_bfs,
Comms.get_session_id(),
input_graph._plc_graph[w],
st[0],
depth_limit,
return_distances,
workers=[w]
)
for w, st in data_start.worker_to_parts.items()
]

wait(cupy_result)

Expand Down
3 changes: 3 additions & 0 deletions python/cugraph/cugraph/sampling/node2vec.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ def node2vec(G,
Computes random walks for each node in 'start_vertices', under the
node2vec sampling framework.

Note: This is a pylibcugraph-enabled algorithm, which requires that the
graph was created with legacy_renum_only=True.

References
----------

Expand Down
Loading