Skip to content

Commit

Permalink
enh: support messaged in groups, close #10
Browse files Browse the repository at this point in the history
  • Loading branch information
notdodo committed Sep 23, 2024
1 parent 7fd968a commit 69bbb3b
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 46 deletions.
72 changes: 37 additions & 35 deletions app/erfiume/apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
from __future__ import annotations

import asyncio
from dataclasses import asdict, dataclass
from dataclasses import dataclass
from decimal import Decimal

import httpx

from .logging import logger

UNKNOWN_VALUE = -9999.0


@dataclass
class Stazione:
Expand All @@ -28,21 +30,25 @@ class Stazione:
soglia1: float
soglia2: float
soglia3: float
value: float
value: float | None

def to_dict(self) -> dict[str, str | Decimal]:
"""
Convert dataclass to dictionary, suitable for DynamoDB storage.
"""
data = asdict(self)
data["soglia1"] = Decimal(str(self.soglia1))
data["soglia2"] = Decimal(str(self.soglia2))
data["soglia3"] = Decimal(str(self.soglia3))
data["value"] = (
Decimal(str(self.value)) if self.value is not None else Decimal("-1.0")
)
def __post_init__(self) -> None:
self.value = self.value or UNKNOWN_VALUE

return data
def to_dict(self) -> dict[str, str | Decimal | int]:
"""Convert dataclass to dictionary, suitable for DynamoDB storage."""
return {
"timestamp": self.timestamp,
"idstazione": self.idstazione,
"ordinamento": self.ordinamento,
"nomestaz": self.nomestaz,
"lon": self.lon,
"lat": self.lat,
"soglia1": Decimal(str(self.soglia1)),
"soglia2": Decimal(str(self.soglia2)),
"soglia3": Decimal(str(self.soglia3)),
"value": Decimal(str(self.value)),
}


@dataclass
Expand Down Expand Up @@ -76,15 +82,13 @@ async def fetch_latest_time(client: httpx.AsyncClient) -> int:
response = await client.get(url)
response.raise_for_status()
data = response.json()
latest_time = int(data[0]["time"])
return int(data[0]["time"])
except httpx.HTTPStatusError as e:
logger.exception("Error fetching latest time: %s", e.response.status_code)
raise
except (KeyError, IndexError):
logger.exception("Error fetching latest time: KeyError or IndexError")
raise
else:
return latest_time


