diff --git a/dask_cuda/benchmarks/common.py b/dask_cuda/benchmarks/common.py
index 7c489d00..00aa31dc 100644
--- a/dask_cuda/benchmarks/common.py
+++ b/dask_cuda/benchmarks/common.py
@@ -85,7 +85,8 @@ class Config(NamedTuple):
 def run_benchmark(client: Client, args: Namespace, config: Config):
     """Run a benchmark a specified number of times
 
-    If ``args.profile`` is set, the final run is profiled."""
+    If ``args.profile`` is set, the final run is profiled.
+    """
     results = []
     for _ in range(max(1, args.runs) - 1):
         res = config.bench_once(client, args, write_profile=None)
@@ -110,8 +111,11 @@ def gather_bench_results(client: Client, args: Namespace, config: Config):
 def run(client: Client, args: Namespace, config: Config):
     """Run the full benchmark on the cluster
 
-    Waits for the cluster, sets up memory pools, prints and saves results"""
+    Waits for the cluster, sets up memory pools, prints and saves results
+    """
+
     wait_for_cluster(client, shutdown_on_failure=True)
+    assert len(client.scheduler_info()["workers"]) > 0
     setup_memory_pools(
         client,
         args.type == "gpu",
@@ -156,7 +160,8 @@ def run_client_from_existing_scheduler(args: Namespace, config: Config):
 def run_create_client(args: Namespace, config: Config):
     """Create a client + cluster and run
 
-    Shuts down the cluster at the end of the benchmark"""
+    Shuts down the cluster at the end of the benchmark
+    """
     cluster_options = get_cluster_options(args)
     Cluster = cluster_options["class"]
     cluster_args = cluster_options["args"]
diff --git a/dask_cuda/benchmarks/local_cudf_shuffle.py b/dask_cuda/benchmarks/local_cudf_shuffle.py
index 7ff099cc..d9039aad 100644
--- a/dask_cuda/benchmarks/local_cudf_shuffle.py
+++ b/dask_cuda/benchmarks/local_cudf_shuffle.py
@@ -1,13 +1,16 @@
 import contextlib
 from collections import ChainMap
 from time import perf_counter
+from typing import Tuple
 
+import numpy as np
 import pandas as pd
 
 import dask
-from dask import array as da
+import dask.dataframe
+from dask.dataframe.core import new_dd_object
 from dask.dataframe.shuffle import shuffle
-from dask.distributed import performance_report, wait
+from dask.distributed import Client, performance_report, wait
 from dask.utils import format_bytes, parse_bytes
 
 import dask_cuda.explicit_comms.dataframe.shuffle
@@ -20,42 +23,82 @@
     print_throughput_bandwidth,
 )
 
+try:
+    import cupy
 
-def shuffle_dask(df, *, noop=False):
-    result = shuffle(df, index="data", shuffle="tasks")
-    if noop:
+    import cudf
+except ImportError:
+    cupy = None
+    cudf = None
+
+
+def shuffle_dask(df, args):
+    result = shuffle(df, index="data", shuffle="tasks", ignore_index=args.ignore_index)
+    if args.backend == "dask-noop":
         result = as_noop(result)
     t1 = perf_counter()
     wait(result.persist())
     return perf_counter() - t1
 
 
-def shuffle_explicit_comms(df):
+def shuffle_explicit_comms(df, args):
     t1 = perf_counter()
     wait(
         dask_cuda.explicit_comms.dataframe.shuffle.shuffle(
-            df, column_names="data"
+            df, column_names="data", ignore_index=args.ignore_index
         ).persist()
     )
     return perf_counter() - t1
 
 
-def bench_once(client, args, write_profile=None):
-    # Generate random Dask dataframe
-    chunksize = args.partition_size // 8  # Convert bytes to float64
-    nchunks = args.in_parts
-    totalsize = chunksize * nchunks
-    x = da.random.random((totalsize,), chunks=(chunksize,))
-    df = dask.dataframe.from_dask_array(x, columns="data").to_frame()
+def create_df(nelem, df_type):
+    if df_type == "cpu":
+        return pd.DataFrame({"data": np.random.random(nelem)})
+    elif df_type == "gpu":
+        if cudf is None or cupy is None:
+            raise RuntimeError("`--type=gpu` requires cudf and cupy")
+        return cudf.DataFrame({"data": cupy.random.random(nelem)})
+    else:
+        raise ValueError(f"Unknown type {df_type}")
+
+
+def create_data(
+    client: Client, args, name="balanced-df"
+) -> Tuple[int, dask.dataframe.DataFrame]:
+    """Create an evenly distributed dask dataframe
+
+    The partitions are perfectly distributed across workers, if the number of
+    requested partitions is evenly divisible by the number of workers.
+    """
+
+    workers = list(client.scheduler_info()["workers"].keys())
+    assert len(workers) > 0
+
+    chunksize = args.partition_size // np.float64().nbytes
+    # Distribute the new partitions between workers by round robin.
+    # We use `client.submit` to control the distribution exactly.
+    # TODO: support unbalanced partition distribution
+    dsk = {}
+    for i in range(args.in_parts):
+        worker = workers[i % len(workers)]  # Round robin
+        dsk[(name, i)] = client.submit(
+            create_df, chunksize, args.type, workers=[worker], pure=False
+        )
+    wait(dsk.values())
 
-    if args.type == "gpu":
-        import cudf
+    df_meta = create_df(0, args.type)
+    divs = [None] * (len(dsk) + 1)
+    ret = new_dd_object(dsk, name, df_meta, divs).persist()
+    wait(ret)
 
-        df = df.map_partitions(cudf.from_pandas)
+    data_processed = args.in_parts * args.partition_size
+    if not args.ignore_index:
+        data_processed += args.in_parts * chunksize * df_meta.index.dtype.itemsize
+    return data_processed, ret
 
-    df = df.persist()
-    wait(df)
-    data_processed = len(df) * sum([t.itemsize for t in df.dtypes])
+
+def bench_once(client, args, write_profile=None):
+    data_processed, df = create_data(client, args)
 
     if write_profile is None:
         ctx = contextlib.nullcontext()
@@ -64,9 +107,9 @@ def bench_once(client, args, write_profile=None):
 
     with ctx:
         if args.backend in {"dask", "dask-noop"}:
-            duration = shuffle_dask(df, noop=args.backend == "dask-noop")
+            duration = shuffle_dask(df, args)
         else:
-            duration = shuffle_explicit_comms(df)
+            duration = shuffle_explicit_comms(df, args)
 
     return (data_processed, duration)
 
@@ -177,6 +220,11 @@
             "type": int,
             "help": "Number of runs",
         },
+        {
+            "name": "--ignore-index",
+            "action": "store_true",
+            "help": "When shuffling, ignore the index",
+        },
     ]
 
     return parse_benchmark_args(
diff --git a/dask_cuda/explicit_comms/comms.py b/dask_cuda/explicit_comms/comms.py
index 0ebd7f0c..05dbc961 100644
--- a/dask_cuda/explicit_comms/comms.py
+++ b/dask_cuda/explicit_comms/comms.py
@@ -180,7 +180,7 @@ def __init__(self, client: Optional[Client] = None):
         self.sessionId = uuid.uuid4().int
 
         # Get address of all workers (not Nanny addresses)
-        self.worker_addresses = list(self.client.run(lambda: 42).keys())
+        self.worker_addresses = list(self.client.scheduler_info()["workers"].keys())
 
         # Make all workers listen and get all listen addresses
         self.worker_direct_addresses = []
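
A minimal standalone sketch (not part of the patch) of the round-robin partition placement that `create_data` relies on, assuming a CPU-only `LocalCluster` with two workers and using pandas in place of cudf/cupy; `make_partition` is a hypothetical stand-in for `create_df`:

    # Sketch only: pin partition-building tasks to workers round robin,
    # mirroring the approach used by create_data in the patch above.
    import numpy as np
    import pandas as pd
    from dask.distributed import Client, LocalCluster, wait

    def make_partition(nelem):
        # Hypothetical stand-in for create_df(nelem, "cpu")
        return pd.DataFrame({"data": np.random.random(nelem)})

    if __name__ == "__main__":
        with LocalCluster(n_workers=2, threads_per_worker=1) as cluster:
            with Client(cluster) as client:
                workers = list(client.scheduler_info()["workers"].keys())
                futures = []
                for i in range(8):
                    # Pin each task to a specific worker instead of letting the
                    # scheduler choose, so the layout is deterministic.
                    worker = workers[i % len(workers)]
                    futures.append(
                        client.submit(make_partition, 1000, workers=[worker], pure=False)
                    )
                wait(futures)
                # who_has() shows the partitions alternating between the two workers.
                print(client.who_has(futures))

Because each future is pinned with `workers=[worker]`, the resulting collection is evenly spread across the cluster whenever the partition count divides evenly by the worker count, which is the property the benchmark's `create_data` documents.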