Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enh: support messaged in groups, close #10 #21

Merged
merged 1 commit into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 37 additions & 35 deletions app/erfiume/apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
from __future__ import annotations

import asyncio
from dataclasses import asdict, dataclass
from dataclasses import dataclass
from decimal import Decimal

import httpx

from .logging import logger

UNKNOWN_VALUE = -9999.0


@dataclass
class Stazione:
Expand All @@ -28,21 +30,25 @@ class Stazione:
soglia1: float
soglia2: float
soglia3: float
value: float
value: float | None

def to_dict(self) -> dict[str, str | Decimal]:
"""
Convert dataclass to dictionary, suitable for DynamoDB storage.
"""
data = asdict(self)
data["soglia1"] = Decimal(str(self.soglia1))
data["soglia2"] = Decimal(str(self.soglia2))
data["soglia3"] = Decimal(str(self.soglia3))
data["value"] = (
Decimal(str(self.value)) if self.value is not None else Decimal("-1.0")
)
def __post_init__(self) -> None:
self.value = self.value or UNKNOWN_VALUE

return data
def to_dict(self) -> dict[str, str | Decimal | int]:
"""Convert dataclass to dictionary, suitable for DynamoDB storage."""
return {
"timestamp": self.timestamp,
"idstazione": self.idstazione,
"ordinamento": self.ordinamento,
"nomestaz": self.nomestaz,
"lon": self.lon,
"lat": self.lat,
"soglia1": Decimal(str(self.soglia1)),
"soglia2": Decimal(str(self.soglia2)),
"soglia3": Decimal(str(self.soglia3)),
"value": Decimal(str(self.value)),
}


@dataclass
Expand Down Expand Up @@ -76,15 +82,13 @@ async def fetch_latest_time(client: httpx.AsyncClient) -> int:
response = await client.get(url)
response.raise_for_status()
data = response.json()
latest_time = int(data[0]["time"])
return int(data[0]["time"])
except httpx.HTTPStatusError as e:
logger.exception("Error fetching latest time: %s", e.response.status_code)
raise
except (KeyError, IndexError):
logger.exception("Error fetching latest time: KeyError or IndexError")
raise
else:
return latest_time


