diff --git a/datascience/src/pipeline/flows/missing_dep_alerts.py b/datascience/src/pipeline/flows/missing_dep_alerts.py index 8e5dac8e0a..abd8fa5540 100644 --- a/datascience/src/pipeline/flows/missing_dep_alerts.py +++ b/datascience/src/pipeline/flows/missing_dep_alerts.py @@ -14,6 +14,10 @@ make_alerts, ) from src.pipeline.shared_tasks.control_flow import check_flow_not_running +from src.pipeline.shared_tasks.healthcheck import ( + assert_logbook_health, + get_monitorfish_healthcheck, +) @task(checkpoint=False) @@ -28,8 +32,12 @@ def extract_missing_deps(hours_from_now: int): with Flow("Missing DEP alerts", executor=LocalDaskExecutor()) as flow: flow_not_running = check_flow_not_running() with case(flow_not_running, True): + healthcheck = get_monitorfish_healthcheck() + logbook_healthcheck = assert_logbook_health(healthcheck) hours_from_now = Parameter("hours_from_now", MISSING_DEP_TRACK_ANALYSIS_HOURS) - vessels_with_missing_deps = extract_missing_deps(hours_from_now) + vessels_with_missing_deps = extract_missing_deps( + hours_from_now, upstream_tasks=[logbook_healthcheck] + ) alerts = make_alerts( vessels_with_missing_deps, @@ -37,9 +45,13 @@ def extract_missing_deps(hours_from_now: int): AlertType.MISSING_DEP_ALERT.value, ) silenced_alerts = extract_silenced_alerts( - AlertType.MISSING_DEP_ALERT.value, number_of_hours=hours_from_now + AlertType.MISSING_DEP_ALERT.value, + number_of_hours=hours_from_now, + upstream_tasks=[logbook_healthcheck], + ) + active_reportings = extract_active_reportings( + AlertType.MISSING_DEP_ALERT.value, upstream_tasks=[logbook_healthcheck] ) - active_reportings = extract_active_reportings(AlertType.MISSING_DEP_ALERT.value) filtered_alerts = filter_alerts(alerts, silenced_alerts, active_reportings) # Load diff --git a/datascience/src/pipeline/shared_tasks/healthcheck.py b/datascience/src/pipeline/shared_tasks/healthcheck.py index 184cb8f503..533bddb87f 100644 --- a/datascience/src/pipeline/shared_tasks/healthcheck.py +++ 
b/datascience/src/pipeline/shared_tasks/healthcheck.py @@ -105,3 +105,43 @@ def assert_last_positions_flow_health( f"{max_minutes_without_data} minutes old." ) ) + + +@task(checkpoint=False) +def assert_logbook_health( + healthcheck: MonitorfishHealthcheck, + utcnow: datetime = None, + max_minutes_without_data: int = 20, +): + """ + Checks if the `date_logbook_message_received` of the input + `MonitorfishHealthcheck` is older than `max_minutes_without_data` minutes. + + Args: + healthcheck (MonitorfishHealthcheck): `MonitorfishHealthcheck` to check. + utcnow (datetime, optional): Date with which to compare the + `date_logbook_message_received` of the input healthcheck. If not supplied, + the current server time is used. Defaults to None. + max_minutes_without_data (int, optional): Number of minutes above which an + absence of data is considered unhealthy. Defaults to 20. + + Raises: + MonitorfishHealthError: If the most recent data received is older than + `max_minutes_without_data`. + """ + if not utcnow: + utcnow = datetime.utcnow() + + time_without_data = utcnow - healthcheck.date_logbook_message_received + minutes_without_data = time_without_data.total_seconds() / 60 + + try: + assert minutes_without_data <= max_minutes_without_data + except AssertionError: + raise MonitorfishHealthError( + ( + "The most recent logbook message is too old: it is " + f"{minutes_without_data} minutes old whereas it should be less than " + f"{max_minutes_without_data} minutes old." 
+ ) + ) diff --git a/datascience/tests/test_pipeline/test_flows/test_missing_dep_alerts.py b/datascience/tests/test_pipeline/test_flows/test_missing_dep_alerts.py index c8fdc940d6..7f8795a750 100644 --- a/datascience/tests/test_pipeline/test_flows/test_missing_dep_alerts.py +++ b/datascience/tests/test_pipeline/test_flows/test_missing_dep_alerts.py @@ -2,14 +2,23 @@ import pandas as pd import pytest +from prefect.engine.signals import TRIGGERFAIL from sqlalchemy import text from src.db_config import create_engine +from src.pipeline.exceptions.monitorfish_health_error import MonitorfishHealthError from src.pipeline.flows.missing_dep_alerts import extract_missing_deps, flow from src.read_query import read_query -from tests.mocks import mock_check_flow_not_running +from tests.mocks import ( + get_monitorfish_healthcheck_mock_factory, + mock_check_flow_not_running, +) flow.replace(flow.get_tasks("check_flow_not_running")[0], mock_check_flow_not_running) +flow.replace( + flow.get_tasks("get_monitorfish_healthcheck")[0], + get_monitorfish_healthcheck_mock_factory(), +) @pytest.fixture @@ -143,3 +152,25 @@ def test_flow(reset_test_data_missing_dep_alerts): "flag_state": "FR", } assert abs((creation_date - datetime.now(timezone.utc)).total_seconds()) < 60 + + +def test_flow_fails_if_logbook_healthcheck_fails(reset_test_data): + flow.replace( + flow.get_tasks("get_monitorfish_healthcheck")[0], + get_monitorfish_healthcheck_mock_factory( + logbook_message_received_minutes_ago=25 + ), + ) + + flow.schedule = None + state = flow.run() + + assert not state.is_successful() + assert isinstance( + state.result[flow.get_tasks("assert_logbook_health")[0]].result, + MonitorfishHealthError, + ) + assert isinstance( + state.result[flow.get_tasks("load_alerts")[0]].result, + TRIGGERFAIL, + ) diff --git a/datascience/tests/test_pipeline/test_shared_tasks/test_healthcheck.py b/datascience/tests/test_pipeline/test_shared_tasks/test_healthcheck.py index f734fbd245..87a6bb1384 100644 --- 
a/datascience/tests/test_pipeline/test_shared_tasks/test_healthcheck.py +++ b/datascience/tests/test_pipeline/test_shared_tasks/test_healthcheck.py @@ -10,6 +10,7 @@ from src.pipeline.exceptions import MonitorfishHealthError from src.pipeline.shared_tasks.healthcheck import ( assert_last_positions_flow_health, + assert_logbook_health, assert_positions_received_by_api_health, get_monitorfish_healthcheck, ) @@ -80,3 +81,14 @@ def test_assert_last_positions_flow_health_raises_if_last_position_is_too_old( assert_last_positions_flow_health.run( healthcheck=healthcheck, utcnow=utcnow, max_minutes_without_data=5 ) + + +def test_assert_logbook_health_with_default_parameters(healthcheck, utcnow): + assert_logbook_health.run(healthcheck=healthcheck, utcnow=utcnow) + + +def test_assert_logbook_health_raises_if_logbook_is_too_old(healthcheck, utcnow): + with pytest.raises(MonitorfishHealthError): + assert_logbook_health.run( + healthcheck=healthcheck, utcnow=utcnow, max_minutes_without_data=5 + )