From e8e9f70807cc97f3230108232aad9871519ae42f Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 4 Jan 2024 14:38:09 +0100 Subject: [PATCH] Avoid deprecated `shuffle` keyword (#8439) --- .../dashboard/tests/test_scheduler_bokeh.py | 3 +- .../protocol/tests/test_highlevelgraph.py | 6 +- distributed/shuffle/tests/test_graph.py | 47 +- distributed/shuffle/tests/test_merge.py | 491 +++++++++--------- .../tests/test_merge_column_and_index.py | 12 +- distributed/shuffle/tests/test_metrics.py | 3 +- distributed/shuffle/tests/test_shuffle.py | 196 ++++--- distributed/tests/test_dask_collections.py | 3 +- distributed/tests/test_scheduler.py | 12 +- distributed/tests/test_steal.py | 12 +- distributed/tests/test_worker.py | 3 +- 11 files changed, 431 insertions(+), 357 deletions(-) diff --git a/distributed/dashboard/tests/test_scheduler_bokeh.py b/distributed/dashboard/tests/test_scheduler_bokeh.py index df9736b02f..ed61155d9d 100644 --- a/distributed/dashboard/tests/test_scheduler_bokeh.py +++ b/distributed/dashboard/tests/test_scheduler_bokeh.py @@ -1342,7 +1342,8 @@ async def test_shuffling(c, s, a, b): dtypes={"x": float, "y": float}, freq="10 s", ) - df2 = dd.shuffle.shuffle(df, "x", shuffle="p2p").persist() + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + df2 = dd.shuffle.shuffle(df, "x").persist() start = time() while not ss.source.data["comm_written"]: ss.update() diff --git a/distributed/protocol/tests/test_highlevelgraph.py b/distributed/protocol/tests/test_highlevelgraph.py index 53fa96cd03..23a5768241 100644 --- a/distributed/protocol/tests/test_highlevelgraph.py +++ b/distributed/protocol/tests/test_highlevelgraph.py @@ -40,7 +40,7 @@ def add(x, y, z, extra_arg): ) df = dd.from_pandas(pd.DataFrame({"a": np.arange(3)}), npartitions=3) - df = df.shuffle("a", shuffle="tasks") + df = df.shuffle("a") df = df["a"].to_dask_array() res = x.sum() + df.sum() @@ -84,7 +84,7 @@ async def test_shuffle(c, s, a, b): ), npartitions=5, ) - df = df.shuffle("a", shuffle="tasks", max_branch=2) + df = df.shuffle("a", max_branch=2) df = df["a"] + df["b"] res = await c.compute(df, optimize_graph=False) assert res.dtypes == np.float64 @@ -169,7 +169,7 @@ async def test_dataframe_annotations(c, s, a, b): ), npartitions=5, ) - df = df.shuffle("a", shuffle="tasks", max_branch=2) + df = df.shuffle("a", max_branch=2) acol = df["a"] bcol = df["b"] diff --git a/distributed/shuffle/tests/test_graph.py b/distributed/shuffle/tests/test_graph.py index 1bedb28a84..cbd6b696d3 100644 --- a/distributed/shuffle/tests/test_graph.py +++ b/distributed/shuffle/tests/test_graph.py @@ -19,13 +19,16 @@ def test_basic(client): df = dd.demo.make_timeseries(freq="15D", partition_freq="30D") df["name"] = df["name"].astype("string[python]") - shuffled = df.shuffle("id", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + p2p_shuffled = df.shuffle("id") - (opt,) = dask.optimize(shuffled) + (opt,) = dask.optimize(p2p_shuffled) assert isinstance(hlg_layer_topological(opt.dask, 0), Blockwise) # blockwise -> barrier -> unpack -> drop_by_shallow_copy - dd.utils.assert_eq(shuffled, df.shuffle("id", shuffle="tasks"), scheduler=client) + with dask.config.set({"dataframe.shuffle.method": "tasks"}): + tasks_shuffled = df.shuffle("id") + dd.utils.assert_eq(p2p_shuffled, tasks_shuffled, scheduler=client) # ^ NOTE: this works because `assert_eq` sorts the rows before comparing @@ -36,8 +39,8 @@ def test_raise_on_complex_numbers(dtype): ) with pytest.raises( TypeError, match=f"p2p does not support data of 
type '{df.x.dtype}'" - ): - df.shuffle("x", shuffle="p2p") + ), dask.config.set({"dataframe.shuffle.method": "p2p"}): + df.shuffle("x") @pytest.mark.xfail( @@ -52,34 +55,42 @@ def __init__(self, value: int) -> None: pd.DataFrame({"x": pd.array([Stub(i) for i in range(10)], dtype="object")}), npartitions=5, ) - with pytest.raises(TypeError, match="p2p does not support custom objects"): - df.shuffle("x", shuffle="p2p") + with pytest.raises( + TypeError, match="p2p does not support custom objects" + ), dask.config.set({"dataframe.shuffle.method": "p2p"}): + df.shuffle("x") def test_raise_on_sparse_data(): df = dd.from_pandas( pd.DataFrame({"x": pd.array(range(10), dtype="Sparse[float64]")}), npartitions=5 ) - with pytest.raises(TypeError, match="p2p does not support sparse data"): - df.shuffle("x", shuffle="p2p") + with pytest.raises( + TypeError, match="p2p does not support sparse data" + ), dask.config.set({"dataframe.shuffle.method": "p2p"}): + df.shuffle("x") def test_raise_on_non_string_column_name(): df = dd.from_pandas(pd.DataFrame({"a": range(10), 1: range(10)}), npartitions=5) - with pytest.raises(TypeError, match="p2p requires all column names to be str"): - df.shuffle("a", shuffle="p2p") + with pytest.raises( + TypeError, match="p2p requires all column names to be str" + ), dask.config.set({"dataframe.shuffle.method": "p2p"}): + df.shuffle("a") def test_does_not_raise_on_stringified_numeric_column_name(): df = dd.from_pandas(pd.DataFrame({"a": range(10), "1": range(10)}), npartitions=5) - df.shuffle("a", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + df.shuffle("a") @gen_cluster([("", 2)] * 4, client=True) async def test_basic_state(c, s, *workers): df = dd.demo.make_timeseries(freq="15D", partition_freq="30D") df["name"] = df["name"].astype("string[python]") - shuffled = df.shuffle("id", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + shuffled = df.shuffle("id") exts = [w.extensions["shuffle"] for w in workers] for ext in exts: @@ -102,14 +113,18 @@ async def test_basic_state(c, s, *workers): def test_multiple_linear(client): df = dd.demo.make_timeseries(freq="15D", partition_freq="30D") df["name"] = df["name"].astype("string[python]") - s1 = df.shuffle("id", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + s1 = df.shuffle("id") s1["x"] = s1["x"] + 1 - s2 = s1.shuffle("x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + s2 = s1.shuffle("x") + with dask.config.set({"dataframe.shuffle.method": "tasks"}): + expected = df.assign(x=lambda df: df.x + 1).shuffle("x") # TODO eventually test for fusion between s1's unpacks, the `+1`, and s2's `transfer`s dd.utils.assert_eq( s2, - df.assign(x=lambda df: df.x + 1).shuffle("x", shuffle="tasks"), + expected, scheduler=client, ) diff --git a/distributed/shuffle/tests/test_merge.py b/distributed/shuffle/tests/test_merge.py index 08d714e77a..d74b553698 100644 --- a/distributed/shuffle/tests/test_merge.py +++ b/distributed/shuffle/tests/test_merge.py @@ -66,8 +66,10 @@ async def test_minimal_version(c, s, a, b): B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]}) b = dd.repartition(B, [0, 2, 5]) - with pytest.raises(ModuleNotFoundError, match="requires pyarrow"): - await c.compute(dd.merge(a, b, left_on="x", right_on="z", shuffle="p2p")) + with pytest.raises( + ModuleNotFoundError, match="requires pyarrow" + ), dask.config.set({"dataframe.shuffle.method": "p2p"}): + await c.compute(dd.merge(a, b, left_on="x", 
right_on="z")) @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"]) @@ -113,13 +115,12 @@ async def test_merge_p2p_shuffle_reused_dataframe_with_different_parameters(c, s ddf1 = dd.from_pandas(pdf1, npartitions=5) ddf2 = dd.from_pandas(pdf2, npartitions=10) - out = ( - ddf1.merge(ddf2, left_on="a", right_on="x", shuffle="p2p") - # Vary the number of output partitions for the shuffles of dd2 - .repartition(npartitions=20).merge( - ddf2, left_on="b", right_on="x", shuffle="p2p" + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = ( + ddf1.merge(ddf2, left_on="a", right_on="x") + # Vary the number of output partitions for the shuffles of dd2 + .repartition(npartitions=20).merge(ddf2, left_on="b", right_on="x") ) - ) # Generate unique shuffle IDs if the input frame is the same but # parameters differ. Reusing shuffles in merges is dangerous because of the # required coordination and complexity introduced through dynamic clusters. @@ -141,22 +142,22 @@ async def test_merge_p2p_shuffle_reused_dataframe_with_same_parameters(c, s, a, # This performs two shuffles: # * ddf1 is shuffled on `a` # * ddf2 is shuffled on `x` - ddf3 = ddf1.merge( - ddf2, - left_on="a", - right_on="x", - shuffle="p2p", - ) + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + ddf3 = ddf1.merge( + ddf2, + left_on="a", + right_on="x", + ) # This performs one shuffle: # * ddf3 is shuffled on `b` # We can reuse the shuffle of dd2 on `x` from the previous merge. - out = ddf2.merge( - ddf3, - left_on="x", - right_on="b", - shuffle="p2p", - ) + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = ddf2.merge( + ddf3, + left_on="x", + right_on="b", + ) # Generate unique shuffle IDs if the input frame is the same and all its # parameters match. Reusing shuffles in merges is dangerous because of the # required coordination and complexity introduced through dynamic clusters. 
@@ -178,101 +179,93 @@ async def test_merge(c, s, a, b, how, disk): B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]}) b = dd.repartition(B, [0, 2, 5]) - with dask.config.set({"distributed.p2p.disk": disk}): - joined = dd.merge( - a, b, left_index=True, right_index=True, how=how, shuffle="p2p" + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + with dask.config.set({"distributed.p2p.disk": disk}): + joined = dd.merge(a, b, left_index=True, right_index=True, how=how) + res = await c.compute(joined) + assert_eq( + res, + pd.merge(A, B, left_index=True, right_index=True, how=how), + ) + joined = dd.merge(a, b, on="y", how=how) + result = await c.compute(joined) + list_eq(result, pd.merge(A, B, on="y", how=how)) + assert all(d is None for d in joined.divisions) + + list_eq( + await c.compute(dd.merge(a, b, left_on="x", right_on="z", how=how)), + pd.merge(A, B, left_on="x", right_on="z", how=how), + ) + list_eq( + await c.compute( + dd.merge( + a, + b, + left_on="x", + right_on="z", + how=how, + suffixes=("1", "2"), + ) + ), + pd.merge(A, B, left_on="x", right_on="z", how=how, suffixes=("1", "2")), ) - res = await c.compute(joined) - assert_eq( - res, - pd.merge(A, B, left_index=True, right_index=True, how=how), - ) - joined = dd.merge(a, b, on="y", how=how) - result = await c.compute(joined) - list_eq(result, pd.merge(A, B, on="y", how=how)) - assert all(d is None for d in joined.divisions) - - list_eq( - await c.compute( - dd.merge(a, b, left_on="x", right_on="z", how=how, shuffle="p2p") - ), - pd.merge(A, B, left_on="x", right_on="z", how=how), - ) - list_eq( - await c.compute( - dd.merge( - a, - b, - left_on="x", - right_on="z", - how=how, - suffixes=("1", "2"), - shuffle="p2p", - ) - ), - pd.merge(A, B, left_on="x", right_on="z", how=how, suffixes=("1", "2")), - ) - list_eq( - await c.compute(dd.merge(a, b, how=how, shuffle="p2p")), - pd.merge(A, B, how=how), - ) - list_eq( - await c.compute(dd.merge(a, B, how=how, shuffle="p2p")), - pd.merge(A, B, how=how), - ) - list_eq( - await c.compute(dd.merge(A, b, how=how, shuffle="p2p")), - pd.merge(A, B, how=how), - ) - # Note: No await since A and B are both pandas dataframes and this doesn't - # actually submit anything - list_eq( - c.compute(dd.merge(A, B, how=how, shuffle="p2p")), - pd.merge(A, B, how=how), - ) + list_eq( + await c.compute(dd.merge(a, b, how=how)), + pd.merge(A, B, how=how), + ) + list_eq( + await c.compute(dd.merge(a, B, how=how)), + pd.merge(A, B, how=how), + ) + list_eq( + await c.compute(dd.merge(A, b, how=how)), + pd.merge(A, B, how=how), + ) + # Note: No await since A and B are both pandas dataframes and this doesn't + # actually submit anything + list_eq( + c.compute(dd.merge(A, B, how=how)), + pd.merge(A, B, how=how), + ) - list_eq( - await c.compute( - dd.merge(a, b, left_index=True, right_index=True, how=how, shuffle="p2p") - ), - pd.merge(A, B, left_index=True, right_index=True, how=how), - ) - list_eq( - await c.compute( - dd.merge( - a, - b, - left_index=True, - right_index=True, - how=how, - suffixes=("1", "2"), - shuffle="p2p", - ) - ), - pd.merge(A, B, left_index=True, right_index=True, how=how, suffixes=("1", "2")), - ) + list_eq( + await c.compute(dd.merge(a, b, left_index=True, right_index=True, how=how)), + pd.merge(A, B, left_index=True, right_index=True, how=how), + ) + list_eq( + await c.compute( + dd.merge( + a, + b, + left_index=True, + right_index=True, + how=how, + suffixes=("1", "2"), + ) + ), + pd.merge( + A, B, left_index=True, right_index=True, how=how, 
suffixes=("1", "2") + ), + ) - list_eq( - await c.compute( - dd.merge(a, b, left_on="x", right_index=True, how=how, shuffle="p2p") - ), - pd.merge(A, B, left_on="x", right_index=True, how=how), - ) - list_eq( - await c.compute( - dd.merge( - a, - b, - left_on="x", - right_index=True, - how=how, - suffixes=("1", "2"), - shuffle="p2p", - ) - ), - pd.merge(A, B, left_on="x", right_index=True, how=how, suffixes=("1", "2")), - ) + list_eq( + await c.compute(dd.merge(a, b, left_on="x", right_index=True, how=how)), + pd.merge(A, B, left_on="x", right_index=True, how=how), + ) + list_eq( + await c.compute( + dd.merge( + a, + b, + left_on="x", + right_index=True, + how=how, + suffixes=("1", "2"), + ) + ), + pd.merge(A, B, left_on="x", right_index=True, how=how, suffixes=("1", "2")), + ) @pytest.mark.slow @@ -336,132 +329,132 @@ async def test_merge_by_multiple_columns(c, s, a, b, how): ddl = dd.from_pandas(pdl, lpart) ddr = dd.from_pandas(pdr, rpart) - expected = pdl.join(pdr, how=how) - assert_eq( - await c.compute(ddl.join(ddr, how=how, shuffle="p2p")), - expected, - # FIXME: There's an discrepancy with an empty index for - # pandas=2.0 (xref https://github.com/dask/dask/issues/9957). - # Temporarily avoid index check until the discrepancy is fixed. - check_index=not (PANDAS_GE_200 and expected.index.empty), - ) - - expected = pdr.join(pdl, how=how) - assert_eq( - await c.compute(ddr.join(ddl, how=how, shuffle="p2p")), - expected, - # FIXME: There's an discrepancy with an empty index for - # pandas=2.0 (xref https://github.com/dask/dask/issues/9957). - # Temporarily avoid index check until the discrepancy is fixed. - check_index=not (PANDAS_GE_200 and expected.index.empty), - ) - - expected = pd.merge(pdl, pdr, how=how, left_index=True, right_index=True) - assert_eq( - await c.compute( - dd.merge( - ddl, - ddr, - how=how, - left_index=True, - right_index=True, - shuffle="p2p", - ) - ), - expected, - # FIXME: There's an discrepancy with an empty index for - # pandas=2.0 (xref https://github.com/dask/dask/issues/9957). - # Temporarily avoid index check until the discrepancy is fixed. - check_index=not (PANDAS_GE_200 and expected.index.empty), - ) - - expected = pd.merge(pdr, pdl, how=how, left_index=True, right_index=True) - assert_eq( - await c.compute( - dd.merge( - ddr, - ddl, - how=how, - left_index=True, - right_index=True, - shuffle="p2p", - ) - ), - expected, - # FIXME: There's an discrepancy with an empty index for - # pandas=2.0 (xref https://github.com/dask/dask/issues/9957). - # Temporarily avoid index check until the discrepancy is fixed. 
- check_index=not (PANDAS_GE_200 and expected.index.empty), - ) - - # hash join - list_eq( - await c.compute( - dd.merge( - ddl, - ddr, - how=how, - left_on="a", - right_on="d", - shuffle="p2p", - ) - ), - pd.merge(pdl, pdr, how=how, left_on="a", right_on="d"), - ) - list_eq( - await c.compute( - dd.merge( - ddl, - ddr, - how=how, - left_on="b", - right_on="e", - shuffle="p2p", - ) - ), - pd.merge(pdl, pdr, how=how, left_on="b", right_on="e"), - ) - - list_eq( - await c.compute( - dd.merge( - ddr, - ddl, - how=how, - left_on="d", - right_on="a", - shuffle="p2p", - ) - ), - pd.merge(pdr, pdl, how=how, left_on="d", right_on="a"), - ) - list_eq( - await c.compute( - dd.merge( - ddr, - ddl, - how=how, - left_on="e", - right_on="b", - shuffle="p2p", - ) - ), - pd.merge(pdr, pdl, how=how, left_on="e", right_on="b"), - ) - - list_eq( - await c.compute( - dd.merge( - ddl, - ddr, - how=how, - left_on=["a", "b"], - right_on=["d", "e"], - shuffle="p2p", - ) - ), - pd.merge(pdl, pdr, how=how, left_on=["a", "b"], right_on=["d", "e"]), - ) + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + expected = pdl.join(pdr, how=how) + assert_eq( + await c.compute(ddl.join(ddr, how=how)), + expected, + # FIXME: There's an discrepancy with an empty index for + # pandas=2.0 (xref https://github.com/dask/dask/issues/9957). + # Temporarily avoid index check until the discrepancy is fixed. + check_index=not (PANDAS_GE_200 and expected.index.empty), + ) + + expected = pdr.join(pdl, how=how) + assert_eq( + await c.compute(ddr.join(ddl, how=how)), + expected, + # FIXME: There's an discrepancy with an empty index for + # pandas=2.0 (xref https://github.com/dask/dask/issues/9957). + # Temporarily avoid index check until the discrepancy is fixed. + check_index=not (PANDAS_GE_200 and expected.index.empty), + ) + + expected = pd.merge( + pdl, pdr, how=how, left_index=True, right_index=True + ) + assert_eq( + await c.compute( + dd.merge( + ddl, + ddr, + how=how, + left_index=True, + right_index=True, + ) + ), + expected, + # FIXME: There's an discrepancy with an empty index for + # pandas=2.0 (xref https://github.com/dask/dask/issues/9957). + # Temporarily avoid index check until the discrepancy is fixed. + check_index=not (PANDAS_GE_200 and expected.index.empty), + ) + + expected = pd.merge( + pdr, pdl, how=how, left_index=True, right_index=True + ) + assert_eq( + await c.compute( + dd.merge( + ddr, + ddl, + how=how, + left_index=True, + right_index=True, + ) + ), + expected, + # FIXME: There's an discrepancy with an empty index for + # pandas=2.0 (xref https://github.com/dask/dask/issues/9957). + # Temporarily avoid index check until the discrepancy is fixed. 
+ check_index=not (PANDAS_GE_200 and expected.index.empty), + ) + + # hash join + list_eq( + await c.compute( + dd.merge( + ddl, + ddr, + how=how, + left_on="a", + right_on="d", + ) + ), + pd.merge(pdl, pdr, how=how, left_on="a", right_on="d"), + ) + list_eq( + await c.compute( + dd.merge( + ddl, + ddr, + how=how, + left_on="b", + right_on="e", + ) + ), + pd.merge(pdl, pdr, how=how, left_on="b", right_on="e"), + ) + + list_eq( + await c.compute( + dd.merge( + ddr, + ddl, + how=how, + left_on="d", + right_on="a", + ) + ), + pd.merge(pdr, pdl, how=how, left_on="d", right_on="a"), + ) + list_eq( + await c.compute( + dd.merge( + ddr, + ddl, + how=how, + left_on="e", + right_on="b", + ) + ), + pd.merge(pdr, pdl, how=how, left_on="e", right_on="b"), + ) + + list_eq( + await c.compute( + dd.merge( + ddl, + ddr, + how=how, + left_on=["a", "b"], + right_on=["d", "e"], + ) + ), + pd.merge( + pdl, pdr, how=how, left_on=["a", "b"], right_on=["d", "e"] + ), + ) @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"]) @@ -473,19 +466,16 @@ async def test_index_merge_p2p(c, s, a, b, how): left = dd.from_pandas(pdf_left, npartitions=5, sort=False) right = dd.from_pandas(pdf_right, npartitions=6) - assert_eq( - await c.compute( - left.merge(right, how=how, left_index=True, right_on="a", shuffle="p2p") - ), - pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"), - ) + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + assert_eq( + await c.compute(left.merge(right, how=how, left_index=True, right_on="a")), + pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"), + ) - assert_eq( - await c.compute( - right.merge(left, how=how, right_index=True, left_on="a", shuffle="p2p") - ), - pdf_right.merge(pdf_left, how=how, right_index=True, left_on="a"), - ) + assert_eq( + await c.compute(right.merge(left, how=how, right_index=True, left_on="a")), + pdf_right.merge(pdf_left, how=how, right_index=True, left_on="a"), + ) class LimitedGetOrCreateShuffleRunManager(_ShuffleRunManager): @@ -522,7 +512,8 @@ async def test_merge_does_not_deadlock_if_worker_joins(c, s, a): run_manager_A = a.plugins["shuffle"].shuffle_runs - joined = dd.merge(df1, df2, left_on="a", right_on="x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + joined = dd.merge(df1, df2, left_on="a", right_on="x") result = c.compute(joined) await run_manager_A.blocking_get_or_create.wait() diff --git a/distributed/shuffle/tests/test_merge_column_and_index.py b/distributed/shuffle/tests/test_merge_column_and_index.py index efe6ce86c9..b358650205 100644 --- a/distributed/shuffle/tests/test_merge_column_and_index.py +++ b/distributed/shuffle/tests/test_merge_column_and_index.py @@ -17,6 +17,7 @@ dd = pytest.importorskip("dask.dataframe") import pandas as pd +import dask from dask.dataframe.utils import assert_eq from distributed.shuffle import HashJoinP2PLayer, P2PShuffleLayer @@ -124,7 +125,8 @@ async def test_merge_known_to_unknown( expected = df_left.merge(df_right, on=on, how=how) # Perform merge - result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how, shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + result_graph = ddf_left.merge(ddf_right_unknown, on=on, how=how) result = await c.compute(result_graph) # Assertions assert_eq(result, expected) @@ -148,7 +150,8 @@ async def test_merge_unknown_to_known( expected = df_left.merge(df_right, on=on, how=how) # Perform merge - result_graph = ddf_left_unknown.merge(ddf_right, on=on, how=how, shuffle="p2p") + 
with dask.config.set({"dataframe.shuffle.method": "p2p"}): + result_graph = ddf_left_unknown.merge(ddf_right, on=on, how=how) result = await c.compute(result_graph) # Assertions @@ -173,9 +176,8 @@ async def test_merge_unknown_to_unknown( expected = df_left.merge(df_right, on=on, how=how) # Merge unknown to unknown - result_graph = ddf_left_unknown.merge( - ddf_right_unknown, on=on, how=how, shuffle="p2p" - ) + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + result_graph = ddf_left_unknown.merge(ddf_right_unknown, on=on, how=how) if not any( isinstance(layer, (HashJoinP2PLayer, P2PShuffleLayer)) for layer in result_graph.dask.layers.values() diff --git a/distributed/shuffle/tests/test_metrics.py b/distributed/shuffle/tests/test_metrics.py index eec8269352..965d469390 100644 --- a/distributed/shuffle/tests/test_metrics.py +++ b/distributed/shuffle/tests/test_metrics.py @@ -78,7 +78,8 @@ async def test_dataframe(c, s, a, b): dtypes={"x": float, "y": float}, freq="10 s", ) - shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p", npartitions=20) + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + shuffled = dd.shuffle.shuffle(df, "x", npartitions=20) await c.compute(shuffled) await a.heartbeat() await b.heartbeat() diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py index b9864d62da..0d557c8801 100644 --- a/distributed/shuffle/tests/test_shuffle.py +++ b/distributed/shuffle/tests/test_shuffle.py @@ -132,8 +132,10 @@ async def test_minimal_version(c, s, a, b): dtypes={"x": float, "y": float}, freq="10 s", ) - with pytest.raises(ModuleNotFoundError, match="requires pyarrow"): - await c.compute(dd.shuffle.shuffle(df, "x", shuffle="p2p")) + with pytest.raises( + ModuleNotFoundError, match="requires pyarrow" + ), dask.config.set({"dataframe.shuffle.method": "p2p"}): + await c.compute(dd.shuffle.shuffle(df, "x")) @pytest.mark.gpu @@ -158,7 +160,8 @@ async def test_basic_cudf_support(c, s, a, b): dtypes={"x": float, "y": float}, freq="10 s", ).to_backend("cudf") - shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + shuffled = dd.shuffle.shuffle(df, "x") assert shuffled.npartitions == df.npartitions result, expected = await c.compute([shuffled, df], sync=True) @@ -191,8 +194,10 @@ async def test_basic_integration(c, s, a, b, npartitions, disk): dtypes={"x": float, "y": float}, freq="10 s", ) - with dask.config.set({"distributed.p2p.disk": disk}): - shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p", npartitions=npartitions) + with dask.config.set( + {"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": disk} + ): + shuffled = dd.shuffle.shuffle(df, "x", npartitions=npartitions) if npartitions is None: assert shuffled.npartitions == df.npartitions else: @@ -221,7 +226,8 @@ async def test_basic_integration_local_cluster(processes): freq="10 s", ) c = cluster.get_client() - out = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = dd.shuffle.shuffle(df, "x") x, y = c.compute([df, out]) x, y = await c.gather([x, y]) dd.assert_eq(x, y) @@ -236,7 +242,8 @@ async def test_shuffle_with_array_conversion(c, s, a, b, npartitions): dtypes={"x": float, "y": float}, freq="10 s", ) - out = dd.shuffle.shuffle(df, "x", shuffle="p2p", npartitions=npartitions).values + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = dd.shuffle.shuffle(df, "x", npartitions=npartitions).values if npartitions == 1: # 
FIXME: distributed#7816 @@ -261,7 +268,8 @@ def test_shuffle_before_categorize(loop_in_thread): dtypes={"x": float, "y": str}, freq="10 s", ) - df = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + df = dd.shuffle.shuffle(df, "x") df.categorize(columns=["y"]) c.compute(df) @@ -274,8 +282,9 @@ async def test_concurrent(c, s, a, b): dtypes={"x": float, "y": float}, freq="10 s", ) - x = dd.shuffle.shuffle(df, "x", shuffle="p2p") - y = dd.shuffle.shuffle(df, "y", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + x = dd.shuffle.shuffle(df, "x") + y = dd.shuffle.shuffle(df, "y") df, x, y = await c.compute([df, x, y], sync=True) dd.assert_eq(x, df, check_index=False) dd.assert_eq(y, df, check_index=False) @@ -293,7 +302,8 @@ async def test_bad_disk(c, s, a, b): dtypes={"x": float, "y": float}, freq="10 s", ) - out = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = dd.shuffle.shuffle(df, "x") out = out.persist() shuffle_id = await wait_until_new_shuffle_is_initialized(s) while not get_active_shuffle_runs(a): @@ -381,7 +391,8 @@ async def test_closed_worker_during_transfer(c, s, a, b): dtypes={"x": float, "y": float}, freq="10 s", ) - shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + shuffled = dd.shuffle.shuffle(df, "x") fut = c.compute([shuffled, df], sync=True) await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, b) await b.close() @@ -407,7 +418,8 @@ async def test_restarting_during_transfer_raises_killed_worker(c, s, a, b): dtypes={"x": float, "y": float}, freq="10 s", ) - out = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = dd.shuffle.shuffle(df, "x") out = c.compute(out.x.size) await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, b) await b.close() @@ -449,7 +461,8 @@ async def test_get_or_create_from_dangling_transfer(c, s, a, b): dtypes={"x": float, "y": float}, freq="10 s", ) - out = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = dd.shuffle.shuffle(df, "x") out = c.compute(out.x.size) shuffle_extA = a.plugins["shuffle"] @@ -488,7 +501,8 @@ async def test_crashed_worker_during_transfer(c, s, a): dtypes={"x": float, "y": float}, freq="10 s", ) - shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + shuffled = dd.shuffle.shuffle(df, "x") fut = c.compute([shuffled, df], sync=True) await wait_until_worker_has_tasks( "shuffle-transfer", killed_worker_address, 1, s @@ -522,7 +536,8 @@ async def test_restarting_does_not_deadlock(c, s): dtypes={"x": float, "y": float}, freq="10 s", ) - out = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = dd.shuffle.shuffle(df, "x") fut = c.compute(out.x.size) await wait_until_worker_has_tasks( "shuffle-transfer", b.worker_address, 1, s @@ -557,7 +572,8 @@ def mock_get_worker_for_range_sharding( dtypes={"x": float, "y": float}, freq="10 s", ) - shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + shuffled = dd.shuffle.shuffle(df, "x") fut = c.compute([shuffled, df], sync=True) await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, b, 0.001) await b.close() @@ -591,7 +607,8 @@ def 
mock_mock_get_worker_for_range_sharding( dtypes={"x": float, "y": float}, freq="10 s", ) - shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + shuffled = dd.shuffle.shuffle(df, "x") fut = c.compute([shuffled, df], sync=True) await wait_until_worker_has_tasks( "shuffle-transfer", n.worker_address, 1, s @@ -616,7 +633,8 @@ async def test_closed_bystanding_worker_during_shuffle(c, s, w1, w2, w3): dtypes={"x": float, "y": float}, freq="10 s", ) - shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + shuffled = dd.shuffle.shuffle(df, "x") fut = c.compute([shuffled, df], sync=True) await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, w1) await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, w2) @@ -652,7 +670,8 @@ async def test_exception_on_close_cleans_up(c, s, caplog): dtypes={"x": float, "y": float}, freq="10 s", ) - shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + shuffled = dd.shuffle.shuffle(df, "x") await c.compute([shuffled, df], sync=True) assert any("test-exception-on-close" in record.message for record in caplog.records) @@ -683,7 +702,8 @@ async def test_closed_worker_during_barrier(c, s, a, b): dtypes={"x": float, "y": float}, freq="10 s", ) - shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + shuffled = dd.shuffle.shuffle(df, "x") fut = c.compute([shuffled, df], sync=True) shuffle_id = await wait_until_new_shuffle_is_initialized(s) key = barrier_key(shuffle_id) @@ -745,7 +765,8 @@ async def test_restarting_during_barrier_raises_killed_worker(c, s, a, b): dtypes={"x": float, "y": float}, freq="10 s", ) - out = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = dd.shuffle.shuffle(df, "x") out = c.compute(out.x.size) shuffle_id = await wait_until_new_shuffle_is_initialized(s) key = barrier_key(shuffle_id) @@ -789,7 +810,8 @@ async def test_closed_other_worker_during_barrier(c, s, a, b): dtypes={"x": float, "y": float}, freq="10 s", ) - shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + shuffled = dd.shuffle.shuffle(df, "x") fut = c.compute([shuffled, df], sync=True) shuffle_id = await wait_until_new_shuffle_is_initialized(s) @@ -851,7 +873,8 @@ async def test_crashed_other_worker_during_barrier(c, s, a): dtypes={"x": float, "y": float}, freq="10 s", ) - shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + shuffled = dd.shuffle.shuffle(df, "x") fut = c.compute([shuffled, df], sync=True) shuffle_id = await wait_until_new_shuffle_is_initialized(s) key = barrier_key(shuffle_id) @@ -893,7 +916,8 @@ async def test_closed_worker_during_unpack(c, s, a, b): dtypes={"x": float, "y": float}, freq="10 s", ) - shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + shuffled = dd.shuffle.shuffle(df, "x") fut = c.compute([shuffled, df], sync=True) await wait_for_tasks_in_state("shuffle_p2p", "memory", 1, b) await b.close() @@ -919,7 +943,8 @@ async def test_restarting_during_unpack_raises_killed_worker(c, s, a, b): dtypes={"x": float, "y": float}, freq="10 s", ) - out = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + 
out = dd.shuffle.shuffle(df, "x") out = c.compute(out.x.size) await wait_for_tasks_in_state("shuffle_p2p", "memory", 1, b) await b.close() @@ -945,7 +970,8 @@ async def test_crashed_worker_during_unpack(c, s, a): freq="10 s", ) expected = await c.compute(df) - shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + shuffled = dd.shuffle.shuffle(df, "x") result = c.compute(shuffled) await wait_until_worker_has_tasks("shuffle_p2p", killed_worker_address, 1, s) @@ -969,7 +995,8 @@ async def test_heartbeat(c, s, a, b): dtypes={"x": float, "y": float}, freq="10 s", ) - out = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = dd.shuffle.shuffle(df, "x") out = out.persist() while not s.plugins["shuffle"].heartbeats: @@ -1179,7 +1206,8 @@ async def test_head(c, s, a, b): dtypes={"x": float, "y": float}, freq="10 s", ) - out = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = dd.shuffle.shuffle(df, "x") out = await out.head(compute=False).persist() # Only ask for one key assert list(os.walk(a.local_directory)) == a_files # cleaned up files? @@ -1208,7 +1236,8 @@ async def test_clean_after_forgotten_early(c, s, a, b): dtypes={"x": float, "y": float}, freq="10 s", ) - out = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = dd.shuffle.shuffle(df, "x") out = out.persist() await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, a) await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, b) @@ -1226,7 +1255,8 @@ async def test_tail(c, s, a, b): dtypes={"x": float, "y": float}, freq="1 s", ) - x = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + x = dd.shuffle.shuffle(df, "x") full = await x.persist() ntasks_full = len(s.tasks) del full @@ -1257,7 +1287,8 @@ async def test_repeat_shuffle_instance(c, s, a, b, wait_until_forgotten): dtypes={"x": float, "y": float}, freq="100 s", ) - out = dd.shuffle.shuffle(df, "x", shuffle="p2p").size + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = dd.shuffle.shuffle(df, "x").size await c.compute(out) if wait_until_forgotten: @@ -1287,13 +1318,15 @@ async def test_repeat_shuffle_operation(c, s, a, b, wait_until_forgotten): dtypes={"x": float, "y": float}, freq="100 s", ) - await c.compute(dd.shuffle.shuffle(df, "x", shuffle="p2p")) + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + await c.compute(dd.shuffle.shuffle(df, "x")) if wait_until_forgotten: while s.tasks: await asyncio.sleep(0) - await c.compute(dd.shuffle.shuffle(df, "x", shuffle="p2p")) + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + await c.compute(dd.shuffle.shuffle(df, "x")) await check_worker_cleanup(a) await check_worker_cleanup(b) @@ -1319,7 +1352,8 @@ def block(df, in_event, block_event): freq="100 s", seed=42, ) - out = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = dd.shuffle.shuffle(df, "x") in_event = Event() block_event = Event() with dask.annotate(workers=[n.worker_address], allow_other_workers=True): @@ -1351,7 +1385,8 @@ async def test_crashed_worker_after_shuffle_persisted(c, s, a): freq="10 s", seed=42, ) - out = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = dd.shuffle.shuffle(df, "x") out = out.persist() 
await wait_until_worker_has_tasks("shuffle_p2p", n.worker_address, 1, s) @@ -1378,7 +1413,8 @@ async def test_closed_worker_between_repeats(c, s, w1, w2, w3): freq="100 s", seed=42, ) - out = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = dd.shuffle.shuffle(df, "x") await c.compute(out.head(compute=False)) await check_worker_cleanup(w1) @@ -1410,7 +1446,8 @@ async def test_new_worker(c, s, a, b): dtypes={"x": float, "y": float}, freq="1 s", ) - shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + shuffled = dd.shuffle.shuffle(df, "x") persisted = shuffled.persist() while not s.plugins["shuffle"].active_shuffles: await asyncio.sleep(0.001) @@ -1441,8 +1478,8 @@ async def test_multi(c, s, a, b): ) left["id"] = (left["id"] * 1000000).astype(int) right["id"] = (right["id"] * 1000000).astype(int) - - out = left.merge(right, on="id", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = left.merge(right, on="id") out = await c.compute(out.size) assert out @@ -1463,9 +1500,11 @@ async def test_restrictions(c, s, a, b): assert a.data assert not b.data - x = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + x = dd.shuffle.shuffle(df, "x") + y = dd.shuffle.shuffle(df, "y") + x = x.persist(workers=b.address) - y = dd.shuffle.shuffle(df, "y", shuffle="p2p") y = y.persist(workers=a.address) await x @@ -1483,7 +1522,8 @@ async def test_delete_some_results(c, s, a, b): dtypes={"x": float, "y": float}, freq="10 s", ) - x = dd.shuffle.shuffle(df, "x", shuffle="p2p").persist() + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + x = dd.shuffle.shuffle(df, "x").persist() while not s.tasks or not any(ts.state == "memory" for ts in s.tasks.values()): await asyncio.sleep(0.01) @@ -1504,7 +1544,8 @@ async def test_add_some_results(c, s, a, b): dtypes={"x": float, "y": float}, freq="10 s", ) - x = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + x = dd.shuffle.shuffle(df, "x") y = x.partitions[: x.npartitions // 2].persist() while not s.tasks or not any(ts.state == "memory" for ts in s.tasks.values()): @@ -1531,7 +1572,8 @@ async def test_clean_after_close(c, s, a, b): freq="100 s", ) - out = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = dd.shuffle.shuffle(df, "x") out = out.persist() await wait_for_tasks_in_state("shuffle-transfer", "executing", 1, a) @@ -1874,7 +1916,8 @@ async def test_deduplicate_stale_transfer(c, s, a, b, wait_until_forgotten): dtypes={"x": float, "y": float}, freq="100 s", ) - shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + shuffled = dd.shuffle.shuffle(df, "x") shuffled = shuffled.persist() shuffle_extA = a.plugins["shuffle"] @@ -1887,8 +1930,8 @@ async def test_deduplicate_stale_transfer(c, s, a, b, wait_until_forgotten): if wait_until_forgotten: while s.tasks or get_active_shuffle_runs(a) or get_active_shuffle_runs(b): await asyncio.sleep(0) - - shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + shuffled = dd.shuffle.shuffle(df, "x") result = c.compute(shuffled) await wait_until_new_shuffle_is_initialized(s) shuffle_extA.block_shuffle_receive.set() @@ -1925,7 +1968,8 @@ async def test_handle_stale_barrier(c, s, a, 
b, wait_until_forgotten): dtypes={"x": float, "y": float}, freq="100 s", ) - shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + shuffled = dd.shuffle.shuffle(df, "x") shuffled = shuffled.persist() shuffle_extA = a.plugins["shuffle"] @@ -1944,7 +1988,8 @@ async def test_handle_stale_barrier(c, s, a, b, wait_until_forgotten): while s.tasks: await asyncio.sleep(0) - shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + shuffled = dd.shuffle.shuffle(df, "x") fut = c.compute([shuffled, df], sync=True) await wait_until_new_shuffle_is_initialized(s) shuffle_extA.block_barrier.set() @@ -1982,7 +2027,8 @@ async def test_shuffle_run_consistency(c, s, a): freq="100 s", ) # Initialize first shuffle execution - out = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = dd.shuffle.shuffle(df, "x") out = out.persist() shuffle_id = await wait_until_new_shuffle_is_initialized(s) @@ -2005,7 +2051,8 @@ async def test_shuffle_run_consistency(c, s, a): worker_plugin.block_barrier.clear() # Initialize second shuffle execution - out = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = dd.shuffle.shuffle(df, "x") out = out.persist() new_shuffle_id = await wait_until_new_shuffle_is_initialized(s) @@ -2031,7 +2078,8 @@ async def test_shuffle_run_consistency(c, s, a): worker_plugin.block_barrier.clear() # Create an unrelated shuffle on a different column - out = dd.shuffle.shuffle(df, "y", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = dd.shuffle.shuffle(df, "y") out = out.persist() independent_shuffle_id = await wait_until_new_shuffle_is_initialized(s) assert shuffle_id != independent_shuffle_id @@ -2071,7 +2119,8 @@ async def test_fail_fetch_race(c, s, a): dtypes={"x": float, "y": float}, freq="100 s", ) - out = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = dd.shuffle.shuffle(df, "x") out = out.persist() shuffle_id = await wait_until_new_shuffle_is_initialized(s) @@ -2151,7 +2200,8 @@ async def test_replace_stale_shuffle(c, s, a, b): freq="100 s", ) # Initialize first shuffle execution - out = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = dd.shuffle.shuffle(df, "x") out = out.persist() shuffle_id = await wait_until_new_shuffle_is_initialized(s) @@ -2176,7 +2226,8 @@ async def test_replace_stale_shuffle(c, s, a, b): run_manager_B.allow_fail = True # Initialize second shuffle execution - out = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = dd.shuffle.shuffle(df, "x") out = out.persist() await wait_for_tasks_in_state("shuffle-transfer", "memory", 1, a) @@ -2213,7 +2264,8 @@ async def test_handle_null_partitions(c, s, a, b): ] df = pd.DataFrame(data) ddf = dd.from_pandas(df, npartitions=2) - ddf = ddf.shuffle(on="id", shuffle="p2p", ignore_index=True) + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + ddf = ddf.shuffle(on="id", ignore_index=True) result = await c.compute(ddf) dd.assert_eq(result, df) @@ -2231,7 +2283,8 @@ def make_partition(i): return pd.DataFrame({"a": np.random.random(10), "b": np.random.random(10)}) ddf = dd.from_map(make_partition, range(50)) - out = ddf.shuffle(on="a", shuffle="p2p", ignore_index=True) + with 
dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = ddf.shuffle(on="a", ignore_index=True) result, expected = c.compute([ddf, out]) del out result = await result @@ -2283,7 +2336,8 @@ def make_partition(i): return pd.DataFrame({"a": np.random.random(10), "b": np.random.random(10)}) ddf = dd.from_map(make_partition, range(50)) - out = ddf.shuffle(on="a", shuffle="p2p", ignore_index=True) + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = ddf.shuffle(on="a", ignore_index=True) if parse(pa.__version__) >= parse("14.0.0"): result, expected = c.compute([ddf, out]) @@ -2313,7 +2367,8 @@ def make_partition(i): return pd.DataFrame({"a": np.random.random(10), "b": np.random.random(10)}) ddf = dd.from_map(make_partition, range(50)) - out = ddf.shuffle(on="a", shuffle="p2p", ignore_index=True) + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = ddf.shuffle(on="a", ignore_index=True) if parse(pa.__version__) >= parse("14.0.0"): with raises_with_cause( RuntimeError, @@ -2351,7 +2406,8 @@ async def test_handle_categorical_data(c, s, a, b): npartitions=2, ) df.b = df.b.astype("category") - shuffled = df.shuffle("a", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + shuffled = df.shuffle("a") result, expected = await c.compute([shuffled, df], sync=True) dd.assert_eq(result, expected, check_categorical=False) @@ -2386,7 +2442,8 @@ async def test_handle_floats_in_int_meta(c, s, a, b): async def test_set_index(c, s, *workers): df = pd.DataFrame({"a": [1, 2, 3, 4, 5, 6, 7, 8], "b": 1}) ddf = dd.from_pandas(df, npartitions=3) - ddf = ddf.set_index("a", shuffle="p2p", divisions=(1, 3, 8)) + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + ddf = ddf.set_index("a", divisions=(1, 3, 8)) assert ddf.npartitions == 2 result = await c.compute(ddf) dd.assert_eq(result, df.set_index("a")) @@ -2402,7 +2459,8 @@ def test_shuffle_with_existing_index(client): df, npartitions=4, ) - ddf = ddf.shuffle("a", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + ddf = ddf.shuffle("a") result = client.compute(ddf, sync=True) dd.assert_eq(result, df) @@ -2413,7 +2471,8 @@ def test_set_index_with_existing_index(client): df, npartitions=4, ) - ddf = ddf.set_index("a", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + ddf = ddf.set_index("a") result = client.compute(ddf, sync=True) dd.assert_eq(result, df.set_index("a")) @@ -2550,7 +2609,8 @@ async def test_flaky_connect_fails_without_retry(c, s, a, b): dtypes={"x": float, "y": float}, freq="10 s", ) - x = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + x = dd.shuffle.shuffle(df, "x") rpc = await FlakyConnectionPool(failing_connects=1) @@ -2580,7 +2640,8 @@ async def test_flaky_connect_recover_with_retry(c, s, a, b): dtypes={"x": float, "y": float}, freq="10 s", ) - x = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + x = dd.shuffle.shuffle(df, "x") rpc = await FlakyConnectionPool(failing_connects=1) @@ -2614,7 +2675,8 @@ async def test_barrier_handles_stale_resumed_transfer(c, s, *workers): dtypes={"x": float, "y": float}, freq="10 s", ) - out = dd.shuffle.shuffle(df, "x", shuffle="p2p") + with dask.config.set({"dataframe.shuffle.method": "p2p"}): + out = dd.shuffle.shuffle(df, "x") out = c.compute(out) shuffle_id = await wait_until_new_shuffle_is_initialized(s) key = barrier_key(shuffle_id) diff --git 
a/distributed/tests/test_dask_collections.py b/distributed/tests/test_dask_collections.py index 8a9829c536..7bd8e69e28 100644 --- a/distributed/tests/test_dask_collections.py +++ b/distributed/tests/test_dask_collections.py @@ -150,7 +150,8 @@ def test_dataframe_set_index_sync(wait, client): df = df.persist() wait(df) - df2 = df.set_index("name", shuffle="tasks") + with dask.config.set({"dataframe.shuffle.method": "tasks"}): + df2 = df.set_index("name") df2 = df2.persist() assert len(df2) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index af8321b64d..16d5f842df 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -2660,12 +2660,12 @@ async def test_default_task_duration_splits(c, s, a, b): # verify that we're looking for the correct key npart = 10 df = dd.from_pandas(pd.DataFrame({"A": range(100), "B": 1}), npartitions=npart) - graph = df.shuffle( - "A", - shuffle="tasks", - # If we don't have enough partitions, we'll fall back to a simple shuffle - max_branch=npart - 1, - ).sum() + with dask.config.set({"dataframe.shuffle.method": "tasks"}): + graph = df.shuffle( + "A", + # If we don't have enough partitions, we'll fall back to a simple shuffle + max_branch=npart - 1, + ).sum() fut = c.compute(graph) await wait(fut) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index f1e673333f..1153134d10 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1041,12 +1041,12 @@ async def test_blocklist_shuffle_split(c, s, a, b): dd = pytest.importorskip("dask.dataframe") npart = 10 df = dd.from_pandas(pd.DataFrame({"A": range(100), "B": 1}), npartitions=npart) - graph = df.shuffle( - "A", - shuffle="tasks", - # If we don't have enough partitions, we'll fall back to a simple shuffle - max_branch=npart - 1, - ).sum() + with dask.config.set({"dataframe.shuffle.method": "tasks"}): + graph = df.shuffle( + "A", + # If we don't have enough partitions, we'll fall back to a simple shuffle + max_branch=npart - 1, + ).sum() res = c.compute(graph) while not s.tasks: diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 2510da5b5a..f67cd4c26f 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -3311,7 +3311,8 @@ async def test_broken_comm(c, s, a, b): start="2000-01-01", end="2000-01-10", ) - s = df.shuffle("id", shuffle="tasks") + with dask.config.set({"dataframe.shuffle.method": "tasks"}): + s = df.shuffle("id") await c.compute(s.size)
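
For reference, every hunk above applies the same mechanical migration: the deprecated `shuffle=` keyword on dask DataFrame methods (`shuffle`, `merge`, `join`, `set_index`) is replaced by scoping the `dataframe.shuffle.method` config value around graph construction. A minimal sketch of the pattern, using only the public APIs visible in the hunks (`dask.config.set` and the `dataframe.shuffle.method` key); the example frame is illustrative, not taken from the patch:

```python
import pandas as pd

import dask
import dask.dataframe as dd

# Illustrative input; any dask DataFrame works the same way.
df = dd.from_pandas(pd.DataFrame({"x": range(100), "y": range(100)}), npartitions=4)

# Deprecated spelling removed by this patch:
#     shuffled = df.shuffle("x", shuffle="p2p")
# Replacement: select the shuffle method via dask's config system instead.
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
    shuffled = df.shuffle("x")
```

The config override only needs to wrap graph construction, since the shuffle method is resolved when the graph is built; this is why the hunks above wrap only the lines that build the shuffled collection, while `compute()`/`persist()` calls often stay outside the `with` block.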