Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REVIEW]Optimize cugraph-DGL csc codepath #3977

Merged
merged 12 commits into from
Nov 8, 2023
152 changes: 152 additions & 0 deletions benchmarks/cugraph-dgl/scale-benchmarks/cugraph_dgl_benchmark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# Copyright (c) 2018-2023, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

os.environ["LIBCUDF_CUFILE_POLICY"] = "KVIKIO"
os.environ["KVIKIO_NTHREADS"] = "64"
os.environ["RAPIDS_NO_INITIALIZE"] = "1"
import json
import pandas as pd
import os
import time
from rmm.allocators.torch import rmm_torch_allocator
import rmm
import torch
from cugraph_dgl.dataloading import HomogenousBulkSamplerDataset
from model import run_1_epoch
from argparse import ArgumentParser
from load_graph_feats import load_node_labels, load_node_features


def create_dataloader(sampled_dir, total_num_nodes, sparse_format, return_type):
print("Creating dataloader", flush=True)
st = time.time()
dataset = HomogenousBulkSamplerDataset(
total_num_nodes,
edge_dir="in",
sparse_format=sparse_format,
return_type=return_type,
)

dataset.set_input_files(sampled_dir)
dataloader = torch.utils.data.DataLoader(
dataset, collate_fn=lambda x: x, shuffle=False, num_workers=0, batch_size=None
)
et = time.time()
print(f"Time to create dataloader = {et - st:.2f} seconds", flush=True)
return dataloader


def setup_common_pool():
rmm.reinitialize(initial_pool_size=5e9, pool_allocator=True)
torch.cuda.memory.change_current_allocator(rmm_torch_allocator)


def main(args):
print(
f"Running cugraph-dgl dataloading benchmark with the following parameters:\n"
f"Dataset path = {args.dataset_path}\n"
f"Sampling path = {args.sampling_path}\n"
)
with open(os.path.join(args.dataset_path, "meta.json"), "r") as f:
input_meta = json.load(f)

sampled_dirs = [
os.path.join(args.sampling_path, f) for f in os.listdir(args.sampling_path)
]

time_ls = []
for sampled_dir in sampled_dirs:
with open(os.path.join(sampled_dir, "output_meta.json"), "r") as f:
sampled_meta_d = json.load(f)

replication_factor = sampled_meta_d["replication_factor"]
feat_load_st = time.time()
label_data = load_node_labels(
args.dataset_path, replication_factor, input_meta
)["paper"]["y"]
feat_data = feat_data = load_node_features(
args.dataset_path, replication_factor, node_type="paper"
)
print(
f"Feature and label data loading took = {time.time()-feat_load_st}",
flush=True,
)

r_time_ls = e2e_benchmark(sampled_dir, feat_data, label_data, sampled_meta_d)
[x.update({"replication_factor": replication_factor}) for x in r_time_ls]
[x.update({"num_edges": sampled_meta_d["total_num_edges"]}) for x in r_time_ls]
time_ls.extend(r_time_ls)

print(
f"Benchmark completed for replication factor = {replication_factor}\n{'=' * 30}",
flush=True,
)

df = pd.DataFrame(time_ls)
df.to_csv("cugraph_dgl_e2e_benchmark.csv", index=False)
print(f"Benchmark completed for all replication factors\n{'=' * 30}", flush=True)


def e2e_benchmark(
sampled_dir: str, feat: torch.Tensor, y: torch.Tensor, sampled_meta_d: dict
):
"""
Run the e2e_benchmark
Args:
sampled_dir: directory containing the sampled graph
feat: node features
y: node labels
sampled_meta_d: dictionary containing the sampled graph metadata
"""
time_ls = []

# TODO: Make this a parameter in bulk sampling script
sampled_meta_d["sparse_format"] = "csc"
sampled_dir = os.path.join(sampled_dir, "samples")
dataloader = create_dataloader(
sampled_dir,
sampled_meta_d["total_num_nodes"],
sampled_meta_d["sparse_format"],
return_type="cugraph_dgl.nn.SparseGraph",
)
time_d = run_1_epoch(
dataloader,
feat,
y,
fanout=sampled_meta_d["fanout"],
batch_size=sampled_meta_d["batch_size"],
model_backend="cugraph_dgl",
)
time_ls.append(time_d)
print("=" * 30)
return time_ls


def parse_arguments():
parser = ArgumentParser()
parser.add_argument(
"--dataset_path", type=str, default="/raid/vjawa/ogbn_papers100M/"
)
parser.add_argument(
"--sampling_path",
type=str,
default="/raid/vjawa/nov_1_bulksampling_benchmarks/",
)
return parser.parse_args()


if __name__ == "__main__":
setup_common_pool()
arguments = parse_arguments()
main(arguments)
17 changes: 11 additions & 6 deletions benchmarks/cugraph-dgl/scale-benchmarks/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ def create_model(feat_size, num_classes, num_layers, model_backend="dgl"):


def train_model(model, dataloader, opt, feat, y):
times = {key: 0 for key in ["mfg_creation", "feature", "m_fwd", "m_bkwd"]}
times_d = {key: 0 for key in ["mfg_creation", "feature", "m_fwd", "m_bkwd"]}
epoch_st = time.time()
mfg_st = time.time()
for input_nodes, output_nodes, blocks in dataloader:
times["mfg_creation"] += time.time() - mfg_st
times_d["mfg_creation"] += time.time() - mfg_st
if feat is not None:
fst = time.time()
input_nodes = input_nodes.to("cpu")
Expand All @@ -71,23 +71,24 @@ def train_model(model, dataloader, opt, feat, y):
output_nodes = output_nodes["paper"]
output_nodes = output_nodes.to(y.device)
y_batch = y[output_nodes].to("cuda")
times["feature"] += time.time() - fst
times_d["feature"] += time.time() - fst

