Skip to content

Commit

Permalink
Merge branch 'main' of github.com:dask/distributed into ucx-base-exce…
Browse files Browse the repository at this point in the history
…ption
  • Loading branch information
quasiben committed Sep 30, 2022
2 parents 582bc2c + 68e5a6a commit 8de1557
Show file tree
Hide file tree
Showing 35 changed files with 1,300 additions and 450 deletions.
67 changes: 58 additions & 9 deletions distributed/active_memory_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,20 @@ class ActiveMemoryManagerExtension:
``distributed.scheduler.active-memory-manager``.
"""

#: Back-reference to the scheduler holding this extension
scheduler: Scheduler
#: All active policies
policies: set[ActiveMemoryManagerPolicy]
#: Memory measure to use. Must be one of the attributes or properties of
#: :class:`distributed.scheduler.MemoryState`.
measure: str
#: Run automatically every this many seconds
interval: float

# These attributes only exist within the scope of self.run()
# Current memory (in bytes) allocated on each worker, plus/minus pending actions
#: Current memory (in bytes) allocated on each worker, plus/minus pending actions
#: This attribute only exist within the scope of self.run().
workers_memory: dict[WorkerState, int]
# Pending replications and deletions for each task
#: Pending replications and deletions for each task
#: This attribute only exist within the scope of self.run().
pending: dict[TaskState, tuple[set[WorkerState], set[WorkerState]]]

def __init__(
Expand All @@ -63,6 +69,7 @@ def __init__(
# away on the fly a specialized manager, separate from the main one.
policies: set[ActiveMemoryManagerPolicy] | None = None,
*,
measure: str | None = None,
register: bool = True,
start: bool | None = None,
interval: float | None = None,
Expand All @@ -83,6 +90,23 @@ def __init__(
for policy in policies:
self.add_policy(policy)

if not measure:
measure = dask.config.get(
"distributed.scheduler.active-memory-manager.measure"
)
mem = scheduler.memory
measure_domain = {
name
for name in dir(mem)
if not name.startswith("_") and isinstance(getattr(mem, name), int)
}
if not isinstance(measure, str) or measure not in measure_domain:
raise ValueError(
"distributed.scheduler.active-memory-manager.measure "
"must be one of " + ", ".join(sorted(measure_domain))
)
self.measure = measure

if register:
scheduler.extensions["amm"] = self
scheduler.handlers["amm_handler"] = self.amm_handler
Expand All @@ -92,6 +116,7 @@ def __init__(
dask.config.get("distributed.scheduler.active-memory-manager.interval")
)
self.interval = interval

if start is None:
start = dask.config.get("distributed.scheduler.active-memory-manager.start")
if start:
Expand Down Expand Up @@ -140,8 +165,9 @@ def run_once(self) -> None:
assert not hasattr(self, "pending")

self.pending = {}
measure = self.measure
self.workers_memory = {
w: w.memory.optimistic for w in self.scheduler.workers.values()
ws: getattr(ws.memory, measure) for ws in self.scheduler.workers.values()
}
try:
# populate self.pending
Expand Down Expand Up @@ -230,6 +256,10 @@ def log_reject(msg: str) -> None:
log_reject(f"ts.state = {ts.state}")
return None

if ts.actor:
log_reject("task is an actor")
return None

if candidates is None:
candidates = self.scheduler.running.copy()
else:
Expand Down Expand Up @@ -282,6 +312,10 @@ def log_reject(msg: str) -> None:
log_reject("less than 2 replicas exist")
return None

if ts.actor:
log_reject("task is an actor")
return None

if candidates is None:
candidates = ts.who_has.copy()
else:
Expand Down Expand Up @@ -591,11 +625,26 @@ def run(self) -> SuggestionGenerator:
self.manager.policies.remove(self)
return

if ws.actors:
logger.warning(
f"Tried retiring worker {self.address}, but it holds actor(s) "
f"{set(ws.actors)}, which can't be moved."
"The worker will not be retired."
)
self.no_recipients = True
self.manager.policies.remove(self)
return

nrepl = 0
nno_rec = 0

logger.debug("Retiring %s", ws)
for ts in ws.has_what:
if ts.actor:
# This is just a proxy Actor object; if there were any originals we
# would have stopped earlier
continue

if len(ts.who_has) > 1:
# There are already replicas of this key on other workers.
# Suggest dropping the replica from this worker.
Expand Down Expand Up @@ -663,10 +712,10 @@ def run(self) -> SuggestionGenerator:
def done(self) -> bool:
"""Return True if it is safe to close the worker down; False otherwise"""
if self not in self.manager.policies:
# Either the no_recipients flag has been raised, or there were no unique replicas
# as of the latest AMM run. Note that due to tasks transitioning from running to
# memory there may be some now; it's OK to lose them and just recompute them
# somewhere else.
# Either the no_recipients flag has been raised, or there were no unique
# replicas as of the latest AMM run. Note that due to tasks transitioning
# from running to memory there may be some now; it's OK to lose them and
# just recompute them somewhere else.
return True
ws = self.manager.scheduler.workers.get(self.address)
if ws is None:
Expand Down
20 changes: 16 additions & 4 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2370,6 +2370,11 @@ def scatter(
broadcast : bool (defaults to False)
Whether to send each data element to all workers.
By default we round-robin based on number of cores.
.. note::
Setting this flag to True is incompatible with the Active Memory
Manager's :ref:`ReduceReplicas` policy. If you wish to use it, you must
first disable the policy or disable the AMM entirely.
direct : bool (defaults to automatically check)
Whether or not to connect directly to the workers, or to ask
the scheduler to serve as intermediary. This can also be set when
Expand Down Expand Up @@ -3513,12 +3518,17 @@ def replicate(self, futures, n=None, workers=None, branching_factor=2, **kwargs)
"""Set replication of futures within network
Copy data onto many workers. This helps to broadcast frequently
accessed data and it helps to improve resilience.
accessed data and can improve resilience.
This performs a tree copy of the data throughout the network
individually on each piece of data. This operation blocks until
complete. It does not guarantee replication of data to future workers.
.. note::
This method is incompatible with the Active Memory Manager's
:ref:`ReduceReplicas` policy. If you wish to use it, you must first disable
the policy or disable the AMM entirely.
Parameters
----------
futures : list of futures
Expand Down Expand Up @@ -4741,16 +4751,18 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
Parameters
----------
fs : List[Future]
timeout : number, optional
Time in seconds after which to raise a
``dask.distributed.TimeoutError``
timeout : number, string, optional
Time after which to raise a ``dask.distributed.TimeoutError``.
Can be a string like ``"10 minutes"`` or a number of seconds to wait.
return_when : str, optional
One of `ALL_COMPLETED` or `FIRST_COMPLETED`
Returns
-------
Named tuple of completed, not completed
"""
if timeout is not None and isinstance(timeout, (Number, str)):
timeout = parse_timedelta(timeout, default="s")
client = default_client()
result = client.sync(_wait, fs, timeout=timeout, return_when=return_when)
return result
Expand Down
6 changes: 3 additions & 3 deletions distributed/compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
logging_names.update(logging._levelToName) # type: ignore
logging_names.update(logging._nameToLevel) # type: ignore

LINUX = sys.platform == "linux"
MACOS = sys.platform == "darwin"
WINDOWS = sys.platform == "win32"
LINUX: bool = sys.platform == "linux"
MACOS: bool = sys.platform == "darwin"
WINDOWS: bool = sys.platform == "win32"


if sys.version_info >= (3, 9):
Expand Down
2 changes: 1 addition & 1 deletion distributed/deploy/adaptive.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class Adaptive(AdaptiveCore):
:meth:`Adaptive.workers_to_close` to control when the cluster should be
resized. The default implementation checks if there are too many tasks
per worker or too little memory available (see
:meth:`Scheduler.adaptive_target`).
:meth:`distributed.Scheduler.adaptive_target`).
The values for interval, min, max, wait_count and target_duration can be
specified in the dask config under the distributed.adaptive key.
'''
Expand Down
42 changes: 34 additions & 8 deletions distributed/diskutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import shutil
import stat
import sys
import tempfile
import weakref
from typing import ClassVar
Expand Down Expand Up @@ -115,21 +116,46 @@ class WorkSpace:
this will be detected and the directories purged.
"""

