Support MFG creation on sampling GPUs for cugraph-dgl #3742

Merged
69 commits
6263b9f define sampling output renumbering function API (seunghwak, Jul 13, 2023)
6c88e86 update the API (remove multi_gpu flag) (seunghwak, Jul 17, 2023)
055496f initial draft implementation (seunghwak, Jul 17, 2023)
27fe4f0 Merge branch 'branch-23.08' of github.com:rapidsai/cugraph into fea_mfg (seunghwak, Jul 18, 2023)
2211d24 add test code (seunghwak, Jul 19, 2023)
9fe5e13 bug fixes (seunghwak, Jul 19, 2023)
9ad3c8f minor tweaks (seunghwak, Jul 19, 2023)
dfbc196 define API for MFG renumbering in the C API (ChuckHastings, Jul 20, 2023)
bc5d3e1 define API for MFG renumbering in the C API (ChuckHastings, Jul 20, 2023)
012d392 memory footprint cut (seunghwak, Jul 20, 2023)
63759b8 code improvemnt (seunghwak, Jul 20, 2023)
e21bcd0 bug fix and memory footprint optimization (seunghwak, Jul 20, 2023)
7010081 bug fix (seunghwak, Jul 20, 2023)
d4d5407 clang-format (seunghwak, Jul 20, 2023)
a746701 Merge branch 'branch-23.08' of github.com:rapidsai/cugraph into fea_mfg (seunghwak, Jul 20, 2023)
cf7dff8 clang-format (seunghwak, Jul 20, 2023)
948ad11 remove unnecessary template parameter (seunghwak, Jul 20, 2023)
b26523b clang-format (seunghwak, Jul 20, 2023)
141bab9 Merge branch 'mfg_capi' of https://github.com/chuckhastings/cugraph i… (alexbarghi-nv, Jul 21, 2023)
ff813dd Merge branch 'fea_mfg' of https://github.com/seunghwak/cugraph into c… (alexbarghi-nv, Jul 21, 2023)
37c08e8 Merge branch 'branch-23.08' into mfg_capi (alexbarghi-nv, Jul 21, 2023)
f0e5779 copyright year (seunghwak, Jul 21, 2023)
8222c18 Merge remote-tracking branch 'seunghwa/fea_mfg' into mfg_capi (ChuckHastings, Jul 21, 2023)
9a8abee move expand_sparse_offsets from src/detail to include/cugraph/utiliti… (seunghwak, Jul 21, 2023)
d4d4c78 update renumber_sampled_edgelist to use the existing expand_sparse_of… (seunghwak, Jul 21, 2023)
c6d1d10 Merge branch 'branch-23.08' of github.com:rapidsai/cugraph into fea_mfg (seunghwak, Jul 21, 2023)
51a05bf Merge branch 'mfg_capi' of github.com:chuckhastings/cugraph into mfg_… (ChuckHastings, Jul 21, 2023)
1e4174e Testing with Seunghwa's branch merged in (ChuckHastings, Jul 21, 2023)
b1dac25 Merge branch 'fea_mfg' of https://github.com/seunghwak/cugraph into c… (alexbarghi-nv, Jul 24, 2023)
fa734fb c (alexbarghi-nv, Jul 24, 2023)
899c0e3 merge in changes (alexbarghi-nv, Jul 24, 2023)
4b810fc revert (alexbarghi-nv, Jul 24, 2023)
932e1ea minor (alexbarghi-nv, Jul 24, 2023)
7609bac Remove debug notebook (alexbarghi-nv, Jul 24, 2023)
a212d3b refactor mg, fix test (alexbarghi-nv, Jul 24, 2023)
8a92353 basic functionality (alexbarghi-nv, Jul 24, 2023)
db77c16 Merge branch 'branch-23.08' into mfg_capi (ChuckHastings, Jul 24, 2023)
7e1d1d1 disable broken cugraph-pyg tests (alexbarghi-nv, Jul 24, 2023)
3bcdf7e style (alexbarghi-nv, Jul 24, 2023)
8a16b95 pull in latest update (alexbarghi-nv, Jul 24, 2023)
6ffc699 Merge branch 'mfg_capi' of https://github.com/chuckhastings/cugraph i… (alexbarghi-nv, Jul 24, 2023)
2ceeaf7 fix pyg tests, make renumbering optional (alexbarghi-nv, Jul 25, 2023)
aab0b56 fix merge (alexbarghi-nv, Jul 25, 2023)
7979a0a style (alexbarghi-nv, Jul 25, 2023)
68ef5ef update pyg samplers (alexbarghi-nv, Jul 25, 2023)
665621e sg loader tests (alexbarghi-nv, Jul 25, 2023)
6484b12 style (alexbarghi-nv, Jul 25, 2023)
2ee1ce9 remove prints (alexbarghi-nv, Jul 25, 2023)
bd3dbc5 add renumbering for homogenous graphs by default (alexbarghi-nv, Jul 25, 2023)
33c6353 style (alexbarghi-nv, Jul 25, 2023)
1b6dc5b reformat (alexbarghi-nv, Jul 25, 2023)
bc06101 Support renumbering (VibhuJawa, Jul 25, 2023)
0af5c9e Merge branch 'branch-23.08' into cugraph-dgl-sample-side-mfg (VibhuJawa, Jul 25, 2023)
0d0b3c8 Remove clone command (VibhuJawa, Jul 25, 2023)
e6e2fcd wqMerge branch 'cugraph-dgl-sample-side-mfg' of https://github.com/Vi… (VibhuJawa, Jul 25, 2023)
0b3494d wqMerge branch 'cugraph-sample-side-mfg-updated' into cugraph-dgl-sam… (VibhuJawa, Jul 25, 2023)
0063362 Update based on reviews (VibhuJawa, Jul 26, 2023)
927c7b5 Merge in updates (VibhuJawa, Jul 26, 2023)
2995c62 Make tests same as upstream (VibhuJawa, Jul 26, 2023)
f12d473 Add test based on reviews (VibhuJawa, Jul 26, 2023)
8d6d441 cugraph-dgl-sample-side-mfg (VibhuJawa, Jul 26, 2023)
4c2a42d Add test for renumbering (VibhuJawa, Jul 26, 2023)
8ad21db Add test_get_tensor_d_from_sampled_df (VibhuJawa, Jul 26, 2023)
cf3eb41 create_homogeneous_sampled_graphs_from_dataframe (VibhuJawa, Jul 26, 2023)
f62d773 Added nids to block (VibhuJawa, Jul 26, 2023)
42ebfda Confirmed that pytests pass (VibhuJawa, Jul 27, 2023)
ff02d29 Remove print (VibhuJawa, Jul 27, 2023)
e4fb847 Merge branch 'branch-23.08' into cugraph-dgl-sample-side-mfg (VibhuJawa, Jul 27, 2023)
54d9d5f Merge branch 'branch-23.08' into cugraph-dgl-sample-side-mfg (rlratzel, Jul 28, 2023)
6 changes: 6 additions & 0 deletions python/cugraph-dgl/cugraph_dgl/dataloading/dataloader.py
@@ -210,6 +210,11 @@ def __iter__(self):
         output_dir = os.path.join(
             self._sampling_output_dir, "epoch_" + str(self.epoch_number)
         )
+        if isinstance(self.cugraph_dgl_dataset, HomogenousBulkSamplerDataset):
+            renumber = True
+        else:
+            renumber = False
+
         bs = BulkSampler(
             output_path=output_dir,
             batch_size=self._batch_size,
@@ -218,6 +223,7 @@ def __iter__(self):
             seeds_per_call=self._seeds_per_call,
             fanout_vals=self.graph_sampler._reversed_fanout_vals,
             with_replacement=self.graph_sampler.replace,
+            renumber=renumber,
         )
         if self.shuffle:
             self.tensorized_indices_ds.shuffle()
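
A note on the toggle above: renumbering is requested only for homogeneous datasets, and the heterogeneous path keeps the previous behavior. Below is a minimal sketch of the same check; only the class name HomogenousBulkSamplerDataset comes from this PR, and the stub definition is purely illustrative.

# Sketch: enable sampling-side renumbering only for homogeneous datasets.
class HomogenousBulkSamplerDataset:
    """Stand-in stub for the cugraph-dgl dataset class of the same name."""

def should_renumber(dataset) -> bool:
    # Mirrors the isinstance check added to __iter__ above.
    return isinstance(dataset, HomogenousBulkSamplerDataset)

assert should_renumber(HomogenousBulkSamplerDataset())
assert not should_renumber(object())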
175 changes: 99 additions & 76 deletions python/cugraph-dgl/cugraph_dgl/dataloading/utils/sampling_helpers.py
@@ -29,53 +29,97 @@ def cast_to_tensor(ser: cudf.Series):
     return torch.as_tensor(ser.values, device="cuda")
 
 
-def _get_tensor_ls_from_sampled_df(df):
+def _split_tensor(t, split_indices):
+    """
+    Split a tensor into a list of tensors based on split_indices.
+    """
+    # TODO: Switch to something below
+    # return [t[i:j] for i, j in zip(split_indices[:-1], split_indices[1:])]
+    if split_indices.device.type != "cpu":
+        split_indices = split_indices.to("cpu")
+    return torch.tensor_split(t, split_indices)
+
+
+def _get_renumber_map(df):
+    map = df["map"]
+    df.drop(columns=["map"], inplace=True)
+
+    map_starting_offset = map.iloc[0]
+    renumber_map = map[map_starting_offset:].dropna().reset_index(drop=True)
+    renumber_map_batch_indices = map[1:map_starting_offset].reset_index(drop=True)
+    renumber_map_batch_indices = renumber_map_batch_indices - map_starting_offset
+
+    # Drop all rows with NaN values
+    df.dropna(axis=0, how="all", inplace=True)
+    df.reset_index(drop=True, inplace=True)
+
+    return df, cast_to_tensor(renumber_map), cast_to_tensor(renumber_map_batch_indices)
+
+
+def _get_tensor_d_from_sampled_df(df):
     """
     Converts a sampled cuDF DataFrame into a list of tensors.
 
     Args:
         df (cudf.DataFrame): The sampled cuDF DataFrame containing columns
         'batch_id', 'sources', 'destinations', 'edge_id', and 'hop_id'.
 
     Returns:
-        list: A list of tuples, where each tuple contains three tensors:
-            'sources', 'destinations', and 'edge_id'.
-            The tensors are split based on 'batch_id' and 'hop_id'.
-
+        dict: A dictionary of tensors, keyed by batch_id and hop_id.
     """
+    if "map" in df.columns:
+        df, renumber_map, renumber_map_batch_indices = _get_renumber_map(df)
+    else:
+        renumber_map, renumber_map_batch_indices = None, None
+
     batch_id_tensor = cast_to_tensor(df["batch_id"])
+    batch_id_min = batch_id_tensor.min()
+    batch_id_max = batch_id_tensor.max()
     batch_indices = torch.arange(
-        start=batch_id_tensor.min() + 1,
-        end=batch_id_tensor.max() + 1,
+        start=batch_id_min + 1,
+        end=batch_id_max + 1,
         device=batch_id_tensor.device,
     )
-    batch_indices = torch.searchsorted(batch_id_tensor, batch_indices)
-
-    split_d = {}
-
-    for column in ["sources", "destinations", "edge_id", "hop_id"]:
-        if column in df.columns:
-            tensor = cast_to_tensor(df[column])
-            split_d[column] = torch.tensor_split(tensor, batch_indices.cpu())
+    # TODO: Fix below
+    # batch_indices = _get_id_tensor_boundaries(batch_id_tensor)
+    batch_indices = torch.searchsorted(batch_id_tensor, batch_indices).to("cpu")
+    split_d = {i: {} for i in range(batch_id_min, batch_id_max + 1)}
 
-    result_tensor_ls = []
-    for i, hop_id_tensor in enumerate(split_d["hop_id"]):
-        hop_indices = torch.arange(
-            start=hop_id_tensor.min() + 1,
-            end=hop_id_tensor.max() + 1,
-            device=hop_id_tensor.device,
-        )
-        hop_indices = torch.searchsorted(hop_id_tensor, hop_indices)
-        s = torch.tensor_split(split_d["sources"][i], hop_indices.cpu())
-        d = torch.tensor_split(split_d["destinations"][i], hop_indices.cpu())
-        if "edge_id" in split_d:
-            eid = torch.tensor_split(split_d["edge_id"][i], hop_indices.cpu())
-        else:
-            eid = [None] * len(s)
-
-        result_tensor_ls.append((x, y, z) for x, y, z in zip(s, d, eid))
-
-    return result_tensor_ls
+    for column in df.columns:
+        if column != "batch_id":
+            t = cast_to_tensor(df[column])
+            split_t = _split_tensor(t, batch_indices)
+            for bid, batch_t in zip(split_d.keys(), split_t):
+                split_d[bid][column] = batch_t
+
+    if renumber_map is not None:
+        split_t = _split_tensor(renumber_map, renumber_map_batch_indices)
+        for bid, batch_t in zip(split_d.keys(), split_t):
+            split_d[bid]["map"] = batch_t
+    del df
+    result_tensor_d = {}
+    for batch_id, batch_d in split_d.items():
+        hop_id_tensor = batch_d["hop_id"]
+        hop_id_min = hop_id_tensor.min()
+        hop_id_max = hop_id_tensor.max()
+
+        hop_indices = torch.arange(
+            start=hop_id_min + 1,
+            end=hop_id_max + 1,
+            device=hop_id_tensor.device,
+        )
+        # TODO: Fix below
+        # hop_indices = _get_id_tensor_boundaries(hop_id_tensor)
+        hop_indices = torch.searchsorted(hop_id_tensor, hop_indices).to("cpu")
+        hop_split_d = {i: {} for i in range(hop_id_min, hop_id_max + 1)}
+        for column, t in batch_d.items():
+            if column not in ["hop_id", "map"]:
+                split_t = _split_tensor(t, hop_indices)
+                for hid, ht in zip(hop_split_d.keys(), split_t):
+                    hop_split_d[hid][column] = ht
+
+        result_tensor_d[batch_id] = hop_split_d
+        if "map" in batch_d:
+            result_tensor_d[batch_id]["map"] = batch_d["map"]
+    return result_tensor_d
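
To make the new splitting logic concrete: the sampler emits rows sorted by batch_id (and by hop_id within a batch), so torch.searchsorted finds the first row of each id and torch.tensor_split cuts at those boundaries. A small self-contained sketch with illustrative values (CPU tensors for brevity; the real code keeps everything on GPU):

import torch

# Sorted batch ids, two rows per batch.
batch_id = torch.tensor([0, 0, 1, 1, 2, 2])
ids = torch.arange(int(batch_id.min()) + 1, int(batch_id.max()) + 1)  # tensor([1, 2])
boundaries = torch.searchsorted(batch_id, ids)  # tensor([2, 4])

sources = torch.tensor([10, 11, 20, 21, 30, 31])
chunks = torch.tensor_split(sources, boundaries)
print(chunks)  # (tensor([10, 11]), tensor([20, 21]), tensor([30, 31]))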


def create_homogeneous_sampled_graphs_from_dataframe(
Expand All @@ -87,72 +131,51 @@ def create_homogeneous_sampled_graphs_from_dataframe(
This helper function creates DGL MFGS for
homogeneous graphs from cugraph sampled dataframe
"""
result_tensor_ls = _get_tensor_ls_from_sampled_df(sampled_df)
result_tensor_d = _get_tensor_d_from_sampled_df(sampled_df)
del sampled_df
result_mfgs = [
_create_homogeneous_sampled_graphs_from_tensors_perhop(
tensors_perhop_ls, total_number_of_nodes, edge_dir
tensors_batch_d, total_number_of_nodes, edge_dir
)
for tensors_perhop_ls in result_tensor_ls
for tensors_batch_d in result_tensor_d.values()
]
del result_tensor_ls
del result_tensor_d
return result_mfgs


def _create_homogeneous_sampled_graphs_from_tensors_perhop(
tensors_perhop_ls, total_number_of_nodes, edge_dir
tensors_batch_d, total_number_of_nodes, edge_dir
):
if edge_dir not in ["in", "out"]:
raise ValueError(f"Invalid edge_dir {edge_dir} provided")
if edge_dir == "out":
raise ValueError("Outwards edges not supported yet")
graph_per_hop_ls = []
output_nodes = None
seed_nodes = None
for src_ids, dst_ids, edge_ids in tensors_perhop_ls:
# print("Creating block", flush=True)
block = create_homogeneous_dgl_block_from_tensors_ls(
src_ids=src_ids,
dst_ids=dst_ids,
edge_ids=edge_ids,
seed_nodes=seed_nodes,
total_number_of_nodes=total_number_of_nodes,
)
seed_nodes = block.srcdata[dgl.NID]
if output_nodes is None:
output_nodes = block.dstdata[dgl.NID]
graph_per_hop_ls.append(block)
for hop_id, tensor_per_hop_d in tensors_batch_d.items():
if hop_id != "map":
block = _create_homogeneous_dgl_block_from_tensor_d(tensor_per_hop_d)
graph_per_hop_ls.append(block)

# default DGL behavior
if edge_dir == "in":
graph_per_hop_ls.reverse()
return seed_nodes, output_nodes, graph_per_hop_ls

input_nodes = graph_per_hop_ls[-1].srcnodes()
output_nodes = graph_per_hop_ls[-1].dstnodes()
original_input_nodes = tensors_batch_d["map"][input_nodes]
original_output_nodes = tensors_batch_d["map"][output_nodes]
return original_input_nodes, original_output_nodes, graph_per_hop_ls

def create_homogeneous_dgl_block_from_tensors_ls(
src_ids: torch.Tensor,
dst_ids: torch.Tensor,
edge_ids: Optional[torch.Tensor],
seed_nodes: Optional[torch.Tensor],
total_number_of_nodes: int,

def _create_homogeneous_dgl_block_from_tensor_d(
tensor_d,
):
sampled_graph = dgl.graph(
(src_ids, dst_ids),
num_nodes=total_number_of_nodes,
)
if edge_ids is not None:
sampled_graph.edata[dgl.EID] = edge_ids
# TODO: Check if unique is needed
if seed_nodes is None:
seed_nodes = dst_ids.unique()
rs = tensor_d["sources"]
rd = tensor_d["destinations"]
block = dgl.create_block((rs, rd))
if "edge_id" in tensor_d:
block.edata[dgl.EID] = tensor_d["edge_id"]

block = dgl.to_block(
sampled_graph,
dst_nodes=seed_nodes,
src_nodes=src_ids.unique(),
include_dst_in_src=True,
)
if edge_ids is not None:
block.edata[dgl.EID] = sampled_graph.edata[dgl.EID]
return block


Expand Down
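
Since sampling-side renumbering already yields compact per-batch vertex ids, the rewritten helper builds each MFG directly with dgl.create_block rather than materializing a full dgl.graph and converting it with dgl.to_block. A minimal runnable sketch with illustrative tensors (requires dgl and torch; the ids here are made up):

import dgl
import torch

# Renumbered edges for one hop: destination ids are the seed (output) nodes,
# source ids cover the seeds plus their sampled neighbors.
src = torch.tensor([0, 1, 2, 3])
dst = torch.tensor([0, 0, 1, 1])
edge_id = torch.tensor([100, 101, 102, 103])

block = dgl.create_block((src, dst))
block.edata[dgl.EID] = edge_id
print(block.num_src_nodes(), block.num_dst_nodes())  # 4 2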
34 changes: 34 additions & 0 deletions python/cugraph-dgl/tests/test_utils.py
@@ -13,13 +13,47 @@
 
 import cudf
 import cupy as cp
+import numpy as np
 import torch
 
 from cugraph_dgl.dataloading.utils.sampling_helpers import cast_to_tensor
+from cugraph_dgl.dataloading.utils.sampling_helpers import _get_renumber_map
 
 
 def test_casting_empty_array():
     ar = cp.zeros(shape=0, dtype=cp.int32)
     ser = cudf.Series(ar)
     output_tensor = cast_to_tensor(ser)
     assert output_tensor.dtype == torch.int32
+
+
+def get_dummy_sampled_df():
+    df = cudf.DataFrame()
+    df["sources"] = [0, 0, 0, 0, 0, 0, np.nan, np.nan, np.nan]
+    df["destinations"] = [1, 2, 1, 2, 1, 2, np.nan, np.nan, np.nan]
+    df["batch_id"] = [0, 0, 1, 1, 2, 2, np.nan, np.nan, np.nan]
+    df["hop_id"] = [0, 1, 0, 1, 0, 1, np.nan, np.nan, np.nan]
+
+    df["map"] = [3, 6, 9, 10, 11, 12, 13, 14, 15]
+    df = df.astype("int32")
+    df["hop_id"] = df["hop_id"].astype("uint8")
+    return df
+
+
+def test_get_renumber_map():
+    sampled_df = get_dummy_sampled_df()
+
+    df, renumber_map, renumber_map_batch_indices = _get_renumber_map(sampled_df)
+    # Ensure that map was dropped
+    assert "map" not in df.columns
+
+    expected_map = torch.as_tensor(
+        [10, 11, 12, 13, 14, 15], dtype=torch.int32, device="cuda"
+    )
+    assert torch.equal(renumber_map, expected_map)
+
+    expected_batch_indices = torch.as_tensor([3, 6], dtype=torch.int32, device="cuda")
+    assert torch.equal(renumber_map_batch_indices, expected_batch_indices)
+
+    # Ensure we dropped the Nans for rows corresponding to the renumber_map
+    assert len(df) == 6
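
For reference, a worked decoding of the packed map column used by the dummy DataFrame above. The layout is inferred from _get_renumber_map and this test (plain torch, no GPU needed):

import torch

# Inferred layout: packed[0] is the offset at which the renumber map starts,
# packed[1:offset] holds the absolute start positions of later batches' map
# slices, and packed[offset:] is the renumbered-to-original vertex id map.
packed = torch.tensor([3, 6, 9, 10, 11, 12, 13, 14, 15])

offset = int(packed[0])                    # 3
batch_indices = packed[1:offset] - offset  # tensor([3, 6]), as asserted above
renumber_map = packed[offset:]             # tensor([10, 11, 12, 13, 14, 15])

print(batch_indices.tolist(), renumber_map.tolist())  # [3, 6] [10, 11, 12, 13, 14, 15]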