From ffd57c2932c4774741e6dc6e279505f39fb0702a Mon Sep 17 00:00:00 2001 From: qdelamea Date: Fri, 5 Jan 2024 11:03:01 +0100 Subject: [PATCH] feat: Python API add events and health checks services --- .../python/src/armonik/client/__init__.py | 17 ++++ packages/python/src/armonik/client/events.py | 86 +++++++++++++++++++ .../src/armonik/client/health_checks.py | 21 +++++ .../python/src/armonik/common/__init__.py | 43 +++++++++- packages/python/src/armonik/common/events.py | 53 ++++++++++++ packages/python/tests/conftest.py | 10 ++- packages/python/tests/test_events.py | 22 +++++ packages/python/tests/test_healthcheck.py | 18 ++++ 8 files changed, 264 insertions(+), 6 deletions(-) create mode 100644 packages/python/src/armonik/client/events.py create mode 100644 packages/python/src/armonik/client/health_checks.py create mode 100644 packages/python/src/armonik/common/events.py create mode 100644 packages/python/tests/test_events.py create mode 100644 packages/python/tests/test_healthcheck.py diff --git a/packages/python/src/armonik/client/__init__.py b/packages/python/src/armonik/client/__init__.py index 398b36ca2..cfef686b6 100644 --- a/packages/python/src/armonik/client/__init__.py +++ b/packages/python/src/armonik/client/__init__.py @@ -4,3 +4,20 @@ from .tasks import ArmoniKTasks, TaskFieldFilter from .results import ArmoniKResults, ResultFieldFilter from .versions import ArmoniKVersions +from .events import ArmoniKEvents +from .health_checks import ArmoniKHealthChecks + +__all__ = [ + 'ArmoniKPartitions', + 'ArmoniKSessions', + 'ArmoniKSubmitter', + 'ArmoniKTasks', + 'ArmoniKResults', + "ArmoniKVersions", + "ArmoniKEvents", + "ArmoniKHealthChecks", + "TaskFieldFilter", + "PartitionFieldFilter", + "SessionFieldFilter", + "ResultFieldFilter" +] diff --git a/packages/python/src/armonik/client/events.py b/packages/python/src/armonik/client/events.py new file mode 100644 index 000000000..88695c31f --- /dev/null +++ b/packages/python/src/armonik/client/events.py @@ -0,0 +1,86 @@ +from typing import Any, Callable, cast, List + +from grpc import Channel + +from .results import ArmoniKResults +from ..common import EventTypes, Filter, NewTaskEvent, NewResultEvent, ResultOwnerUpdateEvent, ResultStatusUpdateEvent, TaskStatusUpdateEvent, ResultStatus, Event +from .results import ResultFieldFilter +from ..protogen.client.events_service_pb2_grpc import EventsStub +from ..protogen.common.events_common_pb2 import EventSubscriptionRequest, EventSubscriptionResponse +from ..protogen.common.results_filters_pb2 import Filters as rawResultFilters +from ..protogen.common.tasks_filters_pb2 import Filters as rawTaskFilters + +class ArmoniKEvents: + + _events_obj_mapping = { + "new_result": NewResultEvent, + "new_task": NewTaskEvent, + "result_owner_update": ResultOwnerUpdateEvent, + "result_status_update": ResultStatusUpdateEvent, + "task_status_update": TaskStatusUpdateEvent + } + + def __init__(self, grpc_channel: Channel): + """Events service client + + Args: + grpc_channel: gRPC channel to use + """ + self._client = EventsStub(grpc_channel) + self._results_client = ArmoniKResults(grpc_channel) + + def get_events(self, session_id: str, event_types: List[EventTypes], event_handlers: List[Callable[[str, EventTypes, Event], bool]], task_filter: Filter | None = None, result_filter: Filter | None = None) -> None: + """Get events that represents updates of result and tasks data. + + Args: + session_id: The ID of the session. + event_types: The list of the types of event to catch. + event_handlers: The list of handlers that process the events. Handlers are evaluated in he order they are provided. + An handler takes three positional arguments: the ID of the session, the type of event and the event as an object. + An handler returns a boolean, if True the process continues, otherwise the stream is closed and the service stops + listening to new events. + task_filter: A filter on tasks. + result_filter: A filter on results. + + """ + request = EventSubscriptionRequest( + session_id=session_id, + returned_events=event_types + ) + if task_filter: + request.tasks_filters=cast(rawTaskFilters, task_filter.to_disjunction().to_message()), + if result_filter: + request.results_filters=cast(rawResultFilters, result_filter.to_disjunction().to_message()), + + streaming_call = self._client.GetEvents(request) + for message in streaming_call: + event_type = message.WhichOneof("update") + if any([event_handler(session_id, EventTypes.from_string(event_type), self._events_obj_mapping[event_type].from_raw_event(getattr(message, event_type))) for event_handler in event_handlers]): + break + + def wait_for_result_availability(self, result_id: str, session_id: str) -> None: + """Wait until a result is ready i.e its status updates to COMPLETED. + + Args: + result_id: The ID of the result. + session_id: The ID of the session. + + Raises: + RuntimeError: If the result status is ABORTED. + """ + def handler(session_id, event_type, event): + if not isinstance(event, ResultStatusUpdateEvent): + raise ValueError("Handler should receive event of type 'ResultStatusUpdateEvent'.") + if event.status == ResultStatus.COMPLETED: + return False + elif event.status == ResultStatus.ABORTED: + raise RuntimeError(f"Result {result.name} with ID {result_id} is aborted.") + return True + + result = self._results_client.get_result(result_id) + if result.status == ResultStatus.COMPLETED: + return + elif result.status == ResultStatus.ABORTED: + raise RuntimeError(f"Result {result.name} with ID {result_id} is aborted.") + + self.get_events(session_id, [EventTypes.RESULT_STATUS_UPDATE], [handler], result_filter=(ResultFieldFilter.RESULT_ID == result_id)) diff --git a/packages/python/src/armonik/client/health_checks.py b/packages/python/src/armonik/client/health_checks.py new file mode 100644 index 000000000..76c04e251 --- /dev/null +++ b/packages/python/src/armonik/client/health_checks.py @@ -0,0 +1,21 @@ +from typing import cast, List, Tuple + +from grpc import Channel + +from ..common import HealthCheckStatus +from ..protogen.client.health_checks_service_pb2_grpc import HealthChecksServiceStub +from ..protogen.common.health_checks_common_pb2 import CheckHealthRequest, CheckHealthResponse + + +class ArmoniKHealthChecks: + def __init__(self, grpc_channel: Channel): + """ Result service client + + Args: + grpc_channel: gRPC channel to use + """ + self._client = HealthChecksServiceStub(grpc_channel) + + def check_health(self): + response: CheckHealthResponse = self._client.CheckHealth(CheckHealthRequest()) + return {service.name: {"message": service.message, "status": service.healthy} for service in response.services} diff --git a/packages/python/src/armonik/common/__init__.py b/packages/python/src/armonik/common/__init__.py index 5901a7262..04105d3d4 100644 --- a/packages/python/src/armonik/common/__init__.py +++ b/packages/python/src/armonik/common/__init__.py @@ -1,4 +1,41 @@ -from .helpers import datetime_to_timestamp, timestamp_to_datetime, duration_to_timedelta, timedelta_to_duration, get_task_filter +from .helpers import ( + datetime_to_timestamp, + timestamp_to_datetime, + duration_to_timedelta, + timedelta_to_duration, + get_task_filter, + batched +) from .objects import Task, TaskDefinition, TaskOptions, Output, ResultAvailability, Session, Result, Partition -from .enumwrapper import HealthCheckStatus, TaskStatus, Direction, ResultStatus, SessionStatus -from .filter import StringFilter, StatusFilter +from .enumwrapper import HealthCheckStatus, TaskStatus, Direction, SessionStatus, ResultStatus, EventTypes, ServiceHealthCheckStatus +from .events import * +from .filter import Filter, StringFilter, StatusFilter + +__all__ = [ + 'datetime_to_timestamp', + 'timestamp_to_datetime', + 'duration_to_timedelta', + 'timedelta_to_duration', + 'get_task_filter', + 'batched', + 'Task', + 'TaskDefinition', + 'TaskOptions', + 'Output', + 'ResultAvailability', + 'Session', + 'Result', + 'Partition', + 'HealthCheckStatus', + 'TaskStatus', + 'Direction', + 'SessionStatus', + 'ResultStatus', + 'EventTypes', + # Include all names from events module + # Add names from filter module + 'Filter', + 'StringFilter', + 'StatusFilter', + 'ServiceHealthCheckStatus' +] diff --git a/packages/python/src/armonik/common/events.py b/packages/python/src/armonik/common/events.py new file mode 100644 index 000000000..34acbd2c0 --- /dev/null +++ b/packages/python/src/armonik/common/events.py @@ -0,0 +1,53 @@ +from abc import ABC +from typing import List + +from dataclasses import dataclass, fields + +from .enumwrapper import TaskStatus, ResultStatus + + +class Event(ABC): + @classmethod + def from_raw_event(cls, raw_event): + values = {} + for raw_field in cls.__annotations__.keys(): + values[raw_field] = getattr(raw_event, raw_field) + return cls(**values) + + +@dataclass +class TaskStatusUpdateEvent(Event): + task_id: str + status: TaskStatus + + +@dataclass +class ResultStatusUpdateEvent(Event): + result_id: str + status: ResultStatus + + +@dataclass +class ResultOwnerUpdateEvent(Event): + result_id: str + previous_owner_id: str + current_owner_id: str + + +@dataclass +class NewTaskEvent(Event): + task_id: str + payload_id: str + origin_task_id: str + status: TaskStatus + expected_output_keys: List[str] + data_dependencies: List[str] + retry_of_ids: List[str] + parent_task_ids: List[str] + + +@dataclass +class NewResultEvent(Event): + result_id: str + owner_id: str + status: ResultStatus diff --git a/packages/python/tests/conftest.py b/packages/python/tests/conftest.py index c5eaada26..f784c8e34 100644 --- a/packages/python/tests/conftest.py +++ b/packages/python/tests/conftest.py @@ -3,7 +3,7 @@ import pytest import requests -from armonik.client import ArmoniKPartitions, ArmoniKResults, ArmoniKSessions, ArmoniKTasks, ArmoniKVersions +from armonik.client import ArmoniKEvents, ArmoniKHealthChecks, ArmoniKPartitions, ArmoniKResults, ArmoniKSessions, ArmoniKTasks, ArmoniKVersions from armonik.protogen.worker.agent_service_pb2_grpc import AgentStub from typing import List, Union @@ -55,7 +55,7 @@ def clean_up(request): print("An error occurred when resetting the server: " + str(e)) -def get_client(client_name: str, endpoint: str = grpc_endpoint) -> Union[AgentStub, ArmoniKPartitions, ArmoniKResults, ArmoniKSessions, ArmoniKTasks, ArmoniKVersions]: +def get_client(client_name: str, endpoint: str = grpc_endpoint) -> Union[AgentStub, ArmoniKEvents, ArmoniKHealthChecks, ArmoniKPartitions, ArmoniKResults, ArmoniKSessions, ArmoniKTasks, ArmoniKVersions]: """ Get the ArmoniK client instance based on the specified service name. @@ -64,7 +64,7 @@ def get_client(client_name: str, endpoint: str = grpc_endpoint) -> Union[AgentSt endpoint (str, optional): The gRPC server endpoint. Defaults to grpc_endpoint. Returns: - Union[AgentStub, ArmoniKPartitions, ArmoniKResults, ArmoniKSessions, ArmoniKTasks, ArmoniKVersions] + Union[AgentStub, ArmoniKEvents, ArmoniKHealthChecks, ArmoniKPartitions, ArmoniKResults, ArmoniKSessions, ArmoniKTasks, ArmoniKVersions] An instance of the specified ArmoniK client. Raises: @@ -78,6 +78,10 @@ def get_client(client_name: str, endpoint: str = grpc_endpoint) -> Union[AgentSt match client_name: case "Agent": return AgentStub(channel) + case "Events": + return ArmoniKEvents(channel) + case "HealthChecks": + return ArmoniKHealthChecks(channel) case "Partitions": return ArmoniKPartitions(channel) case "Results": diff --git a/packages/python/tests/test_events.py b/packages/python/tests/test_events.py new file mode 100644 index 000000000..86ca9e42b --- /dev/null +++ b/packages/python/tests/test_events.py @@ -0,0 +1,22 @@ +from .conftest import all_rpc_called, rpc_called, get_client +from armonik.client import ArmoniKEvents +from armonik.common import EventTypes, NewResultEvent, ResultStatus + + +class TestArmoniKEvents: + def test_get_events_no_filter(self): + def test_handler(session_id, event_type, event): + assert session_id == "session-id" + assert event_type == EventTypes.NEW_RESULT + assert isinstance(event, NewResultEvent) + assert event.result_id == "result-id" + assert event.owner_id == "owner-id" + assert event.status == ResultStatus.CREATED + + tasks_client: ArmoniKEvents = get_client("Events") + tasks_client.get_events("session-id", [EventTypes.TASK_STATUS_UPDATE], [test_handler]) + + assert rpc_called("Events", "GetEvents") + + def test_service_fully_implemented(self): + assert all_rpc_called("Events") diff --git a/packages/python/tests/test_healthcheck.py b/packages/python/tests/test_healthcheck.py new file mode 100644 index 000000000..8062aaab6 --- /dev/null +++ b/packages/python/tests/test_healthcheck.py @@ -0,0 +1,18 @@ +import datetime + +from .conftest import all_rpc_called, rpc_called, get_client +from armonik.client import ArmoniKHealthChecks +from armonik.common import ServiceHealthCheckStatus + + +class TestArmoniKHealthChecks: + + def test_check_health(self): + health_checks_client: ArmoniKHealthChecks = get_client("HealthChecks") + services_health = health_checks_client.check_health() + + assert rpc_called("HealthChecks", "CheckHealth") + assert services_health == {'mock': {'message': 'Mock is healthy', 'status': ServiceHealthCheckStatus.HEALTHY}} + + def test_service_fully_implemented(self): + assert all_rpc_called("HealthChecks")