From 3e20748b492a09511c30860cefff6b4cfac5a63c Mon Sep 17 00:00:00 2001 From: Ralph Liu Date: Wed, 20 Mar 2024 12:25:34 -0700 Subject: [PATCH 1/4] Add changes --- python/cugraph/cugraph/dask/common/part_utils.py | 6 +++++- python/cugraph/cugraph/dask/comms/comms.py | 15 ++++++++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/python/cugraph/cugraph/dask/common/part_utils.py b/python/cugraph/cugraph/dask/common/part_utils.py index 25311902b29..0f5d46e83bc 100644 --- a/python/cugraph/cugraph/dask/common/part_utils.py +++ b/python/cugraph/cugraph/dask/common/part_utils.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2023, NVIDIA CORPORATION. +# Copyright (c) 2019-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -116,6 +116,10 @@ def persist_dask_df_equal_parts_per_worker( ddf_keys = dask_df.to_delayed() workers = client.scheduler_info()["workers"].keys() + worker_to_rank = Comms.rank_to_worker(client) + # assure rank-worker mappings are in ascending order + workers = dict(sorted(worker_to_rank.items())).values() + ddf_keys_ls = _chunk_lst(ddf_keys, len(workers)) persisted_keys_d = {} for w, ddf_k in zip(workers, ddf_keys_ls): diff --git a/python/cugraph/cugraph/dask/comms/comms.py b/python/cugraph/cugraph/dask/comms/comms.py index d623f20a038..3897ab4c959 100644 --- a/python/cugraph/cugraph/dask/comms/comms.py +++ b/python/cugraph/cugraph/dask/comms/comms.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018-2023, NVIDIA CORPORATION. +# Copyright (c) 2018-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -265,3 +265,16 @@ def get_n_workers(sID=None, dask_worker=None): dask_worker = get_worker() sessionstate = get_raft_comm_state(sID, dask_worker) return sessionstate["nworkers"] + + +def rank_to_worker(client): + """ + Return a mapping of dask workers to ranks. + """ + workers = client.scheduler_info()["workers"].keys() + worker_info = __instance.worker_info(workers) + rank_to_worker = {} + for w in worker_info: + rank_to_worker[worker_info[w]["rank"]] = w + + return rank_to_worker From 546bdb9c51a4f99fdf95588a3d778e6f366c831d Mon Sep 17 00:00:00 2001 From: Ralph Liu Date: Mon, 25 Mar 2024 09:12:00 -0700 Subject: [PATCH 2/4] PR feedback corrections --- python/cugraph/cugraph/dask/common/part_utils.py | 3 +-- python/cugraph/cugraph/dask/comms/comms.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/python/cugraph/cugraph/dask/common/part_utils.py b/python/cugraph/cugraph/dask/common/part_utils.py index 0f5d46e83bc..bfa32bc49db 100644 --- a/python/cugraph/cugraph/dask/common/part_utils.py +++ b/python/cugraph/cugraph/dask/common/part_utils.py @@ -115,9 +115,8 @@ def persist_dask_df_equal_parts_per_worker( raise ValueError("return_type must be either 'dask_cudf.DataFrame' or 'dict'") ddf_keys = dask_df.to_delayed() - workers = client.scheduler_info()["workers"].keys() worker_to_rank = Comms.rank_to_worker(client) - # assure rank-worker mappings are in ascending order + # rank-worker mappings are in ascending order workers = dict(sorted(worker_to_rank.items())).values() ddf_keys_ls = _chunk_lst(ddf_keys, len(workers)) diff --git a/python/cugraph/cugraph/dask/comms/comms.py b/python/cugraph/cugraph/dask/comms/comms.py index 3897ab4c959..5499b13af03 100644 --- a/python/cugraph/cugraph/dask/comms/comms.py +++ b/python/cugraph/cugraph/dask/comms/comms.py @@ -269,7 +269,7 @@ def get_n_workers(sID=None, dask_worker=None): def rank_to_worker(client): """ - Return a mapping of dask workers to ranks. + Return a mapping of ranks to dask workers. """ workers = client.scheduler_info()["workers"].keys() worker_info = __instance.worker_info(workers) From 3b8db04aa48c58354c30f6e7012739450633ac4a Mon Sep 17 00:00:00 2001 From: Ralph Liu Date: Mon, 25 Mar 2024 09:22:56 -0700 Subject: [PATCH 3/4] Update variable name --- python/cugraph/cugraph/dask/common/part_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/cugraph/cugraph/dask/common/part_utils.py b/python/cugraph/cugraph/dask/common/part_utils.py index bfa32bc49db..d362502f239 100644 --- a/python/cugraph/cugraph/dask/common/part_utils.py +++ b/python/cugraph/cugraph/dask/common/part_utils.py @@ -115,9 +115,9 @@ def persist_dask_df_equal_parts_per_worker( raise ValueError("return_type must be either 'dask_cudf.DataFrame' or 'dict'") ddf_keys = dask_df.to_delayed() - worker_to_rank = Comms.rank_to_worker(client) + rank_to_worker = Comms.rank_to_worker(client) # rank-worker mappings are in ascending order - workers = dict(sorted(worker_to_rank.items())).values() + workers = dict(sorted(rank_to_worker.items())).values() ddf_keys_ls = _chunk_lst(ddf_keys, len(workers)) persisted_keys_d = {} From 1415b3cbf3a0a07099d871bd048ce4f44870c6b8 Mon Sep 17 00:00:00 2001 From: Ralph Liu Date: Mon, 25 Mar 2024 13:23:47 -0700 Subject: [PATCH 4/4] disable dask-expr temporarily --- ci/test_python.sh | 4 ++++ ci/test_wheel.sh | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/ci/test_python.sh b/ci/test_python.sh index 3a47d7e1490..447b0a3e786 100755 --- a/ci/test_python.sh +++ b/ci/test_python.sh @@ -3,6 +3,10 @@ set -euo pipefail +# TODO: Enable dask query planning (by default) once some bugs are fixed. +# xref: https://github.com/rapidsai/cudf/issues/15027 +export DASK_DATAFRAME__QUERY_PLANNING=False + # Support invoking test_python.sh outside the script directory cd "$(dirname "$(realpath "${BASH_SOURCE[0]}")")"/../ diff --git a/ci/test_wheel.sh b/ci/test_wheel.sh index 158704e08d1..cda40d92c74 100755 --- a/ci/test_wheel.sh +++ b/ci/test_wheel.sh @@ -3,6 +3,10 @@ set -eoxu pipefail +# TODO: Enable dask query planning (by default) once some bugs are fixed. +# xref: https://github.com/rapidsai/cudf/issues/15027 +export DASK_DATAFRAME__QUERY_PLANNING=False + package_name=$1 package_dir=$2