async def fetch_stations_data(client: httpx.AsyncClient, time: int) -> list[Stazione]:
Expand All @@ -98,25 +102,23 @@ async def fetch_stations_data(client: httpx.AsyncClient, time: int) -> list[Staz
response = await client.get(url)
response.raise_for_status()
data = response.json()
stazioni = []
for stazione in data:
if "time" not in stazione:
if not stazione["value"]:
stazione["value"] = "-1.0"
stazioni.append(
Stazione(
timestamp=time,
**stazione,
)
)
return [
Stazione(
timestamp=time,
**{
k: v if v is not None else UNKNOWN_VALUE
for k, v in stazione.items()
},
)
for stazione in data
if "time" not in stazione
]
except httpx.HTTPStatusError as e:
logger.exception("Error fetching stations data: %s", e.response.status_code)
raise
except (KeyError, IndexError):
logger.exception("Error fetching stations data: KeyError or IndexError")
raise
else:
return stazioni


async def fetch_time_series(
Expand All @@ -130,7 +132,7 @@ async def fetch_time_series(
try:
response = await client.get(url)
response.raise_for_status()
valori = [Valore(**valore) for valore in response.json()]
return [Valore(**valore) for valore in response.json()]
except httpx.HTTPStatusError:
logger.exception(
"Error fetching time series for stagiong %s %s: %s",
Expand All @@ -146,17 +148,17 @@ async def fetch_time_series(
stazione.idstazione,
)
raise
else:
return valori


async def enrich_data(client: httpx.AsyncClient, stations: list[Stazione]) -> None:
"""Enrich station data with time series values."""
tasks = [fetch_time_series(client, stazione) for stazione in stations]
results = await asyncio.gather(*tasks)
results = await asyncio.gather(*tasks, return_exceptions=True)

for stazione, dati in zip(stations, results):
if dati:
if isinstance(dati, BaseException):
logger.error("Failed to fetch time series for station %s", stazione)
else:
max_value = max(dati, key=lambda x: x.t)
stazione.value = max_value.v
stazione.timestamp = max_value.t
28 changes: 23 additions & 5 deletions app/erfiume/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from __future__ import annotations

import os
from decimal import Decimal
from typing import TYPE_CHECKING, Self

import aioboto3
Expand All @@ -17,6 +18,8 @@
if TYPE_CHECKING:
from types import TracebackType

UNKNOWN_VALUE = -9999.0


class AsyncDynamoDB:
"""
Expand Down Expand Up @@ -68,19 +71,34 @@ async def check_and_update_stazioni(self, station: Stazione) -> None:
)

# If the provided station has newer data or the record doesn't exist, update DynamoDB
if station.timestamp > latest_timestamp or not response["Item"]:
logger.info(
"Updating data for station %s (%s)",
station.nomestaz,
station.idstazione,
if station.timestamp > latest_timestamp:
logger.info("Updating data for station %s", station.nomestaz)
await self.table.update_item(
Key={"nomestaz": station.nomestaz},
UpdateExpression="SET #ts = :new_timestamp, #vl = :new_value",
ExpressionAttributeValues={
":new_timestamp": station.timestamp,
":new_value": (
Decimal(str(station.value))
if station.value is not None
else Decimal(str(UNKNOWN_VALUE))
),
},
ExpressionAttributeNames={
"#ts": "timestamp",
"#vl": "value",
},
)
elif not response["Item"]:
logger.info("Creating data for station %s", station.nomestaz)
await self.table.put_item(Item=station.to_dict())
except ClientError as e:
logger.exception(
"Error while checking or updating station %s: %s", station.nomestaz, e
)
raise
except Exception as e:
logger.info("Stazione: %s", station)
logger.exception("Unexpected error: %s", e)
raise

Expand Down
45 changes: 43 additions & 2 deletions app/erfiume/tgbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
from .logging import logger
from .storage import AsyncDynamoDB

UNKNOWN_VALUE = -9999.0


async def fetch_bot_token() -> str:
"""
Expand Down Expand Up @@ -55,7 +57,7 @@ def create_station_message(station: Stazione) -> str:
.replace(tzinfo=None)
.strftime("%d-%m-%Y %H:%M")
)
value = float(station.value)
value = float(station.value) # type: ignore [arg-type]
yellow = station.soglia1
orange = station.soglia2
red = station.soglia3
Expand All @@ -66,6 +68,10 @@ def create_station_message(station: Stazione) -> str:
alarm = "🟡"
elif value >= orange and value <= red:
alarm = "🟠"

if value == UNKNOWN_VALUE:
value = "non disponibile" # type: ignore[assignment]
alarm = ""
return cleandoc(
f"""Stazione: {station.nomestaz}
Valore: {value!r} {alarm}
Expand Down Expand Up @@ -104,7 +110,35 @@ async def handle_private_message(
if update.message and update.effective_chat and update.message.text:
logger.info("Received private message: %s", update.message.text)
async with AsyncDynamoDB(table_name="Stazioni") as dynamo:
stazione = await dynamo.get_matching_station(update.message.text)
stazione = await dynamo.get_matching_station(
update.message.text.replace("/", "").strip()
)
if stazione and update.message:
message = create_station_message(stazione)
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=message,
)


async def handle_group_message(
update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""
Handle messages writte from private chat to match a specific station
"""

message = cleandoc(
"""Stazione non trovata!
Inserisci esattamente il nome che vedi dalla pagina https://allertameteo.regione.emilia-romagna.it/livello-idrometrico
Ad esempio 'Cesena', 'Lavino di Sopra' o 'S. Carlo'"""
)
if update.message and update.effective_chat and update.message.text:
logger.info("Received group message: %s", update.message.text)
async with AsyncDynamoDB(table_name="Stazioni") as dynamo:
stazione = await dynamo.get_matching_station(
update.message.text.replace("/", "").replace("erfiume_bot", "").strip()
)
if stazione and update.message:
message = create_station_message(stazione)
await context.bot.send_message(
Expand All @@ -125,6 +159,13 @@ async def bot(event: dict[str, Any]) -> None:
handle_private_message,
)
)
application.add_handler(
MessageHandler(
(filters.ChatType.SUPERGROUP | filters.ChatType.GROUP)
& (filters.COMMAND | filters.Regex("@erfiume_bot")),
handle_group_message,
)
)

# Decode the incoming Telegram message
if event.get("body"):
Expand Down
31 changes: 27 additions & 4 deletions pulumi/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,23 @@
],
)

utenti_table = dynamodb.Table(
f"{RESOURCES_PREFIX}-users",
name="Utenti",
billing_mode="PAY_PER_REQUEST",
hash_key="chatid",
attributes=[
dynamodb.TableAttributeArgs(
name="chatid",
type="S",
),
],
ttl=dynamodb.TableTtlArgs(
attribute_name="ttl",
enabled=True,
),
)

telegram_token_secret = secretsmanager.Secret(
f"{RESOURCES_PREFIX}-telegram-bot-token",
name="telegram-bot-token",
Expand Down Expand Up @@ -64,14 +81,15 @@
],
inline_policies=[
iam.RoleInlinePolicyArgs(
name="DynamoDBStazioniRW",
name="FetcherRole",
policy=iam.get_policy_document_output(
statements=[
{
"Effect": "Allow",
"Actions": [
"dynamodb:PutItem",
"dynamodb:Query",
"dynamodb:UpdateItem",
"dynamodb:GetItem",
],
"Resources": [stazioni_table.arn],
Expand Down Expand Up @@ -104,7 +122,7 @@
],
inline_policies=[
iam.RoleInlinePolicyArgs(
name="DynamoSMReadOnly",
name="BotRole",
policy=iam.get_policy_document_output(
statements=[
{
Expand All @@ -113,9 +131,14 @@
"dynamodb:Query",
"dynamodb:GetItem",
],
"Resources": [
f"arn:aws:dynamodb:eu-west-1:{get_caller_identity().account_id}:table/Stazioni"
"Resources": [stazioni_table.arn, utenti_table.arn],
},
{
"Effect": "Allow",
"Actions": [
"dynamodb:PutItem",
],
"Resources": [utenti_table.arn],
},
{
"Effect": "Allow",
Expand Down

0 comments on commit 69bbb3b

Please sign in to comment.