Adds benchmarks for nx-cugraph #3854

Merged
Changes from 31 commits
50 commits
a63d350
Adds initial benchmarks for cugraph-nx, still WIP.
rlratzel Aug 24, 2023
87045be
Adds benchmarks specifically for larger datasets that use the k param…
rlratzel Aug 26, 2023
c08d603
Update the docstrings of the similarity algorithms (#3817)
jnke2016 Aug 25, 2023
3448b96
Use `copy-pr-bot` (#3827)
ajschmidt8 Aug 29, 2023
eb7de38
Disable mg tests (#3833)
naimnv Aug 30, 2023
0b79ea3
Fix OD shortest distance matrix computation test failures. (#3813)
seunghwak Aug 30, 2023
3c8a8c6
Remove legacy betweenness centrality (#3829)
jnke2016 Aug 30, 2023
b496254
Update README.md (#3826)
lmeyerov Aug 30, 2023
f270817
Add `louvain_communities` to cugraph-nx (#3803)
eriknw Aug 31, 2023
d909d8d
[BUG] Fix Batch Renumbering of Empty Batches (#3823)
alexbarghi-nv Aug 31, 2023
c0df6e2
Simplify wheel build scripts and allow alphas of RAPIDS dependencies …
vyasr Aug 31, 2023
262e281
Remove Deprecated Sampling Options (#3816)
alexbarghi-nv Sep 1, 2023
ccc8653
Use new `raft::compiled_static` targets (#3842)
divyegala Sep 6, 2023
7c5f38b
[IMP] Add ability to get batch size from the loader in cuGraph-PyG (#…
alexbarghi-nv Sep 6, 2023
80b7ae0
Rename `cugraph-nx` to `nx-cugraph` (#3840)
eriknw Sep 6, 2023
a24341d
Migrate upstream models to `cugraph-pyg` (#3763)
tingyu66 Sep 6, 2023
5574eb6
Expose threshold in louvain (#3792)
ChuckHastings Sep 6, 2023
a2972b5
Remove the assumption made on the client data's keys (#3835)
jnke2016 Sep 7, 2023
4b18ddf
Adding metadata getter methods to datasets API (#3821)
nv-rliu Sep 8, 2023
6282965
Uses `conda mambabuild` rather than `mamba mambabuild` (#3853)
rlratzel Sep 8, 2023
5babc71
Merge remote-tracking branch 'upstream/branch-23.10' into branch-23.1…
rlratzel Sep 8, 2023
c3194c4
Renames dir to nx-cugraph for consistency with new package name.
rlratzel Sep 8, 2023
96dc4f2
Merge remote-tracking branch 'upstream/branch-23.10' into branch-23.1…
rlratzel Sep 26, 2023
5091a8b
Merge remote-tracking branch 'upstream/branch-23.10' into branch-23.1…
rlratzel Sep 27, 2023
9248af0
Adds benchmark for louvain using small graphs, adds support for Netwo…
rlratzel Sep 27, 2023
15b28ee
Adds benchmark for louvain_communities using medium size graphs.
rlratzel Sep 27, 2023
c458918
Removed unused imports, adds comment describing fixture args (ids, et…
rlratzel Sep 28, 2023
eb703ac
Merge remote-tracking branch 'upstream/branch-23.10' into branch-23.1…
rlratzel Sep 28, 2023
ca5e255
Merge remote-tracking branch 'upstream/branch-23.12' into branch-23.1…
rlratzel Sep 28, 2023
1232897
Removes FIXME, minor code cleanup.
rlratzel Sep 28, 2023
880ff6b
Initial black run on benchmark files.
rlratzel Sep 29, 2023
ade2937
Merge branch 'branch-23.10-cugraph_nx_benchmarks' of github.com:rlrat…
trxcllnt Sep 29, 2023
8295fd3
bump versions 23.10 -> 23.12
trxcllnt Sep 29, 2023
c8c8dc0
leave one thread free
trxcllnt Sep 29, 2023
f845799
Merge branch 'branch-23.12' of github.com:rapidsai/cugraph into rlrat…
trxcllnt Sep 29, 2023
7d6b350
fix dependencies.yaml merge from 23.10 -> 23.12
trxcllnt Sep 29, 2023
3fdb9ba
update ucx-py, dask, and distributed versions
trxcllnt Sep 29, 2023
c827b81
separate CUDA suffixes for pylibcugraphops
trxcllnt Sep 29, 2023
2ec390e
Merge branch 'branch-23.12' of github.com:rapidsai/cugraph into rlrat…
trxcllnt Sep 29, 2023
c00c3f1
WholeGraph Feature Store for cuGraph-PyG and cuGraph-DGL (#3874)
alexbarghi-nv Sep 30, 2023
dc87455
increase timeout
jnke2016 Oct 3, 2023
a863835
Integrate renumbering and compression to `cugraph-dgl` to accelerate …
tingyu66 Oct 3, 2023
a564957
add env var to the wheel run
jnke2016 Oct 3, 2023
3f1547f
Merge remote-tracking branch 'upstream/branch-23.10' into branch-23.1…
jnke2016 Oct 3, 2023
74c63fb
increase timeouts
jnke2016 Oct 4, 2023
8f00bbd
Adds k as a param for BC bench.
rlratzel Oct 6, 2023
9955b7c
Merge branch 'branch-23.10-cugraph_nx_benchmarks' of https://github.c…
rlratzel Oct 6, 2023
bf74a73
Merge remote-tracking branch 'upstream/branch-23.12' into branch-23.1…
rlratzel Oct 6, 2023
cbdbd8a
Merge remote-tracking branch 'jnke2016/branch-23.10_increase-timeout'…
rlratzel Oct 6, 2023
80b9cf2
Updates pylibwholegraph version to 23.12.
rlratzel Oct 6, 2023
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -16,7 +16,7 @@ repos:
- id: black
language_version: python3
args: [--target-version=py38]
files: ^python/
files: ^(python/.*|benchmarks/.*)$
- repo: https://github.com/PyCQA/flake8
rev: 6.0.0
hooks:
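
The widened `files` pattern for the black hook above can be checked with a short sketch. pre-commit treats `files` as a Python regular expression and applies it with `re.search` to repo-relative paths. The sample paths below are illustrative assumptions, not a listing of the repository.

```python
import re

# The two patterns from the .pre-commit-config.yaml hunk above.
OLD_PATTERN = re.compile(r"^python/")
NEW_PATTERN = re.compile(r"^(python/.*|benchmarks/.*)$")

# Hypothetical repo-relative paths, chosen only to exercise the patterns.
SAMPLE_PATHS = [
    "python/cugraph/cugraph/__init__.py",
    "benchmarks/nx-cugraph/pytest-based/bench_algos.py",
    "cpp/src/example.cu",
    "docs/python/index.rst",
]


def selected(pattern, paths):
    """Return the paths the hook would run on for the given pattern."""
    return [p for p in paths if pattern.search(p)]


old_selected = selected(OLD_PATTERN, SAMPLE_PATHS)
new_selected = selected(NEW_PATTERN, SAMPLE_PATHS)
print(old_selected)
print(new_selected)
```

Under these assumptions only the new pattern picks up files under `benchmarks/`, and both patterns stay anchored at the start of the path, so a nested `docs/python/` directory is not selected.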
@@ -21,6 +21,7 @@
import pytest
import numpy as np
import cupy as cp

# Facing issues with rapids-pytest-benchmark plugin
# pytest-benchmark.
import pytest_benchmark
@@ -33,6 +34,7 @@
import dgl
import torch
import rmm

_seed = 42


@@ -71,29 +73,31 @@ def create_graph(graph_data):
else:
raise TypeError(f"graph_data can only be str or dict, got {type(graph_data)}")

num_nodes = max(edgelist_df['src'].max(),
edgelist_df['dst'].max())+1
num_nodes = max(edgelist_df["src"].max(), edgelist_df["dst"].max()) + 1

num_nodes_dict = {'_N':num_nodes}
num_nodes_dict = {"_N": num_nodes}

gs = CuGraphStorage(num_nodes_dict=num_nodes_dict, single_gpu=True)
gs.add_edge_data(edgelist_df,
# reverse to make same graph as cugraph
node_col_names=['dst', 'src'],
canonical_etype=['_N', 'connects', '_N'])
gs.add_edge_data(
edgelist_df,
# reverse to make same graph as cugraph
node_col_names=["dst", "src"],
canonical_etype=["_N", "connects", "_N"],
)

return gs



def create_mg_graph(graph_data):
"""
Create a graph instance based on the data to be loaded/generated.
"""
# range starts at 1 to let 0 be used by benchmark/client process
visible_devices = os.getenv("DASK_WORKER_DEVICES", "1,2,3,4")

cluster = LocalCUDACluster(protocol='ucx', rmm_pool_size='25GB', CUDA_VISIBLE_DEVICES=visible_devices)
cluster = LocalCUDACluster(
protocol="ucx", rmm_pool_size="25GB", CUDA_VISIBLE_DEVICES=visible_devices
)
client = Client(cluster)
Comms.initialize(p2p=True)
rmm.reinitialize(pool_allocator=True)
@@ -126,25 +130,23 @@ def create_mg_graph(graph_data):
else:
raise TypeError(f"graph_data can only be str or dict, got {type(graph_data)}")

num_nodes = max(edgelist_df['src'].max().compute(),
edgelist_df['dst'].max().compute())
num_nodes = max(
edgelist_df["src"].max().compute(), edgelist_df["dst"].max().compute()
)

# running into issues with smaller partitions
edgelist_df = edgelist_df.repartition(npartitions=edgelist_df.npartitions*2)
edgelist_df = edgelist_df.repartition(npartitions=edgelist_df.npartitions * 2)

num_nodes_dict = {'_N':num_nodes}
num_nodes_dict = {"_N": num_nodes}

gs = CuGraphStorage(num_nodes_dict=num_nodes_dict, single_gpu=False)
gs.add_edge_data(edgelist_df,
node_col_names=['dst', 'src'],
canonical_etype=['_N', 'C', '_N'])
gs = CuGraphStorage(num_nodes_dict=num_nodes_dict, single_gpu=False)
gs.add_edge_data(
edgelist_df, node_col_names=["dst", "src"], canonical_etype=["_N", "C", "_N"]
)
return (gs, client, cluster)



def get_uniform_neighbor_sample_args(
G, seed, batch_size, fanout, with_replacement
):
def get_uniform_neighbor_sample_args(G, seed, batch_size, fanout, with_replacement):
"""
Return a dictionary containing the args for uniform_neighbor_sample based
on the graph and desired args passed in. For example, if a large start list
@@ -165,7 +167,7 @@ def get_uniform_neighbor_sample_args(
else:
num_start_verts = batch_size

srcs = G.graphstore.gdata.get_edge_data()['_SRC_']
srcs = G.graphstore.gdata.get_edge_data()["_SRC_"]
start_list = srcs.head(num_start_verts)
assert len(start_list) == num_start_verts

@@ -205,7 +207,6 @@ def graph_objs(request):
dask_cluster.close()



################################################################################
# Benchmarks
@pytest.mark.parametrize("batch_size", params.batch_sizes.values())
@@ -223,15 +224,15 @@ def bench_cugraph_dgl_uniform_neighbor_sample(

# Reverse to match cugraph
# DGL does from dst to src
fanout_val = uns_args['fanout']
fanout_val = uns_args["fanout"]
fanout_val.reverse()
sampler = dgl.dataloading.NeighborSampler(uns_args["fanout"])
sampler_f = sampler.sample_blocks

# Warmup
_ = sampler_f(g=G, seed_nodes=uns_args["seed_nodes"])
# print(f"\n{uns_args}")
result_seed_nodes, output_nodes, blocks = benchmark(
result_seed_nodes, output_nodes, blocks = benchmark(
sampler_f,
g=G,
seed_nodes=uns_args["seed_nodes"],
@@ -52,13 +52,9 @@ def load_edges_from_disk(parquet_path, replication_factor, input_meta):
src_ls = [ei["src"]]
dst_ls = [ei["dst"]]
for r in range(1, replication_factor):
new_src = ei["src"] + (
r * input_meta["num_nodes"][can_edge_type[0]]
)
new_src = ei["src"] + (r * input_meta["num_nodes"][can_edge_type[0]])
src_ls.append(new_src)
new_dst = ei["dst"] + (
r * input_meta["num_nodes"][can_edge_type[2]]
)
new_dst = ei["dst"] + (r * input_meta["num_nodes"][can_edge_type[2]])
dst_ls.append(new_dst)

ei["src"] = torch.cat(src_ls).contiguous()
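
The replication loop above shifts each extra copy of the edge list by a multiple of the node count, so the copies occupy disjoint ID ranges. A minimal sketch of that offsetting, using plain Python lists instead of torch tensors. The function name `replicate_edges` and the toy graph are illustrative assumptions, and a single node type is assumed so that source and destination share one offset.

```python
def replicate_edges(src, dst, num_nodes, replication_factor):
    """Return src/dst lists holding `replication_factor` disjoint copies."""
    src_out, dst_out = list(src), list(dst)
    for r in range(1, replication_factor):
        # Copy r lives in the ID range [r * num_nodes, (r + 1) * num_nodes).
        offset = r * num_nodes
        src_out.extend(s + offset for s in src)
        dst_out.extend(d + offset for d in dst)
    return src_out, dst_out


# Toy graph with 3 nodes and edges 0->1 and 1->2, replicated 3 times.
src, dst = replicate_edges([0, 1], [1, 2], num_nodes=3, replication_factor=3)
print(src)
print(dst)
```

With heterogeneous edge types, as in the benchmark, the source and destination offsets would each use the node count of their own node type.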
@@ -92,16 +88,11 @@ def load_node_labels(dataset_path, replication_factor, input_meta):
]
),
"label": pd.concat(
[
node_label.label
for r in range(1, replication_factor)
]
[node_label.label for r in range(1, replication_factor)]
),
}
)
node_label = pd.concat([node_label, dfr]).reset_index(
drop=True
)
node_label = pd.concat([node_label, dfr]).reset_index(drop=True)

node_label_tensor = torch.full(
(num_nodes_dict[node_type],), -1, dtype=torch.float32
@@ -133,9 +124,7 @@ def create_dgl_graph_from_disk(dataset_path, replication_factor=1):
input_meta = json.load(f)

parquet_path = os.path.join(dataset_path, "parquet")
graph_data = load_edges_from_disk(
parquet_path, replication_factor, input_meta
)
graph_data = load_edges_from_disk(parquet_path, replication_factor, input_meta)
node_data = load_node_labels(dataset_path, replication_factor, input_meta)
g = dgl.heterograph(graph_data)

@@ -154,7 +143,7 @@ def create_dataloader(g, train_idx, batch_size, fanouts, use_uva):
Returns:
DGLGraph: DGLGraph with the loaded dataset.
"""

print("Creating dataloader", flush=True)
st = time.time()
if use_uva:
@@ -220,21 +209,21 @@ def dataloading_benchmark(g, train_idx, fanouts, batch_sizes, use_uva):
print("==============================================")
return time_ls


def set_seed(seed):
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)


if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument(
"--dataset_path", type=str, default="/datasets/abarghi/ogbn_papers100M"
)
parser.add_argument("--replication_factors", type=str, default="1,2,4,8")
parser.add_argument(
"--fanouts", type=str, default="25_25,10_10_10,5_10_20"
)
parser.add_argument("--fanouts", type=str, default="25_25,10_10_10,5_10_20")
parser.add_argument("--batch_sizes", type=str, default="512,1024")
parser.add_argument("--do_not_use_uva", action="store_true")
parser.add_argument("--seed", type=int, default=42)
@@ -267,22 +256,16 @@ def set_seed(seed):
et = time.time()
print(f"Replication factor = {replication_factor}")
print(
f"G has {g.num_edges()} edges and took",
f" {et - st:.2f} seconds to load"
f"G has {g.num_edges()} edges and took", f" {et - st:.2f} seconds to load"
)
train_idx = {"paper": node_data["paper"]["train_idx"]}
r_time_ls = dataloading_benchmark(
g, train_idx, fanouts, batch_sizes, use_uva=use_uva
)
print(
"Benchmark completed for replication factor = ", replication_factor
)
print("Benchmark completed for replication factor = ", replication_factor)
print("==============================================")
# Add replication factor to the time list
[
x.update({"replication_factor": replication_factor})
for x in r_time_ls
]
[x.update({"replication_factor": replication_factor}) for x in r_time_ls]
time_ls.extend(r_time_ls)

df = pd.DataFrame(time_ls)
89 changes: 51 additions & 38 deletions benchmarks/cugraph-dgl/python-script/ogbn_mag_benchmark.py
@@ -16,15 +16,17 @@
import torch.nn.functional as F
import time
import argparse

## DGL Specific Import
from cugraph_dgl import c


def load_dgl_graph():
from ogb.nodeproppred import DglNodePropPredDataset

dataset = DglNodePropPredDataset(name="ogbn-mag", root='/datasets/vjawa/gnn/')
dataset = DglNodePropPredDataset(name="ogbn-mag", root="/datasets/vjawa/gnn/")
split_idx = dataset.get_idx_split()
g, labels = dataset[0]
g, labels = dataset[0]
# Uncomment for ogbn-mag
labels = labels["paper"].flatten()
# labels = labels
@@ -33,69 +35,80 @@ def load_dgl_graph():
return g, labels, dataset.num_classes, split_idx


def sampling_func(g, seed_nodes,labels, train_loader):
def sampling_func(g, seed_nodes, labels, train_loader):
category = "paper"
for input_nodes, seeds, blocks in train_loader:
seeds = seeds[category]
feat = blocks[0].srcdata['feat']['paper']
feat = blocks[0].srcdata["feat"]["paper"]
return None


if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Benchmark Sampling')
parser.add_argument('--batch_size', type=int, default=100_000)
parser.add_argument('--use_cugraph',dest='use_cugraph', action='store_true', default=True)
parser.add_argument('--use_dgl_upstream', dest='use_cugraph', action='store_false')
parser.add_argument('--single_gpu', dest='single_gpu', action='store_true', default=True)
parser.add_argument('--multi_gpu', dest='single_gpu', action='store_false')
parser.add_argument('--n_gpus', type=int, default=1)

parser = argparse.ArgumentParser(description="Benchmark Sampling")
parser.add_argument("--batch_size", type=int, default=100_000)
parser.add_argument(
"--use_cugraph", dest="use_cugraph", action="store_true", default=True
)
parser.add_argument("--use_dgl_upstream", dest="use_cugraph", action="store_false")
parser.add_argument(
"--single_gpu", dest="single_gpu", action="store_true", default=True
)
parser.add_argument("--multi_gpu", dest="single_gpu", action="store_false")
parser.add_argument("--n_gpus", type=int, default=1)

args = parser.parse_args()
print(args, flush=True)
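
The parser above pairs each `store_true` flag with a `store_false` flag that writes to the same `dest`, so cuGraph is the default and `--use_dgl_upstream` opts out. A small sketch of that pattern, passing explicit argument lists to `parse_args` instead of reading `sys.argv`. The helper name `build_parser` is an assumption made for illustration.

```python
import argparse


def build_parser():
    """Build a parser with two flags that share one destination."""
    parser = argparse.ArgumentParser(description="Benchmark Sampling")
    parser.add_argument(
        "--use_cugraph", dest="use_cugraph", action="store_true", default=True
    )
    # Writes False into the same attribute that --use_cugraph controls.
    parser.add_argument("--use_dgl_upstream", dest="use_cugraph", action="store_false")
    return parser


defaults = build_parser().parse_args([])
upstream = build_parser().parse_args(["--use_dgl_upstream"])
print(defaults.use_cugraph, upstream.use_cugraph)
```

Because the default is already `True`, passing `--use_cugraph` explicitly changes nothing; only the opt-out flag alters the value.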

single_gpu = args.single_gpu
use_cugraph = args.use_cugraph
batch_size = args.batch_size
n_gpus = args.n_gpus

if single_gpu:
import rmm
rmm.reinitialize(pool_allocator=True,initial_pool_size=5e+9, maximum_pool_size=22e+9)

rmm.reinitialize(
pool_allocator=True, initial_pool_size=5e9, maximum_pool_size=22e9
)
else:
#### Dask Cluster
from dask_cuda import LocalCUDACluster
from cugraph.dask.comms import comms as Comms
from dask.distributed import Client

#Change according to your GPUS
#Client at GPU-0
#Workers at specified GPUS
#UCX seems to be freezing :-0 on DGX
cuda_visible_devices = ','.join([str(i) for i in range(1,n_gpus+1)])
cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES=cuda_visible_devices, protocol='tcp', rmm_pool_size='12 GB',
jit_unspill=True)
# Change according to your GPUS
# Client at GPU-0
# Workers at specified GPUS
# UCX seems to be freezing :-0 on DGX
cuda_visible_devices = ",".join([str(i) for i in range(1, n_gpus + 1)])
cluster = LocalCUDACluster(
CUDA_VISIBLE_DEVICES=cuda_visible_devices,
protocol="tcp",
rmm_pool_size="12 GB",
jit_unspill=True,
)
client = Client(cluster)
Comms.initialize(p2p=True)


device = 'cuda'

device = "cuda"
g, labels, num_classes, split_idx = load_dgl_graph()
g = g.to(device)

if use_cugraph:
if not single_gpu:
g = g.int()
g = cugraph_storage_from_heterograph(g, single_gpu=single_gpu)


indx_type = g.idtype
subset_split_idx = {'train': {k: v.to(device).to(g.idtype) for k,v in split_idx['train'].items()},
'valid' : {k: v.to(device).to(g.idtype) for k,v in split_idx['valid'].items()},
'test' : {k: v.to(device).to(g.idtype) for k,v in split_idx['test'].items()},
}

indx_type = g.idtype
subset_split_idx = {
"train": {k: v.to(device).to(g.idtype) for k, v in split_idx["train"].items()},
"valid": {k: v.to(device).to(g.idtype) for k, v in split_idx["valid"].items()},
"test": {k: v.to(device).to(g.idtype) for k, v in split_idx["test"].items()},
}

sampler = dgl.dataloading.MultiLayerNeighborSampler([20,25], prefetch_node_feats={'paper':['feat']})
sampler = dgl.dataloading.MultiLayerNeighborSampler(
[20, 25], prefetch_node_feats={"paper": ["feat"]}
)
train_loader = dgl.dataloading.DataLoader(
g,
subset_split_idx["train"],
@@ -107,10 +120,10 @@ def sampling_func(g, seed_nodes,labels, train_loader):
)

### Warmup RUN
sampling_func(g, subset_split_idx['train'],labels, train_loader)
sampling_func(g, subset_split_idx["train"], labels, train_loader)

### Benchmarking RUN
st = time.time()
sampling_func(g, subset_split_idx['train'],labels, train_loader)
et = time.time()
print(f"Sampling time taken = {et-st} s")
sampling_func(g, subset_split_idx["train"], labels, train_loader)
et = time.time()
print(f"Sampling time taken = {et-st} s")
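
The script above times sampling with one untimed warmup pass followed by one timed pass. A minimal sketch of that warmup-then-measure pattern as a reusable helper. `time_after_warmup` and `double` are hypothetical names, and `time.perf_counter` is swapped in for `time.time` here because it is a monotonic clock intended for measuring intervals.

```python
import time


def time_after_warmup(func, *args, warmup_runs=1, **kwargs):
    """Call func untimed `warmup_runs` times, then time one more call."""
    for _ in range(warmup_runs):
        func(*args, **kwargs)
    start = time.perf_counter()
    result = func(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return result, elapsed


calls = []


def double(x):
    """Stand-in workload that records each call."""
    calls.append(x)
    return x * 2


result, elapsed = time_after_warmup(double, 21)
print(f"Time taken = {elapsed} s")
```

The warmup call absorbs one-time costs such as lazy initialization, so the timed call reflects steady-state behavior more closely.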