Skip to content

Commit

Permalink
🐛Dask: fix issue with disappearing workers (#4057)
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg authored Apr 3, 2023
1 parent 661a727 commit 0e4bf95
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 16 deletions.
5 changes: 3 additions & 2 deletions services/director-v2/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ anyio==3.6.1
# starlette
# watchgod
arrow==1.2.3
# via -r requirements/../../../packages/service-library/requirements/_base.in
# via
# -r requirements/../../../packages/service-library/requirements/_base.in
# -r requirements/../../../packages/simcore-sdk/requirements/../../../packages/service-library/requirements/_base.in
asgiref==3.5.2
# via uvicorn
async-timeout==4.0.2
Expand Down Expand Up @@ -89,7 +91,6 @@ commonmark==0.9.1
# via rich
dask==2023.3.0
# via
# -r requirements/../../../packages/dask-task-models-library/requirements/_base.in
# -r requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt
# dask-gateway
# distributed
Expand Down
1 change: 1 addition & 0 deletions services/director-v2/requirements/_test.in
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ async-asgi-testclient # replacement for fastapi.testclient.TestClient [see b) be
codecov
coveralls
dask-gateway-server[local]
dask[distributed,diagnostics]
docker
Faker
flaky
Expand Down
90 changes: 90 additions & 0 deletions services/director-v2/requirements/_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ attrs==21.4.0
# aiohttp
# pytest
# pytest-docker
bokeh==2.4.3
# via dask
boto3==1.24.59
# via aiobotocore
botocore==1.27.59
Expand All @@ -69,6 +71,16 @@ charset-normalizer==2.0.12
# -c requirements/_base.txt
# aiohttp
# requests
click==8.1.3
# via
# -c requirements/_base.txt
# dask
# distributed
cloudpickle==2.2.1
# via
# -c requirements/_base.txt
# dask
# distributed
codecov==2.1.12
# via -r requirements/_test.in
colorlog==6.7.0
Expand All @@ -84,10 +96,18 @@ cryptography==39.0.1
# via
# -c requirements/../../../requirements/constraints.txt
# dask-gateway-server
dask==2023.3.0
# via
# -r requirements/_test.in
# distributed
dask-gateway-server==2023.1.1
# via -r requirements/_test.in
dill==0.3.6
# via pylint
distributed==2023.3.0
# via
# -c requirements/_base.txt
# dask
docker==6.0.1
# via -r requirements/_test.in
docopt==0.6.2
Expand All @@ -105,6 +125,10 @@ frozenlist==1.3.0
# -c requirements/_base.txt
# aiohttp
# aiosignal
fsspec==2023.3.0
# via
# -c requirements/_base.txt
# dask
greenlet==1.1.2
# via
# -c requirements/_base.txt
Expand All @@ -113,6 +137,10 @@ h11==0.12.0
# via
# -c requirements/_base.txt
# httpcore
heapdict==1.0.1
# via
# -c requirements/_base.txt
# zict
httpcore==0.15.0
# via
# -c requirements/_base.txt
Expand All @@ -134,45 +162,76 @@ iniconfig==2.0.0
# via pytest
isort==5.12.0
# via pylint
jinja2==3.1.2
# via
# -c requirements/_base.txt
# bokeh
# dask
# distributed
jmespath==1.0.1
# via
# boto3
# botocore
lazy-object-proxy==1.9.0
# via astroid
locket==1.0.0
# via
# -c requirements/_base.txt
# distributed
# partd
mako==1.2.2
# via
# -c requirements/_base.txt
# alembic
markupsafe==2.1.2
# via
# -c requirements/_base.txt
# jinja2
# mako
mccabe==0.7.0
# via pylint
minio==7.0.4
# via -r requirements/_test.in
msgpack==1.0.5
# via
# -c requirements/_base.txt
# distributed
multidict==6.0.2
# via
# -c requirements/_base.txt
# aiohttp
# async-asgi-testclient
# yarl
numpy==1.24.2
# via bokeh
packaging==23.0
# via
# -c requirements/_base.txt
# bokeh
# dask
# distributed
# docker
# pytest
pamqp==3.2.1
# via
# -c requirements/_base.txt
# aiormq
partd==1.3.0
# via
# -c requirements/_base.txt
# dask
pillow==9.5.0
# via bokeh
platformdirs==3.0.0
# via pylint
pluggy==1.0.0
# via pytest
pprintpp==0.4.0
# via pytest-icdiff
psutil==5.9.4
# via
# -c requirements/_base.txt
# distributed
pycparser==2.21
# via cffi
pylint==2.16.2
Expand Down Expand Up @@ -208,6 +267,12 @@ python-dateutil==2.8.2
# -c requirements/_base.txt
# botocore
# faker
pyyaml==5.4.1
# via
# -c requirements/_base.txt
# bokeh
# dask
# distributed
requests==2.27.1
# via
# -c requirements/_base.txt
Expand All @@ -234,28 +299,49 @@ sniffio==1.2.0
# asgi-lifespan
# httpcore
# httpx
sortedcontainers==2.4.0
# via
# -c requirements/_base.txt
# distributed
sqlalchemy==1.4.37
# via
# -c requirements/_base.txt
# alembic
# dask-gateway-server
tblib==1.7.0
# via
# -c requirements/_base.txt
# distributed
tomli==2.0.1
# via
# coverage
# pylint
# pytest
tomlkit==0.11.6
# via pylint
toolz==0.12.0
# via
# -c requirements/_base.txt
# dask
# distributed
# partd
tornado==6.2
# via
# -c requirements/_base.txt
# bokeh
# distributed
traitlets==5.9.0
# via dask-gateway-server
typing-extensions==4.3.0
# via
# -c requirements/_base.txt
# astroid
# bokeh
urllib3==1.26.14
# via
# -c requirements/_base.txt
# botocore
# distributed
# docker
# minio
# requests
Expand All @@ -271,3 +357,7 @@ yarl==1.7.2
# aio-pika
# aiohttp
# aiormq
zict==2.2.0
# via
# -c requirements/_base.txt
# distributed
4 changes: 2 additions & 2 deletions services/director-v2/requirements/_tools.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ cfgv==3.3.1
# via pre-commit
click==8.1.3
# via
# -c requirements/_base.txt
# -c requirements/_test.txt
# black
# pip-tools
dill==0.3.6
Expand Down Expand Up @@ -69,7 +69,7 @@ pyproject-hooks==1.0.0
# via build
pyyaml==5.4.1
# via
# -c requirements/_base.txt
# -c requirements/_test.txt
# pre-commit
# watchdog
tomli==2.0.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import logging
import traceback
from collections import deque
from copy import deepcopy
from dataclasses import dataclass, field
from http.client import HTTPException
from typing import Any, Callable, Deque, Final, Optional
Expand All @@ -35,6 +36,7 @@
from models_library.users import UserID
from pydantic import parse_obj_as
from pydantic.networks import AnyUrl
from servicelib.logging_utils import log_catch
from settings_library.s3 import S3Settings
from simcore_sdk.node_ports_v2 import FileLinkType
from simcore_service_director_v2.modules.storage import StorageClient
Expand Down Expand Up @@ -441,19 +443,32 @@ def _get_worker_used_resources(
dask_scheduler: distributed.Scheduler,
) -> dict[str, dict]:
used_resources = {}
for worker_name in dask_scheduler.workers:
worker = dask_scheduler.workers[worker_name]
used_resources[worker_name] = worker.used_resources
for worker_name, worker_state in dask_scheduler.workers.items():
used_resources[worker_name] = worker_state.used_resources
return used_resources

used_resources_per_worker: dict[
str, dict[str, Any]
] = await self.backend.client.run_on_scheduler(
_get_worker_used_resources
) # type: ignore

for k, v in used_resources_per_worker.items():
scheduler_info.get("workers", {}).get(k, {}).update(used_resources=v)
with log_catch(logger, reraise=False):
# NOTE: this runs directly on the dask-scheduler and may rise exceptions
used_resources_per_worker: dict[
str, dict[str, Any]
] = await self.backend.client.run_on_scheduler(
_get_worker_used_resources
) # type: ignore

# let's update the scheduler info, with default to 0s since sometimes
# workers are destroyed/created without us knowing right away
for worker_name, worker_info in scheduler_info.get("workers", {}).items():
used_resources: dict[str, float] = deepcopy(
worker_info.get("resources", {})
)
# reset default values
for res_name in used_resources:
used_resources[res_name] = 0
# if the scheduler has info, let's override them
used_resources = used_resources_per_worker.get(
worker_name, used_resources
)
worker_info.update(used_resources=used_resources)

assert dashboard_link # nosec
return ClusterDetails(
Expand Down
33 changes: 32 additions & 1 deletion services/director-v2/tests/unit/test_modules_dask_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# pylint:disable=too-many-arguments
# pylint: disable=reimported
import asyncio
import datetime
import functools
import traceback
from dataclasses import dataclass
Expand Down Expand Up @@ -34,6 +35,7 @@
from distributed.deploy.spec import SpecCluster
from faker import Faker
from fastapi.applications import FastAPI
from models_library.api_schemas_storage import LinkType
from models_library.clusters import ClusterID, NoAuthentication, SimpleAuthentication
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
Expand All @@ -45,6 +47,7 @@
from pytest import MonkeyPatch
from pytest_mock.plugin import MockerFixture
from pytest_simcore.helpers.typing_env import EnvVarsDict
from servicelib.background_task import periodic_task
from settings_library.s3 import S3Settings
from simcore_sdk.node_ports_v2 import FileLinkType
from simcore_service_director_v2.core.errors import (
Expand Down Expand Up @@ -200,7 +203,8 @@ async def factory() -> DaskClient:
assert client.backend.gateway
assert client.backend.gateway_cluster

scheduler_infos = client.backend.client.scheduler_info() # type: ignore
scheduler_infos = client.backend.client.scheduler_info()
assert scheduler_infos
print(f"--> Connected to gateway {client.backend.gateway=}")
print(f"--> Cluster {client.backend.gateway_cluster=}")
print(f"--> Client {client=}")
Expand Down Expand Up @@ -1163,3 +1167,30 @@ def fake_sidecar_fct(
].used_resources

assert all(res == 0.0 for res in currently_used_resources.values())


@pytest.mark.skip(reason="manual testing")
@pytest.mark.parametrize("tasks_file_link_type", [LinkType.S3], indirect=True)
async def test_get_cluster_details_robust_to_worker_disappearing(
    create_dask_client_from_gateway: Callable[[], Awaitable[DaskClient]]
):
    """Stress-test ``get_cluster_details`` while workers appear/disappear.

    When running a high number of computational services on a gateway, getting
    the cluster's used resources could intermittently fail and flood the logs
    with errors, because dask workers disappeared (or were not fully ready)
    between the scheduler query and the per-worker resource lookup. This test
    simulates that churn by scaling the cluster up and down in the background
    while repeatedly polling the cluster details; the poll must never raise.
    """
    dask_client = await create_dask_client_from_gateway()
    # sanity check: the call succeeds at least once before workers start churning
    await dask_client.get_cluster_details()

    async def _scale_up_and_down() -> None:
        # grow the cluster, give workers a moment to start registering, then
        # shrink back so workers keep (dis)appearing during the polling loop
        assert dask_client.backend.gateway_cluster
        await dask_client.backend.gateway_cluster.scale(40)
        await asyncio.sleep(1)
        await dask_client.backend.gateway_cluster.scale(1)

    async with periodic_task(
        _scale_up_and_down, interval=datetime.timedelta(seconds=1), task_name="pytest"
    ):
        # ~90s of polling at 10Hz; any raised exception fails the test
        for _ in range(900):
            await dask_client.get_cluster_details()
            await asyncio.sleep(0.1)

0 comments on commit 0e4bf95

Please sign in to comment.