Skip to content

Commit

Permalink
Add EPA window support
Browse files Browse the repository at this point in the history
TimOrme committed Apr 16, 2023
1 parent d93e335 commit 310267e
Showing 11 changed files with 671 additions and 45 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
@@ -12,6 +12,12 @@ jobs:
uses: extractions/setup-just@v1
- name: Setup Elm
uses: jorelali/setup-elm@v5
- name: Install dev dependencies
run: poetry install --no-root
env:
POETRY_VIRTUALENVS_CREATE: false
- name: Run tests
run: just test
- name: Compile Elm
run: just compile_elm
- name: Build and publish to pypi
2 changes: 2 additions & 0 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
@@ -31,3 +31,5 @@ jobs:
POETRY_VIRTUALENVS_CREATE: false
- name: Check linting
run: just lint
- name: Run tests
run: just test
64 changes: 58 additions & 6 deletions aqimon/aqi_common.py
Original file line number Diff line number Diff line change
@@ -3,7 +3,17 @@
Ranges here are ripped from public sites.
"""

from typing import List, Tuple
from typing import List, Tuple, Optional, Dict
from enum import Enum
import dataclasses


class Pollutant(Enum):
"""Enum of possible pollutans."""

PM_25 = 0
PM_10 = 1


AQI: List[Tuple[int, int]] = [
(0, 50),
@@ -24,6 +34,34 @@
(350.5, 500.4),
]

PM_10: List[Tuple[float, float]] = [
(0, 54),
(55, 154),
(155, 254),
(255, 354),
(355, 424),
(425, 504),
(505, 604),
]

AQI_LOOKUP_MAP: Dict[Pollutant, List[Tuple[float, float]]] = {Pollutant.PM_25: PM_25, Pollutant.PM_10: PM_10}


@dataclasses.dataclass(frozen=True)
class PollutantReading:
"""A reading for a given pollutant."""

reading: float
pollutant: Pollutant


@dataclasses.dataclass(frozen=True)
class EpaAqi:
"""An EPA AQI value, with the pollutant responsible for the value."""

reading: float
responsible_pollutant: Pollutant


def get_level_from_pm25(pm25: float) -> int:
"""Get the EPA level from a PM25 reading."""
@@ -33,14 +71,28 @@ def get_level_from_pm25(pm25: float) -> int:
raise ValueError("Invalid PM value")


def calculate_epa_aqi(pm_25_read: float) -> int:
"""Calculate the EPA AQI based on a PM25 reading."""
for i, pm_range in enumerate(PM_25):
if pm_range[0] <= pm_25_read <= pm_range[1]:
def calculate_epa_aqi(readings: List[PollutantReading]) -> Optional[EpaAqi]:
"""Calculate the EPA AQI from a list of pollutant readings.
The worst possible value will be reported.
"""
max_value: Optional[EpaAqi] = None
for reading in readings:
epa_value = calculate_epa_aqi_raw(reading)
if max_value is None or max_value.reading < epa_value:
max_value = EpaAqi(epa_value, reading.pollutant)
return max_value


def calculate_epa_aqi_raw(pollutant_reading: PollutantReading) -> int:
"""Calculate the EPA AQI based on a pollutant reading."""
ranges = AQI_LOOKUP_MAP[pollutant_reading.pollutant]
for i, pm_range in enumerate(ranges):
if pm_range[0] <= pollutant_reading.reading <= pm_range[1]:
aqi_low = AQI[i][0]
aqi_high = AQI[i][1]
pm_low = pm_range[0]
pm_high = pm_range[1]
epa = ((aqi_high - aqi_low) / (pm_high - pm_low)) * (pm_25_read - pm_low) + aqi_low
epa = ((aqi_high - aqi_low) / (pm_high - pm_low)) * (pollutant_reading.reading - pm_low) + aqi_low
return round(epa)
return -1
5 changes: 5 additions & 0 deletions aqimon/config.py
Original file line number Diff line number Diff line change
@@ -20,6 +20,9 @@ class Config:
usb_sleep_time_sec: int
sample_count_per_read: int

# EPA Properties
epa_lookback_minutes: int

# Server properties
server_port: int
server_host: str
@@ -29,6 +32,7 @@ class Config:
database_path=os.path.expanduser(DEFAULT_DB_PATH),
poll_frequency_sec=60 * 15, # Every 15 minutes
retention_minutes=60 * 24 * 7, # 1 week
epa_lookback_minutes=60 * 8, # 8 hours
reader_type="NOVAPM",
usb_path="/dev/ttyUSB0",
usb_sleep_time_sec=5,
@@ -50,4 +54,5 @@ def get_config_from_env() -> Config:
sample_count_per_read=int(os.environ.get("AQIMON_SAMPLE_COUNT_PER_READ", DEFAULT_CONFIG.sample_count_per_read)),
server_port=int(os.environ.get("AQIMON_SERVER_PORT", DEFAULT_CONFIG.server_port)),
server_host=os.environ.get("AQIMON_SERVER_HOST", DEFAULT_CONFIG.server_host),
epa_lookback_minutes=int(os.environ.get("AQIMON_EPA_LOOKBACK_MIN", DEFAULT_CONFIG.epa_lookback_minutes)),
)
181 changes: 159 additions & 22 deletions aqimon/database.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,200 @@
"""Database operations."""
from datetime import datetime, timedelta
from typing import Optional
from typing import Optional, List
import databases
from dataclasses import dataclass


async def get_latest_stats(dbconn: databases.Database):
@dataclass(frozen=True)
class AveragedRead:
"""An averaged read result.
Includes the number of reads that went into the average, as well as the oldest timestamp of those reads.
"""

avg_pm25: float
avg_pm10: float
count: int
oldest_read_time: datetime


@dataclass(frozen=True)
class ReadLogEntry:
"""A read log entry."""

event_time: datetime
pm25: float
pm10: float


@dataclass(frozen=True)
class EpaAqiLogEntry:
"""An EPA Aqi log entry."""

event_time: datetime
epa_aqi: float
pollutant: str
read_count: int
oldest_read_time: datetime


async def get_latest_read(dbconn: databases.Database) -> ReadLogEntry:
"""Get the most recent read from the database."""
result = await dbconn.fetch_one("SELECT * FROM aqi_log ORDER BY event_time DESC LIMIT 1")
result = await dbconn.fetch_one("SELECT event_time, pm25, pm10 FROM read_log ORDER BY event_time DESC LIMIT 1")
if result:
return result[0], result[1], result[2], result[3]
return ReadLogEntry(datetime.fromtimestamp(result[0]), result[1], result[2])
else:
return ReadLogEntry(datetime.now(), 0, 0)


async def get_latest_epa_aqi(dbconn: databases.Database) -> EpaAqiLogEntry:
"""Get the most recent EPA AQI from the database."""
result = await dbconn.fetch_one(
"SELECT event_time, epa_aqi, pollutant, read_count, oldest_read_time "
"FROM epa_aqi_log ORDER BY event_time DESC LIMIT 1"
)
if result:
return EpaAqiLogEntry(
event_time=datetime.fromtimestamp(result[0]),
epa_aqi=result[1],
pollutant=result[2],
read_count=result[3],
oldest_read_time=datetime.fromtimestamp(result[4]),
)
else:
return EpaAqiLogEntry(
event_time=datetime.now(), epa_aqi=0, pollutant="NA", read_count=0, oldest_read_time=datetime.now()
)


async def get_all_reads(dbconn: databases.Database, lookback: Optional[datetime]) -> List[ReadLogEntry]:
"""Retrieve all read stats, for a given time window.
If no window is specified, all results are returned.
"""
if lookback:
data = await dbconn.fetch_all(
"SELECT event_time, pm10, pm25 FROM read_log WHERE event_time >= :lookback ORDER BY event_time ASC",
values={"lookback": int(lookback.timestamp())},
)
else:
return 0, 0, 0, 0
data = await dbconn.fetch_all("SELECT event_time, pm10, pm25 FROM read_log ORDER BY event_time ASC")

return [ReadLogEntry(event_time=datetime.fromtimestamp(x[0]), pm10=x[1], pm25=x[2]) for x in data]

async def get_all_stats(dbconn: databases.Database, window_delta: Optional[timedelta]):

async def get_all_epa_aqis(dbconn: databases.Database, lookback: Optional[datetime]) -> List[EpaAqiLogEntry]:
"""Retrieve all read stats, for a given time window.
If no window is specified, all results are returned.
"""
if window_delta:
lookback = int((datetime.now() - window_delta).timestamp())
return await dbconn.fetch_all(
"SELECT * FROM aqi_log WHERE event_time >= :lookback ORDER BY event_time ASC",
values={"lookback": lookback},
if lookback:
data = await dbconn.fetch_all(
"SELECT event_time, epa_aqi, pollutant, read_count, oldest_read_time "
"FROM epa_aqi_log "
"WHERE event_time >= :lookback ORDER BY event_time ASC",
values={"lookback": int(lookback.timestamp())},
)
else:
return await dbconn.fetch_all("SELECT * FROM aqi_log ORDER BY event_time ASC")
data = await dbconn.fetch_all(
"SELECT event_time, epa_aqi, pollutant, read_count, oldest_read_time "
"FROM epa_aqi_log ORDER BY event_time ASC"
)

return [
EpaAqiLogEntry(
event_time=datetime.fromtimestamp(x[0]),
epa_aqi=x[1],
pollutant=x[2],
read_count=x[3],
oldest_read_time=datetime.fromtimestamp(x[4]),
)
for x in data
]


async def get_averaged_reads(dbconn: databases.Database, lookback_to: datetime) -> Optional[AveragedRead]:
"""Get the average read values, looking back to a certain time.
Note that the lookback will include one additional value outside of the window if it exists. This allows for us to
ensure full coverage of the lookback window.
"""
lookback = int(lookback_to.timestamp())
result = await dbconn.fetch_one(
"SELECT "
"AVG(pm25) as avg_pm25, AVG(pm10) as avg_pm10, COUNT(*) as count, MIN(event_time) as oldest_time "
"FROM read_log "
"WHERE (event_time >= :lookback) OR "
"(event_time = (SELECT MAX(event_time) FROM read_log WHERE event_time <= :lookback)) ORDER BY event_time ASC",
values={"lookback": lookback},
)

async def clean_old(dbconn: databases.Database, retention_minutes: int):
if result is None:
return None
else:
return AveragedRead(
avg_pm25=result[0],
avg_pm10=result[1],
count=result[2],
oldest_read_time=datetime.fromtimestamp(result[3]),
)


async def clean_old(dbconn: databases.Database, retention_minutes: int) -> None:
"""Remove expired database entries.
This is used to keep the database from going infinitely, and allows us to define a retention period.
"""
last_week = datetime.now() - timedelta(minutes=retention_minutes)
last_week_timestamp = int(last_week.timestamp())
await dbconn.execute(
"DELETE FROM aqi_log WHERE event_time < :last_week_timestamp",
"DELETE FROM read_log WHERE event_time < :last_week_timestamp",
values={"last_week_timestamp": last_week_timestamp},
)


async def add_entry(dbconn: databases.Database, event_time, epa_aqi_pm25, raw_pm25, raw_pm10):
"""Add a read entry to the database."""
async def add_epa_read(
dbconn: databases.Database,
event_time: datetime,
epa_aqi: float,
pollutant: str,
read_count: int,
oldest_read_time: datetime,
):
"""Add an EPA read entry to the database."""
formatted_time = int(event_time.timestamp())
formatted_oldest_read_time = int(oldest_read_time.timestamp())
await dbconn.execute(
query="INSERT INTO epa_aqi_log VALUES (:formatted_time, :epa_aqi, :pollutant, :read_count, :oldest_read_time)",
values={
"formatted_time": formatted_time,
"epa_aqi": epa_aqi,
"pollutant": pollutant,
"read_count": read_count,
"oldest_read_time": formatted_oldest_read_time,
},
)


async def add_read(dbconn: databases.Database, event_time: datetime, pm25: float, pm10: float):
"""Add a raw read entry to the database."""
formatted_time = int(event_time.timestamp())
await dbconn.execute(
query="INSERT INTO aqi_log VALUES (:formatted_time, :epa_aqi_pm25, :raw_pm25, :raw_pm10)",
query="INSERT INTO read_log VALUES (:formatted_time, :pm25, :pm10)",
values={
"formatted_time": formatted_time,
"epa_aqi_pm25": epa_aqi_pm25,
"raw_pm25": raw_pm25,
"raw_pm10": raw_pm10,
"pm25": pm25,
"pm10": pm10,
},
)


async def create_tables(dbconn: databases.Database):
"""Create database tables, if they don't already exist."""
await dbconn.execute("""CREATE TABLE IF NOT EXISTS read_log (event_time integer, pm25 real, pm10 real)""")
await dbconn.execute("""CREATE INDEX IF NOT EXISTS read_eventtime ON read_log (event_time)""")
await dbconn.execute(
"""CREATE TABLE IF NOT EXISTS aqi_log (event_time integer, epa_aqi_pm25 real, raw_pm25 real, raw_pm10 real)"""
"""CREATE TABLE IF NOT EXISTS epa_aqi_log
(event_time integer, epa_aqi real, pollutant text, read_count integer, oldest_read_time integer)"""
)
await dbconn.execute("""CREATE INDEX IF NOT EXISTS aqi_eventtime ON aqi_log (event_time)""")
await dbconn.execute("""CREATE INDEX IF NOT EXISTS eqpaqi_eventtime ON epa_aqi_log (event_time)""")
19 changes: 19 additions & 0 deletions aqimon/read/novapm.py
Original file line number Diff line number Diff line change
@@ -69,4 +69,23 @@ def _read(self) -> AqiRead:
data = ser.read(10)
pmtwofive = int.from_bytes(data[2:4], byteorder="little") / 10
pmten = int.from_bytes(data[4:6], byteorder="little") / 10

