From 8027d86a13c1ff2889409572244844eb572d99e2 Mon Sep 17 00:00:00 2001 From: Sylvain <35365065+sanderegg@users.noreply.github.com> Date: Tue, 16 May 2023 08:53:24 +0200 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8FSidecars:=20Image=20pulling?= =?UTF-8?q?=20is=20now=20a=20debug=20log,=20try=20to=20guess=20log=20level?= =?UTF-8?q?=20of=20services=20(#4232)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../container_tasks/events.py | 14 +- .../tests/container_tasks/test_events.py | 7 +- .../src/models_library/rabbitmq_messages.py | 9 +- .../src/servicelib/logging_utils.py | 19 ++- .../tests/test_logging_utils.py | 37 ++++- .../src/settings_library/docker_registry.py | 11 +- .../computational_sidecar/constants.py | 19 +++ .../computational_sidecar/core.py | 16 ++- .../computational_sidecar/docker_utils.py | 128 ++++++++++-------- .../dask_utils.py | 16 ++- .../file_utils.py | 59 ++++---- .../tests/unit/test_dask_utils.py | 5 +- .../tests/unit/test_docker_utils.py | 114 ++++++---------- .../core/docker_compose_utils.py | 5 +- .../core/docker_logs.py | 12 +- .../core/docker_utils.py | 28 ++-- .../core/rabbitmq.py | 20 +-- .../modules/long_running_tasks.py | 2 +- .../tests/unit/test_core_docker_utils.py | 14 +- .../unit/test_modules_outputs_manager.py | 4 +- 20 files changed, 327 insertions(+), 212 deletions(-) create mode 100644 services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/constants.py diff --git a/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/events.py b/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/events.py index 17a752be644..8ea228653e5 100644 --- a/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/events.py +++ b/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/events.py @@ -1,6 +1,6 @@ import logging from abc import ABC, abstractmethod -from typing import Union +from typing import TypeAlias, Union from distributed.worker import get_worker from pydantic import BaseModel, Extra, NonNegativeFloat @@ -45,17 +45,21 @@ class Config(BaseTaskEvent.Config): } +LogMessageStr: TypeAlias = str +LogLevelInt: TypeAlias = int + + class TaskLogEvent(BaseTaskEvent): - log: str - log_level: int = logging.INFO + log: LogMessageStr + log_level: LogLevelInt @staticmethod def topic_name() -> str: return "task_logs" @classmethod - def from_dask_worker(cls, log: str) -> "TaskLogEvent": - return cls(job_id=get_worker().get_current_task(), log=log) + def from_dask_worker(cls, log: str, log_level: LogLevelInt) -> "TaskLogEvent": + return cls(job_id=get_worker().get_current_task(), log=log, log_level=log_level) class Config(BaseTaskEvent.Config): schema_extra = { diff --git a/packages/dask-task-models-library/tests/container_tasks/test_events.py b/packages/dask-task-models-library/tests/container_tasks/test_events.py index e68d6b2a8d0..55c5bb1d1bf 100644 --- a/packages/dask-task-models-library/tests/container_tasks/test_events.py +++ b/packages/dask-task-models-library/tests/container_tasks/test_events.py @@ -5,6 +5,8 @@ # pylint:disable=protected-access # pylint:disable=too-many-arguments +import logging + import pytest from dask_task_models_library.container_tasks.events import ( BaseTaskEvent, @@ -51,7 +53,10 @@ def test_task_progress_from_worker(mocked_dask_worker_job_id: str): def test_task_log_from_worker(mocked_dask_worker_job_id: str): - event = TaskLogEvent.from_dask_worker(log="here is the amazing logs") 
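+    # NOTE: log_level is now a required argument (TaskLogEvent no longer defaults to INFO)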
+ event = TaskLogEvent.from_dask_worker( + log="here is the amazing logs", log_level=logging.INFO + ) assert event.job_id == mocked_dask_worker_job_id assert event.log == "here is the amazing logs" + assert event.log_level == logging.INFO diff --git a/packages/models-library/src/models_library/rabbitmq_messages.py b/packages/models-library/src/models_library/rabbitmq_messages.py index b33f1276f2c..930e9178714 100644 --- a/packages/models-library/src/models_library/rabbitmq_messages.py +++ b/packages/models-library/src/models_library/rabbitmq_messages.py @@ -1,7 +1,7 @@ import logging from abc import abstractmethod from enum import Enum, auto -from typing import Any, Literal +from typing import Any, Literal, TypeAlias from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID @@ -11,6 +11,9 @@ from pydantic import BaseModel, Field from pydantic.types import NonNegativeFloat +LogLevelInt: TypeAlias = int +LogMessageStr: TypeAlias = str + class RabbitEventMessageType(str, Enum): RELOAD_IFRAME = "RELOAD_IFRAME" @@ -47,8 +50,8 @@ class NodeMessageBase(ProjectMessageBase): class LoggerRabbitMessage(RabbitMessageBase, NodeMessageBase): channel_name: Literal["simcore.services.logs.v2"] = "simcore.services.logs.v2" - messages: list[str] - log_level: int = logging.INFO + messages: list[LogMessageStr] + log_level: LogLevelInt = logging.INFO def routing_key(self) -> str: return f"{self.project_id}.{self.log_level}" diff --git a/packages/service-library/src/servicelib/logging_utils.py b/packages/service-library/src/servicelib/logging_utils.py index 1d79e130538..77886ca52db 100644 --- a/packages/service-library/src/servicelib/logging_utils.py +++ b/packages/service-library/src/servicelib/logging_utils.py @@ -12,7 +12,7 @@ from asyncio import iscoroutinefunction from contextlib import contextmanager from inspect import getframeinfo, stack -from typing import Callable, TypedDict +from typing import Callable, TypeAlias, TypedDict log = logging.getLogger(__name__) @@ -247,3 +247,20 @@ def get_log_record_extra(*, user_id: int | str | None = None) -> LogExtra | None assert int(user_id) > 0 # nosec extra["log_uid"] = f"{user_id}" return extra or None + + +LogLevelInt: TypeAlias = int +LogMessageStr: TypeAlias = str + + +def guess_message_log_level(message: str) -> LogLevelInt: + lower_case_message = message.lower().strip() + if lower_case_message.startswith( + ("error:", "err:", "error ", "err ", "[error]", "[err]") + ): + return logging.ERROR + if lower_case_message.startswith( + ("warning:", "warn:", "warning ", "warn ", "[warning]", "[warn]") + ): + return logging.WARNING + return logging.INFO diff --git a/packages/service-library/tests/test_logging_utils.py b/packages/service-library/tests/test_logging_utils.py index c8899e4dc19..3337544d600 100644 --- a/packages/service-library/tests/test_logging_utils.py +++ b/packages/service-library/tests/test_logging_utils.py @@ -2,11 +2,15 @@ import logging from threading import Thread -from typing import Optional import pytest from pytest import LogCaptureFixture -from servicelib.logging_utils import log_decorator +from servicelib.logging_utils import ( + LogLevelInt, + LogMessageStr, + guess_message_log_level, + log_decorator, +) from servicelib.utils import logged_gather logger = logging.getLogger(__name__) @@ -15,7 +19,7 @@ @pytest.mark.parametrize("logger", [None, logger]) @pytest.mark.parametrize("log_traceback", [True, False]) async def test_error_regression_async_def( - caplog: LogCaptureFixture, logger: 
Optional[logging.Logger], log_traceback: bool + caplog: LogCaptureFixture, logger: logging.Logger | None, log_traceback: bool ): @log_decorator(logger, log_traceback=log_traceback) async def _raising_error() -> None: @@ -34,7 +38,7 @@ async def _raising_error() -> None: @pytest.mark.parametrize("logger", [None, logger]) @pytest.mark.parametrize("log_traceback", [True, False]) async def test_error_regression_def( - caplog: LogCaptureFixture, logger: Optional[logging.Logger], log_traceback: bool + caplog: LogCaptureFixture, logger: logging.Logger | None, log_traceback: bool ): @log_decorator(logger, log_traceback=log_traceback) def _raising_error() -> None: @@ -50,3 +54,28 @@ def _raising_error() -> None: assert "Traceback" in caplog.text else: assert "Traceback" not in caplog.text + + +@pytest.mark.parametrize( + "message, expected_log_level", + [ + ("", logging.INFO), + ("Error: this is an error", logging.ERROR), + ("[Error] this is an error", logging.ERROR), + ("[Error]: this is an error", logging.ERROR), + ("[Err] this is an error", logging.ERROR), + ("[Err]: this is an error", logging.ERROR), + ("Err: this is an error", logging.ERROR), + ("Warning: this is an warning", logging.WARNING), + ("[Warning] this is an warning", logging.WARNING), + ("[Warning]: this is an warning", logging.WARNING), + ("[Warn] this is an warning", logging.WARNING), + ("[Warn]: this is an warning", logging.WARNING), + ("Warn: this is an warning", logging.WARNING), + ("Not a Warn: this is an warning", logging.INFO), + ], +) +def test_guess_message_log_level( + message: LogMessageStr, expected_log_level: LogLevelInt +): + assert guess_message_log_level(message) == expected_log_level diff --git a/packages/settings-library/src/settings_library/docker_registry.py b/packages/settings-library/src/settings_library/docker_registry.py index c79ecaac0d6..a0ef89de664 100644 --- a/packages/settings-library/src/settings_library/docker_registry.py +++ b/packages/settings-library/src/settings_library/docker_registry.py @@ -1,5 +1,4 @@ from functools import cached_property -from typing import Optional from pydantic import Field, SecretStr, validator @@ -7,13 +6,13 @@ class RegistrySettings(BaseCustomSettings): - REGISTRY_AUTH: bool = Field(..., description="do registry authentication") - REGISTRY_PATH: Optional[str] = Field( - None, description="development mode only, in case a local registry is used" + REGISTRY_PATH: str | None = Field( + default=None, + description="development mode only, in case a local registry is used", ) # NOTE: name is missleading, http or https protocol are not included - REGISTRY_URL: str = Field("", description="address to the docker registry") + REGISTRY_URL: str = Field(default="", description="address to the docker registry") REGISTRY_USER: str = Field( ..., description="username to access the docker registry" @@ -25,7 +24,7 @@ class RegistrySettings(BaseCustomSettings): @validator("REGISTRY_PATH", pre=True) @classmethod - def escape_none_string(cls, v) -> Optional[str]: + def escape_none_string(cls, v) -> str | None: return None if v == "None" else v @cached_property diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/constants.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/constants.py new file mode 100644 index 00000000000..fa2a269e1c7 --- /dev/null +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/constants.py @@ -0,0 +1,19 @@ +import re +from typing import Final + 
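+# NOTE: both patterns are consumed by docker_utils._parse_line: DOCKER_LOG_REGEXP
+# splits a docker log line into its "timestamp" and "log" groups, while
+# PROGRESS_REGEXP captures a progress "value" written as 0.0-1.0, "N%",
+# "N percent" or "N/M"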
+LEGACY_SERVICE_LOG_FILE_NAME: Final[str] = "log.dat"
+PARSE_LOG_INTERVAL_S: Final[float] = 0.5
+
+DOCKER_LOG_REGEXP: re.Pattern[str] = re.compile(
+    r"^(?P<timestamp>(?:(\d{4}-\d{2}-\d{2})T(\d{2}:\d{2}:\d{2}(?:\.\d+)?))(Z|[\+-]\d{2}:\d{2})?)"
+    r"\s(?P<log>.*)$"
+)
+PROGRESS_REGEXP: re.Pattern[str] = re.compile(
+    r"^(?:\[?progress\]?:?)?\s*"
+    r"(?P<value>[0-1]?\.\d+|"
+    r"\d+\s*(?:(?P<percent_sign>%)|"
+    r"\d+\s*"
+    r"(?P<percent_explicit>percent))|"
+    r"\[?(?P<fraction>\d+\/\d+)\]?"
+    r"|0|1)"
+)
diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py
index 608ab611fad..d5fb2b4270b 100644
--- a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py
+++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py
@@ -1,5 +1,6 @@
 import asyncio
 import json
+import logging
 import os
 import socket
 from dataclasses import dataclass
@@ -23,6 +24,7 @@
 from packaging import version
 from pydantic import ValidationError
 from pydantic.networks import AnyUrl
+from servicelib.logging_utils import LogLevelInt, LogMessageStr
 from settings_library.s3 import S3Settings
 from yarl import URL

@@ -155,12 +157,14 @@ async def _retrieve_output_data(
                 exc=exc,
             ) from exc

-    async def _publish_sidecar_log(self, log: str) -> None:
+    async def _publish_sidecar_log(
+        self, log: LogMessageStr, log_level: LogLevelInt = logging.INFO
+    ) -> None:
         publish_event(
             self.task_publishers.logs,
-            TaskLogEvent.from_dask_worker(log=f"[sidecar] {log}"),
+            TaskLogEvent.from_dask_worker(log=f"[sidecar] {log}", log_level=log_level),
         )
-        logger.info(log)
+        logger.log(log_level, log)

     async def run(self, command: list[str]) -> TaskOutputData:
         await self._publish_sidecar_log(
@@ -253,7 +257,9 @@ async def __aexit__(
         tb: TracebackType | None,
     ) -> None:
         if exc:
-            await self._publish_sidecar_log(f"Task error:\n{exc}")
             await self._publish_sidecar_log(
-                "There might be more information in the service log file"
+                f"Task error:\n{exc}", log_level=logging.ERROR
+            )
+            await self._publish_sidecar_log(
+                "TIP: There might be more information in the service log file in the service outputs",
             )
diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/docker_utils.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/docker_utils.py
index a72a14adf41..eec3b978e14 100644
--- a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/docker_utils.py
+++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/docker_utils.py
@@ -14,7 +14,6 @@
     Awaitable,
     Callable,
     Coroutine,
-    Final,
     cast,
 )

@@ -31,12 +30,24 @@
 from pydantic import ByteSize
 from pydantic.networks import AnyUrl
 from servicelib.docker_utils import to_datetime
-from servicelib.logging_utils import log_catch, log_context
+from servicelib.logging_utils import (
+    LogLevelInt,
+    LogMessageStr,
+    guess_message_log_level,
+    log_catch,
+    log_context,
+)
 from settings_library.s3 import S3Settings

 from ..dask_utils import LogType, create_dask_worker_logger, publish_task_logs
 from ..file_utils import push_file_to_remote
 from ..settings import Settings
+from .constants import (
+    DOCKER_LOG_REGEXP,
+    LEGACY_SERVICE_LOG_FILE_NAME,
+    PARSE_LOG_INTERVAL_S,
+    PROGRESS_REGEXP,
+)
 from .models import (
     LEGACY_INTEGRATION_VERSION,
     ContainerHostConfig,
@@ -45,7 +56,7 @@
 from .task_shared_volume import TaskSharedVolumes

 logger = create_dask_worker_logger(__name__)
-LogPublishingCB = Callable[[str], Awaitable[None]]
+LogPublishingCB = Callable[[LogMessageStr, LogLevelInt], Awaitable[None]]


 async def create_container_config(
@@ -128,57 +139,58 @@ async def managed_container(
         raise


-_DOCKER_LOG_REGEXP: re.Pattern[str] = re.compile(
-    r"^(?P<timestamp>(?:(\d{4}-\d{2}-\d{2})T(\d{2}:\d{2}:\d{2}(?:\.\d+)?))(Z|[\+-]\d{2}:\d{2})?)"
-    r"\s(?P<log>.*)$"
-)
-_PROGRESS_REGEXP: re.Pattern[str] = re.compile(
-    r"^(?:\[?progress\]?:?)?\s*"
-    r"(?P<value>[0-1]?\.\d+|"
-    r"\d+\s*(?:(?P<percent_sign>%)|"
-    r"\d+\s*"
-    r"(?P<percent_explicit>percent))|"
-    r"\[?(?P<fraction>\d+\/\d+)\]?"
-    r"|0|1)"
-)
+def _guess_progress_value(progress_match: re.Match[str]) -> float:
+    value: float = 0.0
+    try:
+        # can be anything from "23 percent", 23%, 23/234, 0.0-1.0
+        progress_str = progress_match.group("value")
+        if progress_match.group("percent_sign"):
+            # this is of the 23% kind
+            value = float(progress_str.split("%")[0].strip()) / 100.0
+        elif progress_match.group("percent_explicit"):
+            # this is of the 23 percent kind
+            value = float(progress_str.split("percent")[0].strip()) / 100.0
+        elif progress_match.group("fraction"):
+            # this is of the 23/123 kind
+            nums = progress_match.group("fraction").strip().split("/")
+            value = float(nums[0].strip()) / float(nums[1].strip())
+        else:
+            # this is of the 0.0-1.0 kind
+            value = float(progress_str.strip())
+    except ValueError:
+        logger.exception("Could not extract progress from log line %s", progress_match)
+    return value


-async def _parse_line(line: str) -> tuple[LogType, datetime.datetime, str]:
-    match = re.search(_DOCKER_LOG_REGEXP, line)
+async def _parse_line(
+    line: str,
+) -> tuple[LogType, datetime.datetime, LogMessageStr, LogLevelInt]:
+    match = re.search(DOCKER_LOG_REGEXP, line)
     if not match:
         # try to correct the log, it might be coming from an old comp service that does not put timestamps
         corrected_line = f"{arrow.utcnow().datetime.isoformat()} {line}"
-        match = re.search(_DOCKER_LOG_REGEXP, corrected_line)
+        match = re.search(DOCKER_LOG_REGEXP, corrected_line)
         if not match:
             # default return as log
-            return (LogType.LOG, arrow.utcnow().datetime, f"{line}")
+            return (
+                LogType.LOG,
+                arrow.utcnow().datetime,
+                f"{line}",
+                guess_message_log_level(line),
+            )

-    log_type = LogType.LOG
     timestamp = to_datetime(match.group("timestamp"))
     log = f"{match.group('log')}"
     # now look for progress
-    match = re.search(_PROGRESS_REGEXP, log.lower())
-    if match:
-        try:
-            # can be anything from "23 percent", 23%, 23/234, 0.0-1.0
-            progress = match.group("value")
-            log_type = LogType.PROGRESS
-            if match.group("percent_sign"):
-                # this is of the 23% kind
-                log = f"{float(progress.split('%')[0].strip()) / 100.0:.2f}"
-            elif match.group("percent_explicit"):
-                # this is of the 23 percent kind
-                log = f"{float(progress.split('percent')[0].strip()) / 100.0:.2f}"
-            elif match.group("fraction"):
-                # this is of the 23/123 kind
-                nums = match.group("fraction").strip().split("/")
-                log = f"{float(nums[0].strip()) / float(nums[1].strip()):.2f}"
-            else:
-                # this is of the 0.0-1.0 kind
-                log = f"{float(progress.strip()):.2f}"
-        except ValueError:
-            logger.exception("Could not extract progress from log line %s", line)
-    return (log_type, timestamp, log)
+    if match := re.search(PROGRESS_REGEXP, log.lower()):
+        return (
+            LogType.PROGRESS,
+            timestamp,
+            f"{_guess_progress_value(match):.2f}",
+            logging.INFO,
+        )
+
+    return (LogType.LOG, timestamp, log, guess_message_log_level(log))


 async def _publish_container_logs(
@@ -189,7 +201,8 @@ async def _publish_container_logs(
     progress_pub: Pub,
     logs_pub: Pub,
     log_type: LogType,
-    message: str,
+    
message: LogMessageStr, + log_level: LogLevelInt, ) -> None: return publish_task_logs( progress_pub, @@ -197,13 +210,10 @@ async def _publish_container_logs( log_type, message_prefix=f"{service_key}:{service_version} - {container.id}{container_name}", message=message, + log_level=log_level, ) -LEGACY_SERVICE_LOG_FILE_NAME: Final[str] = "log.dat" -PARSE_LOG_INTERVAL_S: Final[float] = 0.5 - - async def _parse_container_log_file( container: DockerContainer, service_key: str, @@ -223,7 +233,7 @@ async def _parse_container_log_file( logger.debug("monitoring legacy-style container log file: opened %s", log_file) while (await container.show())["State"]["Running"]: if line := await file_pointer.readline(): - log_type, _, message = await _parse_line(line) + log_type, _, message, log_level = await _parse_line(line) await _publish_container_logs( service_key=service_key, service_version=service_version, @@ -233,12 +243,13 @@ async def _parse_container_log_file( logs_pub=logs_pub, log_type=log_type, message=message, + log_level=log_level, ) await asyncio.sleep(PARSE_LOG_INTERVAL_S) # finish reading the logs if possible async for line in file_pointer: - log_type, _, message = await _parse_line(line) + log_type, _, message, log_level = await _parse_line(line) await _publish_container_logs( service_key=service_key, service_version=service_version, @@ -248,6 +259,7 @@ async def _parse_container_log_file( logs_pub=logs_pub, log_type=log_type, message=message, + log_level=log_level, ) logger.debug( "monitoring legacy-style container log file: completed reading of %s", @@ -299,7 +311,9 @@ async def _parse_container_docker_logs( container.log(stdout=True, stderr=True, follow=True, timestamps=True), ): await log_fp.write(log_line.encode("utf-8")) - log_type, latest_log_timestamp, message = await _parse_line(log_line) + log_type, latest_log_timestamp, message, log_level = await _parse_line( + log_line + ) await _publish_container_logs( service_key=service_key, service_version=service_version, @@ -309,6 +323,7 @@ async def _parse_container_docker_logs( logs_pub=logs_pub, log_type=log_type, message=message, + log_level=log_level, ) logger.debug( @@ -330,7 +345,9 @@ async def _parse_container_docker_logs( ) for log_line in missing_logs: await log_fp.write(log_line.encode("utf-8")) - log_type, latest_log_timestamp, message = await _parse_line(log_line) + log_type, latest_log_timestamp, message, log_level = await _parse_line( + log_line + ) await _publish_container_logs( service_key=service_key, service_version=service_version, @@ -340,6 +357,7 @@ async def _parse_container_docker_logs( logs_pub=logs_pub, log_type=log_type, message=message, + log_level=log_level, ) logger.debug( @@ -492,10 +510,12 @@ async def pull_image( }, ): await log_publishing_cb( - f"Pulling {service_key}:{service_version}: {pull_progress}..." + f"Pulling {service_key}:{service_version}: {pull_progress}...", + logging.DEBUG, ) await log_publishing_cb( - f"Docker image for {service_key}:{service_version} ready on {socket.gethostname()}." 
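+        # NOTE: per-layer pull progress above is logged at DEBUG; only this completion message is INFO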
+ f"Docker image for {service_key}:{service_version} ready on {socket.gethostname()}.", + logging.INFO, ) diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/dask_utils.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/dask_utils.py index 554fd89ed92..5f4e3186c76 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/dask_utils.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/dask_utils.py @@ -15,6 +15,7 @@ from dask_task_models_library.container_tasks.io import TaskCancelEventName from distributed.worker import get_worker from distributed.worker_state_machine import TaskState +from servicelib.logging_utils import LogLevelInt, LogMessageStr def create_dask_worker_logger(name: str) -> logging.Logger: @@ -88,7 +89,9 @@ async def cancel_task(task_name: str) -> None: ): publish_event( log_publisher, - TaskLogEvent.from_dask_worker(log="[sidecar] cancelling task..."), + TaskLogEvent.from_dask_worker( + log="[sidecar] cancelling task...", log_level=logging.INFO + ), ) logger.debug("cancelling %s....................", f"{task=}") task.cancel() @@ -110,7 +113,9 @@ async def periodicaly_check_if_aborted(task_name: str) -> None: except asyncio.CancelledError as exc: publish_event( log_publisher, - TaskLogEvent.from_dask_worker(log="[sidecar] task run was aborted"), + TaskLogEvent.from_dask_worker( + log="[sidecar] task run was aborted", log_level=logging.INFO + ), ) raise TaskCancelledError from exc finally: @@ -139,7 +144,8 @@ def publish_task_logs( logs_pub: distributed.Pub, log_type: LogType, message_prefix: str, - message: str, + message: LogMessageStr, + log_level: LogLevelInt, ) -> None: logger.info("[%s - %s]: %s", message_prefix, log_type.name, message) if log_type == LogType.PROGRESS: @@ -148,4 +154,6 @@ def publish_task_logs( TaskProgressEvent.from_dask_worker(progress=float(message)), ) else: - publish_event(logs_pub, TaskLogEvent.from_dask_worker(log=message)) + publish_event( + logs_pub, TaskLogEvent.from_dask_worker(log=message, log_level=log_level) + ) diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/file_utils.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/file_utils.py index 50e766846d7..206dfbd3eef 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/file_utils.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/file_utils.py @@ -1,17 +1,19 @@ import asyncio import functools +import logging import mimetypes import time import zipfile from io import BytesIO from pathlib import Path -from typing import Any, Awaitable, Callable, Final, Optional, TypedDict, Union, cast +from typing import Any, Awaitable, Callable, Final, TypedDict, cast import aiofiles import aiofiles.tempfile import fsspec from pydantic import ByteSize, FileUrl, parse_obj_as from pydantic.networks import AnyUrl +from servicelib.logging_utils import LogLevelInt, LogMessageStr from settings_library.s3 import S3Settings from yarl import URL @@ -23,7 +25,7 @@ S3_FILE_SYSTEM_SCHEMES: Final = ["s3", "s3a"] -LogPublishingCB = Callable[[str], Awaitable[None]] +LogPublishingCB = Callable[[LogMessageStr, LogLevelInt], Awaitable[None]] def _file_progress_cb( @@ -38,7 +40,8 @@ def _file_progress_cb( log_publishing_cb( f"{text_prefix}" f" {100.0 * float(value or 0)/float(size or 1):.1f}%" - f" ({ByteSize(value).human_readable() if value else 0} / {ByteSize(size).human_readable() if size else 'NaN'})" + f" ({ByteSize(value).human_readable() if value else 0} / {ByteSize(size).human_readable() if size else 'NaN'})", + 
logging.DEBUG, ), main_loop, ) @@ -54,7 +57,7 @@ class ClientKWArgsDict(TypedDict): class S3FsSettingsDict(TypedDict): key: str secret: str - token: Optional[str] + token: str | None use_ssl: bool client_kwargs: ClientKWArgsDict @@ -85,8 +88,8 @@ async def _copy_file( *, log_publishing_cb: LogPublishingCB, text_prefix: str, - src_storage_cfg: Optional[dict[str, Any]] = None, - dst_storage_cfg: Optional[dict[str, Any]] = None, + src_storage_cfg: dict[str, Any] | None = None, + dst_storage_cfg: dict[str, Any] | None = None, ): src_storage_kwargs = src_storage_cfg or {} dst_storage_kwargs = dst_storage_cfg or {} @@ -109,7 +112,8 @@ async def _copy_file( f"{text_prefix}" f" {100.0 * float(total_data_written or 0)/float(file_size or 1):.1f}%" f" ({ByteSize(total_data_written).human_readable() if total_data_written else 0} / {ByteSize(file_size).human_readable() if file_size else 'NaN'})" - f" [{ByteSize(total_data_written).to('MB')/elapsed_time:.2f} MBytes/s (avg)]" + f" [{ByteSize(total_data_written).to('MB')/elapsed_time:.2f} MBytes/s (avg)]", + logging.DEBUG, ) @@ -118,14 +122,15 @@ async def _copy_file( async def pull_file_from_remote( src_url: AnyUrl, - target_mime_type: Optional[str], + target_mime_type: str | None, dst_path: Path, log_publishing_cb: LogPublishingCB, - s3_settings: Optional[S3Settings], + s3_settings: S3Settings | None, ) -> None: assert src_url.path # nosec await log_publishing_cb( - f"Downloading '{src_url.path.strip('/')}' into local file '{dst_path.name}'..." + f"Downloading '{src_url.path.strip('/')}' into local file '{dst_path.name}'...", + logging.INFO, ) if not dst_path.parent.exists(): raise ValueError( @@ -136,7 +141,7 @@ async def pull_file_from_remote( if not target_mime_type: target_mime_type, _ = mimetypes.guess_type(dst_path) - storage_kwargs: Union[S3FsSettingsDict, dict[str, Any]] = {} + storage_kwargs: S3FsSettingsDict | dict[str, Any] = {} if s3_settings and src_url.scheme in S3_FILE_SYSTEM_SCHEMES: storage_kwargs = _s3fs_settings_from_s3_settings(s3_settings) await _copy_file( @@ -148,24 +153,27 @@ async def pull_file_from_remote( ) await log_publishing_cb( - f"Download of '{src_url.path.strip('/')}' into local file '{dst_path.name}' complete." + f"Download of '{src_url.path.strip('/')}' into local file '{dst_path.name}' complete.", + logging.INFO, ) if src_mime_type == _ZIP_MIME_TYPE and target_mime_type != _ZIP_MIME_TYPE: - await log_publishing_cb(f"Uncompressing '{dst_path.name}'...") + await log_publishing_cb(f"Uncompressing '{dst_path.name}'...", logging.INFO) logger.debug("%s is a zip file and will be now uncompressed", dst_path) with zipfile.ZipFile(dst_path, "r") as zip_obj: await asyncio.get_event_loop().run_in_executor( None, zip_obj.extractall, dst_path.parents[0] ) # finally remove the zip archive - await log_publishing_cb(f"Uncompressing '{dst_path.name}' complete.") + await log_publishing_cb( + f"Uncompressing '{dst_path.name}' complete.", logging.INFO + ) dst_path.unlink() async def _push_file_to_http_link( file_to_upload: Path, dst_url: AnyUrl, log_publishing_cb: LogPublishingCB -): +) -> None: # NOTE: special case for http scheme when uploading. this is typically a S3 put presigned link. # Therefore, we need to use the http filesystem directly in order to call the put_file function. # writing on httpfilesystem is disabled by default. 
@@ -198,12 +206,12 @@ async def _push_file_to_remote( file_to_upload: Path, dst_url: AnyUrl, log_publishing_cb: LogPublishingCB, - s3_settings: Optional[S3Settings], -): + s3_settings: S3Settings | None, +) -> None: logger.debug("Uploading %s to %s...", file_to_upload, dst_url) assert dst_url.path # nosec - storage_kwargs: Union[S3FsSettingsDict, dict[str, Any]] = {} + storage_kwargs: S3FsSettingsDict | dict[str, Any] = {} if s3_settings: storage_kwargs = _s3fs_settings_from_s3_settings(s3_settings) @@ -223,7 +231,7 @@ async def push_file_to_remote( src_path: Path, dst_url: AnyUrl, log_publishing_cb: LogPublishingCB, - s3_settings: Optional[S3Settings], + s3_settings: S3Settings | None, ) -> None: if not src_path.exists(): raise ValueError(f"{src_path=} does not exist") @@ -237,7 +245,8 @@ async def push_file_to_remote( if dst_mime_type == _ZIP_MIME_TYPE and src_mime_type != _ZIP_MIME_TYPE: archive_file_path = Path(tmp_dir) / Path(URL(dst_url).path).name await log_publishing_cb( - f"Compressing '{src_path.name}' to '{archive_file_path.name}'..." + f"Compressing '{src_path.name}' to '{archive_file_path.name}'...", + logging.INFO, ) with zipfile.ZipFile( archive_file_path, mode="w", compression=zipfile.ZIP_DEFLATED @@ -249,10 +258,13 @@ async def push_file_to_remote( assert archive_file_path.exists() # nosec file_to_upload = archive_file_path await log_publishing_cb( - f"Compression of '{src_path.name}' to '{archive_file_path.name}' complete." + f"Compression of '{src_path.name}' to '{archive_file_path.name}' complete.", + logging.INFO, ) - await log_publishing_cb(f"Uploading '{file_to_upload.name}' to '{dst_url}'...") + await log_publishing_cb( + f"Uploading '{file_to_upload.name}' to '{dst_url}'...", logging.INFO + ) if dst_url.scheme in HTTP_FILE_SYSTEM_SCHEMES: logger.debug("destination is a http presigned link") @@ -263,5 +275,6 @@ async def push_file_to_remote( ) await log_publishing_cb( - f"Upload of '{src_path.name}' to '{dst_url.path.strip('/')}' complete" + f"Upload of '{src_path.name}' to '{dst_url.path.strip('/')}' complete", + logging.INFO, ) diff --git a/services/dask-sidecar/tests/unit/test_dask_utils.py b/services/dask-sidecar/tests/unit/test_dask_utils.py index b75e5366a50..0a52a650d34 100644 --- a/services/dask-sidecar/tests/unit/test_dask_utils.py +++ b/services/dask-sidecar/tests/unit/test_dask_utils.py @@ -6,6 +6,7 @@ import asyncio import concurrent.futures +import logging import time from typing import Any @@ -46,7 +47,9 @@ async def test_publish_event(dask_client: distributed.Client): assert dask_pub.subscribers print("we do have subscribers!") - event_to_publish = TaskLogEvent(job_id="some_fake_job_id", log="the log") + event_to_publish = TaskLogEvent( + job_id="some_fake_job_id", log="the log", log_level=logging.INFO + ) publish_event(dask_pub=dask_pub, event=event_to_publish) # NOTE: this tests runs a sync dask client, # and the CI seems to have sometimes difficulties having this run in a reasonable time diff --git a/services/dask-sidecar/tests/unit/test_docker_utils.py b/services/dask-sidecar/tests/unit/test_docker_utils.py index 091c010012b..6732c6a0c78 100644 --- a/services/dask-sidecar/tests/unit/test_docker_utils.py +++ b/services/dask-sidecar/tests/unit/test_docker_utils.py @@ -4,6 +4,7 @@ # pylint: disable=no-member import asyncio +import logging from typing import Any from unittest.mock import call @@ -12,6 +13,7 @@ import pytest from models_library.services_resources import BootMode from pytest_mock.plugin import MockerFixture +from 
servicelib.logging_utils import LogLevelInt, LogMessageStr from simcore_service_dask_sidecar.computational_sidecar.docker_utils import ( LogType, _parse_line, @@ -101,99 +103,64 @@ async def test_create_container_config( ids=lambda id: f"version{'>=1' if id is True else '0'}-logs", ) @pytest.mark.parametrize( - "log_line, expected_log_type, expected_message", + "log_line, expected_log_type, expected_message, expected_log_level", [ - ( - "hello from the logs", - LogType.LOG, - "hello from the logs", - ), + ("hello from the logs", LogType.LOG, "hello from the logs", logging.INFO), ( "[progress] this is some whatever progress without number", LogType.LOG, "[progress] this is some whatever progress without number", + logging.INFO, ), - ("[Progress] 34%", LogType.PROGRESS, "0.34"), - ( - "[PROGRESS] .34", - LogType.PROGRESS, - "0.34", - ), - ( - "[progress] 0.44", - LogType.PROGRESS, - "0.44", - ), - ( - "[progress] 44 percent done", - LogType.PROGRESS, - "0.44", - ), - ( - "[progress] 44/150", - LogType.PROGRESS, - f"{(44.0 / 150.0):.2f}", - ), + ("[Progress] 34%", LogType.PROGRESS, "0.34", logging.INFO), + ("[PROGRESS] .34", LogType.PROGRESS, "0.34", logging.INFO), + ("[progress] 0.44", LogType.PROGRESS, "0.44", logging.INFO), + ("[progress] 44 percent done", LogType.PROGRESS, "0.44", logging.INFO), + ("[progress] 44/150", LogType.PROGRESS, f"{(44.0 / 150.0):.2f}", logging.INFO), ( "Progress: this is some progress", LogType.LOG, "Progress: this is some progress", + logging.INFO, ), - ("progress: 34%", LogType.PROGRESS, "0.34"), ( - "PROGRESS: .34", + "progress: 34%", LogType.PROGRESS, "0.34", + logging.INFO, ), - ( - "progress: 0.44", - LogType.PROGRESS, - "0.44", - ), - ( - "progress: 44 percent done", - LogType.PROGRESS, - "0.44", - ), - ( - "44 percent done", - LogType.PROGRESS, - "0.44", - ), - ( - "progress: 44/150", - LogType.PROGRESS, - f"{(44.0/150.0):.2f}", - ), - ( - "progress: 44/150...", - LogType.PROGRESS, - f"{(44.0/150.0):.2f}", - ), + ("PROGRESS: .34", LogType.PROGRESS, "0.34", logging.INFO), + ("progress: 0.44", LogType.PROGRESS, "0.44", logging.INFO), + ("progress: 44 percent done", LogType.PROGRESS, "0.44", logging.INFO), + ("44 percent done", LogType.PROGRESS, "0.44", logging.INFO), + ("progress: 44/150", LogType.PROGRESS, f"{(44.0/150.0):.2f}", logging.INFO), + ("progress: 44/150...", LogType.PROGRESS, f"{(44.0/150.0):.2f}", logging.INFO), ( "any kind of message even with progress inside", LogType.LOG, "any kind of message even with progress inside", + logging.INFO, ), + ("[PROGRESS]1.000000\n", LogType.PROGRESS, "1.00", logging.INFO), + ("[PROGRESS] 1\n", LogType.PROGRESS, "1.00", logging.INFO), + ("[PROGRESS] 0\n", LogType.PROGRESS, "0.00", logging.INFO), ( - "[PROGRESS]1.000000\n", - LogType.PROGRESS, - "1.00", - ), - ( - "[PROGRESS] 1\n", + "[PROGRESS]: 1% [ 10 / 624 ] Time Update, estimated remaining time 1 seconds @ 26.43 MCells/s", LogType.PROGRESS, - "1.00", + "0.01", + logging.INFO, ), ( - "[PROGRESS] 0\n", - LogType.PROGRESS, - "0.00", + "[warn]: this is some warning", + LogType.LOG, + "[warn]: this is some warning", + logging.WARNING, ), ( - "[PROGRESS]: 1% [ 10 / 624 ] Time Update, estimated remaining time 1 seconds @ 26.43 MCells/s", - LogType.PROGRESS, - "0.01", + "err: this is some error", + LogType.LOG, + "err: this is some error", + logging.ERROR, ), ], ) @@ -201,7 +168,8 @@ async def test_parse_line( version1_logs: bool, log_line: str, expected_log_type: LogType, - expected_message: str, + expected_message: LogMessageStr, + expected_log_level: 
LogLevelInt, ): expected_time_stamp = arrow.utcnow().datetime if version1_logs: @@ -209,11 +177,15 @@ async def test_parse_line( # version 0 does not contain a timestamp and is added at parsing time log_line = f"{expected_time_stamp.isoformat()} {log_line}" - received_log_type, received_time_stamp, received_message = await _parse_line( - log_line - ) + ( + received_log_type, + received_time_stamp, + received_message, + received_log_level, + ) = await _parse_line(log_line) assert received_log_type == expected_log_type assert received_message == expected_message + assert received_log_level == expected_log_level if version1_logs: assert received_time_stamp == expected_time_stamp else: diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_compose_utils.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_compose_utils.py index 206c4f85502..fb84fca8253 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_compose_utils.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_compose_utils.py @@ -11,6 +11,7 @@ from fastapi import FastAPI from models_library.rabbitmq_messages import ProgressType from servicelib.async_utils import run_sequentially_in_context +from servicelib.logging_utils import LogLevelInt, LogMessageStr from settings_library.basic_types import LogLevel from simcore_service_dynamic_sidecar.core.rabbitmq import ( post_progress_message, @@ -109,8 +110,8 @@ async def _progress_cb(current: int, total: int) -> None: float(current / (total or 1)), ) - async def _log_cb(msg: str) -> None: - await post_sidecar_log_message(app, msg, log_level=logging.INFO) + async def _log_cb(msg: LogMessageStr, log_level: LogLevelInt) -> None: + await post_sidecar_log_message(app, msg, log_level=log_level) await pull_images(list_of_images, registry_settings, _progress_cb, _log_cb) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_logs.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_logs.py index 933a07eea2c..74363b1927f 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_logs.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_logs.py @@ -9,9 +9,10 @@ import logging from asyncio import CancelledError, Task, create_task from contextlib import suppress -from typing import Any, Callable, Coroutine, cast +from typing import Any, AsyncGenerator, Callable, Coroutine, cast from fastapi import FastAPI +from servicelib.logging_utils import guess_message_log_level from ..core.rabbitmq import post_log_message from .docker_utils import docker_client @@ -34,7 +35,10 @@ async def _logs_fetcher_worker( image_name = container_inspect["Config"]["Image"].split("/")[-1] logger.debug("Streaming logs from %s, image %s", container_name, image_name) - async for line in container.log(stdout=True, stderr=True, follow=True): + async for line in cast( + AsyncGenerator[str, None], + container.log(stdout=True, stderr=True, follow=True), + ): await dispatch_log(image_name=image_name, message=line) @@ -46,7 +50,9 @@ def __init__(self, app: FastAPI) -> None: async def _dispatch_logs(self, image_name: str, message: str) -> None: await post_log_message( - self._app, f"[{image_name}] {message}", log_level=logging.INFO + self._app, + f"[{image_name}] {message}", + log_level=guess_message_log_level(message), ) async def start_log_feching(self, container_name: str) -> None: diff --git 
a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_utils.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_utils.py index 3f874e1d551..914d3bdf882 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_utils.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_utils.py @@ -2,14 +2,15 @@ import logging from contextlib import asynccontextmanager from enum import Enum -from typing import Any, AsyncGenerator, Awaitable, Callable, Final, Optional, TypedDict +from typing import Any, AsyncGenerator, Awaitable, Callable, Final, TypedDict import aiodocker import yaml from aiodocker.utils import clean_filters from models_library.basic_regex import DOCKER_GENERIC_TAG_KEY_RE from models_library.services import RunID -from pydantic import PositiveInt +from pydantic import PositiveInt, parse_obj_as +from servicelib.logging_utils import log_catch from settings_library.docker_registry import RegistrySettings from .errors import UnexpectedDockerError, VolumeNotFoundError @@ -69,7 +70,8 @@ def get_docker_service_images(compose_spec_yaml: str) -> set[str]: ProgressCB = Callable[[int, int], Awaitable[None]] -LogCB = Callable[[str], Awaitable[None]] +LogLevel = int +LogCB = Callable[[str, LogLevel], Awaitable[None]] async def pull_images( @@ -137,7 +139,7 @@ def _parse_docker_pull_progress( # {'status': 'Digest: sha256:27cb6e6ccef575a4698b66f5de06c7ecd61589132d5a91d098f7f3f9285415a9'} # {'status': 'Status: Downloaded newer image for ubuntu:latest'} - status: Optional[str] = docker_pull_progress.get("status") + status: str | None = docker_pull_progress.get("status") if status in list(_TargetPullStatus): assert "id" in docker_pull_progress # nosec @@ -232,10 +234,14 @@ async def _pull_image_with_progress( if registry_host else None, ): - if _parse_docker_pull_progress( - pull_progress, all_image_pulling_data[image_name] - ): - total_current, total_total = _compute_sizes(all_image_pulling_data) - await progress_cb(total_current, total_total) - - await log_cb(f"pulling {shorter_image_name}: {pull_progress}...") + with log_catch(logger, reraise=False): + if _parse_docker_pull_progress( + parse_obj_as(_DockerProgressDict, pull_progress), + all_image_pulling_data[image_name], + ): + total_current, total_total = _compute_sizes(all_image_pulling_data) + await progress_cb(total_current, total_total) + + await log_cb( + f"pulling {shorter_image_name}: {pull_progress}...", logging.DEBUG + ) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py index 637007bee8a..b3072ef3777 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py @@ -12,27 +12,29 @@ RabbitMessageBase, ) from pydantic import NonNegativeFloat -from servicelib.logging_utils import log_catch, log_context +from servicelib.logging_utils import LogLevelInt, LogMessageStr, log_catch, log_context from servicelib.rabbitmq import RabbitMQClient from servicelib.rabbitmq_utils import wait_till_rabbitmq_responsive from ..core.settings import ApplicationSettings -log = logging.getLogger(__file__) +_logger = logging.getLogger(__file__) async def _post_rabbit_message(app: FastAPI, message: RabbitMessageBase) -> None: - with log_catch(log, reraise=False): + with log_catch(_logger, reraise=False): await 
get_rabbitmq_client(app).publish(message.channel_name, message) -async def post_log_message(app: FastAPI, logs: str, *, log_level: int) -> None: +async def post_log_message( + app: FastAPI, log: LogMessageStr, *, log_level: LogLevelInt +) -> None: app_settings: ApplicationSettings = app.state.settings message = LoggerRabbitMessage( node_id=app_settings.DY_SIDECAR_NODE_ID, user_id=app_settings.DY_SIDECAR_USER_ID, project_id=app_settings.DY_SIDECAR_PROJECT_ID, - messages=[logs], + messages=[log], log_level=log_level, ) @@ -53,8 +55,10 @@ async def post_progress_message( await _post_rabbit_message(app, message) -async def post_sidecar_log_message(app: FastAPI, logs: str, *, log_level: int) -> None: - await post_log_message(app, f"[sidecar] {logs}", log_level=log_level) +async def post_sidecar_log_message( + app: FastAPI, log: LogMessageStr, *, log_level: LogLevelInt +) -> None: + await post_log_message(app, f"[sidecar] {log}", log_level=log_level) async def post_event_reload_iframe(app: FastAPI) -> None: @@ -74,7 +78,7 @@ async def on_startup() -> None: assert app_settings.RABBIT_SETTINGS # nosec settings = app_settings.RABBIT_SETTINGS await wait_till_rabbitmq_responsive(settings.dsn) - with log_context(log, logging.INFO, msg="Create RabbitMQClient"): + with log_context(_logger, logging.INFO, msg="Create RabbitMQClient"): app.state.rabbitmq_client = RabbitMQClient( client_name=f"dynamic-sidecar_{app_settings.DY_SIDECAR_NODE_ID}", settings=settings, diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py index 9002032f045..766e40ce348 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py @@ -178,7 +178,7 @@ async def task_create_service_containers( application_health.error_message = message logger.error("Marked sidecar as unhealthy, see below for details\n:%s", message) await post_sidecar_log_message( - app, "could not start service containers", log_level=logging.INFO + app, "could not start service containers", log_level=logging.ERROR ) return shared_store.container_names diff --git a/services/dynamic-sidecar/tests/unit/test_core_docker_utils.py b/services/dynamic-sidecar/tests/unit/test_core_docker_utils.py index 838a7b3a94d..31e7d129b49 100644 --- a/services/dynamic-sidecar/tests/unit/test_core_docker_utils.py +++ b/services/dynamic-sidecar/tests/unit/test_core_docker_utils.py @@ -8,7 +8,7 @@ import yaml from faker import Faker from models_library.services import RunID -from pydantic import PositiveInt +from pydantic import PositiveInt, SecretStr from pytest import FixtureRequest from settings_library.docker_registry import RegistrySettings from simcore_service_dynamic_sidecar.core.docker_utils import ( @@ -142,10 +142,10 @@ async def test_issue_3793_pulling_images_raises_error(): Reproduces (sometimes) https://github.com/ITISFoundation/osparc-simcore/issues/3793 """ - async def _print_progress(*args, **kwargs): + async def _print_progress(*args, **kwargs) -> None: print("progress -> ", args, kwargs) - async def _print_log(*args, **kwargs): + async def _print_log(*args, **kwargs) -> None: print("log -> ", args, kwargs) for n in range(2): @@ -162,7 +162,7 @@ async def _print_log(*args, **kwargs): registry_settings=RegistrySettings( REGISTRY_AUTH=False, REGISTRY_USER="", - REGISTRY_PW="", + 
REGISTRY_PW=SecretStr(""), REGISTRY_SSL=False, ), progress_cb=_print_progress, @@ -175,9 +175,9 @@ async def test_pull_image(repeat: str): async def _print_progress(current: int, total: int): print("progress ->", f"{current=}", f"{total=}") - async def _print_log(msg): + async def _print_log(msg, log_level): assert "alpine" in msg - print("log -> ", msg) + print(f"log: {log_level=}: {msg}") await pull_images( images={ @@ -186,7 +186,7 @@ async def _print_log(msg): registry_settings=RegistrySettings( REGISTRY_AUTH=False, REGISTRY_USER="", - REGISTRY_PW="", + REGISTRY_PW=SecretStr(""), REGISTRY_SSL=False, ), progress_cb=_print_progress, diff --git a/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py b/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py index a9d7b2106d1..e56536af570 100644 --- a/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py +++ b/services/dynamic-sidecar/tests/unit/test_modules_outputs_manager.py @@ -397,7 +397,7 @@ async def test_regression_io_log_redirect_cb( assert inspect.getfullargspec( outputs_manager.io_log_redirect_cb.func ) == FullArgSpec( - args=["app", "logs"], + args=["app", "log"], varargs=None, varkw=None, defaults=None, @@ -406,7 +406,7 @@ async def test_regression_io_log_redirect_cb( annotations={ "return": None, "app": FastAPI, - "logs": str, + "log": str, "log_level": int, }, )
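
Note (editorial, not part of the patch): a minimal sketch of how the new pieces
fit together. guess_message_log_level is a prefix heuristic only, and the
sidecars' log-publishing callbacks now take the level next to the message
(LogPublishingCB = Callable[[LogMessageStr, LogLevelInt], Awaitable[None]],
where both aliases are plain str/int):

    import logging

    from servicelib.logging_utils import guess_message_log_level

    # "error:"/"err:"/"[error]"/"[err]" prefixes map to ERROR, the "warn"
    # family to WARNING, anything else falls back to INFO
    assert guess_message_log_level("Error: cannot open file") == logging.ERROR
    assert guess_message_log_level("[warn] disk is almost full") == logging.WARNING
    assert guess_message_log_level("progress: 44%") == logging.INFO

    # a callback compatible with the extended LogPublishingCB signature
    async def log_cb(msg: str, log_level: int) -> None:
        logging.getLogger("sidecar").log(log_level, msg)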