Skip to content

Commit

Permalink
✨ Enhancement/allow node port to get set value (#2605)
Browse files Browse the repository at this point in the history
* allow getting/setting value link without downloading/uploading directly
* allow for using codestyle
  • Loading branch information
sanderegg authored Oct 26, 2021
1 parent 50b5eb2 commit b014be7
Show file tree
Hide file tree
Showing 18 changed files with 320 additions and 107 deletions.
29 changes: 11 additions & 18 deletions packages/models-library/src/models_library/settings/rabbit.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import warnings
from typing import Dict, Optional
from typing import Dict

from pydantic import BaseSettings, Extra, validator
from pydantic import BaseSettings, Extra
from pydantic.networks import AnyUrl
from pydantic.types import PositiveInt, SecretStr

Expand All @@ -25,28 +25,21 @@ class RabbitConfig(BaseSettings):
user: str = "simcore"
password: SecretStr = SecretStr("simcore")

dsn: Optional[RabbitDsn] = None

# channels
channels: Dict[str, str] = {
"log": "comp.backend.channels.log",
"instrumentation": "comp.backend.channels.instrumentation",
}

@validator("dsn", pre=True)
@classmethod
def autofill_dsn(cls, v, values):
if not v and all(
key in values for key in cls.__fields__ if key not in ["dsn", "channels"]
):
return RabbitDsn.build(
scheme="amqp",
user=values["user"],
password=values["password"].get_secret_value(),
host=values["host"],
port=f"{values['port']}",
)
return v
@property
def dsn(self) -> str:
return RabbitDsn.build(
scheme="amqp",
user=self.user,
password=self.password.get_secret_value(),
host=self.host,
port=f"{self.port}",
)

class Config:
env_prefix = "RABBIT_"
Expand Down
21 changes: 10 additions & 11 deletions packages/pytest-simcore/src/pytest_simcore/rabbit_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@
import json
import logging
import socket
from typing import Any, Dict, Optional, Tuple
from typing import Any, Dict, Iterable, Optional, Tuple

import aio_pika
import pytest
import tenacity
from models_library.settings.rabbit import RabbitConfig
from tenacity.before_sleep import before_sleep_log
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_fixed

from .helpers.utils_docker import get_service_published_port

Expand All @@ -31,16 +34,12 @@ async def rabbit_config(
) -> RabbitConfig:
prefix = testing_environ_vars["SWARM_STACK_NAME"]
assert f"{prefix}_rabbit" in docker_stack["services"]

rabbit_config = RabbitConfig(
user=testing_environ_vars["RABBIT_USER"],
password=testing_environ_vars["RABBIT_PASSWORD"],
host="127.0.0.1",
port=get_service_published_port("rabbit", testing_environ_vars["RABBIT_PORT"]),
channels={
"log": "logs_channel",
"instrumentation": "instrumentation_channel",
},
channels=json.loads(testing_environ_vars["RABBIT_CHANNELS"]),
)

url = rabbit_config.dsn
Expand All @@ -57,7 +56,7 @@ async def rabbit_service(rabbit_config: RabbitConfig, monkeypatch) -> RabbitConf
monkeypatch.setenv("RABBIT_PASSWORD", rabbit_config.password.get_secret_value())
monkeypatch.setenv("RABBIT_CHANNELS", json.dumps(rabbit_config.channels))

return RabbitConfig
return rabbit_config


@pytest.fixture(scope="function")
Expand Down Expand Up @@ -126,7 +125,7 @@ async def rabbit_exchange(
async def rabbit_queue(
rabbit_channel: aio_pika.Channel,
rabbit_exchange: Tuple[aio_pika.Exchange, aio_pika.Exchange],
) -> aio_pika.Queue:
) -> Iterable[aio_pika.Queue]:
(logs_exchange, instrumentation_exchange) = rabbit_exchange
# declare queue
queue = await rabbit_channel.declare_queue(exclusive=True)
Expand All @@ -141,9 +140,9 @@ async def rabbit_queue(


@tenacity.retry(
wait=tenacity.wait_fixed(5),
stop=tenacity.stop_after_attempt(60),
before_sleep=tenacity.before_sleep_log(log, logging.INFO),
wait=wait_fixed(5),
stop=stop_after_attempt(60),
before_sleep=before_sleep_log(log, logging.INFO),
reraise=True,
)
async def wait_till_rabbit_responsive(url: str) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# pylint:disable=redefined-outer-name
import os
from copy import deepcopy
from typing import Dict
from typing import Dict, Iterable

import aiohttp
import pytest
Expand All @@ -16,7 +16,7 @@


@pytest.fixture(scope="module")
def storage_endpoint(docker_stack: Dict, testing_environ_vars: Dict) -> URL:
def storage_endpoint(docker_stack: Dict, testing_environ_vars: Dict) -> Iterable[URL]:
prefix = testing_environ_vars["SWARM_STACK_NAME"]
assert f"{prefix}_storage" in docker_stack["services"]

Expand All @@ -39,7 +39,7 @@ async def storage_service(
) -> URL:
await wait_till_storage_responsive(storage_endpoint)

yield storage_endpoint
return storage_endpoint


# HELPERS --
Expand Down
13 changes: 9 additions & 4 deletions packages/pytest-simcore/src/pytest_simcore/websocket_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# pylint:disable=unused-argument
# pylint:disable=redefined-outer-name

from typing import Callable, Optional
from typing import AsyncIterable, Awaitable, Callable, Iterable, Optional
from uuid import uuid4

import pytest
Expand All @@ -23,7 +23,7 @@ def create() -> str:


@pytest.fixture()
def socketio_url_factory(client) -> Callable:
def socketio_url_factory(client) -> Iterable[Callable[[Optional[TestClient]], str]]:
def create_url(client_override: Optional[TestClient] = None) -> str:
SOCKET_IO_PATH = "/socket.io/"
return str((client_override or client).make_url(SOCKET_IO_PATH))
Expand All @@ -32,7 +32,9 @@ def create_url(client_override: Optional[TestClient] = None) -> str:


@pytest.fixture()
async def security_cookie_factory(client: TestClient) -> Callable:
async def security_cookie_factory(
client: TestClient,
) -> AsyncIterable[Callable[[Optional[TestClient]], Awaitable[str]]]:
async def creator(client_override: Optional[TestClient] = None) -> str:
# get the cookie by calling the root entrypoint
resp = await (client_override or client).get("/v0/")
Expand All @@ -55,7 +57,9 @@ async def socketio_client_factory(
socketio_url_factory: Callable,
security_cookie_factory: Callable,
client_session_id_factory: Callable,
) -> Callable[[str, Optional[TestClient]], socketio.AsyncClient]:
) -> AsyncIterable[
Callable[[Optional[str], Optional[TestClient]], Awaitable[socketio.AsyncClient]]
]:
clients = []

async def connect(
Expand All @@ -67,6 +71,7 @@ async def connect(

sio = socketio.AsyncClient(ssl_verify=False)
# enginio 3.10.0 introduced ssl verification
assert client_session_id
url = str(
URL(socketio_url_factory(client)).with_query(
{"client_session_id": client_session_id}
Expand Down
4 changes: 2 additions & 2 deletions packages/service-library/src/servicelib/archiving_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ def _full_file_path_from_dir_and_subdirs(dir_path: Path) -> Iterator[Path]:


def _strip_directory_from_path(input_path: Path, to_strip: Path) -> Path:
to_strip = f"{str(to_strip)}/"
return Path(str(input_path).replace(to_strip, ""))
_to_strip = f"{str(to_strip)}/"
return Path(str(input_path).replace(_to_strip, ""))


def _read_in_chunks(file_object, chunk_size=1024 * 8):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import logging
from functools import cached_property
from typing import Any


class MixinLoggingSettings:
@classmethod
def validate_log_level(cls, value) -> str:
def validate_log_level(cls, value: Any) -> str:
try:
getattr(logging, value.upper())
except AttributeError as err:
Expand Down
1 change: 1 addition & 0 deletions packages/simcore-sdk/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
# by sanderegg, pcrespov
#
include ../../scripts/common.Makefile
include ../../scripts/common-package.Makefile


.PHONY: install-dev install-prod install-ci
Expand Down
5 changes: 3 additions & 2 deletions packages/simcore-sdk/src/simcore_sdk/node_ports_v2/links.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@

from models_library.projects_nodes_io import UUID_REGEX, BaseFileLink, DownloadLink
from models_library.projects_nodes_io import PortLink as BasePortLink
from pydantic import Extra, Field, StrictBool, StrictFloat, StrictInt, StrictStr
from pydantic import AnyUrl, Extra, Field, StrictBool, StrictFloat, StrictInt, StrictStr


class PortLink(BasePortLink):
node_uuid: str = Field(..., regex=UUID_REGEX, alias="nodeUuid")


class FileLink(BaseFileLink):
""" allow all kind of file links """
"""allow all kind of file links"""

class Config:
extra = Extra.allow
Expand All @@ -22,5 +22,6 @@ class Config:
]

ItemConcreteValue = Union[int, float, bool, str, Path]
ItemValue = Union[int, float, bool, str, AnyUrl]

__all__ = ["FileLink", "DownloadLink", "PortLink", "DataItemValue", "ItemConcreteValue"]
15 changes: 12 additions & 3 deletions packages/simcore-sdk/src/simcore_sdk/node_ports_v2/nodeports_v2.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import logging
from collections import deque
from pathlib import Path
from typing import Any, Callable, Coroutine, Dict, Type
from typing import Any, Callable, Coroutine, Dict, Optional, Type

from pydantic import BaseModel, Field
from servicelib.utils import logged_gather

from ..node_ports_common.dbmanager import DBManager
from ..node_ports_common.exceptions import PortNotFound, UnboundPortError
from .links import ItemConcreteValue
from .links import ItemConcreteValue, ItemValue
from .port_utils import is_file_type
from .ports_mapping import InputsList, OutputsList

Expand Down Expand Up @@ -55,7 +55,16 @@ async def outputs(self) -> OutputsList:
await self._auto_update_from_db()
return self.internal_outputs

async def get(self, item_key: str) -> ItemConcreteValue:
async def get_value_link(self, item_key: str) -> Optional[ItemValue]:
try:
return await (await self.inputs)[item_key].get_value()
except UnboundPortError:
# not available try outputs
pass
# if this fails it will raise an exception
return await (await self.outputs)[item_key].get_value()

async def get(self, item_key: str) -> Optional[ItemConcreteValue]:
try:
return await (await self.inputs)[item_key].get()
except UnboundPortError:
Expand Down
Loading

0 comments on commit b014be7

Please sign in to comment.