async def fetch_stations_data(client: httpx.AsyncClient, time: int) -> list[Stazione]:
Expand All @@ -98,25 +102,23 @@ async def fetch_stations_data(client: httpx.AsyncClient, time: int) -> list[Staz
response = await client.get(url)
response.raise_for_status()
data = response.json()
stazioni = []
for stazione in data:
if "time" not in stazione:
if not stazione["value"]:
stazione["value"] = "-1.0"
stazioni.append(
Stazione(
timestamp=time,
**stazione,
)
)
return [
Stazione(
timestamp=time,
**{
k: v if v is not None else UNKNOWN_VALUE
for k, v in stazione.items()
},
)
for stazione in data
if "time" not in stazione
]
except httpx.HTTPStatusError as e:
logger.exception("Error fetching stations data: %s", e.response.status_code)
raise
except (KeyError, IndexError):
logger.exception("Error fetching stations data: KeyError or IndexError")
raise
else:
return stazioni


async def fetch_time_series(
Expand All @@ -130,7 +132,7 @@ async def fetch_time_series(
try:
response = await client.get(url)
response.raise_for_status()
valori = [Valore(**valore) for valore in response.json()]
return [Valore(**valore) for valore in response.json()]
except httpx.HTTPStatusError:
logger.exception(
"Error fetching time series for stagiong %s %s: %s",
Expand All @@ -146,17 +148,17 @@ async def fetch_time_series(
stazione.idstazione,
)
raise
else:
return valori


async def enrich_data(client: httpx.AsyncClient, stations: list[Stazione]) -> None:
"""Enrich station data with time series values."""
tasks = [fetch_time_series(client, stazione) for stazione in stations]
results = await asyncio.gather(*tasks)
results = await asyncio.gather(*tasks, return_exceptions=True)

for stazione, dati in zip(stations, results):
if dati:
if isinstance(dati, BaseException):
logger.error("Failed to fetch time series for station %s", stazione)
else:
max_value = max(dati, key=lambda x: x.t)
stazione.value = max_value.v
stazione.timestamp = max_value.t
28 changes: 23 additions & 5 deletions app/erfiume/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from __future__ import annotations

import os
from decimal import Decimal
from typing import TYPE_CHECKING, Self

import aioboto3
Expand All @@ -17,6 +18,8 @@
if TYPE_CHECKING:
from types import TracebackType

UNKNOWN_VALUE = -9999.0


class AsyncDynamoDB:
"""
Expand Down Expand Up @@ -68,19 +71,34 @@ async def check_and_update_stazioni(self, station: Stazione) -> None:
)

# If the provided station has newer data or the record doesn't exist, update DynamoDB
if station.timestamp > latest_timestamp or not response["Item"]:
logger.info(
"Updating data for station %s (%s)",
station.nomestaz,
station.idstazione,
if station.timestamp > latest_timestamp:
logger.info("Updating data for station %s", station.nomestaz)
await self.table.update_item(
Key={"nomestaz": station.nomestaz},
UpdateExpression="SET #ts = :new_timestamp, #vl = :new_value",
ExpressionAttributeValues={
":new_timestamp": station.timestamp,
":new_value": (
Decimal(str(station.value))
if station.value is not None
else Decimal(str(UNKNOWN_VALUE))
),
},
ExpressionAttributeNames={
"#ts": "timestamp",
"#vl": "value",
},
)
elif not response["Item"]:
logger.info("Creating data for station %s", station.nomestaz)
await self.table.put_item(Item=station.to_dict())
except ClientError as e:
logger.exception(
"Error while checking or updating station %s: %s", station.nomestaz, e
)
raise
except Exception as e:
logger.info("Stazione: %s", station)
logger.exception("Unexpected error: %s", e)
raise

Expand Down
45 changes: 43 additions & 2 deletions app/erfiume/tgbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
from .logging import logger
from .storage import AsyncDynamoDB

UNKNOWN_VALUE = -9999.0


async def fetch_bot_token() -> str:
"""
Expand Down Expand Up @@ -55,7 +57,7 @@ def create_station_message(station: Stazione) -> str:
.replace(tzinfo=None)
.strftime("%d-%m-%Y %H:%M")
)
value = float(station.value)
value = float(station.value) # type: ignore [arg-type]
yellow = station.soglia1
orange = station.soglia2
red = station.soglia3
Expand All @@ -66,6 +68,10 @@ def create_station_message(station: Stazione) -> str:
alarm = "🟡"
elif value >= orange and value <= red:
alarm = "🟠"

if value == UNKNOWN_VALUE:
value = "non disponibile" # type: ignore[assignment]
alarm = ""
return cleandoc(
f"""Stazione: {station.nomestaz}
Valore: {value!r} {alarm}
Expand Down Expand Up @@ -104,7 +110,35 @@ async def handle_private_message(
if update.message and update.effective_chat and update.message.text:
logger.info("Received private message: %s", update.message.text)
async with AsyncDynamoDB(table_name="Stazioni") as dynamo:
stazione = await dynamo.get_matching_station(update.message.text)
stazione = await dynamo.get_matching_station(
update.message.text.replace("/", "").strip()
)
if stazione and update.message:
message = create_station_message(stazione)
await context.bot.send_message(
chat_id=update.effective_chat.id,
text=message,
)


async def handle_group_message(
update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""
Handle messages writte from private chat to match a specific station
"""

message = cleandoc(
"""Stazione non trovata!
Inserisci esattamente il nome che vedi dalla pagina https://allertameteo.regione.emilia-romagna.it/livello-idrometrico
Ad esempio 'Cesena', 'Lavino di Sopra' o 'S. Carlo'"""
)
if update.message and update.effective_chat and update.message.text:
logger.info("Received group message: %s", update.message.text)
async with AsyncDynamoDB(table_name="Stazioni") as dynamo:
stazione = await dynamo.get_matching_station(
update.message.text.replace("/", "").replace("erfiume_bot", "").strip()
)
if stazione and update.message:
message = create_station_message(stazione)
await context.bot.send_message(
Expand All @@ -125,6 +159,13 @@ async def bot(event: dict[str, Any]) -> None:
handle_private_message,
)
)
application.add_handler(
MessageHandler(
(filters.ChatType.SUPERGROUP | filters.ChatType.GROUP)
& (filters.COMMAND | filters.Regex("@erfiume_bot")),
handle_group_message,
)
)

# Decode the incoming Telegram message
if event.get("body"):
Expand Down
31 changes: 27 additions & 4 deletions pulumi/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,23 @@
],
)

utenti_table = dynamodb.Table(
f"{RESOURCES_PREFIX}-users",
name="Utenti",
billing_mode="PAY_PER_REQUEST",
hash_key="chatid",
attributes=[
dynamodb.TableAttributeArgs(
name="chatid",
type="S",
),
],
ttl=dynamodb.TableTtlArgs(
attribute_name="ttl",
enabled=True,
),
)

telegram_token_secret = secretsmanager.Secret(
f"{RESOURCES_PREFIX}-telegram-bot-token",
name="telegram-bot-token",
Expand Down Expand Up @@ -64,14 +81,15 @@
],
inline_policies=[
iam.RoleInlinePolicyArgs(
name="DynamoDBStazioniRW",
name="FetcherRole",
policy=iam.get_policy_document_output(
statements=[
{
"Effect": "Allow",
"Actions": [
"dynamodb:PutItem",
"dynamodb:Query",
"dynamodb:UpdateItem",
"dynamodb:GetItem",
],
"Resources": [stazioni_table.arn],
Expand Down Expand Up @@ -104,7 +122,7 @@
],
inline_policies=[
iam.RoleInlinePolicyArgs(
name="DynamoSMReadOnly",
name="BotRole",
policy=iam.get_policy_document_output(
statements=[
{
Expand All @@ -113,9 +131,14 @@
"dynamodb:Query",
"dynamodb:GetItem",
],
"Resources": [
f"arn:aws:dynamodb:eu-west-1:{get_caller_identity().account_id}:table/Stazioni"
"Resources": [stazioni_table.arn, utenti_table.arn],
},
{
"Effect": "Allow",
"Actions": [
"dynamodb:PutItem",
],
"Resources": [utenti_table.arn],
},
{
"Effect": "Allow",
Expand Down
Loading