Skip to content

Commit

Permalink
✨ Metrics: add simcore_user_agent in service_started/service_stopped …
Browse files Browse the repository at this point in the history
…metric (#4092)
  • Loading branch information
sanderegg authored Apr 13, 2023
1 parent f2721ba commit 224cfd7
Show file tree
Hide file tree
Showing 23 changed files with 233 additions and 106 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from enum import Enum, auto
from typing import Any, Literal, Optional
from typing import Any, Literal

from models_library.projects import ProjectID
from models_library.projects_nodes import NodeID
Expand Down Expand Up @@ -85,7 +85,8 @@ class InstrumentationRabbitMessage(RabbitMessageBase, NodeMessageBase):
service_type: str
service_key: str
service_tag: str
result: Optional[RunningState] = None
result: RunningState | None = None
simcore_user_agent: str


class _RabbitAutoscalingBaseMessage(RabbitMessageBase):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from enum import Enum
from typing import Union

from aiohttp import web
from prometheus_client import Counter
Expand All @@ -26,22 +25,19 @@
kSERVICE_STARTED = f"{__name__}.services_started"
kSERVICE_STOPPED = f"{__name__}.services_stopped"

SERVICE_STARTED_LABELS: list[str] = [
"service_key",
"service_tag",
]
SERVICE_STARTED_LABELS: list[str] = ["service_key", "service_tag", "simcore_user_agent"]

SERVICE_STOPPED_LABELS: list[str] = [
"service_key",
"service_tag",
"result",
"simcore_user_agent",
]


def add_instrumentation(
app: web.Application, reg: CollectorRegistry, app_name: str
) -> None:

app[kSERVICE_STARTED] = Counter(
name="services_started_total",
documentation="Counts the services started",
Expand Down Expand Up @@ -71,10 +67,12 @@ def service_started(
app: web.Application,
service_key: str,
service_tag: str,
simcore_user_agent: str,
) -> None:
app[kSERVICE_STARTED].labels(
service_key=service_key,
service_tag=service_tag,
simcore_user_agent=simcore_user_agent,
).inc()


Expand All @@ -83,10 +81,12 @@ def service_stopped(
app: web.Application,
service_key: str,
service_tag: str,
result: Union[ServiceResult, str],
simcore_user_agent: str,
result: ServiceResult | str,
) -> None:
app[kSERVICE_STOPPED].labels(
service_key=service_key,
service_tag=service_tag,
simcore_user_agent=simcore_user_agent,
result=result.name if isinstance(result, ServiceResult) else result,
).inc()
29 changes: 18 additions & 11 deletions packages/service-library/src/servicelib/aiohttp/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import asyncio
import logging
import time
from typing import Awaitable, Callable, Final, Optional
from typing import Awaitable, Callable

import prometheus_client
from aiohttp import web
Expand All @@ -21,7 +21,10 @@
from prometheus_client.registry import CollectorRegistry
from servicelib.aiohttp.typing_extension import Handler

from ..common_headers import X_SIMCORE_USER_AGENT
from ..common_headers import (
UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE,
X_SIMCORE_USER_AGENT,
)
from ..logging_utils import log_catch

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -113,8 +116,6 @@
kPLATFORM_COLLECTOR = f"{__name__}.collector_platform"
kGC_COLLECTOR = f"{__name__}.collector_gc"

UNDEFINED_REGULAR_USER_AGENT: Final[str] = "undefined"


def get_collector_registry(app: web.Application) -> CollectorRegistry:
return app[kCOLLECTOR_REGISTRY]
Expand All @@ -138,8 +139,8 @@ async def metrics_handler(request: web.Request):

def middleware_factory(
app_name: str,
enter_middleware_cb: Optional[EnterMiddlewareCB],
exit_middleware_cb: Optional[ExitMiddlewareCB],
enter_middleware_cb: EnterMiddlewareCB | None,
exit_middleware_cb: ExitMiddlewareCB | None,
):
@web.middleware
async def middleware_handler(request: web.Request, handler: Handler):
Expand Down Expand Up @@ -168,12 +169,16 @@ async def middleware_handler(request: web.Request, handler: Handler):
app_name,
request.method,
canonical_endpoint,
request.headers.get(X_SIMCORE_USER_AGENT, UNDEFINED_REGULAR_USER_AGENT),
request.headers.get(
X_SIMCORE_USER_AGENT, UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
),
).track_inprogress(), response_summary.labels(
app_name,
request.method,
canonical_endpoint,
request.headers.get(X_SIMCORE_USER_AGENT, UNDEFINED_REGULAR_USER_AGENT),
request.headers.get(
X_SIMCORE_USER_AGENT, UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
),
).time():
resp = await handler(request)

Expand Down Expand Up @@ -209,7 +214,9 @@ async def middleware_handler(request: web.Request, handler: Handler):
request.method,
canonical_endpoint,
resp.status,
request.headers.get(X_SIMCORE_USER_AGENT, UNDEFINED_REGULAR_USER_AGENT),
request.headers.get(
X_SIMCORE_USER_AGENT, UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
),
).inc()

