diff --git a/backend/alembic/versions/2e4561a69da9_task_execution_add_duration_position.py b/backend/alembic/versions/2e4561a69da9_task_execution_add_duration_position.py
new file mode 100644
index 00000000..997ed19a
--- /dev/null
+++ b/backend/alembic/versions/2e4561a69da9_task_execution_add_duration_position.py
@@ -0,0 +1,30 @@
+"""task_execution_add_duration_position
+
+Revision ID: 2e4561a69da9
+Revises: 5bfe00a08853
+Create Date: 2024-12-21 22:04:04.659923
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '2e4561a69da9'
+down_revision = '5bfe00a08853'
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    op.add_column('tasks_executions', sa.Column("duration",
+                                                sa.Interval,
+                                                nullable=True))
+    op.add_column('tasks_executions', sa.Column("position_count",
+                                                sa.Integer,
+                                                nullable=True))
+
+
+def downgrade() -> None:
+    op.drop_column('tasks_executions', "position_count")
+    op.drop_column('tasks_executions', "duration")
diff --git a/backend/bloom/config.py b/backend/bloom/config.py
index 1b875151..5ec6cdf5 100644
--- a/backend/bloom/config.py
+++ b/backend/bloom/config.py
@@ -1,6 +1,6 @@
 import os
 from pathlib import Path
-
+from datetime import timedelta
 from pydantic_settings import BaseSettings, SettingsConfigDict
 from typing import Any
@@ -17,6 +17,7 @@
 )
 
 class Settings(BaseSettings):
+
     model_config = SettingsConfigDict(
         # validate_assignment=True allows to update db_url value as soon as one of
         # postgres_user, postgres_password, postgres_hostname, postgres_port, postgres_db
@@ -50,6 +51,8 @@ class Settings(BaseSettings):
     redis_password: str = Field(default='bloom',min_length=1)
     redis_cache_expiration: int = Field(default=900)
 
+    api_pooling_period: timedelta = Field(default=timedelta(minutes=15))
+
     logging_level:str=Field(
         default="INFO",
         pattern=r'NOTSET|DEBUG|INFO|WARNING|ERROR|CRITICAL'
@@ -65,7 +68,6 @@ def update_db_url(self)->dict:
             self.db_url = new_url
         return self
 
-
 settings = Settings(_env_file=os.getenv('BLOOM_CONFIG',
                                         Path(__file__).parent.parent.parent.joinpath('.env')),
                     _secrets_dir=os.getenv('BLOOM_SECRETS_DIR',
diff --git a/backend/bloom/infra/database/sql_model.py b/backend/bloom/infra/database/sql_model.py
index b8b00529..535be415 100644
--- a/backend/bloom/infra/database/sql_model.py
+++ b/backend/bloom/infra/database/sql_model.py
@@ -230,8 +230,10 @@ class TaskExecution(Base):
    point_in_time = Column("point_in_time", DateTime(timezone=True))
    created_at = Column("created_at", DateTime(timezone=True), server_default=func.now())
    updated_at = Column("updated_at", DateTime(timezone=True), onupdate=func.now())
+   duration = Column("duration", Interval, nullable=True)
    delta = Column("delta", Interval, nullable=False)
    active = Column("active", Boolean, nullable=False)
+   position_count = Column("position_count", Integer, nullable=True)
 
 
 class RelSegmentZone(Base):
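
Both new columns are nullable, so execution rows written before this migration simply carry NULL rather than breaking existing readers. Below is a minimal sketch of reading them back for monitoring; it is illustrative only (not part of this diff), assumes an open SQLAlchemy session from the project's db.session() factory, and report_last_runs is a hypothetical name:

# Illustrative only: reads the new duration / position_count columns
# to report recent task throughput.
from sqlalchemy import select, desc
from bloom.infra.database import sql_model

def report_last_runs(session, task_name: str, limit: int = 5) -> None:
    stmt = (select(sql_model.TaskExecution)
            .where(sql_model.TaskExecution.task_name == task_name)
            .order_by(desc(sql_model.TaskExecution.point_in_time))
            .limit(limit))
    for row in session.execute(stmt).scalars():
        # duration / position_count are nullable: rows created before this
        # migration, or runs that failed early, hold None here.
        print(row.point_in_time, row.duration, row.position_count)
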
diff --git a/backend/bloom/infra/repositories/repository_task_execution.py b/backend/bloom/infra/repositories/repository_task_execution.py
index 46d88141..79ad4744 100644
--- a/backend/bloom/infra/repositories/repository_task_execution.py
+++ b/backend/bloom/infra/repositories/repository_task_execution.py
@@ -1,9 +1,9 @@
-from datetime import datetime, timezone
+from datetime import datetime, timezone, timedelta
 
 from bloom.infra.database import sql_model
 from sqlalchemy import select
 from sqlalchemy.dialects.postgresql import insert
-from sqlalchemy.sql.expression import update,asc,desc
+from sqlalchemy.sql.expression import update, asc, desc, delete
 from sqlalchemy.orm import Session
 
 
@@ -18,11 +18,27 @@ def get_point_in_time(session: Session, task_name: str) -> datetime:
             return datetime.fromtimestamp(0, timezone.utc)
         else:
             return e.point_in_time
+
+    def set_duration(session: Session, task_name: str, pit: datetime, duration: timedelta) -> None:
+        stmt = (update(sql_model.TaskExecution)
+                .where(sql_model.TaskExecution.task_name == task_name)
+                .where(sql_model.TaskExecution.point_in_time == pit)
+                .values(duration=duration)
+                )
+        session.execute(stmt)
+
+    def set_position_count(session: Session, task_name: str, pit: datetime, count: int) -> None:
+        stmt = (update(sql_model.TaskExecution)
+                .where(sql_model.TaskExecution.task_name == task_name)
+                .where(sql_model.TaskExecution.point_in_time == pit)
+                .values(position_count=count)
+                )
+        session.execute(stmt)
+
     def set_point_in_time(session: Session, task_name: str, pit: datetime) -> None:
         stmt= (
             update(sql_model.TaskExecution)
             .where(sql_model.TaskExecution.task_name==task_name)
-            .where(sql_model.TaskExecution.active==True)
             .values(active=False)
         )
         session.execute(stmt)
@@ -37,3 +53,22 @@ def set_point_in_time(session: Session, task_name: str, pit: datetime) -> None:
             delta=subquery_delta,
             active=True)
         session.execute(stmt)
+
+    def remove_point_in_time(session: Session, task_name: str, pit: datetime) -> None:
+        stmt = (delete(sql_model.TaskExecution)
+                .where(sql_model.TaskExecution.task_name == task_name)
+                .where(sql_model.TaskExecution.point_in_time == pit)
+                )
+        session.execute(stmt)
+        # reactivate the most recent remaining row so the watermark rolls back
+        subquery_last_pit = select(sql_model.TaskExecution.point_in_time)\
+            .select_from(sql_model.TaskExecution)\
+            .where(sql_model.TaskExecution.task_name == task_name)\
+            .order_by(desc(sql_model.TaskExecution.point_in_time))\
+            .limit(1).subquery()
+        stmt = (update(sql_model.TaskExecution)
+                .where(sql_model.TaskExecution.task_name == task_name)
+                .where(sql_model.TaskExecution.point_in_time == subquery_last_pit)
+                .values(active=True)
+                )
+        session.execute(stmt)
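
The delete-then-reactivate dance in remove_point_in_time is easiest to see from the caller's side. A minimal usage sketch follows; the UseCases import is assumed to be the same container the tasks use, and "some_task" is a hypothetical task name:

# Minimal sketch: on failure, drop the just-created execution row; the
# latest remaining row gets active=True again, so get_point_in_time()
# resumes from the previous watermark.
from datetime import datetime, timezone
from bloom.container import UseCases  # assumed: same container the tasks use
from bloom.infra.repositories.repository_task_execution import TaskExecutionRepository

db = UseCases().db()
with db.session() as session:
    pit = datetime.now(timezone.utc)
    TaskExecutionRepository.set_point_in_time(session, "some_task", pit)
    session.commit()  # publish the watermark so other instances can see it
    try:
        pass  # ... the actual work would run here ...
    except (KeyboardInterrupt, Exception):
        # roll the watermark back to the previous execution row
        TaskExecutionRepository.remove_point_in_time(session, "some_task", pit)
        session.commit()
        raise
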
diff --git a/backend/bloom/tasks/clean_positions.py b/backend/bloom/tasks/clean_positions.py
index bab0f357..4e801f87 100644
--- a/backend/bloom/tasks/clean_positions.py
+++ b/backend/bloom/tasks/clean_positions.py
@@ -1,6 +1,6 @@
 import argparse
 import warnings
-from datetime import timedelta
+from datetime import datetime, timedelta, timezone
 from time import perf_counter
 
 import numpy as np
@@ -57,6 +57,9 @@ def run(batch_time):
     excursion_repository = use_cases.excursion_repository()
     segment_repository = use_cases.segment_repository()
     vessel_position_repository = use_cases.vessel_position_repository()
+    process_start = datetime.now(timezone.utc)
+    point_in_time = None
+    position_count = None
     with db.session() as session:
         point_in_time = TaskExecutionRepository.get_point_in_time(
             session, "clean_positions",
@@ -71,7 +74,8 @@ def run(batch_time):
         # à la prochaine execution pour traiter les enregistrements + récents
         max_created = max(batch["created_at"]) if len(batch) > 0 else batch_limit
         logger.info(f"Traitement des positions entre le {point_in_time} et le {max_created}")
-        logger.info(f"{len(batch)} nouvelles positions de Spire")
+        position_count = len(batch)
+        logger.info(f"{position_count} nouvelles positions de Spire")
         batch.dropna(
             subset=[
                 "position_latitude",
@@ -164,6 +168,17 @@ def run(batch_time):
         TaskExecutionRepository.set_point_in_time(session, "clean_positions", max_created)
         logger.info(f"Ecriture de {len(clean_positions)} positions dans la table vessel_positions")
         session.commit()
+        if point_in_time:
+            TaskExecutionRepository.set_duration(session,
+                                                 "clean_positions",
+                                                 max_created,
+                                                 datetime.now(timezone.utc) - process_start)
+        if position_count is not None:
+            TaskExecutionRepository.set_position_count(session,
+                                                       "clean_positions",
+                                                       max_created,
+                                                       position_count)
+        session.commit()
 
 
 if __name__ == "__main__":
diff --git a/backend/bloom/tasks/create_update_excursions_segments.py b/backend/bloom/tasks/create_update_excursions_segments.py
index e0706a8d..943384b9 100644
--- a/backend/bloom/tasks/create_update_excursions_segments.py
+++ b/backend/bloom/tasks/create_update_excursions_segments.py
@@ -107,13 +107,17 @@ def run():
     nb_created_excursion = 0
     nb_closed_excursion = 0
 
+    process_start = datetime.now(timezone.utc)
+    point_in_time = None
+    position_count = None
     with db.session() as session:
         point_in_time = TaskExecutionRepository.get_point_in_time(
             session, "create_update_excursions_segments",
         )
         logger.info(f"Lecture des nouvelles positions depuis le {point_in_time}")
         batch = vessel_position_repository.get_positions_with_vessel_created_updated_after(session, point_in_time)
-        logger.info(f"{len(batch)} nouvelles positions")
+        position_count = len(batch)
+        logger.info(f"{position_count} nouvelles positions")
         last_segment = segment_repository.get_last_vessel_id_segments(session)
         last_segment["longitude"] = None
         last_segment["latitude"] = None
@@ -471,7 +475,17 @@ def get_time_of_departure():
         logger.info(f"{nb_last} derniers segments mis à jour")
         now = datetime.now(timezone.utc)
         TaskExecutionRepository.set_point_in_time(session, "create_update_excursions_segments", now)
-
+        session.commit()
+        if point_in_time:
+            TaskExecutionRepository.set_duration(session,
+                                                 "create_update_excursions_segments",
+                                                 now,
+                                                 datetime.now(timezone.utc) - process_start)
+        if position_count is not None:
+            TaskExecutionRepository.set_position_count(session,
+                                                       "create_update_excursions_segments",
+                                                       now,
+                                                       position_count)
         session.commit()
 
 
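
The duration / position_count bookkeeping above is now repeated, with only the task name varying, in clean_positions, create_update_excursions_segments and (below) load_spire_data_from_api; a small helper could factor it out. A sketch under that assumption, where record_task_metrics is a hypothetical name, not in the codebase:

# Hypothetical refactor sketch, not part of this diff.
from datetime import datetime, timezone
from bloom.infra.repositories.repository_task_execution import TaskExecutionRepository

def record_task_metrics(session, task_name, pit, process_start, position_count):
    """Attach duration and position_count to the execution row keyed by pit."""
    if pit is not None:
        TaskExecutionRepository.set_duration(
            session, task_name, pit,
            datetime.now(timezone.utc) - process_start)
    if position_count is not None:
        TaskExecutionRepository.set_position_count(
            session, task_name, pit, position_count)
    session.commit()
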
diff --git a/backend/bloom/tasks/load_spire_data_from_api.py b/backend/bloom/tasks/load_spire_data_from_api.py
index 6948f9bf..4a271abd 100644
--- a/backend/bloom/tasks/load_spire_data_from_api.py
+++ b/backend/bloom/tasks/load_spire_data_from_api.py
@@ -1,6 +1,6 @@
 import argparse
 import json
-from datetime import datetime, timezone
+from datetime import datetime, timezone, timedelta
 from pathlib import Path
 from time import perf_counter
 
@@ -10,6 +10,7 @@
 from bloom.logger import logger
 from pydantic import ValidationError
 from bloom.infra.repositories.repository_task_execution import TaskExecutionRepository
+from bloom.config import settings
 
 
 def run(dump_path: str) -> None:
@@ -21,29 +22,69 @@ def run(dump_path: str) -> None:
     orm_data = []
 
     try:
+        process_start = datetime.now(timezone.utc)
+        current_datetime = None
+        position_count = None
         with db.session() as session:
-            vessels: list[Vessel] = vessel_repository.get_vessels_list(session)
-            if len(vessels) > 0:
-                raw_vessels = spire_traffic_usecase.get_raw_vessels_from_spire(vessels)
-                current_datetime=datetime.now(timezone.utc)
-                if dump_path is not None:
-                    try:
-                        now =current_datetime.strftime("%Y-%m-%dT%H:%M:%S")
-                        dump_file = Path(args.dump_path, f"spire_{now}").with_suffix(".json")
-                        with dump_file.open("wt") as handle:
-                            json.dump(raw_vessels, handle)
-                    except Exception as e:
-                        logger.warning("Echec de l'écriture de la réponse Spire", exc_info=e)
-                else:
-                    spire_ais_data = map_raw_vessels_to_domain(raw_vessels)
-                    orm_data = spire_ais_data_repository.batch_create_ais_data(
-                        spire_ais_data,
-                        session,
-                    )
+            current_task_time = TaskExecutionRepository.get_point_in_time(session, "load_spire_data_from_api")
+            if current_task_time <= datetime.now(timezone.utc) - settings.api_pooling_period:
+                vessels: list[Vessel] = vessel_repository.get_vessels_list(session)
+                if len(vessels) > 0:
+                    current_datetime = datetime.now(timezone.utc)
                     TaskExecutionRepository.set_point_in_time(session,
-                                                      "load_spire_data_from_api",
-                                                      current_datetime)
-            session.commit()
+                                                              "load_spire_data_from_api",
+                                                              current_datetime)
+                    logger.info(f"Enregistrement du début d'exécution load_spire_data_from_api {current_datetime}")
+                    # To sequence several load_spire_data_from_api tasks that could be
+                    # launched in parallel on different machines, the point_in_time is
+                    # recorded at the very start of the task so that other instances
+                    # can detect that one is already running.
+                    # For that, the row has to be committed right away, to keep
+                    # other instances from starting while the first one
+                    # is still processing.
+                    session.commit()
+                    try:
+                        raw_vessels = spire_traffic_usecase.get_raw_vessels_from_spire(vessels)
+                        position_count = len(raw_vessels)
+                        if dump_path is not None:
+                            try:
+                                now = current_datetime.strftime("%Y-%m-%dT%H:%M:%S")
+                                dump_file = Path(dump_path, f"spire_{now}").with_suffix(".json")
+                                with dump_file.open("wt") as handle:
+                                    json.dump(raw_vessels, handle)
+                            except Exception as e:
+                                logger.warning("Echec de l'écriture de la réponse Spire", exc_info=e)
+                        else:
+                            spire_ais_data = map_raw_vessels_to_domain(raw_vessels)
+                            orm_data = spire_ais_data_repository.batch_create_ais_data(
+                                spire_ais_data,
+                                session,
+                            )
+                    # Since a row was inserted and committed to flag that a
+                    # load_spire_data_from_api instance was running, that row is
+                    # deleted on error or voluntary interruption, which lets
+                    # the other instances start again.
+                    except (KeyboardInterrupt, Exception):
+                        TaskExecutionRepository.remove_point_in_time(session,
+                                                                     "load_spire_data_from_api",
+                                                                     current_datetime)
+                        session.commit()
+                        raise
+                session.commit()
+                if current_datetime is not None:
+                    TaskExecutionRepository.set_duration(session,
+                                                         "load_spire_data_from_api",
+                                                         current_datetime,
+                                                         datetime.now(timezone.utc) - process_start)
+                if position_count is not None:
+                    TaskExecutionRepository.set_position_count(session,
+                                                               "load_spire_data_from_api",
+                                                               current_datetime,
+                                                               position_count)
+                session.commit()
+            else:
+                logger.info(f"Le temps écoulé depuis le dernier chargement est inférieur à la période d'interrogation {settings.api_pooling_period}")
+
     except ValidationError as e:
         logger.error("Erreur de validation des données JSON")
         logger.error(e.errors())
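
api_pooling_period is declared as a pydantic timedelta field, so the 15-minute default can be overridden through the same .env file that config.py already loads; pydantic v2 accepts ISO 8601 durations as well as plain seconds. A sketch, assuming pydantic-settings' default field-name mapping (adjust if the project configures an env prefix):

# .env sketch (assumed naming, no env_prefix configured):
#   API_POOLING_PERIOD=PT30M   # ISO 8601 duration: 30 minutes
#   API_POOLING_PERIOD=1800    # plain seconds parse too
from datetime import timedelta
from bloom.config import settings

# The gate in load_spire_data_from_api skips a run while the last
# point_in_time is younger than this period.
assert isinstance(settings.api_pooling_period, timedelta)
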