🐛 Fixes several issues in director-v2 integration test #2626

Merged
merged 29 commits on Nov 4, 2021
Changes from 28 commits
Commits (29)
4f802b4
use dataclass instead of namedtuple
sanderegg Nov 2, 2021
6fdf97c
types
sanderegg Nov 2, 2021
a1ef859
make this test less flaky
sanderegg Nov 2, 2021
59c3d01
remove unnecessary file
sanderegg Nov 2, 2021
b8564af
types
sanderegg Nov 2, 2021
6432ccc
typo
sanderegg Nov 2, 2021
be99304
this one might help
sanderegg Nov 2, 2021
db6ee65
typo
sanderegg Nov 2, 2021
5e4155d
remove unnecessary code
sanderegg Nov 2, 2021
8e3b552
linter
sanderegg Nov 2, 2021
9f2a357
properly cancel background tasks
sanderegg Nov 2, 2021
338d569
name tasks
sanderegg Nov 2, 2021
5a3a370
name tasks
sanderegg Nov 2, 2021
38751d5
more info in pending resources
pcrespov Nov 3, 2021
277efca
fixes linter errors and py3.6 syntax
pcrespov Nov 3, 2021
7876dcf
@GitHK review: includes volumes in resources to monitor
pcrespov Nov 3, 2021
e0fc213
py3.6 compatible
pcrespov Nov 3, 2021
2858ce5
enforces containers to be removed
pcrespov Nov 3, 2021
163e668
cleanup logging and retry
pcrespov Nov 3, 2021
b5f8cac
Fixes wrong query name
pcrespov Nov 3, 2021
b3c7e8a
cleanup stack
pcrespov Nov 3, 2021
bed88ef
Merge branch 'maintenance/improve_test_reliability' of https://github…
pcrespov Nov 3, 2021
bcfacff
After merge, allow_redirects=True
pcrespov Nov 3, 2021
e107293
revert previous
pcrespov Nov 3, 2021
2843cb8
fix redirect with GET
sanderegg Nov 3, 2021
15aa1e5
refactor
sanderegg Nov 3, 2021
b5d07d6
Merge branch 'maintenance/ci-hanging' of github.com:pcrespov/osparc-s…
sanderegg Nov 3, 2021
d71043c
bad merge
sanderegg Nov 3, 2021
108661d
error while creating task
sanderegg Nov 4, 2021
95 changes: 57 additions & 38 deletions packages/pytest-simcore/src/pytest_simcore/docker_swarm.py
@@ -2,20 +2,23 @@
# pylint:disable=unused-argument
# pylint:disable=redefined-outer-name

#
# NOTE this file must be py3.6 compatible because it is used by the director
#
import json
import logging
import subprocess
import time
from datetime import datetime
from pathlib import Path
from pprint import pprint
from typing import Dict, Iterator, Type
from typing import Dict, Iterator

import docker
import pytest
import tenacity
import yaml
from docker.errors import APIError
from tenacity import Retrying
from tenacity.before_sleep import before_sleep_log
from tenacity.stop import stop_after_attempt, stop_after_delay
from tenacity.wait import wait_exponential, wait_fixed
@@ -25,6 +28,15 @@

log = logging.getLogger(__name__)

_MINUTE: int = 60 # secs
HEADER: str = "{:-^50}"


DEFAULT_RETRY_POLICY = dict(
wait=wait_exponential(),
stop=stop_after_delay(15),
)


class _NotInSwarmException(Exception):
pass
@@ -34,6 +46,10 @@ class _StillInSwarmException(Exception):
pass


class _ResourceStillNotRemoved(Exception):
pass


def _in_docker_swarm(
docker_client: docker.client.DockerClient, raise_error: bool = False
) -> bool:
@@ -48,14 +64,6 @@ def _in_docker_swarm(
return True


def _attempt_for(retry_error_cls: Type[Exception]) -> tenacity.Retrying:
return tenacity.Retrying(
wait=wait_exponential(),
stop=stop_after_delay(15),
retry_error_cls=retry_error_cls,
)


@pytest.fixture(scope="session")
def docker_client() -> Iterator[docker.client.DockerClient]:
client = docker.from_env()
@@ -71,7 +79,9 @@ def keep_docker_up(request) -> bool:
def docker_swarm(
docker_client: docker.client.DockerClient, keep_docker_up: Iterator[bool]
) -> Iterator[None]:
for attempt in _attempt_for(retry_error_cls=_NotInSwarmException):
for attempt in Retrying(
retry_error_cls=_NotInSwarmException, **DEFAULT_RETRY_POLICY
):
with attempt:
if not _in_docker_swarm(docker_client):
docker_client.swarm.init(advertise_addr=get_ip())
@@ -185,6 +195,8 @@ def docker_stack(
"services": [service.name for service in docker_client.services.list()],
}

## TEAR DOWN ----------------------

_print_services(docker_client, "[AFTER TEST]")

if keep_docker_up:
@@ -205,46 +217,53 @@

# make down
# NOTE: remove them in reverse order since stacks share common networks
WAIT_BEFORE_RETRY_SECS = 1

HEADER = "{:-^20}"
stacks.reverse()
for _, stack, _ in stacks:

try:
subprocess.run(f"docker stack remove {stack}", shell=True, check=True)
subprocess.run(
f"docker stack remove {stack}",
shell=True,
check=True,
capture_output=True,
)
except subprocess.CalledProcessError as err:
log.warning(
"Ignoring failure while executing '%s' (returned code %d):\n%s\n%s\n%s\n%s\n",
err.cmd,
err.returncode,
HEADER.format("stdout"),
err.stdout,
err.stdout.decode("utf8") if err.stdout else "",
HEADER.format("stderr"),
err.stderr,
err.stderr.decode("utf8") if err.stderr else "",
)

while docker_client.services.list(
filters={"label": f"com.docker.stack.namespace={stack}"}
):
time.sleep(WAIT_BEFORE_RETRY_SECS)

while docker_client.networks.list(
filters={"label": f"com.docker.stack.namespace={stack}"}
):
time.sleep(WAIT_BEFORE_RETRY_SECS)

while docker_client.containers.list(
filters={"label": f"com.docker.stack.namespace={stack}"}
):
time.sleep(WAIT_BEFORE_RETRY_SECS)

for attempt in _attempt_for(retry_error_cls=APIError):
with attempt:
list_of_volumes = docker_client.volumes.list(
filters={"label": f"com.docker.stack.namespace={stack}"}
)
for volume in list_of_volumes:
volume.remove(force=True)
# Wait until all resources are removed, or force-remove them
# The check order is intentional because some resources depend on others to be removed
# e.g. cannot remove networks/volumes used by running containers
for resource_name in ("services", "containers", "volumes", "networks"):
resource_client = getattr(docker_client, resource_name)

for attempt in Retrying(
wait=wait_exponential(),
stop=stop_after_delay(3 * _MINUTE),
before_sleep=before_sleep_log(log, logging.WARNING),
reraise=True,
):
with attempt:
pending = resource_client.list(
filters={"label": f"com.docker.stack.namespace={stack}"}
)
if pending:
if resource_name in ("volumes",):
# WARNING: rm volumes on this stack might be a problem when shared between different stacks
# NOTE: volumes are removed to avoid mixing configs (e.g. postgres db credentials)
for resource in pending:
resource.remove(force=True)

raise _ResourceStillNotRemoved(
f"Waiting for {len(pending)} {resource_name} to shutdown: {pending}."
)

_print_services(docker_client, "[AFTER REMOVED]")
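
Note: the reworked teardown above polls each resource class in dependency order (services, then containers, volumes, networks) and keeps retrying with exponential backoff until Docker reports nothing left. A standalone sketch of that tenacity pattern, with illustrative helper and exception names rather than the PR's exact code:

    import logging

    import docker
    from tenacity import Retrying
    from tenacity.before_sleep import before_sleep_log
    from tenacity.stop import stop_after_delay
    from tenacity.wait import wait_exponential

    log = logging.getLogger(__name__)


    class ResourceStillPresent(Exception):
        pass


    def wait_for_stack_removal(stack: str, timeout_secs: int = 180) -> None:
        client = docker.from_env()
        label = f"com.docker.stack.namespace={stack}"
        # order matters: running containers pin the networks/volumes they use
        for resource_name in ("services", "containers", "volumes", "networks"):
            resource_client = getattr(client, resource_name)
            for attempt in Retrying(
                wait=wait_exponential(),
                stop=stop_after_delay(timeout_secs),
                before_sleep=before_sleep_log(log, logging.WARNING),
                reraise=True,  # surface ResourceStillPresent on timeout, not RetryError
            ):
                with attempt:
                    pending = resource_client.list(filters={"label": label})
                    if pending:
                        raise ResourceStillPresent(
                            f"{len(pending)} {resource_name} still present: {pending}"
                        )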
13 changes: 7 additions & 6 deletions packages/pytest-simcore/src/pytest_simcore/simcore_services.py
@@ -12,10 +12,9 @@
import tenacity
from _pytest.monkeypatch import MonkeyPatch
from aiohttp.client import ClientTimeout
from tenacity.after import after_log
from tenacity.before_sleep import before_sleep_log
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_random
from tenacity.wait import wait_exponential
from yarl import URL

from .helpers.utils_docker import get_ip, get_service_published_port
@@ -118,17 +117,19 @@ async def simcore_services_ready(


_MINUTE: Final[int] = 60

# HELPERS --
@tenacity.retry(
wait=wait_random(2, 15),
wait=wait_exponential(),
stop=stop_after_delay(5 * _MINUTE),
before_sleep=before_sleep_log(log, logging.WARNING),
after=after_log(log, logging.ERROR),
reraise=True,
)
async def wait_till_service_responsive(service_name: str, endpoint: URL):
print(f"trying to connect with '{service_name}' through '{endpoint}'")
async with aiohttp.ClientSession(timeout=ClientTimeout(total=1)) as session:
FAST = ClientTimeout(total=1) # type: ignore

print(f"Trying to connect with '{service_name}' through '{endpoint}'")
async with aiohttp.ClientSession(timeout=FAST) as session:
async with session.get(endpoint) as resp:
# NOTE: Health-check endpoints require only a
# status code 200 (see e.g. services/web/server/docker/healthcheck.py)
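
Note: this hunk swaps the random 2-15s wait for exponential backoff and drops the redundant after-log. A self-contained sketch of the resulting polling helper, assuming aiohttp and tenacity (the endpoint below is a placeholder):

    import asyncio
    import logging

    import aiohttp
    import tenacity
    from aiohttp.client import ClientTimeout
    from tenacity.before_sleep import before_sleep_log
    from tenacity.stop import stop_after_delay
    from tenacity.wait import wait_exponential

    log = logging.getLogger(__name__)

    _MINUTE = 60  # secs


    @tenacity.retry(
        wait=wait_exponential(),  # 1s, 2s, 4s, ... between attempts
        stop=stop_after_delay(5 * _MINUTE),
        before_sleep=before_sleep_log(log, logging.WARNING),
        reraise=True,  # re-raise the last error instead of tenacity.RetryError
    )
    async def wait_till_service_responsive(service_name: str, endpoint: str) -> None:
        print(f"Trying to connect with '{service_name}' through '{endpoint}'")
        fast = ClientTimeout(total=1)  # fail fast so each attempt stays cheap
        async with aiohttp.ClientSession(timeout=fast) as session:
            async with session.get(endpoint) as resp:
                assert resp.status == 200  # health checks only promise a 200


    # e.g. asyncio.run(wait_till_service_responsive("webserver", "http://127.0.0.1:8080/v0/"))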
@@ -30,10 +30,11 @@ def cancellable_request(handler: Callable[[Any], Coroutine[Any, Any, Response]])
@wraps(handler)
async def decorator(request: Request, *args, **kwargs) -> Response:
handler_task = asyncio.get_event_loop().create_task(
handler(request, *args, **kwargs)
handler(request, *args, **kwargs), name=f"cancellable_request_{request.url}"
)
auto_cancel_task = asyncio.get_event_loop().create_task(
_cancel_task_if_client_disconnected(request, handler_task)
_cancel_task_if_client_disconnected(request, handler_task),
name=f"task_cancellation_monitor_{request.url}",
)
try:
return await handler_task
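
Note: naming both tasks is what makes this change pay off when a test hangs: an unnamed task shows up as "Task-7" in a dump, a named one points at the request it serves. A tiny illustration (the handler and names are invented for the example):

    import asyncio


    async def handler() -> str:
        await asyncio.sleep(0.1)
        return "done"


    async def main() -> None:
        task = asyncio.create_task(handler(), name="cancellable_request_/v0/status")
        # a dump of live tasks now identifies the hanging request by name
        for t in asyncio.all_tasks():
            print(t.get_name())
        await task


    asyncio.run(main())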
@@ -47,7 +47,7 @@ async def start_interactive_service(
"simcore/services/dynamic/3dviewer",
],
),
service_version: str = Query(
service_tag: str = Query(
...,
description="The tag/version of the service",
regex=VERSION_RE,
@@ -1,4 +1,5 @@
import asyncio
import contextlib
import logging
import traceback
from asyncio import Lock, Queue, Task, sleep
@@ -314,7 +315,10 @@ async def observing_single_service(service_name: str) -> None:
)
if resource_marked_as_locked:
# fire and forget about the task
asyncio.create_task(observing_single_service(service_name))
asyncio.create_task(
observing_single_service(service_name),
name=f"observe {service_name}",
)

logger.info("Scheduler 'trigger observation queue task' was shut down")

@@ -362,9 +366,12 @@ async def start(self) -> None:
# run as a background task
logger.info("Starting dynamic-sidecar scheduler")
self._keep_running = True
self._scheduler_task = asyncio.create_task(self._run_scheduler_task())
self._scheduler_task = asyncio.create_task(
self._run_scheduler_task(), name="dynamic-scheduler"
)
self._trigger_observation_queue_task = asyncio.create_task(
self._run_trigger_observation_queue_task()
self._run_trigger_observation_queue_task(),
name="dynamic-scheduler-trigger-obs-queue",
)

await self._discover_running_services()
@@ -376,12 +383,16 @@ async def shutdown(self):
self._to_observe = {}

if self._scheduler_task is not None:
await self._scheduler_task
self._scheduler_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._scheduler_task
self._scheduler_task = None

if self._trigger_observation_queue_task is not None:
await self._trigger_observation_queue.put(None)
await self._trigger_observation_queue_task
self._trigger_observation_queue_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._trigger_observation_queue_task
self._trigger_observation_queue_task = None
self._trigger_observation_queue = Queue()

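
Note: this shutdown fix matters because both background tasks loop forever; awaiting them directly, as the old code did, blocks shutdown indefinitely. Cancelling first and then awaiting under contextlib.suppress lets the task unwind cleanly. A minimal sketch of the pattern:

    import asyncio
    import contextlib


    async def scheduler_loop() -> None:
        while True:  # runs until cancelled
            await asyncio.sleep(1)


    async def main() -> None:
        task = asyncio.create_task(scheduler_loop(), name="dynamic-scheduler")
        await asyncio.sleep(0)  # let the loop start
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task  # returns promptly instead of hanging forever


    asyncio.run(main())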
@@ -66,7 +66,6 @@
assert_start_service,
assert_stop_service,
ensure_network_cleanup,
get_director_v0_patched_url,
is_legacy,
patch_dynamic_service_url,
)
@@ -118,6 +117,7 @@ def minimal_configuration( # pylint:disable=too-many-arguments
ensure_swarm_and_networks: None,
) -> Iterator[None]:
with postgres_db.connect() as conn:
# pylint: disable=no-value-for-parameter
conn.execute(comp_tasks.delete())
conn.execute(comp_pipeline.delete())
yield
@@ -530,7 +530,6 @@ async def _wait_for_dynamic_services_to_be_running(
*(
assert_start_service(
director_v2_client=director_v2_client,
director_v0_url=director_v0_url,
user_id=user_id,
project_id=str(current_study.uuid),
service_key=node.key,
@@ -553,12 +552,11 @@
dynamic_services_urls[service_uuid] = dynamic_service_url

await assert_all_services_running(
director_v2_client, director_v0_url, workbench=workbench_dynamic_services
director_v2_client, workbench=workbench_dynamic_services
)

await assert_services_reply_200(
director_v2_client=director_v2_client,
director_v0_url=director_v0_url,
workbench=workbench_dynamic_services,
)

@@ -693,7 +691,6 @@ async def _assert_retrieve_completed(
) -> None:
await assert_retrieve_service(
director_v2_client=director_v2_client,
director_v0_url=director_v0_url,
service_uuid=service_uuid,
)

@@ -794,7 +791,6 @@ async def test_nodeports_integration(
`docker` for both dynamic services
7. finally check that all states for both dynamic services match
"""
director_v0_url = get_director_v0_patched_url(services_endpoint["director"])

# STEP 1

@@ -804,7 +800,7 @@
str, str
] = await _wait_for_dynamic_services_to_be_running(
director_v2_client=director_v2_client,
director_v0_url=director_v0_url,
director_v0_url=services_endpoint["director"],
user_id=user_db["id"],
workbench_dynamic_services=workbench_dynamic_services,
current_study=current_study,
@@ -866,14 +862,14 @@

await _assert_retrieve_completed(
director_v2_client=director_v2_client,
director_v0_url=director_v0_url,
director_v0_url=services_endpoint["director"],
service_uuid=services_node_uuids.dy,
dynamic_services_urls=dynamic_services_urls,
)

await _assert_retrieve_completed(
director_v2_client=director_v2_client,
director_v0_url=director_v0_url,
director_v0_url=services_endpoint["director"],
service_uuid=services_node_uuids.dy_compose_spec,
dynamic_services_urls=dynamic_services_urls,
)
@@ -911,7 +907,6 @@ async def test_nodeports_integration(
*(
assert_stop_service(
director_v2_client=director_v2_client,
director_v0_url=director_v0_url,
service_uuid=service_uuid,
)
for service_uuid in workbench_dynamic_services
@@ -940,7 +935,7 @@

await _wait_for_dynamic_services_to_be_running(
director_v2_client=director_v2_client,
director_v0_url=director_v0_url,
director_v0_url=services_endpoint["director"],
user_id=user_db["id"],
workbench_dynamic_services=workbench_dynamic_services,
current_study=current_study,
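
Note: throughout this test, the per-service start/stop assertions are fanned out concurrently. A reduced sketch of that asyncio.gather pattern (assert_start_service here is a stand-in for the PR's helper, not its actual implementation):

    import asyncio
    from typing import Any, Dict


    async def assert_start_service(director_v2_client: Any, service_uuid: str) -> None:
        # stand-in: would POST to director-v2 and assert the expected status
        await asyncio.sleep(0)


    async def start_all(director_v2_client: Any, workbench: Dict[str, Any]) -> None:
        # one coroutine per dynamic service; gather runs them concurrently
        # and propagates the first failure, ending the test early
        await asyncio.gather(
            *(
                assert_start_service(director_v2_client, service_uuid)
                for service_uuid in workbench
            )
        )


    asyncio.run(start_all(None, {"node-1": {}, "node-2": {}}))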