-
-
Notifications
You must be signed in to change notification settings - Fork 720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Encapsulate spill buffer and memory_monitor #5904
Changes from all commits
1a0548b
47b4ea1
4918d65
ae9d89f
b82da4d
5883c96
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,11 +13,11 @@ | |
from inspect import isawaitable | ||
from queue import Empty | ||
from time import sleep as sync_sleep | ||
from typing import TYPE_CHECKING, ClassVar, Literal | ||
from typing import TYPE_CHECKING, ClassVar | ||
|
||
import psutil | ||
from tornado import gen | ||
from tornado.ioloop import IOLoop, PeriodicCallback | ||
from tornado.ioloop import IOLoop | ||
|
||
import dask | ||
from dask.system import CPU_COUNT | ||
|
@@ -49,7 +49,12 @@ | |
parse_ports, | ||
silence_logging, | ||
) | ||
from distributed.worker import Worker, parse_memory_limit, run | ||
from distributed.worker import Worker, run | ||
from distributed.worker_memory import ( | ||
DeprecatedMemoryManagerAttribute, | ||
DeprecatedMemoryMonitor, | ||
NannyMemoryManager, | ||
) | ||
|
||
if TYPE_CHECKING: | ||
from distributed.diagnostics.plugin import NannyPlugin | ||
|
@@ -89,6 +94,7 @@ class Nanny(ServerNode): | |
_instances: ClassVar[weakref.WeakSet[Nanny]] = weakref.WeakSet() | ||
process = None | ||
status = Status.undefined | ||
memory_manager: NannyMemoryManager | ||
|
||
def __init__( | ||
self, | ||
|
@@ -103,7 +109,6 @@ def __init__( | |
services=None, | ||
name=None, | ||
memory_limit="auto", | ||
memory_terminate_fraction: float | Literal[False] | None = None, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This init parameter is a very recent addition so I think it's safe not to have a deprecation cycle There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed, it doesn't look like there have been any releases since it was introduced: $ git log a86f4bb568b5aeb60f5a2a8e24f86592a407b09d~1..HEAD --oneline
4918d652 Merge monitor-interval for spill/pause and terminate
47b4ea19 Merge branch 'main' into spill_extension
7a69b5e2 Prevent data duplication on unspill (#5936)
1a0548b6 distributed.worker_memory
2fffe74d Worker State Machine refactor: redesign TaskState and scheduler messages (#5922)
85bf1beb absolufy-imports (#5924)
925c6100 Tidying of OpenSSL 1.0.2/Python 3.9 (and earlier) handling (#5854)
60ce8436 Fix `track_features` for distributed pre-releases (#5927)
2d68dfc8 Add key to compute failed message (#5928)
f9d2f914 zict type annotations (#5905)
30f0b601 Support dumping cluster state to URL (#5863)
936fba5a Xfail test_submit_different_names (#5916)
e1e43858 Change default log format to include timestamp (#5897)
de94b408 Unblock event loop while waiting for ThreadpoolExecutor to shut down (#5883)
39c5e885 handle concurrent or failing handshakes in InProcListener (#5903)
b3f50cef add GitHub URL for PyPi (#5886)
8c98ad8c fix progress_stream teardown (#5823)
be4fc7f7 Drop unused `_round_robin` global variable (#5881)
ca235dd6 Mark xfail COMPILED tests skipif instead (#5884)
16931cc8 Improve type annotations in worker.py (#5814)
a86f4bb5 Mock process memory readings in test_worker.py (v2) (#5878) |
||
reconnect=True, | ||
validate=False, | ||
quiet=False, | ||
|
@@ -192,7 +197,8 @@ def __init__( | |
config_environ = dask.config.get("distributed.nanny.environ", {}) | ||
if not isinstance(config_environ, dict): | ||
raise TypeError( | ||
f"distributed.nanny.environ configuration must be of type dict. Instead got {type(config_environ)}" | ||
"distributed.nanny.environ configuration must be of type dict. " | ||
f"Instead got {type(config_environ)}" | ||
) | ||
self.env = config_environ.copy() | ||
for k in self.env: | ||
|
@@ -213,19 +219,12 @@ def __init__( | |
self.worker_kwargs = worker_kwargs | ||
|
||
self.contact_address = contact_address | ||
self.memory_terminate_fraction = ( | ||
memory_terminate_fraction | ||
if memory_terminate_fraction is not None | ||
else dask.config.get("distributed.worker.memory.terminate") | ||
) | ||
|
||
self.services = services | ||
self.name = name | ||
self.quiet = quiet | ||
self.auto_restart = True | ||
|
||
self.memory_limit = parse_memory_limit(memory_limit, self.nthreads) | ||
|
||
if silence_logs: | ||
silence_logging(level=silence_logs) | ||
self.silence_logs = silence_logs | ||
|
@@ -250,10 +249,7 @@ def __init__( | |
) | ||
|
||
self.scheduler = self.rpc(self.scheduler_addr) | ||
|
||
if self.memory_limit: | ||
pc = PeriodicCallback(self.memory_monitor, 100) | ||
self.periodic_callbacks["memory"] = pc | ||
self.memory_manager = NannyMemoryManager(self, memory_limit=memory_limit) | ||
|
||
if ( | ||
not host | ||
|
@@ -271,6 +267,11 @@ def __init__( | |
Nanny._instances.add(self) | ||
self.status = Status.init | ||
|
||
# Deprecated attributes; use Nanny.memory_manager.<name> instead | ||
memory_limit = DeprecatedMemoryManagerAttribute() | ||
memory_terminate_fraction = DeprecatedMemoryManagerAttribute() | ||
memory_monitor = DeprecatedMemoryMonitor() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For my own comprehension, a descriptor protocol is used to warn and redirect use of the deprecated attribute onto the appropriate attribute in |
||
|
||
def __repr__(self): | ||
return "<Nanny: %s, threads: %d>" % (self.worker_address, self.nthreads) | ||
|
||
|
@@ -388,7 +389,7 @@ async def instantiate(self) -> Status: | |
services=self.services, | ||
nanny=self.address, | ||
name=self.name, | ||
memory_limit=self.memory_limit, | ||
memory_limit=self.memory_manager.memory_limit, | ||
reconnect=self.reconnect, | ||
resources=self.resources, | ||
validate=self.validate, | ||
|
@@ -502,28 +503,6 @@ def _psutil_process(self): | |
|
||
return self._psutil_process_obj | ||
|
||
def memory_monitor(self): | ||
"""Track worker's memory. Restart if it goes above terminate fraction""" | ||
if self.status != Status.running: | ||
return | ||
if self.process is None or self.process.process is None: | ||
return None | ||
process = self.process.process | ||
|
||
try: | ||
proc = self._psutil_process | ||
memory = proc.memory_info().rss | ||
except (ProcessLookupError, psutil.NoSuchProcess, psutil.AccessDenied): | ||
return | ||
frac = memory / self.memory_limit | ||
|
||
if self.memory_terminate_fraction and frac > self.memory_terminate_fraction: | ||
logger.warning( | ||
"Worker exceeded %d%% memory budget. Restarting", | ||
100 * self.memory_terminate_fraction, | ||
) | ||
process.terminate() | ||
|
||
def is_alive(self): | ||
return self.process is not None and self.process.is_alive() | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,14 +2,15 @@ | |
|
||
import logging | ||
import time | ||
from collections.abc import Mapping, MutableMapping | ||
from collections.abc import Mapping, MutableMapping, Sized | ||
from contextlib import contextmanager | ||
from functools import partial | ||
from typing import Any, Literal, NamedTuple, cast | ||
from typing import Any, Literal, NamedTuple, Protocol, cast | ||
|
||
import zict | ||
from packaging.version import parse as parse_version | ||
|
||
import zict | ||
|
||
from distributed.protocol import deserialize_bytes, serialize_bytelist | ||
from distributed.sizeof import safe_sizeof | ||
|
||
|
@@ -34,6 +35,36 @@ def __sub__(self, other: SpilledSize) -> SpilledSize: # type: ignore | |
return SpilledSize(self.memory - other.memory, self.disk - other.disk) | ||
|
||
|
||
class ManualEvictProto(Protocol): | ||
"""Duck-type API that a third-party alternative to SpillBuffer must respect (in | ||
addition to MutableMapping) if it wishes to support spilling when the | ||
``distributed.worker.memory.spill`` threshold is surpassed. | ||
|
||
This is public API. At the moment of writing, Dask-CUDA implements this protocol in | ||
the ProxifyHostFile class. | ||
""" | ||
|
||
@property | ||
def fast(self) -> Sized | bool: | ||
"""Access to fast memory. This is normally a MutableMapping, but for the purpose | ||
of the manual eviction API it is just tested for emptiness to know if there is | ||
anything to evict. | ||
""" | ||
... # pragma: nocover | ||
|
||
def evict(self) -> int: | ||
"""Manually evict a key/value pair from fast to slow memory. | ||
Return size of the evicted value in fast memory. | ||
|
||
If the eviction failed for whatever reason, return -1. This method must | ||
guarantee that the key/value pair that caused the issue has been retained in | ||
fast memory and that the problem has been logged internally. | ||
|
||
This method never raises. | ||
""" | ||
... # pragma: nocover | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For my own comprehension, this Protocol checks that the supplied object adheres to a duck-type interface, as opposed to that imposed by an ABC class. This is primarily used in |
||
# zict.Buffer[str, Any] requires zict >= 2.2.0 | ||
class SpillBuffer(zict.Buffer): | ||
"""MutableMapping that automatically spills out dask key/value pairs to disk when | ||
|
@@ -166,11 +197,14 @@ def __setitem__(self, key: str, value: Any) -> None: | |
assert key not in self.slow | ||
|
||
def evict(self) -> int: | ||
"""Manually evict the oldest key/value pair, even if target has not been reached. | ||
Returns sizeof(value). | ||
"""Implementation of :meth:`ManualEvictProto.evict`. | ||
|
||
Manually evict the oldest key/value pair, even if target has not been | ||
reached. Returns sizeof(value). | ||
If the eviction failed (value failed to pickle, disk full, or max_spill | ||
exceeded), return -1; the key/value pair that caused the issue will remain in | ||
fast. This method never raises. | ||
fast. The exception has been logged internally. | ||
This method never raises. | ||
""" | ||
try: | ||
with self.handle_errors(None): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor nit: Would it not be simpler to define
_status
upfront in__init__
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's already happening, but the subclasses are not calling
super().__init__
straight away and that creates something that's hard to disentangle.