Update cuDF merge benchmark #867

Merged
14 commits merged on Aug 10, 2022
151 changes: 118 additions & 33 deletions benchmarks/cudf-merge.py
@@ -4,22 +4,37 @@
import argparse
import asyncio
import cProfile
import gc
import io
import os
import pickle
import pstats
import sys
from time import perf_counter as clock
from time import monotonic as clock

import cupy
import numpy as np

from dask.utils import format_bytes, format_time
import ucp
from ucp._libs.utils import (
format_bytes,
format_time,
print_multi,
print_separator,
)
from ucp.utils import hmean, run_on_local_network

import cudf
import rmm
# Must be set _before_ importing RAPIDS libraries (cuDF, RMM)
os.environ["RAPIDS_NO_INITIALIZE"] = "True"

import ucp
from ucp.utils import run_on_local_network

import cudf # noqa
import rmm # noqa


def sizeof_cudf_dataframe(df):
return int(
sum(col.memory_usage for col in df._data.columns) + df._index.memory_usage()
)


async def send_df(ep, df):
@@ -80,7 +95,12 @@ async def exchange_and_concat_bins(rank, eps, bins, timings=None):
if timings is not None:
t2 = clock()
timings.append(
(t2 - t1, sum([sys.getsizeof(b) for i, b in enumerate(bins) if i != rank]))
(
t2 - t1,
sum(
[sizeof_cudf_dataframe(b) for i, b in enumerate(bins) if i != rank]
),
)
)
return cudf.concat(ret)

@@ -91,7 +111,7 @@ async def distributed_join(args, rank, eps, left_table, right_table, timings=None):

left_df = await exchange_and_concat_bins(rank, eps, left_bins, timings)
right_df = await exchange_and_concat_bins(rank, eps, right_bins, timings)
return left_df.merge(right_df)
return left_df.merge(right_df, on="key")


def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match):
@@ -157,11 +177,7 @@ def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match):

async def worker(rank, eps, args):
# Set the current device and make RMM use it
dev_id = args.devs[rank % len(args.devs)]
cupy.cuda.runtime.setDevice(dev_id)
rmm.reinitialize(
pool_allocator=True, devices=dev_id, initial_pool_size=args.rmm_init_pool_size
)
rmm.reinitialize(pool_allocator=True, initial_pool_size=args.rmm_init_pool_size)

# Make cupy use RMM
cupy.cuda.set_allocator(rmm.rmm_cupy_allocator)
@@ -170,8 +186,11 @@ async def worker(rank, eps, args):
df2 = generate_chunk(rank, args.chunk_size, args.n_chunks, "other", args.frac_match)

# Warm up and sync before benchmarking
await distributed_join(args, rank, eps, df1, df2)
await barrier(rank, eps)
for i in range(args.warmup_iter):
await distributed_join(args, rank, eps, df1, df2)
await barrier(rank, eps)
if args.collect_garbage:
gc.collect()

if args.cuda_profile:
cupy.cuda.profiler.start()
@@ -180,10 +199,37 @@
pr = cProfile.Profile()
pr.enable()

iter_results = {"bw": [], "wallclock": [], "throughput": [], "data_processed": []}
timings = []
t1 = clock()
await distributed_join(args, rank, eps, df1, df2, timings)
await barrier(rank, eps)
for i in range(args.iter):
iter_timings = []

iter_t = clock()
ret = await distributed_join(args, rank, eps, df1, df2, iter_timings)
await barrier(rank, eps)
iter_took = clock() - iter_t

# Ensure the number of matched rows is within 2% of the expected `chunk_size * frac_match`
expected_len = args.chunk_size * args.frac_match
expected_len_err = expected_len * 0.02
assert abs(len(ret) - expected_len) <= expected_len_err

if args.collect_garbage:
gc.collect()

iter_bw = sum(t[1] for t in iter_timings) / sum(t[0] for t in iter_timings)
iter_data_processed = len(df1) * sum([t.itemsize for t in df1.dtypes])
iter_data_processed += len(df2) * sum([t.itemsize for t in df2.dtypes])
iter_throughput = args.n_chunks * iter_data_processed / iter_took

iter_results["bw"].append(iter_bw)
iter_results["wallclock"].append(iter_took)
iter_results["throughput"].append(iter_throughput)
iter_results["data_processed"].append(iter_data_processed)

timings += iter_timings

took = clock() - t1

if args.profile:
@@ -195,14 +241,15 @@
if args.cuda_profile:
cupy.cuda.profiler.stop()

data_processed = len(df1) * sum([t.itemsize for t in df1.dtypes])
data_processed += len(df2) * sum([t.itemsize for t in df2.dtypes])
data_processed = len(df1) * sum([t.itemsize * args.iter for t in df1.dtypes])
data_processed += len(df2) * sum([t.itemsize * args.iter for t in df2.dtypes])

return {
"bw": sum(t[1] for t in timings) / sum(t[0] for t in timings),
"wallclock": took,
"throughput": args.n_chunks * data_processed / took,
"data_processed": data_processed,
"iter_results": iter_results,
}


@@ -260,6 +307,24 @@ def parse_args():
type=int,
help="Initial RMM pool size (default 1/2 total GPU memory)",
)
parser.add_argument(
"--collect-garbage",
default=False,
action="store_true",
help="Trigger Python garbage collection after each iteration.",
)
parser.add_argument(
"--iter",
default=1,
type=int,
help="Number of benchmark iterations.",
)
parser.add_argument(
"--warmup-iter",
default=5,
type=int,
help="Number of warmup iterations.",
)
args = parser.parse_args()
args.devs = [int(d) for d in args.devs.split(",")]
args.n_chunks = len(args.devs) * args.chunks_per_dev
@@ -282,24 +347,44 @@ def main():
worker,
worker_args=args,
server_address=args.server_address,
ensure_cuda_device=True,
)

wc = stats[0]["wallclock"]
bw = sum(s["bw"] for s in stats) / len(stats)
bw = hmean(np.array([s["bw"] for s in stats]))
tp = stats[0]["throughput"]
dp = sum(s["data_processed"] for s in stats)

print("cudf merge benchmark")
print("----------------------------")
print(f"device(s) | {args.devs}")
print(f"chunks-per-dev | {args.chunks_per_dev}")
print(f"rows-per-chunk | {args.chunk_size}")
print(f"data-processed | {format_bytes(dp)}")
print(f"frac-match | {args.frac_match}")
print("============================")
print(f"Wall-clock | {format_time(wc)}")
print(f"Bandwidth | {format_bytes(bw)}/s")
print(f"Throughput | {format_bytes(tp)}/s")
dp_iter = sum(s["iter_results"]["data_processed"][0] for s in stats)

print("cuDF merge benchmark")
print_separator(separator="-", length=110)
print_multi(values=["Device(s)", f"{args.devs}"])
print_multi(values=["Chunks per device", f"{args.chunks_per_dev}"])
print_multi(values=["Rows per chunk", f"{args.chunk_size}"])
print_multi(values=["Total data processed", f"{format_bytes(dp)}"])
print_multi(values=["Data processed per iter", f"{format_bytes(dp_iter)}"])
print_multi(values=["Row matching fraction", f"{args.frac_match}"])
print_separator(separator="=", length=110)
print_multi(values=["Wall-clock", f"{format_time(wc)}"])
print_multi(values=["Bandwidth", f"{format_bytes(bw)}/s"])
print_multi(values=["Throughput", f"{format_bytes(tp)}/s"])
print_separator(separator="=", length=110)
print_multi(values=["Run", "Wall-clock", "Bandwidth", "Throughput"])
for i in range(args.iter):
iter_results = stats[0]["iter_results"]

iter_wc = iter_results["wallclock"][i]
iter_bw = hmean(np.array([s["iter_results"]["bw"][i] for s in stats]))
iter_tp = iter_results["throughput"][i]

print_multi(
values=[
i,
f"{format_time(iter_wc)}",
f"{format_bytes(iter_bw)}/s",
f"{format_bytes(iter_tp)}/s",
]
)


if __name__ == "__main__":
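To make the new per-iteration reporting concrete, here is a minimal sketch of the arithmetic the loop in `worker()` performs for a single iteration. The numbers are fabricated for illustration and are not taken from a real run; only the formulas mirror the benchmark.

# Sketch of the per-iteration metrics computed in worker() above.
# All numbers below are fabricated; only the formulas follow the benchmark.
n_chunks = 2                       # two workers exchanging partitions
iter_timings = [                   # (seconds, bytes) per exchange, as appended
    (0.10, 200_000_000),           # by exchange_and_concat_bins()
    (0.12, 220_000_000),
]
iter_took = 0.50                   # wall-clock seconds for the whole iteration

# Bandwidth: total bytes moved divided by total time spent in transfers.
iter_bw = sum(t[1] for t in iter_timings) / sum(t[0] for t in iter_timings)

# Data processed per worker: rows times the summed dtype item sizes; assume,
# purely for illustration, two 1,000,000-row chunks with two 8-byte columns each.
iter_data_processed = 2 * (1_000_000 * (8 + 8))

# Throughput: cluster-wide data volume over the iteration's wall-clock time.
iter_throughput = n_chunks * iter_data_processed / iter_took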
22 changes: 21 additions & 1 deletion ucp/_libs/utils.py
@@ -13,9 +13,19 @@ def nvtx_annotate(message=None, color=None, domain=None):


try:
from dask.utils import format_bytes, parse_bytes
from dask.utils import format_bytes, format_time, parse_bytes
except ImportError:

def format_time(x):
if x < 1e-6:
return f"{x * 1e9:.3f} ns"
if x < 1e-3:
return f"{x * 1e6:.3f} us"
if x < 1:
return f"{x * 1e3:.3f} ms"
else:
return f"{x:.3f} s"

def format_bytes(x):
"""Return formatted string in B, KiB, MiB, GiB or TiB"""
if x < 1024:
@@ -40,3 +50,13 @@ def print_separator(separator="-", length=80):
def print_key_value(key, value, key_length=25):
"""Print a key and value with fixed key-field length"""
print(f"{key: <{key_length}} | {value}")


def print_multi(values, key_length=25):
"""Print a key and value with fixed key-field length"""
assert isinstance(values, tuple) or isinstance(values, list)
assert len(values) > 1

print_str = "".join(f"{s: <{key_length}} | " for s in values[:-1])
print_str += values[-1]
print(print_str)
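As a quick illustration of how these helpers compose into the tabular output used by the benchmark above, here is a small usage sketch; the printed values are made up, and the exact byte formatting depends on whether the Dask implementations or the fallbacks above are picked up.

# Usage sketch for the formatting helpers; sample values are made up.
from ucp._libs.utils import (
    format_bytes,
    format_time,
    print_multi,
    print_separator,
)

print_separator(separator="=", length=110)
print_multi(values=["Run", "Wall-clock", "Bandwidth", "Throughput"])
print_multi(
    values=[
        0,
        f"{format_time(0.123)}",             # e.g. "123.000 ms"
        f"{format_bytes(2 * 1024**3)}/s",    # e.g. "2.00 GiB/s"
        f"{format_bytes(3 * 1024**3)}/s",
    ]
)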
46 changes: 44 additions & 2 deletions ucp/utils.py
@@ -75,10 +75,28 @@ def filter(self, record):
return logger


def _ensure_cuda_device(devs, rank):
import numba.cuda

dev_id = devs[rank % len(devs)]
os.environ["CUDA_VISIBLE_DEVICES"] = str(dev_id)
numba.cuda.current_context()


# Helper function used by `run_on_local_network()`
def _worker_process(
queue, rank, server_address, n_workers, ucx_options_list, func, args
queue,
rank,
server_address,
n_workers,
ucx_options_list,
ensure_cuda_device,
func,
args,
):
if ensure_cuda_device is True:
_ensure_cuda_device(args.devs, rank)

import ucp

if ucx_options_list is not None:
Expand Down Expand Up @@ -115,7 +133,12 @@ async def server_handler(ep):


def run_on_local_network(
n_workers, worker_func, worker_args=None, server_address=None, ucx_options_list=None
n_workers,
worker_func,
worker_args=None,
server_address=None,
ucx_options_list=None,
ensure_cuda_device=False,
):
"""
Creates a local UCX network of `n_workers` that runs `worker_func`
@@ -136,6 +159,16 @@
Server address for the workers. If None, ucx_api.get_address() is used.
ucx_options_list: list of dict
Options to pass to UCX when initializing workers, one for each worker.
ensure_cuda_device: bool
If `True`, set the `CUDA_VISIBLE_DEVICES` environment variable to the CUDA
device matching the worker's rank and create the CUDA context on that
device before `import ucp` is first called in the newly-spawned worker
process; otherwise, leave `CUDA_VISIBLE_DEVICES` unmodified and create no
CUDA context. Note that setting this to `False` may cause all workers to
use device 0 and does not ensure a proper InfiniBand<->GPU mapping in UCX,
potentially leading to low performance since GPUDirectRDMA will not be
active.

Returns
-------
@@ -156,6 +189,7 @@
server_address,
n_workers,
ucx_options_list,
ensure_cuda_device,
worker_func,
worker_args,
),
@@ -182,3 +216,11 @@ def hash64bits(*args):
h = hashlib.sha1(bytes(repr(args), "utf-8")).hexdigest()[:16]
# Convert to an integer and return
return int(h, 16)


def hmean(a):
"""Harmonic mean"""
if len(a):
return 1 / np.mean(1 / a)
else:
return 0
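A brief note on why the benchmark now aggregates per-worker bandwidths with `hmean` rather than an arithmetic mean: for rates measured over the same amount of transferred data, the harmonic mean is the conventional average. A minimal usage sketch with made-up values:

import numpy as np

from ucp.utils import hmean

# Two workers reporting 1 GB/s and 3 GB/s over the same data volume.
bws = np.array([1e9, 3e9])
print(hmean(bws))            # 1.5e9, i.e. 1.5 GB/s (not the arithmetic 2 GB/s)
print(hmean(np.array([])))   # 0, per the empty-input fallback above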