Skip to content

Commit

Permalink
Encapsulate spill buffer and memory_monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Mar 5, 2022
1 parent b3f50ce commit 60a5dc2
Show file tree
Hide file tree
Showing 22 changed files with 1,215 additions and 993 deletions.
6 changes: 6 additions & 0 deletions distributed/active_memory_manager.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
"""Implementation of the Active Memory Manager. This is a scheduler extension which
sends drop/replicate suggestions to the worker.
See also :mod:`distributed.worker_memory` and :mod:`distributed.spill`, which implement
spill/pause/terminate mechanics on the Worker side.
"""
from __future__ import annotations

import logging
Expand Down
9 changes: 5 additions & 4 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,10 @@ def set_thread_ident():

@property
def status(self):
return self._status
try:
return self._status
except AttributeError:
return Status.undefined

@status.setter
def status(self, new_status):
Expand Down Expand Up @@ -399,9 +402,7 @@ def port(self):
def identity(self) -> dict[str, str]:
return {"type": type(self).__name__, "id": self.id}

def _to_dict(
self, comm: Comm | None = None, *, exclude: Container[str] = ()
) -> dict:
def _to_dict(self, *, exclude: Container[str] = ()) -> dict:
"""Dictionary representation for debugging purposes.
Not type stable and not intended for roundtrips.
Expand Down
3 changes: 2 additions & 1 deletion distributed/deploy/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from ..nanny import Nanny
from ..scheduler import Scheduler
from ..security import Security
from ..worker import Worker, parse_memory_limit
from ..worker import Worker
from ..worker_memory import parse_memory_limit
from .spec import SpecCluster
from .utils import nprocesses_nthreads

Expand Down
6 changes: 6 additions & 0 deletions distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,12 @@ properties:
description: >-
Limit of number of bytes to be spilled on disk.
monitor-interval:
type: object
properties:
spill-pause: {type: string}
terminate: {type: string}

http:
type: object
description: Settings for Dask's embedded HTTP Server
Expand Down
6 changes: 6 additions & 0 deletions distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ distributed:
# Set to false for no maximum.
max-spill: false

monitor-interval:
# Interval between checks for the spill, pause, and terminate thresholds.
# The target threshold is checked every time new data is inserted.
spill-pause: 200ms # memory monitor on the Worker
terminate: 100ms # memory monitor on the Nanny

http:
routes:
- distributed.http.worker.prometheus
Expand Down
52 changes: 13 additions & 39 deletions distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
from inspect import isawaitable
from queue import Empty
from time import sleep as sync_sleep
from typing import TYPE_CHECKING, ClassVar, Literal
from typing import TYPE_CHECKING, ClassVar

import psutil
from tornado import gen
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.ioloop import IOLoop

import dask
from dask.system import CPU_COUNT
Expand All @@ -43,7 +43,8 @@
parse_ports,
silence_logging,
)
from .worker import Worker, parse_memory_limit, run
from .worker import Worker, run
from .worker_memory import DeprecatedMMAccessor, NannyMemoryManager

if TYPE_CHECKING:
from .diagnostics.plugin import NannyPlugin
Expand Down Expand Up @@ -83,6 +84,7 @@ class Nanny(ServerNode):
_instances: ClassVar[weakref.WeakSet[Nanny]] = weakref.WeakSet()
process = None
status = Status.undefined
memory_manager: NannyMemoryManager

def __init__(
self,
Expand All @@ -97,7 +99,6 @@ def __init__(
services=None,
name=None,
memory_limit="auto",
memory_terminate_fraction: float | Literal[False] | None = None,
reconnect=True,
validate=False,
quiet=False,
Expand Down Expand Up @@ -186,7 +187,8 @@ def __init__(
config_environ = dask.config.get("distributed.nanny.environ", {})
if not isinstance(config_environ, dict):
raise TypeError(
f"distributed.nanny.environ configuration must be of type dict. Instead got {type(config_environ)}"
"distributed.nanny.environ configuration must be of type dict. "
f"Instead got {type(config_environ)}"
)
self.env = config_environ.copy()
for k in self.env:
Expand All @@ -207,19 +209,12 @@ def __init__(
self.worker_kwargs = worker_kwargs

self.contact_address = contact_address
self.memory_terminate_fraction = (
memory_terminate_fraction
if memory_terminate_fraction is not None
else dask.config.get("distributed.worker.memory.terminate")
)

self.services = services
self.name = name
self.quiet = quiet
self.auto_restart = True

self.memory_limit = parse_memory_limit(memory_limit, self.nthreads)

if silence_logs:
silence_logging(level=silence_logs)
self.silence_logs = silence_logs
Expand All @@ -244,10 +239,7 @@ def __init__(
)

self.scheduler = self.rpc(self.scheduler_addr)

if self.memory_limit:
pc = PeriodicCallback(self.memory_monitor, 100)
self.periodic_callbacks["memory"] = pc
self.memory_manager = NannyMemoryManager(self, memory_limit=memory_limit)

if (
not host
Expand All @@ -265,6 +257,10 @@ def __init__(
Nanny._instances.add(self)
self.status = Status.init

# Deprecated attribute; use Nanny.memory_manager.memory_terminate_fraction instead
memory_limit = DeprecatedMMAccessor()
memory_terminate_fraction = DeprecatedMMAccessor()

def __repr__(self):
return "<Nanny: %s, threads: %d>" % (self.worker_address, self.nthreads)

Expand Down Expand Up @@ -382,7 +378,7 @@ async def instantiate(self) -> Status:
services=self.services,
nanny=self.address,
name=self.name,
memory_limit=self.memory_limit,
memory_limit=self.memory_manager.memory_limit,
reconnect=self.reconnect,
resources=self.resources,
validate=self.validate,
Expand Down Expand Up @@ -496,28 +492,6 @@ def _psutil_process(self):

return self._psutil_process_obj

def memory_monitor(self):
"""Track worker's memory. Restart if it goes above terminate fraction"""
if self.status != Status.running:
return
if self.process is None or self.process.process is None:
return None
process = self.process.process

