diff --git a/python/cugraph/cugraph/dask/structure/mg_property_graph.py b/python/cugraph/cugraph/dask/structure/mg_property_graph.py index 7399d818d23..541360e64ec 100644 --- a/python/cugraph/cugraph/dask/structure/mg_property_graph.py +++ b/python/cugraph/cugraph/dask/structure/mg_property_graph.py @@ -429,6 +429,38 @@ def add_vertex_data(self, for n in self.__vertex_prop_dataframe.columns]) self.__vertex_prop_eval_dict.update(latest) + def get_vertex_data(self, vertex_ids=None, types=None, columns=None): + """ + Return a dataframe containing vertex properties for only the specified + vertex_ids, columns, and/or types, or all vertex IDs if not specified. + """ + if self.__vertex_prop_dataframe is not None: + if vertex_ids is not None: + df_mask = ( + self.__vertex_prop_dataframe[self.vertex_col_name] + .isin(vertex_ids) + ) + df = self.__vertex_prop_dataframe.loc[df_mask] + else: + df = self.__vertex_prop_dataframe + + if types is not None: + # FIXME: coerce types to a list-like if not? + df_mask = df[self.type_col_name].isin(types) + df = df.loc[df_mask] + + # The "internal" pG.vertex_col_name and pG.type_col_name columns + # are also included/added since they are assumed to be needed by + # the caller. + if columns is None: + return df + else: + # FIXME: invalid columns will result in a KeyError, should a + # check be done here and a more PG-specific error raised? + return df[[self.vertex_col_name, self.type_col_name] + columns] + + return None + def add_edge_data(self, dataframe, vertex_col_names, @@ -512,22 +544,22 @@ def add_edge_data(self, # columns. The copied DataFrame is then merged (another copy) and then # deleted when out-of-scope. tmp_df = dataframe.copy() - # FIXME: Find a better way to create the edge id - prev_eid = -1 if self.__last_edge_id is None else self.__last_edge_id tmp_df[self.src_col_name] = tmp_df[vertex_col_names[0]] tmp_df[self.dst_col_name] = tmp_df[vertex_col_names[1]] - starting_eid = prev_eid + 1 - data_size = len(tmp_df.compute().index) - cudf_series = \ - cudf.Series(range(starting_eid, starting_eid + data_size)) - dask_series =\ - dask_cudf.from_cudf(cudf_series, self.__num_workers) - dask_series = dask_series.reset_index(drop=True) - self.__last_edge_id = starting_eid + data_size - tmp_df = tmp_df.reset_index(drop=True) - tmp_df[self.edge_id_col_name] = dask_series tmp_df[self.type_col_name] = type_name + + # Add unique edge IDs to the new rows. This is just a count for each + # row starting from the last edge ID value, with initial edge ID 0. + starting_eid = ( + -1 if self.__last_edge_id is None else self.__last_edge_id + ) + tmp_df[self.edge_id_col_name] = 1 + tmp_df[self.edge_id_col_name] = ( + tmp_df[self.edge_id_col_name].cumsum() + starting_eid + ) + self.__last_edge_id = starting_eid + len(tmp_df.index) tmp_df.persist() + if property_columns: # all columns column_names_to_drop = set(tmp_df.columns) @@ -542,13 +574,50 @@ def add_edge_data(self, new_col_info = self.__get_new_column_dtypes( tmp_df, self.__edge_prop_dataframe) self.__edge_prop_dtypes.update(new_col_info) + self.__edge_prop_dataframe = \ self.__edge_prop_dataframe.merge(tmp_df, how="outer") + # Update the vertex eval dict with the latest column instances latest = dict([(n, self.__edge_prop_dataframe[n]) for n in self.__edge_prop_dataframe.columns]) self.__edge_prop_eval_dict.update(latest) + def get_edge_data(self, edge_ids=None, types=None, columns=None): + """ + Return a dataframe containing edge properties for only the specified + edge_ids, columns, and/or edge type, or all edge IDs if not specified. + """ + if self.__edge_prop_dataframe is not None: + if edge_ids is not None: + df_mask = self.__edge_prop_dataframe[self.edge_id_col_name]\ + .isin(edge_ids) + df = self.__edge_prop_dataframe.loc[df_mask] + else: + df = self.__edge_prop_dataframe + + if types is not None: + # FIXME: coerce types to a list-like if not? + df_mask = df[self.type_col_name].isin(types) + df = df.loc[df_mask] + + # The "internal" src, dst, edge_id, and type columns are also + # included/added since they are assumed to be needed by the caller. + if columns is None: + # remove the "internal" weight column if one was added + all_columns = list(self.__edge_prop_dataframe.columns) + if self.weight_col_name in all_columns: + all_columns.remove(self.weight_col_name) + return df[all_columns] + else: + # FIXME: invalid columns will result in a KeyError, should a + # check be done here and a more PG-specific error raised? + return df[[self.src_col_name, self.dst_col_name, + self.edge_id_col_name, self.type_col_name] + + columns] + + return None + def select_vertices(self, expr, from_previous_selection=None): raise NotImplementedError @@ -766,16 +835,21 @@ def edge_props_to_graph(self, raise RuntimeError("query resulted in duplicate edges which " f"cannot be represented with the {msg}") - # FIXME: MNMG Graphs required renumber to be True due to requirements - # on legacy code that needed segment offsets, partition offsets, - # etc. which were previously computed during the "legacy" C - # renumbering. The workaround is to pass renumber=True, then manually - # call G.compute_renumber_edge_list(legacy_renum_only=True) to compute - # the required meta-data without changing vertex IDs. + # FIXME: This forces the renumbering code to run a python-only + # renumbering without the newer C++ renumbering step. This is + # required since the newest graph algos which are using the + # pylibcugraph library will crash if passed data renumbered using the + # C++ renumbering. The consequence of this is that these extracted + # subgraphs can only be used with newer pylibcugraph-based MG algos. + # + # NOTE: if the vertices are integers (int32 or int64), renumbering is + # actually skipped with the assumption that the C renumbering will + # take place. The C renumbering only occurs for pylibcugraph algos, + # hence the reason these extracted subgraphs only work with PLC algos. if renumber_graph is False: - renumber = True - else: - renumber = renumber_graph + raise ValueError("currently, renumber_graph must be set to True " + "for MG") + legacy_renum_only = True col_names = [self.src_col_name, self.dst_col_name] if edge_attr is not None: @@ -785,14 +859,8 @@ def edge_props_to_graph(self, source=self.src_col_name, destination=self.dst_col_name, edge_attr=edge_attr, - renumber=renumber) - # FIXME: see FIXME above - to generate the edgelist, - # compute_renumber_edge_list() must be called, but legacy mode needs to - # be used based on if renumbering was to be done or not. - if renumber_graph is False: - G.compute_renumber_edge_list(legacy_renum_only=True) - else: - G.compute_renumber_edge_list(legacy_renum_only=False) + renumber=renumber_graph, + legacy_renum_only=legacy_renum_only) if add_edge_data: # Set the edge_data on the resulting Graph to a DataFrame diff --git a/python/cugraph/cugraph/structure/property_graph.py b/python/cugraph/cugraph/structure/property_graph.py index 6137b6952a0..feeafd32026 100644 --- a/python/cugraph/cugraph/structure/property_graph.py +++ b/python/cugraph/cugraph/structure/property_graph.py @@ -23,6 +23,7 @@ _dataframe_types.append(pd.DataFrame) +# FIXME: remove leading EXPERIMENTAL__ when no longer experimental class EXPERIMENTAL__PropertySelection: """ Instances of this class are returned from the PropertyGraph.select_*() @@ -50,7 +51,7 @@ def __add__(self, other): return EXPERIMENTAL__PropertySelection(vs, es) -# FIXME: remove leading __ when no longer experimental +# FIXME: remove leading EXPERIMENTAL__ when no longer experimental class EXPERIMENTAL__PropertyGraph: """ Class which stores vertex and edge properties that can be used to construct @@ -144,7 +145,8 @@ def __init__(self): def edges(self): if self.__edge_prop_dataframe is not None: return self.__edge_prop_dataframe[[self.src_col_name, - self.dst_col_name]] + self.dst_col_name, + self.edge_id_col_name]] return None @property @@ -439,6 +441,38 @@ def add_vertex_data(self, for n in self.__vertex_prop_dataframe.columns]) self.__vertex_prop_eval_dict.update(latest) + def get_vertex_data(self, vertex_ids=None, types=None, columns=None): + """ + Return a dataframe containing vertex properties for only the specified + vertex_ids, columns, and/or types, or all vertex IDs if not specified. + """ + if self.__vertex_prop_dataframe is not None: + if vertex_ids is not None: + df_mask = ( + self.__vertex_prop_dataframe[self.vertex_col_name] + .isin(vertex_ids) + ) + df = self.__vertex_prop_dataframe.loc[df_mask] + else: + df = self.__vertex_prop_dataframe + + if types is not None: + # FIXME: coerce types to a list-like if not? + df_mask = df[self.type_col_name].isin(types) + df = df.loc[df_mask] + + # The "internal" pG.vertex_col_name and pG.type_col_name columns + # are also included/added since they are assumed to be needed by + # the caller. + if columns is None: + return df + else: + # FIXME: invalid columns will result in a KeyError, should a + # check be done here and a more PG-specific error raised? + return df[[self.vertex_col_name, self.type_col_name] + columns] + + return None + def add_edge_data(self, dataframe, vertex_col_names, @@ -538,9 +572,19 @@ def add_edge_data(self, tmp_df = dataframe.copy(deep=True) tmp_df[self.src_col_name] = tmp_df[vertex_col_names[0]] tmp_df[self.dst_col_name] = tmp_df[vertex_col_names[1]] - # FIXME: handle case of a type_name column already being in tmp_df tmp_df[self.type_col_name] = type_name + # Add unique edge IDs to the new rows. This is just a count for each + # row starting from the last edge ID value, with initial edge ID 0. + starting_eid = ( + -1 if self.__last_edge_id is None else self.__last_edge_id + ) + tmp_df[self.edge_id_col_name] = 1 + tmp_df[self.edge_id_col_name] = ( + tmp_df[self.edge_id_col_name].cumsum() + starting_eid + ) + self.__last_edge_id = starting_eid + len(tmp_df.index) + if property_columns: # all columns column_names_to_drop = set(tmp_df.columns) @@ -559,13 +603,46 @@ def add_edge_data(self, self.__edge_prop_dataframe = \ self.__edge_prop_dataframe.merge(tmp_df, how="outer") - self.__add_edge_ids() - # Update the vertex eval dict with the latest column instances latest = dict([(n, self.__edge_prop_dataframe[n]) for n in self.__edge_prop_dataframe.columns]) self.__edge_prop_eval_dict.update(latest) + def get_edge_data(self, edge_ids=None, types=None, columns=None): + """ + Return a dataframe containing edge properties for only the specified + edge_ids, columns, and/or edge type, or all edge IDs if not specified. + """ + if self.__edge_prop_dataframe is not None: + if edge_ids is not None: + df_mask = self.__edge_prop_dataframe[self.edge_id_col_name]\ + .isin(edge_ids) + df = self.__edge_prop_dataframe.loc[df_mask] + else: + df = self.__edge_prop_dataframe + + if types is not None: + # FIXME: coerce types to a list-like if not? + df_mask = df[self.type_col_name].isin(types) + df = df.loc[df_mask] + + # The "internal" src, dst, edge_id, and type columns are also + # included/added since they are assumed to be needed by the caller. + if columns is None: + # remove the "internal" weight column if one was added + all_columns = list(self.__edge_prop_dataframe.columns) + if self.weight_col_name in all_columns: + all_columns.remove(self.weight_col_name) + return df[all_columns] + else: + # FIXME: invalid columns will result in a KeyError, should a + # check be done here and a more PG-specific error raised? + return df[[self.src_col_name, self.dst_col_name, + self.edge_id_col_name, self.type_col_name] + + columns] + + return None + def select_vertices(self, expr, from_previous_selection=None): """ Evaluate expr and return a PropertySelection object representing the @@ -957,26 +1034,6 @@ def __create_property_lookup_table(self, edge_prop_df): self.dst_col_name: dst, self.edge_id_col_name: edge_id}) - def __add_edge_ids(self): - """ - Replace nans with unique edge IDs. Edge IDs are simply numbers - incremented by 1 for each edge. - """ - prev_eid = -1 if self.__last_edge_id is None else self.__last_edge_id - nans = self.__edge_prop_dataframe[self.edge_id_col_name].isna() - - if nans.any(): - indices = nans.index[nans] - num_indices = len(indices) - starting_eid = prev_eid + 1 - new_eids = self.__series_type( - range(starting_eid, starting_eid + num_indices)) - - self.__edge_prop_dataframe[self.edge_id_col_name]\ - .iloc[indices] = new_eids - - self.__last_edge_id = starting_eid + num_indices - 1 - def __get_all_vertices_series(self): """ Return a list of all Series objects that contain vertices from all diff --git a/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py b/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py index bae807d5e3a..eceec8b658f 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py @@ -215,7 +215,7 @@ def dataset1_PropertyGraph(request): vertex_col_names=("user_id_1", "user_id_2"), property_columns=None) - return pG + return (pG, dataset1) @pytest.fixture(scope="module") @@ -275,7 +275,24 @@ def dataset1_MGPropertyGraph(dask_client): vertex_col_names=("user_id_1", "user_id_2"), property_columns=None) - return mpG + return (mpG, dataset1) + + +@pytest.fixture(scope="module") +def dataset2_simple_MGPropertyGraph(dask_client): + from cugraph.experimental import MGPropertyGraph + + dataframe_type = cudf.DataFrame + simple = dataset2["simple"] + mpG = MGPropertyGraph() + + sg_df = dataframe_type(columns=simple[0], data=simple[1]) + mgdf = dask_cudf.from_cudf(sg_df, npartitions=2) + + mpG.add_edge_data(mgdf, + vertex_col_names=("src", "dst")) + + return (mpG, simple) @pytest.fixture(scope="module") @@ -350,8 +367,8 @@ def test_extract_subgraph_no_query(net_MGPropertyGraph, net_PropertyGraph): @pytest.mark.skip(reason="Skipping tests because it is a work in progress") def test_adding_fixture(dataset1_PropertyGraph, dataset1_MGPropertyGraph): - sgpG = dataset1_PropertyGraph - mgPG = dataset1_MGPropertyGraph + (sgpG, _) = dataset1_PropertyGraph + (mgPG, _) = dataset1_MGPropertyGraph subgraph = sgpG.extract_subgraph(allow_multi_edges=True) dask_subgraph = mgPG.extract_subgraph(allow_multi_edges=True) sg_subgraph_df = \ @@ -367,8 +384,8 @@ def test_adding_fixture(dataset1_PropertyGraph, dataset1_MGPropertyGraph): @pytest.mark.skip(reason="Skipping tests because it is a work in progress") def test_frame_data(dataset1_PropertyGraph, dataset1_MGPropertyGraph): - sgpG = dataset1_PropertyGraph - mgpG = dataset1_MGPropertyGraph + (sgpG, _) = dataset1_PropertyGraph + (mgpG, _) = dataset1_MGPropertyGraph edge_sort_col = ['_SRC_', '_DST_', '_TYPE_'] vert_sort_col = ['_VERTEX_', '_TYPE_'] @@ -392,7 +409,7 @@ def test_property_names_attrs(dataset1_MGPropertyGraph): Ensure the correct number of user-visible properties for vertices and edges are returned. This should exclude the internal bookkeeping properties. """ - pG = dataset1_MGPropertyGraph + (pG, data) = dataset1_MGPropertyGraph expected_vert_prop_names = ["merchant_id", "merchant_location", "merchant_size", "merchant_sales", @@ -414,39 +431,48 @@ def test_property_names_attrs(dataset1_MGPropertyGraph): assert sorted(actual_edge_prop_names) == sorted(expected_edge_prop_names) -def test_extract_subgraph_nonrenumbered_noedgedata(dataset2_MGPropertyGraph): +def test_extract_subgraph_nonrenumbered_noedgedata( + dataset2_simple_MGPropertyGraph): """ - Ensure a subgraph can be extracted that is not renumbered and contains no - edge_data. + Ensure a subgraph can be extracted that contains no edge_data. Also ensure + renumber cannot be False since that is currently not allowed for MG. """ from cugraph import Graph - (pG, data) = dataset2_MGPropertyGraph + (pG, data) = dataset2_simple_MGPropertyGraph + + # renumber=False is currently not allowed for MG. + with pytest.raises(ValueError): + G = pG.extract_subgraph(create_using=Graph(directed=True), + renumber_graph=False, + add_edge_data=False) + G = pG.extract_subgraph(create_using=Graph(directed=True), - renumber_graph=False, add_edge_data=False) actual_edgelist = G.edgelist.edgelist_df.compute() + src_col_name = pG.src_col_name + dst_col_name = pG.dst_col_name + # create a DF without the properties (ie. the last column) - expected_edgelist = cudf.DataFrame(columns=[pG.src_col_name, - pG.dst_col_name], + expected_edgelist = cudf.DataFrame(columns=[src_col_name, dst_col_name], data=[(i, j) for (i, j, k) in data[1]]) - assert_frame_equal(expected_edgelist.sort_values(by=pG.src_col_name, + assert_frame_equal(expected_edgelist.sort_values(by=src_col_name, ignore_index=True), - actual_edgelist.sort_values(by=pG.src_col_name, + actual_edgelist.sort_values(by=src_col_name, ignore_index=True)) assert hasattr(G, "edge_data") is False -def test_num_vertices_with_properties(dataset2_MGPropertyGraph): +def test_num_vertices_with_properties(dataset2_simple_MGPropertyGraph): """ Checks that the num_vertices_with_properties attr is set to the number of vertices that have properties, as opposed to just num_vertices which also includes all verts in the graph edgelist. """ - (pG, data) = dataset2_MGPropertyGraph + (pG, data) = dataset2_simple_MGPropertyGraph # assume no repeated vertices assert pG.get_num_vertices() == len(data[1]) * 2 @@ -463,11 +489,11 @@ def test_num_vertices_with_properties(dataset2_MGPropertyGraph): assert pG.get_num_vertices(include_edge_data=False) == 2 -def test_edges_attr(dataset2_MGPropertyGraph): +def test_edges_attr(dataset2_simple_MGPropertyGraph): """ Ensure the edges attr returns the src, dst, edge_id columns properly. """ - (pG, data) = dataset2_MGPropertyGraph + (pG, data) = dataset2_simple_MGPropertyGraph # create a DF without the properties (ie. the last column) expected_edges = cudf.DataFrame(columns=[pG.src_col_name, pG.dst_col_name], @@ -482,3 +508,118 @@ def test_edges_attr(dataset2_MGPropertyGraph): expected_num_edges = len(data[1]) assert len(edge_ids) == expected_num_edges assert edge_ids.nunique() == expected_num_edges + + +def test_get_vertex_data(dataset1_MGPropertyGraph): + """ + Ensure PG.get_vertex_data() returns the correct data based on vertex IDs + passed in. + """ + (pG, data) = dataset1_MGPropertyGraph + + # Ensure the generated vertex IDs are unique + all_vertex_data = pG.get_vertex_data() + assert all_vertex_data[pG.vertex_col_name].nunique().compute() == \ + len(all_vertex_data) + + # Test with specific columns and types + vert_type = "merchants" + columns = ["merchant_location", "merchant_size"] + + some_vertex_data = pG.get_vertex_data(types=[vert_type], columns=columns) + # Ensure the returned df is the right length and includes only the + # vert/type + specified columns + standard_vert_columns = [pG.vertex_col_name, pG.type_col_name] + assert len(some_vertex_data) == len(data[vert_type][1]) + assert ( + sorted(some_vertex_data.columns) == + sorted(columns + standard_vert_columns) + ) + + # Test with all params specified + vert_ids = [11, 4, 21] + vert_type = "merchants" + columns = ["merchant_location", "merchant_size"] + + some_vertex_data = pG.get_vertex_data(vertex_ids=vert_ids, + types=[vert_type], + columns=columns) + # Ensure the returned df is the right length and includes at least the + # specified columns. + assert len(some_vertex_data) == len(vert_ids) + assert set(columns) - set(some_vertex_data.columns) == set() + + +def test_get_edge_data(dataset1_MGPropertyGraph): + """ + Ensure PG.get_edge_data() returns the correct data based on edge IDs passed + in. + """ + (pG, data) = dataset1_MGPropertyGraph + + # Ensure the generated edge IDs are unique + all_edge_data = pG.get_edge_data() + assert all_edge_data[pG.edge_id_col_name].nunique().compute() == \ + len(all_edge_data) + + # Test with specific edge IDs + edge_ids = [4, 5, 6] + some_edge_data = pG.get_edge_data(edge_ids) + actual_edge_ids = some_edge_data[pG.edge_id_col_name].compute() + if hasattr(actual_edge_ids, "values_host"): + actual_edge_ids = actual_edge_ids.values_host + assert sorted(actual_edge_ids) == sorted(edge_ids) + + # Create a list of expected column names from the three input tables + expected_columns = set([pG.src_col_name, pG.dst_col_name, + pG.edge_id_col_name, pG.type_col_name]) + for d in ["transactions", "relationships", "referrals"]: + for name in data[d][0]: + expected_columns.add(name) + + actual_columns = set(some_edge_data.columns) + + assert actual_columns == expected_columns + + # Test with specific columns and types + edge_type = "transactions" + columns = ["card_num", "card_type"] + + some_edge_data = pG.get_edge_data(types=[edge_type], columns=columns) + # Ensure the returned df is the right length and includes only the + # src/dst/id/type + specified columns + standard_edge_columns = [pG.src_col_name, pG.dst_col_name, + pG.edge_id_col_name, pG.type_col_name] + assert len(some_edge_data) == len(data[edge_type][1]) + assert ( + sorted(some_edge_data.columns) == + sorted(columns + standard_edge_columns) + ) + + # Test with all params specified + # FIXME: since edge IDs are generated, assume that these are correct based + # on the intended edges being the first three added. + edge_ids = [0, 1, 2] + edge_type = "transactions" + columns = ["card_num", "card_type"] + some_edge_data = pG.get_edge_data(edge_ids=edge_ids, + types=[edge_type], + columns=columns) + # Ensure the returned df is the right length and includes at least the + # specified columns. + assert len(some_edge_data) == len(edge_ids) + assert set(columns) - set(some_edge_data.columns) == set() + + +def test_get_data_empty_graphs(dask_client): + """ + Ensures that calls to pG.get_*_data() on an empty pG are handled correctly. + """ + from cugraph.experimental import MGPropertyGraph + + pG = MGPropertyGraph() + + assert pG.get_vertex_data() is None + assert pG.get_vertex_data([0, 1, 2]) is None + assert pG.get_edge_data() is None + assert pG.get_edge_data([0, 1, 2]) is None diff --git a/python/cugraph/cugraph/tests/test_property_graph.py b/python/cugraph/cugraph/tests/test_property_graph.py index c0fb2299224..586f0a80a56 100644 --- a/python/cugraph/cugraph/tests/test_property_graph.py +++ b/python/cugraph/cugraph/tests/test_property_graph.py @@ -97,6 +97,18 @@ ], } + +dataset2 = { + "simple": [ + ["src", "dst", "some_property"], + [(99, 22, "a"), + (98, 34, "b"), + (97, 56, "c"), + (96, 88, "d"), + ] + ], +} + # Placeholder for a directed Graph instance. This is not constructed here in # order to prevent cuGraph code from running on import, which would prevent # proper pytest collection if an exception is raised. See setup_function(). @@ -214,7 +226,27 @@ def dataset1_PropertyGraph(request): vertex_col_names=("user_id_1", "user_id_2"), property_columns=None) - return pG + + return (pG, dataset1) + + +@pytest.fixture(scope="module", params=df_types_fixture_params) +def dataset2_simple_PropertyGraph(request): + """ + Fixture which returns an instance of a PropertyGraph with only edge + data added from dataset2, parameterized for different DataFrame types. + """ + dataframe_type = request.param[0] + from cugraph.experimental import PropertyGraph + + dataframe_type = cudf.DataFrame + simple = dataset2["simple"] + pG = PropertyGraph() + df = dataframe_type(columns=simple[0], data=simple[1]) + + pG.add_edge_data(df, vertex_col_names=("src", "dst")) + + return (pG, simple) @pytest.fixture(scope="module", params=df_types_fixture_params) @@ -479,6 +511,150 @@ def test_num_vertices_with_properties(df_type): assert pG.get_num_vertices(include_edge_data=False) == 2 +def test_edges_attr(dataset2_simple_PropertyGraph): + """ + Ensure the edges attr returns the src, dst, edge_id columns properly. + """ + (pG, data) = dataset2_simple_PropertyGraph + + # create a DF without the properties (ie. the last column) + expected_edges = cudf.DataFrame(columns=[pG.src_col_name, pG.dst_col_name], + data=[(i, j) for (i, j, k) in data[1]]) + actual_edges = pG.edges[[pG.src_col_name, pG.dst_col_name]] + + assert_frame_equal(expected_edges.sort_values(by=pG.src_col_name, + ignore_index=True), + actual_edges.sort_values(by=pG.src_col_name, + ignore_index=True)) + edge_ids = pG.edges[pG.edge_id_col_name] + expected_num_edges = len(data[1]) + assert len(edge_ids) == expected_num_edges + assert edge_ids.nunique() == expected_num_edges + + +def test_get_vertex_data(dataset1_PropertyGraph): + """ + Ensure PG.get_vertex_data() returns the correct data based on vertex IDs + passed in. + """ + (pG, data) = dataset1_PropertyGraph + + # Ensure the generated vertex IDs are unique + all_vertex_data = pG.get_vertex_data() + assert all_vertex_data[pG.vertex_col_name].nunique() == \ + len(all_vertex_data) + + # Test getting a subset of data + # Use the appropriate series type based on input + # FIXME: do not use the debug _vertex_prop_dataframe to determine type + if isinstance(pG._vertex_prop_dataframe, cudf.DataFrame): + vert_ids = cudf.Series([11, 4, 21]) + else: + vert_ids = pd.Series([11, 4, 21]) + + some_vertex_data = pG.get_vertex_data(vert_ids) + actual_vertex_ids = some_vertex_data[pG.vertex_col_name] + if hasattr(actual_vertex_ids, "values_host"): + actual_vertex_ids = actual_vertex_ids.values_host + if hasattr(vert_ids, "values_host"): + vert_ids = vert_ids.values_host + assert sorted(actual_vertex_ids) == sorted(vert_ids) + + expected_columns = set([pG.vertex_col_name, pG.type_col_name]) + for d in ["merchants", "users"]: + for name in data[d][0]: + expected_columns.add(name) + actual_columns = set(some_vertex_data.columns) + assert actual_columns == expected_columns + + # Test with specific columns and types + vert_type = "merchants" + columns = ["merchant_location", "merchant_size"] + + some_vertex_data = pG.get_vertex_data(types=[vert_type], columns=columns) + # Ensure the returned df is the right length and includes only the + # vert/type + specified columns + standard_vert_columns = [pG.vertex_col_name, pG.type_col_name] + assert len(some_vertex_data) == len(data[vert_type][1]) + assert ( + sorted(some_vertex_data.columns) == + sorted(columns + standard_vert_columns) + ) + + # Test with all params specified + vert_ids = [11, 4, 21] + vert_type = "merchants" + columns = ["merchant_location", "merchant_size"] + + some_vertex_data = pG.get_vertex_data(vertex_ids=vert_ids, + types=[vert_type], + columns=columns) + # Ensure the returned df is the right length and includes at least the + # specified columns. + assert len(some_vertex_data) == len(vert_ids) + assert set(columns) - set(some_vertex_data.columns) == set() + + +def test_get_edge_data(dataset1_PropertyGraph): + """ + Ensure PG.get_edge_data() returns the correct data based on edge IDs passed + in. + """ + (pG, data) = dataset1_PropertyGraph + + # Ensure the generated edge IDs are unique + all_edge_data = pG.get_edge_data() + assert all_edge_data[pG.edge_id_col_name].nunique() == len(all_edge_data) + + # Test with specific edge IDs + edge_ids = [4, 5, 6] + some_edge_data = pG.get_edge_data(edge_ids) + actual_edge_ids = some_edge_data[pG.edge_id_col_name] + if hasattr(actual_edge_ids, "values_host"): + actual_edge_ids = actual_edge_ids.values_host + assert sorted(actual_edge_ids) == sorted(edge_ids) + + # Create a list of expected column names from the three input tables + expected_columns = set([pG.src_col_name, pG.dst_col_name, + pG.edge_id_col_name, pG.type_col_name]) + for d in ["transactions", "relationships", "referrals"]: + for name in data[d][0]: + expected_columns.add(name) + + actual_columns = set(some_edge_data.columns) + + assert actual_columns == expected_columns + + # Test with specific columns and types + edge_type = "transactions" + columns = ["card_num", "card_type"] + + some_edge_data = pG.get_edge_data(types=[edge_type], columns=columns) + # Ensure the returned df is the right length and includes only the + # src/dst/id/type + specified columns + standard_edge_columns = [pG.src_col_name, pG.dst_col_name, + pG.edge_id_col_name, pG.type_col_name] + assert len(some_edge_data) == len(data[edge_type][1]) + assert ( + sorted(some_edge_data.columns) == + sorted(columns + standard_edge_columns) + ) + + # Test with all params specified + # FIXME: since edge IDs are generated, assume that these are correct based + # on the intended edges being the first three added. + edge_ids = [0, 1, 2] + edge_type = "transactions" + columns = ["card_num", "card_type"] + some_edge_data = pG.get_edge_data(edge_ids=edge_ids, + types=[edge_type], + columns=columns) + # Ensure the returned df is the right length and includes at least the + # specified columns. + assert len(some_edge_data) == len(edge_ids) + assert set(columns) - set(some_edge_data.columns) == set() + + @pytest.mark.parametrize("df_type", df_types, ids=df_type_id) def test_null_data(df_type): """ @@ -651,7 +827,7 @@ def test_add_edge_data_bad_args(): def test_extract_subgraph_vertex_prop_condition_only(dataset1_PropertyGraph): - pG = dataset1_PropertyGraph + (pG, data) = dataset1_PropertyGraph # This should result in two users: 78634 and 89216 selection = pG.select_vertices( @@ -678,7 +854,7 @@ def test_extract_subgraph_vertex_prop_condition_only(dataset1_PropertyGraph): def test_extract_subgraph_vertex_edge_prop_condition(dataset1_PropertyGraph): from cugraph.experimental import PropertyGraph - pG = dataset1_PropertyGraph + (pG, data) = dataset1_PropertyGraph tcn = PropertyGraph.type_col_name selection = pG.select_vertices("(user_location==47906) | " @@ -702,7 +878,7 @@ def test_extract_subgraph_vertex_edge_prop_condition(dataset1_PropertyGraph): def test_extract_subgraph_edge_prop_condition_only(dataset1_PropertyGraph): from cugraph.experimental import PropertyGraph - pG = dataset1_PropertyGraph + (pG, data) = dataset1_PropertyGraph tcn = PropertyGraph.type_col_name selection = pG.select_edges(f"{tcn} =='transactions'") @@ -732,7 +908,7 @@ def test_extract_subgraph_unweighted(dataset1_PropertyGraph): """ from cugraph.experimental import PropertyGraph - pG = dataset1_PropertyGraph + (pG, data) = dataset1_PropertyGraph tcn = PropertyGraph.type_col_name selection = pG.select_edges(f"{tcn} == 'transactions'") @@ -749,7 +925,7 @@ def test_extract_subgraph_specific_query(dataset1_PropertyGraph): """ from cugraph.experimental import PropertyGraph - pG = dataset1_PropertyGraph + (pG, data) = dataset1_PropertyGraph tcn = PropertyGraph.type_col_name selection = pG.select_edges(f"({tcn}=='transactions') & " @@ -777,7 +953,7 @@ def test_select_vertices_from_previous_selection(dataset1_PropertyGraph): """ from cugraph.experimental import PropertyGraph - pG = dataset1_PropertyGraph + (pG, data) = dataset1_PropertyGraph tcn = PropertyGraph.type_col_name # Select referrals from only users 89216 and 78634 using an intentionally @@ -845,7 +1021,7 @@ def test_extract_subgraph_no_edges(dataset1_PropertyGraph): """ Valid query that only matches a single vertex. """ - pG = dataset1_PropertyGraph + (pG, data) = dataset1_PropertyGraph selection = pG.select_vertices("(_TYPE_=='merchants') & (merchant_id==86)") G = pG.extract_subgraph(selection=selection) @@ -858,7 +1034,7 @@ def test_extract_subgraph_no_query(dataset1_PropertyGraph): """ Call extract with no args, should result in the entire property graph. """ - pG = dataset1_PropertyGraph + (pG, data) = dataset1_PropertyGraph G = pG.extract_subgraph(create_using=DiGraph_inst, allow_multi_edges=True) @@ -881,7 +1057,7 @@ def test_extract_subgraph_multi_edges(dataset1_PropertyGraph): """ from cugraph.experimental import PropertyGraph - pG = dataset1_PropertyGraph + (pG, data) = dataset1_PropertyGraph tcn = PropertyGraph.type_col_name # referrals has multiple edges @@ -896,7 +1072,7 @@ def test_extract_subgraph_multi_edges(dataset1_PropertyGraph): def test_extract_subgraph_bad_args(dataset1_PropertyGraph): from cugraph.experimental import PropertyGraph - pG = dataset1_PropertyGraph + (pG, data) = dataset1_PropertyGraph tcn = PropertyGraph.type_col_name # non-PropertySelection selection @@ -932,7 +1108,7 @@ def test_extract_subgraph_default_edge_weight(dataset1_PropertyGraph): """ from cugraph.experimental import PropertyGraph - pG = dataset1_PropertyGraph + (pG, data) = dataset1_PropertyGraph tcn = PropertyGraph.type_col_name selection = pG.select_edges(f"{tcn}=='transactions'") @@ -971,7 +1147,7 @@ def test_extract_subgraph_default_edge_weight_no_property( Ensure default_edge_weight can be used to provide an edge value when a property for the edge weight is not specified. """ - pG = dataset1_PropertyGraph + (pG, data) = dataset1_PropertyGraph edge_weight = 99.2 G = pG.extract_subgraph(allow_multi_edges=True, default_edge_weight=edge_weight) @@ -1014,7 +1190,7 @@ def test_graph_edge_data_added(dataset1_PropertyGraph): """ from cugraph.experimental import PropertyGraph - pG = dataset1_PropertyGraph + (pG, data) = dataset1_PropertyGraph eicn = PropertyGraph.edge_id_col_name expected_num_edges = \ @@ -1052,7 +1228,7 @@ def test_annotate_dataframe(dataset1_PropertyGraph): copy=False invalid args raise correct exceptions """ - pG = dataset1_PropertyGraph + (pG, data) = dataset1_PropertyGraph selection = pG.select_edges("(_TYPE_ == 'referrals') & (stars > 3)") G = pG.extract_subgraph(selection=selection, @@ -1139,7 +1315,7 @@ def test_get_vertices(dataset1_PropertyGraph): Test that get_vertices() returns the correct set of vertices without duplicates. """ - pG = dataset1_PropertyGraph + (pG, data) = dataset1_PropertyGraph (merchants, users, taxpayers, transactions, relationships, referrals) = dataset1.values() @@ -1158,7 +1334,7 @@ def test_get_edges(dataset1_PropertyGraph): """ from cugraph.experimental import PropertyGraph - pG = dataset1_PropertyGraph + (pG, data) = dataset1_PropertyGraph (merchants, users, taxpayers, transactions, relationships, referrals) = dataset1.values() @@ -1182,7 +1358,7 @@ def test_property_names_attrs(dataset1_PropertyGraph): Ensure the correct number of user-visible properties for vertices and edges are returned. This should exclude the internal bookkeeping properties. """ - pG = dataset1_PropertyGraph + (pG, data) = dataset1_PropertyGraph expected_vert_prop_names = ["merchant_id", "merchant_location", "merchant_size", "merchant_sales", @@ -1215,17 +1391,31 @@ def test_extract_subgraph_with_vertex_ids(): raise NotImplementedError +def test_get_data_empty_graphs(): + """ + Ensures that calls to pG.get_*_data() on an empty pG are handled correctly. + """ + from cugraph.experimental import PropertyGraph + + pG = PropertyGraph() + + assert pG.get_vertex_data() is None + assert pG.get_vertex_data([0, 1, 2]) is None + assert pG.get_edge_data() is None + assert pG.get_edge_data([0, 1, 2]) is None + + # ============================================================================= # Benchmarks # ============================================================================= def bench_num_vertices(gpubenchmark, dataset1_PropertyGraph): - pG = dataset1_PropertyGraph + (pG, data) = dataset1_PropertyGraph assert gpubenchmark(pG.get_num_vertices) == 9 def bench_get_vertices(gpubenchmark, dataset1_PropertyGraph): - pG = dataset1_PropertyGraph + (pG, data) = dataset1_PropertyGraph gpubenchmark(pG.get_vertices)