checksum = data[8]
checksum_vals = sum([data[x] for x in range(2, 8)]) & 255

if data[0:1] != b"\xaa":
raise Exception("Incorrect header read.")

if data[9:10] != b"\xab":
raise Exception("Incorrect footer read.")

if checksum_vals != checksum:
raise Exception(f"Expected read checksum of {checksum}, but got {checksum_vals}")

if pmten > 999:
raise Exception("PM10 value out of range!")

if pmtwofive > 999:
raise Exception("PM2.5 value out of range!")

return AqiRead(pmtwofive, pmten)
56 changes: 41 additions & 15 deletions aqimon/server.py
Original file line number Diff line number Diff line change
@@ -8,12 +8,7 @@
import databases
from pathlib import Path
from datetime import datetime, timedelta
from .database import (
get_all_stats,
add_entry,
create_tables,
clean_old,
)
from .database import get_all_reads, add_read, add_epa_read, get_averaged_reads, create_tables, clean_old, ReadLogEntry
from .read import AqiRead, Reader
from .read.mock import MockReader
from .read.novapm import NovaPmReader
@@ -22,7 +17,7 @@
import logging
from functools import lru_cache
from dataclasses import dataclass
from typing import Optional
from typing import Optional, List

log = logging.getLogger(__name__)