try:
proc = self._psutil_process
memory = proc.memory_info().rss
except (ProcessLookupError, psutil.NoSuchProcess, psutil.AccessDenied):
return
frac = memory / self.memory_limit

if self.memory_terminate_fraction and frac > self.memory_terminate_fraction:
logger.warning(
"Worker exceeded %d%% memory budget. Restarting",
100 * self.memory_terminate_fraction,
)
process.terminate()

def is_alive(self):
return self.process is not None and self.process.is_alive()

Expand Down
5 changes: 1 addition & 4 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
from .active_memory_manager import ActiveMemoryManagerExtension, RetireWorker
from .batched import BatchedSend
from .comm import (
Comm,
get_address_host,
normalize_address,
resolve_address,
Expand Down Expand Up @@ -4053,9 +4052,7 @@ def identity(self):
}
return d

def _to_dict(
self, comm: "Comm | None" = None, *, exclude: "Container[str]" = ()
) -> dict:
def _to_dict(self, *, exclude: "Container[str]" = ()) -> dict:
"""Dictionary representation for debugging purposes.
Not type stable and not intended for roundtrips.
Expand Down
7 changes: 4 additions & 3 deletions distributed/spill.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
from functools import partial
from typing import Any, Literal, NamedTuple

import zict
from packaging.version import parse as parse_version

import zict

from .protocol import deserialize_bytes, serialize_bytelist
from .sizeof import safe_sizeof

logger = logging.getLogger(__name__)
has_zict_210 = parse_version(zict.__version__) > parse_version("2.0.0")
has_zict_210 = parse_version(zict.__version__) >= parse_version("2.1.0")


class SpilledSize(NamedTuple):
Expand Down Expand Up @@ -62,7 +63,7 @@ def __init__(
):

if max_spill is not False and not has_zict_210:
raise ValueError("zict > 2.0.0 required to set max_weight")
raise ValueError("zict >= 2.1.0 required to set max-spill")

super().__init__(
fast={},
Expand Down
2 changes: 1 addition & 1 deletion distributed/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
__all__ = ("memory_limit", "MEMORY_LIMIT")


def memory_limit():
def memory_limit() -> int:
"""Get the memory limit (in bytes) for this system.
Takes the minimum value from the following locations:
Expand Down
Loading

0 comments on commit 60a5dc2

Please sign in to comment.