m_fwd_st = time.time()
y_hat = model(blocks, input_feat)
times["m_fwd"] += time.time() - m_fwd_st
times_d["m_fwd"] += time.time() - m_fwd_st

m_bkwd_st = time.time()
loss = F.cross_entropy(y_hat, y_batch)
opt.zero_grad()
loss.backward()
opt.step()
times["m_bkwd"] += time.time() - m_bkwd_st
times_d["m_bkwd"] += time.time() - m_bkwd_st
mfg_st = time.time()

print(f"Epoch time = {time.time() - epoch_st:.2f} seconds")
print(f"Time to create MFG = {times_d['mfg_creation']:.2f} seconds")

return times
return times_d


def analyze_time(dataloader, times, epoch_time, fanout, batch_size):
Expand Down Expand Up @@ -119,6 +120,10 @@ def run_1_epoch(dataloader, feat, y, fanout, batch_size, model_backend):
else:
model = None
opt = None

# Warmup RUN
times = train_model(model, dataloader, opt, feat, y)

epoch_st = time.time()
times = train_model(model, dataloader, opt, feat, y)
epoch_time = time.time() - epoch_st
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
get_allocation_counts_dask_lazy,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oorliu ,

The only dgl specific args for our benchmarking efforts are:

                --reverse_edges \
                --sampling_target_framework cugraph_dgl_csr

sizeof_fmt,
get_peak_output_ratio_across_workers,
restart_client,
start_dask_client,
stop_dask_client,
enable_spilling,
Expand Down Expand Up @@ -187,10 +186,10 @@ def sample_graph(
output_path,
seed=42,
batch_size=500,
seeds_per_call=200000,
seeds_per_call=400000,
batches_per_partition=100,
fanout=[5, 5, 5],
persist=False,
sampling_kwargs={},
):
cupy.random.seed(seed)

Expand All @@ -204,6 +203,7 @@ def sample_graph(
seeds_per_call=seeds_per_call,
batches_per_partition=batches_per_partition,
log_level=logging.INFO,
**sampling_kwargs,
)

n_workers = len(default_client().scheduler_info()["workers"])
Expand Down Expand Up @@ -469,6 +469,7 @@ def benchmark_cugraph_bulk_sampling(
batch_size,
seeds_per_call,
fanout,
sampling_target_framework,
reverse_edges=True,
dataset_dir=".",
replication_factor=1,
Expand Down Expand Up @@ -564,17 +565,30 @@ def benchmark_cugraph_bulk_sampling(
output_sample_path = os.path.join(output_subdir, "samples")
os.makedirs(output_sample_path)

batches_per_partition = 200_000 // batch_size
if sampling_target_framework == "cugraph_dgl_csr":
sampling_kwargs = {
"deduplicate_sources": True,
"prior_sources_behavior": "carryover",
"renumber": True,
"compression": "CSR",
"compress_per_hop": True,
"use_legacy_names": False,
"include_hop_column": False,
}
else:
sampling_kwargs = {}
VibhuJawa marked this conversation as resolved.
Show resolved Hide resolved

batches_per_partition = 400_000 // batch_size
execution_time, allocation_counts = sample_graph(
G,
dask_label_df,
output_sample_path,
G=G,
label_df=dask_label_df,
output_path=output_sample_path,
seed=seed,
batch_size=batch_size,
seeds_per_call=seeds_per_call,
batches_per_partition=batches_per_partition,
fanout=fanout,
persist=persist,
sampling_kwargs=sampling_kwargs,
)

output_meta = {
Expand Down Expand Up @@ -701,7 +715,13 @@ def get_args():
required=False,
default=False,
)

parser.add_argument(
"--sampling_target_framework",
type=str,
help="The target framework for sampling (i.e. cugraph_dgl_csr, cugraph_pyg_csc, ...)",
required=False,
default=None,
)
parser.add_argument(
"--dask_worker_devices",
type=str,
Expand Down Expand Up @@ -738,6 +758,12 @@ def get_args():
logging.basicConfig()

args = get_args()
if args.sampling_target_framework not in ["cugraph_dgl_csr", None]:
raise ValueError(
"sampling_target_framework must be one of cugraph_dgl_csr or None",
"Other frameworks are not supported at this time.",
)

fanouts = [
[int(f) for f in fanout.split("_")] for fanout in args.fanouts.split(",")
]
Expand Down Expand Up @@ -785,6 +811,7 @@ def get_args():
batch_size=batch_size,
seeds_per_call=seeds_per_call,
fanout=fanout,
sampling_target_framework=args.sampling_target_framework,
dataset_dir=args.dataset_root,
reverse_edges=args.reverse_edges,
replication_factor=replication_factor,
Expand All @@ -809,7 +836,6 @@ def get_args():
warnings.warn("An Exception Occurred!")
print(e)
traceback.print_exc()
restart_client(client)
sleep(10)

stats_df = pd.DataFrame(
Expand Down
Loading