diff --git a/benchmarks/cudf-merge.py b/benchmarks/cudf-merge.py
index c0abe9aa0..83f367ea2 100644
--- a/benchmarks/cudf-merge.py
+++ b/benchmarks/cudf-merge.py
@@ -4,22 +4,37 @@
 import argparse
 import asyncio
 import cProfile
+import gc
 import io
+import os
 import pickle
 import pstats
-import sys
-from time import perf_counter as clock
+from time import monotonic as clock
 
 import cupy
 import numpy as np
-from dask.utils import format_bytes, format_time
+import ucp
+from ucp._libs.utils import (
+    format_bytes,
+    format_time,
+    print_multi,
+    print_separator,
+)
+from ucp.utils import hmean, run_on_local_network
 
-import cudf
-import rmm
+# Must be set _before_ importing RAPIDS libraries (cuDF, RMM)
+os.environ["RAPIDS_NO_INITIALIZE"] = "True"
 
-import ucp
-from ucp.utils import run_on_local_network
+
+import cudf  # noqa
+import rmm  # noqa
+
+
+def sizeof_cudf_dataframe(df):
+    return int(
+        sum(col.memory_usage for col in df._data.columns) + df._index.memory_usage()
+    )
 
 
 async def send_df(ep, df):
@@ -80,7 +95,12 @@ async def exchange_and_concat_bins(rank, eps, bins, timings=None):
     if timings is not None:
         t2 = clock()
         timings.append(
-            (t2 - t1, sum([sys.getsizeof(b) for i, b in enumerate(bins) if i != rank]))
+            (
+                t2 - t1,
+                sum(
+                    [sizeof_cudf_dataframe(b) for i, b in enumerate(bins) if i != rank]
+                ),
+            )
         )
     return cudf.concat(ret)
 
@@ -91,7 +111,7 @@ async def distributed_join(args, rank, eps, left_table, right_table, timings=Non
 
     left_df = await exchange_and_concat_bins(rank, eps, left_bins, timings)
     right_df = await exchange_and_concat_bins(rank, eps, right_bins, timings)
-    return left_df.merge(right_df)
+    return left_df.merge(right_df, on="key")
 
 
 def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match):
@@ -157,11 +177,7 @@ def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match):
 
 async def worker(rank, eps, args):
     # Setting current device and make RMM use it
-    dev_id = args.devs[rank % len(args.devs)]
-    cupy.cuda.runtime.setDevice(dev_id)
-    rmm.reinitialize(
-        pool_allocator=True, devices=dev_id, initial_pool_size=args.rmm_init_pool_size
-    )
+    rmm.reinitialize(pool_allocator=True, initial_pool_size=args.rmm_init_pool_size)
 
     # Make cupy use RMM
     cupy.cuda.set_allocator(rmm.rmm_cupy_allocator)
@@ -170,8 +186,11 @@ async def worker(rank, eps, args):
     df2 = generate_chunk(rank, args.chunk_size, args.n_chunks, "other", args.frac_match)
 
     # Let's warmup and sync before benchmarking
-    await distributed_join(args, rank, eps, df1, df2)
-    await barrier(rank, eps)
+    for i in range(args.warmup_iter):
+        await distributed_join(args, rank, eps, df1, df2)
+        await barrier(rank, eps)
+        if args.collect_garbage:
+            gc.collect()
 
     if args.cuda_profile:
         cupy.cuda.profiler.start()
@@ -180,10 +199,37 @@ async def worker(rank, eps, args):
         pr = cProfile.Profile()
         pr.enable()
 
+    iter_results = {"bw": [], "wallclock": [], "throughput": [], "data_processed": []}
     timings = []
     t1 = clock()
-    await distributed_join(args, rank, eps, df1, df2, timings)
-    await barrier(rank, eps)
+    for i in range(args.iter):
+        iter_timings = []
+
+        iter_t = clock()
+        ret = await distributed_join(args, rank, eps, df1, df2, iter_timings)
+        await barrier(rank, eps)
+        iter_took = clock() - iter_t
+
+        # Ensure the number of matches falls within `args.frac_match` +/- 2%
+        expected_len = args.chunk_size * args.frac_match
+        expected_len_err = expected_len * 0.02
+        assert abs(len(ret) - expected_len) <= expected_len_err
+
+        if args.collect_garbage:
+            gc.collect()
+
+        iter_bw = sum(t[1] for t in iter_timings) / sum(t[0] for t in iter_timings)
+        iter_data_processed = len(df1) * sum([t.itemsize for t in df1.dtypes])
+        iter_data_processed += len(df2) * sum([t.itemsize for t in df2.dtypes])
+        iter_throughput = args.n_chunks * iter_data_processed / iter_took
+
+        iter_results["bw"].append(iter_bw)
+        iter_results["wallclock"].append(iter_took)
+        iter_results["throughput"].append(iter_throughput)
+        iter_results["data_processed"].append(iter_data_processed)
+
+        timings += iter_timings
+
     took = clock() - t1
 
     if args.profile:
@@ -195,14 +241,15 @@ async def worker(rank, eps, args):
     if args.cuda_profile:
         cupy.cuda.profiler.stop()
 
-    data_processed = len(df1) * sum([t.itemsize for t in df1.dtypes])
-    data_processed += len(df2) * sum([t.itemsize for t in df2.dtypes])
+    data_processed = len(df1) * sum([t.itemsize * args.iter for t in df1.dtypes])
+    data_processed += len(df2) * sum([t.itemsize * args.iter for t in df2.dtypes])
 
     return {
         "bw": sum(t[1] for t in timings) / sum(t[0] for t in timings),
         "wallclock": took,
         "throughput": args.n_chunks * data_processed / took,
         "data_processed": data_processed,
+        "iter_results": iter_results,
     }
 
 
@@ -260,6 +307,24 @@ def parse_args():
         type=int,
         help="Initial RMM pool size (default 1/2 total GPU memory)",
     )
+    parser.add_argument(
+        "--collect-garbage",
+        default=False,
+        action="store_true",
+        help="Trigger Python garbage collection after each iteration.",
+    )
+    parser.add_argument(
+        "--iter",
+        default=1,
+        type=int,
+        help="Number of benchmark iterations.",
+    )
+    parser.add_argument(
+        "--warmup-iter",
+        default=5,
+        type=int,
+        help="Number of warmup iterations.",
+    )
     args = parser.parse_args()
     args.devs = [int(d) for d in args.devs.split(",")]
     args.n_chunks = len(args.devs) * args.chunks_per_dev
@@ -282,24 +347,44 @@ def main():
         worker,
         worker_args=args,
         server_address=args.server_address,
+        ensure_cuda_device=True,
     )
 
     wc = stats[0]["wallclock"]
-    bw = sum(s["bw"] for s in stats) / len(stats)
+    bw = hmean(np.array([s["bw"] for s in stats]))
     tp = stats[0]["throughput"]
     dp = sum(s["data_processed"] for s in stats)
-
-    print("cudf merge benchmark")
-    print("----------------------------")
-    print(f"device(s) | {args.devs}")
-    print(f"chunks-per-dev | {args.chunks_per_dev}")
-    print(f"rows-per-chunk | {args.chunk_size}")
-    print(f"data-processed | {format_bytes(dp)}")
-    print(f"frac-match | {args.frac_match}")
-    print("============================")
-    print(f"Wall-clock | {format_time(wc)}")
-    print(f"Bandwidth | {format_bytes(bw)}/s")
-    print(f"Throughput | {format_bytes(tp)}/s")
+    dp_iter = sum(s["iter_results"]["data_processed"][0] for s in stats)
+
+    print("cuDF merge benchmark")
+    print_separator(separator="-", length=110)
+    print_multi(values=["Device(s)", f"{args.devs}"])
+    print_multi(values=["Chunks per device", f"{args.chunks_per_dev}"])
+    print_multi(values=["Rows per chunk", f"{args.chunk_size}"])
+    print_multi(values=["Total data processed", f"{format_bytes(dp)}"])
+    print_multi(values=["Data processed per iter", f"{format_bytes(dp_iter)}"])
+    print_multi(values=["Row matching fraction", f"{args.frac_match}"])
+    print_separator(separator="=", length=110)
+    print_multi(values=["Wall-clock", f"{format_time(wc)}"])
+    print_multi(values=["Bandwidth", f"{format_bytes(bw)}/s"])
+    print_multi(values=["Throughput", f"{format_bytes(tp)}/s"])
+    print_separator(separator="=", length=110)
+    print_multi(values=["Run", "Wall-clock", "Bandwidth", "Throughput"])
+    for i in range(args.iter):
+        iter_results = stats[0]["iter_results"]
+
+        iter_wc = iter_results["wallclock"][i]
+        iter_bw = hmean(np.array([s["iter_results"]["bw"][i] for s in stats]))
+        iter_tp = iter_results["throughput"][i]
+
+        print_multi(
+            values=[
+                i,
+                f"{format_time(iter_wc)}",
+                f"{format_bytes(iter_bw)}/s",
+                f"{format_bytes(iter_tp)}/s",
+            ]
+        )
 
 
 if __name__ == "__main__":
diff --git a/ucp/_libs/utils.py b/ucp/_libs/utils.py
index 6466ec110..137aeb424 100644
--- a/ucp/_libs/utils.py
+++ b/ucp/_libs/utils.py
@@ -13,9 +13,19 @@ def nvtx_annotate(message=None, color=None, domain=None):
 
 
 try:
-    from dask.utils import format_bytes, parse_bytes
+    from dask.utils import format_bytes, format_time, parse_bytes
 except ImportError:
 
+    def format_time(x):
+        if x < 1e-6:
+            return f"{x * 1e9:.3f} ns"
+        if x < 1e-3:
+            return f"{x * 1e6:.3f} us"
+        if x < 1:
+            return f"{x * 1e3:.3f} ms"
+        else:
+            return f"{x:.3f} s"
+
     def format_bytes(x):
         """Return formatted string in B, KiB, MiB, GiB or TiB"""
         if x < 1024:
@@ -40,3 +50,13 @@ def print_separator(separator="-", length=80):
 def print_key_value(key, value, key_length=25):
     """Print a key and value with fixed key-field length"""
     print(f"{key: <{key_length}} | {value}")
+
+
+def print_multi(values, key_length=25):
+    """Print a sequence of values, each in a fixed-length field"""
+    assert isinstance(values, tuple) or isinstance(values, list)
+    assert len(values) > 1
+
+    print_str = "".join(f"{s: <{key_length}} | " for s in values[:-1])
+    print_str += values[-1]
+    print(print_str)
diff --git a/ucp/utils.py b/ucp/utils.py
index 0b21cbce8..c0d06551f 100644
--- a/ucp/utils.py
+++ b/ucp/utils.py
@@ -75,10 +75,28 @@ def filter(self, record):
     return logger
 
 
+def _ensure_cuda_device(devs, rank):
+    import numba.cuda
+
+    dev_id = devs[rank % len(devs)]
+    os.environ["CUDA_VISIBLE_DEVICES"] = str(dev_id)
+    numba.cuda.current_context()
+
+
 # Help function used by `run_on_local_network()`
 def _worker_process(
-    queue, rank, server_address, n_workers, ucx_options_list, func, args
+    queue,
+    rank,
+    server_address,
+    n_workers,
+    ucx_options_list,
+    ensure_cuda_device,
+    func,
+    args,
 ):
+    if ensure_cuda_device is True:
+        _ensure_cuda_device(args.devs, rank)
+
     import ucp
 
     if ucx_options_list is not None:
@@ -115,7 +133,12 @@ async def server_handler(ep):
 
 
 def run_on_local_network(
-    n_workers, worker_func, worker_args=None, server_address=None, ucx_options_list=None
+    n_workers,
+    worker_func,
+    worker_args=None,
+    server_address=None,
+    ucx_options_list=None,
+    ensure_cuda_device=False,
 ):
     """
     Creates a local UCX network of `n_workers` that runs `worker_func`
@@ -136,6 +159,16 @@ def run_on_local_network(
         Server address for the workers. If None, ucx_api.get_address() is used.
     ucx_options_list: list of dict
         Options to pass to UCX when initializing workers, one for each worker.
+    ensure_cuda_device: bool
+        If `True`, sets the `CUDA_VISIBLE_DEVICES` environment variable to match
+        the proper CUDA device based on the worker's rank and creates the CUDA
+        context on the corresponding device before calling `import ucp` for the
+        first time on the newly-spawned worker process; otherwise, continues
+        without modifying `CUDA_VISIBLE_DEVICES` or creating a CUDA context.
+        Please note that having this set to `False` may cause all workers to use
+        device 0 and will not ensure proper InfiniBand<->GPU mapping on UCX,
+        potentially leading to low performance as GPUDirectRDMA will not be
+        active.
 
     Returns
     -------
@@ -156,6 +189,7 @@ def run_on_local_network(
                 server_address,
                 n_workers,
                 ucx_options_list,
+                ensure_cuda_device,
                 worker_func,
                 worker_args,
             ),
@@ -182,3 +216,11 @@ def hash64bits(*args):
     h = hashlib.sha1(bytes(repr(args), "utf-8")).hexdigest()[:16]
     # Convert to an integer and return
     return int(h, 16)
+
+
+def hmean(a):
+    """Harmonic mean"""
+    if len(a):
+        return 1 / np.mean(1 / a)
+    else:
+        return 0
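
Note on the aggregation change above: per-worker bandwidths are now combined with the
new `hmean` helper (harmonic mean) rather than an arithmetic mean, the conventional
choice for averaging rates because it gives more weight to slow workers. A minimal,
self-contained sketch of the difference, using hypothetical per-worker bandwidth
values rather than real benchmark output:

    import numpy as np

    def hmean(a):
        """Harmonic mean, mirroring the helper added to ucp/utils.py."""
        return 1 / np.mean(1 / a) if len(a) else 0

    # Hypothetical per-worker bandwidths in bytes/s (illustrative only).
    bw = np.array([10e9, 10e9, 2e9])
    print(np.mean(bw))  # arithmetic mean: ~7.33e9, hides the slow worker
    print(hmean(bw))    # harmonic mean:  ~4.29e9, pulled toward the slow worker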