Added get_vertex_data() and get_edge_data() to SG/MG PropertyGraph (rapidsai#2444)

closes rapidsai#2421 

Added `get_vertex_data()` and `get_edge_data()` to SG and MG PropertyGraph, and corresponding tests.

Prior to these methods, users had to either call `pG.annotate_dataframe()` to get properties for edges or access the internal debug dataframes directly via `pG._vertex_prop_dataframe` and `pG._edge_prop_dataframe`.

Users can now call `pG.get_vertex_data(vertex_ids, types, columns)` to get vertex properties for the vertices matching `vertex_ids` and `types`, limited to the specified `columns`. All args are optional and default to "all" for each category. `pG.get_edge_data(edge_ids, types, columns)` works the same way for edges. Both methods return a dataframe.
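A minimal usage sketch (the dataframes, type names, and property column names below are hypothetical, and `PropertyGraph` is assumed to be importable from `cugraph.experimental` while the class is still experimental):

```python
import cudf
from cugraph.experimental import PropertyGraph

pG = PropertyGraph()

# Hypothetical vertex data: two property columns keyed by "v_id".
vertex_df = cudf.DataFrame({"v_id": [0, 1, 2],
                            "name": ["a", "b", "c"],
                            "score": [1.0, 2.0, 3.0]})
pG.add_vertex_data(vertex_df, vertex_col_name="v_id", type_name="user")

# Hypothetical edge data keyed by source/destination columns.
edge_df = cudf.DataFrame({"src": [0, 1],
                          "dst": [1, 2],
                          "weight": [0.5, 0.25]})
pG.add_edge_data(edge_df, vertex_col_names=("src", "dst"),
                 type_name="follows")

# All vertices, all types, all property columns.
all_vert_data = pG.get_vertex_data()

# Only vertices 0 and 2, only the "score" property; the internal vertex
# and type columns are always included in the result.
some_vert_data = pG.get_vertex_data(vertex_ids=[0, 2], columns=["score"])

# Edges are selected by their internal edge IDs, which start at 0.
some_edge_data = pG.get_edge_data(edge_ids=[0], types=["follows"])
```

The MG PropertyGraph exposes the same two methods; the only difference is that it operates on (and returns) dask_cudf DataFrames.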

Authors:
  - Rick Ratzel (https://github.com/rlratzel)

Approvers:
  - Vibhu Jawa (https://github.com/VibhuJawa)
  - Alex Barghi (https://github.com/alexbarghi-nv)
  - Erik Welch (https://github.com/eriknw)
  - Brad Rees (https://github.com/BradReesWork)

URL: rapidsai#2444
rlratzel authored Jul 29, 2022
1 parent 6b62002 commit 2263011
Showing 4 changed files with 550 additions and 94 deletions.
126 changes: 97 additions & 29 deletions python/cugraph/cugraph/dask/structure/mg_property_graph.py
@@ -429,6 +429,38 @@ def add_vertex_data(self,
for n in self.__vertex_prop_dataframe.columns])
self.__vertex_prop_eval_dict.update(latest)

def get_vertex_data(self, vertex_ids=None, types=None, columns=None):
"""
Return a dataframe containing vertex properties for only the specified
vertex_ids, columns, and/or types, or all vertex IDs if not specified.
"""
if self.__vertex_prop_dataframe is not None:
if vertex_ids is not None:
df_mask = (
self.__vertex_prop_dataframe[self.vertex_col_name]
.isin(vertex_ids)
)
df = self.__vertex_prop_dataframe.loc[df_mask]
else:
df = self.__vertex_prop_dataframe

if types is not None:
# FIXME: coerce types to a list-like if not?
df_mask = df[self.type_col_name].isin(types)
df = df.loc[df_mask]

# The "internal" pG.vertex_col_name and pG.type_col_name columns
# are also included/added since they are assumed to be needed by
# the caller.
if columns is None:
return df
else:
# FIXME: invalid columns will result in a KeyError, should a
# check be done here and a more PG-specific error raised?
return df[[self.vertex_col_name, self.type_col_name] + columns]

return None

def add_edge_data(self,
dataframe,
vertex_col_names,
@@ -512,22 +544,22 @@ def add_edge_data(self,
# columns. The copied DataFrame is then merged (another copy) and then
# deleted when out-of-scope.
tmp_df = dataframe.copy()
# FIXME: Find a better way to create the edge id
prev_eid = -1 if self.__last_edge_id is None else self.__last_edge_id
tmp_df[self.src_col_name] = tmp_df[vertex_col_names[0]]
tmp_df[self.dst_col_name] = tmp_df[vertex_col_names[1]]
starting_eid = prev_eid + 1
data_size = len(tmp_df.compute().index)
cudf_series = \
cudf.Series(range(starting_eid, starting_eid + data_size))
dask_series =\
dask_cudf.from_cudf(cudf_series, self.__num_workers)
dask_series = dask_series.reset_index(drop=True)
self.__last_edge_id = starting_eid + data_size
tmp_df = tmp_df.reset_index(drop=True)
tmp_df[self.edge_id_col_name] = dask_series
tmp_df[self.type_col_name] = type_name

# Add unique edge IDs to the new rows. This is just a count for each
# row starting from the last edge ID value, with initial edge ID 0.
starting_eid = (
-1 if self.__last_edge_id is None else self.__last_edge_id
)
tmp_df[self.edge_id_col_name] = 1
tmp_df[self.edge_id_col_name] = (
tmp_df[self.edge_id_col_name].cumsum() + starting_eid
)
self.__last_edge_id = starting_eid + len(tmp_df.index)
tmp_df.persist()

if property_columns:
# all columns
column_names_to_drop = set(tmp_df.columns)
@@ -542,13 +574,50 @@ def add_edge_data(self,
new_col_info = self.__get_new_column_dtypes(
tmp_df, self.__edge_prop_dataframe)
self.__edge_prop_dtypes.update(new_col_info)

self.__edge_prop_dataframe = \
self.__edge_prop_dataframe.merge(tmp_df, how="outer")

        # Update the edge eval dict with the latest column instances
latest = dict([(n, self.__edge_prop_dataframe[n])
for n in self.__edge_prop_dataframe.columns])
self.__edge_prop_eval_dict.update(latest)

def get_edge_data(self, edge_ids=None, types=None, columns=None):
"""
Return a dataframe containing edge properties for only the specified
edge_ids, columns, and/or edge type, or all edge IDs if not specified.
"""
if self.__edge_prop_dataframe is not None:
if edge_ids is not None:
df_mask = self.__edge_prop_dataframe[self.edge_id_col_name]\
.isin(edge_ids)
df = self.__edge_prop_dataframe.loc[df_mask]
else:
df = self.__edge_prop_dataframe

if types is not None:
# FIXME: coerce types to a list-like if not?
df_mask = df[self.type_col_name].isin(types)
df = df.loc[df_mask]

# The "internal" src, dst, edge_id, and type columns are also
# included/added since they are assumed to be needed by the caller.
if columns is None:
# remove the "internal" weight column if one was added
all_columns = list(self.__edge_prop_dataframe.columns)
if self.weight_col_name in all_columns:
all_columns.remove(self.weight_col_name)
return df[all_columns]
else:
# FIXME: invalid columns will result in a KeyError, should a
# check be done here and a more PG-specific error raised?
return df[[self.src_col_name, self.dst_col_name,
self.edge_id_col_name, self.type_col_name]
+ columns]

return None

def select_vertices(self, expr, from_previous_selection=None):
raise NotImplementedError

@@ -766,16 +835,21 @@ def edge_props_to_graph(self,
raise RuntimeError("query resulted in duplicate edges which "
f"cannot be represented with the {msg}")

# FIXME: MNMG Graphs required renumber to be True due to requirements
# on legacy code that needed segment offsets, partition offsets,
# etc. which were previously computed during the "legacy" C
# renumbering. The workaround is to pass renumber=True, then manually
# call G.compute_renumber_edge_list(legacy_renum_only=True) to compute
# the required meta-data without changing vertex IDs.
# FIXME: This forces the renumbering code to run a python-only
# renumbering without the newer C++ renumbering step. This is
# required since the newest graph algos which are using the
# pylibcugraph library will crash if passed data renumbered using the
# C++ renumbering. The consequence of this is that these extracted
# subgraphs can only be used with newer pylibcugraph-based MG algos.
#
# NOTE: if the vertices are integers (int32 or int64), renumbering is
# actually skipped with the assumption that the C renumbering will
# take place. The C renumbering only occurs for pylibcugraph algos,
# hence the reason these extracted subgraphs only work with PLC algos.
if renumber_graph is False:
renumber = True
else:
renumber = renumber_graph
raise ValueError("currently, renumber_graph must be set to True "
"for MG")
legacy_renum_only = True

col_names = [self.src_col_name, self.dst_col_name]
if edge_attr is not None:
@@ -785,14 +859,8 @@ def edge_props_to_graph(self,
source=self.src_col_name,
destination=self.dst_col_name,
edge_attr=edge_attr,
renumber=renumber)
# FIXME: see FIXME above - to generate the edgelist,
# compute_renumber_edge_list() must be called, but legacy mode needs to
# be used based on if renumbering was to be done or not.
if renumber_graph is False:
G.compute_renumber_edge_list(legacy_renum_only=True)
else:
G.compute_renumber_edge_list(legacy_renum_only=False)
renumber=renumber_graph,
legacy_renum_only=legacy_renum_only)

if add_edge_data:
# Set the edge_data on the resulting Graph to a DataFrame
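In the MG `add_edge_data` changes above, the old code computed the dataframe length eagerly (`len(tmp_df.compute().index)`) and built a separate `dask_cudf` ID series, while the new code assigns edge IDs with a cumulative sum over a column of ones, which Dask can evaluate lazily per partition. A minimal standalone sketch of that technique (the `_EDGE_ID_` column name and the surrounding state are assumptions for illustration):

```python
import cudf
import dask_cudf

# Hypothetical batch of new edge rows being appended to the property table.
tmp_df = dask_cudf.from_cudf(
    cudf.DataFrame({"src": [0, 1, 2], "dst": [1, 2, 0]}),
    npartitions=2)

last_edge_id = None  # state kept by the PropertyGraph; None means no edges yet

# Seed a column of ones, take its cumulative sum, then shift by the previous
# last ID so the first new row gets last_edge_id + 1 (or 0 for the first batch).
starting_eid = -1 if last_edge_id is None else last_edge_id
tmp_df["_EDGE_ID_"] = 1
tmp_df["_EDGE_ID_"] = tmp_df["_EDGE_ID_"].cumsum() + starting_eid
last_edge_id = starting_eid + len(tmp_df.index)
```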
107 changes: 82 additions & 25 deletions python/cugraph/cugraph/structure/property_graph.py
@@ -23,6 +23,7 @@
_dataframe_types.append(pd.DataFrame)


# FIXME: remove leading EXPERIMENTAL__ when no longer experimental
class EXPERIMENTAL__PropertySelection:
"""
Instances of this class are returned from the PropertyGraph.select_*()
@@ -50,7 +51,7 @@ def __add__(self, other):
return EXPERIMENTAL__PropertySelection(vs, es)


# FIXME: remove leading __ when no longer experimental
# FIXME: remove leading EXPERIMENTAL__ when no longer experimental
class EXPERIMENTAL__PropertyGraph:
"""
Class which stores vertex and edge properties that can be used to construct
@@ -144,7 +145,8 @@ def __init__(self):
def edges(self):
if self.__edge_prop_dataframe is not None:
return self.__edge_prop_dataframe[[self.src_col_name,
self.dst_col_name]]
self.dst_col_name,
self.edge_id_col_name]]
return None

@property
@@ -439,6 +441,38 @@ def add_vertex_data(self,
for n in self.__vertex_prop_dataframe.columns])
self.__vertex_prop_eval_dict.update(latest)

def get_vertex_data(self, vertex_ids=None, types=None, columns=None):
"""
Return a dataframe containing vertex properties for only the specified
vertex_ids, columns, and/or types, or all vertex IDs if not specified.
"""
if self.__vertex_prop_dataframe is not None:
if vertex_ids is not None:
df_mask = (
self.__vertex_prop_dataframe[self.vertex_col_name]
.isin(vertex_ids)
)
df = self.__vertex_prop_dataframe.loc[df_mask]
else:
df = self.__vertex_prop_dataframe

if types is not None:
# FIXME: coerce types to a list-like if not?
df_mask = df[self.type_col_name].isin(types)
df = df.loc[df_mask]

# The "internal" pG.vertex_col_name and pG.type_col_name columns
# are also included/added since they are assumed to be needed by
# the caller.
if columns is None:
return df
else:
# FIXME: invalid columns will result in a KeyError, should a
# check be done here and a more PG-specific error raised?
return df[[self.vertex_col_name, self.type_col_name] + columns]

return None

def add_edge_data(self,
dataframe,
vertex_col_names,
@@ -538,9 +572,19 @@ def add_edge_data(self,
tmp_df = dataframe.copy(deep=True)
tmp_df[self.src_col_name] = tmp_df[vertex_col_names[0]]
tmp_df[self.dst_col_name] = tmp_df[vertex_col_names[1]]
# FIXME: handle case of a type_name column already being in tmp_df
tmp_df[self.type_col_name] = type_name

# Add unique edge IDs to the new rows. This is just a count for each
# row starting from the last edge ID value, with initial edge ID 0.
starting_eid = (
-1 if self.__last_edge_id is None else self.__last_edge_id
)
tmp_df[self.edge_id_col_name] = 1
tmp_df[self.edge_id_col_name] = (
tmp_df[self.edge_id_col_name].cumsum() + starting_eid
)
self.__last_edge_id = starting_eid + len(tmp_df.index)

if property_columns:
# all columns
column_names_to_drop = set(tmp_df.columns)
@@ -559,13 +603,46 @@ def add_edge_data(self,
self.__edge_prop_dataframe = \
self.__edge_prop_dataframe.merge(tmp_df, how="outer")

self.__add_edge_ids()

        # Update the edge eval dict with the latest column instances
latest = dict([(n, self.__edge_prop_dataframe[n])
for n in self.__edge_prop_dataframe.columns])
self.__edge_prop_eval_dict.update(latest)

def get_edge_data(self, edge_ids=None, types=None, columns=None):
"""
Return a dataframe containing edge properties for only the specified
edge_ids, columns, and/or edge type, or all edge IDs if not specified.
"""
if self.__edge_prop_dataframe is not None:
if edge_ids is not None:
df_mask = self.__edge_prop_dataframe[self.edge_id_col_name]\
.isin(edge_ids)
df = self.__edge_prop_dataframe.loc[df_mask]
else:
df = self.__edge_prop_dataframe

if types is not None:
# FIXME: coerce types to a list-like if not?
df_mask = df[self.type_col_name].isin(types)
df = df.loc[df_mask]

# The "internal" src, dst, edge_id, and type columns are also
# included/added since they are assumed to be needed by the caller.
if columns is None:
# remove the "internal" weight column if one was added
all_columns = list(self.__edge_prop_dataframe.columns)
if self.weight_col_name in all_columns:
all_columns.remove(self.weight_col_name)
return df[all_columns]
else:
# FIXME: invalid columns will result in a KeyError, should a
# check be done here and a more PG-specific error raised?
return df[[self.src_col_name, self.dst_col_name,
self.edge_id_col_name, self.type_col_name]
+ columns]

return None

def select_vertices(self, expr, from_previous_selection=None):
"""
Evaluate expr and return a PropertySelection object representing the
@@ -957,26 +1034,6 @@ def __create_property_lookup_table(self, edge_prop_df):
self.dst_col_name: dst,
self.edge_id_col_name: edge_id})

def __add_edge_ids(self):
"""
Replace nans with unique edge IDs. Edge IDs are simply numbers
incremented by 1 for each edge.
"""
prev_eid = -1 if self.__last_edge_id is None else self.__last_edge_id
nans = self.__edge_prop_dataframe[self.edge_id_col_name].isna()

if nans.any():
indices = nans.index[nans]
num_indices = len(indices)
starting_eid = prev_eid + 1
new_eids = self.__series_type(
range(starting_eid, starting_eid + num_indices))

self.__edge_prop_dataframe[self.edge_id_col_name]\
.iloc[indices] = new_eids

self.__last_edge_id = starting_eid + num_indices - 1

def __get_all_vertices_series(self):
"""
Return a list of all Series objects that contain vertices from all