diff --git a/app/erfiume/apis.py b/app/erfiume/apis.py index f3ed41c..30d3ffc 100644 --- a/app/erfiume/apis.py +++ b/app/erfiume/apis.py @@ -5,13 +5,15 @@ from __future__ import annotations import asyncio -from dataclasses import asdict, dataclass +from dataclasses import dataclass from decimal import Decimal import httpx from .logging import logger +UNKNOWN_VALUE = -9999.0 + @dataclass class Stazione: @@ -28,21 +30,25 @@ class Stazione: soglia1: float soglia2: float soglia3: float - value: float + value: float | None - def to_dict(self) -> dict[str, str | Decimal]: - """ - Convert dataclass to dictionary, suitable for DynamoDB storage. - """ - data = asdict(self) - data["soglia1"] = Decimal(str(self.soglia1)) - data["soglia2"] = Decimal(str(self.soglia2)) - data["soglia3"] = Decimal(str(self.soglia3)) - data["value"] = ( - Decimal(str(self.value)) if self.value is not None else Decimal("-1.0") - ) + def __post_init__(self) -> None: + self.value = self.value or UNKNOWN_VALUE - return data + def to_dict(self) -> dict[str, str | Decimal | int]: + """Convert dataclass to dictionary, suitable for DynamoDB storage.""" + return { + "timestamp": self.timestamp, + "idstazione": self.idstazione, + "ordinamento": self.ordinamento, + "nomestaz": self.nomestaz, + "lon": self.lon, + "lat": self.lat, + "soglia1": Decimal(str(self.soglia1)), + "soglia2": Decimal(str(self.soglia2)), + "soglia3": Decimal(str(self.soglia3)), + "value": Decimal(str(self.value)), + } @dataclass @@ -76,15 +82,13 @@ async def fetch_latest_time(client: httpx.AsyncClient) -> int: response = await client.get(url) response.raise_for_status() data = response.json() - latest_time = int(data[0]["time"]) + return int(data[0]["time"]) except httpx.HTTPStatusError as e: logger.exception("Error fetching latest time: %s", e.response.status_code) raise except (KeyError, IndexError): logger.exception("Error fetching latest time: KeyError or IndexError") raise - else: - return latest_time async def fetch_stations_data(client: httpx.AsyncClient, time: int) -> list[Stazione]: @@ -98,25 +102,23 @@ async def fetch_stations_data(client: httpx.AsyncClient, time: int) -> list[Staz response = await client.get(url) response.raise_for_status() data = response.json() - stazioni = [] - for stazione in data: - if "time" not in stazione: - if not stazione["value"]: - stazione["value"] = "-1.0" - stazioni.append( - Stazione( - timestamp=time, - **stazione, - ) - ) + return [ + Stazione( + timestamp=time, + **{ + k: v if v is not None else UNKNOWN_VALUE + for k, v in stazione.items() + }, + ) + for stazione in data + if "time" not in stazione + ] except httpx.HTTPStatusError as e: logger.exception("Error fetching stations data: %s", e.response.status_code) raise except (KeyError, IndexError): logger.exception("Error fetching stations data: KeyError or IndexError") raise - else: - return stazioni async def fetch_time_series( @@ -130,7 +132,7 @@ async def fetch_time_series( try: response = await client.get(url) response.raise_for_status() - valori = [Valore(**valore) for valore in response.json()] + return [Valore(**valore) for valore in response.json()] except httpx.HTTPStatusError: logger.exception( "Error fetching time series for stagiong %s %s: %s", @@ -146,17 +148,17 @@ async def fetch_time_series( stazione.idstazione, ) raise - else: - return valori async def enrich_data(client: httpx.AsyncClient, stations: list[Stazione]) -> None: """Enrich station data with time series values.""" tasks = [fetch_time_series(client, stazione) for stazione in stations] - results = await asyncio.gather(*tasks) + results = await asyncio.gather(*tasks, return_exceptions=True) for stazione, dati in zip(stations, results): - if dati: + if isinstance(dati, BaseException): + logger.error("Failed to fetch time series for station %s", stazione) + else: max_value = max(dati, key=lambda x: x.t) stazione.value = max_value.v stazione.timestamp = max_value.t diff --git a/app/erfiume/storage.py b/app/erfiume/storage.py index 13d7498..e7fa576 100644 --- a/app/erfiume/storage.py +++ b/app/erfiume/storage.py @@ -5,6 +5,7 @@ from __future__ import annotations import os +from decimal import Decimal from typing import TYPE_CHECKING, Self import aioboto3 @@ -17,6 +18,8 @@ if TYPE_CHECKING: from types import TracebackType +UNKNOWN_VALUE = -9999.0 + class AsyncDynamoDB: """ @@ -68,12 +71,26 @@ async def check_and_update_stazioni(self, station: Stazione) -> None: ) # If the provided station has newer data or the record doesn't exist, update DynamoDB - if station.timestamp > latest_timestamp or not response["Item"]: - logger.info( - "Updating data for station %s (%s)", - station.nomestaz, - station.idstazione, + if station.timestamp > latest_timestamp: + logger.info("Updating data for station %s", station.nomestaz) + await self.table.update_item( + Key={"nomestaz": station.nomestaz}, + UpdateExpression="SET #ts = :new_timestamp, #vl = :new_value", + ExpressionAttributeValues={ + ":new_timestamp": station.timestamp, + ":new_value": ( + Decimal(str(station.value)) + if station.value is not None + else Decimal(str(UNKNOWN_VALUE)) + ), + }, + ExpressionAttributeNames={ + "#ts": "timestamp", + "#vl": "value", + }, ) + elif not response["Item"]: + logger.info("Creating data for station %s", station.nomestaz) await self.table.put_item(Item=station.to_dict()) except ClientError as e: logger.exception( @@ -81,6 +98,7 @@ async def check_and_update_stazioni(self, station: Stazione) -> None: ) raise except Exception as e: + logger.info("Stazione: %s", station) logger.exception("Unexpected error: %s", e) raise diff --git a/app/erfiume/tgbot.py b/app/erfiume/tgbot.py index 17945d6..5733146 100644 --- a/app/erfiume/tgbot.py +++ b/app/erfiume/tgbot.py @@ -27,6 +27,8 @@ from .logging import logger from .storage import AsyncDynamoDB +UNKNOWN_VALUE = -9999.0 + async def fetch_bot_token() -> str: """ @@ -55,7 +57,7 @@ def create_station_message(station: Stazione) -> str: .replace(tzinfo=None) .strftime("%d-%m-%Y %H:%M") ) - value = float(station.value) + value = float(station.value) # type: ignore [arg-type] yellow = station.soglia1 orange = station.soglia2 red = station.soglia3 @@ -66,6 +68,10 @@ def create_station_message(station: Stazione) -> str: alarm = "🟡" elif value >= orange and value <= red: alarm = "🟠" + + if value == UNKNOWN_VALUE: + value = "non disponibile" # type: ignore[assignment] + alarm = "" return cleandoc( f"""Stazione: {station.nomestaz} Valore: {value!r} {alarm} @@ -104,7 +110,35 @@ async def handle_private_message( if update.message and update.effective_chat and update.message.text: logger.info("Received private message: %s", update.message.text) async with AsyncDynamoDB(table_name="Stazioni") as dynamo: - stazione = await dynamo.get_matching_station(update.message.text) + stazione = await dynamo.get_matching_station( + update.message.text.replace("/", "").strip() + ) + if stazione and update.message: + message = create_station_message(stazione) + await context.bot.send_message( + chat_id=update.effective_chat.id, + text=message, + ) + + +async def handle_group_message( + update: Update, context: ContextTypes.DEFAULT_TYPE +) -> None: + """ + Handle messages writte from private chat to match a specific station + """ + + message = cleandoc( + """Stazione non trovata! + Inserisci esattamente il nome che vedi dalla pagina https://allertameteo.regione.emilia-romagna.it/livello-idrometrico + Ad esempio 'Cesena', 'Lavino di Sopra' o 'S. Carlo'""" + ) + if update.message and update.effective_chat and update.message.text: + logger.info("Received group message: %s", update.message.text) + async with AsyncDynamoDB(table_name="Stazioni") as dynamo: + stazione = await dynamo.get_matching_station( + update.message.text.replace("/", "").replace("erfiume_bot", "").strip() + ) if stazione and update.message: message = create_station_message(stazione) await context.bot.send_message( @@ -125,6 +159,13 @@ async def bot(event: dict[str, Any]) -> None: handle_private_message, ) ) + application.add_handler( + MessageHandler( + (filters.ChatType.SUPERGROUP | filters.ChatType.GROUP) + & (filters.COMMAND | filters.Regex("@erfiume_bot")), + handle_group_message, + ) + ) # Decode the incoming Telegram message if event.get("body"): diff --git a/pulumi/__main__.py b/pulumi/__main__.py index 6555445..0bb5e85 100644 --- a/pulumi/__main__.py +++ b/pulumi/__main__.py @@ -35,6 +35,23 @@ ], ) +utenti_table = dynamodb.Table( + f"{RESOURCES_PREFIX}-users", + name="Utenti", + billing_mode="PAY_PER_REQUEST", + hash_key="chatid", + attributes=[ + dynamodb.TableAttributeArgs( + name="chatid", + type="S", + ), + ], + ttl=dynamodb.TableTtlArgs( + attribute_name="ttl", + enabled=True, + ), +) + telegram_token_secret = secretsmanager.Secret( f"{RESOURCES_PREFIX}-telegram-bot-token", name="telegram-bot-token", @@ -64,7 +81,7 @@ ], inline_policies=[ iam.RoleInlinePolicyArgs( - name="DynamoDBStazioniRW", + name="FetcherRole", policy=iam.get_policy_document_output( statements=[ { @@ -72,6 +89,7 @@ "Actions": [ "dynamodb:PutItem", "dynamodb:Query", + "dynamodb:UpdateItem", "dynamodb:GetItem", ], "Resources": [stazioni_table.arn], @@ -104,7 +122,7 @@ ], inline_policies=[ iam.RoleInlinePolicyArgs( - name="DynamoSMReadOnly", + name="BotRole", policy=iam.get_policy_document_output( statements=[ { @@ -113,9 +131,14 @@ "dynamodb:Query", "dynamodb:GetItem", ], - "Resources": [ - f"arn:aws:dynamodb:eu-west-1:{get_caller_identity().account_id}:table/Stazioni" + "Resources": [stazioni_table.arn, utenti_table.arn], + }, + { + "Effect": "Allow", + "Actions": [ + "dynamodb:PutItem", ], + "Resources": [utenti_table.arn], }, { "Effect": "Allow",