From 5db9477ba29ec9f936e11aaddcf890fc90b39361 Mon Sep 17 00:00:00 2001 From: Arun Raman Date: Tue, 25 May 2021 11:12:22 -0700 Subject: [PATCH 1/2] Adding profiling to dask shuffle --- dask_cuda/benchmarks/local_cudf_shuffle.py | 28 +++++++++++++++------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/dask_cuda/benchmarks/local_cudf_shuffle.py b/dask_cuda/benchmarks/local_cudf_shuffle.py index cf5fec8a4..f8a5c1c3f 100644 --- a/dask_cuda/benchmarks/local_cudf_shuffle.py +++ b/dask_cuda/benchmarks/local_cudf_shuffle.py @@ -35,14 +35,24 @@ def shuffle_dask(args, df, write_profile): return clock() - t1 -def shuffle_explicit_comms(args, df): - t1 = clock() - wait( - dask_cuda.explicit_comms.dataframe.shuffle.shuffle( - df, column_names="data" - ).persist() - ) - took = clock() - t1 +def shuffle_explicit_comms(args, df, write_profile): + if write_profile is not None: + with performance_report(filename=args.profile): + t1 = clock() + wait( + dask_cuda.explicit_comms.dataframe.shuffle.shuffle( + df, column_names="data" + ).persist() + ) + took = clock() - t1 + else: + t1 = clock() + wait( + dask_cuda.explicit_comms.dataframe.shuffle.shuffle( + df, column_names="data" + ).persist() + ) + took = clock() - t1 return took @@ -66,7 +76,7 @@ def run(client, args, n_workers, write_profile=None): if args.backend == "dask": took = shuffle_dask(args, df, write_profile) else: - took = shuffle_explicit_comms(args, df) + took = shuffle_explicit_comms(args, df, write_profile) return (data_processed, took) From cf5f2c4cbb2259359f1a4ec129ec503a77c6a5dc Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Tue, 25 May 2021 21:30:32 +0200 Subject: [PATCH 2/2] moved performance_report outside shuffle call --- dask_cuda/benchmarks/local_cudf_shuffle.py | 56 ++++++++-------------- 1 file changed, 21 insertions(+), 35 deletions(-) diff --git a/dask_cuda/benchmarks/local_cudf_shuffle.py b/dask_cuda/benchmarks/local_cudf_shuffle.py index f8a5c1c3f..f329aa92b 100644 --- a/dask_cuda/benchmarks/local_cudf_shuffle.py +++ b/dask_cuda/benchmarks/local_cudf_shuffle.py @@ -22,38 +22,16 @@ from dask_cuda.utils import all_to_all -def shuffle_dask(args, df, write_profile): - if write_profile is None: - ctx = contextlib.nullcontext() - else: - ctx = performance_report(filename=args.profile) +def shuffle_dask(df): + wait(shuffle(df, index="data", shuffle="tasks").persist()) - # Execute the operations to benchmark - with ctx: - t1 = clock() - wait(shuffle(df, index="data", shuffle="tasks").persist()) - return clock() - t1 - - -def shuffle_explicit_comms(args, df, write_profile): - if write_profile is not None: - with performance_report(filename=args.profile): - t1 = clock() - wait( - dask_cuda.explicit_comms.dataframe.shuffle.shuffle( - df, column_names="data" - ).persist() - ) - took = clock() - t1 - else: - t1 = clock() - wait( - dask_cuda.explicit_comms.dataframe.shuffle.shuffle( - df, column_names="data" - ).persist() - ) - took = clock() - t1 - return took + +def shuffle_explicit_comms(df): + wait( + dask_cuda.explicit_comms.dataframe.shuffle.shuffle( + df, column_names="data" + ).persist() + ) def run(client, args, n_workers, write_profile=None): @@ -73,12 +51,20 @@ def run(client, args, n_workers, write_profile=None): wait(df) data_processed = len(df) * sum([t.itemsize for t in df.dtypes]) - if args.backend == "dask": - took = shuffle_dask(args, df, write_profile) + if write_profile is None: + ctx = contextlib.nullcontext() else: - took = shuffle_explicit_comms(args, df, write_profile) + ctx = performance_report(filename=args.profile) + + with ctx: + t1 = clock() + if args.backend == "dask": + shuffle_dask(df) + else: + shuffle_explicit_comms(df) + t2 = clock() - return (data_processed, took) + return (data_processed, t2 - t1) def main(args):