Skip to content

Commit

Permalink
Avoid deprecated shuffle keyword (#8439)
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait authored Jan 4, 2024
1 parent 81774d4 commit e8e9f70
Show file tree
Hide file tree
Showing 11 changed files with 431 additions and 357 deletions.
3 changes: 2 additions & 1 deletion distributed/dashboard/tests/test_scheduler_bokeh.py
Original file line number Diff line number Diff line change
Expand Up @@ -1342,7 +1342,8 @@ async def test_shuffling(c, s, a, b):
dtypes={"x": float, "y": float},
freq="10 s",
)
df2 = dd.shuffle.shuffle(df, "x", shuffle="p2p").persist()
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
df2 = dd.shuffle.shuffle(df, "x").persist()
start = time()
while not ss.source.data["comm_written"]:
ss.update()
Expand Down
6 changes: 3 additions & 3 deletions distributed/protocol/tests/test_highlevelgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def add(x, y, z, extra_arg):
)

df = dd.from_pandas(pd.DataFrame({"a": np.arange(3)}), npartitions=3)
df = df.shuffle("a", shuffle="tasks")
df = df.shuffle("a")
df = df["a"].to_dask_array()

res = x.sum() + df.sum()
Expand Down Expand Up @@ -84,7 +84,7 @@ async def test_shuffle(c, s, a, b):
),
npartitions=5,
)
df = df.shuffle("a", shuffle="tasks", max_branch=2)
df = df.shuffle("a", max_branch=2)
df = df["a"] + df["b"]
res = await c.compute(df, optimize_graph=False)
assert res.dtypes == np.float64
Expand Down Expand Up @@ -169,7 +169,7 @@ async def test_dataframe_annotations(c, s, a, b):
),
npartitions=5,
)
df = df.shuffle("a", shuffle="tasks", max_branch=2)
df = df.shuffle("a", max_branch=2)
acol = df["a"]
bcol = df["b"]

Expand Down
47 changes: 31 additions & 16 deletions distributed/shuffle/tests/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
def test_basic(client):
df = dd.demo.make_timeseries(freq="15D", partition_freq="30D")
df["name"] = df["name"].astype("string[python]")
shuffled = df.shuffle("id", shuffle="p2p")
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
p2p_shuffled = df.shuffle("id")

(opt,) = dask.optimize(shuffled)
(opt,) = dask.optimize(p2p_shuffled)
assert isinstance(hlg_layer_topological(opt.dask, 0), Blockwise)
# blockwise -> barrier -> unpack -> drop_by_shallow_copy

dd.utils.assert_eq(shuffled, df.shuffle("id", shuffle="tasks"), scheduler=client)
with dask.config.set({"dataframe.shuffle.method": "tasks"}):
tasks_shuffled = df.shuffle("id")
dd.utils.assert_eq(p2p_shuffled, tasks_shuffled, scheduler=client)
# ^ NOTE: this works because `assert_eq` sorts the rows before comparing


Expand All @@ -36,8 +39,8 @@ def test_raise_on_complex_numbers(dtype):
)
with pytest.raises(
TypeError, match=f"p2p does not support data of type '{df.x.dtype}'"
):
df.shuffle("x", shuffle="p2p")
), dask.config.set({"dataframe.shuffle.method": "p2p"}):
df.shuffle("x")


@pytest.mark.xfail(
Expand All @@ -52,34 +55,42 @@ def __init__(self, value: int) -> None:
pd.DataFrame({"x": pd.array([Stub(i) for i in range(10)], dtype="object")}),
npartitions=5,
)
with pytest.raises(TypeError, match="p2p does not support custom objects"):
df.shuffle("x", shuffle="p2p")
with pytest.raises(
TypeError, match="p2p does not support custom objects"
), dask.config.set({"dataframe.shuffle.method": "p2p"}):
df.shuffle("x")


def test_raise_on_sparse_data():
df = dd.from_pandas(
pd.DataFrame({"x": pd.array(range(10), dtype="Sparse[float64]")}), npartitions=5
)
with pytest.raises(TypeError, match="p2p does not support sparse data"):
df.shuffle("x", shuffle="p2p")
with pytest.raises(
TypeError, match="p2p does not support sparse data"
), dask.config.set({"dataframe.shuffle.method": "p2p"}):
df.shuffle("x")


def test_raise_on_non_string_column_name():
df = dd.from_pandas(pd.DataFrame({"a": range(10), 1: range(10)}), npartitions=5)
with pytest.raises(TypeError, match="p2p requires all column names to be str"):
df.shuffle("a", shuffle="p2p")
with pytest.raises(
TypeError, match="p2p requires all column names to be str"
), dask.config.set({"dataframe.shuffle.method": "p2p"}):
df.shuffle("a")


def test_does_not_raise_on_stringified_numeric_column_name():
df = dd.from_pandas(pd.DataFrame({"a": range(10), "1": range(10)}), npartitions=5)
df.shuffle("a", shuffle="p2p")
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
df.shuffle("a")


@gen_cluster([("", 2)] * 4, client=True)
async def test_basic_state(c, s, *workers):
df = dd.demo.make_timeseries(freq="15D", partition_freq="30D")
df["name"] = df["name"].astype("string[python]")
shuffled = df.shuffle("id", shuffle="p2p")
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
shuffled = df.shuffle("id")

exts = [w.extensions["shuffle"] for w in workers]
for ext in exts:
Expand All @@ -102,14 +113,18 @@ async def test_basic_state(c, s, *workers):
def test_multiple_linear(client):
df = dd.demo.make_timeseries(freq="15D", partition_freq="30D")
df["name"] = df["name"].astype("string[python]")
s1 = df.shuffle("id", shuffle="p2p")
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
s1 = df.shuffle("id")
s1["x"] = s1["x"] + 1
s2 = s1.shuffle("x", shuffle="p2p")
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
s2 = s1.shuffle("x")

with dask.config.set({"dataframe.shuffle.method": "tasks"}):
expected = df.assign(x=lambda df: df.x + 1).shuffle("x")
# TODO eventually test for fusion between s1's unpacks, the `+1`, and s2's `transfer`s

dd.utils.assert_eq(
s2,
df.assign(x=lambda df: df.x + 1).shuffle("x", shuffle="tasks"),
expected,
scheduler=client,
)
Loading

0 comments on commit e8e9f70

Please sign in to comment.