♻️✨ adding nodeports support to dynamic-sidecar (#2509)
* there seems to be an issue elsewhere

* fixed issue when booting

* enabling debug boot

* injecting paths to all containers

* all declared volumes are removed as well

* fixing dy-sidecar unit tests

* director-v2: added volume creation and removal for dy-sidecar

* forwarding pgsettings to dynamic-sidecar

* settings refactoring

* volumes are properly mounted on the spec

* adding additional setup elements

* starting directory watcher

* adding module to properly manage volumes

* fixed some tests

* created new directory

* using correct password

* better error message

* codestyle

* extended tests for the API

* refactor renames

* attached nodeports pulling

* adding permission changes

* simplified interface for saving and restoring the service state

* using new api interface

* is no longer set to fail

* refactored

* raising error now

* fixing tests

* added data_manager integration

* added tests for mounted_fs

* added missing env var and refactor

* fixed API call to save and restore state

* pylint

* fixing tests

* fix mounted_fs and volumes generated

* refactored volume resolve and mounting

* added missing env paths

* fixed an issue with initialization

* fixed issues with permission changing

* added more comments

* rename refactor

* changed debug message

* minor refactor

* added utility to run on threadpools

* using non-blocking calls

* adds API for nodeports push and pull

* fix codestyle

* updated openapi specs for new entrypoint

* updated webserver openapi spec

* always return size in bytes of the transferred data

* updated openapi.json

* exposed retrieve to frontend

* ensures clean shutdown in all cases

* split nodeports from state saving

* fixing codestyle

* refactor

* codeclimate

* codestyle

* fixed version parsing

* Feature/dynamic retrieve (#8)

* minor

* refactoring

* bumped version

* bumped service version

* composing correct url

* version bumped

* revert change

* computeServiceV2RetrieveUrl (#9)

* computeServiceV2RetrieveUrl

* minor

* updated openapi specs

* updated requirements

* added new webserver entrypoint

* added retrieve api to director-v2

* fixed director-v2 tests

* fix bug

* inverted order

* sending in the correct format

* faster booting of dy-sidecar

* removed todos

* update policy timeout

* fixed dependencies after merge

* using appropriate images for testing

* pylint

* storage is now connected for this test

* fix status url

* fixed port forwarding issues

* fixed broken test

* removed dangerous code

* updated openapi.json

* minor refactor

* further refactoring

* refactor

* further refactoring

* shared functions refactored

* refactor

* added save_to option to datamanager.pull

* refactor

* added dependencies for testing

* added nodeports integration test

* removed unused plugin

* trying to fix test in CI

* refactor API interface

* updated API interface after change

* moving to fixture

* added more information

* using nodeports exception

* rename function

* mocked api removed, no requests are forwarded here

* refactor

* renaming function

* updated docstring

* updated description

* update comment

* fix comment

* updated _meta.py

* revert to old version

* adding watchdog

* added missing requirement

* added missing requirement

* removed test uncertainty

* enhanced test slightly

* fixed broken endpoint

* adding some debug messages

* updated openapi specification

* expanding fixture to support multiple versions

* added better description

* typing

* removed comment

* added timeouts

* using ByteSize

* renamed

* renamed save_state to can_save

* reverting change

* more renaming

* renaming restore

* renamed endpoints

* updated openapi.json

* @sanderegg fixing in all Dockerfiles

* migrated to dataclass

* better debug messages

* fixed docstring

* [skip ci] no ci when pushing

* making it easier to debug

* trying to give nodeports more time

* put back missing parameter

* putting back option

* restored missing API

* fixed typing

* replaced with 3.8 version

* typo

* using statuses from starlette

* making it run in development

* fixed import issue

* only log keys

* added status retrying

* adding retries on status and retrieve

* refactor

* renaming and refactoring

* renamed again

* removing dependency bump

* reverting

* added minio

* added extra comments

* added more assertions

* added more debug prints

* adding more information in logs

* more explicit debug message

* added sleep to make sure data is available

* adding more information to make it easier to debug

* trying to trigger CI again

* refactoring

* refactor

* used for testing, leaving it in

* added new container logs dump for simpler debugging

* dumping logs twice

* adding logs debug

* updated return type

* logs from containers on failure

* using version with more debug options

* bumped expected version in workbench

* better error message and explanation

* even more debug information

* put back original check

* added separators

* removed space

* trigger CI again

* bumping timeout

* printing data for all containers

* fix issue

* bumping dy_static_file_server version

* updated versions in workbench

* fixing python-linting CI

* reverted changes

* debug help

* addresses an issue with stopping

* fix error

* replaced logging. with logger.

* adding more debug messages

* adding healthcheck to traefik

* enabling logs from traefik

* adding more logging

* adding more logs

* raising attempts to a 15 minute timeout

* adding better debug messages

* changed the defaults to something else

* added logging when data is not found to be uploaded

* updating logs on function

* fixes a race condition when saving ports in parallel

* reverting changes

* changed endpoint to make it more readable

* removed client-sdk references

* migrated nodeports implementation

* archiving folders is now done in parallel

* fix import issue

* some missing types

* typing

* fix pylint

* checking how long the test lasts

* fixing timeout for test

* added faster bailout

* making the bigger case start first

* adding debug flags to detect hangs

* reduced timeout to a reasonable value

* added port forward for proxy to test

* making tests more reliable

* extended logging

* pylint

Co-authored-by: Andrei Neagu <[email protected]>
Co-authored-by: Odei Maiz <[email protected]>
3 people authored Oct 27, 2021
1 parent 5ef4cd9 commit c6246c3
Showing 96 changed files with 5,236 additions and 1,319 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-testing-deploy.yml
@@ -1919,7 +1919,7 @@ jobs:
path: codeclimate.${{ github.job }}_coverage.json

integration-test-director-v2-02:
timeout-minutes: 20 # if this timeout gets too small, then split the tests
timeout-minutes: 30 # if this timeout gets too small, then split the tests
name: "[int] director-v2 02"
needs: [build-test-images]
runs-on: ${{ matrix.os }}
49 changes: 49 additions & 0 deletions api/specs/webserver/openapi-projects.yaml
@@ -395,6 +395,55 @@ paths:
default:
$ref: "./openapi.yaml#/components/responses/DefaultErrorResponse"

  /projects/{project_id}/nodes/{node_id}/retrieve:
    parameters:
      - name: project_id
        in: path
        required: true
        schema:
          type: string
      - name: node_id
        in: path
        required: true
        schema:
          type: string

    post:
      tags:
        - project
      description: Triggers service retrieve
      operationId: post_retrieve
      requestBody:
        required: true
        content:
          application/json:
            schema:
              type: object
              properties:
                port_keys:
                  description: list of port keys to be retrieved
                  type: array
                  items:
                    type: string
      responses:
        "200":
          description: Returns the amount of transferred bytes when pulling data via nodeports
          content:
            application/json:
              schema:
                type: object
                properties:
                  data:
                    type: object
                    description: response payload
                    properties:
                      size_bytes:
                        type: integer
                        description: amount of transferred bytes

        default:
          $ref: "#/components/responses/DefaultErrorResponse"

/projects/{study_uuid}/tags/{tag_id}:
parameters:
- name: tag_id
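
As an illustration, a minimal client-side sketch of calling the new retrieve entrypoint; the base URL, UUIDs, and port keys below are placeholders, not values from this commit:

import requests  # any HTTP client works; requests is used here for brevity

# hypothetical values -- substitute a real webserver URL and project/node UUIDs
BASE_URL = "http://localhost:9081/v0"
project_id = "my-project-uuid"
node_id = "my-node-uuid"

response = requests.post(
    f"{BASE_URL}/projects/{project_id}/nodes/{node_id}/retrieve",
    json={"port_keys": ["input_1"]},  # hypothetical port keys
)
response.raise_for_status()
# per the schema above: {"data": {"size_bytes": <amount of transferred bytes>}}
print(response.json()["data"]["size_bytes"])
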
3 changes: 3 additions & 0 deletions api/specs/webserver/openapi.yaml
@@ -200,6 +200,9 @@ paths:
/projects/{project_id}/nodes/{node_id}:
$ref: "./openapi-projects.yaml#/paths/~1projects~1{project_id}~1nodes~1{node_id}"

/projects/{project_id}/nodes/{node_id}:retrieve:
$ref: "./openapi-projects.yaml#/paths/~1projects~1{project_id}~1nodes~1{node_id}~1retrieve"

/nodes/{nodeInstanceUUID}/outputUi/{outputKey}:
$ref: "./openapi-node-v0.0.1.yaml#/paths/~1nodes~1{nodeInstanceUUID}~1outputUi~1{outputKey}"

1 change: 1 addition & 0 deletions ci/github/integration-testing/director-v2.bash
@@ -23,6 +23,7 @@ test() {
echo "testing in services/director-v2/tests/integration/$1"
pytest --cov=simcore_service_director_v2 --durations=10 --cov-append \
--color=yes --cov-report=term-missing --cov-report=xml --cov-config=.coveragerc \
-vvv -s --log-cli-level=DEBUG \
-v -m "not travis" "services/director-v2/tests/integration/$1" --log-level=DEBUG
}

16 changes: 9 additions & 7 deletions packages/pytest-simcore/src/pytest_simcore/docker_registry.py
@@ -230,52 +230,54 @@ def jupyter_service(docker_registry: str, node_meta_schema: Dict) -> Dict[str, str]:
)


DY_STATIC_FILE_SERVER_VERSION = "1.0.5"
@pytest.fixture(scope="session", params=["2.0.2"])
def dy_static_file_server_version(request):
    return request.param


@pytest.fixture(scope="session")
def dy_static_file_server_service(
docker_registry: str, node_meta_schema: Dict
docker_registry: str, node_meta_schema: Dict, dy_static_file_server_version: str
) -> Dict[str, str]:
"""
Adds the below service in docker registry
itisfoundation/dy-static-file-server
"""
return _pull_push_service(
"itisfoundation/dy-static-file-server",
DY_STATIC_FILE_SERVER_VERSION,
dy_static_file_server_version,
docker_registry,
node_meta_schema,
)


@pytest.fixture(scope="session")
def dy_static_file_server_dynamic_sidecar_service(
docker_registry: str, node_meta_schema: Dict
docker_registry: str, node_meta_schema: Dict, dy_static_file_server_version: str
) -> Dict[str, str]:
"""
Adds the below service in docker registry
itisfoundation/dy-static-file-server-dynamic-sidecar
"""
return _pull_push_service(
"itisfoundation/dy-static-file-server-dynamic-sidecar",
DY_STATIC_FILE_SERVER_VERSION,
dy_static_file_server_version,
docker_registry,
node_meta_schema,
)


@pytest.fixture(scope="session")
def dy_static_file_server_dynamic_sidecar_compose_spec_service(
docker_registry: str, node_meta_schema: Dict
docker_registry: str, node_meta_schema: Dict, dy_static_file_server_version: str
) -> Dict[str, str]:
"""
Adds the below service in docker registry
itisfoundation/dy-static-file-server-dynamic-sidecar-compose-spec
"""
return _pull_push_service(
"itisfoundation/dy-static-file-server-dynamic-sidecar-compose-spec",
DY_STATIC_FILE_SERVER_VERSION,
dy_static_file_server_version,
docker_registry,
node_meta_schema,
)
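
The hard-coded DY_STATIC_FILE_SERVER_VERSION constant is replaced by a session-scoped parametrized fixture, so every test depending on one of these service fixtures is re-run once per listed version. A minimal sketch of the pattern, with hypothetical fixture and test names:

import pytest

@pytest.fixture(scope="session", params=["2.0.2", "2.0.3"])  # hypothetical versions
def service_version(request) -> str:
    return request.param

@pytest.fixture(scope="session")
def service_image(service_version: str) -> str:
    # a separate session-scoped instance is created for each version param
    return f"itisfoundation/dy-static-file-server:{service_version}"

def test_image_tag(service_image: str) -> None:
    assert service_image.endswith(("2.0.2", "2.0.3"))
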
33 changes: 25 additions & 8 deletions packages/service-library/src/servicelib/async_utils.py
@@ -2,12 +2,25 @@
import logging
from collections import deque
from functools import wraps
from typing import Dict, List, Optional
from typing import TYPE_CHECKING, Any, Callable, Deque, Dict, List, Optional

import attr

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    Queue = asyncio.Queue
else:

    class FakeGenericMeta(type):
        def __getitem__(self, item):
            return self

    class Queue(
        asyncio.Queue, metaclass=FakeGenericMeta
    ):  # pylint: disable=function-redefined
        pass


@attr.s(auto_attribs=True)
class Context:
@@ -30,7 +43,9 @@ async def stop_sequential_workers() -> None:
logger.info("All run_sequentially_in_context pending workers stopped")


def run_sequentially_in_context(target_args: List[str] = None):
def run_sequentially_in_context(
target_args: List[str] = None,
) -> Callable[[Any], Any]:
"""All request to function with same calling context will be run sequentially.
Example:
@@ -68,15 +83,17 @@ async def func(param1, param2, param3):
"""
target_args = [] if target_args is None else target_args

def internal(decorated_function):
def get_context(args, kwargs: Dict) -> Context:
def internal(
decorated_function: Callable[[Any], Optional[Any]]
) -> Callable[[Any], Optional[Any]]:
def get_context(args: Any, kwargs: Dict[Any, Any]) -> Context:
arg_names = decorated_function.__code__.co_varnames[
: decorated_function.__code__.co_argcount
]
search_args = dict(zip(arg_names, args))
search_args.update(kwargs)

key_parts = deque()
key_parts: Deque[str] = deque()
for arg in target_args:
sub_args = arg.split(".")
main_arg = sub_args[0]
Expand Down Expand Up @@ -108,13 +125,13 @@ def get_context(args, kwargs: Dict) -> Context:
return _sequential_jobs_contexts[key]

@wraps(decorated_function)
async def wrapper(*args, **kwargs):
async def wrapper(*args: Any, **kwargs: Any) -> Any:
context: Context = get_context(args, kwargs)

if not context.initialized:
context.initialized = True

async def worker(in_q: asyncio.Queue, out_q: asyncio.Queue):
async def worker(in_q: Queue, out_q: Queue) -> None:
while True:
awaitable = await in_q.get()
in_q.task_done()
@@ -137,7 +154,7 @@ async def worker(in_q: asyncio.Queue, out_q: asyncio.Queue):
worker(context.in_queue, context.out_queue)
)

await context.in_queue.put(decorated_function(*args, **kwargs))
await context.in_queue.put(decorated_function(*args, **kwargs)) # type: ignore

wrapped_result = await context.out_queue.get()
if isinstance(wrapped_result, Exception):
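
The FakeGenericMeta shim above exists so that Queue[...] annotations work at runtime on Python 3.8, where asyncio.Queue is not yet subscriptable; under TYPE_CHECKING the real asyncio.Queue is used. A usage sketch of the decorator itself, assuming a hypothetical save_project coroutine: calls sharing the values of target_args are queued and run one at a time, while calls with different values run concurrently.

import asyncio
from servicelib.async_utils import run_sequentially_in_context

@run_sequentially_in_context(target_args=["project_id"])
async def save_project(project_id: str, revision: int) -> str:
    # calls sharing a project_id are serialized; distinct ids run concurrently
    await asyncio.sleep(0.1)
    return f"{project_id}@{revision}"

async def main() -> None:
    results = await asyncio.gather(
        save_project("p1", 1),
        save_project("p1", 2),  # queued behind the first "p1" call
        save_project("p2", 1),  # independent context, runs in parallel
    )
    print(results)

asyncio.run(main())
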
9 changes: 9 additions & 0 deletions packages/service-library/src/servicelib/pools.py
@@ -1,5 +1,7 @@
import asyncio
from concurrent.futures import ProcessPoolExecutor
from contextlib import contextmanager
from typing import Any, Callable

# only gets created on use and is guaranteed to be the same
# for the entire lifetime of the application
@@ -35,3 +37,10 @@ def non_blocking_process_pool_executor(**kwargs) -> ProcessPoolExecutor:
# FIXME: uncomment below line when the issue is fixed
# executor.shutdown(wait=False)
pass


async def async_on_threadpool(callable_function: Callable, *args: Any) -> Any:
    """Ensures blocking operation runs on shared thread pool"""
    return await asyncio.get_event_loop().run_in_executor(
        None, callable_function, *args
    )
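
A short usage sketch for the new helper; the blocking function below is a hypothetical stand-in for any call that would otherwise stall the event loop:

import asyncio
from servicelib.pools import async_on_threadpool

def blocking_work(n: int) -> int:
    # placeholder for blocking I/O or a library call without an async API
    return sum(range(n))

async def main() -> None:
    # runs on the loop's default thread pool, keeping the event loop responsive
    print(await async_on_threadpool(blocking_work, 1_000_000))

asyncio.run(main())
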
13 changes: 6 additions & 7 deletions packages/service-library/src/servicelib/utils.py
@@ -7,7 +7,6 @@
import asyncio
import logging
import os

from pathlib import Path
from typing import Any, Awaitable, Coroutine, List, Optional, Union

@@ -79,8 +78,11 @@

# // tasks
async def logged_gather(
*tasks, reraise: bool = True, log: logging.Logger = logger, max_concurrency: int = 0
) -> List[Any]:
*tasks: Awaitable[Any],
reraise: bool = True,
log: logging.Logger = logger,
max_concurrency: int = 0,
) -> List[Optional[Any]]:
"""
Thin wrapper around asyncio.gather that allows executing ALL tasks concurrently until the end
even if any of them fail. Finally, all errors are logged and the first raised (if reraise=True)
@@ -91,18 +93,15 @@ async def logged_gather(
use directly asyncio.gather(*tasks, return_exceptions=True).
:param reraise: reraises the first exception (in the order the tasks were passed), defaults to True
:type reraise: bool, optional
:param log: passing the logger gives a chance to identify the origin of the gather call, defaults to current submodule's logger
:type log: logging.Logger, optional
:return: list of tasks results and errors e.g. [1, 2, ValueError("task3 went wrong"), 33, "foo"]
:rtype: List[Any]
"""

wrapped_tasks = tasks
if max_concurrency > 0:
semaphore = asyncio.Semaphore(max_concurrency)

async def sem_task(task):
async def sem_task(task: Awaitable[Any]) -> Any:
async with semaphore:
return await task

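
With the new max_concurrency parameter, logged_gather wraps each awaitable behind a semaphore. A small sketch (the fetch coroutine is hypothetical):

import asyncio
from servicelib.utils import logged_gather

async def fetch(i: int) -> int:
    await asyncio.sleep(0.1)
    return i

async def main() -> None:
    # at most 2 tasks in flight; failures are logged and, with reraise=True,
    # the first exception (in argument order) is re-raised at the end
    results = await logged_gather(*(fetch(i) for i in range(6)), max_concurrency=2)
    print(results)  # [0, 1, 2, 3, 4, 5]

asyncio.run(main())
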
7 changes: 5 additions & 2 deletions packages/service-library/tests/test_pools.py
@@ -1,8 +1,7 @@
from asyncio import BaseEventLoop
from concurrent.futures import ProcessPoolExecutor


from servicelib.pools import non_blocking_process_pool_executor
from servicelib.pools import async_on_threadpool, non_blocking_process_pool_executor


def return_int_one() -> int:
@@ -32,3 +31,7 @@ async def test_different_pool_instances() -> None:
max_workers=1
) as first, non_blocking_process_pool_executor() as second:
assert first != second


async def test_run_on_thread_pool() -> None:
    assert await async_on_threadpool(return_int_one) == 1
1 change: 1 addition & 0 deletions packages/simcore-sdk/requirements/_test.in
@@ -18,6 +18,7 @@ pytest-mock
pytest-runner
pytest-sugar
pytest-xdist
pytest-lazy-fixture

# mockups/fixtures
alembic
3 changes: 3 additions & 0 deletions packages/simcore-sdk/requirements/_test.txt
@@ -131,6 +131,7 @@ pytest==6.2.5
# pytest-forked
# pytest-icdiff
# pytest-instafail
# pytest-lazy-fixture
# pytest-mock
# pytest-sugar
# pytest-xdist
@@ -144,6 +145,8 @@ pytest-icdiff==0.5
# via -r requirements/_test.in
pytest-instafail==0.4.2
# via -r requirements/_test.in
pytest-lazy-fixture==0.6.3
# via -r requirements/_test.in
pytest-mock==3.6.1
# via -r requirements/_test.in
pytest-runner==5.3.1
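
pytest-lazy-fixture (added above) lets fixture values appear in parametrize lists, resolved at test setup time. A generic sketch with hypothetical fixtures:

import pytest
from pytest_lazyfixture import lazy_fixture

@pytest.fixture
def small_state() -> dict:
    return {"size": 1}

@pytest.fixture
def large_state() -> dict:
    return {"size": 10_000}

@pytest.mark.parametrize(
    "state", [lazy_fixture("small_state"), lazy_fixture("large_state")]
)
def test_state_is_not_empty(state: dict) -> None:
    assert state["size"] > 0
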
34 changes: 24 additions & 10 deletions packages/simcore-sdk/src/simcore_sdk/node_data/data_manager.py
@@ -66,36 +66,50 @@ async def push(
return await _push_file(user_id, project_id, node_uuid, archive_file_path, None)


async def _pull_file(user_id: int, project_id: str, node_uuid: str, file_path: Path):
async def _pull_file(
    user_id: int,
    project_id: str,
    node_uuid: str,
    file_path: Path,
    save_to: Optional[Path] = None,
):
destination_path = file_path if save_to is None else save_to
s3_object = _create_s3_object(project_id, node_uuid, file_path)
log.info("pulling data from %s to %s...", s3_object, file_path)
downloaded_file = await filemanager.download_file_from_s3(
user_id=user_id,
store_id="0",
s3_object=s3_object,
local_folder=file_path.parent,
local_folder=destination_path.parent,
)
if downloaded_file != file_path:
if file_path.exists():
file_path.unlink()
move(f"{downloaded_file}", file_path)
log.info("%s successfuly pulled", file_path)
if downloaded_file != destination_path:
destination_path.unlink(missing_ok=True)
move(f"{downloaded_file}", destination_path)
log.info("%s successfuly pulled", destination_path)


def _get_archive_name(path: Path) -> str:
return f"{path.stem}.zip"


async def pull(user_id: int, project_id: str, node_uuid: str, file_or_folder: Path):
async def pull(
    user_id: int,
    project_id: str,
    node_uuid: str,
    file_or_folder: Path,
    save_to: Optional[Path] = None,
):
if file_or_folder.is_file():
return await _pull_file(user_id, project_id, node_uuid, file_or_folder)
return await _pull_file(user_id, project_id, node_uuid, file_or_folder, save_to)
# we have a folder, so we need somewhere to extract it to
with TemporaryDirectory() as tmp_dir_name:
archive_file = Path(tmp_dir_name) / _get_archive_name(file_or_folder)
await _pull_file(user_id, project_id, node_uuid, archive_file)
log.info("extracting data from %s", archive_file)

destination_folder = file_or_folder if save_to is None else save_to
await unarchive_dir(
archive_to_extract=archive_file, destination_folder=file_or_folder
archive_to_extract=archive_file, destination_folder=destination_folder
)
log.info("extraction completed")

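
A sketch of the new save_to option; the identifiers and paths are placeholders. When save_to is given, the pulled file or extracted archive lands there instead of replacing file_or_folder in place:

import asyncio
from pathlib import Path

from simcore_sdk.node_data import data_manager

async def main() -> None:
    await data_manager.pull(
        user_id=1,  # hypothetical ids and paths
        project_id="my-project-uuid",
        node_uuid="my-node-uuid",
        file_or_folder=Path("/inputs/state"),
        save_to=Path("/tmp/restored-state"),  # redirected destination
    )

asyncio.run(main())
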