Stop generating missing DEP alerts during JPE (electronic logbook) outages #4007

Merged
18 changes: 15 additions & 3 deletions datascience/src/pipeline/flows/missing_dep_alerts.py
@@ -14,6 +14,10 @@
     make_alerts,
 )
 from src.pipeline.shared_tasks.control_flow import check_flow_not_running
+from src.pipeline.shared_tasks.healthcheck import (
+    assert_logbook_health,
+    get_monitorfish_healthcheck,
+)
 
 
 @task(checkpoint=False)
@@ -28,18 +32,26 @@ def extract_missing_deps(hours_from_now: int):
 with Flow("Missing DEP alerts", executor=LocalDaskExecutor()) as flow:
     flow_not_running = check_flow_not_running()
     with case(flow_not_running, True):
+        healthcheck = get_monitorfish_healthcheck()
+        logbook_healthcheck = assert_logbook_health(healthcheck)
         hours_from_now = Parameter("hours_from_now", MISSING_DEP_TRACK_ANALYSIS_HOURS)
-        vessels_with_missing_deps = extract_missing_deps(hours_from_now)
+        vessels_with_missing_deps = extract_missing_deps(
+            hours_from_now, upstream_tasks=[logbook_healthcheck]
+        )
 
         alerts = make_alerts(
             vessels_with_missing_deps,
             AlertType.MISSING_DEP_ALERT.value,
             AlertType.MISSING_DEP_ALERT.value,
         )
         silenced_alerts = extract_silenced_alerts(
-            AlertType.MISSING_DEP_ALERT.value, number_of_hours=hours_from_now
+            AlertType.MISSING_DEP_ALERT.value,
+            number_of_hours=hours_from_now,
+            upstream_tasks=[logbook_healthcheck],
         )
-        active_reportings = extract_active_reportings(AlertType.MISSING_DEP_ALERT.value)
+        active_reportings = extract_active_reportings(
+            AlertType.MISSING_DEP_ALERT.value, upstream_tasks=[logbook_healthcheck]
+        )
         filtered_alerts = filter_alerts(alerts, silenced_alerts, active_reportings)
 
         # Load
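The pattern in this first file is worth spelling out: the flow now fetches the Monitorfish healthcheck, asserts that logbook data is fresh, and wires that assertion into `upstream_tasks` of every extraction task. With Prefect 1.x's default `all_successful` trigger, a failed upstream task makes each downstream task fail with `TRIGGERFAIL`, so no missing-DEP alerts are produced while the logbook (JPE) stream is down. Below is a minimal, self-contained sketch of that gating mechanism; the task names (`assert_data_is_fresh`, `extract_candidates`, `load_candidates_as_alerts`) are illustrative stand-ins, not code from the PR.

```python
# Minimal sketch of the upstream_tasks gating used above (Prefect 1.x).
# All task names here are illustrative stand-ins, not code from the PR.
from prefect import Flow, task


@task(checkpoint=False)
def assert_data_is_fresh():
    # Stand-in for assert_logbook_health: raise to simulate a logbook outage.
    raise RuntimeError("Logbook data is stale")


@task(checkpoint=False)
def extract_candidates():
    return ["vessel_1", "vessel_2"]


@task(checkpoint=False)
def load_candidates_as_alerts(candidates):
    print(f"Loading alerts for {candidates}")


with Flow("Gated alerts (sketch)") as sketch_flow:
    gate = assert_data_is_fresh()
    # With the default all_successful trigger, this task (and everything
    # downstream of it) fails with TRIGGERFAIL when the gate fails.
    candidates = extract_candidates(upstream_tasks=[gate])
    load_candidates_as_alerts(candidates)

if __name__ == "__main__":
    state = sketch_flow.run()
    assert not state.is_successful()  # no alerts are loaded during the outage
```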
40 changes: 40 additions & 0 deletions datascience/src/pipeline/shared_tasks/healthcheck.py
@@ -105,3 +105,43 @@ def assert_last_positions_flow_health(
f"{max_minutes_without_data} minutes old."
)
)


@task(checkpoint=False)
def assert_logbook_health(
healthcheck: MonitorfishHealthcheck,
utcnow: datetime = None,
max_minutes_without_data: int = 20,
):
"""
Checks if the `date_logbook_message_received` of in the input
`MonitorfishHealthcheck` is older than `max_minutes_without_data` minutes.

Args:
healthcheck (MonitorfishHealthcheck): `MonitorfishHealthcheck` to check.
utcnow (datetime, optional): Date with which to compare the
`date_logbook_message_received` of the input healthcheck. If not supplied,
the current server time is used. Defaults to None.
max_minutes_without_data (int, optional): Number of minutes above which an
absence of data is considered an unhealthy. Defaults to 20.

Raises:
MonitorfishHealthError: If the most recent data received is older than
`max_minutes_without_data`.
"""
if not utcnow:
utcnow = datetime.utcnow()

time_without_data = utcnow - healthcheck.date_logbook_message_received
minutes_without_data = time_without_data.total_seconds() / 60

try:
assert minutes_without_data <= max_minutes_without_data
except AssertionError:
raise MonitorfishHealthError(
(
"The most recent logbook message is too old: it is "
f"{minutes_without_data} minutes old whereas it should be less than "
f"{max_minutes_without_data} minutes old."
)
)
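The new task can also be exercised directly with `.run()`, which is how the unit tests at the bottom of this diff call it. The sketch below assumes a minimal stand-in object exposing only `date_logbook_message_received`; the real `MonitorfishHealthcheck` carries more fields, and the imports mirror the modules touched by this PR.

```python
# Illustrative use of assert_logbook_health outside a flow. HealthcheckStub is
# a stand-in for MonitorfishHealthcheck, which has more fields than shown here.
from dataclasses import dataclass
from datetime import datetime, timedelta

from src.pipeline.exceptions import MonitorfishHealthError
from src.pipeline.shared_tasks.healthcheck import assert_logbook_health


@dataclass
class HealthcheckStub:
    date_logbook_message_received: datetime


now = datetime.utcnow()

# 10 minutes without data: within the 20-minute default, nothing is raised.
assert_logbook_health.run(
    healthcheck=HealthcheckStub(now - timedelta(minutes=10)), utcnow=now
)

# 25 minutes without data: exceeds the default threshold and raises
# MonitorfishHealthError, which in the flow fails every downstream task.
try:
    assert_logbook_health.run(
        healthcheck=HealthcheckStub(now - timedelta(minutes=25)), utcnow=now
    )
except MonitorfishHealthError:
    print("Logbook data is stale; alert generation would be skipped.")
```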
Changes to the flow's integration tests (file path not captured in this view):
@@ -2,14 +2,23 @@
 
 import pandas as pd
 import pytest
+from prefect.engine.signals import TRIGGERFAIL
 from sqlalchemy import text
 
 from src.db_config import create_engine
+from src.pipeline.exceptions.monitorfish_health_error import MonitorfishHealthError
 from src.pipeline.flows.missing_dep_alerts import extract_missing_deps, flow
 from src.read_query import read_query
-from tests.mocks import mock_check_flow_not_running
+from tests.mocks import (
+    get_monitorfish_healthcheck_mock_factory,
+    mock_check_flow_not_running,
+)
 
 flow.replace(flow.get_tasks("check_flow_not_running")[0], mock_check_flow_not_running)
+flow.replace(
+    flow.get_tasks("get_monitorfish_healthcheck")[0],
+    get_monitorfish_healthcheck_mock_factory(),
+)
 
 
 @pytest.fixture
@@ -143,3 +152,25 @@ def test_flow(reset_test_data_missing_dep_alerts):
"flag_state": "FR",
}
assert abs((creation_date - datetime.now(timezone.utc)).total_seconds()) < 60


def test_flow_fails_if_logbook_healthcheck_fails(reset_test_data):
flow.replace(
flow.get_tasks("get_monitorfish_healthcheck")[0],
get_monitorfish_healthcheck_mock_factory(
logbook_message_received_minutes_ago=25
),
)

flow.schedule = None
state = flow.run()

assert not state.is_successful()
assert isinstance(
state.result[flow.get_tasks("assert_logbook_health")[0]].result,
MonitorfishHealthError,
)
assert isinstance(
state.result[flow.get_tasks("load_alerts")[0]].result,
TRIGGERFAIL,
)
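The mock factory these tests rely on is not part of this diff; it lives in `tests/mocks.py`. A plausible sketch of what it might look like, given how it is called above (zero-argument default for healthy runs, `logbook_message_received_minutes_ago=25` to simulate an outage), is shown below. Everything in it, including the `HealthcheckStub` class, is an assumption for illustration, not the project's actual implementation.

```python
# Hypothetical sketch of get_monitorfish_healthcheck_mock_factory; the real
# implementation in tests/mocks.py is not shown in this diff. The factory is
# assumed to build a drop-in replacement for the get_monitorfish_healthcheck
# task whose logbook timestamp lags "now" by a configurable number of minutes.
from dataclasses import dataclass
from datetime import datetime, timedelta

from prefect import task


@dataclass
class HealthcheckStub:
    # Stand-in for MonitorfishHealthcheck; the real class has more fields.
    date_logbook_message_received: datetime


def get_monitorfish_healthcheck_mock_factory(
    logbook_message_received_minutes_ago: int = 0,
):
    @task(checkpoint=False, name="get_monitorfish_healthcheck")
    def get_monitorfish_healthcheck_mock() -> HealthcheckStub:
        return HealthcheckStub(
            date_logbook_message_received=datetime.utcnow()
            - timedelta(minutes=logbook_message_received_minutes_ago)
        )

    return get_monitorfish_healthcheck_mock
```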
Changes to the healthcheck shared-task unit tests (file path not captured in this view):
@@ -10,6 +10,7 @@
 from src.pipeline.exceptions import MonitorfishHealthError
 from src.pipeline.shared_tasks.healthcheck import (
     assert_last_positions_flow_health,
+    assert_logbook_health,
     assert_positions_received_by_api_health,
     get_monitorfish_healthcheck,
 )
@@ -80,3 +81,14 @@ def test_assert_last_positions_flow_health_raises_if_last_position_is_too_old(
         assert_last_positions_flow_health.run(
             healthcheck=healthcheck, utcnow=utcnow, max_minutes_without_data=5
         )
+
+
+def test_assert_logbook_health_with_default_parameters(healthcheck, utcnow):
+    assert_logbook_health.run(healthcheck=healthcheck, utcnow=utcnow)
+
+
+def test_assert_logbook_health_raises_if_logbook_is_too_old(healthcheck, utcnow):
+    with pytest.raises(MonitorfishHealthError):
+        assert_logbook_health.run(
+            healthcheck=healthcheck, utcnow=utcnow, max_minutes_without_data=5
+        )