@@ -32,6 +27,8 @@
static_dir = project_root / "static"
app.mount("/static", StaticFiles(directory=static_dir), name="static")

POLLUTANT_MAP = {aqi_common.Pollutant.PM_10: "PM10", aqi_common.Pollutant.PM_25: "PM25"}


@dataclass
class ScheduledReader:
@@ -123,14 +120,40 @@ async def read_function() -> None:
scheduled_reader.next_schedule = datetime.now() + timedelta(seconds=config.poll_frequency_sec)
result: AqiRead = await scheduled_reader.reader.read()
event_time = datetime.now()
epa_aqi_pm25 = aqi_common.calculate_epa_aqi(result.pmtwofive)
await add_entry(
await add_read(
dbconn=database,
event_time=event_time,
epa_aqi_pm25=epa_aqi_pm25,
raw_pm25=result.pmtwofive,
raw_pm10=result.pmten,
pm25=result.pmtwofive,
pm10=result.pmten,
)

averaged_reads = await get_averaged_reads(
dbconn=database, lookback_to=event_time - timedelta(minutes=config.epa_lookback_minutes)
)
if averaged_reads:
read_list = [
aqi_common.PollutantReading(averaged_reads.avg_pm25, aqi_common.Pollutant.PM_25),
aqi_common.PollutantReading(averaged_reads.avg_pm10, aqi_common.Pollutant.PM_10),
]
epa_aqi = aqi_common.calculate_epa_aqi(read_list)

if epa_aqi:
pollutant = POLLUTANT_MAP.get(epa_aqi.responsible_pollutant)

if pollutant is None:
raise Exception(f"Invalid Pollutant! {epa_aqi.responsible_pollutant}")

await add_epa_read(
dbconn=database,
event_time=event_time,
epa_aqi=epa_aqi.reading,
pollutant=pollutant,
read_count=averaged_reads.count,
oldest_read_time=averaged_reads.oldest_read_time,
)
else:
log.warning("No EPA Value was calculated.")

await clean_old(dbconn=database, retention_minutes=config.retention_minutes)
except Exception as e:
log.exception("Failed to retrieve data from reader", e)
@@ -141,9 +164,9 @@ async def read_function() -> None:
await repeater(read_function)()


def convert_all_to_view_dict(results):
def convert_all_to_view_dict(results: List[ReadLogEntry]):
"""Convert data result to dictionary for view."""
view = [{"t": int(x[0]), "epa": x[1], "pm25": x[2], "pm10": x[3]} for x in results]
view = [{"t": int(x.event_time.timestamp()), "epa": 123.0, "pm25": x.pm25, "pm10": x.pm10} for x in results]
return view


@@ -166,7 +189,10 @@ async def all_data(
window_delta = timedelta(days=1)
elif window == "week":
window_delta = timedelta(weeks=1)
all_stats = await get_all_stats(database, window_delta)
if window_delta:
all_stats = await get_all_reads(database, datetime.now() - window_delta)
else:
all_stats = await get_all_reads(database, None)
all_json = convert_all_to_view_dict(all_stats)
return all_json

9 changes: 8 additions & 1 deletion justfile
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
# Compile client code
build: install_deps compile_elm

# Install dependencies for project
install_deps:
poetry install --no-root

# Compile elm code for production
compile_elm:
cd elm && elm make src/Main.elm --optimize --output=../aqimon/static/elm.js

# Compile elm code in development mode
compile_elm_dev:
cd elm && elm make src/Main.elm --output=../aqimon/static/elm.js

@@ -20,4 +23,8 @@ lint:
# Format code
format:
black .
elm-format --yes elm/
elm-format --yes elm/

# Run tests
test:
python -m pytest tests/
87 changes: 86 additions & 1 deletion poetry.lock
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -19,6 +19,8 @@ fastapi-utils = "^0.2.1"
ruff = "^0.0.256"
black = "^23.1.0"
mypy = "^1.1.1"
pytest = "^7.3.1"
pytest-asyncio = "^0.21.0"

[build-system]
requires = ["poetry-core"]
@@ -33,6 +35,9 @@ select = ["E", "F", "D"]
ignore = ["D203", "D213"]
line-length = 120

[tool.ruff.per-file-ignores]
"tests/*" = ["D103", "D104", "D100"] # We dont need docstrings in tests

[tool.black]
line-length = 120

282 changes: 282 additions & 0 deletions tests/test_database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
import pytest
import pytest_asyncio
import databases

from aqimon import database
from datetime import timedelta, datetime


@pytest_asyncio.fixture
async def database_conn():
"""Fixture to set up the in-memory database with test data."""
dbconn = databases.Database("sqlite+aiosqlite://:memory:", force_rollback=True)
await dbconn.connect()
await database.create_tables(dbconn)
yield dbconn
await dbconn.disconnect()


@pytest.mark.asyncio
async def test_get_latest_read(database_conn):
current_time = datetime(2020, 1, 1, 12, 0, 0)
await database.add_read(database_conn, current_time - timedelta(hours=2), pm10=1, pm25=2)
await database.add_read(database_conn, current_time - timedelta(hours=4), pm10=2, pm25=3)

result = await database.get_latest_read(database_conn)
assert result.pm10 == 1.0
assert result.pm25 == 2.0
assert result.event_time == current_time - timedelta(hours=2)


@pytest.mark.asyncio
async def test_get_latest_read_no_data(database_conn):
result = await database.get_latest_read(database_conn)
assert result.pm10 == 0.0
assert result.pm25 == 0.0
assert result.event_time is not None


@pytest.mark.asyncio
async def test_get_latest_epa_aqi(database_conn):
current_time = datetime(2020, 1, 1, 12, 0, 0)
await database.add_epa_read(
database_conn,
current_time - timedelta(hours=2),
epa_aqi=3.5,
read_count=3,
pollutant="PM25",
oldest_read_time=current_time - timedelta(days=3),
)
await database.add_epa_read(
database_conn,
current_time - timedelta(hours=4),
epa_aqi=3.7,
read_count=5,
pollutant="PM25",
oldest_read_time=current_time - timedelta(days=3),
)

result = await database.get_latest_epa_aqi(database_conn)
assert result.epa_aqi == 3.5
assert result.event_time == current_time - timedelta(hours=2)


@pytest.mark.asyncio
async def test_get_latest_epq_aqi_no_data(database_conn):
result = await database.get_latest_epa_aqi(database_conn)
assert result.epa_aqi == 0
assert result.event_time is not None


@pytest.mark.asyncio
async def test_get_all_reads(database_conn):
current_time = datetime(2020, 1, 1, 12, 0, 0)
await database.add_read(database_conn, current_time - timedelta(hours=1), pm10=1, pm25=2)
await database.add_read(database_conn, current_time - timedelta(hours=2), pm10=2, pm25=3)
await database.add_read(database_conn, current_time - timedelta(hours=3), pm10=3, pm25=4)

result = await database.get_all_reads(database_conn, lookback=None)
assert len(result) == 3
assert result[2].pm10 == 1.0
assert result[2].pm25 == 2.0
assert result[2].event_time == current_time - timedelta(hours=1)

assert result[0].pm10 == 3.0
assert result[0].pm25 == 4.0
assert result[0].event_time == current_time - timedelta(hours=3)


@pytest.mark.asyncio
async def test_get_all_reads_with_window(database_conn):
current_time = datetime(2020, 1, 1, 12, 0, 0)
await database.add_read(database_conn, current_time - timedelta(hours=1), pm10=1, pm25=2)
await database.add_read(database_conn, current_time - timedelta(hours=2), pm10=2, pm25=3)
await database.add_read(database_conn, current_time - timedelta(hours=3), pm10=3, pm25=4)

result = await database.get_all_reads(database_conn, current_time - timedelta(hours=2, minutes=30))
assert len(result) == 2
assert result[1].pm10 == 1.0
assert result[1].pm25 == 2.0
assert result[1].event_time == current_time - timedelta(hours=1)

assert result[0].pm10 == 2.0
assert result[0].pm25 == 3.0
assert result[0].event_time == current_time - timedelta(hours=2)


@pytest.mark.asyncio
async def test_get_all_epa_aqi(database_conn):
current_time = datetime(2020, 1, 1, 12, 0, 0)
await database.add_epa_read(
database_conn,
current_time - timedelta(hours=1),
epa_aqi=2,
pollutant="PM25",
read_count=5,
oldest_read_time=current_time - timedelta(days=3),
)
await database.add_epa_read(
database_conn,
current_time - timedelta(hours=2),
epa_aqi=3,
pollutant="PM10",
read_count=20,
oldest_read_time=current_time - timedelta(days=60),
)
await database.add_epa_read(
database_conn,
current_time - timedelta(hours=3),
epa_aqi=4,
pollutant="PM25",
read_count=10,
oldest_read_time=current_time - timedelta(days=30),
)

result = await database.get_all_epa_aqis(database_conn, lookback=None)
assert len(result) == 3
assert result[2].epa_aqi == 2.0
assert result[2].read_count == 5
assert result[2].pollutant == "PM25"
assert result[2].oldest_read_time == current_time - timedelta(days=3)
assert result[2].event_time == current_time - timedelta(hours=1)

assert result[0].epa_aqi == 4.0
assert result[0].read_count == 10
assert result[0].pollutant == "PM25"
assert result[0].oldest_read_time == current_time - timedelta(days=30)
assert result[0].event_time == current_time - timedelta(hours=3)


@pytest.mark.asyncio
async def test_get_all_epa_aqi_with_window(database_conn):
current_time = datetime(2020, 1, 1, 12, 0, 0)
await database.add_epa_read(
database_conn,
current_time - timedelta(hours=1),
epa_aqi=2,
pollutant="PM25",
read_count=5,
oldest_read_time=current_time - timedelta(days=3),
)
await database.add_epa_read(
database_conn,
current_time - timedelta(hours=2),
epa_aqi=3,
pollutant="PM10",
read_count=20,
oldest_read_time=current_time - timedelta(days=60),
)
await database.add_epa_read(
database_conn,
current_time - timedelta(hours=3),
epa_aqi=4,
pollutant="PM25",
read_count=10,
oldest_read_time=current_time - timedelta(days=30),
)

result = await database.get_all_epa_aqis(database_conn, current_time - timedelta(hours=2, minutes=30))
assert len(result) == 2
assert result[1].epa_aqi == 2.0
assert result[1].read_count == 5
assert result[1].pollutant == "PM25"
assert result[1].oldest_read_time == current_time - timedelta(days=3)
assert result[1].event_time == current_time - timedelta(hours=1)

assert result[0].epa_aqi == 3.0
assert result[0].read_count == 20
assert result[0].pollutant == "PM10"
assert result[0].oldest_read_time == current_time - timedelta(days=60)
assert result[0].event_time == current_time - timedelta(hours=2)


@pytest.mark.asyncio
async def test_get_averaged_reads(database_conn):
# Add reads every two hours
current_time = datetime(2020, 1, 1, 12, 0, 0)
lookback_to = current_time - timedelta(hours=8)
await database.add_read(database_conn, current_time - timedelta(hours=2), pm10=1, pm25=2)
await database.add_read(database_conn, current_time - timedelta(hours=4), pm10=2, pm25=3)
await database.add_read(database_conn, current_time - timedelta(hours=6), pm10=3, pm25=4)
await database.add_read(database_conn, current_time - timedelta(hours=8), pm10=4, pm25=5)

result = await database.get_averaged_reads(database_conn, lookback_to)
assert result.count == 4
assert result.avg_pm10 == 2.5
assert result.avg_pm25 == 3.5
assert result.oldest_read_time == current_time - timedelta(hours=8)


@pytest.mark.asyncio
async def test_get_averaged_reads_looks_past(database_conn):
current_time = datetime(2020, 1, 1, 12, 0, 0)
lookback_to = current_time - timedelta(hours=8)
await database.add_read(database_conn, current_time - timedelta(hours=6), pm10=1, pm25=2)
await database.add_read(database_conn, current_time - timedelta(hours=7), pm10=2, pm25=3)
# Should be included since its the read just after the lookback
await database.add_read(database_conn, current_time - timedelta(hours=8, minutes=5), pm10=3, pm25=4)
# Should be excluded
await database.add_read(database_conn, current_time - timedelta(hours=9), pm10=4, pm25=5)

result = await database.get_averaged_reads(database_conn, lookback_to)
assert result.count == 3
assert result.avg_pm10 == 2.0
assert result.avg_pm25 == 3.0
assert result.oldest_read_time == current_time - timedelta(hours=8, minutes=5)


@pytest.mark.asyncio
async def test_clean_old(database_conn):
current_time = datetime.now()
await database.add_read(database_conn, current_time - timedelta(hours=2), pm10=1, pm25=2)
await database.add_read(database_conn, current_time - timedelta(hours=4), pm10=2, pm25=3)
# These should be deleted
await database.add_read(database_conn, current_time - timedelta(hours=6), pm10=3, pm25=4)
await database.add_read(database_conn, current_time - timedelta(hours=8), pm10=4, pm25=5)

await database.clean_old(database_conn, retention_minutes=(60 * 4) + 30)

result = await database.get_all_reads(database_conn, lookback=None)
assert len(result) == 2
assert result[0].event_time == (current_time - timedelta(hours=4)).replace(microsecond=0)
assert result[1].event_time == (current_time - timedelta(hours=2)).replace(microsecond=0)


@pytest.mark.asyncio
async def test_add_read(database_conn):
current_time = datetime.now()

await database.add_read(database_conn, current_time - timedelta(hours=2), pm10=1, pm25=2)
await database.add_read(database_conn, current_time - timedelta(hours=4), pm10=2, pm25=3)

result = await database.get_all_reads(database_conn, lookback=None)
assert len(result) == 2
assert result[0].event_time == (current_time - timedelta(hours=4)).replace(microsecond=0)
assert result[1].event_time == (current_time - timedelta(hours=2)).replace(microsecond=0)


@pytest.mark.asyncio
async def test_add_epa_read(database_conn):
current_time = datetime.now()

await database.add_epa_read(
database_conn,
current_time - timedelta(hours=2),
epa_aqi=2,
pollutant="PM25",
read_count=5,
oldest_read_time=current_time - timedelta(days=3),
)
await database.add_epa_read(
database_conn,
current_time - timedelta(hours=4),
epa_aqi=3,
pollutant="PM10",
read_count=20,
oldest_read_time=current_time - timedelta(days=60),
)

result = await database.get_all_epa_aqis(database_conn, lookback=None)
assert len(result) == 2
assert result[0].event_time == (current_time - timedelta(hours=4)).replace(microsecond=0)
assert result[1].event_time == (current_time - timedelta(hours=2)).replace(microsecond=0)

0 comments on commit 310267e

Please sign in to comment.