From 5d0744c29a125373cef50569ec695cceccacaf61 Mon Sep 17 00:00:00 2001 From: Alex Barghi <105237337+alexbarghi-nv@users.noreply.github.com> Date: Fri, 22 Jul 2022 09:14:13 -0400 Subject: [PATCH] Update `Graph` to store a Pylibcugraph Graph (SG/MG Graph) (#2394) Closes #2373 Authors: - Alex Barghi (https://github.com/alexbarghi-nv) Approvers: - Brad Rees (https://github.com/BradReesWork) - Rick Ratzel (https://github.com/rlratzel) URL: https://github.com/rapidsai/cugraph/pull/2394 --- .../dask/sampling/uniform_neighbor_sample.py | 140 +++++++----------- python/cugraph/cugraph/dask/traversal/bfs.py | 121 +++++---------- python/cugraph/cugraph/sampling/node2vec.py | 3 + .../sampling/uniform_neighbor_sample.py | 44 ++---- .../cugraph/structure/graph_classes.py | 72 +++++++-- .../simpleDistributedGraph.py | 86 +++++++++++ .../graph_implementation/simpleGraph.py | 61 +++++++- .../cugraph/cugraph/structure/number_map.py | 2 +- .../cugraph/cugraph/tests/mg/test_mg_graph.py | 75 +++++++++- .../tests/mg/test_mg_neighborhood_sampling.py | 7 +- python/cugraph/cugraph/tests/test_graph.py | 43 ++++++ python/cugraph/cugraph/traversal/bfs.py | 80 ++++------ 12 files changed, 460 insertions(+), 274 deletions(-) diff --git a/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py b/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py index 8a745b15262..3de90e64a80 100644 --- a/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py +++ b/python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py @@ -13,63 +13,19 @@ # limitations under the License. import numpy -from dask.distributed import wait, default_client +from dask.distributed import wait import dask_cudf import cudf -from pylibcugraph import (ResourceHandle, - GraphProperties, - MGGraph - ) +from pylibcugraph import ResourceHandle from pylibcugraph import \ uniform_neighbor_sample as pylibcugraph_uniform_neighbor_sample -from cugraph.dask.common.input_utils import get_distributed_data from cugraph.dask.comms import comms as Comms -def call_nbr_sampling(sID, - data, - src_col_name, - dst_col_name, - num_edges, - do_expensive_check, - start_list, - h_fan_out, - with_replacement): - - # Preparation for graph creation - handle = Comms.get_handle(sID) - handle = ResourceHandle(handle.getHandle()) - graph_properties = GraphProperties(is_symmetric=False, is_multigraph=False) - srcs = data[0][src_col_name] - dsts = data[0][dst_col_name] - weights = None - if "value" in data[0].columns: - weights = data[0]['value'] - - store_transposed = False - - mg = MGGraph(handle, - graph_properties, - srcs, - dsts, - weights, - store_transposed, - num_edges, - do_expensive_check) - - ret_val = pylibcugraph_uniform_neighbor_sample(handle, - mg, - start_list, - h_fan_out, - with_replacement, - do_expensive_check) - return ret_val - - def convert_to_cudf(cp_arrays, weight_t): """ Creates a cudf DataFrame from cupy arrays from pylibcugraph wrapper @@ -89,6 +45,24 @@ def convert_to_cudf(cp_arrays, weight_t): return df +def _call_plc_uniform_neighbor_sample(sID, + mg_graph_x, + st_x, + fanout_vals, + with_replacement): + return pylibcugraph_uniform_neighbor_sample( + resource_handle=ResourceHandle( + Comms.get_handle(sID).getHandle() + ), + input_graph=mg_graph_x, + start_list=st_x.to_cupy(), + h_fan_out=fanout_vals, + with_replacement=with_replacement, + # FIXME: should we add this parameter as an option? + do_expensive_check=True + ) + + def uniform_neighbor_sample(input_graph, start_list, fanout_vals, @@ -97,6 +71,9 @@ def uniform_neighbor_sample(input_graph, Does neighborhood sampling, which samples nodes from a graph based on the current node's neighbors, with a corresponding fanout value at each hop. + Note: This is a pylibcugraph-enabled algorithm, which requires that the + graph was created with legacy_renum_only=True. + Parameters ---------- input_graph : cugraph.Graph @@ -127,23 +104,16 @@ def uniform_neighbor_sample(input_graph, Contains the indices from the sampling result for path reconstruction """ - # Initialize dask client - client = default_client() - # FIXME: 'legacy_renum_only' will not trigger the C++ renumbering - # In the future, once all the algos follow the C/Pylibcugraph path, - # compute_renumber_edge_list will only be used for multicolumn and - # string vertices since the renumbering will be done in pylibcugraph - input_graph.compute_renumber_edge_list( - transposed=False, legacy_renum_only=True) if isinstance(start_list, int): start_list = [start_list] if isinstance(start_list, list): - start_list = cudf.Series(start_list) - if start_list.dtype != "int32": - raise ValueError(f"'start_list' must have int32 values, " - f"got: {start_list.dtype}") + start_list = cudf.Series(start_list, dtype='int32') + + if start_list.dtype != "int32": + raise ValueError(f"'start_list' must have int32 values, " + f"got: {start_list.dtype}") # fanout_vals must be a host array! # FIXME: ensure other sequence types (eg. cudf Series) can be handled. @@ -153,39 +123,38 @@ def uniform_neighbor_sample(input_graph, raise TypeError("fanout_vals must be a list, " f"got: {type(fanout_vals)}") - ddf = input_graph.edgelist.edgelist_df - src_col_name = input_graph.renumber_map.renumbered_src_col_name - dst_col_name = input_graph.renumber_map.renumbered_dst_col_name - - weight_t = ddf["value"].dtype - if weight_t == "int32": - ddf = ddf.astype({"value": "float32"}) - elif weight_t == "int64": - ddf = ddf.astype({"value": "float64"}) - - num_edges = len(ddf) - data = get_distributed_data(ddf) + weight_t = input_graph.edgelist.edgelist_df["value"].dtype # start_list uses "external" vertex IDs, but if the graph has been # renumbered, the start vertex IDs must also be renumbered. if input_graph.renumbered: start_list = input_graph.lookup_internal_vertex_id( start_list).compute() - # FIXME: should we add this parameter as an option? - do_expensive_check = False - - result = [client.submit(call_nbr_sampling, - Comms.get_session_id(), - wf[1], - src_col_name, - dst_col_name, - num_edges, - do_expensive_check, - start_list, - fanout_vals, - with_replacement, - workers=[wf[0]]) - for idx, wf in enumerate(data.worker_to_parts.items())] + + ''' + FIXME update the API to scatter the start list as shown below. + start_list = dask_cudf.from_cudf( + start_list, + npartitions=input_graph._npartitions + ) + start_list = get_distributed_data(start_list) + wait(start_list) + ''' + + client = input_graph._client + + result = [ + client.submit( + _call_plc_uniform_neighbor_sample, + Comms.get_session_id(), + input_graph._plc_graph[w], + start_list, + fanout_vals, + with_replacement, + workers=[w], + ) + for w in Comms.get_workers() + ] wait(result) @@ -196,6 +165,7 @@ def uniform_neighbor_sample(input_graph, wait(cudf_result) ddf = dask_cudf.from_delayed(cudf_result) + if input_graph.renumbered: ddf = input_graph.unrenumber(ddf, "sources", preserve_order=True) ddf = input_graph.unrenumber(ddf, "destinations", preserve_order=True) diff --git a/python/cugraph/cugraph/dask/traversal/bfs.py b/python/cugraph/cugraph/dask/traversal/bfs.py index 472627717aa..6174ee3bccf 100644 --- a/python/cugraph/cugraph/dask/traversal/bfs.py +++ b/python/cugraph/cugraph/dask/traversal/bfs.py @@ -13,65 +13,17 @@ # limitations under the License. # -from pylibcugraph import (MGGraph, - ResourceHandle, - GraphProperties, +from pylibcugraph import (ResourceHandle, bfs as pylibcugraph_bfs ) -from dask.distributed import wait, default_client +from dask.distributed import wait from cugraph.dask.common.input_utils import get_distributed_data import cugraph.dask.comms.comms as Comms import cudf import dask_cudf -def _call_plc_mg_bfs( - sID, - data, - sources, - depth_limit, - src_col_name, - dst_col_name, - graph_properties, - num_edges, - direction_optimizing=False, - do_expensive_check=False, - return_predecessors=True): - comms_handle = Comms.get_handle(sID) - resource_handle = ResourceHandle(comms_handle.getHandle()) - sources = sources[0] - srcs = cudf.Series(data[0][src_col_name], dtype='int32') - dsts = cudf.Series(data[0][dst_col_name], dtype='int32') - weights = cudf.Series(data[0]['value'], dtype='float32') \ - if 'value' in data[0].columns \ - else cudf.Series((srcs + 1) / (srcs + 1), dtype='float32') - - mg = MGGraph( - resource_handle=resource_handle, - graph_properties=graph_properties, - src_array=srcs, - dst_array=dsts, - weight_array=weights, - store_transposed=False, - num_edges=num_edges, - do_expensive_check=do_expensive_check - ) - - res = \ - pylibcugraph_bfs( - resource_handle, - mg, - cudf.Series(sources, dtype='int32'), - direction_optimizing, - depth_limit if depth_limit is not None else 0, - return_predecessors, - True - ) - - return res - - def convert_to_cudf(cp_arrays): """ create a cudf DataFrame from cupy arrays @@ -84,17 +36,36 @@ def convert_to_cudf(cp_arrays): return df +def _call_plc_bfs(sID, + mg_graph_x, + st_x, + depth_limit=None, + return_distances=True): + return pylibcugraph_bfs( + ResourceHandle(Comms.get_handle(sID).getHandle()), + mg_graph_x, + cudf.Series(st_x, dtype='int32'), + False, + depth_limit if depth_limit is not None else 0, + return_distances, + True + ) + + def bfs(input_graph, start, depth_limit=None, return_distances=True, check_start=True): """ - Find the distances and predecessors for a breadth first traversal of a + Find the distances and predecessors for a breadth-first traversal of a graph. - The input graph must contain edge list as dask-cudf dataframe with + The input graph must contain edge list as a dask-cudf dataframe with one partition per GPU. + Note: This is a pylibcugraph-enabled algorithm, which requires that the + graph was created with legacy_renum_only=True. + Parameters ---------- input_graph : cugraph.Graph @@ -147,27 +118,14 @@ def bfs(input_graph, """ - client = default_client() - - input_graph.compute_renumber_edge_list( - transposed=False, legacy_renum_only=True) - ddf = input_graph.edgelist.edgelist_df - - graph_properties = GraphProperties( - is_multigraph=False) - - num_edges = len(ddf) - data = get_distributed_data(ddf) - - src_col_name = input_graph.renumber_map.renumbered_src_col_name - dst_col_name = input_graph.renumber_map.renumbered_dst_col_name + client = input_graph._client if not isinstance(start, (dask_cudf.DataFrame, dask_cudf.Series)): if not isinstance(start, (cudf.DataFrame, cudf.Series)): start = cudf.Series(start) if isinstance(start, (cudf.DataFrame, cudf.Series)): # convert into a dask_cudf - start = dask_cudf.from_cudf(start, ddf.npartitions) + start = dask_cudf.from_cudf(start, input_graph._npartitions) def check_valid_vertex(G, start): is_valid_vertex = G.has_node(start) @@ -190,23 +148,18 @@ def check_valid_vertex(G, start): data_start = get_distributed_data(start) - cupy_result = [client.submit( - _call_plc_mg_bfs, - Comms.get_session_id(), - wf[1], - wf_start[1], - depth_limit, - src_col_name, - dst_col_name, - graph_properties, - num_edges, - False, - True, - return_distances, - workers=[wf[0]]) - for idx, (wf, wf_start) in enumerate( - zip(data.worker_to_parts.items(), - data_start.worker_to_parts.items()))] + cupy_result = [ + client.submit( + _call_plc_bfs, + Comms.get_session_id(), + input_graph._plc_graph[w], + st[0], + depth_limit, + return_distances, + workers=[w] + ) + for w, st in data_start.worker_to_parts.items() + ] wait(cupy_result) diff --git a/python/cugraph/cugraph/sampling/node2vec.py b/python/cugraph/cugraph/sampling/node2vec.py index 4f0ff33bada..44af8e1182a 100644 --- a/python/cugraph/cugraph/sampling/node2vec.py +++ b/python/cugraph/cugraph/sampling/node2vec.py @@ -31,6 +31,9 @@ def node2vec(G, Computes random walks for each node in 'start_vertices', under the node2vec sampling framework. + Note: This is a pylibcugraph-enabled algorithm, which requires that the + graph was created with legacy_renum_only=True. + References ---------- diff --git a/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py b/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py index 21125601dcb..c9ce13de831 100644 --- a/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py +++ b/python/cugraph/cugraph/sampling/uniform_neighbor_sample.py @@ -11,10 +11,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from pylibcugraph import (ResourceHandle, - GraphProperties, - SGGraph, - ) +from pylibcugraph import ResourceHandle from pylibcugraph import uniform_neighbor_sample as \ pylibcugraph_uniform_neighbor_sample @@ -32,6 +29,9 @@ def uniform_neighbor_sample(G, Does neighborhood sampling, which samples nodes from a graph based on the current node's neighbors, with a corresponding fanout value at each hop. + Note: This is a pylibcugraph-enabled algorithm, which requires that the + graph was created with legacy_renum_only=True. + Parameters ---------- G : cugraph.Graph @@ -79,6 +79,8 @@ def uniform_neighbor_sample(G, raise TypeError("fanout_vals must be a list, " f"got: {type(fanout_vals)}") + weight_t = G.edgelist.edgelist_df['weights'].dtype + if G.renumbered is True: if isinstance(start_list, cudf.DataFrame): start_list = G.lookup_internal_vertex_id( @@ -86,33 +88,15 @@ def uniform_neighbor_sample(G, else: start_list = G.lookup_internal_vertex_id(start_list) - srcs = G.edgelist.edgelist_df['src'] - dsts = G.edgelist.edgelist_df['dst'] - weights = G.edgelist.edgelist_df['weights'] - weight_t = weights.dtype - - if weight_t == "int32": - weights = weights.astype("float32") - if weight_t == "int64": - weights = weights.astype("float64") - - if srcs.dtype != 'int32': - raise ValueError(f"Graph vertices must have int32 values, " - f"got: {srcs.dtype}") - - resource_handle = ResourceHandle() - graph_props = GraphProperties(is_multigraph=G.is_multigraph()) - store_transposed = False - renumber = False - do_expensive_check = False - - sg = SGGraph(resource_handle, graph_props, srcs, dsts, weights, - store_transposed, renumber, do_expensive_check) - sources, destinations, indices = \ - pylibcugraph_uniform_neighbor_sample(resource_handle, sg, start_list, - fanout_vals, with_replacement, - do_expensive_check) + pylibcugraph_uniform_neighbor_sample( + resource_handle=ResourceHandle(), + input_graph=G._plc_graph, + start_list=start_list, + h_fan_out=fanout_vals, + with_replacement=with_replacement, + do_expensive_check=False + ) df = cudf.DataFrame() df["sources"] = sources diff --git a/python/cugraph/cugraph/structure/graph_classes.py b/python/cugraph/cugraph/structure/graph_classes.py index cd2b6ff0565..aabb518b05c 100644 --- a/python/cugraph/cugraph/structure/graph_classes.py +++ b/python/cugraph/cugraph/structure/graph_classes.py @@ -99,6 +99,7 @@ def from_cudf_edgelist( destination="destination", edge_attr=None, renumber=True, + store_transposed=False, legacy_renum_only=False ): """ @@ -134,6 +135,15 @@ def from_cudf_edgelist( Indicate whether or not to renumber the source and destination vertex IDs. + store_transposed : bool, optional (default=False) + If True, stores the transpose of the adjacency matrix. Required + for certain algorithms. + + legacy_renum_only : bool, optional (default=False) + If True, skips the C++ renumbering step. Must be true for + pylibcugraph algorithms. Must be false for algorithms + not yet converted to the pylibcugraph C API. + Examples -------- >>> df = cudf.read_csv(datasets_path / 'karate.csv', delimiter=' ', @@ -157,6 +167,7 @@ def from_cudf_edgelist( destination=destination, edge_attr=edge_attr, renumber=renumber, + store_transposed=store_transposed, legacy_renum_only=legacy_renum_only) def from_cudf_adjlist(self, offset_col, index_col, value_col=None): @@ -221,6 +232,8 @@ def from_dask_cudf_edgelist( destination="destination", edge_attr=None, renumber=True, + store_transposed=False, + legacy_renum_only=False ): """ Initializes the distributed graph from the dask_cudf.DataFrame @@ -250,6 +263,15 @@ def from_dask_cudf_edgelist( renumber : bool, optional (default=True) If source and destination indices are not in range 0 to V where V is number of vertices, renumber argument should be True. + + store_transposed : bool, optional (default=False) + If True, stores the transpose of the adjacency matrix. Required + for certain algorithms. + + legacy_renum_only : bool, optional (default=False) + If True, skips the C++ renumbering step. Must be true for + pylibcugraph algorithms. Must be false for algorithms + not yet converted to the pylibcugraph C API. """ if renumber is False: raise ValueError("'renumber' must be set to 'True' for MNMG algos") @@ -259,11 +281,15 @@ def from_dask_cudf_edgelist( raise RuntimeError("Graph is already initialized") elif (self._Impl.edgelist is not None): raise RuntimeError("Graph already has values") - self._Impl._simpleDistributedGraphImpl__from_edgelist(input_ddf, - source, - destination, - edge_attr, - renumber) + self._Impl._simpleDistributedGraphImpl__from_edgelist( + input_ddf, + source, + destination, + edge_attr, + renumber, + store_transposed, + legacy_renum_only + ) # Move to Compat Module def from_pandas_edgelist( @@ -687,7 +713,9 @@ def from_cudf_edgelist( source="source", destination="destination", edge_attr=None, - renumber=True + renumber=True, + store_transposed=False, + legacy_renum_only=False ): """ Initialize a graph from the edge list. It is an error to call this @@ -723,6 +751,15 @@ def from_cudf_edgelist( Indicate whether or not to renumber the source and destination vertex IDs + store_transposed : bool, optional (default=False) + If True, stores the transpose of the adjacency matrix. Required + for certain algorithms. + + legacy_renum_only : bool, optional (default=False) + If True, skips the C++ renumbering step. Must be true for + pylibcugraph algorithms. Must be false for algorithms + not yet converted to the pylibcugraph C API. + Examples -------- >>> df = cudf.read_csv(datasets_path / 'karate.csv', delimiter=' ', @@ -736,11 +773,13 @@ def from_cudf_edgelist( if self._Impl is None: self._Impl = npartiteGraphImpl(self.graph_properties) # API may change in future - self._Impl._npartiteGraphImpl__from_edgelist(input_df, - source=source, - destination=destination, - edge_attr=edge_attr, - renumber=renumber) + self._Impl._npartiteGraphImpl__from_edgelist( + input_df, + source=source, + destination=destination, + edge_attr=edge_attr, + renumber=renumber + ) def from_dask_cudf_edgelist( self, @@ -749,6 +788,8 @@ def from_dask_cudf_edgelist( destination="destination", edge_attr=None, renumber=True, + store_transposed=False, + legacy_renum_only=False ): """ Initializes the distributed graph from the dask_cudf.DataFrame @@ -778,6 +819,15 @@ def from_dask_cudf_edgelist( renumber : bool, optional (default=True) If source and destination indices are not in range 0 to V where V is number of vertices, renumber argument should be True. + + store_transposed : bool, optional (default=False) + If True, stores the transpose of the adjacency matrix. Required + for certain algorithms. + + legacy_renum_only : bool, optional (default=False) + If True, skips the C++ renumbering step. Must be true for + pylibcugraph algorithms. Must be false for algorithms + not yet converted to the pylibcugraph C API. """ raise TypeError("Distributed N-partite graph not supported") diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py index 90d4e9da549..51ef70435ee 100644 --- a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py +++ b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py @@ -15,9 +15,19 @@ from cugraph.structure.graph_primtypes_wrapper import Direction from cugraph.structure.number_map import NumberMap from cugraph.structure.symmetrize import symmetrize +import cupy import cudf import dask_cudf +from pylibcugraph import (MGGraph, + ResourceHandle, + GraphProperties, + ) + +from dask.distributed import wait, default_client +from cugraph.dask.common.input_utils import get_distributed_data +import cugraph.dask.comms.comms as Comms + class simpleDistributedGraphImpl: class EdgeList: @@ -53,6 +63,37 @@ def __init__(self, properties): self.source_columns = None self.destination_columns = None + def _make_plc_graph( + sID, + edata_x, + graph_props, + src_col_name, + dst_col_name, + store_transposed, + num_edges): + + if 'value' in edata_x[0]: + values = edata_x[0]['value'] + if values.dtype == 'int32': + values = values.astype('float32') + elif values.dtype == 'int64': + values = values.astype('float64') + else: + values = cudf.Series(cupy.ones(len(edata_x[0]))) + + return MGGraph( + resource_handle=ResourceHandle( + Comms.get_handle(sID).getHandle() + ), + graph_properties=graph_props, + src_array=edata_x[0][src_col_name], + dst_array=edata_x[0][dst_col_name], + weight_array=values, + store_transposed=store_transposed, + num_edges=num_edges, + do_expensive_check=False + ) + # Functions def __from_edgelist( self, @@ -62,6 +103,7 @@ def __from_edgelist( edge_attr=None, renumber=True, store_transposed=False, + legacy_renum_only=False ): if not isinstance(input_ddf, dask_cudf.DataFrame): raise TypeError("input should be a dask_cudf dataFrame") @@ -129,6 +171,45 @@ def __from_edgelist( self.source_columns = source self.destination_columns = destination + # If renumbering is not enabled, this function will only create + # the edgelist_df and not do any renumbering. + # C++ renumbering is enabled by default for algorithms that + # support it (but only called if renumbering is on) + self.compute_renumber_edge_list( + transposed=store_transposed, + legacy_renum_only=legacy_renum_only + ) + + self.properties.renumbered = self.renumber_map.implementation.numbered + ddf = self.edgelist.edgelist_df + + num_edges = len(ddf) + edge_data = get_distributed_data(ddf) + src_col_name = self.renumber_map.renumbered_src_col_name + dst_col_name = self.renumber_map.renumbered_dst_col_name + graph_props = GraphProperties( + is_multigraph=self.properties.multi_edge, + is_symmetric=not self.properties.directed + ) + + self._client = default_client() + self._plc_graph = { + w: self._client.submit( + simpleDistributedGraphImpl._make_plc_graph, + Comms.get_session_id(), + edata, + graph_props, + src_col_name, + dst_col_name, + store_transposed, + num_edges, + workers=[w], + ) + for w, edata in edge_data.worker_to_parts.items() + } + + wait(self._plc_graph) + @property def renumbered(self): # This property is now used to determine if a dataframe was renumbered @@ -666,6 +747,7 @@ def compute_renumber_edge_list(self, This parameter is added for new algos following the C/Pylibcugraph path """ + if not self.properties.renumber: self.edgelist = self.EdgeList(self.input_df) self.renumber_map = None @@ -697,3 +779,7 @@ def vertex_column_size(self): return self.renumber_map.vertex_column_size() else: return 1 + + @property + def _npartitions(self) -> int: + return len(self._plc_graph) diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py index 5929eb75030..32ee82a30f6 100644 --- a/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py +++ b/python/cugraph/cugraph/structure/graph_implementation/simpleGraph.py @@ -16,6 +16,7 @@ from cugraph.structure.symmetrize import symmetrize from cugraph.structure.number_map import NumberMap import cugraph.dask.common.mg_utils as mg_utils +import cupy import cudf import dask_cudf import cugraph.dask.comms.comms as Comms @@ -23,6 +24,11 @@ import numpy as np from cugraph.dask.structure import replication +from pylibcugraph import (ResourceHandle, + GraphProperties, + SGGraph, + ) + # FIXME: Change to consistent camel case naming class simpleGraphImpl: @@ -88,7 +94,8 @@ def __from_edgelist( destination="destination", edge_attr=None, renumber=True, - legacy_renum_only=False, + legacy_renum_only=True, + store_transposed=False, ): # Verify column names present in input DataFrame @@ -192,6 +199,11 @@ def __from_edgelist( if self.batch_enabled: self._replicate_edgelist() + self._make_plc_graph( + value_col=value_col, + store_transposed=store_transposed + ) + def to_pandas_edgelist(self, source='src', destination='dst', weight='weights'): """ @@ -751,7 +763,36 @@ def _degree(self, vertex_subset, direction=Direction.ALL): return df - def to_directed(self, DiG): + def _make_plc_graph(self, value_col=None, store_transposed=False): + if value_col is None: + value_col = cudf.Series( + cupy.ones(len(self.edgelist.edgelist_df), dtype='float32') + ) + else: + weight_t = value_col.dtype + + if weight_t == "int32": + value_col = value_col.astype("float32") + if weight_t == "int64": + value_col = value_col.astype("float64") + + graph_props = GraphProperties( + is_multigraph=self.properties.multi_edge, + is_symmetric=not self.properties.directed + ) + + self._plc_graph = SGGraph( + resource_handle=ResourceHandle(), + graph_properties=graph_props, + src_array=self.edgelist.edgelist_df['src'], + dst_array=self.edgelist.edgelist_df['dst'], + weight_array=value_col, + store_transposed=store_transposed, + renumber=False, + do_expensive_check=False + ) + + def to_directed(self, DiG, store_transposed=False): """ Return a directed representation of the graph Implementation. This function copies the internal structures and returns the @@ -763,7 +804,14 @@ def to_directed(self, DiG): DiG.adjlist = self.adjlist DiG.transposedadjlist = self.transposedadjlist - def to_undirected(self, G): + if 'weights' in self.edgelist.edgelist_df: + value_col = self.edgelist.edgelist_df['weights'] + else: + value_col = None + + DiG._make_plc_graph(value_col, store_transposed) + + def to_undirected(self, G, store_transposed=False): """ Return an undirected copy of the graph. """ @@ -785,6 +833,13 @@ def to_undirected(self, G): G.edgelist = simpleGraphImpl.EdgeList(source_col, dest_col, value_col) + if 'weights' in self.edgelist.edgelist_df: + value_col = self.edgelist.edgelist_df['weights'] + else: + value_col = None + + G._make_plc_graph(value_col, store_transposed) + def has_node(self, n): """ Returns True if the graph contains the node n. diff --git a/python/cugraph/cugraph/structure/number_map.py b/python/cugraph/cugraph/structure/number_map.py index 0d51d806c58..7f5b71a3762 100644 --- a/python/cugraph/cugraph/structure/number_map.py +++ b/python/cugraph/cugraph/structure/number_map.py @@ -286,7 +286,7 @@ def indirection_map(self, ddf, src_col_names, dst_col_names): self.ddf = tmp_ddf return tmp_ddf - def __init__(self, id_type=np.int32): + def __init__(self, id_type=np.int32, renumber_type=None): self.implementation = None self.id_type = id_type # The default src/dst column names in the resulting renumbered diff --git a/python/cugraph/cugraph/tests/mg/test_mg_graph.py b/python/cugraph/cugraph/tests/mg/test_mg_graph.py index b5d3529926e..7c8d2830fa8 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_graph.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_graph.py @@ -19,6 +19,16 @@ import cugraph import random +from pylibcugraph import bfs as pylibcugraph_bfs +from pylibcugraph import ResourceHandle + +from cugraph.dask.traversal.bfs import convert_to_cudf + +import cugraph.dask.comms.comms as Comms +from cugraph.dask.common.input_utils import get_distributed_data +from dask.distributed import wait +import cudf + # ============================================================================= # Pytest Setup / Teardown - called for each test function @@ -37,7 +47,7 @@ def setup_function(): fixture_params = utils.genFixtureParamsProduct( (datasets, "graph_file"), (IS_DIRECTED, "directed"), - ([True, False], "legacy_renum_only") + ([True], "legacy_renum_only") ) @@ -67,9 +77,8 @@ def input_combo(request): dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist( - ddf, source='src', destination='dst', edge_attr='value') - - dg.compute_renumber_edge_list(legacy_renum_only=legacy_renum_only) + ddf, source='src', destination='dst', edge_attr='value', + legacy_renum_only=legacy_renum_only) parameters["MGGraph"] = dg @@ -117,3 +126,61 @@ def test_has_node_functionality(dask_client, input_combo): invalid_node = valid_nodes.max() + 1 assert G.has_node(invalid_node) is False + + +def test_create_mg_graph(dask_client, input_combo): + G = input_combo['MGGraph'] + + # ensure graph exists + assert G._plc_graph is not None + + # ensure graph is partitioned correctly + assert len(G._plc_graph) == len(dask_client.has_what()) + + start = dask_cudf.from_cudf( + cudf.Series([1], dtype='int32'), + len(G._plc_graph) + ) + data_start = get_distributed_data(start) + + res = [ + dask_client.submit( + lambda sID, mg_graph_x, st_x: pylibcugraph_bfs( + ResourceHandle(Comms.get_handle(sID).getHandle()), + mg_graph_x, + st_x, + False, + 0, + True, + False + ), + Comms.get_session_id(), + G._plc_graph[w], + data_start.worker_to_parts[w][0], + workers=[w] + ) + for w in Comms.get_workers() + ] + + wait(res) + + cudf_result = [ + dask_client.submit(convert_to_cudf, cp_arrays) + for cp_arrays in res + ] + wait(cudf_result) + + ddf = dask_cudf.from_delayed(cudf_result) + + ddf = ddf.compute() + + if 'dolphins.csv' == input_combo['graph_file'].name: + assert ddf[ddf.vertex == 33].distance.iloc[0] == 3 + assert ddf[ddf.vertex == 33].predecessor.iloc[0] == 37 + assert ddf[ddf.vertex == 11].distance.iloc[0] == 4 + assert ddf[ddf.vertex == 11].predecessor.iloc[0] == 51 + else: + assert ddf[ddf.vertex == 33].distance.iloc[0] == 2 + assert ddf[ddf.vertex == 33].predecessor.iloc[0] == 30 + assert ddf[ddf.vertex == 11].distance.iloc[0] == 2 + assert ddf[ddf.vertex == 11].predecessor.iloc[0] == 0 diff --git a/python/cugraph/cugraph/tests/mg/test_mg_neighborhood_sampling.py b/python/cugraph/cugraph/tests/mg/test_mg_neighborhood_sampling.py index 13122b2bfe4..932b4d20d93 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_neighborhood_sampling.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_neighborhood_sampling.py @@ -71,7 +71,8 @@ def input_combo(request): dg = cugraph.Graph(directed=directed) dg.from_dask_cudf_edgelist( - ddf, source='src', destination='dst', edge_attr='value') + ddf, source='src', destination='dst', edge_attr='value', + store_transposed=False, legacy_renum_only=True) parameters["MGGraph"] = dg @@ -174,7 +175,9 @@ def test_mg_neighborhood_sampling_tree(dask_client, directed): ) G = cugraph.Graph(directed=directed) - G.from_dask_cudf_edgelist(ddf, "src", "dst", "value") + G.from_dask_cudf_edgelist(ddf, "src", "dst", "value", + store_transposed=False, + legacy_renum_only=True) # TODO: Incomplete, include more testing for tree graph as well as # for larger graphs diff --git a/python/cugraph/cugraph/tests/test_graph.py b/python/cugraph/cugraph/tests/test_graph.py index b6c285fb0a7..19c09e27cc4 100644 --- a/python/cugraph/cugraph/tests/test_graph.py +++ b/python/cugraph/cugraph/tests/test_graph.py @@ -30,6 +30,11 @@ from dask.distributed import Client import dask_cudf +from pylibcugraph import bfs as pylibcugraph_bfs +from pylibcugraph import ResourceHandle + +from cugraph.dask.traversal.bfs import convert_to_cudf + # Temporarily suppress warnings till networkX fixes deprecation warnings # (Using or importing the ABCs from 'collections' instead of from # 'collections.abc' is deprecated, and in 3.8 it will stop working) for @@ -539,6 +544,7 @@ def test_to_directed(graph_file): assert DiG.is_directed() assert DiG.number_of_nodes() == DiGnx.number_of_nodes() assert DiG.number_of_edges() == DiGnx.number_of_edges() + assert DiG._plc_graph is not None for index, row in cu_M.to_pandas().iterrows(): assert G.has_edge(row['0'], row['1']) @@ -574,6 +580,7 @@ def test_to_undirected(graph_file): assert not G.is_directed() assert G.number_of_nodes() == Gnx.number_of_nodes() assert G.number_of_edges() == Gnx.number_of_edges() + assert G._plc_graph is not None for index, row in cu_M.to_pandas().iterrows(): assert G.has_edge(row['0'], row['1']) @@ -696,3 +703,39 @@ def test_graph_init_with_multigraph(): cDiMG = cugraph.MultiDiGraph() # deprecated, but should still work cDiMG.from_cudf_edgelist(gdf, source="src", destination="dst") cugraph.Graph(m_graph=cDiMG) + + +@pytest.mark.parametrize("graph_file", utils.DATASETS) +def test_create_sg_graph(graph_file): + el = utils.read_csv_file(graph_file) + G = cugraph.from_cudf_edgelist( + el, + source='0', + destination='1', + edge_attr='2' + ) + + # ensure graph exists + assert G._plc_graph is not None + + start = cudf.Series([1], dtype='int32') + start = G.lookup_internal_vertex_id(start) + + if graph_file.name == 'dolphins.csv': + res = pylibcugraph_bfs( + ResourceHandle(), + G._plc_graph, + start, + False, + 0, + True, + False) + + cdr = convert_to_cudf(res) + cdr = G.unrenumber(cdr, column_name='vertex') + cdr = G.unrenumber(cdr, column_name='predecessor') + + assert cdr[cdr.vertex == 33].distance.to_numpy()[0] == 3 + assert cdr[cdr.vertex == 33].predecessor.to_numpy()[0] == 37 + assert cdr[cdr.vertex == 11].distance.to_numpy()[0] == 4 + assert cdr[cdr.vertex == 11].predecessor.to_numpy()[0] == 51 diff --git a/python/cugraph/cugraph/traversal/bfs.py b/python/cugraph/cugraph/traversal/bfs.py index dea5f9327db..4938b6bb200 100644 --- a/python/cugraph/cugraph/traversal/bfs.py +++ b/python/cugraph/cugraph/traversal/bfs.py @@ -14,11 +14,8 @@ import cudf import dask_cudf -from pylibcugraph import (ResourceHandle, - GraphProperties, - SGGraph, - bfs as pylibcugraph_bfs - ) +from pylibcugraph import ResourceHandle +from pylibcugraph import bfs as pylibcugraph_bfs from cugraph.structure.graph_classes import Graph, DiGraph from cugraph.utilities import (ensure_cugraph_obj, @@ -127,45 +124,6 @@ def _convert_df_to_output_type(df, input_type): raise TypeError(f"input type {input_type} is not a supported type.") -def _call_plc_bfs(G, sources, depth_limit, do_expensive_check=False, - direction_optimizing=False, return_predecessors=True): - handle = ResourceHandle() - - srcs = G.edgelist.edgelist_df['src'] - dsts = G.edgelist.edgelist_df['dst'] - weights = G.edgelist.edgelist_df['weights'] \ - if 'weights' in G.edgelist.edgelist_df \ - else cudf.Series((srcs + 1) / (srcs + 1), dtype='float32') - - sg = SGGraph( - resource_handle=handle, - graph_properties=GraphProperties(is_multigraph=G.is_multigraph()), - src_array=srcs, - dst_array=dsts, - weight_array=weights, - store_transposed=False, - renumber=False, - do_expensive_check=do_expensive_check - ) - - distances, predecessors, vertices = \ - pylibcugraph_bfs( - handle, - sg, - sources, - direction_optimizing, - depth_limit if depth_limit is not None else -1, - return_predecessors, - do_expensive_check - ) - - return cudf.DataFrame({ - 'distance': cudf.Series(distances), - 'vertex': cudf.Series(vertices), - 'predecessor': cudf.Series(predecessors), - }) - - def bfs(G, start=None, depth_limit=None, @@ -176,6 +134,9 @@ def bfs(G, Find the distances and predecessors for a breadth first traversal of a graph. + Note: This is a pylibcugraph-enabled algorithm, which requires that the + graph was created with legacy_renum_only=True. + Parameters ---------- G : cugraph.Graph, networkx.Graph, CuPy or SciPy sparse matrix @@ -277,18 +238,29 @@ def bfs(G, else: start = cudf.Series(start, name='starts') - df = _call_plc_bfs( - G, - start, - depth_limit, - return_predecessors=return_predecessors - ) + distances, predecessors, vertices = \ + pylibcugraph_bfs( + handle=ResourceHandle(), + graph=G._plc_graph, + sources=start, + direction_optimizing=False, + depth_limit=depth_limit if depth_limit is not None else -1, + compute_predecessors=return_predecessors, + do_expensive_check=False + ) + + result_df = cudf.DataFrame({ + 'vertex': cudf.Series(vertices), + 'distance': cudf.Series(distances), + 'predecessor': cudf.Series(predecessors), + }) + if G.renumbered: - df = G.unrenumber(df, "vertex") - df = G.unrenumber(df, "predecessor") - df.fillna(-1, inplace=True) + result_df = G.unrenumber(result_df, "vertex") + result_df = G.unrenumber(result_df, "predecessor") + result_df.fillna(-1, inplace=True) - return _convert_df_to_output_type(df, input_type) + return _convert_df_to_output_type(result_df, input_type) def bfs_edges(G, source, reverse=False, depth_limit=None, sort_neighbors=None):