Skip to content

Commit

Permalink
Uniform neighbor sample (rapidsai#2450)
Browse files Browse the repository at this point in the history
This PR switches `cugraphstore` to use uniform neighbor sampling.

Opening this in favor of rapidsai#2426

Authors:
  - Vibhu Jawa (https://github.com/VibhuJawa)
  - Rick Ratzel (https://github.com/rlratzel)

Approvers:
  - Alex Barghi (https://github.com/alexbarghi-nv)
  - Rick Ratzel (https://github.com/rlratzel)

URL: rapidsai#2450
  • Loading branch information
VibhuJawa authored Aug 2, 2022
1 parent ac42e0b commit d50622f
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 56 deletions.
146 changes: 98 additions & 48 deletions python/cugraph/cugraph/gnn/graph_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
import cugraph
from cugraph.experimental import PropertyGraph
from cugraph.community.egonet import batched_ego_graphs
from cugraph.utilities.utils import sample_groups
from cugraph.sampling import uniform_neighbor_sample
import cupy as cp
from functools import cached_property


src_n = PropertyGraph.src_col_name
Expand Down Expand Up @@ -83,7 +84,8 @@ def get_node_storage(self, key, ntype=None):
)
)
ntype = ntypes[0]

# FIXME: Remove once below lands
# https://github.com/rapidsai/cugraph/pull/2444
df = self.gdata._vertex_prop_dataframe
col_names = self.ndata_key_col_d[key]
return CuFeatureStorage(
Expand All @@ -107,6 +109,8 @@ def get_edge_storage(self, key, etype=None):

etype = etypes[0]
col_names = self.edata_key_col_d[key]
# FIXME: Remove once below lands
# https://github.com/rapidsai/cugraph/pull/2444
df = self.gdata._edge_prop_dataframe
return CuFeatureStorage(
df=df,
Expand All @@ -124,26 +128,34 @@ def num_edges(self, etype=None):

@property
def ntypes(self):
    """Return the de-duplicated values of the ``type_n`` column of the
    vertex property dataframe as a Python list."""
    # FIXME: Remove once below is fixed
    # https://github.com/rapidsai/cugraph/issues/2423
    vertex_types = self.gdata._vertex_prop_dataframe[type_n]
    return vertex_types.drop_duplicates().to_arrow().to_pylist()

@property
def etypes(self):
    """Return the de-duplicated values of the ``type_n`` column of the
    edge property dataframe as a Python list."""
    # FIXME: Remove once below is fixed
    # https://github.com/rapidsai/cugraph/issues/2423
    edge_types = self.gdata._edge_prop_dataframe[type_n]
    return edge_types.drop_duplicates().to_arrow().to_pylist()

@property
def ndata(self):
    """Return a dict mapping each key of ``self.ndata_key_col_d`` to the
    matching columns of the vertex property dataframe.

    Rows in which all of the selected columns are null are dropped.
    """
    node_data = {}
    for key, col_names in self.ndata_key_col_d.items():
        # FIXME: Remove once below lands
        # https://github.com/rapidsai/cugraph/pull/2444
        selected = self.gdata._vertex_prop_dataframe[col_names]
        node_data[key] = selected.dropna(how="all")
    return node_data

@property
def edata(self):
    """Return a dict mapping each key of ``self.edata_key_col_d`` to the
    matching columns of the edge property dataframe.

    Rows in which all of the selected columns are null are dropped.
    """
    edge_data = {}
    for key, col_names in self.edata_key_col_d.items():
        # FIXME: Remove once below lands
        # https://github.com/rapidsai/cugraph/pull/2444
        selected = self.gdata._edge_prop_dataframe[col_names]
        edge_data[key] = selected.dropna(how="all")
    return edge_data
Expand Down Expand Up @@ -202,67 +214,105 @@ def sample_neighbors(
DLPack capsule
The corresponding eids for the sampled bipartite graph
"""
nodes = cudf.from_dlpack(nodes)
num_nodes = len(nodes)
current_seeds = nodes.reindex(index=cp.arange(0, num_nodes))
_g = self.__G.extract_subgraph(
create_using=cugraph.Graph, allow_multi_edges=True
)
ego_edge_list, seeds_offsets = batched_ego_graphs(
_g, current_seeds, radius=1
)

del _g
# filter and get a certain size neighborhood
if edge_dir not in ["in", "out"]:
raise ValueError(
f"edge_dir must be either 'in' or 'out' got {edge_dir} instead"
)

# Step 1
# Get Filtered List of ego_edge_list corresponding to current_seeds
# We filter by creating a series of destination nodes
# corresponding to the offsets and filtering non matching values
if edge_dir == "in":
sg = self.extracted_reverse_subgraph_without_renumbering
else:
sg = self.extracted_subgraph_without_renumbering

seeds_offsets_s = cudf.Series(seeds_offsets).values
offset_lens = seeds_offsets_s[1:] - seeds_offsets_s[0:-1]
dst_seeds = current_seeds.repeat(offset_lens)
dst_seeds.index = ego_edge_list.index
filtered_list = ego_edge_list[ego_edge_list["dst"] == dst_seeds]
if not hasattr(self, '_sg_node_dtype'):
self._sg_node_dtype = sg.edgelist.edgelist_df['src'].dtype

del dst_seeds, offset_lens, seeds_offsets_s
del ego_edge_list, seeds_offsets
# Uniform sampling fails when the seed dtype is not the same as
# the node dtype, so cast the seeds to the graph's node dtype
nodes = cudf.from_dlpack(nodes).astype(self._sg_node_dtype)

# Step 2
# Sample Fan Out
# for each dst take maximum of fanout samples
filtered_list = sample_groups(
filtered_list, by="dst", n_samples=fanout
sampled_df = uniform_neighbor_sample(
sg, start_list=nodes, fanout_vals=[fanout],
with_replacement=replace
)

# TODO: Verify order of execution
sample_df = cudf.DataFrame(
{src_n: filtered_list["src"], dst_n: filtered_list["dst"]}
)
del filtered_list
sampled_df.drop(columns=["indices"], inplace=True)

# del parents_nodes, children_nodes
edge_df = sample_df.merge(
self.gdata._edge_prop_dataframe[[src_n, dst_n, eid_n]],
on=[src_n, dst_n],
)
# handle empty graph case
if len(sampled_df) == 0:
return None, None, None

# we reverse directions when edge_dir == 'in'
if edge_dir == "in":
sampled_df.rename(
columns={"destinations": src_n, "sources": dst_n}, inplace=True
)
else:
sampled_df.rename(
columns={"sources": src_n, "destinations": dst_n}, inplace=True
)

# FIXME: Remove once below lands
# https://github.com/rapidsai/cugraph/issues/2444
edge_df = self.gdata._edge_prop_dataframe[[src_n, dst_n, eid_n]]
sampled_df = edge_df.merge(sampled_df)

return (
edge_df[src_n].to_dlpack(),
edge_df[dst_n].to_dlpack(),
edge_df[eid_n].to_dlpack(),
sampled_df[src_n].to_dlpack(),
sampled_df[dst_n].to_dlpack(),
sampled_df[eid_n].to_dlpack(),
)

def find_edges(self, edge_ids, etype):
@cached_property
def extracted_reverse_subgraph_without_renumbering(self):
    """Build a directed ``cugraph.Graph`` from the edge property dataframe
    with every edge's source and destination swapped.

    Each edge is given a constant float32 weight of 1.0. The graph is
    computed on first access and then cached on the instance.
    """
    # TODO: Switch to extract_subgraph based on response on
    # https://github.com/rapidsai/cugraph/issues/2458
    edge_cols = self.gdata._edge_prop_dataframe[[src_n, dst_n]]
    # Swapping the two column names reverses the direction of every edge
    reversed_edges = edge_cols.rename(columns={src_n: dst_n, dst_n: src_n})
    reversed_edges["weight"] = cp.float32(1.0)

    reverse_graph = cugraph.Graph(directed=True)
    reverse_graph.from_cudf_edgelist(
        reversed_edges,
        source=src_n,
        destination=dst_n,
        edge_attr="weight",
        legacy_renum_only=True,
    )
    return reverse_graph

@cached_property
def extracted_subgraph_without_renumbering(self):
    """Extract a directed graph from ``self.gdata`` using a default edge
    weight of 1.0.

    The graph is computed on first access and then cached on the instance.

    NOTE(review): despite the method name, ``renumber_graph=True`` is
    passed to ``extract_subgraph`` -- confirm which one is intended.
    """
    return self.gdata.extract_subgraph(
        create_using=cugraph.Graph(directed=True),
        default_edge_weight=1.0,
        renumber_graph=True,
    )

def find_edges(self, edge_ids_cap, etype):
"""Return the source and destination node IDs given the edge IDs within
the given edge type.
Return type is
cudf.Series, cudf.Series
Parameters
----------
edge_ids_cap : DLPack capsule of edge IDs (single dimension)
The edge ids to find
Returns
-------
DLPack capsule
The src nodes for the given ids
DLPack capsule
The dst nodes for the given ids
"""
edge_df = self.gdata._edge_prop_dataframe[
[src_n, dst_n, eid_n, type_n]
]
edge_ids = cudf.from_dlpack(edge_ids_cap)

# FIXME: Remove once below lands
# https://github.com/rapidsai/cugraph/issues/2444
edge_df = self.gdata._edge_prop_dataframe[[src_n, dst_n,
eid_n, type_n]]

subset_df = get_subset_df(
edge_df, PropertyGraph.edge_id_col_name, edge_ids, etype
)
Expand Down
100 changes: 92 additions & 8 deletions python/cugraph/cugraph/tests/test_graph_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ def test_sample_neighbors(graph_file):
assert len(parents_list) > 0


@pytest.mark.skip(reason="Neg one fanout fails see cugraph/issues/2446")
@pytest.mark.parametrize("graph_file", utils.DATASETS)
def test_sample_neighbor_neg_one_fanout(graph_file):
cu_M = utils.read_csv_file(graph_file)
Expand Down Expand Up @@ -418,11 +419,94 @@ def test_get_edge_storage_gs(dataset1_CuGraphStore):

def test_sampling_gs(dataset1_CuGraphStore):
node_pack = cp.asarray([4]).toDlpack()
(
parents_cap,
children_cap,
edge_id_cap,
) = dataset1_CuGraphStore.sample_neighbors(node_pack, fanout=1)
x = cudf.from_dlpack(parents_cap)

assert x is not None
gs = dataset1_CuGraphStore
src_cap, _, _ = gs.sample_neighbors(node_pack, fanout=1)
src_ser = cudf.from_dlpack(src_cap)
assert len(src_ser) != 0


@pytest.mark.skip(reason="Neg one fanout fails see cugraph/issues/2446")
def test_sampling_dataset_gs_neg_one_fanout(dataset1_CuGraphStore):
    """Sampling seed node 4 with ``fanout=-1`` must return at least one
    source node."""
    seed_cap = cp.asarray([4]).toDlpack()
    sampled = dataset1_CuGraphStore.sample_neighbors(seed_cap, fanout=-1)
    src_cap, _, _ = sampled
    assert len(cudf.from_dlpack(src_cap)) != 0


def test_sampling_gs_out_dir():
    """Check ``edge_dir="out"`` sampling against the expected edges for
    each seed node of a small hand-built graph."""
    edge_src = cudf.Series([1, 1, 1, 1, 1, 2, 2, 3])
    edge_dst = cudf.Series([2, 3, 4, 5, 6, 3, 4, 7])
    edge_df = cudf.DataFrame(
        {
            "src": edge_src,
            "dst": edge_dst,
            "edge_id": np.arange(len(edge_src)),
        }
    )
    gs = CuGraphStore(PropertyGraph())
    gs.add_edge_data(edge_df, ["src", "dst"], edge_key="edges")

    # below are obtained from dgl runs on the same graph
    expected_out = {
        1: ([1, 1, 1, 1, 1], [2, 3, 4, 5, 6]),
        2: ([2, 2], [3, 4]),
        3: ([3], [7]),
        4: ([], []),
    }

    for seed, (want_src, want_dst) in expected_out.items():
        seed_cap = cudf.Series([seed]).to_dlpack()
        src_cap, dst_cap, _ = gs.sample_neighbors(
            nodes=seed_cap, fanout=9, edge_dir="out"
        )
        # A None source capsule is compared as an empty int64 sample
        if src_cap is None:
            got_src = cudf.Series([]).astype(np.int64)
            got_dst = cudf.Series([]).astype(np.int64)
        else:
            got_src = cudf.from_dlpack(src_cap)
            got_dst = cudf.from_dlpack(dst_cap)

        # Sort and reindex so the comparison does not depend on row order
        output_df = (
            cudf.DataFrame({"src": got_src, "dst": got_dst})
            .sort_values(by=["src", "dst"])
            .reset_index(drop=True)
        )
        expected_df = cudf.DataFrame(
            {"src": want_src, "dst": want_dst}
        ).astype(np.int64)
        cudf.testing.assert_frame_equal(output_df, expected_df)


def test_sampling_gs_in_dir():
    """Check ``edge_dir="in"`` sampling against the expected edges for
    each seed node of a small hand-built graph."""
    edge_src = cudf.Series([1, 1, 1, 1, 1, 2, 2, 3])
    edge_dst = cudf.Series([2, 3, 4, 5, 6, 3, 4, 7])
    edge_df = cudf.DataFrame(
        {
            "src": edge_src,
            "dst": edge_dst,
            "edge_id": np.arange(len(edge_src)),
        }
    )
    gs = CuGraphStore(PropertyGraph())
    gs.add_edge_data(edge_df, ["src", "dst"], edge_key="edges")

    # below are obtained from dgl runs on the same graph
    expected_in = {
        1: ([], []),
        2: ([1], [2]),
        3: ([1, 2], [3, 3]),
        4: ([1, 2], [4, 4]),
    }

    for seed, (want_src, want_dst) in expected_in.items():
        seed_cap = cudf.Series([seed]).to_dlpack()
        src_cap, dst_cap, _ = gs.sample_neighbors(
            nodes=seed_cap, fanout=9, edge_dir="in"
        )
        # A None source capsule is compared as an empty int64 sample
        if src_cap is None:
            got_src = cudf.Series([]).astype(np.int64)
            got_dst = cudf.Series([]).astype(np.int64)
        else:
            got_src = cudf.from_dlpack(src_cap)
            got_dst = cudf.from_dlpack(dst_cap)

        # Sort and reindex so the comparison does not depend on row order
        output_df = (
            cudf.DataFrame({"src": got_src, "dst": got_dst})
            .sort_values(by=["src", "dst"])
            .reset_index(drop=True)
        )
        expected_df = cudf.DataFrame(
            {"src": want_src, "dst": want_dst}
        ).astype(np.int64)
        cudf.testing.assert_frame_equal(output_df, expected_df)

0 comments on commit d50622f

Please sign in to comment.