if exit_middleware_cb:
Expand Down Expand Up @@ -242,8 +249,8 @@ def setup_monitoring(
app: web.Application,
app_name: str,
*,
enter_middleware_cb: Optional[EnterMiddlewareCB] = None,
exit_middleware_cb: Optional[ExitMiddlewareCB] = None,
enter_middleware_cb: EnterMiddlewareCB | None = None,
exit_middleware_cb: ExitMiddlewareCB | None = None,
**app_info_kwargs,
):
# app-scope registry
Expand Down
1 change: 1 addition & 0 deletions packages/service-library/src/servicelib/common_headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
X_DYNAMIC_SIDECAR_REQUEST_SCHEME: Final[str] = "X-Dynamic-Sidecar-Request-Scheme"
X_FORWARDED_PROTO: Final[str] = "X-Forwarded-Proto"
X_SIMCORE_USER_AGENT: Final[str] = "X-Simcore-User-Agent"
UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE: Final[str] = "undefined"
11 changes: 7 additions & 4 deletions packages/service-library/tests/aiohttp/test_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
from aiohttp.test_utils import TestClient
from faker import Faker
from prometheus_client.parser import text_string_to_metric_families
from servicelib.aiohttp.monitoring import UNDEFINED_REGULAR_USER_AGENT, setup_monitoring
from servicelib.common_headers import X_SIMCORE_USER_AGENT
from servicelib.aiohttp.monitoring import setup_monitoring
from servicelib.common_headers import (
UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE,
X_SIMCORE_USER_AGENT,
)


@pytest.fixture
Expand Down Expand Up @@ -93,7 +96,7 @@ async def test_setup_monitoring(client: TestClient):
"endpoint": "/monitored_request",
"http_status": "200",
"method": "GET",
"simcore_user_agent": UNDEFINED_REGULAR_USER_AGENT,
"simcore_user_agent": UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE,
},
value=NUM_CALLS,
)
Expand All @@ -107,7 +110,7 @@ async def test_setup_monitoring(client: TestClient):
"endpoint": "/metrics",
"http_status": "200",
"method": "GET",
"simcore_user_agent": UNDEFINED_REGULAR_USER_AGENT,
"simcore_user_agent": UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE,
},
value=1,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
ProgressRabbitMessageNode,
)
from models_library.users import UserID
from servicelib.common_headers import UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE
from simcore_postgres_database.models.comp_tasks import NodeClass
from simcore_service_director_v2.core.errors import TaskSchedulingError

Expand Down Expand Up @@ -198,6 +199,7 @@ async def _process_task_result(
service_key=service_key,
service_tag=service_version,
result=task_final_state,
simcore_user_agent=UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE,
)
await self.rabbitmq_client.publish(message.channel_name, message.json())

