O(1) rebalance (#4774)
* partial prototype

* incomplete poc

* poc (incomplete)

* complete POC

* polish

* polish

* bugfix

* fixes

* fix

* Use arbitrary measure in rebalance

* Code review

* renames

* suggest tweaking malloc_trim

* self-review

* test_tls_functional

* test_memory to use gen_cluster

* test_memory to use gen_cluster

* half memory

* tests

* tests

* tests

* tests

* make Cython happy

* test_rebalance_managed_memory

* tests

* robustness

* improve test stability

* tests stability

* trivial

* reload dask.config on Scheduler.__init__

* code review
crusaderky authored Jun 1, 2021
1 parent 95a0689 commit 9d4f0bf
Showing 8 changed files with 893 additions and 234 deletions.
9 changes: 6 additions & 3 deletions distributed/client.py
@@ -3057,11 +3057,14 @@ def upload_file(self, filename, **kwargs):
         )
 
     async def _rebalance(self, futures=None, workers=None):
-        await _wait(futures)
-        keys = list({stringify(f.key) for f in self.futures_of(futures)})
+        if futures is not None:
+            await _wait(futures)
+            keys = list({stringify(f.key) for f in self.futures_of(futures)})
+        else:
+            keys = None
         result = await self.scheduler.rebalance(keys=keys, workers=workers)
         if result["status"] == "missing-data":
-            raise ValueError(
+            raise KeyError(
                 f"During rebalance {len(result['keys'])} keys were found to be missing"
             )
         assert result["status"] == "OK"
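
The change above is small but user-facing: Client.rebalance() called without futures now rebalances every key held in cluster memory (keys=None), and missing keys surface as KeyError instead of ValueError. A minimal usage sketch, assuming a running cluster at a placeholder address (not part of this diff):

from distributed import Client

client = Client("tcp://scheduler-address:8786")  # placeholder address

# No futures: rebalance all keys currently held in memory across the cluster
client.rebalance()

# Only rebalance the data backing specific futures; keys that vanished in the
# meantime now raise KeyError rather than ValueError
futures = client.map(lambda x: x ** 2, range(100))
try:
    client.rebalance(futures)
except KeyError as exc:
    print(f"some keys went missing during rebalance: {exc}")
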
42 changes: 41 additions & 1 deletion distributed/distributed-schema.yaml
@@ -395,14 +395,54 @@ properties:
             description: >-
               Settings for memory management
             properties:
-              recent_to_old_time:
+              recent-to-old-time:
                 type: string
                 description: >-
                   When there is an increase in process memory (as observed by the
                   operating system) that is not accounted for by the dask keys stored on
                   the worker, ignore it for this long before considering it in
                   non-time-sensitive heuristics. This should be set to be longer than
                   the duration of most dask tasks.
+              rebalance:
+                type: object
+                description: >-
+                  Settings for memory rebalance operations
+                properties:
+                  measure:
+                    enum:
+                      - process
+                      - optimistic
+                      - managed
+                      - managed_in_memory
+                    description: >-
+                      Which of the properties of distributed.scheduler.MemoryState
+                      should be used for measuring worker memory usage
+                  sender-min:
+                    type: number
+                    minimum: 0
+                    maximum: 1
+                    description: >-
+                      Fraction of worker process memory at which we start potentially
+                      transferring data to other workers.
+                  recipient-max:
+                    type: number
+                    minimum: 0
+                    maximum: 1
+                    description: >-
+                      Fraction of worker process memory at which we stop potentially
+                      receiving data from other workers. Ignored when max_memory is not
+                      set.
+                  sender-recipient-gap:
+                    type: number
+                    minimum: 0
+                    maximum: 1
+                    description: >-
+                      Fraction of worker process memory, around the cluster mean, where
+                      a worker is neither a sender nor a recipient of data during a
+                      rebalance operation. E.g. if the mean cluster occupation is 50%,
+                      sender-recipient-gap=0.1 means that only nodes above 55% will
+                      donate data and only nodes below 45% will receive them. This helps
+                      prevent data from bouncing around the cluster repeatedly.
               target:
                 oneOf:
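
For reference, the keys introduced by this schema can be set like any other dask config value; per the commit message above, the scheduler reads them at Scheduler.__init__, so they must be set before the cluster is created. A short sketch using dask.config (the values below are arbitrary examples, not recommendations):

import dask

# Set the new keys before creating the Scheduler/Client, since the scheduler
# reloads dask.config on Scheduler.__init__ (see the commit message above)
dask.config.set({
    "distributed.worker.memory.recent-to-old-time": "30s",
    "distributed.worker.memory.rebalance.measure": "managed_in_memory",
    "distributed.worker.memory.rebalance.sender-recipient-gap": 0.05,
})

# Dotted paths resolve against the merged configuration
print(dask.config.get("distributed.worker.memory.rebalance.measure"))
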
38 changes: 37 additions & 1 deletion distributed/distributed.yaml
@@ -88,7 +88,43 @@ distributed:
       # system) that is not accounted for by the dask keys stored on the worker, ignore
       # it for this long before considering it in non-critical memory measures.
       # This should be set to be longer than the duration of most dask tasks.
-      recent_to_old_time: 30s
+      recent-to-old-time: 30s
+
+      rebalance:
+        # Memory measure to rebalance upon. Possible choices are:
+        # process
+        #   Total process memory, as measured by the OS.
+        # optimistic
+        #   Managed by dask (instantaneous) + unmanaged (excluding any increases that
+        #   happened in the last <distributed.worker.memory.recent-to-old-time>).
+        #   Recommended for use on CPython with large (2MiB+) numpy-based data chunks.
+        # managed_in_memory
+        #   Only consider the data allocated by dask in RAM. Recommended if RAM is not
+        #   released in a timely fashion back to the OS after the Python objects are
+        #   dereferenced, but remains available for reuse by PyMalloc.
+        #
+        #   If this is your problem on Linux, you should alternatively consider setting
+        #   the MALLOC_TRIM_THRESHOLD_ environment variable (note the final underscore)
+        #   to a low value; refer to the mallopt man page and to the comments about
+        #   M_TRIM_THRESHOLD on
+        #   https://sourceware.org/git/?p=glibc.git;a=blob;f=malloc/malloc.c
+        # managed
+        #   Only consider data allocated by dask, including that spilled to disk.
+        #   Recommended if disk occupation of the spill file is an issue.
+        measure: optimistic
+        # Fraction of worker process memory at which we start potentially sending data
+        # to other workers. Ignored when max_memory is not set.
+        sender-min: 0.30
+        # Fraction of worker process memory at which we stop potentially accepting data
+        # from other workers. Ignored when max_memory is not set.
+        recipient-max: 0.60
+        # Fraction of worker process memory, around the cluster mean, where a worker is
+        # neither a sender nor a recipient of data during a rebalance operation. E.g. if
+        # the mean cluster occupation is 50%, sender-recipient-gap=0.10 means that only
+        # nodes above 55% will donate data and only nodes below 45% will receive them.
+        # This helps prevent data from bouncing around the cluster repeatedly.
+        # Ignored when max_memory is not set.
+        sender-recipient-gap: 0.10
 
       # Fractions of worker process memory at which we take action to avoid memory
       # blowup. Set any of the values to False to turn off the behavior entirely.
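
To make the three thresholds concrete, here is a back-of-the-envelope sketch (one plausible reading of the comments above, not the scheduler's actual implementation) of how sender-min, recipient-max and sender-recipient-gap interact around the cluster mean:

def classify(worker_fraction, cluster_mean, sender_min=0.30, recipient_max=0.60,
             gap=0.10):
    """Label a worker as a potential sender, recipient, or neither."""
    half_gap = gap / 2
    # Senders must sit above both mean + half the gap and the sender-min floor
    if worker_fraction >= max(cluster_mean + half_gap, sender_min):
        return "sender"
    # Recipients must sit below both mean - half the gap and the recipient-max cap
    if worker_fraction <= min(cluster_mean - half_gap, recipient_max):
        return "recipient"
    return "neither"

# With a 50% cluster mean and the default 0.10 gap, workers above 55% donate and
# workers below 45% receive, matching the example in the comments above.
for frac in (0.25, 0.44, 0.50, 0.56, 0.70):
    print(frac, classify(frac, cluster_mean=0.50))
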
