diff --git a/.env.template b/.env.template index 1891c821..a3536517 100644 --- a/.env.template +++ b/.env.template @@ -17,6 +17,21 @@ DATA_FOLDER= LOGGING_LEVEL=INFO +AMP_DATA_CSV_PATH= +PORT_DATA_CSV_PATH= +PORT_POLYGON_DATA_CSV_PATH= +VESSEL_DATA_CSV_PATH= + +VESSEL_POSITIONS_DATA_CSV_PATH= +SPIRE_DATA_CSV_PATH= + +#PORT_RADIUS_M=3000 +#PORT_RESOLUTION=10 + + +#LOGGING_FORMAT="[%(name)s %(levelname)s @ %(asctime)s] %(pathname)s:%(lineno)s: %(message)s" + + ############################################################################################### # DOCKER SPECIFIC VARIABLES ############################################################################################### diff --git a/README.md b/README.md index c4400478..7e0eb59a 100644 --- a/README.md +++ b/README.md @@ -156,10 +156,8 @@ password, database name, port to use) # * data/spire_positions_subset.csv # * data/vessels_subset.csv # * data/zones_subset.csv - $ python3 src/bloom/tasks/load_dim_vessel_from_csv.py - $ python3 src/bloom/tasks/load_dim_port_from_csv.py - $ python3 src/bloom/tasks/load_dim_zone_amp_from_csv.py - $ python3 src/bloom/tasks/compute_port_geometry_buffer.py + $ python3 src/tasks/init.py && + $ python3 src/tasks/pipeline_ais_data.py ``` #### Starting the application diff --git a/data/.keep b/data/.keep new file mode 100644 index 00000000..e69de29b diff --git a/docker-compose-load-data.yaml b/docker-compose-load-data.yaml index 1cfd1969..ed9412c7 100644 --- a/docker-compose-load-data.yaml +++ b/docker-compose-load-data.yaml @@ -10,16 +10,14 @@ x-common-infos: services: load-data: container_name: bloom-load-data - image: d4g/bloom:${VERSION:-latest} + image: ghcr.io/dataforgoodfr/12_bloom:${VERSION:-latest} entrypoint: /bin/bash # Nominal start: command: - -c - /venv/bin/python3 src/bloom/tasks/load_dim_vessel_from_csv.py && - /venv/bin/python3 src/alembic/init_script/load_positions_data.py && - /venv/bin/python3 src/alembic/init_script/load_amp_data.py && - /venv/bin/python3 src/bloom/tasks/load_dim_port_from_csv.py && - /venv/bin/python3 src/bloom/tasks/compute_port_geometry_buffer.py + - | + /venv/bin/python3 src/tasks/init.py && + /venv/bin/python3 src/tasks/pipeline_ais_data.py volumes: - ./:/project/ - ./data:/project/data diff --git a/docker-compose.yaml b/docker-compose.yaml index c9cd9e4b..3e6f020d 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -10,7 +10,7 @@ x-common-infos: services: bloom: container_name: bloom - image: d4g/bloom:${VERSION:-latest} + image: ghcr.io/dataforgoodfr/12_bloom:${VERSION:-latest} build: context: . dockerfile: ./docker/Dockerfile @@ -60,7 +60,7 @@ services: init: container_name: init_bloom hostname: init_bloom - image: d4g/bloom:${VERSION:-latest} + image: ghcr.io/dataforgoodfr/12_bloom:${VERSION:-latest} # Nominal start: # As postgres+postgis gt available, then unavialable, then avialable at database init diff --git a/src/alembic/versions/72af814ca33d_add_batch_reference.py b/src/alembic/versions/72af814ca33d_add_batch_reference.py new file mode 100644 index 00000000..5dd6c7b9 --- /dev/null +++ b/src/alembic/versions/72af814ca33d_add_batch_reference.py @@ -0,0 +1,31 @@ +"""add batch reference + +Revision ID: 72af814ca33d +Revises: 4e912be8a176 +Create Date: 2024-04-03 21:45:21.431765 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. 
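# A minimal usage sketch for this revision (an aside, assuming the project's
# usual Alembic setup with an alembic.ini is in place): apply it together with
# the earlier revisions via `alembic upgrade head`, and roll it back with
# `alembic downgrade 4e912be8a176`; programmatically the same can be done with
# alembic.command.upgrade(Config("alembic.ini"), "head"). Note that the new
# "batch" columns below are created without nullable=False, so rows that
# existed before the migration simply keep batch = NULL.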
+revision = '72af814ca33d' +down_revision = '4e912be8a176' +branch_labels = None +depends_on = '4e912be8a176' + + +def upgrade() -> None: + op.add_column('dim_vessel',sa.Column("batch", sa.String),schema='public') + op.add_column('spire_ais_data',sa.Column("batch", sa.String), + schema='public') + op.rename_table('spire_ais_data','stg_spire_ais_data',schema='public') + pass + + +def downgrade() -> None: + op.rename_table('stg_spire_ais_data','spire_ais_data',schema='public') + op.drop_column('spire_ais_data','batch',schema='public') + op.drop_column('dim_vessel','batch',schema='public') + pass diff --git a/src/bloom/config.py b/src/bloom/config.py index 254eedc2..8bcda624 100644 --- a/src/bloom/config.py +++ b/src/bloom/config.py @@ -30,6 +30,8 @@ class Settings(BaseSettings): env_file_encoding = 'utf-8', extra='ignore' ) + + logging_format:str="[%(name)s %(levelname)s @ %(asctime)s] %(message)s" # Déclaration des attributs/paramètres disponibles au sein de la class settings postgres_user:str = Field(default='') @@ -49,6 +51,17 @@ class Settings(BaseSettings): default="INFO", pattern=r'NOTSET|DEBUG|INFO|WARNING|ERROR|CRITICAL' ) + + amp_data_csv_path:Path = Path(__file__).parent.joinpath("../../data/zones_subset.csv") + port_data_csv_path:Path = Path(__file__).parent.joinpath("../../data/ports.csv") + port_polygon_data_csv_path:Path = Path(__file__).parent.joinpath("../../data/ports_rad3000_res10.csv") + port_radius_m:int = 3000 # Radius in meters + port_resolution:int = 10 # Number of points in the resulting polygon + vessel_data_csv_path:Path = Path(__file__).parent.joinpath("../../data/vessels_subset.csv") + spire_data_csv_path:Path = Path(__file__).parent.joinpath("../../data/spire_positions_subset.csv") + + + segment_data_csv_path:Path = Path(__file__).parent.joinpath("../../data/segments_subset.csv") @model_validator(mode='after') def update_db_url(self)->dict: diff --git a/src/bloom/domain/excursion.py b/src/bloom/domain/excursion.py index 58f47121..06f58ab6 100644 --- a/src/bloom/domain/excursion.py +++ b/src/bloom/domain/excursion.py @@ -7,7 +7,7 @@ class Excursion(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) - id: int | None = None + id: Union[int , None] = None vessel_id: int departure_port_id: Union[int, None] = None departure_at: Union[datetime, None] = None @@ -27,3 +27,4 @@ class Excursion(BaseModel): total_time_extincting_amp: Union[timedelta, None] = None created_at: Union[datetime, None] = None updated_at: Union[datetime, None] = None + batch: Union[str, None] = None diff --git a/src/bloom/domain/port.py b/src/bloom/domain/port.py index 8e36d6a8..de26bd60 100644 --- a/src/bloom/domain/port.py +++ b/src/bloom/domain/port.py @@ -21,3 +21,4 @@ class Port(BaseModel): has_excursion: bool = False created_at: Union[datetime, None] = None updated_at: Union[datetime, None] = None + batch: Union[str, None] = None diff --git a/src/bloom/domain/rel_segment_zone.py b/src/bloom/domain/rel_segment_zone.py index 0c74431c..5650d1bb 100644 --- a/src/bloom/domain/rel_segment_zone.py +++ b/src/bloom/domain/rel_segment_zone.py @@ -9,3 +9,4 @@ class RelSegmentZone(BaseModel): segment_id: int zone_id: int created_at: Union[datetime, None] = None + batch: Union[str, None] = None diff --git a/src/bloom/domain/segment.py b/src/bloom/domain/segment.py index 00d61bd1..8ade946c 100644 --- a/src/bloom/domain/segment.py +++ b/src/bloom/domain/segment.py @@ -1,30 +1,36 @@ from datetime import datetime, timedelta +<<<<<<< HEAD +======= + +from pydantic import BaseModel + +>>>>>>> 
576cbd6 (fix/feat: load SpireDataFromCsv task) from typing import Union from pydantic import BaseModel, ConfigDict from shapely import Point - class Segment(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) - id: int | None = None + id: Union[ int , None ] = None excursion_id: int timestamp_start: Union[datetime, None] = None timestamp_end: Union[datetime, None] = None segment_duration: Union[timedelta, None] = None start_position: Union[Point, None] = None end_position: Union[Point, None] = None - heading: Union[float | None] = None - distance: Union[float | None] = None - average_speed: Union[float | None] = None - speed_at_start: Union[float | None] = None - speed_at_end: Union[float | None] = None - heading_at_start: Union[float | None] = None - heading_at_end: Union[float | None] = None - type: Union[str | None] = None - in_amp_zone: Union[bool | None] = None - in_territorial_waters: Union[bool | None] = None - in_costal_waters: Union[bool | None] = None - last_vessel_segment: Union[bool | None] = None + heading: Union[float , None] = None + distance: Union[float , None] = None + average_speed: Union[float , None] = None + speed_at_start: Union[float , None] = None + speed_at_end: Union[float , None] = None + heading_at_start: Union[float , None] = None + heading_at_end: Union[float , None] = None + type: Union[str , None] = None + in_amp_zone: Union[bool , None] = None + in_territorial_waters: Union[bool , None] = None + in_costal_waters: Union[bool , None] = None + last_vessel_segment: Union[bool , None] = None created_at: Union[datetime, None] = None updated_at: Union[datetime, None] = None + batch: Union[str, None] = None diff --git a/src/bloom/domain/spire_ais_data.py b/src/bloom/domain/spire_ais_data.py index 0f51f496..7ec450ea 100644 --- a/src/bloom/domain/spire_ais_data.py +++ b/src/bloom/domain/spire_ais_data.py @@ -8,38 +8,39 @@ class SpireAisData(BaseModel): - id: Union[int | None] = None # noqa: UP007 + id: Union[int , None] = None # noqa: UP007 spire_update_statement: datetime - vessel_ais_class: Union[str | None] = None # noqa: UP007 - vessel_flag: Union[str | None] = None # noqa: UP007 - vessel_name: Union[str | None] = None # noqa: UP007 - vessel_callsign: Union[str | None] = None # noqa: UP007 - vessel_timestamp: Union[datetime | None] = None # noqa: UP007 - vessel_update_timestamp: Union[datetime | None] = None # noqa: UP007 - vessel_ship_type: Union[str | None] = None # noqa: UP007 - vessel_sub_ship_type: Union[str | None] = None # noqa: UP007 - vessel_mmsi: Union[int | None] = None # noqa: UP007 - vessel_imo: Union[int | None] = None # noqa: UP007 - vessel_width: Union[int | None] = None # noqa: UP007 - vessel_length: Union[int | None] = None # noqa: UP007 - position_accuracy: Union[str | None] = None # noqa: UP007 - position_collection_type: Union[str | None] = None # noqa: UP007 - position_course: Union[float | None] = None # noqa: UP007 - position_heading: Union[float | None] = None # noqa: UP007 - position_latitude: Union[float | None] = None # noqa: UP007 - position_longitude: Union[float | None] = None # noqa: UP007 - position_maneuver: Union[str | None] = None # noqa: UP007 - position_navigational_status: Union[str | None] = None # noqa: UP007 - position_rot: Union[float | None] = None # noqa: UP007 - position_speed: Union[float | None] = None # noqa: UP007 - position_timestamp: Union[datetime | None] = None # noqa: UP007 - position_update_timestamp: Union[datetime | None] = None # noqa: UP007 - voyage_destination: Union[str | None] = None 
# noqa: UP007 - voyage_draught: Union[float | None] = None # noqa: UP007 - voyage_eta: Union[datetime | None] = None # noqa: UP007 - voyage_timestamp: Union[datetime | None] = None # noqa: UP007 - voyage_update_timestamp: Union[datetime | None] = None # noqa: UP007 + vessel_ais_class: Union[str , None] = None # noqa: UP007 + vessel_flag: Union[str , None] = None # noqa: UP007 + vessel_name: Union[str , None] = None # noqa: UP007 + vessel_callsign: Union[str , None] = None # noqa: UP007 + vessel_timestamp: Union[datetime , None] = None # noqa: UP007 + vessel_update_timestamp: Union[datetime , None] = None # noqa: UP007 + vessel_ship_type: Union[str , None] = None # noqa: UP007 + vessel_sub_ship_type: Union[str , None] = None # noqa: UP007 + vessel_mmsi: Union[int , None] = None # noqa: UP007 + vessel_imo: Union[int , None] = None # noqa: UP007 + vessel_width: Union[int , None] = None # noqa: UP007 + vessel_length: Union[int , None] = None # noqa: UP007 + position_accuracy: Union[str , None] = None # noqa: UP007 + position_collection_type: Union[str , None] = None # noqa: UP007 + position_course: Union[float , None] = None # noqa: UP007 + position_heading: Union[float , None] = None # noqa: UP007 + position_latitude: Union[float , None] = None # noqa: UP007 + position_longitude: Union[float , None] = None # noqa: UP007 + position_maneuver: Union[str , None] = None # noqa: UP007 + position_navigational_status: Union[str , None] = None # noqa: UP007 + position_rot: Union[float , None] = None # noqa: UP007 + position_speed: Union[float , None] = None # noqa: UP007 + position_timestamp: Union[datetime , None] = None # noqa: UP007 + position_update_timestamp: Union[datetime , None] = None # noqa: UP007 + voyage_destination: Union[str , None] = None # noqa: UP007 + voyage_draught: Union[float , None] = None # noqa: UP007 + voyage_eta: Union[datetime , None] = None # noqa: UP007 + voyage_timestamp: Union[datetime , None] = None # noqa: UP007 + voyage_update_timestamp: Union[datetime , None] = None # noqa: UP007 created_at: Union[datetime, None] = None # noqa: UP007 + batch: Union[str, None] = None def map_from_spire(spire_update_timestamp: datetime, vessel: dict[str, Any]): # noqa: ANN201 def deep_get(dictionary: dict[str, Any], *keys) -> str: diff --git a/src/bloom/domain/vessel_data.py b/src/bloom/domain/vessel_data.py index 745f08d9..569ac663 100644 --- a/src/bloom/domain/vessel_data.py +++ b/src/bloom/domain/vessel_data.py @@ -6,18 +6,22 @@ class VesselData(BaseModel): - id: int | None = None + id: Union[ int , None ] timestamp: datetime - ais_class: str | None - flag: str | None - name: str | None - callsign: str | None - timestamp: datetime | None - ship_type: str | None - sub_ship_type: str | None - mmsi: int | None - imo: int | None - width: int | None - length: int | None + ais_class: Union[str , None] + flag: Union[str , None] + name: Union[str , None] + callsign: Union[str , None] + timestamp: Union[datetime , None] + ship_type: Union[str , None] + sub_ship_type: Union[str , None] + mmsi: Union[int , None] + imo: Union[int , None] + width: Union[int , None] + length: Union[int , None] vessel_id: int - created_at: Union[datetime, None] = None + created_at: Union[datetime, None] + + def __init__(self): + self.id = None + self.created_at = None \ No newline at end of file diff --git a/src/bloom/domain/vessel_position.py b/src/bloom/domain/vessel_position.py index 19b4e9f0..4c82363c 100644 --- a/src/bloom/domain/vessel_position.py +++ b/src/bloom/domain/vessel_position.py @@ -9,18 +9,18 @@ 
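# A short, self-contained illustration of the annotation change applied across
# the domain models in this diff (nothing below is project specific):
# Union[X, None] is the spelling that still works on Python < 3.10, whereas the
# PEP 604 form X | None only exists at runtime from 3.10 onwards, which is
# presumably why Union[float | None] is rewritten here as Union[float, None].
from typing import Optional, Union

assert Union[float, None] == Optional[float]  # the replacement spelling
# float | None  -> TypeError on Python 3.9 and older when evaluated, so the
# old Union[float | None] spelling failed there as well.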
class VesselPosition(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) - id: int | None = None + id: Union[int , None] = None timestamp: datetime accuracy: Union[str, None] = None collection_type: Union[str, None] = None course: Union[float, None] = None heading: Union[float, None] = None position: Union[Point, None] = None - latitude: Union[float | None] = None - longitude: Union[float | None] = None - maneuver: Union[str | None] = None - navigational_status: Union[str | None] = None - rot: Union[float | None] = None - speed: Union[float | None] = None + latitude: Union[float , None] = None + longitude: Union[float , None] = None + maneuver: Union[str , None] = None + navigational_status: Union[str , None] = None + rot: Union[float , None] = None + speed: Union[float , None] = None vessel_id: int created_at: Union[datetime, None] = None diff --git a/src/bloom/domain/vessel_voyage.py b/src/bloom/domain/vessel_voyage.py index b29903aa..a4c4aa12 100644 --- a/src/bloom/domain/vessel_voyage.py +++ b/src/bloom/domain/vessel_voyage.py @@ -6,7 +6,7 @@ class VesselVoyage(BaseModel): - id: int | None = None + id: Union[ int , None ] = None timestamp: datetime destination: str | None draught: float | None diff --git a/src/bloom/domain/white_zone.py b/src/bloom/domain/white_zone.py index 4c10d4e3..3ef076ab 100644 --- a/src/bloom/domain/white_zone.py +++ b/src/bloom/domain/white_zone.py @@ -6,7 +6,7 @@ class WhiteZone(BaseModel): - id: int | None = None + id: Union[ int | None ] = None geometry: Union[Geometry, None] = None created_at: Union[datetime, None] = None updated_at: Union[datetime, None] = None diff --git a/src/bloom/domain/zone.py b/src/bloom/domain/zone.py index 5effbddd..0f38369f 100644 --- a/src/bloom/domain/zone.py +++ b/src/bloom/domain/zone.py @@ -7,7 +7,7 @@ class Zone(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) - id: int | None = None + id: Union[int , None] = None category: str sub_category: Union[str, None] = None name: str @@ -16,3 +16,4 @@ class Zone(BaseModel): json_data: Union[dict, None] = None created_at: Union[datetime, None] = None updated_at: Union[datetime, None] = None + batch: Union[str, None] = None diff --git a/src/bloom/infra/database/sql_model.py b/src/bloom/infra/database/sql_model.py index 34fa8196..7f51d857 100644 --- a/src/bloom/infra/database/sql_model.py +++ b/src/bloom/infra/database/sql_model.py @@ -44,6 +44,7 @@ class Vessel(Base): "created_at", DateTime(timezone=True), nullable=False, server_default=func.now(), ) updated_at = Column("updated_at", DateTime(timezone=True), onupdate=func.now()) + batch = Column("batch", String) class VesselPositionSpire(Base): @@ -164,6 +165,7 @@ class Port(Base): has_excursion = Column("has_excursion", Boolean, default=False) created_at = Column("created_at", DateTime(timezone=True), server_default=func.now()) updated_at = Column("updated_at", DateTime(timezone=True), onupdate=func.now()) + batch = Column("batch", String) class SpireAisData(Base): @@ -201,6 +203,7 @@ class SpireAisData(Base): voyage_timestamp = Column("voyage_timestamp", DateTime(timezone=True)) voyage_update_timestamp = Column("voyage_update_timestamp", DateTime(timezone=True)) created_at = Column("created_at", DateTime(timezone=True), server_default=func.now()) + batch = Column("batch", String) class Zone(Base): @@ -213,6 +216,7 @@ class Zone(Base): centroid = Column("centroid", Geometry(geometry_type="POINT", srid=settings.srid)) json_data = Column("json_data", JSONB) created_at = Column("created_at", 
DateTime(timezone=True), server_default=func.now()) + batch = Column("batch", String) class WhiteZone(Base): @@ -293,6 +297,7 @@ class Excursion(Base): total_time_extincting_amp = Column("total_time_extincting_amp", Interval) created_at = Column("created_at", DateTime(timezone=True), server_default=func.now()) updated_at = Column("updated_at", DateTime(timezone=True), onupdate=func.now()) + batch = Column("batch", String) class Segment(Base): @@ -318,6 +323,7 @@ class Segment(Base): last_vessel_segment = Column("last_vessel_segment", Boolean) created_at = Column("created_at", DateTime(timezone=True), server_default=func.now()) updated_at = Column("updated_at", DateTime(timezone=True), onupdate=func.now()) + batch = Column("batch", String) class TaskExecution(Base): diff --git a/src/bloom/infra/repositories/repository_excursion.py b/src/bloom/infra/repositories/repository_excursion.py index ddc60309..eba32a4f 100644 --- a/src/bloom/infra/repositories/repository_excursion.py +++ b/src/bloom/infra/repositories/repository_excursion.py @@ -15,7 +15,7 @@ def __init__( ) -> Callable[..., AbstractContextManager]: self.session_factory = session_factory - def get_vessel_current_excursion(self, session: Session, vessel_id: int) -> Union[Excursion | None]: + def get_vessel_current_excursion(self, session: Session, vessel_id: int) -> Union[Excursion , None]: """Recheche l'excursion en cours d'un bateau, c'est-à-dire l'excursion qui n'a pas de date d'arrivée""" sql = select(sql_model.Excursion).where(sql_model.Excursion.vessel_id == vessel_id).where( sql_model.Excursion.arrival_at.isnot(None)) diff --git a/src/bloom/infra/repositories/repository_port.py b/src/bloom/infra/repositories/repository_port.py index eff9d086..96986115 100644 --- a/src/bloom/infra/repositories/repository_port.py +++ b/src/bloom/infra/repositories/repository_port.py @@ -38,8 +38,11 @@ def get_empty_geometry_buffer_ports(self, session: Session) -> list[Port]: return [PortRepository.map_to_domain(entity) for entity in q] def get_ports_updated_created_after(self, session: Session, created_updated_after: datetime) -> list[Port]: - stmt = select(sql_model.Port).where(or_(sql_model.Port.created_at >= created_updated_after, - sql_model.Port.updated_at >= created_updated_after)) + if created_updated_after is None: + stmt = select(sql_model.Port) + else: + stmt = select(sql_model.Port).where(or_(sql_model.Port.created_at >= created_updated_after, + sql_model.Port.updated_at >= created_updated_after)) q = session.execute(stmt).scalars() if not q: return [] @@ -62,7 +65,7 @@ def batch_create_port(self, session: Session, ports: list[Port]) -> list[Port]: session.add_all(orm_list) return [PortRepository.map_to_domain(orm) for orm in orm_list] - def find_port_by_position_in_port_buffer(self, session: Session, position: Point) -> Union[Port | None]: + def find_port_by_position_in_port_buffer(self, session: Session, position: Point) -> Union[Port, None]: stmt = select(sql_model.Port).where( func.ST_contains(sql_model.Port.geometry_buffer, from_shape(position, srid=settings.srid)) == True) port = session.execute(stmt).scalar() diff --git a/src/bloom/infra/repositories/repository_vessel.py b/src/bloom/infra/repositories/repository_vessel.py index bbfbda59..8d96470d 100644 --- a/src/bloom/infra/repositories/repository_vessel.py +++ b/src/bloom/infra/repositories/repository_vessel.py @@ -20,7 +20,7 @@ def __init__( ) -> Callable[..., AbstractContextManager]: self.session_factory = session_factory - def get_vessel_by_id(self, session: Session, 
vessel_id: int) -> Union[Vessel | None]: + def get_vessel_by_id(self, session: Session, vessel_id: int) -> Union[Vessel, None]: return session.get(sql_model.Vessel, vessel_id) def get_vessels_list(self, session: Session) -> list[Vessel]: diff --git a/src/bloom/logger.py b/src/bloom/logger.py index 6ef1369a..0a1097d0 100644 --- a/src/bloom/logger.py +++ b/src/bloom/logger.py @@ -3,7 +3,7 @@ logger = logging.getLogger("bloom") formatter = logging.Formatter( - "[%(name)s %(levelname)s @ %(asctime)s] %(message)s", + settings.logging_format, datefmt="%H:%M:%S", ) handler = logging.StreamHandler() diff --git a/src/bloom/tasks/compute_port_geometry_buffer.py b/src/bloom/tasks/compute_port_geometry_buffer.py deleted file mode 100644 index 186caa86..00000000 --- a/src/bloom/tasks/compute_port_geometry_buffer.py +++ /dev/null @@ -1,129 +0,0 @@ -from time import perf_counter - -import geopandas as gpd -import pandas as pd -import pyproj -import shapely -from bloom.config import settings -from bloom.container import UseCases -from bloom.logger import logger -from scipy.spatial import Voronoi -from shapely.geometry import LineString, Polygon -from bloom.infra.repositories.repository_task_execution import TaskExecutionRepository -from datetime import datetime, timezone - -radius_m = 3000 # Radius in meters -resolution = 10 # Number of points in the resulting polygon - - -# Function to create geodesic buffer around a point -def geodesic_point_buffer(lat: float, lon: float, radius_m: int, resolution: int) -> Polygon: - """ - Input - lat: latitude of the center point - lon: longitude of the center point - radius_m: radius of the buffer in meters - resolution: number of points in the resulting polygon - """ - geod = pyproj.Geod(ellps="WGS84") # Define the ellipsoid - # Create a circle in geodesic coordinates - angles = range(0, 360, 360 // resolution) - circle_points = [] - for angle in angles: - # Calculate the point on the circle for this angle - lon2, lat2, _ = geod.fwd(lon, lat, angle, radius_m) - circle_points.append((lon2, lat2)) - # Create a polygon from these points - return Polygon(circle_points) - - -def assign_voronoi_buffer(ports: gpd.GeoDataFrame) -> gpd.GeoDataFrame: - """Computes a buffer around each port such as buffers do not overlap each other - - :param gpd.GeoDataFrame ports: fields "id", "latitude", "longitude", "geometry_point" - from the table "dim_ports" - :return gpd.GeoDataFrame: same as input but field "geometry_point" is replaced - by "geometry_buffer" - """ - # Convert to CRS 6933 to write distances as meters (instead of degrees) - ports.to_crs(6933, inplace=True) - - # Create an 8km buffer around each port - # FIXME: maybe put the buffer distance as a global parameter - ports["buffer"] = ports["geometry_point"].apply(lambda p: shapely.buffer(p, 8000)) - - # Convert points back to CRS 4326 (lat/lon) --> buffers are still expressed in 6933 - ports.to_crs(settings.srid, inplace=True) - - # Convert buffers to 4326 - ports = gpd.GeoDataFrame(ports, geometry="buffer", crs=6933).to_crs(settings.srid) - - # Create Voronoi polygons, i.e. 
match each port to the area which is closest to this port - vor = Voronoi(list(zip(ports.longitude, ports.latitude))) - lines = [] - for line in vor.ridge_vertices: - if -1 not in line: - lines.append(LineString(vor.vertices[line])) - else: - lines.append(LineString()) - vor_polys = [poly for poly in shapely.ops.polygonize(lines)] - - # Match each port to its Voronoi polygon - vor_gdf = gpd.GeoDataFrame(geometry=gpd.GeoSeries(vor_polys), crs=4326) - vor_gdf["voronoi_poly"] = vor_gdf["geometry"].copy() - ports = gpd.GeoDataFrame(ports, geometry="geometry_point", crs=4326) - vor_ports = ports.sjoin(vor_gdf, how="left").drop(columns=["index_right"]) - - # Corner case: ports at extremes latitude and longitude have no Voroni polygon - # (15 / >5800) cases --> duplicate regular buffer instead - vor_ports.loc[vor_ports.voronoi_poly.isna(), "voronoi_poly"] = vor_ports.loc[ - vor_ports.voronoi_poly.isna(), "buffer" - ] - - # Get intersection between fixed-size buffer and Voronoi polygon to get final buffer - vor_ports["geometry_buffer"] = vor_ports.apply( - lambda row: shapely.intersection(row["buffer"], row["voronoi_poly"]), - axis=1 - ) - vor_ports["buffer_voronoi"] = vor_ports["geometry_buffer"].copy() - vor_ports = gpd.GeoDataFrame(vor_ports, geometry="geometry_buffer", crs=4326) - vor_ports = vor_ports[["id", "latitude", "longitude", "geometry_buffer"]].copy() - - return vor_ports - - -def run() -> None: - use_cases = UseCases() - port_repository = use_cases.port_repository() - db = use_cases.db() - items = [] - with db.session() as session: - point_in_time = TaskExecutionRepository.get_point_in_time(session, "compute_port_geometry_buffer") - logger.info(f"Point in time={point_in_time}") - now = datetime.now(timezone.utc) - ports = port_repository.get_ports_updated_created_after(session, point_in_time) - if ports: - df = pd.DataFrame( - [[p.id, p.geometry_point, p.latitude, p.longitude] for p in ports], - columns=["id", "geometry_point", "latitude", "longitude"], - ) - gdf = gpd.GeoDataFrame(df, geometry="geometry_point", crs=settings.srid) - - # Apply the buffer function to create buffers - gdf = assign_voronoi_buffer(gdf) - - for row in gdf.itertuples(): - items.append({"id": row.id, "geometry_buffer": row.geometry_buffer}) - port_repository.batch_update_geometry_buffer(session, items) - TaskExecutionRepository.set_point_in_time(session, "compute_port_geometry_buffer", now) - session.commit() - logger.info(f"{len(items)} buffer de ports mis à jour") - - -if __name__ == "__main__": - time_start = perf_counter() - logger.info("DEBUT - Calcul des buffer de ports") - run() - time_end = perf_counter() - duration = time_end - time_start - logger.info(f"FIN - Calcul des buffer de ports en {duration:.2f}s") diff --git a/src/bloom/tasks/convert_spire_vessels_to_spire_ais_data.py b/src/bloom/tasks/convert_spire_vessels_to_spire_ais_data.py deleted file mode 100644 index df120b9e..00000000 --- a/src/bloom/tasks/convert_spire_vessels_to_spire_ais_data.py +++ /dev/null @@ -1,88 +0,0 @@ -from time import perf_counter -from typing import Generator - -from bloom.container import UseCases -from bloom.domain.spire_ais_data import SpireAisData -from bloom.infra.database.sql_model import VesselPositionSpire -from bloom.logger import logger -from geoalchemy2.shape import to_shape -from shapely import Point -from sqlalchemy.orm.session import Session - -use_cases = UseCases() -vessel_repo = use_cases.vessel_repository() -spire_ais_data_repo = use_cases.spire_ais_data_repository() -db = use_cases.db() -batch_size 
= 1000 - - -def map_to_ais_spire_data(vessel_position: VesselPositionSpire) -> SpireAisData: - position: Point = to_shape(vessel_position.position) - return SpireAisData( - spire_update_statement=vessel_position.timestamp, - vessel_ais_class=None, - vessel_flag=None, - vessel_name=vessel_position.ship_name, - vessel_callsign=None, - vessel_timestamp=None, - vessel_update_timestamp=None, - vessel_ship_type=None, - vessel_sub_ship_type=None, - vessel_mmsi=vessel_position.mmsi, - vessel_imo=vessel_position.IMO, - vessel_width=vessel_position.vessel_width, - vessel_length=vessel_position.vessel_length, - position_accuracy=vessel_position.accuracy, - position_collection_type=vessel_position.position_sensors, - position_course=vessel_position.course, - position_heading=vessel_position.heading, - position_latitude=position.x, - position_longitude=position.y, - position_maneuver=None, - position_navigational_status=vessel_position.navigation_status, - position_rot=vessel_position.rot, - position_speed=vessel_position.speed, - position_timestamp=vessel_position.last_position_time, - position_update_timestamp=vessel_position.last_position_time, - voyage_destination=vessel_position.voyage_destination, - voyage_draught=vessel_position.voyage_draught, - voyage_eta=vessel_position.voyage_draught, - voyage_timestamp=None, - voyage_update_timestamp=None, - ) - - -def batch_convert(session: Session) -> Generator[list[SpireAisData], None, None]: - batch = [] - for vessel_position in vessel_repo.get_all_spire_vessels_position(session, batch_size): - batch.append(map_to_ais_spire_data(vessel_position)) - if len(batch) >= batch_size: - yield batch - batch = [] - yield batch - - -def run() -> None: - count = 0 - try: - with db.session() as session: - for b in batch_convert(session): - count = count + len(b) - spire_ais_data_repo.batch_create_ais_data(b, session) - session.commit() - except Exception as e: - session.rollback() - logger.error("Erreur lors de conversion, transaction ROLLBACK") - logger.error(e) - logger.info(f"{count} enregistrements convertis") - - -if __name__ == "__main__": - time_start = perf_counter() - logger.info( - f"Conversion spire_vessel_positions -> spire_ais_data (taille des lots: {batch_size})" - ) - run() - time_end = perf_counter() - duration = time_end - time_start - logger.info(f"Conversion spire_vessel_positions -> spire_ais_data en {duration:.2f}s") diff --git a/src/bloom/tasks/load_dim_port_from_csv.py b/src/bloom/tasks/load_dim_port_from_csv.py deleted file mode 100644 index 00a2eef8..00000000 --- a/src/bloom/tasks/load_dim_port_from_csv.py +++ /dev/null @@ -1,62 +0,0 @@ -from pathlib import Path -from time import perf_counter - -import geopandas as gpd -import pandas as pd -import pycountry -from bloom.config import settings -from bloom.container import UseCases -from bloom.domain.port import Port -from bloom.infra.database.errors import DBException -from bloom.logger import logger -from pydantic import ValidationError -from shapely import wkt - - -def map_to_domain(row) -> Port: - iso_code = pycountry.countries.get(name=row["country"]) - iso_code = iso_code.alpha_3 if iso_code is not None else "XXX" - - return Port( - name=row["port"], - locode=row["locode"], - url=row["url"], - country_iso3=iso_code, - latitude=float(row["latitude"]), - longitude=float(row["longitude"]), - geometry_point=row["geometry_point"], - ) - - -def run(csv_file_name: str) -> None: - use_cases = UseCases() - port_repository = use_cases.port_repository() - db = use_cases.db() - - ports = [] - total = 
0 - try: - df = pd.read_csv(csv_file_name, sep=";") - df["geometry_point"] = df["geometry_point"].apply(wkt.loads) - gdf = gpd.GeoDataFrame(df, geometry="geometry_point", crs=settings.srid) - ports = gdf.apply(map_to_domain, axis=1) - with db.session() as session: - ports = port_repository.batch_create_port(session, list(ports)) - session.commit() - total = len(ports) - except ValidationError as e: - logger.error("Erreur de validation des données de port") - logger.error(e.errors()) - except DBException as e: - logger.error("Erreur d'insertion en base") - logger.info(f"{total} ports(s) créés") - - -if __name__ == "__main__": - time_start = perf_counter() - file_name = Path(settings.data_folder).joinpath("./ports.csv") - logger.info(f"DEBUT - Chargement des données de ports depuis le fichier {file_name}") - run(file_name) - time_end = perf_counter() - duration = time_end - time_start - logger.info(f"FIN - Chargement des données de ports en {duration:.2f}s") diff --git a/src/bloom/tasks/load_dim_vessel_from_csv.py b/src/bloom/tasks/load_dim_vessel_from_csv.py deleted file mode 100644 index 52e4686c..00000000 --- a/src/bloom/tasks/load_dim_vessel_from_csv.py +++ /dev/null @@ -1,97 +0,0 @@ -from pathlib import Path -from time import perf_counter - -import pandas as pd -from bloom.config import settings -from bloom.container import UseCases -from bloom.domain.vessel import Vessel -from bloom.infra.database.errors import DBException -from bloom.logger import logger -from pydantic import ValidationError - - -def map_to_domain(row: pd.Series) -> Vessel: - isna = row.isna() - return Vessel( - id=int(row["id"]) if not isna["id"] else None, - mmsi=int(row["mmsi"]) if not isna["mmsi"] else None, - ship_name=row["ship_name"], - width=int(row["width"]) if not isna["width"] else None, - length=int(row["length"]) if not isna["length"] else None, - country_iso3=row["country_iso3"] if not isna["country_iso3"] else "XXX", - type=row["type"], - imo=row["imo"] if not isna["imo"] else None, - cfr=row["cfr"] if not isna["cfr"] else None, - registration_number=row["registration_number"] - if not isna["registration_number"] - else None, - external_marking=row["external_marking"] if not isna["external_marking"] else None, - ircs=row["ircs"] if not isna["ircs"] else None, - tracking_activated=row["tracking_activated"], - tracking_status=row["tracking_status"] if not isna["tracking_status"] else None, - ) - - -def run(csv_file_name: str) -> None: - use_cases = UseCases() - vessel_repository = use_cases.vessel_repository() - db = use_cases.db() - - inserted_ports = [] - deleted_ports = [] - try: - df = pd.read_csv(csv_file_name, sep=";") - vessels = df.apply(map_to_domain, axis=1) - with db.session() as session: - ports_inserts = [] - ports_updates = [] - # Pour chaque enregistrement du fichier CSV - for vessel in vessels: - if vessel.id and vessel_repository.get_vessel_by_id(session, vessel.id): - # si la valeur du champ id n'est pas vide: - # rechercher l'enregistrement correspondant dans la table dim_vessel - # mettre à jour l'enregistrement à partir des données CSV. 
- ports_updates.append(vessel) - else: - # sinon: - # insérer les données CSV dans la table dim_vessel; - ports_inserts.append(vessel) - # Insertions / MAJ en batch - inserted_ports = vessel_repository.batch_create_vessel(session, ports_inserts) - vessel_repository.batch_update_vessel(session, ports_updates) - - # En fin de traitement: - # les enregistrements de la table dim_vessel pourtant un MMSI absent du fichier CSV sont mis à jour - # avec la valeur tracking_activated=FALSE - csv_mmsi = list(df['mmsi']) - deleted_ports = list( - filter(lambda v: v.mmsi not in csv_mmsi, vessel_repository.get_all_vessels_list(session))) - vessel_repository.set_tracking(session, [v.id for v in deleted_ports], False, - "Suppression logique suite import nouveau fichier CSV") - # le traitement vérifie qu'il n'existe qu'un seul enregistrement à l'état tracking_activated==True - # pour chaque valeur distincte de MMSI. - integrity_errors = vessel_repository.check_mmsi_integrity(session) - if not integrity_errors: - session.commit() - else: - logger.error( - f"Erreur d'intégrité fonctionnelle, plusieurs bateaux actifs avec le même MMSI: {integrity_errors}") - session.rollback() - except ValidationError as e: - logger.error("Erreur de validation des données de bateau") - logger.error(e.errors()) - except DBException: - logger.error("Erreur d'insertion en base") - logger.info(f"{len(inserted_ports)} bateau(x) créés") - logger.info(f"{len(ports_updates)} bateau(x) mise à jour ou inchangés") - logger.info(f"{len(deleted_ports)} bateau(x) désactivés") - - -if __name__ == "__main__": - time_start = perf_counter() - file_name = Path(settings.data_folder).joinpath("./chalutiers_pelagiques.csv") - logger.info(f"DEBUT - Chargement des données de bateaux depuis le fichier {file_name}") - run(file_name) - time_end = perf_counter() - duration = time_end - time_start - logger.info(f"FIN - Chargement des données de bateaux en {duration:.2f}s") diff --git a/src/bloom/tasks/load_dim_zone_amp_from_csv.py b/src/bloom/tasks/load_dim_zone_amp_from_csv.py deleted file mode 100644 index cf5a711a..00000000 --- a/src/bloom/tasks/load_dim_zone_amp_from_csv.py +++ /dev/null @@ -1,67 +0,0 @@ -from pathlib import Path -from time import perf_counter - -import pandas as pd -from bloom.config import settings -from bloom.container import UseCases -from bloom.domain.zone import Zone -from bloom.infra.database.errors import DBException -from bloom.logger import logger -from pydantic import ValidationError -from shapely import wkb - - -def map_to_domain(row: pd.Series) -> Zone: - isna = row.isna() - - return Zone( - category="amp", - sub_category=None, - name=row["name"], - geometry=row["geometry"], - centroid=row["geometry"].centroid, - json_data={k: row[k] if not isna[k] else None for k in - ["index", "desig_eng", "desig_type", "iucn_cat", "parent_iso", "iso3", "benificiaries"]}, - ) - - -def run(csv_file_name: str): - use_cases = UseCases() - db = use_cases.db() - zone_repository = use_cases.zone_repository() - - total = 0 - try: - df = pd.read_csv(csv_file_name, sep=",") - df = df.rename(columns={"Geometry": "geometry", - "Index": "index", "WDPAID": "wdpaid", - "Name": "name", - "DESIG_ENG": "desig_eng", - "DESIG_TYPE": "desig_type", - "IUCN_CAT": "iucn_cat", - "PARENT_ISO": "parent_iso", - "ISO3": "iso3", - "Benificiaries": "benificiaries"}) - df["geometry"] = df["geometry"].apply(wkb.loads) - zones = df.apply(map_to_domain, axis=1) - with db.session() as session: - zones = zone_repository.batch_create_zone(session, list(zones)) - 
session.commit() - total = len(zones) - print(zones) - except ValidationError as e: - logger.error("Erreur de validation des données de bateau") - logger.error(e.errors()) - except DBException: - logger.error("Erreur d'insertion en base") - logger.info(f"{total} bateau(x) créés") - - -if __name__ == "__main__": - time_start = perf_counter() - file_name = Path(settings.data_folder).joinpath("./zones_subset.csv") - logger.info(f"DEBUT - Chargement des données des zones AMP depuis le fichier {file_name}") - run(file_name) - time_end = perf_counter() - duration = time_end - time_start - logger.info(f"FIN - Chargement des données des zones AMP en {duration:.2f}s") diff --git a/src/bloom/tasks/load_spire_data_from_json.py b/src/bloom/tasks/load_spire_data_from_json.py deleted file mode 100644 index 04fbf907..00000000 --- a/src/bloom/tasks/load_spire_data_from_json.py +++ /dev/null @@ -1,43 +0,0 @@ -import argparse -import json -from pathlib import Path -from time import perf_counter - -from bloom.container import UseCases -from bloom.infra.http.spire_api_utils import map_raw_vessels_to_domain -from bloom.logger import logger -from pydantic import ValidationError - - -def run(file_name: str) -> None: - use_cases = UseCases() - spire_ais_data_repository = use_cases.spire_ais_data_repository() - db = use_cases.db() - - logger.info(f"Loading spire data from {file_name}") - orm_data = [] - try: - with Path.open(file_name) as json_data, db.session() as session: - raw_vessels = json.load(json_data) - spire_ais_data = map_raw_vessels_to_domain(raw_vessels) - orm_data = spire_ais_data_repository.batch_create_ais_data(spire_ais_data, session) - session.commit() - except ValidationError as e: - logger.error("Erreur de validation des données JSON") - logger.error(e.errors()) - logger.info(f"{len(orm_data)} vessel data loaded") - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Load Spire data from file JSON file") - parser.add_argument( - "filename", - help="Path to JSON file to load", - ) - args = parser.parse_args() - time_start = perf_counter() - logger.info(f"DEBUT - Chargement des données JSON depuis le fichier {args.filename}") - run(args.filename) - time_end = perf_counter() - duration = time_end - time_start - logger.info(f"FIN - Chargement des données JSON en {duration:.2f}s") diff --git a/src/tasks/base.py b/src/tasks/base.py new file mode 100644 index 00000000..73919f5e --- /dev/null +++ b/src/tasks/base.py @@ -0,0 +1,53 @@ +from bloom.logger import logger + +import hashlib +from datetime import datetime + +class BaseTask(): + args = None + kwargs = None + _stop_on_error: bool = True + + @property + def stop_on_error(self) -> bool: + return self._stop_on_error + + @stop_on_error.setter + def stop_on_error(self, value: bool) -> None: + self._stop_on_error = value + + def __init__(self, *args, **kwargs): + if 'batch' not in kwargs: + kwargs['batch']=datetime.now().strftime(f"{self.__class__.__name__}-%y%m%d%H%M%S%f") + self.args = args + self.kwargs = kwargs + + def start(self) -> None: + logger.info(f"Starting task {self.__class__.__name__}") + try: + self.run(*self.args, **self.kwargs) + logger.info(f"Task {self.__class__.__name__} sucess") + self.on_success(*self.args, **self.kwargs) + except Exception as e: + logger.error(f"Task {self} did not finished correctly. 
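# A minimal sketch of how this lifecycle is meant to be used, inferred only
# from the class above (DemoTask is a hypothetical name, not part of the
# project): subclasses override run(); start() injects a per-run "batch"
# identifier of the form "<ClassName>-%y%m%d%H%M%S%f" unless one is supplied.
from bloom.logger import logger
from tasks.base import BaseTask


class DemoTask(BaseTask):
    def run(self, *args, **kwargs) -> None:
        # e.g. kwargs["batch"] == "DemoTask-240403214521431765"
        logger.info(f"processing batch {kwargs['batch']}")


DemoTask().start()                        # auto-generated batch identifier
DemoTask(batch="manual-run-01").start()   # caller-supplied batch identifier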
{e}") + self.on_error(*self.args, **self.kwargs) + raise(e) + #if self.stop_on_error: + # exit(e.args[0]) + #else: + # raise (e) + + def run(self, *args, **kwargs) -> None: + logger.info(f"Task {self.__class__.__name__} success") + pass + + def on_success(self,*args, **kwargs) -> None: + logger.info(f"Task {self.__class__.__name__} success") + pass + + def on_error(self,*args, **kwargs) -> None: + logger.error(f"Task {self.__class__.__name__} on_error") + pass + + def stop(self) -> None: + pass diff --git a/src/tasks/data/__init__.py b/src/tasks/data/__init__.py new file mode 100644 index 00000000..93f943db --- /dev/null +++ b/src/tasks/data/__init__.py @@ -0,0 +1,3 @@ +from .load_amp_data import * +from .load_port_data import * +from .load_vessels_data import * \ No newline at end of file diff --git a/src/tasks/data/load_amp_data.py b/src/tasks/data/load_amp_data.py new file mode 100644 index 00000000..9db19dfe --- /dev/null +++ b/src/tasks/data/load_amp_data.py @@ -0,0 +1,37 @@ +import geopandas as gpd +import pandas as pd +from bloom.config import settings +from bloom.logger import logger +from shapely import wkb +from sqlalchemy import create_engine +from tasks.base import BaseTask + + +class LoadAmpDataTask(BaseTask): + def run(self, *args, **kwargs): + logger.debug(f"args={args} kwargs={kwargs}") + engine = create_engine(settings.db_url, echo=False) + df = pd.read_csv( + settings.amp_data_csv_path, + sep=",", + ) + + df = df.rename(columns={"Geometry": "geometry", + "Index": "index", "Wdpaid": "WDPAID", + "Name": "name", + "Desig Eng": "DESIG_ENG", + "Desig Type": "DESIG_TYPE", + "Iucn Cat": "IUCN_CAT", + "Parent Iso": "PARENT_ISO", + "Iso3": "ISO3", + "Beneficiaries": "BENEFICIARIES"}) + + df['geometry'] = df['geometry'].apply(wkb.loads) + gdf = gpd.GeoDataFrame(df, crs='epsg:4326') + gdf.head() + + gdf.to_postgis("mpa_fr_with_mn", con=engine, if_exists="replace", index=False) + + +if __name__ == "__main__": + LoadAmpDataTask().start() diff --git a/src/tasks/data/load_port_data.py b/src/tasks/data/load_port_data.py new file mode 100644 index 00000000..65ced95d --- /dev/null +++ b/src/tasks/data/load_port_data.py @@ -0,0 +1,20 @@ +import pandas as pd +from bloom.config import settings +from sqlalchemy import create_engine +from tasks.base import BaseTask + + +class LoadPortDataTask(BaseTask): + def run(self, *args, **kwargs): + engine = create_engine(settings.db_url, echo=False) + + df = pd.read_csv( + settings.port_polygon_data_csv_path, + sep=";", + ) + + df.to_sql("ports", engine, if_exists="append", index=False) + + +if __name__ == "__main__": + LoadPortDataTask().start() diff --git a/src/tasks/data/load_vessels_data.py b/src/tasks/data/load_vessels_data.py new file mode 100644 index 00000000..eb0be606 --- /dev/null +++ b/src/tasks/data/load_vessels_data.py @@ -0,0 +1,23 @@ +import pandas as pd +from sqlalchemy import create_engine + +from bloom.config import settings +from tasks.base import BaseTask + + +class LoadVesselsDataTask(BaseTask): + def run(self, *args, **kwargs): + engine = create_engine(settings.db_url) + df = pd.read_csv( + settings.vessel_data_csv_path, + sep=";", + dtype={"loa": float, "IMO": str}, + ) + + df = df.drop(columns="comment", errors='ignore') + + df.to_sql("vessels", engine, if_exists="append", index=False) + + +if __name__ == "__main__": + LoadVesselsDataTask().start() diff --git a/src/tasks/dimensions/__init__.py b/src/tasks/dimensions/__init__.py new file mode 100644 index 00000000..f928c0a3 --- /dev/null +++ b/src/tasks/dimensions/__init__.py @@ 
-0,0 +1,5 @@ +from .load_dim_port_from_csv import * +from .load_dim_vessel_from_csv import * +from .load_dim_zone_amp_from_csv import * + +from .compute_port_geometry_buffer import * \ No newline at end of file diff --git a/src/tasks/dimensions/compute_port_geometry_buffer.py b/src/tasks/dimensions/compute_port_geometry_buffer.py new file mode 100644 index 00000000..7bb041d0 --- /dev/null +++ b/src/tasks/dimensions/compute_port_geometry_buffer.py @@ -0,0 +1,129 @@ +from datetime import datetime, timezone +from time import perf_counter + +import geopandas as gpd +import pandas as pd +import pyproj +import shapely +from bloom.config import settings +from bloom.container import UseCases +from bloom.infra.repositories.repository_task_execution import TaskExecutionRepository +from bloom.logger import logger +from scipy.spatial import Voronoi +from shapely.geometry import LineString, Polygon +from tasks.base import BaseTask + + +class ComputePortGeometryBuffer(BaseTask): + radius_m = settings.port_radius_m # Radius in meters + resolution = settings.port_resolution # Number of points in the resulting polygon + + # Function to create geodesic buffer around a point + def geodesic_point_buffer(self, lat: float, lon: float, radius_m: int, resolution: int) -> Polygon: + """ + Input + lat: latitude of the center point + lon: longitude of the center point + radius_m: radius of the buffer in meters + resolution: number of points in the resulting polygon + """ + geod = pyproj.Geod(ellps="WGS84") # Define the ellipsoid + # Create a circle in geodesic coordinates + angles = range(0, 360, 360 // resolution) + circle_points = [] + for angle in angles: + # Calculate the point on the circle for this angle + lon2, lat2, _ = geod.fwd(lon, lat, angle, radius_m) + circle_points.append((lon2, lat2)) + # Create a polygon from these points + return Polygon(circle_points) + + def assign_voronoi_buffer(self, ports: gpd.GeoDataFrame) -> gpd.GeoDataFrame: + """Computes a buffer around each port such as buffers do not overlap each other + + :param gpd.GeoDataFrame ports: fields "id", "latitude", "longitude", "geometry_point" + from the table "dim_ports" + :return gpd.GeoDataFrame: same as input but field "geometry_point" is replaced + by "geometry_buffer" + """ + # Convert to CRS 6933 to write distances as meters (instead of degrees) + ports.to_crs(6933, inplace=True) + + # Create an 8km buffer around each port + # FIXME: maybe put the buffer distance as a global parameter + ports["buffer"] = ports["geometry_point"].apply(lambda p: shapely.buffer(p, 8000)) + + # Convert points back to CRS 4326 (lat/lon) --> buffers are still expressed in 6933 + ports.to_crs(settings.srid, inplace=True) + + # Convert buffers to 4326 + ports = gpd.GeoDataFrame(ports, geometry="buffer", crs=6933).to_crs(settings.srid) + + # Create Voronoi polygons, i.e. 
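# A quick usage sketch for geodesic_point_buffer above (the coordinates are
# illustrative, roughly the port of Brest; 3000 m and 10 points mirror the new
# PORT_RADIUS_M / PORT_RESOLUTION defaults added to the settings):
from tasks.dimensions.compute_port_geometry_buffer import ComputePortGeometryBuffer

buffer_polygon = ComputePortGeometryBuffer().geodesic_point_buffer(
    lat=48.38, lon=-4.49, radius_m=3000, resolution=10
)
print(buffer_polygon.wkt)  # a small polygon approximating a 3 km circle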
match each port to the area which is closest to this port + vor = Voronoi(list(zip(ports.longitude, ports.latitude))) + lines = [] + for line in vor.ridge_vertices: + if -1 not in line: + lines.append(LineString(vor.vertices[line])) + else: + lines.append(LineString()) + vor_polys = [poly for poly in shapely.ops.polygonize(lines)] + + # Match each port to its Voronoi polygon + vor_gdf = gpd.GeoDataFrame(geometry=gpd.GeoSeries(vor_polys), crs=4326) + vor_gdf["voronoi_poly"] = vor_gdf["geometry"].copy() + ports = gpd.GeoDataFrame(ports, geometry="geometry_point", crs=4326) + vor_ports = ports.sjoin(vor_gdf, how="left").drop(columns=["index_right"]) + + # Corner case: ports at extremes latitude and longitude have no Voroni polygon + # (15 / >5800) cases --> duplicate regular buffer instead + vor_ports.loc[vor_ports.voronoi_poly.isna(), "voronoi_poly"] = vor_ports.loc[ + vor_ports.voronoi_poly.isna(), "buffer" + ] + + # Get intersection between fixed-size buffer and Voronoi polygon to get final buffer + vor_ports["geometry_buffer"] = vor_ports.apply( + lambda row: shapely.intersection(row["buffer"], row["voronoi_poly"]), + axis=1 + ) + vor_ports["buffer_voronoi"] = vor_ports["geometry_buffer"].copy() + vor_ports = gpd.GeoDataFrame(vor_ports, geometry="geometry_buffer", crs=4326) + vor_ports = vor_ports[["id", "latitude", "longitude", "geometry_buffer"]].copy() + + return vor_ports + + def run(self, *args, **kwargs) -> None: + use_cases = UseCases() + port_repository = use_cases.port_repository() + db = use_cases.db() + items = [] + with db.session() as session: + point_in_time = TaskExecutionRepository.get_point_in_time(session, "compute_port_geometry_buffer") + logger.info(f"Point in time={point_in_time}") + now = datetime.now(timezone.utc) + ports = port_repository.get_ports_updated_created_after(session, point_in_time) + if ports: + df = pd.DataFrame( + [[p.id, p.geometry_point, p.latitude, p.longitude] for p in ports], + columns=["id", "geometry_point", "latitude", "longitude"], + ) + gdf = gpd.GeoDataFrame(df, geometry="geometry_point", crs=settings.srid) + + # Apply the buffer function to create buffers + gdf = self.assign_voronoi_buffer(gdf) + + for row in gdf.itertuples(): + items.append({"id": row.id, "geometry_buffer": row.geometry_buffer}) + port_repository.batch_update_geometry_buffer(session, items) + TaskExecutionRepository.set_point_in_time(session, "compute_port_geometry_buffer", now) + session.commit() + logger.info(f"{len(items)} buffer de ports mis à jour") + + +if __name__ == "__main__": + time_start = perf_counter() + logger.info("DEBUT - Calcul des buffer de ports") + ComputePortGeometryBuffer().start() + time_end = perf_counter() + duration = time_end - time_start + logger.info(f"FIN - Calcul des buffer de ports en {duration:.2f}s") diff --git a/src/tasks/dimensions/load_dim_port_from_csv.py b/src/tasks/dimensions/load_dim_port_from_csv.py new file mode 100644 index 00000000..9adf395d --- /dev/null +++ b/src/tasks/dimensions/load_dim_port_from_csv.py @@ -0,0 +1,66 @@ +from time import perf_counter + +import geopandas as gpd +import pandas as pd +import pycountry +from bloom.config import settings +from bloom.container import UseCases +from bloom.domain.port import Port +from bloom.infra.database.errors import DBException +from bloom.logger import logger +from pydantic import ValidationError +from shapely import wkt +from tasks.base import BaseTask + + +class LoadDimPortFromCsv(BaseTask): + def map_to_domain(self, row) -> Port: + iso_code = 
pycountry.countries.get(name=row["country"]) + iso_code = iso_code.alpha_3 if iso_code is not None else "XXX" + + return Port( + name=row["port"], + locode=row["locode"], + url=row["url"], + country_iso3=iso_code, + latitude=float(row["latitude"]), + longitude=float(row["longitude"]), + geometry_point=row["geometry_point"], + ) + + def run(self, *args, **kwargs): + use_cases = UseCases() + port_repository = use_cases.port_repository() + db = use_cases.db() + + csv_file_name = kwargs['port_data_csv_path'] if 'port_data_csv_path' in kwargs \ + else settings.port_data_csv_path + + ports = [] + total = 0 + try: + df = pd.read_csv(csv_file_name, sep=";") + df["geometry_point"] = df["geometry_point"].apply(wkt.loads) + gdf = gpd.GeoDataFrame(df, geometry="geometry_point", crs=settings.srid) + ports = gdf.apply(self.map_to_domain, axis=1) + with db.session() as session: + ports = port_repository.batch_create_port(session, list(ports)) + session.commit() + total = len(ports) + except ValidationError as e: + logger.error("Erreur de validation des données de port") + logger.error(e.errors()) + raise(e) + except DBException as e: + logger.error("Erreur d'insertion en base") + raise(e) + logger.info(f"{total} ports(s) créés") + + +if __name__ == "__main__": + time_start = perf_counter() + logger.info(f"DEBUT - Chargement des données de ports depuis le fichier {settings.port_data_csv_path}") + LoadDimPortFromCsv(port_data_csv_path=settings.port_data_csv_path).start() + time_end = perf_counter() + duration = time_end - time_start + logger.info(f"FIN - Chargement des données de ports en {duration:.2f}s") diff --git a/src/tasks/dimensions/load_dim_vessel_from_csv.py b/src/tasks/dimensions/load_dim_vessel_from_csv.py new file mode 100644 index 00000000..f751f582 --- /dev/null +++ b/src/tasks/dimensions/load_dim_vessel_from_csv.py @@ -0,0 +1,99 @@ +from time import perf_counter + +import pandas as pd +from bloom.config import settings +from bloom.container import UseCases +from bloom.domain.vessel import Vessel +from bloom.infra.database.errors import DBException +from bloom.logger import logger +from pydantic import ValidationError +from tasks.base import BaseTask + + +class LoadDimVesselFromCsv(BaseTask): + def map_to_domain(self, row: pd.Series) -> Vessel: + isna = row.isna() + return Vessel( + mmsi=int(row["mmsi"]) if not isna["mmsi"] else None, + ship_name=row["ship_name"], + width=int(row["width"]) if not isna["width"] else None, + length=int(row["length"]) if not isna["length"] else None, + country_iso3=row["country_iso3"] if not isna["country_iso3"] else "XXX", + type=row["type"], + imo=row["imo"] if not isna["imo"] else None, + cfr=row["cfr"] if not isna["cfr"] else None, + registration_number=row["registration_number"] + if not isna["registration_number"] + else None, + external_marking=row["external_marking"] if not isna["external_marking"] else None, + ircs=row["ircs"] if not isna["ircs"] else None, + tracking_activated=row["tracking_activated"], + tracking_status=row["tracking_status"] if not isna["tracking_status"] else None, + ) + + def run(self, *args, **kwargs) -> None: + use_cases = UseCases() + vessel_repository = use_cases.vessel_repository() + db = use_cases.db() + + csv_file_name = kwargs['vessel_data_csv_path'] if 'vessel_data_csv_path' in kwargs \ + else settings.vessel_data_csv_path + + inserted_ports = [] + deleted_ports = [] + try: + df = pd.read_csv(csv_file_name, sep=",") + vessels = df.apply(self.map_to_domain, axis=1) + with db.session() as session: + ports_inserts = [] + 
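# The dimension loaders read their CSV location either from the settings or
# from a keyword override, so a pipeline script can point them at alternative
# files; the paths below are the defaults declared in bloom/config.py and a
# reachable database configured via .env is assumed:
from tasks.dimensions.load_dim_port_from_csv import LoadDimPortFromCsv
from tasks.dimensions.load_dim_vessel_from_csv import LoadDimVesselFromCsv

LoadDimPortFromCsv(port_data_csv_path="data/ports.csv").start()
LoadDimVesselFromCsv(vessel_data_csv_path="data/vessels_subset.csv").start()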
ports_updates = [] + # Pour chaque enregistrement du fichier CSV + for vessel in vessels: + if vessel.id and vessel_repository.get_vessel_by_id(session, vessel.id): + # si la valeur du champ id n'est pas vide: + # rechercher l'enregistrement correspondant dans la table dim_vessel + # mettre à jour l'enregistrement à partir des données CSV. + ports_updates.append(vessel) + else: + # sinon: + # insérer les données CSV dans la table dim_vessel; + ports_inserts.append(vessel) + # Insertions / MAJ en batch + inserted_ports = vessel_repository.batch_create_vessel(session, ports_inserts) + vessel_repository.batch_update_vessel(session, ports_updates) + + # En fin de traitement: + # les enregistrements de la table dim_vessel pourtant un MMSI absent du fichier CSV sont mis à jour + # avec la valeur tracking_activated=FALSE + csv_mmsi = list(df['mmsi']) + deleted_ports = list( + filter(lambda v: v.mmsi not in csv_mmsi, vessel_repository.get_all_vessels_list(session))) + vessel_repository.set_tracking(session, [v.id for v in deleted_ports], False, + "Suppression logique suite import nouveau fichier CSV") + # le traitement vérifie qu'il n'existe qu'un seul enregistrement à l'état tracking_activated==True + # pour chaque valeur distincte de MMSI. + integrity_errors = vessel_repository.check_mmsi_integrity(session) + if not integrity_errors: + session.commit() + else: + logger.error( + f"Erreur d'intégrité fonctionnelle, plusieurs bateaux actifs avec le même MMSI: {integrity_errors}") + session.rollback() + except ValidationError as e: + logger.error("Erreur de validation des données de bateau") + logger.error(e.errors()) + except DBException: + logger.error("Erreur d'insertion en base") + logger.info(f"{len(inserted_ports)} bateau(x) créés") + logger.info(f"{len(ports_updates)} bateau(x) mise à jour ou inchangés") + logger.info(f"{len(deleted_ports)} bateau(x) désactivés") + + +if __name__ == "__main__": + time_start = perf_counter() + file_name = settings.vessel_data_csv_path + logger.info(f"DEBUT - Chargement des données de bateaux depuis le fichier {file_name}") + LoadDimVesselFromCsv(vessel_data_csv_path=file_name).start() + time_end = perf_counter() + duration = time_end - time_start + logger.info(f"FIN - Chargement des données de bateaux en {duration:.2f}s") diff --git a/src/tasks/dimensions/load_dim_zone_amp_from_csv.py b/src/tasks/dimensions/load_dim_zone_amp_from_csv.py new file mode 100644 index 00000000..270f0498 --- /dev/null +++ b/src/tasks/dimensions/load_dim_zone_amp_from_csv.py @@ -0,0 +1,68 @@ +from time import perf_counter + +import pandas as pd +from bloom.config import settings +from bloom.container import UseCases +from bloom.domain.zone import Zone +from bloom.infra.database.errors import DBException +from bloom.logger import logger +from pydantic import ValidationError +from shapely import wkb +from tasks.base import BaseTask + + +class LoadDimZoneAmpFromCsv(BaseTask): + def map_to_domain(self, row: pd.Series) -> Zone: + isna = row.isna() + + return Zone( + category="amp", + sub_category=None, + name=row["name"], + geometry=row["geometry"], + centroid=row["geometry"].centroid, + json_data={k: row[k] if not isna[k] else None for k in + ["index", "desig_eng", "desig_type", "iucn_cat", "parent_iso", "iso3", "benificiaries"]}, + ) + + def run(self, *args, **kwargs): + use_cases = UseCases() + db = use_cases.db() + zone_repository = use_cases.zone_repository() + + amp_data_csv_path= kwargs['amp_data_csv_path'] if 'amp_data_csv_path' in kwargs else settings.amp_data_csv_path + + total 
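# Before rows are mapped to Zone, the loader below turns the CSV "geometry"
# column from WKB into shapely objects (the wkb.loads apply). A tiny
# standalone illustration of that conversion, using a made-up hex-encoded WKB
# point:
from shapely import wkb

geom = wkb.loads("0101000000000000000000F03F0000000000000040", hex=True)
print(geom)  # POINT (1 2)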
= 0 + try: + df = pd.read_csv(amp_data_csv_path, sep=',') + df = df.rename(columns={"Geometry": "geometry", + "Index": "index", "WDPAID": "wdpaid", + "Name": "name", + "DESIG_ENG": "desig_eng", + "DESIG_TYPE": "desig_type", + "IUCN_CAT": "iucn_cat", + "PARENT_ISO": "parent_iso", + "ISO3": "iso3", + "Benificiaries": "benificiaries"}) + df["geometry"] = df["geometry"].apply(wkb.loads) + zones = df.apply(self.map_to_domain, axis=1) + with db.session() as session: + zones = zone_repository.batch_create_zone(session, list(zones)) + session.commit() + total = len(zones) + except ValidationError as e: + logger.error("Erreur de validation des données de zone AMP") + logger.error(e.errors()) + except DBException: + logger.error("Erreur d'insertion en base") + logger.info(f"{total} zone(s) AMP créés") + + +if __name__ == "__main__": + time_start = perf_counter() + file_name = settings.amp_data_csv_path + logger.info(f"DEBUT - Chargement des données des zones AMP depuis le fichier {file_name}") + LoadDimZoneAmpFromCsv(amp_data_csv_path=file_name).start() + time_end = perf_counter() + duration = time_end - time_start + logger.info(f"FIN - Chargement des données des zones AMP en {duration:.2f}s") diff --git a/src/tasks/facts/__init__.py b/src/tasks/facts/__init__.py new file mode 100644 index 00000000..b91b1195 --- /dev/null +++ b/src/tasks/facts/__init__.py @@ -0,0 +1,5 @@ +from .clean_positions import * +from .extract_spire_data_from_api import * +from .extract_spire_data_from_csv import * +from .extract_spire_data_from_json import * +from .extract_vessel_positions_data import * diff --git a/src/tasks/facts/clean_positions.py b/src/tasks/facts/clean_positions.py new file mode 100644 index 00000000..45676491 --- /dev/null +++ b/src/tasks/facts/clean_positions.py @@ -0,0 +1,81 @@ +from datetime import datetime, timezone +from time import perf_counter + +from bloom.container import UseCases +from bloom.domain.spire_ais_data import SpireAisData +from bloom.domain.vessel import Vessel +from bloom.domain.vessel_position import VesselPosition +from bloom.infra.repositories.repository_spire_ais_data import SpireAisDataRepository +from bloom.infra.repositories.repository_task_execution import TaskExecutionRepository +from bloom.logger import logger +from shapely import Point +from tasks.base import BaseTask + + +class CleanPositionsTask(BaseTask): + def map_ais_data_to_vessel_position(ais_data: SpireAisData, vessel: Vessel) -> VesselPosition: + return VesselPosition( + timestamp=ais_data.position_timestamp, + accuracy=ais_data.position_accuracy, + collection_type=ais_data.position_collection_type, + course=ais_data.position_course, + heading=ais_data.position_heading, + position=Point(ais_data.position_longitude, ais_data.position_latitude), + latitude=ais_data.position_latitude, + longitude=ais_data.position_longitude, + maneuver=ais_data.position_maneuver, + navigational_status=ais_data.position_navigational_status, + rot=ais_data.position_rot, + speed=ais_data.position_speed, + vessel_id=vessel.id + ) + + def run(self, *args, **kwargs): + use_cases = UseCases() + spire_ais_data_repository = use_cases.spire_ais_data_repository() + vessel_repository = use_cases.vessel_repository() + port_repository = use_cases.port_repository() + vessel_position_repository = use_cases.vessel_position_repository() + db = use_cases.db() + with db.session() as session: + point_in_time = TaskExecutionRepository.get_point_in_time(session, "clean_positions") + logger.info(f"Point in time={point_in_time}") + now = 
datetime.now(timezone.utc) + nb_donnees = 0 + nb_au_port = 0 + nb_pas_au_port = 0 + vessels = vessel_repository.load_vessel_metadata(session) + logger.info(f"{len(vessels)} bateaux à traiter") + # Foreach vessel + for vessel in vessels: + # Recheche des données AIS de chaque bateau créées depuis la dernière exécution du traitement (point in time) + spire_datas = spire_ais_data_repository.get_all_data_by_mmsi(session, vessel.mmsi, + SpireAisDataRepository.ORDER_BY_POSITION, + point_in_time) + for spire_data in spire_datas: + nb_donnees += 1 + # Foreach position + position = Point(spire_data.position_longitude, spire_data.position_latitude) + port = port_repository.find_port_by_position_in_port_buffer(session, position) + if not port: + vessel_position = map_ais_data_to_vessel_position(spire_data, vessel) + vessel_position_repository.create_vessel_position(session, vessel_position) + nb_pas_au_port += 1 + else: + nb_au_port += 1 + # TODO: A poursuivre, voir MIRO pour l'algorithme + pass + TaskExecutionRepository.set_point_in_time(session, "clean_positions", now) + session.commit() + logger.info(f"{nb_donnees} données SPIRE traitées") + logger.info(f"{nb_au_port} données ignorées pour des bateaux au port") + logger.info(f"{nb_pas_au_port} données importées dans vessel_positions") + + +if __name__ == "__main__": + time_start = perf_counter() + logger.info("DEBUT - Nettoyage des positions") + CleanPositionsTask().start() + time_end = perf_counter() + duration = time_end - time_start + logger.info(f"FIN - Nettoyage des positions en {duration:.2f}s") diff --git a/src/tasks/facts/extract_spire_data_from_api.py b/src/tasks/facts/extract_spire_data_from_api.py new file mode 100644 index 00000000..df60e077 --- /dev/null +++ b/src/tasks/facts/extract_spire_data_from_api.py @@ -0,0 +1,68 @@ +import argparse +import json +from datetime import datetime, timezone +from pathlib import Path +from time import perf_counter + +from bloom.container import UseCases +from bloom.domain.vessel import Vessel +from bloom.infra.http.spire_api_utils import map_raw_vessels_to_domain +from bloom.logger import logger +from pydantic import ValidationError +from tasks.base import BaseTask + + +class ExtractSpireDataFromApi(BaseTask): + + def run(self, *args, **kwargs) -> None: + use_cases = UseCases() + spire_ais_data_repository = use_cases.spire_ais_data_repository() + spire_traffic_usecase = use_cases.get_spire_data_usecase() + vessel_repository = use_cases.vessel_repository() + db = use_cases.db() + + orm_data = [] + try: + with db.session() as session: + vessels: list[Vessel] = vessel_repository.load_all_vessel_metadata(session) + + raw_vessels = spire_traffic_usecase.get_raw_vessels_from_spire(vessels) + if kwargs['dump_path'] is not None: + try: + now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + dump_file = Path(kwargs['dump_path'], f"spire_{now}").with_suffix(".json") + with dump_file.open("wt") as handle: + json.dump(raw_vessels, handle) + except Exception as e: + logger.warning("Echec de l'écriture de la réponse Spire", exc_info=e) + else: + spire_ais_data = map_raw_vessels_to_domain(raw_vessels) + orm_data = spire_ais_data_repository.batch_create_ais_data( + spire_ais_data, + session, + ) + session.commit() + except ValidationError as e: + logger.error("Erreur de validation des données JSON") + logger.error(e.errors()) + except Exception as e: + logger.error("Echec de l'appel API", exc_info=e) + logger.info(f"{len(orm_data)} vessel data loaded") + + +if __name__ == "__main__": + parser = 
argparse.ArgumentParser(description="Bloom scraping application") + parser.add_argument( + "-d", + "--dump-path", + help="Répertoire de destination des dump", + required=False, + default=None, + ) + args = parser.parse_args() + time_start = perf_counter() + logger.info("DEBUT - Chargement des données JSON depuis l'API SPIRE") + ExtractSpireDataFromApi(dump_path=args.dump_path).start() + time_end = perf_counter() + duration = time_end - time_start + logger.info(f"FIN - Chargement des données depuis l'API SPIRE en {duration:.2f}s") diff --git a/src/tasks/facts/extract_spire_data_from_csv.py b/src/tasks/facts/extract_spire_data_from_csv.py new file mode 100644 index 00000000..622e3b09 --- /dev/null +++ b/src/tasks/facts/extract_spire_data_from_csv.py @@ -0,0 +1,73 @@ +import argparse +import json +import pandas as pd +import numpy as np +from pathlib import Path +from time import perf_counter +from datetime import datetime + +from bloom.container import UseCases +from bloom.domain.spire_ais_data import SpireAisData +from bloom.config import settings +from bloom.logger import logger +from pydantic import ValidationError +from tasks.base import BaseTask + + +class ExtractSpireDataFromCsv(BaseTask): + def map_to_domain(self, row: pd.Series) -> SpireAisData: + isna = row.isna() + row=row.reindex(index=SpireAisData.__fields__.keys()) + #logger.debug(row.to_dict()) + + try: + logger.debug(SpireAisData(**row.to_dict())) + except Exception as e: + logger.debug(f"{type(e)}:{e}") + #return SpireAisData(**row.to_dict()) + + def run(self, *args, **kwargs) -> None: + use_cases = UseCases() + spire_ais_data_repository = use_cases.spire_ais_data_repository() + db = use_cases.db() + + file_name = kwargs['segment_data_csv_path'] if 'segment_data_csv_path' in kwargs \ + else settings.segment_data_csv_path + logger.info(f"Loading spire data from {file_name}") + orm_data = [] + try: + df = pd.read_csv(file_name, sep=";") + #df['spire_update_statement']=pd.to_datetime(df['spire_update_statement'], format='%Y-%m-%d %H:%M:%S.%f %z') + df['spire_update_statement']=df['spire_update_statement'].apply(datetime.fromisoformat) + df['vessel_timestamp']=df['vessel_timestamp'].apply(datetime.fromisoformat) + df['vessel_update_timestamp']=df['vessel_update_timestamp'].apply(datetime.fromisoformat) + df['position_timestamp']=df['position_timestamp'].apply(datetime.fromisoformat) + df['position_update_timestamp']=df['position_update_timestamp'].apply(datetime.fromisoformat) + df['voyage_timestamp']=df['voyage_timestamp'].apply(datetime.fromisoformat) + df['voyage_update_timestamp']=df['voyage_update_timestamp'].apply(datetime.fromisoformat) + df['created_at']=df['created_at'].apply(datetime.fromisoformat) + #spire_ais_data = df.apply(self.map_to_domain, axis=1) + #with Path(file_name).open() as csv_data, db.session() as session: + # raw_vessels = json.load(json_data) + # spire_ais_data = map_raw_vessels_to_domain(raw_vessels) + # orm_data = spire_ais_data_repository.batch_create_ais_data(spire_ais_data, session) + # session.commit() + except ValidationError as e: + logger.error("Erreur de validation des données CSV") + logger.error(e.errors()) + logger.info(f"{len(orm_data)} vessel data loaded") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Load Segment data from file CSV file") + parser.add_argument( + "filename", + help="Path to CSV file to load", + ) + args = parser.parse_args() + time_start = perf_counter() + logger.info(f"DEBUT - Chargement des données CSV depuis le fichier 
{args.filename}") + ExtractSpireDataFromCsv(segment_data_csv_path=args.filename).start() + time_end = perf_counter() + duration = time_end - time_start + logger.info(f"FIN - Chargement des données CSV en {duration:.2f}s") diff --git a/src/tasks/facts/extract_spire_data_from_json.py b/src/tasks/facts/extract_spire_data_from_json.py new file mode 100644 index 00000000..576df1ae --- /dev/null +++ b/src/tasks/facts/extract_spire_data_from_json.py @@ -0,0 +1,46 @@ +import argparse +import json +from pathlib import Path +from time import perf_counter + +from bloom.container import UseCases +from bloom.infra.http.spire_api_utils import map_raw_vessels_to_domain +from bloom.logger import logger +from pydantic import ValidationError +from tasks.base import BaseTask + + +class ExtractSpireDataFromJson(BaseTask): + + def run(self, *args, **kwargs) -> None: + use_cases = UseCases() + spire_ais_data_repository = use_cases.spire_ais_data_repository() + db = use_cases.db() + + logger.info(f"Loading spire data from {kwargs['file_name']}") + orm_data = [] + try: + with Path.open(kwargs['file_name']) as json_data, db.session() as session: + raw_vessels = json.load(json_data) + spire_ais_data = map_raw_vessels_to_domain(raw_vessels) + orm_data = spire_ais_data_repository.batch_create_ais_data(spire_ais_data, session) + session.commit() + except ValidationError as e: + logger.error("Erreur de validation des données JSON") + logger.error(e.errors()) + logger.info(f"{len(orm_data)} vessel data loaded") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Load Spire data from file JSON file") + parser.add_argument( + "filename", + help="Path to JSON file to load", + ) + args = parser.parse_args() + time_start = perf_counter() + logger.info(f"DEBUT - Chargement des données JSON depuis le fichier {args.filename}") + ExtractSpireDataFromJson(file_name=args.filename).start() + time_end = perf_counter() + duration = time_end - time_start + logger.info(f"FIN - Chargement des données JSON en {duration:.2f}s") diff --git a/src/tasks/facts/extract_vessel_positions_data.py b/src/tasks/facts/extract_vessel_positions_data.py new file mode 100644 index 00000000..22f54323 --- /dev/null +++ b/src/tasks/facts/extract_vessel_positions_data.py @@ -0,0 +1,21 @@ +import pandas as pd +from sqlalchemy import create_engine + +from bloom.config import settings +from tasks.base import BaseTask + + +class ExtractVesselPositionsDataTask(BaseTask): + def run(self, *args, **kwargs): + engine = create_engine(settings.db_url) + + df = pd.read_csv( + settings.vessel_positions_data_csv_path, + sep="," + ) + + df.to_sql("spire_vessel_positions", engine, if_exists="append", index=False) + + +if __name__ == "__main__": + ExtractVesselPositionsDataTask().start() diff --git a/src/tasks/init.py b/src/tasks/init.py new file mode 100644 index 00000000..c785ccf1 --- /dev/null +++ b/src/tasks/init.py @@ -0,0 +1,25 @@ +import sys + +from bloom.config import settings +from tasks.base import BaseTask +from tasks.load_dimensions import LoadDimensions +from tasks.load_facts import LoadFacts + + +class InitTask(BaseTask): + + def run(self, *args, **kwargs): + args = { + 'port_data_csv_path': settings.port_data_csv_path, + 'amp_data_csv_path': settings.amp_data_csv_path, + 'vessel_data_csv_path': settings.vessel_data_csv_path, + } + kwargs = {**args, **kwargs} + LoadDimensions(*args, **kwargs).start() + LoadFacts(*args, **kwargs).start() + + +if __name__ == "__main__": + task = InitTask(*list(arg for arg in sys.argv[1:] if 
arg.find('=') <= 0), + **dict(arg.split('=') for arg in sys.argv[1:] if arg.find('=') > 0)) + task.start() diff --git a/src/tasks/load_dimensions.py b/src/tasks/load_dimensions.py new file mode 100644 index 00000000..68f517e5 --- /dev/null +++ b/src/tasks/load_dimensions.py @@ -0,0 +1,30 @@ +import sys + +from bloom.logger import logger +from tasks.base import BaseTask +from tasks.dimensions import LoadDimPortFromCsv, LoadDimVesselFromCsv, LoadDimZoneAmpFromCsv,\ + ComputePortGeometryBuffer + +import hashlib +from datetime import datetime + + + +class LoadDimensions(BaseTask): + + def run(self, *args, **kwargs): + LoadDimZoneAmpFromCsv(*args, **kwargs).start() + ComputePortGeometryBuffer(*args, **kwargs).start() + LoadDimPortFromCsv(*args, **kwargs).start() + LoadDimVesselFromCsv(*args, **kwargs).start() + + def on_error(self, *args, **kwargs): + logger.error("LoadDimensions::on_error") + logger.error(f"batch:{kwargs['batch']}") + pass + + +if __name__ == "__main__": + task = LoadDimensions(*list(arg for arg in sys.argv[1:] if arg.find('=') <= 0), + **dict(arg.split('=') for arg in sys.argv[1:] if arg.find('=') > 0)) + task.start() diff --git a/src/tasks/load_facts.py b/src/tasks/load_facts.py new file mode 100644 index 00000000..d540db9e --- /dev/null +++ b/src/tasks/load_facts.py @@ -0,0 +1,16 @@ +import sys + +from tasks.base import BaseTask +from tasks.facts import CleanPositionsTask + + +class LoadFacts(BaseTask): + + def run(self, *args, **kwargs): + CleanPositionsTask(*args, **kwargs).start() + + +if __name__ == "__main__": + task = LoadFacts(*list(arg for arg in sys.argv[1:] if arg.find('=') <= 0), + **dict(arg.split('=') for arg in sys.argv[1:] if arg.find('=') > 0)) + task.start() diff --git a/src/tasks/process_ais_data.py b/src/tasks/process_ais_data.py new file mode 100644 index 00000000..242bff39 --- /dev/null +++ b/src/tasks/process_ais_data.py @@ -0,0 +1,18 @@ +import sys + +from tasks.base import BaseTask +from tasks.facts import CleanPositionsTask, LoadSpireDataFromApi +from tasks.transformations import UpdateVesselDataVoyage + + +class PipelinePorts(BaseTask): + + def run(self, *args, **kwargs): + UpdateVesselDataVoyage(*args, **kwargs).start() + CleanPositionsTask(*args, **kwargs).start() + + +if __name__ == "__main__": + task = PipelinePorts(*list(arg for arg in sys.argv[1:] if arg.find('=') <= 0), + **dict(arg.split('=') for arg in sys.argv[1:] if arg.find('=') > 0)) + task.start() diff --git a/src/tasks/transformations/__init__.py b/src/tasks/transformations/__init__.py new file mode 100644 index 00000000..86ae871e --- /dev/null +++ b/src/tasks/transformations/__init__.py @@ -0,0 +1,3 @@ +from .compute_port_geometry_buffer import * +from .convert_spire_vessels_to_spire_ais_data import * +from .update_vessel_data_voyage import * \ No newline at end of file diff --git a/src/tasks/transformations/convert_spire_vessels_to_spire_ais_data.py b/src/tasks/transformations/convert_spire_vessels_to_spire_ais_data.py new file mode 100644 index 00000000..635ec329 --- /dev/null +++ b/src/tasks/transformations/convert_spire_vessels_to_spire_ais_data.py @@ -0,0 +1,90 @@ +from time import perf_counter +from typing import Generator + +from bloom.container import UseCases +from bloom.domain.spire_ais_data import SpireAisData +from bloom.infra.database.sql_model import VesselPositionSpire +from bloom.logger import logger +from geoalchemy2.shape import to_shape +from shapely import Point +from sqlalchemy.orm.session import Session +from tasks.base import BaseTask + + +class 
+    batch_size = 1000
+    vessel_repo = None
+
+    def map_to_ais_spire_data(self, vessel_position: VesselPositionSpire) -> SpireAisData:
+        position: Point = to_shape(vessel_position.position)
+        return SpireAisData(
+            spire_update_statement=vessel_position.timestamp,
+            vessel_ais_class=None,
+            vessel_flag=None,
+            vessel_name=vessel_position.ship_name,
+            vessel_callsign=None,
+            vessel_timestamp=None,
+            vessel_update_timestamp=None,
+            vessel_ship_type=None,
+            vessel_sub_ship_type=None,
+            vessel_mmsi=vessel_position.mmsi,
+            vessel_imo=vessel_position.IMO,
+            vessel_width=vessel_position.vessel_width,
+            vessel_length=vessel_position.vessel_length,
+            position_accuracy=vessel_position.accuracy,
+            position_collection_type=vessel_position.position_sensors,
+            position_course=vessel_position.course,
+            position_heading=vessel_position.heading,
+            position_latitude=position.y,  # y = latitude, en supposant des géométries stockées en POINT(lon lat)
+            position_longitude=position.x,  # x = longitude
+            position_maneuver=None,
+            position_navigational_status=vessel_position.navigation_status,
+            position_rot=vessel_position.rot,
+            position_speed=vessel_position.speed,
+            position_timestamp=vessel_position.last_position_time,
+            position_update_timestamp=vessel_position.last_position_time,
+            voyage_destination=vessel_position.voyage_destination,
+            voyage_draught=vessel_position.voyage_draught,
+            voyage_eta=vessel_position.voyage_draught,
+            voyage_timestamp=None,
+            voyage_update_timestamp=None,
+        )
+
+    def batch_convert(self, session: Session) -> Generator[list[SpireAisData], None, None]:
+        batch = []
+        for vessel_position in self.vessel_repo.get_all_spire_vessels_position(session, self.batch_size):
+            batch.append(self.map_to_ais_spire_data(vessel_position))
+            if len(batch) >= self.batch_size:
+                yield batch
+                batch = []
+        yield batch
+
+    def run(self, *args, **kwargs) -> None:
+        use_cases = UseCases()
+        self.vessel_repo = use_cases.vessel_repository()
+        spire_ais_data_repo = use_cases.spire_ais_data_repository()
+        db = use_cases.db()
+        count = 0
+        try:
+            with db.session() as session:
+                for b in self.batch_convert(session):
+                    count = count + len(b)
+                    spire_ais_data_repo.batch_create_ais_data(b, session)
+                    session.commit()
+        except Exception as e:
+            session.rollback()
+            logger.error("Erreur lors de la conversion, transaction ROLLBACK")
+            logger.error(e)
+        logger.info(f"{count} enregistrements convertis")
+
+
+if __name__ == "__main__":
+    time_start = perf_counter()
+    task = ConvertSpireVesselsToSpireAisData()
+    logger.info(
+        f"Conversion spire_vessel_positions -> spire_ais_data (taille des lots: {task.batch_size})"
+    )
+    task.start()
+    time_end = perf_counter()
+    duration = time_end - time_start
+    logger.info(f"Conversion spire_vessel_positions -> spire_ais_data en {duration:.2f}s")
diff --git a/src/tasks/transformations/update_vessel_data_voyage.py b/src/tasks/transformations/update_vessel_data_voyage.py
new file mode 100644
index 00000000..45d93e07
--- /dev/null
+++ b/src/tasks/transformations/update_vessel_data_voyage.py
@@ -0,0 +1,90 @@
+from datetime import datetime, timezone
+from time import perf_counter
+
+from bloom.container import UseCases
+from bloom.domain.spire_ais_data import SpireAisData
+from bloom.domain.vessel import Vessel
+from bloom.domain.vessel_data import VesselData
+from bloom.domain.vessel_voyage import VesselVoyage
+from bloom.infra.repositories.repository_spire_ais_data import SpireAisDataRepository
+from bloom.infra.repositories.repository_task_execution import TaskExecutionRepository
+from bloom.infra.repositories.repository_vessel_data import VesselDataRepository
+from bloom.infra.repositories.repository_vessel_voyage import VesselVoyageRepository
+from bloom.logger import logger
+from tasks.base import BaseTask
+
+
+class UpdateVesselDataVoyage(BaseTask):
+
+    def map_ais_data_to_vessel_data(self, ais_data: SpireAisData, vessel: Vessel) -> VesselData:
+        return VesselData(
+            timestamp=ais_data.vessel_timestamp,
+            ais_class=ais_data.vessel_ais_class,
+            flag=ais_data.vessel_flag,
+            name=ais_data.vessel_name,
+            callsign=ais_data.vessel_callsign,
+            ship_type=ais_data.vessel_ship_type,
+            sub_ship_type=ais_data.vessel_sub_ship_type,
+            mmsi=ais_data.vessel_mmsi,
+            imo=ais_data.vessel_imo,
+            width=ais_data.vessel_width,
+            length=ais_data.vessel_length,
+            vessel_id=vessel.id
+        )
+
+    def map_ais_data_to_vessel_voyage(self, ais_data: SpireAisData, vessel: Vessel) -> VesselVoyage:
+        return VesselVoyage(
+            timestamp=ais_data.voyage_timestamp,
+            destination=ais_data.voyage_destination,
+            draught=ais_data.voyage_draught,
+            eta=ais_data.voyage_eta,
+            vessel_id=vessel.id,
+        )
+
+    def run(self, *args, **kwargs) -> None:
+        use_cases = UseCases()
+        spire_ais_data_repository = use_cases.spire_ais_data_repository()
+        vessel_repository = use_cases.vessel_repository()
+        db = use_cases.db()
+        with db.session() as session:
+            point_in_time = TaskExecutionRepository.get_point_in_time(session, "update_vessel_data_voyage")
+            logger.info(f"Point in time={point_in_time}")
+            now = datetime.now(timezone.utc)
+            nb_donnees = 0
+            nb_insert_data = 0
+            nb_insert_voyage = 0
+            vessels = vessel_repository.load_vessel_metadata(session)
+            logger.info(f"{len(vessels)} bateaux à traiter")
+            # Pour chaque bateau
+            for vessel in vessels:
+                # Recherche des données AIS de chaque bateau créées depuis la dernière exécution du traitement (point in time)
+                spire_datas = spire_ais_data_repository.get_all_data_by_mmsi(session, vessel.mmsi,
+                                                                             SpireAisDataRepository.ORDER_BY_POSITION,
+                                                                             point_in_time)
+                for spire_data in spire_datas:
+                    vessel_data = self.map_ais_data_to_vessel_data(spire_data, vessel)
+                    vessel_voyage = self.map_ais_data_to_vessel_voyage(spire_data, vessel)
+                    nb_donnees += 1
+                    last_data = VesselDataRepository.get_last_vessel_data(session, vessel.id)
+                    last_voyage = VesselVoyageRepository.get_last_vessel_voyage(session, vessel.id)
+                    # Insertion uniquement si la donnée est plus récente que la dernière connue
+                    if not last_data or vessel_data.timestamp > last_data.timestamp:
+                        VesselDataRepository.create_vessel_data(session, vessel_data)
+                        nb_insert_data += 1
+                    if not last_voyage or vessel_voyage.timestamp > last_voyage.timestamp:
+                        VesselVoyageRepository.create_vessel_voyage(session, vessel_voyage)
+                        nb_insert_voyage += 1
+            TaskExecutionRepository.set_point_in_time(session, "update_vessel_data_voyage", now)
+            session.commit()
+            logger.info(f"{nb_donnees} données SPIRE traitées")
+            logger.info(f"{nb_insert_data} données statiques mises à jour")
+            logger.info(f"{nb_insert_voyage} données de voyage mises à jour")
+
+
+if __name__ == "__main__":
+    time_start = perf_counter()
+    logger.info("DEBUT - Traitement des données statiques AIS des bateaux")
+    UpdateVesselDataVoyage().start()
+    time_end = perf_counter()
+    duration = time_end - time_start
+    logger.info(f"FIN - Traitement des données statiques AIS des bateaux en {duration:.2f}s")
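
Note on the new task layer: every module added above subclasses tasks.base.BaseTask, which is not part of this patch, and the entry points forward CLI arguments of the form key=value as keyword arguments to the task. The sketch below is only an assumption about that base class, inferred from the call sites visible here (.start(), run(*args, **kwargs), the on_error() hook reading kwargs['batch'] in LoadDimensions, and the batch columns added by the Alembic revision); the real src/tasks/base.py may differ.

# Hypothetical sketch of src/tasks/base.py (NOT included in this diff).
# The batch-id derivation and everything except run/start/on_error are assumptions.
import hashlib
from datetime import datetime, timezone

from bloom.logger import logger


class BaseTask:
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs
        # One batch reference per execution, shared by every row written by the task
        # (assumed scheme: hash of the UTC start timestamp).
        self.kwargs.setdefault(
            "batch",
            hashlib.sha256(datetime.now(timezone.utc).isoformat().encode()).hexdigest(),
        )

    def run(self, *args, **kwargs):
        raise NotImplementedError

    def on_error(self, *args, **kwargs):
        # Hook overridden by subclasses such as LoadDimensions.
        pass

    def start(self):
        try:
            self.run(*self.args, **self.kwargs)
        except Exception as e:
            logger.error(f"{type(self).__name__} en erreur", exc_info=e)
            self.on_error(*self.args, **self.kwargs)
            raise

With this reading, a call such as `python3 src/tasks/init.py vessel_data_csv_path=./data/vessels_subset.csv` (illustrative value) would forward vessel_data_csv_path through kwargs to LoadDimensions and LoadFacts, and every record written during that run would carry the same batch value.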