Improve shuffle-benchmark #1074

Merged · 13 commits · Jan 10, 2023
11 changes: 8 additions & 3 deletions dask_cuda/benchmarks/common.py
@@ -85,7 +85,8 @@ class Config(NamedTuple):
 def run_benchmark(client: Client, args: Namespace, config: Config):
     """Run a benchmark a specified number of times
 
-    If ``args.profile`` is set, the final run is profiled."""
+    If ``args.profile`` is set, the final run is profiled.
+    """
     results = []
     for _ in range(max(1, args.runs) - 1):
         res = config.bench_once(client, args, write_profile=None)
@@ -110,8 +111,11 @@ def gather_bench_results(client: Client, args: Namespace, config: Config):
 def run(client: Client, args: Namespace, config: Config):
     """Run the full benchmark on the cluster
 
-    Waits for the cluster, sets up memory pools, prints and saves results"""
+    Waits for the cluster, sets up memory pools, prints and saves results
+    """
+
     wait_for_cluster(client, shutdown_on_failure=True)
+    assert len(client.scheduler_info()["workers"]) > 0
     setup_memory_pools(
         client,
         args.type == "gpu",
@@ -156,7 +160,8 @@ def run_client_from_existing_scheduler(args: Namespace, config: Config):
 def run_create_client(args: Namespace, config: Config):
     """Create a client + cluster and run
 
-    Shuts down the cluster at the end of the benchmark"""
+    Shuts down the cluster at the end of the benchmark
+    """
     cluster_options = get_cluster_options(args)
     Cluster = cluster_options["class"]
     cluster_args = cluster_options["args"]
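As the `run_benchmark` docstring above notes, only the final run is profiled. A minimal standalone sketch of that pattern (`bench_once` here is a hypothetical stand-in for `config.bench_once`, not the real implementation):

from time import perf_counter


def bench_once(profile: bool = False) -> float:
    """Hypothetical stand-in: do some work and return its duration."""
    t0 = perf_counter()
    sum(i * i for i in range(1_000_000))  # placeholder workload
    return perf_counter() - t0


def run_benchmark_sketch(runs: int, profile: bool) -> list:
    results = []
    # All runs but the last execute without profiling...
    for _ in range(max(1, runs) - 1):
        results.append(bench_once(profile=False))
    # ...so only the final run pays the profiling overhead.
    results.append(bench_once(profile=profile))
    return results


print(run_benchmark_sketch(runs=3, profile=True))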
92 changes: 70 additions & 22 deletions dask_cuda/benchmarks/local_cudf_shuffle.py
@@ -1,13 +1,16 @@
 import contextlib
 from collections import ChainMap
 from time import perf_counter
+from typing import Tuple
 
 import numpy as np
+import pandas as pd
 
 import dask
-from dask import array as da
+import dask.dataframe
+from dask.dataframe.core import new_dd_object
 from dask.dataframe.shuffle import shuffle
-from dask.distributed import performance_report, wait
+from dask.distributed import Client, performance_report, wait
 from dask.utils import format_bytes, parse_bytes
 
 import dask_cuda.explicit_comms.dataframe.shuffle
@@ -20,42 +23,82 @@
     print_throughput_bandwidth,
 )
 
+try:
+    import cupy
+
+    import cudf
+except ImportError:
+    cupy = None
+    cudf = None
 
-def shuffle_dask(df, *, noop=False):
-    result = shuffle(df, index="data", shuffle="tasks")
-    if noop:
+
+def shuffle_dask(df, args):
+    result = shuffle(df, index="data", shuffle="tasks", ignore_index=args.ignore_index)
+    if args.backend == "dask-noop":
         result = as_noop(result)
     t1 = perf_counter()
     wait(result.persist())
     return perf_counter() - t1
 
 
-def shuffle_explicit_comms(df):
+def shuffle_explicit_comms(df, args):
     t1 = perf_counter()
     wait(
         dask_cuda.explicit_comms.dataframe.shuffle.shuffle(
-            df, column_names="data"
+            df, column_names="data", ignore_index=args.ignore_index
         ).persist()
     )
     return perf_counter() - t1
 
 
-def bench_once(client, args, write_profile=None):
-    # Generate random Dask dataframe
-    chunksize = args.partition_size // 8  # Convert bytes to float64
-    nchunks = args.in_parts
-    totalsize = chunksize * nchunks
-    x = da.random.random((totalsize,), chunks=(chunksize,))
-    df = dask.dataframe.from_dask_array(x, columns="data").to_frame()
+def create_df(nelem, df_type):
+    if df_type == "cpu":
+        return pd.DataFrame({"data": np.random.random(nelem)})
+    elif df_type == "gpu":
+        if cudf is None or cupy is None:
+            raise RuntimeError("`--type=gpu` requires cudf and cupy")
+        return cudf.DataFrame({"data": cupy.random.random(nelem)})
+    else:
+        raise ValueError(f"Unknown type {df_type}")
+
+
+def create_data(
+    client: Client, args, name="balanced-df"
+) -> Tuple[int, dask.dataframe.DataFrame]:
+    """Create an evenly distributed dask dataframe
+
+    The partitions are perfectly distributed across workers, if the number of
+    requested partitions is evenly divisible by the number of workers.
+    """
+
+    workers = list(client.scheduler_info()["workers"].keys())
+    assert len(workers) > 0
+
+    chunksize = args.partition_size // np.float64().nbytes
+    # Distribute the new partitions between workers by round robin.
+    # We use `client.submit` to control the distribution exactly.
+    # TODO: support unbalanced partition distribution
+    dsk = {}
+    for i in range(args.in_parts):
+        worker = workers[i % len(workers)]  # Round robin
+        dsk[(name, i)] = client.submit(
+            create_df, chunksize, args.type, workers=[worker], pure=False
+        )
+    wait(dsk.values())
 
-    if args.type == "gpu":
-        import cudf
+    df_meta = create_df(0, args.type)
+    divs = [None] * (len(dsk) + 1)
+    ret = new_dd_object(dsk, name, df_meta, divs).persist()
+    wait(ret)
 
-        df = df.map_partitions(cudf.from_pandas)
+    data_processed = args.in_parts * args.partition_size
+    if not args.ignore_index:
+        data_processed += args.in_parts * chunksize * df_meta.index.dtype.itemsize
+    return data_processed, ret
 
-    df = df.persist()
-    wait(df)
-    data_processed = len(df) * sum([t.itemsize for t in df.dtypes])
+
+def bench_once(client, args, write_profile=None):
+    data_processed, df = create_data(client, args)
 
     if write_profile is None:
         ctx = contextlib.nullcontext()
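The round-robin placement in `create_data` above is the core trick: `client.submit` with `workers=[...]` pins each partition's construction to a chosen worker, and `pure=False` keeps identical calls from collapsing into a single key. A minimal sketch of the same idea on a toy local CPU cluster (assumed setup, not this benchmark's code):

import numpy as np
import pandas as pd

from dask.distributed import Client, LocalCluster, wait


def make_partition(nelem):
    return pd.DataFrame({"data": np.random.random(nelem)})


if __name__ == "__main__":
    with LocalCluster(n_workers=2) as cluster, Client(cluster) as client:
        workers = list(client.scheduler_info()["workers"].keys())
        futures = []
        for i in range(4):
            worker = workers[i % len(workers)]  # round robin over workers
            # workers=[worker] pins the task to that worker; pure=False forces
            # distinct task keys even though all calls have the same arguments.
            futures.append(
                client.submit(make_partition, 1000, workers=[worker], pure=False)
            )
        wait(futures)
        # who_has confirms each partition landed on the worker it was pinned to.
        print(client.who_has(futures))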
@@ -64,9 +107,9 @@ def bench_once(client, args, write_profile=None):

     with ctx:
         if args.backend in {"dask", "dask-noop"}:
-            duration = shuffle_dask(df, noop=args.backend == "dask-noop")
+            duration = shuffle_dask(df, args)
         else:
-            duration = shuffle_explicit_comms(df)
+            duration = shuffle_explicit_comms(df, args)
 
     return (data_processed, duration)
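Both shuffle paths time only `persist()` + `wait()`, so lazy graph construction is excluded from the measurement. A hedged sketch of that timing pattern using plain pandas-backed dask (toy data, not the benchmark's cuDF path):

from time import perf_counter

import pandas as pd

import dask.dataframe as dd
from dask.distributed import Client, wait

if __name__ == "__main__":
    with Client(n_workers=2) as client:
        df = dd.from_pandas(pd.DataFrame({"data": range(100_000)}), npartitions=8)
        shuffled = df.shuffle(on="data")  # lazy: only builds the task graph
        t1 = perf_counter()
        wait(shuffled.persist())  # start the clock, execute, block until done
        print(f"shuffle took {perf_counter() - t1:.3f}s")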

@@ -177,6 +220,11 @@ def parse_args():
"type": int,
"help": "Number of runs",
},
{
"name": "--ignore-index",
"action": "store_true",
"help": "When shuffle, ignore the index",
},
]

return parse_benchmark_args(
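With the new flag wired into `parse_args`, the benchmark can skip index movement entirely. A hedged invocation sketch (`--runs` and `--ignore-index` come from this diff; the other flag spellings are assumed from the `args.*` attributes used above):

python dask_cuda/benchmarks/local_cudf_shuffle.py --type gpu --in-parts 16 --ignore-index --runs 3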
2 changes: 1 addition & 1 deletion dask_cuda/explicit_comms/comms.py
@@ -180,7 +180,7 @@ def __init__(self, client: Optional[Client] = None):
         self.sessionId = uuid.uuid4().int
 
         # Get the addresses of all workers (not Nanny addresses)
-        self.worker_addresses = list(self.client.run(lambda: 42).keys())
+        self.worker_addresses = list(self.client.scheduler_info()["workers"].keys())
 
         # Make all workers listen and get all listen addresses
         self.worker_direct_addresses = []
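This change replaces a no-op task fanned out to every worker with a single scheduler query. A minimal sketch contrasting the two approaches (toy local cluster, not this module's comms setup):

from dask.distributed import Client

if __name__ == "__main__":
    with Client(n_workers=2) as client:
        # New approach: one request to the scheduler, no work on the workers.
        from_scheduler = list(client.scheduler_info()["workers"].keys())
        # Old approach: run a trivial task on every worker just to read the keys.
        from_run = list(client.run(lambda: 42).keys())
        assert sorted(from_scheduler) == sorted(from_run)
        print(from_scheduler)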