Skip to content

Commit

Permalink
fix inconsistent graph properties between the SG and the MG API (#3757)
Browse files Browse the repository at this point in the history
Several graph methods are failing, some being an effect of migrating away from cython.cu renumbering.
This PR fixes couple graph methods and fixes the inconsistency in results returned by the SG and MG API


closes #3740 
closes #3766

Authors:
  - Joseph Nke (https://github.com/jnke2016)

Approvers:
  - Brad Rees (https://github.com/BradReesWork)
  - Rick Ratzel (https://github.com/rlratzel)

URL: #3757
  • Loading branch information
jnke2016 authored Aug 4, 2023
1 parent f6543f6 commit 62ecea2
Show file tree
Hide file tree
Showing 23 changed files with 498 additions and 134 deletions.
7 changes: 5 additions & 2 deletions python/cugraph/cugraph/structure/graph_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,14 @@ def __init__(self, m_graph=None, directed=False):
if isinstance(m_graph, MultiGraph):
elist = m_graph.view_edge_list()
if m_graph.is_weighted():
weights = "weights"
weights = m_graph.weight_column
else:
weights = None
self.from_cudf_edgelist(
elist, source="src", destination="dst", edge_attr=weights
elist,
source=m_graph.source_columns,
destination=m_graph.destination_columns,
edge_attr=weights,
)
else:
raise TypeError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ def __init__(self, properties):
self.properties = simpleDistributedGraphImpl.Properties(properties)
self.source_columns = None
self.destination_columns = None
self.weight_column = None
self.vertex_columns = None

def _make_plc_graph(
sID,
Expand Down Expand Up @@ -175,6 +177,7 @@ def __from_edgelist(
"and destination parameters"
)
ddf_columns = s_col + d_col
self.vertex_columns = ddf_columns.copy()
_client = default_client()
workers = _client.scheduler_info()["workers"]
# Repartition to 2 partitions per GPU for memory efficient process
Expand Down Expand Up @@ -214,10 +217,11 @@ def __from_edgelist(
# The symmetrize step may add additional edges with unknown
# ids and types for an undirected graph. Therefore, only
# directed graphs may be used with ids and types.
# FIXME: Drop the check in symmetrize.py as it is redundant
if len(edge_attr) == 3:
if not self.properties.directed:
raise ValueError(
"User-provided edge ids and edge "
"User-provided edge ids and/or edge "
"types are not permitted for an "
"undirected graph."
)
Expand Down Expand Up @@ -285,6 +289,7 @@ def __from_edgelist(
self.properties.renumber = renumber
self.source_columns = source
self.destination_columns = destination
self.weight_column = weight

# If renumbering is not enabled, this function will only create
# the edgelist_df and not do any renumbering.
Expand Down Expand Up @@ -316,7 +321,6 @@ def __from_edgelist(
ddf = ddf.map_partitions(lambda df: df.copy())
ddf = persist_dask_df_equal_parts_per_worker(ddf, _client)
num_edges = len(ddf)
self._number_of_edges = num_edges
ddf = get_persisted_df_worker_map(ddf, _client)
delayed_tasks_d = {
w: delayed(simpleDistributedGraphImpl._make_plc_graph)(
Expand Down Expand Up @@ -356,6 +360,8 @@ def renumbered(self):

def view_edge_list(self):
"""
FIXME: Should this also return the edge ids and types?
Display the edge list. Compute it if needed.
NOTE: If the graph is of type Graph() then the displayed undirected
edges are the same as displayed by networkx Graph(), but the direction
Expand Down Expand Up @@ -386,7 +392,59 @@ def view_edge_list(self):
"""
if self.edgelist is None:
raise RuntimeError("Graph has no Edgelist.")
return self.edgelist.edgelist_df

edgelist_df = self.input_df
is_string_dtype = False
is_multi_column = False
wgtCol = simpleDistributedGraphImpl.edgeWeightCol
if not self.properties.directed:
srcCol = self.source_columns
dstCol = self.destination_columns
if self.renumber_map.unrenumbered_id_type == "object":
# FIXME: Use the renumbered vertices instead and then un-renumber.
# This operation can be expensive.
is_string_dtype = True
edgelist_df = self.edgelist.edgelist_df
srcCol = self.renumber_map.renumbered_src_col_name
dstCol = self.renumber_map.renumbered_dst_col_name

if isinstance(srcCol, list):
srcCol = self.renumber_map.renumbered_src_col_name
dstCol = self.renumber_map.renumbered_dst_col_name
edgelist_df = self.edgelist.edgelist_df
# unrenumber before extracting the upper triangular part
if len(self.source_columns) == 1:
edgelist_df = self.renumber_map.unrenumber(edgelist_df, srcCol)
edgelist_df = self.renumber_map.unrenumber(edgelist_df, dstCol)
else:
is_multi_column = True

edgelist_df[srcCol], edgelist_df[dstCol] = edgelist_df[
[srcCol, dstCol]
].min(axis=1), edgelist_df[[srcCol, dstCol]].max(axis=1)

edgelist_df = edgelist_df.groupby(by=[srcCol, dstCol]).sum().reset_index()
if wgtCol in edgelist_df.columns:
# FIXME: This breaks if there are are multi edges as those will
# be dropped during the symmetrization step and the original 'weight'
# will be halved.
edgelist_df[wgtCol] /= 2

if is_string_dtype or is_multi_column:
# unrenumber the vertices
edgelist_df = self.renumber_map.unrenumber(edgelist_df, srcCol)
edgelist_df = self.renumber_map.unrenumber(edgelist_df, dstCol)

if self.properties.renumbered:
edgelist_df = edgelist_df.rename(
columns=self.renumber_map.internal_to_external_col_names
)

# If there is no 'wgt' column, nothing will happen
edgelist_df = edgelist_df.rename(columns={wgtCol: self.weight_column})

self.properties.edge_count = len(edgelist_df)
return edgelist_df

def delete_edge_list(self):
"""
Expand All @@ -405,23 +463,7 @@ def number_of_vertices(self):
Get the number of nodes in the graph.
"""
if self.properties.node_count is None:
if self.edgelist is not None:
if self.renumbered is True:
src_col_name = self.renumber_map.renumbered_src_col_name
dst_col_name = self.renumber_map.renumbered_dst_col_name
# FIXME: from_dask_cudf_edgelist() currently requires
# renumber=True for MG, so this else block will not be
# used. Should this else block be removed and added back when
# the restriction is removed?
else:
src_col_name = "src"
dst_col_name = "dst"

ddf = self.edgelist.edgelist_df[[src_col_name, dst_col_name]]
# ddf = self.edgelist.edgelist_df[["src", "dst"]]
self.properties.node_count = ddf.max().max().compute() + 1
else:
raise RuntimeError("Graph is Empty")
self.properties.node_count = len(self.nodes())
return self.properties.node_count

def number_of_nodes(self):
Expand All @@ -434,10 +476,16 @@ def number_of_edges(self, directed_edges=False):
"""
Get the number of edges in the graph.
"""
if self.edgelist is not None:
return self._number_of_edges
else:
raise RuntimeError("Graph is Empty")

if directed_edges and self.edgelist is not None:
return len(self.edgelist.edgelist_df)

if self.properties.edge_count is None:
if self.edgelist is not None:
self.view_edge_list()
else:
raise RuntimeError("Graph is Empty")
return self.properties.edge_count

def in_degree(self, vertex_subset=None):
"""
Expand Down Expand Up @@ -1021,19 +1069,8 @@ def edges(self):
sources and destinations. It does not return the edge weights.
For viewing edges with weights use view_edge_list()
"""
if self.renumbered is True:
src_col_name = self.renumber_map.renumbered_src_col_name
dst_col_name = self.renumber_map.renumbered_dst_col_name
# FIXME: from_dask_cudf_edgelist() currently requires
# renumber=True for MG, so this else block will not be
# used. Should this else block be removed and added back when
# the restriction is removed?
else:
src_col_name = "src"
dst_col_name = "dst"

# return self.view_edge_list()[["src", "dst"]]
return self.view_edge_list()[[src_col_name, dst_col_name]]
return self.view_edge_list()[self.vertex_columns]

def nodes(self):
"""
Expand All @@ -1045,23 +1082,26 @@ def nodes(self):
a dataframe and do 'renumber_map.unrenumber' or 'G.unrenumber'
"""

if self.renumbered:
# FIXME: This relies on current implementation
# of NumberMap, should not really expose
# this, perhaps add a method to NumberMap
if self.edgelist is not None:
if self.renumbered:
# FIXME: This relies on current implementation
# of NumberMap, should not really expose
# this, perhaps add a method to NumberMap

df = self.renumber_map.implementation.ddf.drop(columns="global_id")
df = self.renumber_map.implementation.ddf.drop(columns="global_id")

if len(df.columns) > 1:
return df
else:
return df[df.columns[0]]
if len(df.columns) > 1:
return df
else:
return df[df.columns[0]]

else:
df = self.input_df
return dask_cudf.concat(
[df[self.source_columns], df[self.destination_columns]]
).drop_duplicates()
else:
df = self.input_df
return dask_cudf.concat(
[df[self.source_columns], df[self.destination_columns]]
).drop_duplicates()
raise RuntimeError("Graph is Empty")

def neighbors(self, n):
if self.edgelist is None:
Expand Down
Loading

0 comments on commit 62ecea2

Please sign in to comment.