base_dir: str
_global_lock_path: str
_purge_lock_path: str

# Keep track of all locks known to this process, to avoid several
# WorkSpaces to step on each other's toes
_known_locks: ClassVar[set[str]] = set()

def __init__(self, base_dir):
self.base_dir = os.path.abspath(base_dir)
self._init_workspace()
def __init__(self, base_dir: str):
self.base_dir = self._init_workspace(base_dir)
self._global_lock_path = os.path.join(self.base_dir, "global.lock")
self._purge_lock_path = os.path.join(self.base_dir, "purge.lock")

def _init_workspace(self):
try:
os.mkdir(self.base_dir)
except FileExistsError:
pass
def _init_workspace(self, base_dir: str) -> str:
"""Create base_dir if it doesn't exist.
If base_dir already exists but it's not writeable, change the name.
"""
base_dir = os.path.abspath(base_dir)
try_dirs = [base_dir]
# Note: can't use WINDOWS constant as it upsets mypy
if sys.platform != "win32":
# - os.getlogin() raises OSError on containerized environments
# - os.getuid() does not exist in Windows
try_dirs.append(f"{base_dir}-{os.getuid()}")

for try_dir in try_dirs:
try:
os.makedirs(try_dir)
except FileExistsError:
try:
with tempfile.TemporaryFile(dir=try_dir):
pass
except PermissionError:
continue
return try_dir

