Commit

Merge pull request #399 from dataforgoodfr/398_improve_db_task_executions_traces

398 improve db task executions traces
rv2931 authored Jan 7, 2025
2 parents 1ea3fd5 + 0c138f4 commit 397c83f
Showing 7 changed files with 170 additions and 31 deletions.
@@ -0,0 +1,32 @@
"""task_execution_add_duration_position
Revision ID: 2e4561a69da9
Revises: 5bfe00a08853
Create Date: 2024-12-21 22:04:04.659923
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '2e4561a69da9'
down_revision = '5bfe00a08853'
branch_labels = None
depends_on = None


def upgrade() -> None:
    op.add_column('tasks_executions', sa.Column("duration",
                                                sa.Interval,
                                                nullable=True))
    op.add_column('tasks_executions', sa.Column("position_count",
                                                sa.Integer,
                                                nullable=True))


def downgrade() -> None:
    op.drop_column('tasks_executions', "position_count")
    op.drop_column('tasks_executions', "duration")
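
Applying the migration is what actually creates the two columns; a minimal sketch using Alembic's Python API, assuming an alembic.ini at the backend root (the equivalent of running "alembic upgrade head" from the CLI):

from alembic import command
from alembic.config import Config

# Assumption: alembic.ini points at the bloom database.
cfg = Config("alembic.ini")
command.upgrade(cfg, "2e4561a69da9")      # or "head"
# command.downgrade(cfg, "5bfe00a08853")  # revert to the previous revision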
backend/bloom/config.py (6 changes: 4 additions & 2 deletions)
@@ -1,6 +1,6 @@
import os
from pathlib import Path

from datetime import timedelta
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import Any

@@ -17,6 +17,7 @@
)

class Settings(BaseSettings):

    model_config = SettingsConfigDict(
        # validate_assignment=True allows updating db_url as soon as one of
        # postgres_user, postgres_password, postgres_hostname, postgres_port, postgres_db
@@ -50,6 +51,8 @@ class Settings(BaseSettings):
    redis_password: str = Field(default='bloom', min_length=1)
    redis_cache_expiration: int = Field(default=900)

    api_pooling_period: timedelta = Field(default=timedelta(minutes=15))

    logging_level: str = Field(
        default="INFO",
        pattern=r'NOTSET|DEBUG|INFO|WARNING|ERROR|CRITICAL'
@@ -65,7 +68,6 @@ def update_db_url(self)->dict:
        self.db_url = new_url
        return self


settings = Settings(_env_file=os.getenv('BLOOM_CONFIG',
                                        Path(__file__).parent.parent.parent.joinpath('.env')),
                    _secrets_dir=os.getenv('BLOOM_SECRETS_DIR',
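
The new api_pooling_period setting rides on pydantic's timedelta handling; a quick sketch of reading and overriding it, assuming pydantic v2 coercion rules and no custom env prefix:

from datetime import timedelta
from bloom.config import settings

# Default is 15 minutes unless overridden via the environment.
assert settings.api_pooling_period == timedelta(minutes=15)

# Assumption: pydantic v2 coerces numbers (seconds) and ISO 8601 duration
# strings, so API_POOLING_PERIOD=300 or API_POOLING_PERIOD=PT5M would both
# yield timedelta(minutes=5).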
backend/bloom/infra/database/sql_model.py (2 changes: 2 additions & 0 deletions)
@@ -230,8 +230,10 @@ class TaskExecution(Base):
    point_in_time = Column("point_in_time", DateTime(timezone=True))
    created_at = Column("created_at", DateTime(timezone=True), server_default=func.now())
    updated_at = Column("updated_at", DateTime(timezone=True), onupdate=func.now())
    duration = Column("duration", Interval, nullable=True)
    delta = Column("delta", Interval, nullable=False)
    active = Column("active", Boolean, nullable=False)
    position_count = Column("position_count", Integer, nullable=True)


class RelSegmentZone(Base):
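
With duration and position_count on TaskExecution, the execution trace of a task can be read back directly; a sketch, assuming an open SQLAlchemy Session handled the same way as in the tasks below:

from sqlalchemy import select, desc
from sqlalchemy.orm import Session
from bloom.infra.database import sql_model

def last_executions(session: Session, task_name: str, limit: int = 5):
    # Most recent runs of a task, with the new trace columns.
    stmt = (select(sql_model.TaskExecution.point_in_time,
                   sql_model.TaskExecution.duration,
                   sql_model.TaskExecution.position_count)
            .where(sql_model.TaskExecution.task_name == task_name)
            .order_by(desc(sql_model.TaskExecution.point_in_time))
            .limit(limit))
    return session.execute(stmt).all()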
backend/bloom/infra/repositories/repository_task_execution.py (39 changes: 36 additions & 3 deletions)
@@ -1,9 +1,9 @@
from datetime import datetime, timezone, timedelta

from bloom.infra.database import sql_model
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.sql.expression import update, asc, desc, delete
from sqlalchemy.orm import Session


@@ -18,11 +18,26 @@ def get_point_in_time(session: Session, task_name: str) -> datetime:
        return datetime.fromtimestamp(0, timezone.utc)
    else:
        return e.point_in_time

def set_duration(session: Session, task_name: str, pit: datetime, duration: timedelta) -> None:
    stmt = (update(sql_model.TaskExecution)
            .where(sql_model.TaskExecution.task_name == task_name)
            .where(sql_model.TaskExecution.point_in_time == pit)
            .values(duration=duration)
            )
    session.execute(stmt)


def set_position_count(session: Session, task_name: str, pit: datetime, count: int) -> None:
    stmt = (update(sql_model.TaskExecution)
            .where(sql_model.TaskExecution.task_name == task_name)
            .where(sql_model.TaskExecution.point_in_time == pit)
            .values(position_count=count)
            )
    session.execute(stmt)


def set_point_in_time(session: Session, task_name: str, pit: datetime) -> None:
    stmt = (update(sql_model.TaskExecution)
            .where(sql_model.TaskExecution.task_name == task_name)
            .where(sql_model.TaskExecution.active == True)
            .values(active=False)
            )
    session.execute(stmt)
@@ -37,3 +52,21 @@ def set_point_in_time(session: Session, task_name: str, pit: datetime) -> None:
                    delta=subquery_delta,
                    active=True)
    session.execute(stmt)

def remove_point_in_time(session: Session, task_name: str, pit: datetime) -> None:
    stmt = (delete(sql_model.TaskExecution)
            .where(sql_model.TaskExecution.task_name == task_name)
            .where(sql_model.TaskExecution.point_in_time == pit)
            )
    session.execute(stmt)
    # Reactivate the most recent execution left for this task
    subquery_last_pit = (select(sql_model.TaskExecution.point_in_time)
                         .where(sql_model.TaskExecution.task_name == task_name)
                         .order_by(desc(sql_model.TaskExecution.point_in_time))
                         .limit(1)
                         .scalar_subquery())
    stmt = (update(sql_model.TaskExecution)
            .where(sql_model.TaskExecution.task_name == task_name)
            .where(sql_model.TaskExecution.point_in_time == subquery_last_pit)
            .values(active=True)
            )
    session.execute(stmt)
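
Taken together, the helpers give each task a small trace lifecycle: claim a point in time and commit it immediately, do the work, then either enrich the trace or remove it. A sketch with a hypothetical task name, assuming an open Session and a do_the_work() placeholder:

from datetime import datetime, timezone
from bloom.infra.repositories.repository_task_execution import TaskExecutionRepository

start = datetime.now(timezone.utc)
pit = datetime.now(timezone.utc)
TaskExecutionRepository.set_point_in_time(session, "demo_task", pit)
session.commit()  # publish the claim so other instances see it
try:
    positions = do_the_work()  # hypothetical workload returning a list
    TaskExecutionRepository.set_position_count(session, "demo_task", pit, len(positions))
    TaskExecutionRepository.set_duration(session, "demo_task", pit,
                                         datetime.now(timezone.utc) - start)
except Exception:
    TaskExecutionRepository.remove_point_in_time(session, "demo_task", pit)  # free the claim
    raise
finally:
    session.commit()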
backend/bloom/tasks/clean_positions.py (19 changes: 17 additions & 2 deletions)
@@ -1,6 +1,6 @@
import argparse
import warnings
from datetime import datetime, timedelta, timezone
from time import perf_counter

import numpy as np
@@ -57,6 +57,9 @@ def run(batch_time):
    excursion_repository = use_cases.excursion_repository()
    segment_repository = use_cases.segment_repository()
    vessel_position_repository = use_cases.vessel_position_repository()
    process_start = datetime.now(timezone.utc)
    point_in_time = None
    position_count = None
    with db.session() as session:
        point_in_time = TaskExecutionRepository.get_point_in_time(
            session, "clean_positions",
@@ -71,7 +74,8 @@
        # at the next run, so that the most recent records are processed
        max_created = max(batch["created_at"]) if len(batch) > 0 else batch_limit
        logger.info(f"Processing positions between {point_in_time} and {max_created}")
        position_count = len(batch)
        logger.info(f"{position_count} new positions from Spire")
        batch.dropna(
            subset=[
                "position_latitude",
@@ -164,6 +168,17 @@ def run(batch_time):
TaskExecutionRepository.set_point_in_time(session, "clean_positions", max_created)
logger.info(f"Ecriture de {len(clean_positions)} positions dans la table vessel_positions")
session.commit()
        if point_in_time:
            TaskExecutionRepository.set_duration(session,
                                                 "clean_positions",
                                                 max_created,
                                                 datetime.now(timezone.utc) - process_start)
        if position_count is not None:
            TaskExecutionRepository.set_position_count(session,
                                                       "clean_positions",
                                                       max_created,
                                                       position_count)
        session.commit()


if __name__ == "__main__":
backend/bloom/tasks/create_update_excursions_segments.py (18 changes: 16 additions & 2 deletions)
@@ -107,13 +107,17 @@ def run():
    nb_created_excursion = 0
    nb_closed_excursion = 0

    process_start = datetime.now(timezone.utc)
    point_in_time = None
    position_count = None
    with db.session() as session:
        point_in_time = TaskExecutionRepository.get_point_in_time(
            session, "create_update_excursions_segments",
        )
logger.info(f"Lecture des nouvelles positions depuis le {point_in_time}")
batch = vessel_position_repository.get_positions_with_vessel_created_updated_after(session, point_in_time)
logger.info(f"{len(batch)} nouvelles positions")
position_count=len(batch)
logger.info(f"{position_count} nouvelles positions")
last_segment = segment_repository.get_last_vessel_id_segments(session)
last_segment["longitude"] = None
last_segment["latitude"] = None
@@ -471,7 +475,17 @@ def get_time_of_departure():
logger.info(f"{nb_last} derniers segments mis à jour")
now = datetime.now(timezone.utc)
TaskExecutionRepository.set_point_in_time(session, "create_update_excursions_segments", now)

session.commit()
        if point_in_time:
            TaskExecutionRepository.set_duration(session,
                                                 "create_update_excursions_segments",
                                                 now,
                                                 datetime.now(timezone.utc) - process_start)
        if position_count is not None:
            TaskExecutionRepository.set_position_count(session,
                                                       "create_update_excursions_segments",
                                                       now,
                                                       position_count)
        session.commit()


backend/bloom/tasks/load_spire_data_from_api.py (85 changes: 63 additions & 22 deletions)
@@ -1,6 +1,6 @@
import argparse
import json
from datetime import datetime, timezone, timedelta
from pathlib import Path
from time import perf_counter

@@ -10,6 +10,7 @@
from bloom.logger import logger
from pydantic import ValidationError
from bloom.infra.repositories.repository_task_execution import TaskExecutionRepository
from bloom.config import settings


def run(dump_path: str) -> None:
@@ -21,29 +22,69 @@ def run(dump_path: str) -> None:

    orm_data = []
    try:
        process_start = datetime.now(timezone.utc)
        current_datetime = None
        position_count = None
        with db.session() as session:
            current_task_time = TaskExecutionRepository.get_point_in_time(session, "load_spire_data_from_api")
            if current_task_time <= datetime.now(timezone.utc) - settings.api_pooling_period:
                vessels: list[Vessel] = vessel_repository.get_vessels_list(session)
                if len(vessels) > 0:
                    current_datetime = datetime.now(timezone.utc)
                    TaskExecutionRepository.set_point_in_time(session,
                                                              "load_spire_data_from_api",
                                                              current_datetime)
                    logger.info(f"Recording the execution start of load_spire_data_from_api at {current_datetime}")
                    # To sequence several load_spire_data_from_api tasks that might be
                    # launched in parallel on different machines, the point_in_time is
                    # recorded as soon as the task starts, so that other instances can
                    # detect that one instance is already running.
                    # This is why a commit is issued right at the start: it prevents
                    # other instances from launching while the first instance is still
                    # processing.
                    session.commit()
                    try:
                        raw_vessels = spire_traffic_usecase.get_raw_vessels_from_spire(vessels)
                        position_count = len(raw_vessels)
                        if dump_path is not None:
                            try:
                                now = current_datetime.strftime("%Y-%m-%dT%H:%M:%S")
                                dump_file = Path(args.dump_path, f"spire_{now}").with_suffix(".json")
                                with dump_file.open("wt") as handle:
                                    json.dump(raw_vessels, handle)
                            except Exception as e:
                                logger.warning("Failed to write the Spire response", exc_info=e)
                        else:
                            spire_ais_data = map_raw_vessels_to_domain(raw_vessels)
                            orm_data = spire_ais_data_repository.batch_create_ais_data(
                                spire_ais_data,
                                session,
                            )
                    # Since a row was written and committed at the start to flag that a
                    # load_spire_data_from_api instance is running, remove that row on
                    # error or voluntary interruption so that other instances can
                    # launch again.
                    except (KeyboardInterrupt, Exception):
                        TaskExecutionRepository.remove_point_in_time(session,
                                                                     "load_spire_data_from_api",
                                                                     current_datetime)
                        session.commit()
                        raise
                    session.commit()
                    if current_datetime is not None:
                        TaskExecutionRepository.set_duration(session,
                                                             "load_spire_data_from_api",
                                                             current_datetime,
                                                             datetime.now(timezone.utc) - process_start)
                    if position_count is not None:
                        TaskExecutionRepository.set_position_count(session,
                                                                   "load_spire_data_from_api",
                                                                   current_datetime,
                                                                   position_count)
                    session.commit()
            else:
                logger.info(f"The time elapsed since the last load is shorter than the polling period {settings.api_pooling_period}")

    except ValidationError as e:
        logger.error("JSON data validation error")
        logger.error(e.errors())
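
The combination of the gate and the early commit is what protects the Spire API from double polling when several instances run; distilled into a sketch using the same names as above:

# Instance A commits its point_in_time before calling Spire, so when
# instance B evaluates the gate it already sees a fresh timestamp and skips.
last = TaskExecutionRepository.get_point_in_time(session, "load_spire_data_from_api")
if last <= datetime.now(timezone.utc) - settings.api_pooling_period:
    TaskExecutionRepository.set_point_in_time(session, "load_spire_data_from_api",
                                              datetime.now(timezone.utc))
    session.commit()  # publish the claim before the long-running API call
# else: another instance ran recently, or is running right now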
