From 5740070eefe9c392816c18ea75db45acfc3e127a Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 20 Apr 2021 16:45:58 +0100 Subject: [PATCH 1/5] Enable configuration of prometheus metrics namespace --- distributed/distributed-schema.yaml | 138 +++++++++-------- distributed/distributed.yaml | 143 +++++++++--------- .../http/scheduler/prometheus/__init__.py | 29 +++- .../http/scheduler/prometheus/semaphore.py | 26 +++- distributed/http/worker/prometheus.py | 41 ++++- 5 files changed, 219 insertions(+), 158 deletions(-) diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml index a809e03ad7..27ceec5f6f 100644 --- a/distributed/distributed-schema.yaml +++ b/distributed/distributed-schema.yaml @@ -2,14 +2,12 @@ properties: distributed: type: object properties: - version: type: integer scheduler: type: object properties: - allowed-failures: type: integer minimum: 0 @@ -22,8 +20,8 @@ properties: bandwidth: type: - - integer - - string + - integer + - string description: | The expected bandwidth between any pair of workers @@ -45,8 +43,8 @@ properties: default-data-size: type: - - string - - integer + - string + - integer description: | The default size of a piece of data if we don't know anything about it. @@ -60,8 +58,8 @@ properties: idle-timeout: type: - - string - - "null" + - string + - "null" description: | Shut down the scheduler after this duration if no activity has occured @@ -108,8 +106,8 @@ properties: worker-ttl: type: - - string - - "null" + - string + - "null" description: | Time to live for workers. @@ -194,16 +192,16 @@ properties: properties: ca-file: type: - - string - - "null" + - string + - "null" key: type: - - string - - "null" + - string + - "null" cert: type: - - string - - "null" + - string + - "null" bokeh-application: type: object description: | @@ -242,7 +240,6 @@ properties: A list of trusted root modules the schedular is allowed to import (incl. submodules). For security reasons, the scheduler does not import arbitrary Python modules. - worker: type: object description: | @@ -338,8 +335,8 @@ properties: properties: duration: type: - - string - - "null" + - string + - "null" description: | The time after creation to close the worker, like "1 hour" stagger: @@ -359,7 +356,6 @@ properties: description: | Do we try to resurrect the worker after the lifetime deadline? - profile: type: object description: | @@ -406,8 +402,8 @@ properties: target: oneOf: - - {type: number, minimum: 0, maximum: 1} - - {enum: [false]} + - { type: number, minimum: 0, maximum: 1 } + - { enum: [false] } description: >- When the process memory (as observed by the operating system) gets above this amount we start spilling the dask keys holding the largest @@ -415,24 +411,24 @@ properties: spill: oneOf: - - {type: number, minimum: 0, maximum: 1} - - {enum: [false]} + - { type: number, minimum: 0, maximum: 1 } + - { enum: [false] } description: >- When the process memory (as observed by the operating system) gets above this amount we spill all data to disk. pause: oneOf: - - {type: number, minimum: 0, maximum: 1} - - {enum: [false]} + - { type: number, minimum: 0, maximum: 1 } + - { enum: [false] } description: >- When the process memory (as observed by the operating system) gets above this amount we no longer start new tasks on this worker. 
terminate: oneOf: - - {type: number, minimum: 0, maximum: 1} - - {enum: [false]} + - { type: number, minimum: 0, maximum: 1 } + - { enum: [false] } description: >- When the process memory reaches this level the nanny process will kill the worker (if a nanny is present) @@ -454,7 +450,6 @@ properties: description: | Configuration settings for Dask Nannies properties: - preload: type: array description: | @@ -478,8 +473,7 @@ properties: properties: heartbeat: type: string - description: - This value is the time between heartbeats + description: This value is the time between heartbeats The client sends a periodic heartbeat message to the scheduler. If it misses enough of these then the scheduler assumes that it has gone. @@ -548,13 +542,11 @@ properties: type: object description: Configuration settings for Dask communications properties: - retry: type: object description: | Some operations (such as gathering data) are subject to re-tries with the below parameters properties: - count: type: integer minimum: 0 @@ -580,8 +572,8 @@ properties: offload: type: - - boolean - - string + - boolean + - string description: | The size of message after which we choose to offload serialization to another thread @@ -624,8 +616,8 @@ properties: require-encryption: type: - - boolean - - "null" + - boolean + - "null" description: | Whether to require encryption on non-local comms @@ -643,14 +635,14 @@ properties: properties: ciphers: type: - - string - - "null" + - string + - "null" descsription: Allowed ciphers, specified as an OpenSSL cipher string. ca-file: type: - - string - - "null" + - string + - "null" description: Path to a CA file, in pem format scheduler: @@ -659,13 +651,13 @@ properties: properties: cert: type: - - string - - "null" + - string + - "null" description: Path to certificate file key: type: - - string - - "null" + - string + - "null" description: | Path to key file. @@ -678,13 +670,13 @@ properties: properties: cert: type: - - string - - "null" + - string + - "null" description: Path to certificate file key: type: - - string - - "null" + - string + - "null" description: | Path to key file. @@ -697,13 +689,13 @@ properties: properties: cert: type: - - string - - "null" + - string + - "null" description: Path to certificate file key: type: - - string - - "null" + - string + - "null" description: | Path to key file. @@ -728,6 +720,13 @@ properties: export-tool: type: boolean + prometheus: + type: object + properties: + namespace: + type: string + description: Namespace prefix to use for all prometheus metrics. 
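For orientation, a minimal sketch of how the new namespace option is consumed from user code, assuming the standard dask.config mechanics; the value "acme" is purely illustrative, and the shipped default (set in distributed.yaml further down in this patch) is "dask":

import dask.config

# Override the Prometheus metric namespace before starting a scheduler or worker.
# "acme" is a made-up value; the packaged default is "dask".
dask.config.set({"distributed.dashboard.prometheus.namespace": "acme"})

# The same key can also be set through the usual Dask environment-variable mapping:
#   DASK_DISTRIBUTED__DASHBOARD__PROMETHEUS__NAMESPACE=acme

With this change the exporters compose metric names as <namespace>_<subsystem>_<metric>, so a gauge previously hard-coded as "dask_scheduler_clients" would be exported as "acme_scheduler_clients".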
+ admin: type: object description: | @@ -751,7 +750,7 @@ properties: interval: type: string description: The time between ticks, default 20ms - limit : + limit: type: string description: The time allowed before triggering a warning @@ -807,10 +806,9 @@ properties: properties: pool-size: type: - - integer - - "null" - description: - The size of the memory pool in bytes + - integer + - "null" + description: The size of the memory pool in bytes ucx: type: object description: | @@ -818,28 +816,28 @@ properties: properties: tcp: type: - - boolean - - "null" + - boolean + - "null" nvlink: type: - - boolean - - "null" + - boolean + - "null" infiniband: type: - - boolean - - "null" + - boolean + - "null" rdmacm: type: - - boolean - - "null" + - boolean + - "null" cuda_copy: type: - - boolean - - "null" + - boolean + - "null" net-devices: type: - - string - - "null" + - string + - "null" description: Define which Infiniband device to use reuse-endpoints: type: boolean diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index 08aed7f9da..5b5828a228 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -9,27 +9,27 @@ distributed: # tornado.application: error scheduler: - allowed-failures: 3 # number of retries before a task is considered bad - bandwidth: 100000000 # 100 MB/s estimated worker-worker bandwidth + allowed-failures: 3 # number of retries before a task is considered bad + bandwidth: 100000000 # 100 MB/s estimated worker-worker bandwidth blocked-handlers: [] default-data-size: 1kiB # Number of seconds to wait until workers or clients are removed from the events log # after they have been removed from the scheduler events-cleanup-delay: 1h - idle-timeout: null # Shut down after this duration, like "1h" or "30 minutes" + idle-timeout: null # Shut down after this duration, like "1h" or "30 minutes" transition-log-length: 100000 events-log-length: 100000 - work-stealing: True # workers should steal tasks from each other - work-stealing-interval: 100ms # Callback time for work stealing - worker-ttl: null # like '60s'. Time to live for workers. They must heartbeat faster than this - pickle: True # Is the scheduler allowed to deserialize arbitrary bytestrings - preload: [] # Run custom modules with Scheduler - preload-argv: [] # See https://docs.dask.org/en/latest/setup/custom-startup.html - unknown-task-duration: 500ms # Default duration for all tasks with unknown durations ("15m", "2h") - default-task-durations: # How long we expect function names to run ("1h", "1s") (helps for long tasks) + work-stealing: True # workers should steal tasks from each other + work-stealing-interval: 100ms # Callback time for work stealing + worker-ttl: null # like '60s'. Time to live for workers. 
They must heartbeat faster than this + pickle: True # Is the scheduler allowed to deserialize arbitrary bytestrings + preload: [] # Run custom modules with Scheduler + preload-argv: [] # See https://docs.dask.org/en/latest/setup/custom-startup.html + unknown-task-duration: 500ms # Default duration for all tasks with unknown durations ("15m", "2h") + default-task-durations: # How long we expect function names to run ("1h", "1s") (helps for long tasks) rechunk-split: 1us shuffle-split: 1us - validate: False # Check scheduler state at every step for debugging + validate: False # Check scheduler state at every step for debugging dashboard: status: task-stream-length: 1000 @@ -39,13 +39,13 @@ distributed: ca-file: null key: null cert: null - bokeh-application: # keywords to pass to BokehTornado application + bokeh-application: # keywords to pass to BokehTornado application allow_websocket_origin: ["*"] keep_alive_milliseconds: 500 check_unused_sessions_milliseconds: 500 locks: - lease-validation-interval: 10s # The interval in which the scheduler validates staleness of all acquired leases. Must always be smaller than the lease-timeout itself. - lease-timeout: 30s # Maximum interval to wait for a Client refresh before a lease is invalidated and released. + lease-validation-interval: 10s # The interval in which the scheduler validates staleness of all acquired leases. Must always be smaller than the lease-timeout itself. + lease-timeout: 30s # Maximum interval to wait for a Client refresh before a lease is invalidated and released. http: routes: @@ -64,24 +64,25 @@ distributed: blocked-handlers: [] multiprocessing-method: spawn use-file-locking: True - connections: # Maximum concurrent connections for data - outgoing: 50 # This helps to control network saturation + connections: # Maximum concurrent connections for data + outgoing: 50 # This helps to control network saturation incoming: 10 - preload: [] # Run custom modules with Worker - preload-argv: [] # See https://docs.dask.org/en/latest/setup/custom-startup.html + preload: [] # Run custom modules with Worker + preload-argv: [] # See https://docs.dask.org/en/latest/setup/custom-startup.html daemon: True - validate: False # Check worker state at every step for debugging - resources: {} # Key: value pairs specifying worker resources. + validate: False # Check worker state at every step for debugging + resources: {} # Key: value pairs specifying worker resources. lifetime: - duration: null # Time after which to gracefully shutdown the worker - stagger: 0 seconds # Random amount by which to stagger lifetimes - restart: False # Do we ressurrect the worker after the lifetime deadline? + duration: null # Time after which to gracefully shutdown the worker + stagger: 0 seconds # Random amount by which to stagger lifetimes + restart: False # Do we ressurrect the worker after the lifetime deadline? 
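The worker lifetime block above maps one-to-one onto config keys, so a hedged usage sketch may help; the durations below are illustrative only:

import dask.config

# Cycle each worker roughly once a day, staggering shutdowns so the whole
# cluster does not restart at the same moment; the values are examples.
dask.config.set({
    "distributed.worker.lifetime.duration": "24 hours",
    "distributed.worker.lifetime.stagger": "10 minutes",
    "distributed.worker.lifetime.restart": True,
})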
profile: - interval: 10ms # Time between statistical profiling queries - cycle: 1000ms # Time between starting new profile - low-level: False # Whether or not to include low-level functions - # Requires https://github.com/numba/stacktrace + interval: 10ms # Time between statistical profiling queries + cycle: 1000ms # Time between starting new profile + low-level: + False # Whether or not to include low-level functions + # Requires https://github.com/numba/stacktrace memory: # When there is an increase in process memory (as observed by the operating @@ -92,10 +93,10 @@ distributed: # Fractions of worker process memory at which we take action to avoid memory # blowup. Set any of the values to False to turn off the behavior entirely. - target: 0.60 # target fraction to stay below - spill: 0.70 # fraction at which we spill to disk - pause: 0.80 # fraction at which we pause worker threads - terminate: 0.95 # fraction at which we terminate the worker + target: 0.60 # target fraction to stay below + spill: 0.70 # fraction at which we spill to disk + pause: 0.80 # fraction at which we pause worker threads + terminate: 0.95 # fraction at which we terminate the worker http: routes: @@ -104,54 +105,55 @@ distributed: - distributed.http.statics nanny: - preload: [] # Run custom modules with Nanny - preload-argv: [] # See https://docs.dask.org/en/latest/setup/custom-startup.html + preload: [] # Run custom modules with Nanny + preload-argv: [] # See https://docs.dask.org/en/latest/setup/custom-startup.html client: - heartbeat: 5s # Interval between client heartbeats - scheduler-info-interval: 2s # Interval between scheduler-info updates + heartbeat: 5s # Interval between client heartbeats + scheduler-info-interval: 2s # Interval between scheduler-info updates deploy: - lost-worker-timeout: 15s # Interval after which to hard-close a lost worker job - cluster-repr-interval: 500ms # Interval between calls to update cluster-repr for the widget + lost-worker-timeout: 15s # Interval after which to hard-close a lost worker job + cluster-repr-interval: 500ms # Interval between calls to update cluster-repr for the widget adaptive: - interval: 1s # Interval between scaling evaluations - target-duration: 5s # Time an entire graph calculation is desired to take ("1m", "30m") - minimum: 0 # Minimum number of workers - maximum: .inf # Maximum number of workers - wait-count: 3 # Number of times a worker should be suggested for removal before removing it + interval: 1s # Interval between scaling evaluations + target-duration: 5s # Time an entire graph calculation is desired to take ("1m", "30m") + minimum: 0 # Minimum number of workers + maximum: .inf # Maximum number of workers + wait-count: 3 # Number of times a worker should be suggested for removal before removing it comm: - retry: # some operations (such as gathering data) are subject to re-tries with the below parameters - count: 0 # the maximum retry attempts. 0 disables re-trying. + retry: # some operations (such as gathering data) are subject to re-tries with the below parameters + count: 0 # the maximum retry attempts. 0 disables re-trying. 
delay: - min: 1s # the first non-zero delay between re-tries - max: 20s # the maximum delay between re-tries + min: 1s # the first non-zero delay between re-tries + max: 20s # the maximum delay between re-tries compression: auto offload: 10MiB # Size after which we choose to offload serialization to another thread default-scheme: tcp socket-backlog: 2048 - recent-messages-log-length: 0 # number of messages to keep for debugging + recent-messages-log-length: 0 # number of messages to keep for debugging zstd: - level: 3 # Compression level, between 1 and 22. - threads: 0 # Threads to use. 0 for single-threaded, -1 to infer from cpu count. + level: 3 # Compression level, between 1 and 22. + threads: 0 # Threads to use. 0 for single-threaded, -1 to infer from cpu count. timeouts: - connect: 10s # time before connecting fails - tcp: 30s # time before calling an unresponsive connection dead + connect: 10s # time before connecting fails + tcp: 30s # time before calling an unresponsive connection dead require-encryption: null # Whether to require encryption on non-local comms tls: - ciphers: null # Allowed ciphers, specified as an OpenSSL cipher string. - ca-file: null # Path to a CA file, in pem format, optional + ciphers: null # Allowed ciphers, specified as an OpenSSL cipher string. + ca-file: null # Path to a CA file, in pem format, optional scheduler: - cert: null # Path to certificate file for scheduler. - key: null # Path to key file for scheduler. Alternatively, the key - # can be appended to the cert file above, and this field - # left blank. + cert: null # Path to certificate file for scheduler. + key: + null # Path to key file for scheduler. Alternatively, the key + # can be appended to the cert file above, and this field + # left blank. worker: key: null cert: null @@ -159,7 +161,6 @@ distributed: key: null cert: null - ################### # Bokeh dashboard # ################### @@ -167,7 +168,9 @@ distributed: dashboard: link: "{scheme}://{host}:{port}/status" export-tool: False - graph-max-items: 5000 # maximum number of tasks to try to plot in graph view + graph-max-items: 5000 # maximum number of tasks to try to plot in graph view + prometheus: + namespace: "dask" ################## # Administrative # @@ -175,23 +178,23 @@ distributed: admin: tick: - interval: 20ms # time between event loop health checks - limit: 3s # time allowed before triggering a warning + interval: 20ms # time between event loop health checks + limit: 3s # time allowed before triggering a warning max-error-length: 10000 # Maximum size traceback after error to return - log-length: 10000 # default length of logs to keep in memory - log-format: '%(name)s - %(levelname)s - %(message)s' - pdb-on-err: False # enter debug mode on scheduling error + log-length: 10000 # default length of logs to keep in memory + log-format: "%(name)s - %(levelname)s - %(message)s" + pdb-on-err: False # enter debug mode on scheduling error system-monitor: interval: 500ms event-loop: tornado rmm: pool-size: null ucx: - tcp: null # enable tcp - nvlink: null # enable cuda_ipc + tcp: null # enable tcp + nvlink: null # enable cuda_ipc infiniband: null # enable Infiniband rdmacm: null # enable RDMACM - cuda_copy: null # enable cuda-copy - net-devices: null # define which Infiniband device to use - reuse-endpoints: True # enable endpoint reuse + cuda_copy: null # enable cuda-copy + net-devices: null # define which Infiniband device to use + reuse-endpoints: True # enable endpoint reuse diff --git 
a/distributed/http/scheduler/prometheus/__init__.py b/distributed/http/scheduler/prometheus/__init__.py index 120a01dab5..956b3f34c4 100644 --- a/distributed/http/scheduler/prometheus/__init__.py +++ b/distributed/http/scheduler/prometheus/__init__.py @@ -1,5 +1,7 @@ import toolz +import dask.config + from distributed.http.utils import RequestHandler from distributed.scheduler import ALL_TASK_STATES @@ -9,24 +11,33 @@ class _PrometheusCollector: def __init__(self, dask_server): self.server = dask_server + self.namespace = dask.config.get("distributed.dashboard.prometheus.namespace") + self.subsystem = "scheduler" def collect(self): from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily + from prometheus_client.metrics import _build_full_name yield GaugeMetricFamily( - "dask_scheduler_clients", + _build_full_name( + "clients", namespace=self.namespace, subsystem=self.subsystem + ), "Number of clients connected.", value=len([k for k in self.server.clients if k != "fire-and-forget"]), ) yield GaugeMetricFamily( - "dask_scheduler_desired_workers", + _build_full_name( + "desired_workers", namespace=self.namespace, subsystem=self.subsystem + ), "Number of workers scheduler needs for task graph.", value=self.server.adaptive_target(), ) worker_states = GaugeMetricFamily( - "dask_scheduler_workers", + _build_full_name( + "workers", namespace=self.namespace, subsystem=self.subsystem + ), "Number of workers known by scheduler.", labels=["state"], ) @@ -36,7 +47,9 @@ def collect(self): yield worker_states tasks = GaugeMetricFamily( - "dask_scheduler_tasks", + _build_full_name( + "tasks", namespace=self.namespace, subsystem=self.subsystem + ), "Number of tasks known by scheduler.", labels=["state"], ) @@ -46,7 +59,9 @@ def collect(self): ) suspicious_tasks = CounterMetricFamily( - "dask_scheduler_tasks_suspicious", + _build_full_name( + "tasks_suspicious", namespace=self.namespace, subsystem=self.subsystem + ), "Total number of times a task has been marked suspicious", labels=["task_prefix_name"], ) @@ -56,7 +71,9 @@ def collect(self): yield suspicious_tasks yield CounterMetricFamily( - "dask_scheduler_tasks_forgotten", + _build_full_name( + "tasks_forgotten", namespace=self.namespace, subsystem=self.subsystem + ), ( "Total number of processed tasks no longer in memory and already " "removed from the scheduler job queue. 
Note task groups on the " diff --git a/distributed/http/scheduler/prometheus/semaphore.py b/distributed/http/scheduler/prometheus/semaphore.py index aac467b66c..c59e7a3c4b 100644 --- a/distributed/http/scheduler/prometheus/semaphore.py +++ b/distributed/http/scheduler/prometheus/semaphore.py @@ -1,36 +1,52 @@ +import dask.config + + class SemaphoreMetricExtension: def __init__(self, dask_server): self.server = dask_server + self.namespace = dask.config.get("distributed.dashboard.prometheus.namespace") + self.subsystem = "scheduler_semaphore" def collect(self): from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily + from prometheus_client.metrics import _build_full_name sem_ext = self.server.extensions["semaphores"] semaphore_max_leases_family = GaugeMetricFamily( - "semaphore_max_leases", + _build_full_name( + "max_leases", namespace=self.namespace, subsystem=self.subsystem + ), "Maximum leases allowed per semaphore, this will be constant for each semaphore during its lifetime.", labels=["name"], ) semaphore_active_leases_family = GaugeMetricFamily( - "semaphore_active_leases", + _build_full_name( + "active_leases", namespace=self.namespace, subsystem=self.subsystem + ), "Amount of currently active leases per semaphore.", labels=["name"], ) semaphore_pending_leases = GaugeMetricFamily( - "semaphore_pending_leases", + _build_full_name( + "pending_leases", namespace=self.namespace, subsystem=self.subsystem + ), "Amount of currently pending leases per semaphore.", labels=["name"], ) semaphore_acquire_total = CounterMetricFamily( - "semaphore_acquire_total", + _build_full_name( + "acquire_total", namespace=self.namespace, subsystem=self.subsystem + ), "Total number of leases acquired per semaphore.", labels=["name"], ) semaphore_release_total = CounterMetricFamily( - "semaphore_release_total", + _build_full_name( + "release_total", namespace=self.namespace, subsystem=self.subsystem + ), "Total number of leases released per semaphore.\n" "Note: if a semaphore is closed while there are still leases active, this count will not equal " "`semaphore_acquired_total` after execution.", diff --git a/distributed/http/worker/prometheus.py b/distributed/http/worker/prometheus.py index 4d0c0a55e6..71c2a5433b 100644 --- a/distributed/http/worker/prometheus.py +++ b/distributed/http/worker/prometheus.py @@ -1,5 +1,7 @@ import logging +import dask.config + from ..utils import RequestHandler @@ -7,6 +9,8 @@ class _PrometheusCollector: def __init__(self, server): self.worker = server self.logger = logging.getLogger("distributed.dask_worker") + self.namespace = dask.config.get("distributed.dashboard.prometheus.namespace") + self.subsystem = "worker" self.crick_available = True try: import crick # noqa: F401 @@ -18,9 +22,14 @@ def __init__(self, server): def collect(self): from prometheus_client.core import GaugeMetricFamily + from prometheus_client.metrics import _build_full_name tasks = GaugeMetricFamily( - "dask_worker_tasks", "Number of tasks at worker.", labels=["state"] + _build_full_name( + "tasks", namespace=self.namespace, subsystem=self.subsystem + ), + "Number of tasks at worker.", + labels=["state"], ) tasks.add_metric(["stored"], len(self.worker.data)) tasks.add_metric(["executing"], self.worker.executing_count) @@ -30,19 +39,25 @@ def collect(self): yield tasks yield GaugeMetricFamily( - "dask_worker_connections", + _build_full_name( + "connections", namespace=self.namespace, subsystem=self.subsystem + ), "Number of task connections to other workers.", 
value=len(self.worker.in_flight_workers), ) yield GaugeMetricFamily( - "dask_worker_threads", + _build_full_name( + "threads", namespace=self.namespace, subsystem=self.subsystem + ), "Number of worker threads.", value=self.worker.nthreads, ) yield GaugeMetricFamily( - "dask_worker_latency_seconds", + _build_full_name( + "latency_seconds", namespace=self.namespace, subsystem=self.subsystem + ), "Latency of worker connection.", value=self.worker.latency, ) @@ -51,19 +66,31 @@ def collect(self): # the following metrics will export NaN, if the corresponding digests are None if self.crick_available: yield GaugeMetricFamily( - "dask_worker_tick_duration_median_seconds", + _build_full_name( + "tick_duration_median_seconds", + namespace=self.namespace, + subsystem=self.subsystem, + ), "Median tick duration at worker.", value=self.worker.digests["tick-duration"].components[1].quantile(50), ) yield GaugeMetricFamily( - "dask_worker_task_duration_median_seconds", + _build_full_name( + "task_duration_median_seconds", + namespace=self.namespace, + subsystem=self.subsystem, + ), "Median task runtime at worker.", value=self.worker.digests["task-duration"].components[1].quantile(50), ) yield GaugeMetricFamily( - "dask_worker_transfer_bandwidth_median_bytes", + _build_full_name( + "transfer_bandwidth_median_bytes", + namespace=self.namespace, + subsystem=self.subsystem, + ), "Bandwidth for transfer at worker in Bytes.", value=self.worker.digests["transfer-bandwidth"] .components[1] From 1d714682ef203a28b519d97cfb4a699e8c95811c Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Tue, 20 Apr 2021 16:48:18 +0100 Subject: [PATCH 2/5] Undo editor formatting --- distributed/distributed-schema.yaml | 131 ++++++++++++++------------ distributed/distributed.yaml | 141 ++++++++++++++-------------- 2 files changed, 140 insertions(+), 132 deletions(-) diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml index 27ceec5f6f..bba527bdf4 100644 --- a/distributed/distributed-schema.yaml +++ b/distributed/distributed-schema.yaml @@ -2,12 +2,14 @@ properties: distributed: type: object properties: + version: type: integer scheduler: type: object properties: + allowed-failures: type: integer minimum: 0 @@ -20,8 +22,8 @@ properties: bandwidth: type: - - integer - - string + - integer + - string description: | The expected bandwidth between any pair of workers @@ -43,8 +45,8 @@ properties: default-data-size: type: - - string - - integer + - string + - integer description: | The default size of a piece of data if we don't know anything about it. @@ -58,8 +60,8 @@ properties: idle-timeout: type: - - string - - "null" + - string + - "null" description: | Shut down the scheduler after this duration if no activity has occured @@ -106,8 +108,8 @@ properties: worker-ttl: type: - - string - - "null" + - string + - "null" description: | Time to live for workers. @@ -192,16 +194,16 @@ properties: properties: ca-file: type: - - string - - "null" + - string + - "null" key: type: - - string - - "null" + - string + - "null" cert: type: - - string - - "null" + - string + - "null" bokeh-application: type: object description: | @@ -240,6 +242,7 @@ properties: A list of trusted root modules the schedular is allowed to import (incl. submodules). For security reasons, the scheduler does not import arbitrary Python modules. 
+ worker: type: object description: | @@ -335,8 +338,8 @@ properties: properties: duration: type: - - string - - "null" + - string + - "null" description: | The time after creation to close the worker, like "1 hour" stagger: @@ -356,6 +359,7 @@ properties: description: | Do we try to resurrect the worker after the lifetime deadline? + profile: type: object description: | @@ -402,8 +406,8 @@ properties: target: oneOf: - - { type: number, minimum: 0, maximum: 1 } - - { enum: [false] } + - {type: number, minimum: 0, maximum: 1} + - {enum: [false]} description: >- When the process memory (as observed by the operating system) gets above this amount we start spilling the dask keys holding the largest @@ -411,24 +415,24 @@ properties: spill: oneOf: - - { type: number, minimum: 0, maximum: 1 } - - { enum: [false] } + - {type: number, minimum: 0, maximum: 1} + - {enum: [false]} description: >- When the process memory (as observed by the operating system) gets above this amount we spill all data to disk. pause: oneOf: - - { type: number, minimum: 0, maximum: 1 } - - { enum: [false] } + - {type: number, minimum: 0, maximum: 1} + - {enum: [false]} description: >- When the process memory (as observed by the operating system) gets above this amount we no longer start new tasks on this worker. terminate: oneOf: - - { type: number, minimum: 0, maximum: 1 } - - { enum: [false] } + - {type: number, minimum: 0, maximum: 1} + - {enum: [false]} description: >- When the process memory reaches this level the nanny process will kill the worker (if a nanny is present) @@ -450,6 +454,7 @@ properties: description: | Configuration settings for Dask Nannies properties: + preload: type: array description: | @@ -473,7 +478,8 @@ properties: properties: heartbeat: type: string - description: This value is the time between heartbeats + description: + This value is the time between heartbeats The client sends a periodic heartbeat message to the scheduler. If it misses enough of these then the scheduler assumes that it has gone. @@ -542,11 +548,13 @@ properties: type: object description: Configuration settings for Dask communications properties: + retry: type: object description: | Some operations (such as gathering data) are subject to re-tries with the below parameters properties: + count: type: integer minimum: 0 @@ -572,8 +580,8 @@ properties: offload: type: - - boolean - - string + - boolean + - string description: | The size of message after which we choose to offload serialization to another thread @@ -616,8 +624,8 @@ properties: require-encryption: type: - - boolean - - "null" + - boolean + - "null" description: | Whether to require encryption on non-local comms @@ -635,14 +643,14 @@ properties: properties: ciphers: type: - - string - - "null" + - string + - "null" descsription: Allowed ciphers, specified as an OpenSSL cipher string. ca-file: type: - - string - - "null" + - string + - "null" description: Path to a CA file, in pem format scheduler: @@ -651,13 +659,13 @@ properties: properties: cert: type: - - string - - "null" + - string + - "null" description: Path to certificate file key: type: - - string - - "null" + - string + - "null" description: | Path to key file. @@ -670,13 +678,13 @@ properties: properties: cert: type: - - string - - "null" + - string + - "null" description: Path to certificate file key: type: - - string - - "null" + - string + - "null" description: | Path to key file. 
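As a usage aside, a sketch of how the TLS fields described above are typically filled in through dask.config; the file paths are invented for illustration:

import dask.config

# Illustrative paths only: provide a CA plus per-role certificate/key pairs
# and require encrypted connections, using the comm.tls keys documented above.
dask.config.set({
    "distributed.comm.require-encryption": True,
    "distributed.comm.tls.ca-file": "/etc/dask/ca.pem",
    "distributed.comm.tls.scheduler.cert": "/etc/dask/scheduler.pem",
    "distributed.comm.tls.scheduler.key": "/etc/dask/scheduler.key",
    "distributed.comm.tls.worker.cert": "/etc/dask/worker.pem",
    "distributed.comm.tls.worker.key": "/etc/dask/worker.key",
})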
@@ -689,13 +697,13 @@ properties: properties: cert: type: - - string - - "null" + - string + - "null" description: Path to certificate file key: type: - - string - - "null" + - string + - "null" description: | Path to key file. @@ -750,7 +758,7 @@ properties: interval: type: string description: The time between ticks, default 20ms - limit: + limit : type: string description: The time allowed before triggering a warning @@ -806,9 +814,10 @@ properties: properties: pool-size: type: - - integer - - "null" - description: The size of the memory pool in bytes + - integer + - "null" + description: + The size of the memory pool in bytes ucx: type: object description: | @@ -816,28 +825,28 @@ properties: properties: tcp: type: - - boolean - - "null" + - boolean + - "null" nvlink: type: - - boolean - - "null" + - boolean + - "null" infiniband: type: - - boolean - - "null" + - boolean + - "null" rdmacm: type: - - boolean - - "null" + - boolean + - "null" cuda_copy: type: - - boolean - - "null" + - boolean + - "null" net-devices: type: - - string - - "null" + - string + - "null" description: Define which Infiniband device to use reuse-endpoints: type: boolean diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index 5b5828a228..0d776c5520 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -9,27 +9,27 @@ distributed: # tornado.application: error scheduler: - allowed-failures: 3 # number of retries before a task is considered bad - bandwidth: 100000000 # 100 MB/s estimated worker-worker bandwidth + allowed-failures: 3 # number of retries before a task is considered bad + bandwidth: 100000000 # 100 MB/s estimated worker-worker bandwidth blocked-handlers: [] default-data-size: 1kiB # Number of seconds to wait until workers or clients are removed from the events log # after they have been removed from the scheduler events-cleanup-delay: 1h - idle-timeout: null # Shut down after this duration, like "1h" or "30 minutes" + idle-timeout: null # Shut down after this duration, like "1h" or "30 minutes" transition-log-length: 100000 events-log-length: 100000 - work-stealing: True # workers should steal tasks from each other - work-stealing-interval: 100ms # Callback time for work stealing - worker-ttl: null # like '60s'. Time to live for workers. They must heartbeat faster than this - pickle: True # Is the scheduler allowed to deserialize arbitrary bytestrings - preload: [] # Run custom modules with Scheduler - preload-argv: [] # See https://docs.dask.org/en/latest/setup/custom-startup.html - unknown-task-duration: 500ms # Default duration for all tasks with unknown durations ("15m", "2h") - default-task-durations: # How long we expect function names to run ("1h", "1s") (helps for long tasks) + work-stealing: True # workers should steal tasks from each other + work-stealing-interval: 100ms # Callback time for work stealing + worker-ttl: null # like '60s'. Time to live for workers. 
They must heartbeat faster than this + pickle: True # Is the scheduler allowed to deserialize arbitrary bytestrings + preload: [] # Run custom modules with Scheduler + preload-argv: [] # See https://docs.dask.org/en/latest/setup/custom-startup.html + unknown-task-duration: 500ms # Default duration for all tasks with unknown durations ("15m", "2h") + default-task-durations: # How long we expect function names to run ("1h", "1s") (helps for long tasks) rechunk-split: 1us shuffle-split: 1us - validate: False # Check scheduler state at every step for debugging + validate: False # Check scheduler state at every step for debugging dashboard: status: task-stream-length: 1000 @@ -39,13 +39,13 @@ distributed: ca-file: null key: null cert: null - bokeh-application: # keywords to pass to BokehTornado application + bokeh-application: # keywords to pass to BokehTornado application allow_websocket_origin: ["*"] keep_alive_milliseconds: 500 check_unused_sessions_milliseconds: 500 locks: - lease-validation-interval: 10s # The interval in which the scheduler validates staleness of all acquired leases. Must always be smaller than the lease-timeout itself. - lease-timeout: 30s # Maximum interval to wait for a Client refresh before a lease is invalidated and released. + lease-validation-interval: 10s # The interval in which the scheduler validates staleness of all acquired leases. Must always be smaller than the lease-timeout itself. + lease-timeout: 30s # Maximum interval to wait for a Client refresh before a lease is invalidated and released. http: routes: @@ -64,25 +64,24 @@ distributed: blocked-handlers: [] multiprocessing-method: spawn use-file-locking: True - connections: # Maximum concurrent connections for data - outgoing: 50 # This helps to control network saturation + connections: # Maximum concurrent connections for data + outgoing: 50 # This helps to control network saturation incoming: 10 - preload: [] # Run custom modules with Worker - preload-argv: [] # See https://docs.dask.org/en/latest/setup/custom-startup.html + preload: [] # Run custom modules with Worker + preload-argv: [] # See https://docs.dask.org/en/latest/setup/custom-startup.html daemon: True - validate: False # Check worker state at every step for debugging - resources: {} # Key: value pairs specifying worker resources. + validate: False # Check worker state at every step for debugging + resources: {} # Key: value pairs specifying worker resources. lifetime: - duration: null # Time after which to gracefully shutdown the worker - stagger: 0 seconds # Random amount by which to stagger lifetimes - restart: False # Do we ressurrect the worker after the lifetime deadline? + duration: null # Time after which to gracefully shutdown the worker + stagger: 0 seconds # Random amount by which to stagger lifetimes + restart: False # Do we ressurrect the worker after the lifetime deadline? 
profile: - interval: 10ms # Time between statistical profiling queries - cycle: 1000ms # Time between starting new profile - low-level: - False # Whether or not to include low-level functions - # Requires https://github.com/numba/stacktrace + interval: 10ms # Time between statistical profiling queries + cycle: 1000ms # Time between starting new profile + low-level: False # Whether or not to include low-level functions + # Requires https://github.com/numba/stacktrace memory: # When there is an increase in process memory (as observed by the operating @@ -93,10 +92,10 @@ distributed: # Fractions of worker process memory at which we take action to avoid memory # blowup. Set any of the values to False to turn off the behavior entirely. - target: 0.60 # target fraction to stay below - spill: 0.70 # fraction at which we spill to disk - pause: 0.80 # fraction at which we pause worker threads - terminate: 0.95 # fraction at which we terminate the worker + target: 0.60 # target fraction to stay below + spill: 0.70 # fraction at which we spill to disk + pause: 0.80 # fraction at which we pause worker threads + terminate: 0.95 # fraction at which we terminate the worker http: routes: @@ -105,55 +104,54 @@ distributed: - distributed.http.statics nanny: - preload: [] # Run custom modules with Nanny - preload-argv: [] # See https://docs.dask.org/en/latest/setup/custom-startup.html + preload: [] # Run custom modules with Nanny + preload-argv: [] # See https://docs.dask.org/en/latest/setup/custom-startup.html client: - heartbeat: 5s # Interval between client heartbeats - scheduler-info-interval: 2s # Interval between scheduler-info updates + heartbeat: 5s # Interval between client heartbeats + scheduler-info-interval: 2s # Interval between scheduler-info updates deploy: - lost-worker-timeout: 15s # Interval after which to hard-close a lost worker job - cluster-repr-interval: 500ms # Interval between calls to update cluster-repr for the widget + lost-worker-timeout: 15s # Interval after which to hard-close a lost worker job + cluster-repr-interval: 500ms # Interval between calls to update cluster-repr for the widget adaptive: - interval: 1s # Interval between scaling evaluations - target-duration: 5s # Time an entire graph calculation is desired to take ("1m", "30m") - minimum: 0 # Minimum number of workers - maximum: .inf # Maximum number of workers - wait-count: 3 # Number of times a worker should be suggested for removal before removing it + interval: 1s # Interval between scaling evaluations + target-duration: 5s # Time an entire graph calculation is desired to take ("1m", "30m") + minimum: 0 # Minimum number of workers + maximum: .inf # Maximum number of workers + wait-count: 3 # Number of times a worker should be suggested for removal before removing it comm: - retry: # some operations (such as gathering data) are subject to re-tries with the below parameters - count: 0 # the maximum retry attempts. 0 disables re-trying. + retry: # some operations (such as gathering data) are subject to re-tries with the below parameters + count: 0 # the maximum retry attempts. 0 disables re-trying. 
delay: - min: 1s # the first non-zero delay between re-tries - max: 20s # the maximum delay between re-tries + min: 1s # the first non-zero delay between re-tries + max: 20s # the maximum delay between re-tries compression: auto offload: 10MiB # Size after which we choose to offload serialization to another thread default-scheme: tcp socket-backlog: 2048 - recent-messages-log-length: 0 # number of messages to keep for debugging + recent-messages-log-length: 0 # number of messages to keep for debugging zstd: - level: 3 # Compression level, between 1 and 22. - threads: 0 # Threads to use. 0 for single-threaded, -1 to infer from cpu count. + level: 3 # Compression level, between 1 and 22. + threads: 0 # Threads to use. 0 for single-threaded, -1 to infer from cpu count. timeouts: - connect: 10s # time before connecting fails - tcp: 30s # time before calling an unresponsive connection dead + connect: 10s # time before connecting fails + tcp: 30s # time before calling an unresponsive connection dead require-encryption: null # Whether to require encryption on non-local comms tls: - ciphers: null # Allowed ciphers, specified as an OpenSSL cipher string. - ca-file: null # Path to a CA file, in pem format, optional + ciphers: null # Allowed ciphers, specified as an OpenSSL cipher string. + ca-file: null # Path to a CA file, in pem format, optional scheduler: - cert: null # Path to certificate file for scheduler. - key: - null # Path to key file for scheduler. Alternatively, the key - # can be appended to the cert file above, and this field - # left blank. + cert: null # Path to certificate file for scheduler. + key: null # Path to key file for scheduler. Alternatively, the key + # can be appended to the cert file above, and this field + # left blank. worker: key: null cert: null @@ -161,6 +159,7 @@ distributed: key: null cert: null + ################### # Bokeh dashboard # ################### @@ -168,7 +167,7 @@ distributed: dashboard: link: "{scheme}://{host}:{port}/status" export-tool: False - graph-max-items: 5000 # maximum number of tasks to try to plot in graph view + graph-max-items: 5000 # maximum number of tasks to try to plot in graph view prometheus: namespace: "dask" @@ -178,23 +177,23 @@ distributed: admin: tick: - interval: 20ms # time between event loop health checks - limit: 3s # time allowed before triggering a warning + interval: 20ms # time between event loop health checks + limit: 3s # time allowed before triggering a warning max-error-length: 10000 # Maximum size traceback after error to return - log-length: 10000 # default length of logs to keep in memory - log-format: "%(name)s - %(levelname)s - %(message)s" - pdb-on-err: False # enter debug mode on scheduling error + log-length: 10000 # default length of logs to keep in memory + log-format: '%(name)s - %(levelname)s - %(message)s' + pdb-on-err: False # enter debug mode on scheduling error system-monitor: interval: 500ms event-loop: tornado rmm: pool-size: null ucx: - tcp: null # enable tcp - nvlink: null # enable cuda_ipc + tcp: null # enable tcp + nvlink: null # enable cuda_ipc infiniband: null # enable Infiniband rdmacm: null # enable RDMACM - cuda_copy: null # enable cuda-copy - net-devices: null # define which Infiniband device to use - reuse-endpoints: True # enable endpoint reuse + cuda_copy: null # enable cuda-copy + net-devices: null # define which Infiniband device to use + reuse-endpoints: True # enable endpoint reuse From 020a2750a4ccd090dbdbd5e438a419f30e38322b Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: 
Wed, 21 Apr 2021 13:20:55 +0100 Subject: [PATCH 3/5] Refactor into base class and remove private function import --- distributed/http/prometheus.py | 17 +++ .../http/scheduler/prometheus/__init__.py | 121 +---------------- distributed/http/scheduler/prometheus/core.py | 104 +++++++++++++++ .../http/scheduler/prometheus/semaphore.py | 32 ++--- .../scheduler/tests/test_semaphore_http.py | 58 ++++++--- distributed/http/worker/prometheus.py | 123 ------------------ .../http/worker/prometheus/__init__.py | 3 + distributed/http/worker/prometheus/core.py | 97 ++++++++++++++ 8 files changed, 270 insertions(+), 285 deletions(-) create mode 100644 distributed/http/prometheus.py create mode 100644 distributed/http/scheduler/prometheus/core.py delete mode 100644 distributed/http/worker/prometheus.py create mode 100644 distributed/http/worker/prometheus/__init__.py create mode 100644 distributed/http/worker/prometheus/core.py diff --git a/distributed/http/prometheus.py b/distributed/http/prometheus.py new file mode 100644 index 0000000000..254a8f9591 --- /dev/null +++ b/distributed/http/prometheus.py @@ -0,0 +1,17 @@ +import dask.config + + +class PrometheusCollector: + def __init__(self, server): + self.server = server + self.namespace = dask.config.get("distributed.dashboard.prometheus.namespace") + self.subsystem = None + + def build_name(self, name): + full_name = [] + if self.namespace: + full_name.append(self.namespace) + if self.subsystem: + full_name.append(self.subsystem) + full_name.append(name) + return "_".join(full_name) diff --git a/distributed/http/scheduler/prometheus/__init__.py b/distributed/http/scheduler/prometheus/__init__.py index 956b3f34c4..63c0310d0a 100644 --- a/distributed/http/scheduler/prometheus/__init__.py +++ b/distributed/http/scheduler/prometheus/__init__.py @@ -1,122 +1,3 @@ -import toolz - -import dask.config - -from distributed.http.utils import RequestHandler -from distributed.scheduler import ALL_TASK_STATES - -from .semaphore import SemaphoreMetricExtension - - -class _PrometheusCollector: - def __init__(self, dask_server): - self.server = dask_server - self.namespace = dask.config.get("distributed.dashboard.prometheus.namespace") - self.subsystem = "scheduler" - - def collect(self): - from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily - from prometheus_client.metrics import _build_full_name - - yield GaugeMetricFamily( - _build_full_name( - "clients", namespace=self.namespace, subsystem=self.subsystem - ), - "Number of clients connected.", - value=len([k for k in self.server.clients if k != "fire-and-forget"]), - ) - - yield GaugeMetricFamily( - _build_full_name( - "desired_workers", namespace=self.namespace, subsystem=self.subsystem - ), - "Number of workers scheduler needs for task graph.", - value=self.server.adaptive_target(), - ) - - worker_states = GaugeMetricFamily( - _build_full_name( - "workers", namespace=self.namespace, subsystem=self.subsystem - ), - "Number of workers known by scheduler.", - labels=["state"], - ) - worker_states.add_metric(["connected"], len(self.server.workers)) - worker_states.add_metric(["saturated"], len(self.server.saturated)) - worker_states.add_metric(["idle"], len(self.server.idle)) - yield worker_states - - tasks = GaugeMetricFamily( - _build_full_name( - "tasks", namespace=self.namespace, subsystem=self.subsystem - ), - "Number of tasks known by scheduler.", - labels=["state"], - ) - - task_counter = toolz.merge_with( - sum, (tp.states for tp in self.server.task_prefixes.values()) - ) - - 
suspicious_tasks = CounterMetricFamily( - _build_full_name( - "tasks_suspicious", namespace=self.namespace, subsystem=self.subsystem - ), - "Total number of times a task has been marked suspicious", - labels=["task_prefix_name"], - ) - - for tp in self.server.task_prefixes.values(): - suspicious_tasks.add_metric([tp.name], tp.suspicious) - yield suspicious_tasks - - yield CounterMetricFamily( - _build_full_name( - "tasks_forgotten", namespace=self.namespace, subsystem=self.subsystem - ), - ( - "Total number of processed tasks no longer in memory and already " - "removed from the scheduler job queue. Note task groups on the " - "scheduler which have all tasks in the forgotten state are not included." - ), - value=task_counter.get("forgotten", 0.0), - ) - - for state in ALL_TASK_STATES: - tasks.add_metric([state], task_counter.get(state, 0.0)) - yield tasks - - -COLLECTORS = [_PrometheusCollector, SemaphoreMetricExtension] - - -class PrometheusHandler(RequestHandler): - _collectors = None - - def __init__(self, *args, dask_server=None, **kwargs): - import prometheus_client - - super().__init__(*args, dask_server=dask_server, **kwargs) - - if PrometheusHandler._collectors: - # Especially during testing, multiple schedulers are started - # sequentially in the same python process - for _collector in PrometheusHandler._collectors: - _collector.server = self.server - return - - PrometheusHandler._collectors = tuple( - collector(self.server) for collector in COLLECTORS - ) - # Register collectors - for instantiated_collector in PrometheusHandler._collectors: - prometheus_client.REGISTRY.register(instantiated_collector) - - def get(self): - import prometheus_client - - self.write(prometheus_client.generate_latest()) - self.set_header("Content-Type", "text/plain; version=0.0.4") - +from .core import PrometheusHandler routes = [("/metrics", PrometheusHandler, {})] diff --git a/distributed/http/scheduler/prometheus/core.py b/distributed/http/scheduler/prometheus/core.py new file mode 100644 index 0000000000..2a27494078 --- /dev/null +++ b/distributed/http/scheduler/prometheus/core.py @@ -0,0 +1,104 @@ +import toolz + +from distributed.http.prometheus import PrometheusCollector +from distributed.http.utils import RequestHandler +from distributed.scheduler import ALL_TASK_STATES + +from .semaphore import SemaphoreMetricCollector + + +class SchedulerMetricCollector(PrometheusCollector): + def __init__(self, server): + super().__init__(server) + self.subsystem = "scheduler" + + def collect(self): + from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily + + yield GaugeMetricFamily( + self.build_name("clients"), + "Number of clients connected.", + value=len([k for k in self.server.clients if k != "fire-and-forget"]), + ) + + yield GaugeMetricFamily( + self.build_name("desired_workers"), + "Number of workers scheduler needs for task graph.", + value=self.server.adaptive_target(), + ) + + worker_states = GaugeMetricFamily( + self.build_name("workers"), + "Number of workers known by scheduler.", + labels=["state"], + ) + worker_states.add_metric(["connected"], len(self.server.workers)) + worker_states.add_metric(["saturated"], len(self.server.saturated)) + worker_states.add_metric(["idle"], len(self.server.idle)) + yield worker_states + + tasks = GaugeMetricFamily( + self.build_name("tasks"), + "Number of tasks known by scheduler.", + labels=["state"], + ) + + task_counter = toolz.merge_with( + sum, (tp.states for tp in self.server.task_prefixes.values()) + ) + + suspicious_tasks = 
CounterMetricFamily( + self.build_name("tasks_suspicious"), + "Total number of times a task has been marked suspicious", + labels=["task_prefix_name"], + ) + + for tp in self.server.task_prefixes.values(): + suspicious_tasks.add_metric([tp.name], tp.suspicious) + yield suspicious_tasks + + yield CounterMetricFamily( + self.build_name("tasks_forgotten"), + ( + "Total number of processed tasks no longer in memory and already " + "removed from the scheduler job queue. Note task groups on the " + "scheduler which have all tasks in the forgotten state are not included." + ), + value=task_counter.get("forgotten", 0.0), + ) + + for state in ALL_TASK_STATES: + tasks.add_metric([state], task_counter.get(state, 0.0)) + yield tasks + + +COLLECTORS = [SchedulerMetricCollector, SemaphoreMetricCollector] + + +class PrometheusHandler(RequestHandler): + _collectors = None + + def __init__(self, *args, dask_server=None, **kwargs): + import prometheus_client + + super().__init__(*args, dask_server=dask_server, **kwargs) + + if PrometheusHandler._collectors: + # Especially during testing, multiple schedulers are started + # sequentially in the same python process + for _collector in PrometheusHandler._collectors: + _collector.server = self.server + return + + PrometheusHandler._collectors = tuple( + collector(self.server) for collector in COLLECTORS + ) + # Register collectors + for instantiated_collector in PrometheusHandler._collectors: + prometheus_client.REGISTRY.register(instantiated_collector) + + def get(self): + import prometheus_client + + self.write(prometheus_client.generate_latest()) + self.set_header("Content-Type", "text/plain; version=0.0.4") diff --git a/distributed/http/scheduler/prometheus/semaphore.py b/distributed/http/scheduler/prometheus/semaphore.py index c59e7a3c4b..c1e6a013a3 100644 --- a/distributed/http/scheduler/prometheus/semaphore.py +++ b/distributed/http/scheduler/prometheus/semaphore.py @@ -1,52 +1,40 @@ -import dask.config +from distributed.http.prometheus import PrometheusCollector -class SemaphoreMetricExtension: - def __init__(self, dask_server): - self.server = dask_server - self.namespace = dask.config.get("distributed.dashboard.prometheus.namespace") +class SemaphoreMetricCollector(PrometheusCollector): + def __init__(self, server): + super().__init__(server) self.subsystem = "scheduler_semaphore" def collect(self): from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily - from prometheus_client.metrics import _build_full_name sem_ext = self.server.extensions["semaphores"] semaphore_max_leases_family = GaugeMetricFamily( - _build_full_name( - "max_leases", namespace=self.namespace, subsystem=self.subsystem - ), + self.build_name("max_leases"), "Maximum leases allowed per semaphore, this will be constant for each semaphore during its lifetime.", labels=["name"], ) semaphore_active_leases_family = GaugeMetricFamily( - _build_full_name( - "active_leases", namespace=self.namespace, subsystem=self.subsystem - ), + self.build_name("active_leases"), "Amount of currently active leases per semaphore.", labels=["name"], ) semaphore_pending_leases = GaugeMetricFamily( - _build_full_name( - "pending_leases", namespace=self.namespace, subsystem=self.subsystem - ), + self.build_name("pending_leases"), "Amount of currently pending leases per semaphore.", labels=["name"], ) semaphore_acquire_total = CounterMetricFamily( - _build_full_name( - "acquire_total", namespace=self.namespace, subsystem=self.subsystem - ), + self.build_name("acquire_total"), "Total number of 
leases acquired per semaphore.", labels=["name"], ) semaphore_release_total = CounterMetricFamily( - _build_full_name( - "release_total", namespace=self.namespace, subsystem=self.subsystem - ), + self.build_name("release_total"), "Total number of leases released per semaphore.\n" "Note: if a semaphore is closed while there are still leases active, this count will not equal " "`semaphore_acquired_total` after execution.", @@ -54,7 +42,7 @@ def collect(self): ) semaphore_average_pending_lease_time = GaugeMetricFamily( - "semaphore_average_pending_lease_time", + self.build_name("average_pending_lease_time"), "Exponential moving average of the time it took to acquire a lease per semaphore.\n" "Note: this only includes time spent on scheduler side, " "it does" diff --git a/distributed/http/scheduler/tests/test_semaphore_http.py b/distributed/http/scheduler/tests/test_semaphore_http.py index 21996cb35f..971c4b55be 100644 --- a/distributed/http/scheduler/tests/test_semaphore_http.py +++ b/distributed/http/scheduler/tests/test_semaphore_http.py @@ -19,19 +19,19 @@ async def fetch_metrics(): families = { family.name: family for family in text_string_to_metric_families(txt) - if family.name.startswith("semaphore_") + if family.name.startswith("dask_scheduler_semaphore_") } return families active_metrics = await fetch_metrics() expected_metrics = { - "semaphore_max_leases", - "semaphore_active_leases", - "semaphore_pending_leases", - "semaphore_acquire", - "semaphore_release", - "semaphore_average_pending_lease_time_s", + "dask_scheduler_semaphore_max_leases", + "dask_scheduler_semaphore_active_leases", + "dask_scheduler_semaphore_pending_leases", + "dask_scheduler_semaphore_acquire", + "dask_scheduler_semaphore_release", + "dask_scheduler_semaphore_average_pending_lease_time_s", } assert active_metrics.keys() == expected_metrics @@ -48,28 +48,46 @@ async def fetch_metrics(): assert len(samples) == 1 sample = samples.pop() assert sample.labels["name"] == "test" - if name == "semaphore_max_leases": + if name == "dask_scheduler_semaphore_max_leases": assert sample.value == 2 else: assert sample.value == 0 assert await sem.acquire() active_metrics = await fetch_metrics() - assert active_metrics["semaphore_max_leases"].samples[0].value == 2 - assert active_metrics["semaphore_active_leases"].samples[0].value == 1 - assert active_metrics["semaphore_average_pending_lease_time_s"].samples[0].value > 0 - assert active_metrics["semaphore_acquire"].samples[0].value == 1 - assert active_metrics["semaphore_release"].samples[0].value == 0 - assert active_metrics["semaphore_pending_leases"].samples[0].value == 0 + assert active_metrics["dask_scheduler_semaphore_max_leases"].samples[0].value == 2 + assert ( + active_metrics["dask_scheduler_semaphore_active_leases"].samples[0].value == 1 + ) + assert ( + active_metrics["dask_scheduler_semaphore_average_pending_lease_time_s"] + .samples[0] + .value + > 0 + ) + assert active_metrics["dask_scheduler_semaphore_acquire"].samples[0].value == 1 + assert active_metrics["dask_scheduler_semaphore_release"].samples[0].value == 0 + assert ( + active_metrics["dask_scheduler_semaphore_pending_leases"].samples[0].value == 0 + ) assert await sem.release() is True active_metrics = await fetch_metrics() - assert active_metrics["semaphore_max_leases"].samples[0].value == 2 - assert active_metrics["semaphore_active_leases"].samples[0].value == 0 - assert active_metrics["semaphore_average_pending_lease_time_s"].samples[0].value > 0 - assert 
active_metrics["semaphore_acquire"].samples[0].value == 1 - assert active_metrics["semaphore_release"].samples[0].value == 1 - assert active_metrics["semaphore_pending_leases"].samples[0].value == 0 + assert active_metrics["dask_scheduler_semaphore_max_leases"].samples[0].value == 2 + assert ( + active_metrics["dask_scheduler_semaphore_active_leases"].samples[0].value == 0 + ) + assert ( + active_metrics["dask_scheduler_semaphore_average_pending_lease_time_s"] + .samples[0] + .value + > 0 + ) + assert active_metrics["dask_scheduler_semaphore_acquire"].samples[0].value == 1 + assert active_metrics["dask_scheduler_semaphore_release"].samples[0].value == 1 + assert ( + active_metrics["dask_scheduler_semaphore_pending_leases"].samples[0].value == 0 + ) await sem.close() active_metrics = await fetch_metrics() diff --git a/distributed/http/worker/prometheus.py b/distributed/http/worker/prometheus.py deleted file mode 100644 index 71c2a5433b..0000000000 --- a/distributed/http/worker/prometheus.py +++ /dev/null @@ -1,123 +0,0 @@ -import logging - -import dask.config - -from ..utils import RequestHandler - - -class _PrometheusCollector: - def __init__(self, server): - self.worker = server - self.logger = logging.getLogger("distributed.dask_worker") - self.namespace = dask.config.get("distributed.dashboard.prometheus.namespace") - self.subsystem = "worker" - self.crick_available = True - try: - import crick # noqa: F401 - except ImportError: - self.crick_available = False - self.logger.info( - "Not all prometheus metrics available are exported. Digest-based metrics require crick to be installed" - ) - - def collect(self): - from prometheus_client.core import GaugeMetricFamily - from prometheus_client.metrics import _build_full_name - - tasks = GaugeMetricFamily( - _build_full_name( - "tasks", namespace=self.namespace, subsystem=self.subsystem - ), - "Number of tasks at worker.", - labels=["state"], - ) - tasks.add_metric(["stored"], len(self.worker.data)) - tasks.add_metric(["executing"], self.worker.executing_count) - tasks.add_metric(["ready"], len(self.worker.ready)) - tasks.add_metric(["waiting"], self.worker.waiting_for_data_count) - tasks.add_metric(["serving"], len(self.worker._comms)) - yield tasks - - yield GaugeMetricFamily( - _build_full_name( - "connections", namespace=self.namespace, subsystem=self.subsystem - ), - "Number of task connections to other workers.", - value=len(self.worker.in_flight_workers), - ) - - yield GaugeMetricFamily( - _build_full_name( - "threads", namespace=self.namespace, subsystem=self.subsystem - ), - "Number of worker threads.", - value=self.worker.nthreads, - ) - - yield GaugeMetricFamily( - _build_full_name( - "latency_seconds", namespace=self.namespace, subsystem=self.subsystem - ), - "Latency of worker connection.", - value=self.worker.latency, - ) - - # all metrics using digests require crick to be installed - # the following metrics will export NaN, if the corresponding digests are None - if self.crick_available: - yield GaugeMetricFamily( - _build_full_name( - "tick_duration_median_seconds", - namespace=self.namespace, - subsystem=self.subsystem, - ), - "Median tick duration at worker.", - value=self.worker.digests["tick-duration"].components[1].quantile(50), - ) - - yield GaugeMetricFamily( - _build_full_name( - "task_duration_median_seconds", - namespace=self.namespace, - subsystem=self.subsystem, - ), - "Median task runtime at worker.", - value=self.worker.digests["task-duration"].components[1].quantile(50), - ) - - yield GaugeMetricFamily( - 
_build_full_name( - "transfer_bandwidth_median_bytes", - namespace=self.namespace, - subsystem=self.subsystem, - ), - "Bandwidth for transfer at worker in Bytes.", - value=self.worker.digests["transfer-bandwidth"] - .components[1] - .quantile(50), - ) - - -class PrometheusHandler(RequestHandler): - _initialized = False - - def __init__(self, *args, **kwargs): - import prometheus_client - - super().__init__(*args, **kwargs) - - if PrometheusHandler._initialized: - return - - prometheus_client.REGISTRY.register(_PrometheusCollector(self.server)) - - PrometheusHandler._initialized = True - - def get(self): - import prometheus_client - - self.write(prometheus_client.generate_latest()) - self.set_header("Content-Type", "text/plain; version=0.0.4") - - -routes = [(r"metrics", PrometheusHandler, {})] diff --git a/distributed/http/worker/prometheus/__init__.py b/distributed/http/worker/prometheus/__init__.py new file mode 100644 index 0000000000..63c0310d0a --- /dev/null +++ b/distributed/http/worker/prometheus/__init__.py @@ -0,0 +1,3 @@ +from .core import PrometheusHandler + +routes = [("/metrics", PrometheusHandler, {})] diff --git a/distributed/http/worker/prometheus/core.py b/distributed/http/worker/prometheus/core.py new file mode 100644 index 0000000000..2933cd1d41 --- /dev/null +++ b/distributed/http/worker/prometheus/core.py @@ -0,0 +1,97 @@ +import logging + +from distributed.http.prometheus import PrometheusCollector +from distributed.http.utils import RequestHandler + + +class WorkerMetricCollector(PrometheusCollector): + def __init__(self, server): + super().__init__(server) + self.logger = logging.getLogger("distributed.dask_worker") + self.subsystem = "worker" + self.crick_available = True + try: + import crick # noqa: F401 + except ImportError: + self.crick_available = False + self.logger.info( + "Not all prometheus metrics available are exported. 
Digest-based metrics require crick to be installed" + ) + + def collect(self): + from prometheus_client.core import GaugeMetricFamily + + tasks = GaugeMetricFamily( + self.build_name("tasks"), + "Number of tasks at worker.", + labels=["state"], + ) + tasks.add_metric(["stored"], len(self.server.data)) + tasks.add_metric(["executing"], self.server.executing_count) + tasks.add_metric(["ready"], len(self.server.ready)) + tasks.add_metric(["waiting"], self.server.waiting_for_data_count) + tasks.add_metric(["serving"], len(self.server._comms)) + yield tasks + + yield GaugeMetricFamily( + self.build_name("connections"), + "Number of task connections to other workers.", + value=len(self.server.in_flight_workers), + ) + + yield GaugeMetricFamily( + self.build_name("threads"), + "Number of worker threads.", + value=self.server.nthreads, + ) + + yield GaugeMetricFamily( + self.build_name("latency_seconds"), + "Latency of worker connection.", + value=self.server.latency, + ) + + # all metrics using digests require crick to be installed + # the following metrics will export NaN, if the corresponding digests are None + if self.crick_available: + yield GaugeMetricFamily( + self.build_name("tick_duration_median_seconds"), + "Median tick duration at worker.", + value=self.server.digests["tick-duration"].components[1].quantile(50), + ) + + yield GaugeMetricFamily( + self.build_name("task_duration_median_seconds"), + "Median task runtime at worker.", + value=self.server.digests["task-duration"].components[1].quantile(50), + ) + + yield GaugeMetricFamily( + self.build_name("transfer_bandwidth_median_bytes"), + "Bandwidth for transfer at worker in Bytes.", + value=self.server.digests["transfer-bandwidth"] + .components[1] + .quantile(50), + ) + + +class PrometheusHandler(RequestHandler): + _initialized = False + + def __init__(self, *args, **kwargs): + import prometheus_client + + super().__init__(*args, **kwargs) + + if PrometheusHandler._initialized: + return + + prometheus_client.REGISTRY.register(WorkerMetricCollector(self.server)) + + PrometheusHandler._initialized = True + + def get(self): + import prometheus_client + + self.write(prometheus_client.generate_latest()) + self.set_header("Content-Type", "text/plain; version=0.0.4") From eaf713c5e12314cf4a4c72ae49636be1361b226d Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Wed, 21 Apr 2021 14:34:44 +0100 Subject: [PATCH 4/5] Rename semaphore subsystem --- .../http/scheduler/prometheus/semaphore.py | 2 +- .../scheduler/tests/test_semaphore_http.py | 52 +++++++------------ 2 files changed, 21 insertions(+), 33 deletions(-) diff --git a/distributed/http/scheduler/prometheus/semaphore.py b/distributed/http/scheduler/prometheus/semaphore.py index c1e6a013a3..36defbbeea 100644 --- a/distributed/http/scheduler/prometheus/semaphore.py +++ b/distributed/http/scheduler/prometheus/semaphore.py @@ -4,7 +4,7 @@ class SemaphoreMetricCollector(PrometheusCollector): def __init__(self, server): super().__init__(server) - self.subsystem = "scheduler_semaphore" + self.subsystem = "semaphore" def collect(self): from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily diff --git a/distributed/http/scheduler/tests/test_semaphore_http.py b/distributed/http/scheduler/tests/test_semaphore_http.py index 971c4b55be..67842e141f 100644 --- a/distributed/http/scheduler/tests/test_semaphore_http.py +++ b/distributed/http/scheduler/tests/test_semaphore_http.py @@ -19,19 +19,19 @@ async def fetch_metrics(): families = { family.name: family for family in 
text_string_to_metric_families(txt) - if family.name.startswith("dask_scheduler_semaphore_") + if family.name.startswith("dask_semaphore_") } return families active_metrics = await fetch_metrics() expected_metrics = { - "dask_scheduler_semaphore_max_leases", - "dask_scheduler_semaphore_active_leases", - "dask_scheduler_semaphore_pending_leases", - "dask_scheduler_semaphore_acquire", - "dask_scheduler_semaphore_release", - "dask_scheduler_semaphore_average_pending_lease_time_s", + "dask_semaphore_max_leases", + "dask_semaphore_active_leases", + "dask_semaphore_pending_leases", + "dask_semaphore_acquire", + "dask_semaphore_release", + "dask_semaphore_average_pending_lease_time_s", } assert active_metrics.keys() == expected_metrics @@ -48,46 +48,34 @@ async def fetch_metrics(): assert len(samples) == 1 sample = samples.pop() assert sample.labels["name"] == "test" - if name == "dask_scheduler_semaphore_max_leases": + if name == "dask_semaphore_max_leases": assert sample.value == 2 else: assert sample.value == 0 assert await sem.acquire() active_metrics = await fetch_metrics() - assert active_metrics["dask_scheduler_semaphore_max_leases"].samples[0].value == 2 + assert active_metrics["dask_semaphore_max_leases"].samples[0].value == 2 + assert active_metrics["dask_semaphore_active_leases"].samples[0].value == 1 assert ( - active_metrics["dask_scheduler_semaphore_active_leases"].samples[0].value == 1 - ) - assert ( - active_metrics["dask_scheduler_semaphore_average_pending_lease_time_s"] - .samples[0] - .value + active_metrics["dask_semaphore_average_pending_lease_time_s"].samples[0].value > 0 ) - assert active_metrics["dask_scheduler_semaphore_acquire"].samples[0].value == 1 - assert active_metrics["dask_scheduler_semaphore_release"].samples[0].value == 0 - assert ( - active_metrics["dask_scheduler_semaphore_pending_leases"].samples[0].value == 0 - ) + assert active_metrics["dask_semaphore_acquire"].samples[0].value == 1 + assert active_metrics["dask_semaphore_release"].samples[0].value == 0 + assert active_metrics["dask_semaphore_pending_leases"].samples[0].value == 0 assert await sem.release() is True active_metrics = await fetch_metrics() - assert active_metrics["dask_scheduler_semaphore_max_leases"].samples[0].value == 2 + assert active_metrics["dask_semaphore_max_leases"].samples[0].value == 2 + assert active_metrics["dask_semaphore_active_leases"].samples[0].value == 0 assert ( - active_metrics["dask_scheduler_semaphore_active_leases"].samples[0].value == 0 - ) - assert ( - active_metrics["dask_scheduler_semaphore_average_pending_lease_time_s"] - .samples[0] - .value + active_metrics["dask_semaphore_average_pending_lease_time_s"].samples[0].value > 0 ) - assert active_metrics["dask_scheduler_semaphore_acquire"].samples[0].value == 1 - assert active_metrics["dask_scheduler_semaphore_release"].samples[0].value == 1 - assert ( - active_metrics["dask_scheduler_semaphore_pending_leases"].samples[0].value == 0 - ) + assert active_metrics["dask_semaphore_acquire"].samples[0].value == 1 + assert active_metrics["dask_semaphore_release"].samples[0].value == 1 + assert active_metrics["dask_semaphore_pending_leases"].samples[0].value == 0 await sem.close() active_metrics = await fetch_metrics() From 149cef804a625581875adc1dd7ffca4a6da02cf9 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Thu, 22 Apr 2021 13:56:37 +0100 Subject: [PATCH 5/5] Remove serving metric and rename connections metric --- distributed/http/worker/prometheus/core.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) 
diff --git a/distributed/http/worker/prometheus/core.py b/distributed/http/worker/prometheus/core.py index 2933cd1d41..cd082ea6a3 100644 --- a/distributed/http/worker/prometheus/core.py +++ b/distributed/http/worker/prometheus/core.py @@ -30,12 +30,11 @@ def collect(self): tasks.add_metric(["executing"], self.server.executing_count) tasks.add_metric(["ready"], len(self.server.ready)) tasks.add_metric(["waiting"], self.server.waiting_for_data_count) - tasks.add_metric(["serving"], len(self.server._comms)) yield tasks yield GaugeMetricFamily( - self.build_name("connections"), - "Number of task connections to other workers.", + self.build_name("concurrent_fetch_requests"), + "Number of open fetch requests to other workers.", value=len(self.server.in_flight_workers), )
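
The refactor in this series routes every metric name through a shared PrometheusCollector.build_name() helper instead of the private prometheus_client.metrics._build_full_name calls used by the deleted collectors. The base class itself is added elsewhere in the series and is not shown in these hunks; the sketch below is a hypothetical reconstruction, assuming it lives in distributed/http/prometheus/__init__.py, reads its prefix from the same distributed.dashboard.prometheus.namespace config key the old collectors queried, and that the default namespace resolves to "dask" (which is what the updated tests expect, e.g. dask_semaphore_max_leases).

    # Hypothetical reconstruction of the shared base class imported above as
    # `from distributed.http.prometheus import PrometheusCollector`; the config
    # key and the joining rule are assumptions based on the deleted collectors
    # and the metric names asserted in the tests.
    import dask.config


    class PrometheusCollector:
        def __init__(self, server):
            self.server = server
            self.namespace = dask.config.get("distributed.dashboard.prometheus.namespace")
            self.subsystem = None

        def build_name(self, name):
            # Join namespace, per-collector subsystem and metric name with
            # underscores, skipping unset parts, e.g.
            # ("dask", "semaphore", "max_leases") -> "dask_semaphore_max_leases"
            parts = [part for part in (self.namespace, self.subsystem, name) if part]
            return "_".join(parts)

Whatever the real implementation looks like, the benefit is visible in the hunks above: collectors no longer reach into a private prometheus_client helper, and renaming a subsystem, as patch 4/5 does for the semaphore collector, changes every exported metric name in one place.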
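
To see the namespaced names end to end, an illustrative smoke test is sketched below; it is not part of the patch series. It assumes prometheus_client is installed, that the scheduler serves /metrics on the dashboard port, and that setting distributed.dashboard.prometheus.namespace (here via dask.config.set, though the YAML config works equally well) is picked up when the collectors are registered.

    # Illustrative check against a local cluster; the port, the config key and
    # the "dask_" prefix are assumptions, not guarantees of this patch series.
    import urllib.request

    import dask
    from distributed import Client, LocalCluster

    with dask.config.set({"distributed.dashboard.prometheus.namespace": "dask"}):
        with LocalCluster(n_workers=1, dashboard_address=":8787") as cluster:
            with Client(cluster):
                txt = (
                    urllib.request.urlopen("http://127.0.0.1:8787/metrics")
                    .read()
                    .decode()
                )
                # Every scheduler metric should now carry the configured prefix,
                # e.g. dask_scheduler_tasks or dask_semaphore_max_leases.
                assert any(line.startswith("dask_") for line in txt.splitlines())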
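
After the final patch, scraping a worker's /metrics endpoint should yield exposition text along the following lines (values are illustrative, and the exact prefix depends on the configured namespace):

    # HELP dask_worker_tasks Number of tasks at worker.
    # TYPE dask_worker_tasks gauge
    dask_worker_tasks{state="stored"} 0.0
    dask_worker_tasks{state="executing"} 0.0
    dask_worker_tasks{state="ready"} 0.0
    dask_worker_tasks{state="waiting"} 0.0
    # HELP dask_worker_concurrent_fetch_requests Number of open fetch requests to other workers.
    # TYPE dask_worker_concurrent_fetch_requests gauge
    dask_worker_concurrent_fetch_requests 0.0

The "serving" state no longer appears because its gauge sample is removed in patch 5/5, and the old connections metric is exported under the concurrent_fetch_requests name instead.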