Skip to content

Commit

Permalink
Merge branch 'main' into dask-expr-durations
Browse files (browse the repository at this point in the history)
  • Loading branch information
phofl authored Mar 8, 2024
2 parents 87f96ed + e16a7af commit dfbadf1
Show file tree
Hide file tree
Showing 12 changed files with 172 additions and 237 deletions.
2 changes: 1 addition & 1 deletion distributed/dashboard/tests/test_scheduler_bokeh.py
Original file line number Diff line number Diff line change
Expand Up @@ -1346,7 +1346,7 @@ async def test_shuffling(c, s, a, b):
freq="10 s",
)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
df2 = dd.shuffle.shuffle(df, "x").persist()
df2 = df.shuffle("x").persist()
start = time()
while not ss.source.data["comm_written"]:
ss.update()
Expand Down
15 changes: 12 additions & 3 deletions distributed/protocol/tests/test_highlevelgraph.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import contextlib

import pytest

np = pytest.importorskip("numpy")
Expand Down Expand Up @@ -173,7 +175,13 @@ async def test_dataframe_annotations(c, s, a, b):
acol = df["a"]
bcol = df["b"]

with dask.annotate(retries=retries):
ctx = contextlib.nullcontext()
if dd._dask_expr_enabled():
ctx = pytest.warns(
UserWarning, match="Annotations will be ignored when using query-planning"
)

with dask.annotate(retries=retries), ctx:
df = acol + bcol

with dask.config.set(optimization__fuse__active=False):
Expand All @@ -182,5 +190,6 @@ async def test_dataframe_annotations(c, s, a, b):
assert rdf.dtypes == np.float64
assert (rdf == 10.0).all()

# There is an annotation match per partition (i.e. task)
assert plugin.retry_matches == df.npartitions
if not dd._dask_expr_enabled():
# There is an annotation match per partition (i.e. task)
assert plugin.retry_matches == df.npartitions
9 changes: 2 additions & 7 deletions distributed/shuffle/tests/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@
import pytest

pd = pytest.importorskip("pandas")
pytest.importorskip("dask.dataframe")
dd = pytest.importorskip("dask.dataframe")
pytest.importorskip("pyarrow")

import dask
import dask.dataframe as dd
from dask.blockwise import Blockwise
from dask.utils_test import hlg_layer_topological

from distributed.utils_test import gen_cluster


@pytest.mark.skipif(condition=dd._dask_expr_enabled(), reason="no HLG")
def test_basic(client):
df = dd.demo.make_timeseries(freq="15D", partition_freq="30D")
df["name"] = df["name"].astype("string[python]")
Expand All @@ -26,11 +26,6 @@ def test_basic(client):
assert isinstance(hlg_layer_topological(opt.dask, 0), Blockwise)
# blockwise -> barrier -> unpack -> drop_by_shallow_copy

with dask.config.set({"dataframe.shuffle.method": "tasks"}):
tasks_shuffled = df.shuffle("id")
dd.utils.assert_eq(p2p_shuffled, tasks_shuffled, scheduler=client)
# ^ NOTE: this works because `assert_eq` sorts the rows before comparing


@pytest.mark.parametrize("dtype", ["csingle", "cdouble", "clongdouble"])
def test_raise_on_complex_numbers(dtype):
Expand Down
187 changes: 52 additions & 135 deletions distributed/shuffle/tests/test_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

from distributed import Worker
from distributed.shuffle._core import ShuffleId, ShuffleSpec, id_from_key
from distributed.shuffle._merge import hash_join
from distributed.shuffle._worker_plugin import ShuffleRun, _ShuffleRunManager
from distributed.utils_test import gen_cluster

Expand All @@ -21,7 +20,8 @@
import dask
from dask.dataframe._compat import PANDAS_GE_200, tm
from dask.dataframe.utils import assert_eq
from dask.utils_test import hlg_layer_topological

from distributed import get_client

try:
import pyarrow as pa
Expand All @@ -31,15 +31,10 @@
pytestmark = pytest.mark.ci1


def list_eq(aa, bb):
if isinstance(aa, dd.DataFrame):
a = aa.compute(scheduler="sync")
else:
a = aa
if isinstance(bb, dd.DataFrame):
b = bb.compute(scheduler="sync")
else:
b = bb
async def list_eq(a, b):
c = get_client()
a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
b = await c.compute(b) if isinstance(b, dd.DataFrame) else b
tm.assert_index_equal(a.columns, b.columns)

if isinstance(a, pd.DataFrame):
Expand All @@ -52,6 +47,7 @@ def list_eq(aa, bb):
dd._compat.assert_numpy_array_equal(av, bv)


