Skip to content

Commit

Permalink
Mark shuffle fast tasks in dask-expr
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Mar 8, 2024
1 parent f595c9e commit a112432
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ jobs:
# Increase this value to reset cache if
# continuous_integration/environment-${{ matrix.environment }}.yaml has not
# changed. See also same variable in .pre-commit-config.yaml
CACHE_NUMBER: 1
CACHE_NUMBER: 2
id: cache

- name: Update environment
Expand Down
2 changes: 2 additions & 0 deletions distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ distributed:
default-task-durations: # How long we expect function names to run ("1h", "1s") (helps for long tasks)
rechunk-split: 1us
split-shuffle: 1us
split-taskshuffle: 1us
split-stage: 1us
validate: False # Check scheduler state at every step for debugging
dashboard:
status:
Expand Down
6 changes: 5 additions & 1 deletion distributed/stealing.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,4 +542,8 @@ def _get_thief(
return min(potential_thieves, key=partial(scheduler.worker_objective, ts))


fast_tasks = {"split-shuffle"}
fast_tasks = {

Check warning on line 545 in distributed/stealing.py

View check run for this annotation

Codecov / codecov/patch

distributed/stealing.py#L545

Added line #L545 was not covered by tests
k
for k, v in dask.config.get("distributed.scheduler.default-task-durations").items()
if parse_timedelta(v) <= 0.001
}
11 changes: 5 additions & 6 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2783,12 +2783,11 @@ async def test_default_task_duration_splits(c, s, a, b):
await wait(fut)

split_prefix = [pre for pre in s.task_prefixes.keys() if "split" in pre]
assert len(split_prefix) == 1
split_prefix = split_prefix[0]
default_time = parse_timedelta(
dask.config.get("distributed.scheduler.default-task-durations")[split_prefix]
)
assert default_time <= 1e-6
assert split_prefix
default_times = dask.config.get("distributed.scheduler.default-task-durations")
for p in split_prefix:
default_time = parse_timedelta(default_times[p])
assert default_time <= 1e-6


@gen_test()
Expand Down
4 changes: 2 additions & 2 deletions distributed/tests/test_steal.py
Original file line number Diff line number Diff line change
Expand Up @@ -1027,10 +1027,10 @@ async def test_blocklist_shuffle_split(c, s, a, b):

while not s.tasks:
await asyncio.sleep(0.005)
prefixes = set(s.task_prefixes.keys())

from distributed.stealing import fast_tasks

blocked = fast_tasks & prefixes
blocked = fast_tasks & s.task_prefixes.keys()
assert blocked
assert any(["split" in prefix for prefix in blocked])

Expand Down

0 comments on commit a112432

Please sign in to comment.