Skip to content

Commit

Permalink
update values
Browse files Browse the repository at this point in the history
  • Loading branch information
notdodo committed Oct 13, 2024
1 parent 2a0f670 commit dcce59e
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 265 deletions.
5 changes: 1 addition & 4 deletions app/erfiume/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from __future__ import annotations

from .apis import Stazione, Valore, enrich_data, fetch_latest_time, fetch_stations_data
from .apis import Stazione
from .logging import logger
from .storage import AsyncDynamoDB
from .tgbot import bot
Expand All @@ -13,9 +13,6 @@
"AsyncDynamoDB",
"Stazione",
"Valore",
"enrich_data",
"fetch_latest_time",
"fetch_stations_data",
"logger",
"bot",
]
117 changes: 0 additions & 117 deletions app/erfiume/apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,13 @@

from __future__ import annotations

from asyncio import gather as async_gather
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from inspect import cleandoc

from httpx import AsyncClient, HTTPStatusError
from zoneinfo import ZoneInfo

from .logging import logger

UNKNOWN_VALUE = -9999.0

KNOWN_STATIONS = [
Expand Down Expand Up @@ -354,116 +350,3 @@ def create_station_message(self) -> str:
Soglia Rossa: {red}
Ultimo rilevamento: {timestamp}"""
)


@dataclass
class Valore:
"""
Single value from the sensor for a station.
"""

t: int
v: float

def __post_init__(self) -> None:
"""
Ensure that `t` is always converted to an int.
"""
self.t = int(self.t)

def to_dict(self) -> dict[str, int | Decimal]:
"""Convert dataclass to dictionary, suitable for DynamoDB storage."""
return {"t": self.t, "v": Decimal(str(self.v))}


async def fetch_latest_time(client: AsyncClient) -> int:
"""
Fetch the latest updated time.
"""
base_time = "1726667100000"
base_url = "https://allertameteo.regione.emilia-romagna.it/o/api/allerta/get-sensor-values-no-time?variabile=254,0,0/1,-,-,-/B13215&time={}"
url = base_url.format(base_time)
try:
response = await client.get(url)
response.raise_for_status()
data = response.json()
return int(data[0]["time"])
except HTTPStatusError as e:
logger.exception("Error fetching latest time: %s", e.response.status_code)
raise
except (KeyError, IndexError):
logger.exception("Error fetching latest time: KeyError or IndexError")
raise


async def fetch_stations_data(client: AsyncClient, time: int) -> list[Stazione]:
"""
Fetch the list of all stations from the latest update timestamp.
"""
base_url = "https://allertameteo.regione.emilia-romagna.it/o/api/allerta/get-sensor-values-no-time?variabile=254,0,0/1,-,-,-/B13215&time={}"
url = base_url.format(time)

try:
response = await client.get(url)
response.raise_for_status()
data = response.json()
return [
Stazione(
timestamp=time,
**{
k: v if v is not None else UNKNOWN_VALUE
for k, v in stazione.items()
},
)
for stazione in data
if "time" not in stazione
]
except HTTPStatusError as e:
logger.exception("Error fetching stations data: %s", e.response.status_code)
raise
except (KeyError, IndexError):
logger.exception("Error fetching stations data: KeyError or IndexError")
raise


async def fetch_time_series(
client: AsyncClient,
stazione: Stazione,
) -> list[Valore]:
"""
Fetch additional data (time series) for a station.
"""
url = f"https://allertameteo.regione.emilia-romagna.it/o/api/allerta/get-time-series/?stazione={stazione.idstazione}&variabile=254,0,0/1,-,-,-/B13215"
try:
response = await client.get(url)
response.raise_for_status()
return [Valore(**valore) for valore in response.json()]
except HTTPStatusError:
logger.exception(
"Error fetching time series for stagiong %s %s: %s",
stazione.nomestaz,
stazione.idstazione,
response.status_code,
)
raise
except KeyError:
logger.exception(
"Error fetching time series for station %s %s: KeyError",
stazione.nomestaz,
stazione.idstazione,
)
raise


async def enrich_data(client: AsyncClient, stations: list[Stazione]) -> None:
"""Enrich station data with time series values."""
tasks = [fetch_time_series(client, stazione) for stazione in stations]
results = await async_gather(*tasks, return_exceptions=True)

for stazione, dati in zip(stations, results):
if isinstance(dati, BaseException):
logger.error("Failed to fetch time series for station %s", stazione)
elif len(dati) > 0:
max_value = max(dati, key=lambda x: x.t)
stazione.value = max_value.v
stazione.timestamp = max_value.t
49 changes: 0 additions & 49 deletions app/erfiume/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from __future__ import annotations

from datetime import datetime, timedelta
from decimal import Decimal
from os import getenv
from typing import TYPE_CHECKING, Self

Expand Down Expand Up @@ -58,54 +57,6 @@ async def __aexit__(
"""Close the client on exit."""
await self.dynamodb.__aexit__(exc_type, exc_val, exc_tb)

async def check_and_update_stazioni(self, station: Stazione) -> None:
"""
Check if the station data in DynamoDB is outdated compared to the given station object.
If outdated or non-existent, update it with the new data.
"""
try:
response = await self.table.get_item(
Key={"nomestaz": station.nomestaz},
ProjectionExpression="#tsp",
ExpressionAttributeNames={"#tsp": "timestamp"},
)

# Get the latest timestamp from the DynamoDB response
latest_timestamp = (
int(response["Item"].get("timestamp")) # type: ignore[arg-type]
if "Item" in response
else 0
)

# If the provided station has newer data or the record doesn't exist, update DynamoDB
if station.timestamp > latest_timestamp:
await self.table.update_item(
Key={"nomestaz": station.nomestaz},
UpdateExpression="SET #ts = :new_timestamp, #vl = :new_value",
ExpressionAttributeValues={
":new_timestamp": station.timestamp,
":new_value": (
Decimal(str(station.value))
if station.value is not None
else Decimal(str(UNKNOWN_VALUE))
),
},
ExpressionAttributeNames={
"#ts": "timestamp",
"#vl": "value",
},
)
elif not response["Item"]:
await self.table.put_item(Item=station.to_dict())
except ClientError as e:
logger.exception(
"Error while checking or updating station %s: %s", station.nomestaz, e
)
raise
except Exception as e:
logger.exception("Unexpected error: %s", e)
raise

async def get_matching_station(self, station_name: str) -> Stazione | None:
"""
Retrieve a station from the DynamoDB table by its idstazione.
Expand Down
58 changes: 0 additions & 58 deletions app/erfiume_fetcher.py

This file was deleted.

Loading

0 comments on commit dcce59e

Please sign in to comment.