@pytest.mark.skipif(dd._dask_expr_enabled(), reason="pyarrow>=7.0.0 already required")
@gen_cluster(client=True)
async def test_minimal_version(c, s, a, b):
no_pyarrow_ctx = (
Expand Down Expand Up @@ -81,30 +77,32 @@ async def test_basic_merge(c, s, a, b, how):
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])

joined = hash_join(a, "y", b, "y", how)
joined = a.merge(b, left_on="y", right_on="y", how=how)

if dd._dask_expr_enabled():
# Ensure we're using a hash join
from dask_expr._merge import HashJoinP2P

assert any(
isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()
)

assert not hlg_layer_topological(joined.dask, -1).is_materialized()
result = await c.compute(joined)
expected = pd.merge(A, B, how, "y")
list_eq(result, expected)
await list_eq(joined, expected)

# Different columns and npartitions
joined = hash_join(a, "x", b, "z", "outer", npartitions=3)
assert not hlg_layer_topological(joined.dask, -1).is_materialized()
assert joined.npartitions == 3
joined = a.merge(b, left_on="x", right_on="z", how="outer")

result = await c.compute(joined)
expected = pd.merge(A, B, "outer", None, "x", "z")

list_eq(result, expected)
await list_eq(joined, expected)

assert (
hash_join(a, "y", b, "y", "inner")._name
== hash_join(a, "y", b, "y", "inner")._name
a.merge(b, left_on="y", right_on="y", how="inner")._name
== a.merge(b, left_on="y", right_on="y", how="inner")._name
)
assert (
hash_join(a, "y", b, "y", "inner")._name
!= hash_join(a, "y", b, "y", "outer")._name
a.merge(b, left_on="y", right_on="y", how="inner")._name
!= a.merge(b, left_on="y", right_on="y", how="outer")._name
)


Expand Down Expand Up @@ -188,82 +186,41 @@ async def test_merge(c, s, a, b, how, disk):
pd.merge(A, B, left_index=True, right_index=True, how=how),
)
joined = dd.merge(a, b, on="y", how=how)
result = await c.compute(joined)
list_eq(result, pd.merge(A, B, on="y", how=how))
await list_eq(joined, pd.merge(A, B, on="y", how=how))
assert all(d is None for d in joined.divisions)

