From 41f63dddadad65cad354d7954c358b8fe32248fe Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Fri, 1 Sep 2023 11:03:25 +0200 Subject: [PATCH 1/2] Impl. zarr benchmark --- python/benchmarks/zarr-io.py | 304 +++++++++++++++++++++++++++++++++++ 1 file changed, 304 insertions(+) create mode 100644 python/benchmarks/zarr-io.py diff --git a/python/benchmarks/zarr-io.py b/python/benchmarks/zarr-io.py new file mode 100644 index 0000000000..db9f0b3dbe --- /dev/null +++ b/python/benchmarks/zarr-io.py @@ -0,0 +1,304 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +import argparse +import contextlib +import os +import os.path +import pathlib +import shutil +import statistics +import subprocess +import tempfile +from time import perf_counter as clock +from typing import ContextManager, Union + +import cupy +import numcodecs.blosc +import numpy +import zarr +from dask.utils import format_bytes, parse_bytes + +import kvikio +import kvikio.defaults +import kvikio.zarr + +if not kvikio.zarr.supported: + raise RuntimeError(f"requires Zarr >={kvikio.zarr.MINIMUM_ZARR_VERSION}") + +compressors = { + "none": (None, None), + "lz4": (numcodecs.blosc.Blosc(cname="lz4"), kvikio.zarr.LZ4()), +} + + +def drop_vm_cache(args): + if args.drop_vm_cache: + subprocess.check_output(["sudo /sbin/sysctl vm.drop_caches=3"], shell=True) + + +def create_src_data(args): + return cupy.random.random(args.nelem, dtype=args.dtype) + + +def run_kvikio(args): + dir_path = args.dir / "kvikio" + shutil.rmtree(str(dir_path), ignore_errors=True) + + # Get the GPU compressor + compressor = compressors[args.compressor][1] + + src = create_src_data(args) + + # Write + drop_vm_cache(args) + t0 = clock() + z = zarr.create( + shape=(args.nelem,), + dtype=args.dtype, + chunks=False, + compressor=compressor, + store=kvikio.zarr.GDSStore(dir_path), + meta_array=cupy.empty(()), + ) + z[:] = src + os.sync() + write_time = clock() - t0 + + # Read + drop_vm_cache(args) + t0 = clock() + res = z[:] + read_time = clock() - t0 + assert res.nbytes == args.nbytes + + return read_time, write_time + + +def run_posix(args): + dir_path = args.dir / "posix" + shutil.rmtree(str(dir_path), ignore_errors=True) + + # Get the CPU compressor + compressor = compressors[args.compressor][0] + + src = create_src_data(args) + + # Write + drop_vm_cache(args) + t0 = clock() + z = zarr.create( + shape=(args.nelem,), + dtype=args.dtype, + chunks=False, + compressor=compressor, + store=zarr.DirectoryStore(dir_path), + meta_array=numpy.empty(()), + ) + z[:] = src.get() + os.sync() + write_time = clock() - t0 + + # Read + drop_vm_cache(args) + t0 = clock() + res = cupy.asarray(z[:]) + read_time = clock() - t0 + assert res.nbytes == args.nbytes + + return read_time, write_time + + +API = { + "kvikio": run_kvikio, + "posix": run_posix, +} + + +def main(args): + cupy.cuda.set_allocator(None) # Disable CuPy's default memory pool + cupy.arange(10) # Make sure CUDA is initialized + + kvikio.defaults.num_threads_reset(args.nthreads) + props = kvikio.DriverProperties() + try: + import pynvml.smi + + nvsmi = pynvml.smi.nvidia_smi.getInstance() + except ImportError: + gpu_name = "Unknown (install pynvml)" + mem_total = gpu_name + bar1_total = gpu_name + else: + info = nvsmi.DeviceQuery()["gpu"][0] + gpu_name = f"{info['product_name']} (dev #0)" + mem_total = format_bytes( + parse_bytes( + str(info["fb_memory_usage"]["total"]) + info["fb_memory_usage"]["unit"] + ) + ) + bar1_total = format_bytes( + parse_bytes( + 
str(info["bar1_memory_usage"]["total"]) + + info["bar1_memory_usage"]["unit"] + ) + ) + gds_version = "N/A (Compatibility Mode)" + if props.is_gds_available: + gds_version = f"v{props.major_version}.{props.minor_version}" + gds_config_json_path = os.path.realpath( + os.getenv("CUFILE_ENV_PATH_JSON", "/etc/cufile.json") + ) + drop_vm_cache_msg = str(args.drop_vm_cache) + if not args.drop_vm_cache: + drop_vm_cache_msg += " (use --drop-vm-cache for better accuracy!)" + + print("Roundtrip benchmark") + print("----------------------------------") + if kvikio.defaults.compat_mode(): + print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") + print(" WARNING - KvikIO compat mode ") + print(" libcufile.so not used ") + print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") + elif not props.is_gds_available: + print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") + print(" WARNING - cuFile compat mode ") + print(" GDS not enabled ") + print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") + print(f"GPU | {gpu_name}") + print(f"GPU Memory Total | {mem_total}") + print(f"BAR1 Memory Total | {bar1_total}") + print(f"GDS driver | {gds_version}") + print(f"GDS config.json | {gds_config_json_path}") + print("----------------------------------") + print(f"nbytes | {args.nbytes} bytes ({format_bytes(args.nbytes)})") + print(f"4K aligned | {args.nbytes % 4096 == 0}") + print(f"drop-vm-cache | {drop_vm_cache_msg}") + print(f"directory | {args.dir}") + print(f"nthreads | {args.nthreads}") + print(f"nruns | {args.nruns}") + print(f"compressor | {args.compressor}") + print("==================================") + + # Run each benchmark using the requested APIs + for api in args.api: + rs = [] + ws = [] + for _ in range(args.nruns): + read, write = API[api](args) + rs.append(args.nbytes / read) + ws.append(args.nbytes / write) + + def pprint_api_res(name, samples): + mean = statistics.mean(samples) if len(samples) > 1 else samples[0] + ret = f"{api} {name}".ljust(18) + ret += f"| {format_bytes(mean).rjust(10)}/s".ljust(14) + if len(samples) > 1: + stdev = statistics.stdev(samples) / mean * 100 + ret += " ± %5.2f %%" % stdev + ret += " (" + for sample in samples: + ret += f"{format_bytes(sample)}/s, " + ret = ret[:-2] + ")" # Replace trailing comma + return ret + + print(pprint_api_res("read", rs)) + print(pprint_api_res("write", ws)) + + +if __name__ == "__main__": + + def parse_directory(x): + if x is None: + return x + else: + p = pathlib.Path(x) + if not p.is_dir(): + raise argparse.ArgumentTypeError("Must be a directory") + return p + + parser = argparse.ArgumentParser(description="Roundtrip benchmark") + parser.add_argument( + "-n", + "--nbytes", + metavar="BYTES", + default="10 MiB", + type=parse_bytes, + help="Message size, which must be a multiple of 8 (default: %(default)s).", + ) + parser.add_argument( + "--dtype", + default="float32", + type=numpy.dtype, + help="NumPy datatype to use (default: '%(default)s')", + ) + parser.add_argument( + "-d", + "--dir", + metavar="PATH", + default=None, + type=parse_directory, + help="Path to the directory to r/w from (default: tempfile.TemporaryDirectory)", + ) + parser.add_argument( + "--nruns", + metavar="RUNS", + default=1, + type=int, + help="Number of runs per API (default: %(default)s).", + ) + + parser.add_argument( + "-t", + "--nthreads", + metavar="THREADS", + default=1, + type=int, + help="Number of threads to use (default: %(default)s).", + ) + parser.add_argument( + "--api", + metavar="API", + default=("kvikio", "posix"), + nargs="+", + choices=tuple(API.keys()) + ("all",), + help="List of APIs to 
use {%(choices)s}", + ) + parser.add_argument( + "--compressor", + metavar="COMPRESSOR", + default="none", + choices=tuple(compressors.keys()), + help=( + "Set a nvCOMP compressor to use with Zarr " + "{%(choices)s} (default: %(default)s)" + ), + ) + parser.add_argument( + "--drop-vm-cache", + action="store_true", + default=False, + help=( + "Drop the VM cache between writes and reads, " + "requires sudo access to /sbin/sysctl" + ), + ) + + args = parser.parse_args() + if "all" in args.api: + args.api = tuple(API.keys()) + + # Compute nelem + args.nelem = args.nbytes // args.dtype.itemsize + assert args.nbytes % args.dtype.itemsize == 0 + + # Create a temporary directory if user didn't specify a directory + temp_dir: Union[tempfile.TemporaryDirectory, ContextManager] + if args.dir is None: + temp_dir = tempfile.TemporaryDirectory() + args.dir = pathlib.Path(temp_dir.name) + else: + temp_dir = contextlib.nullcontext() + + with temp_dir: + main(args) From 55569e05859db51663f60ccf364360b76da4c128 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Fri, 1 Sep 2023 04:51:38 -0700 Subject: [PATCH 2/2] chunksize and warmup --- python/benchmarks/zarr-io.py | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/python/benchmarks/zarr-io.py b/python/benchmarks/zarr-io.py index db9f0b3dbe..983c735364 100644 --- a/python/benchmarks/zarr-io.py +++ b/python/benchmarks/zarr-io.py @@ -55,8 +55,8 @@ def run_kvikio(args): t0 = clock() z = zarr.create( shape=(args.nelem,), + chunks=(args.chunksize,), dtype=args.dtype, - chunks=False, compressor=compressor, store=kvikio.zarr.GDSStore(dir_path), meta_array=cupy.empty(()), @@ -89,8 +89,8 @@ def run_posix(args): t0 = clock() z = zarr.create( shape=(args.nelem,), + chunks=(args.chunksize,), dtype=args.dtype, - chunks=False, compressor=compressor, store=zarr.DirectoryStore(dir_path), meta_array=numpy.empty(()), @@ -152,6 +152,7 @@ def main(args): drop_vm_cache_msg = str(args.drop_vm_cache) if not args.drop_vm_cache: drop_vm_cache_msg += " (use --drop-vm-cache for better accuracy!)" + chunksize = args.chunksize * args.dtype.itemsize print("Roundtrip benchmark") print("----------------------------------") @@ -172,6 +173,7 @@ def main(args): print(f"GDS config.json | {gds_config_json_path}") print("----------------------------------") print(f"nbytes | {args.nbytes} bytes ({format_bytes(args.nbytes)})") + print(f"chunksize | {chunksize} bytes ({format_bytes(chunksize)})") print(f"4K aligned | {args.nbytes % 4096 == 0}") print(f"drop-vm-cache | {drop_vm_cache_msg}") print(f"directory | {args.dir}") @@ -184,6 +186,8 @@ def main(args): for api in args.api: rs = [] ws = [] + for _ in range(args.n_warmup_runs): + read, write = API[api](args) for _ in range(args.nruns): read, write = API[api](args) rs.append(args.nbytes / read) @@ -226,6 +230,13 @@ def parse_directory(x): type=parse_bytes, help="Message size, which must be a multiple of 8 (default: %(default)s).", ) + parser.add_argument( + "--chunksize", + metavar="BYTES", + default="10 MiB", + type=parse_bytes, + help="Chunk size (default: %(default)s).", + ) parser.add_argument( "--dtype", default="float32", @@ -247,7 +258,12 @@ def parse_directory(x): type=int, help="Number of runs per API (default: %(default)s).", ) - + parser.add_argument( + "--n-warmup-runs", + default=0, + type=int, + help="Number of warmup runs (default: %(default)s).", + ) parser.add_argument( "-t", "--nthreads", @@ -288,9 +304,13 @@ def parse_directory(x): if "all" in args.api: args.api = 
tuple(API.keys()) - # Compute nelem - args.nelem = args.nbytes // args.dtype.itemsize + # Check if size is divisible by size of datatype assert args.nbytes % args.dtype.itemsize == 0 + assert args.chunksize % args.dtype.itemsize == 0 + + # Compute/convert to number of elements + args.nelem = args.nbytes // args.dtype.itemsize + args.chunksize = args.chunksize // args.dtype.itemsize # Create a temporary directory if user didn't specify a directory temp_dir: Union[tempfile.TemporaryDirectory, ContextManager]
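
Notes (not part of the patches above):

The benchmark is driven from the command line, for example
"python python/benchmarks/zarr-io.py -n '1 GiB' --chunksize '128 MiB' --nruns 3 --n-warmup-runs 1 --api kvikio posix"
(all flags used here are defined by the patches; the sizes are illustrative). Warmup iterations run the selected APIs but are excluded from the reported throughput.

For readers unfamiliar with the kvikio Zarr backend, below is a minimal sketch of the round trip that run_kvikio() times, using only the kvikio.zarr API the patch already imports. The store path, element count, and compressor choice are illustrative assumptions, not values taken from the benchmark itself.

# Sketch only: GPU round trip through a GDS-backed Zarr store.
# Assumes kvikio.zarr is importable and supported on this system.
import pathlib

import cupy
import zarr

import kvikio.zarr

if not kvikio.zarr.supported:
    raise RuntimeError(f"requires Zarr >={kvikio.zarr.MINIMUM_ZARR_VERSION}")

nelem = 1_000_000                                   # illustrative element count
src = cupy.random.random(nelem, dtype="float32")    # data is created on the GPU

z = zarr.create(
    shape=(nelem,),
    chunks=(nelem // 4,),                           # plays the role of --chunksize
    dtype=src.dtype,
    compressor=kvikio.zarr.LZ4(),                   # GPU codec; use None to match the "none" default
    store=kvikio.zarr.GDSStore(pathlib.Path("/tmp/zarr-io-example")),  # illustrative path
    meta_array=cupy.empty(()),                      # ask Zarr to hand back CuPy arrays
)

z[:] = src       # write: device memory -> storage (via GDS when available)
res = z[:]       # read: storage -> device memory

assert res.nbytes == src.nbytes
assert bool((res == src).all())

run_posix() performs the same round trip through zarr.DirectoryStore with the matching CPU Blosc codec, copying between device and host around the I/O (src.get() on write, cupy.asarray() on read); that is the baseline the kvikio numbers are compared against.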