# If we reached this, we're likely in a containerized environment where /tmp
# has been shared between containers through a mountpoint, every container
# has an external $UID, but the internal one is the same for all.
return tempfile.mkdtemp(prefix=base_dir + "-")

def _global_lock(self, **kwargs):
return locket.lock_file(self._global_lock_path, **kwargs)
Expand Down
17 changes: 15 additions & 2 deletions distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ properties:
active-memory-manager:
type: object
required: [start, interval, policies]
required: [start, interval, measure, policies]
additionalProperties: false
properties:
start:
Expand All @@ -287,6 +287,14 @@ properties:
type: string
description:
Time expression, e.g. "2s". Run the AMM cycle every <interval>.
measure:
enum:
- process
- optimistic
- managed
- managed_in_memory
description:
One of the attributes of distributed.scheduler.MemoryState
policies:
type: array
items:
Expand Down Expand Up @@ -349,7 +357,12 @@ properties:
- string
- integer
description: |
The maximum size of a message sent between workers
The maximum amount of data for a worker to request from another in a single gather operation
Tasks are gathered in batches, and if the first task in a batch is larger than this value,
the task will still be gathered to ensure progress. Hence, this limit is not absolute.
Note that this limit applies to a single gather operation and a worker may gather data from
multiple workers in parallel.
connections:
type: object
description: |
Expand Down
12 changes: 9 additions & 3 deletions distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,18 @@ distributed:
# Set to true to auto-start the Active Memory Manager on Scheduler start; if false
# you'll have to either manually start it with client.amm.start() or run it once
# with client.amm.run_once().
start: false
start: true

# Once started, run the AMM cycle every <interval>
interval: 2s

# Memory measure to use. Must be one of the attributes of
# distributed.scheduler.MemoryState.
measure: optimistic

# Policies that should be executed at every cycle. Any additional keys in each
# object are passed as keyword arguments to the policy constructor.
policies:
# Policies that should be executed at every cycle. Any additional keys in each
# object are passed as keyword arguments to the policy constructor.
- class: distributed.active_memory_manager.ReduceReplicas

worker:
Expand Down
7 changes: 4 additions & 3 deletions distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
error_message,
)
from distributed.diagnostics.plugin import _get_plugin_name
from distributed.diskutils import WorkSpace
from distributed.metrics import time
from distributed.node import ServerNode
from distributed.process import AsyncProcess
Expand Down Expand Up @@ -179,9 +180,9 @@ def __init__( # type: ignore[no-untyped-def]
else:
self._original_local_dir = local_directory

self.local_directory = local_directory
if not os.path.exists(self.local_directory):
os.makedirs(self.local_directory, exist_ok=True)
# Create directory if it doesn't exist and test for write access.
# In case of PermissionError, change the name.
self.local_directory = WorkSpace(local_directory).base_dir

self.preload = preload
if self.preload is None:
Expand Down
6 changes: 2 additions & 4 deletions distributed/protocol/tests/test_numba.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@ def test_serialize_numba(shape, dtype, order, serializers):
elif serializers[0] == "dask":
assert all(isinstance(f, memoryview) for f in frames)

hx = np.empty_like(ary)
hy = np.empty_like(ary)
x.copy_to_host(hx)
y.copy_to_host(hy)
hx = x.copy_to_host()
hy = y.copy_to_host()
assert (hx == hy).all()


Expand Down
4 changes: 2 additions & 2 deletions distributed/protocol/tests/test_serialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
)
from distributed.protocol.serialize import check_dask_serializable
from distributed.utils import ensure_memoryview, nbytes
from distributed.utils_test import gen_test, inc
from distributed.utils_test import NO_AMM, gen_test, inc


class MyObj:
Expand Down Expand Up @@ -208,7 +208,7 @@ async def test_object_in_graph(c, s, a, b):
assert result.data == 123


@gen_cluster(client=True)
@gen_cluster(client=True, config=NO_AMM)
async def test_scatter(c, s, a, b):
o = MyObj(123)
[future] = await c._scatter([o])
Expand Down
3 changes: 2 additions & 1 deletion distributed/pytest_resourceleaks.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ def measure(self) -> int:
if sys.platform == "win32":
# Don't use num_handles(); you'll get tens of thousands of reported leaks
return 0
return psutil.Process().num_fds()
else:
return psutil.Process().num_fds()

def has_leak(self, before: int, after: int) -> bool:
return after > before
Expand Down
Loading

0 comments on commit 8de1557

Please sign in to comment.