list_eq(
await c.compute(dd.merge(a, b, left_on="x", right_on="z", how=how)),
await list_eq(
dd.merge(a, b, left_on="x", right_on="z", how=how),
pd.merge(A, B, left_on="x", right_on="z", how=how),
)
list_eq(
await c.compute(
dd.merge(
a,
b,
left_on="x",
right_on="z",
how=how,
suffixes=("1", "2"),
)
),
await list_eq(
dd.merge(a, b, left_on="x", right_on="z", how=how, suffixes=("1", "2")),
pd.merge(A, B, left_on="x", right_on="z", how=how, suffixes=("1", "2")),
)

list_eq(
await c.compute(dd.merge(a, b, how=how)),
pd.merge(A, B, how=how),
)
list_eq(
await c.compute(dd.merge(a, B, how=how)),
pd.merge(A, B, how=how),
)
list_eq(
await c.compute(dd.merge(A, b, how=how)),
pd.merge(A, B, how=how),
)
# Note: No await since A and B are both pandas dataframes and this doesn't
# actually submit anything
list_eq(
c.compute(dd.merge(A, B, how=how)),
pd.merge(A, B, how=how),
)

list_eq(
await c.compute(dd.merge(a, b, left_index=True, right_index=True, how=how)),
await list_eq(dd.merge(a, b, how=how), pd.merge(A, B, how=how))
await list_eq(dd.merge(a, B, how=how), pd.merge(A, B, how=how))
await list_eq(dd.merge(A, b, how=how), pd.merge(A, B, how=how))
await list_eq(dd.merge(A, B, how=how), pd.merge(A, B, how=how))
await list_eq(
dd.merge(a, b, left_index=True, right_index=True, how=how),
pd.merge(A, B, left_index=True, right_index=True, how=how),
)
list_eq(
await c.compute(
dd.merge(
a,
b,
left_index=True,
right_index=True,
how=how,
suffixes=("1", "2"),
)
await list_eq(
dd.merge(
a, b, left_index=True, right_index=True, how=how, suffixes=("1", "2")
),
pd.merge(
A, B, left_index=True, right_index=True, how=how, suffixes=("1", "2")
),
)

list_eq(
await c.compute(dd.merge(a, b, left_on="x", right_index=True, how=how)),
await list_eq(
dd.merge(a, b, left_on="x", right_index=True, how=how),
pd.merge(A, B, left_on="x", right_index=True, how=how),
)
list_eq(
await c.compute(
dd.merge(
a,
b,
left_on="x",
right_index=True,
how=how,
suffixes=("1", "2"),
)
),
await list_eq(
dd.merge(a, b, left_on="x", right_index=True, how=how, suffixes=("1", "2")),
pd.merge(A, B, left_on="x", right_index=True, how=how, suffixes=("1", "2")),
)

Expand Down Expand Up @@ -391,65 +348,25 @@ async def test_merge_by_multiple_columns(c, s, a, b, how):
)

# hash join
list_eq(
await c.compute(
dd.merge(
ddl,
ddr,
how=how,
left_on="a",
right_on="d",
)
),
await list_eq(
dd.merge(ddl, ddr, how=how, left_on="a", right_on="d"),
pd.merge(pdl, pdr, how=how, left_on="a", right_on="d"),
)
list_eq(
await c.compute(
dd.merge(
ddl,
ddr,
how=how,
left_on="b",
right_on="e",
)
),
await list_eq(
dd.merge(ddl, ddr, how=how, left_on="b", right_on="e"),
pd.merge(pdl, pdr, how=how, left_on="b", right_on="e"),
)

list_eq(
await c.compute(
dd.merge(
ddr,
ddl,
how=how,
left_on="d",
right_on="a",
)
),
await list_eq(
dd.merge(ddr, ddl, how=how, left_on="d", right_on="a"),
pd.merge(pdr, pdl, how=how, left_on="d", right_on="a"),
)
list_eq(
await c.compute(
dd.merge(
ddr,
ddl,
how=how,
left_on="e",
right_on="b",
)
),
await list_eq(
dd.merge(ddr, ddl, how=how, left_on="e", right_on="b"),
pd.merge(pdr, pdl, how=how, left_on="e", right_on="b"),
)

list_eq(
await c.compute(
dd.merge(
ddl,
ddr,
how=how,
left_on=["a", "b"],
right_on=["d", "e"],
)
await list_eq(
dd.merge(
ddl, ddr, how=how, left_on=["a", "b"], right_on=["d", "e"]
),
pd.merge(
pdl, pdr, how=how, left_on=["a", "b"], right_on=["d", "e"]
Expand Down
6 changes: 0 additions & 6 deletions distributed/shuffle/tests/test_merge_column_and_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import dask
from dask.dataframe.utils import assert_eq

from distributed.shuffle import HashJoinP2PLayer, P2PShuffleLayer
from distributed.utils_test import gen_cluster


Expand Down Expand Up @@ -178,11 +177,6 @@ async def test_merge_unknown_to_unknown(
# Merge unknown to unknown
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
result_graph = ddf_left_unknown.merge(ddf_right_unknown, on=on, how=how)
if not any(
isinstance(layer, (HashJoinP2PLayer, P2PShuffleLayer))
for layer in result_graph.dask.layers.values()
):
pytest.skip("No HashJoin or P2P layer involved")
result = await c.compute(result_graph)
# Assertions
assert_eq(result, expected)
Expand Down
12 changes: 7 additions & 5 deletions distributed/shuffle/tests/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from distributed.utils_test import gen_cluster

da = pytest.importorskip("dask.array")
dd = pytest.importorskip("dask.dataframe")
from distributed.shuffle.tests.utils import UNPACK_PREFIX


def assert_metrics(s: Scheduler, *keys: tuple[str, ...]) -> None:
Expand Down Expand Up @@ -79,7 +81,7 @@ async def test_dataframe(c, s, a, b):
freq="10 s",
)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
shuffled = dd.shuffle.shuffle(df, "x", npartitions=20)
shuffled = df.shuffle("x", npartitions=20)
await c.compute(shuffled)
await a.heartbeat()
await b.heartbeat()
Expand All @@ -91,10 +93,10 @@ async def test_dataframe(c, s, a, b):
("execute", "shuffle-transfer", "p2p-shards", "bytes"),
("execute", "shuffle-transfer", "p2p-shards", "count"),
("execute", "shuffle-transfer", "p2p-comms-limiter", "count"),
("execute", "shuffle_p2p", "p2p-disk-read", "bytes"),
("execute", "shuffle_p2p", "p2p-disk-read", "count"),
("execute", "shuffle_p2p", "p2p-get-output-cpu", "seconds"),
("execute", "shuffle_p2p", "p2p-get-output-noncpu", "seconds"),
("execute", UNPACK_PREFIX, "p2p-disk-read", "bytes"),
("execute", UNPACK_PREFIX, "p2p-disk-read", "count"),
("execute", UNPACK_PREFIX, "p2p-get-output-cpu", "seconds"),
("execute", UNPACK_PREFIX, "p2p-get-output-noncpu", "seconds"),
("p2p", "background-comms", "compress", "seconds"),
("p2p", "background-comms", "idle", "seconds"),
("p2p", "background-comms", "process", "bytes"),
Expand Down
Loading

0 comments on commit dfbadf1

Please sign in to comment.