Expand Down Expand Up @@ -225,6 +227,7 @@ async def _task_state_change_handler(self, event: str) -> None:
service_type=NodeClass.COMPUTATIONAL.value,
service_key=service_key,
service_tag=service_version,
simcore_user_agent=UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE,
)
await self.rabbitmq_client.publish(message.channel_name, message.json())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None:
service_type=NodeClass.INTERACTIVE.value,
service_key=scheduler_data.key,
service_tag=scheduler_data.version,
simcore_user_agent=scheduler_data.request_simcore_user_agent,
)
rabbitmq_client: RabbitMQClient = app.state.rabbitmq_client
await rabbitmq_client.publish(message.channel_name, message.json())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ async def _remove_containers_save_state_and_outputs() -> None:
service_type=NodeClass.INTERACTIVE.value,
service_key=scheduler_data.key,
service_tag=scheduler_data.version,
simcore_user_agent=scheduler_data.request_simcore_user_agent,
)
rabbitmq_client: RabbitMQClient = app.state.rabbitmq_client
await rabbitmq_client.publish(message.channel_name, message.json())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import logging
from contextlib import AsyncExitStack
from functools import partial
from typing import Optional

from aiohttp import web
from models_library.projects import ProjectID
Expand Down Expand Up @@ -40,11 +39,10 @@
log = logging.getLogger(__name__)


@log_decorator(logger=log)
async def list_dynamic_services(
app: web.Application,
user_id: Optional[PositiveInt] = None,
project_id: Optional[str] = None,
user_id: PositiveInt | None = None,
project_id: str | None = None,
) -> list[DataType]:
params = {}
if user_id:
Expand Down Expand Up @@ -80,7 +78,6 @@ async def get_dynamic_service(app: web.Application, node_uuid: str) -> DataType:
return service_state


@log_decorator(logger=log)
async def run_dynamic_service(
*,
app: web.Application,
Expand All @@ -92,7 +89,7 @@ async def run_dynamic_service(
service_uuid: str,
request_dns: str,
request_scheme: str,
request_simcore_user_agent: str,
simcore_user_agent: str,
service_resources: ServiceResourcesDict,
) -> DataType:
"""
Expand All @@ -116,7 +113,7 @@ async def run_dynamic_service(
headers = {
X_DYNAMIC_SIDECAR_REQUEST_DNS: request_dns,
X_DYNAMIC_SIDECAR_REQUEST_SCHEME: request_scheme,
X_SIMCORE_USER_AGENT: request_simcore_user_agent,
X_SIMCORE_USER_AGENT: simcore_user_agent,
}

settings: DirectorV2Settings = get_plugin_settings(app)
Expand All @@ -133,18 +130,21 @@ async def run_dynamic_service(
return started_service


@log_decorator(logger=log)
async def stop_dynamic_service(
app: web.Application,
service_uuid: NodeIDStr,
simcore_user_agent: str,
save_state: bool = True,
progress: Optional[ProgressBarData] = None,
progress: ProgressBarData | None = None,
) -> None:
"""
Stopping a service can take a lot of time
bumping the stop command timeout to 1 hour
this will allow to sava bigger datasets from the services
"""
headers = {
X_SIMCORE_USER_AGENT: simcore_user_agent,
}
settings: DirectorV2Settings = get_plugin_settings(app)

async with AsyncExitStack() as stack:
Expand All @@ -157,6 +157,7 @@ async def stop_dynamic_service(
url=(settings.base_url / f"dynamic_services/{service_uuid}").update_query(
can_save="true" if save_state else "false",
),
headers=headers,
expected_status=web.HTTPNoContent,
timeout=settings.DIRECTOR_V2_STOP_SERVICE_TIMEOUT,
on_error={
Expand All @@ -176,7 +177,7 @@ async def _post_progress_message(
) -> None:
progress_message = ProgressRabbitMessageProject(
user_id=user_id,
project_id=project_id,
project_id=ProjectID(project_id),
progress_type=ProgressType.PROJECT_CLOSING,
progress=progress_value,
)
Expand All @@ -186,11 +187,11 @@ async def _post_progress_message(
)


@log_decorator(logger=log)
async def stop_dynamic_services_in_project(
app: web.Application,
user_id: PositiveInt,
project_id: str,
simcore_user_agent: str,
save_state: bool = True,
) -> None:
"""Stops all dynamic services of either project_id or user_id in concurrently"""
Expand All @@ -215,6 +216,7 @@ async def stop_dynamic_services_in_project(
stop_dynamic_service(
app=app,
service_uuid=service["service_uuid"],
simcore_user_agent=simcore_user_agent,
save_state=save_state,
progress=progress_bar.sub_progress(1),
)
Expand Down
Loading

0 comments on commit 224cfd7

Please sign in to comment.