diff --git a/python/cugraph/cugraph/gnn/graph_store.py b/python/cugraph/cugraph/gnn/graph_store.py index ed78e81d204..0b40cc3bf0a 100644 --- a/python/cugraph/cugraph/gnn/graph_store.py +++ b/python/cugraph/cugraph/gnn/graph_store.py @@ -16,8 +16,9 @@ import cugraph from cugraph.experimental import PropertyGraph from cugraph.community.egonet import batched_ego_graphs -from cugraph.utilities.utils import sample_groups +from cugraph.sampling import uniform_neighbor_sample import cupy as cp +from functools import cached_property src_n = PropertyGraph.src_col_name @@ -83,7 +84,8 @@ def get_node_storage(self, key, ntype=None): ) ) ntype = ntypes[0] - + # FIXME: Remove once below lands + # https://github.com/rapidsai/cugraph/pull/2444 df = self.gdata._vertex_prop_dataframe col_names = self.ndata_key_col_d[key] return CuFeatureStorage( @@ -107,6 +109,8 @@ def get_edge_storage(self, key, etype=None): etype = etypes[0] col_names = self.edata_key_col_d[key] + # FIXME: Remove once below lands + # https://github.com/rapidsai/cugraph/pull/2444 df = self.gdata._edge_prop_dataframe return CuFeatureStorage( df=df, @@ -124,12 +128,16 @@ def num_edges(self, etype=None): @property def ntypes(self): + # FIXME: Remove once below is fixed + # https://github.com/rapidsai/cugraph/issues/2423 s = self.gdata._vertex_prop_dataframe[type_n] ntypes = s.drop_duplicates().to_arrow().to_pylist() return ntypes @property def etypes(self): + # FIXME: Remove once below is fixed + # https://github.com/rapidsai/cugraph/issues/2423 s = self.gdata._edge_prop_dataframe[type_n] ntypes = s.drop_duplicates().to_arrow().to_pylist() return ntypes @@ -137,6 +145,8 @@ def etypes(self): @property def ndata(self): return { + # FIXME: Remove once below lands + # https://github.com/rapidsai/cugraph/pull/2444 k: self.gdata._vertex_prop_dataframe[col_names].dropna(how="all") for k, col_names in self.ndata_key_col_d.items() } @@ -144,6 +154,8 @@ def ndata(self): @property def edata(self): return { + # FIXME: Remove once below lands + # https://github.com/rapidsai/cugraph/pull/2444 k: self.gdata._edge_prop_dataframe[col_names].dropna(how="all") for k, col_names in self.edata_key_col_d.items() } @@ -202,67 +214,105 @@ def sample_neighbors( DLPack capsule The corresponding eids for the sampled bipartite graph """ - nodes = cudf.from_dlpack(nodes) - num_nodes = len(nodes) - current_seeds = nodes.reindex(index=cp.arange(0, num_nodes)) - _g = self.__G.extract_subgraph( - create_using=cugraph.Graph, allow_multi_edges=True - ) - ego_edge_list, seeds_offsets = batched_ego_graphs( - _g, current_seeds, radius=1 - ) - del _g - # filter and get a certain size neighborhood + if edge_dir not in ["in", "out"]: + raise ValueError( + f"edge_dir must be either 'in' or 'out' got {edge_dir} instead" + ) - # Step 1 - # Get Filtered List of ego_edge_list corresposing to current_seeds - # We filter by creating a series of destination nodes - # corresponding to the offsets and filtering non matching vallues + if edge_dir == "in": + sg = self.extracted_reverse_subgraph_without_renumbering + else: + sg = self.extracted_subgraph_without_renumbering - seeds_offsets_s = cudf.Series(seeds_offsets).values - offset_lens = seeds_offsets_s[1:] - seeds_offsets_s[0:-1] - dst_seeds = current_seeds.repeat(offset_lens) - dst_seeds.index = ego_edge_list.index - filtered_list = ego_edge_list[ego_edge_list["dst"] == dst_seeds] + if not hasattr(self, '_sg_node_dtype'): + self._sg_node_dtype = sg.edgelist.edgelist_df['src'].dtype - del dst_seeds, offset_lens, seeds_offsets_s - del ego_edge_list, seeds_offsets + # Uniform sampling assumes fails when the dtype + # if the seed dtype is not same as the node dtype + nodes = cudf.from_dlpack(nodes).astype(self._sg_node_dtype) - # Step 2 - # Sample Fan Out - # for each dst take maximum of fanout samples - filtered_list = sample_groups( - filtered_list, by="dst", n_samples=fanout + sampled_df = uniform_neighbor_sample( + sg, start_list=nodes, fanout_vals=[fanout], + with_replacement=replace ) - # TODO: Verify order of execution - sample_df = cudf.DataFrame( - {src_n: filtered_list["src"], dst_n: filtered_list["dst"]} - ) - del filtered_list + sampled_df.drop(columns=["indices"], inplace=True) - # del parents_nodes, children_nodes - edge_df = sample_df.merge( - self.gdata._edge_prop_dataframe[[src_n, dst_n, eid_n]], - on=[src_n, dst_n], - ) + # handle empty graph case + if len(sampled_df) == 0: + return None, None, None + + # we reverse directions when directions=='in' + if edge_dir == "in": + sampled_df.rename( + columns={"destinations": src_n, "sources": dst_n}, inplace=True + ) + else: + sampled_df.rename( + columns={"sources": src_n, "destinations": dst_n}, inplace=True + ) + + # FIXME: Remove once below lands + # https://github.com/rapidsai/cugraph/issues/2444 + edge_df = self.gdata._edge_prop_dataframe[[src_n, dst_n, eid_n]] + sampled_df = edge_df.merge(sampled_df) return ( - edge_df[src_n].to_dlpack(), - edge_df[dst_n].to_dlpack(), - edge_df[eid_n].to_dlpack(), + sampled_df[src_n].to_dlpack(), + sampled_df[dst_n].to_dlpack(), + sampled_df[eid_n].to_dlpack(), ) - def find_edges(self, edge_ids, etype): + @cached_property + def extracted_reverse_subgraph_without_renumbering(self): + # TODO: Switch to extract_subgraph based on response on + # https://github.com/rapidsai/cugraph/issues/2458 + subset_df = self.gdata._edge_prop_dataframe[[src_n, dst_n]] + subset_df.rename(columns={src_n: dst_n, dst_n: src_n}, inplace=True) + subset_df["weight"] = cp.float32(1.0) + subgraph = cugraph.Graph(directed=True) + subgraph.from_cudf_edgelist( + subset_df, + source=src_n, + destination=dst_n, + edge_attr="weight", + legacy_renum_only=True, + ) + return subgraph + + @cached_property + def extracted_subgraph_without_renumbering(self): + gr_template = cugraph.Graph(directed=True) + subgraph = self.gdata.extract_subgraph(create_using=gr_template, + default_edge_weight=1.0, + renumber_graph=True) + return subgraph + + def find_edges(self, edge_ids_cap, etype): """Return the source and destination node IDs given the edge IDs within the given edge type. - Return type is - cudf.Series, cudf.Series + + Parameters + ---------- + edge_ids_cap : Dlpack of Node IDs (single dimension) + The edge ids to find + + Returns + ------- + DLPack capsule + The src nodes for the given ids + + DLPack capsule + The dst nodes for the given ids """ - edge_df = self.gdata._edge_prop_dataframe[ - [src_n, dst_n, eid_n, type_n] - ] + edge_ids = cudf.from_dlpack(edge_ids_cap) + + # FIXME: Remove once below lands + # https://github.com/rapidsai/cugraph/issues/2444 + edge_df = self.gdata._edge_prop_dataframe[[src_n, dst_n, + eid_n, type_n]] + subset_df = get_subset_df( edge_df, PropertyGraph.edge_id_col_name, edge_ids, etype ) diff --git a/python/cugraph/cugraph/tests/test_graph_store.py b/python/cugraph/cugraph/tests/test_graph_store.py index 12c825dbb3a..3c7a7262025 100644 --- a/python/cugraph/cugraph/tests/test_graph_store.py +++ b/python/cugraph/cugraph/tests/test_graph_store.py @@ -160,6 +160,7 @@ def test_sample_neighbors(graph_file): assert len(parents_list) > 0 +@pytest.mark.skip(reason="Neg one fanout fails see cugraph/issues/2446") @pytest.mark.parametrize("graph_file", utils.DATASETS) def test_sample_neighbor_neg_one_fanout(graph_file): cu_M = utils.read_csv_file(graph_file) @@ -418,11 +419,94 @@ def test_get_edge_storage_gs(dataset1_CuGraphStore): def test_sampling_gs(dataset1_CuGraphStore): node_pack = cp.asarray([4]).toDlpack() - ( - parents_cap, - children_cap, - edge_id_cap, - ) = dataset1_CuGraphStore.sample_neighbors(node_pack, fanout=1) - x = cudf.from_dlpack(parents_cap) - - assert x is not None + gs = dataset1_CuGraphStore + src_cap, _, _ = gs.sample_neighbors(node_pack, fanout=1) + src_ser = cudf.from_dlpack(src_cap) + assert len(src_ser) != 0 + + +@pytest.mark.skip(reason="Neg one fanout fails see cugraph/issues/2446") +def test_sampling_dataset_gs_neg_one_fanout(dataset1_CuGraphStore): + node_pack = cp.asarray([4]).toDlpack() + gs = dataset1_CuGraphStore + src_cap, _, _ = gs.sample_neighbors(node_pack, fanout=-1) + src_ser = cudf.from_dlpack(src_cap) + assert len(src_ser) != 0 + + +def test_sampling_gs_out_dir(): + src_ser = cudf.Series([1, 1, 1, 1, 1, 2, 2, 3]) + dst_ser = cudf.Series([2, 3, 4, 5, 6, 3, 4, 7]) + df = cudf.DataFrame( + {"src": src_ser, "dst": dst_ser, "edge_id": np.arange(len(src_ser))} + ) + pg = PropertyGraph() + gs = CuGraphStore(pg) + gs.add_edge_data(df, ["src", "dst"], edge_key="edges") + + # below are obtained from dgl runs on the same graph + expected_out = { + 1: ([1, 1, 1, 1, 1], [2, 3, 4, 5, 6]), + 2: ([2, 2], [3, 4]), + 3: ([3], [7]), + 4: ([], []), + } + + for seed in expected_out.keys(): + seed_cap = cudf.Series([seed]).to_dlpack() + sample_src, sample_dst, sample_eid = gs.sample_neighbors( + nodes=seed_cap, fanout=9, edge_dir="out" + ) + if sample_src is None: + sample_src = cudf.Series([]).astype(np.int64) + sample_dst = cudf.Series([]).astype(np.int64) + else: + sample_src = cudf.from_dlpack(sample_src) + sample_dst = cudf.from_dlpack(sample_dst) + + output_df = cudf.DataFrame({"src": sample_src, "dst": sample_dst}) + output_df = output_df.sort_values(by=["src", "dst"]) + output_df = output_df.reset_index(drop=True) + + expected_df = cudf.DataFrame( + {"src": expected_out[seed][0], "dst": expected_out[seed][1]} + ).astype(np.int64) + cudf.testing.assert_frame_equal(output_df, expected_df) + + +def test_sampling_gs_in_dir(): + src_ser = cudf.Series([1, 1, 1, 1, 1, 2, 2, 3]) + dst_ser = cudf.Series([2, 3, 4, 5, 6, 3, 4, 7]) + df = cudf.DataFrame( + {"src": src_ser, "dst": dst_ser, "edge_id": np.arange(len(src_ser))} + ) + pg = PropertyGraph() + gs = CuGraphStore(pg) + gs.add_edge_data(df, ["src", "dst"], edge_key="edges") + + # below are obtained from dgl runs on the same graph + expected_in = {1: ([], []), + 2: ([1], [2]), + 3: ([1, 2], [3, 3]), + 4: ([1, 2], [4, 4])} + + for seed in expected_in.keys(): + seed_cap = cudf.Series([seed]).to_dlpack() + sample_src, sample_dst, sample_eid = gs.sample_neighbors( + nodes=seed_cap, fanout=9, edge_dir="in" + ) + if sample_src is None: + sample_src = cudf.Series([]).astype(np.int64) + sample_dst = cudf.Series([]).astype(np.int64) + else: + sample_src = cudf.from_dlpack(sample_src) + sample_dst = cudf.from_dlpack(sample_dst) + + output_df = cudf.DataFrame({"src": sample_src, "dst": sample_dst}) + output_df = output_df.sort_values(by=["src", "dst"]) + output_df = output_df.reset_index(drop=True) + + expected_df = cudf.DataFrame( + {"src": expected_in[seed][0], "dst": expected_in[seed][1]} + ).astype(np.int64) + cudf.testing.assert_frame_equal(output_df, expected_df)