Skip to content

Commit

Permalink
Merge pull request #28 from TimOrme/epa_windows
Browse files Browse the repository at this point in the history
Add EPA window support
  • Loading branch information
TimOrme authored Apr 16, 2023
2 parents d93e335 + cc21868 commit 8304083
Show file tree
Hide file tree
Showing 13 changed files with 844 additions and 83 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ jobs:
uses: extractions/setup-just@v1
- name: Setup Elm
uses: jorelali/setup-elm@v5
- name: Install dev dependencies
run: poetry install --no-root
env:
POETRY_VIRTUALENVS_CREATE: false
- name: Run tests
run: just test
- name: Compile Elm
run: just compile_elm
- name: Build and publish to pypi
Expand Down
6 changes: 2 additions & 4 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
name: PR Checks
on:
push:
branches:
- '**' # matches every branch
- '!main' # excludes main
pull_request:
types: [opened, reopened, synchronize]

Expand Down Expand Up @@ -31,3 +27,5 @@ jobs:
POETRY_VIRTUALENVS_CREATE: false
- name: Check linting
run: just lint
- name: Run tests
run: just test
64 changes: 58 additions & 6 deletions aqimon/aqi_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,17 @@
Ranges here are ripped from public sites.
"""

from typing import List, Tuple
from typing import List, Tuple, Optional, Dict
from enum import Enum
import dataclasses


class Pollutant(Enum):
"""Enum of possible pollutans."""

PM_25 = 0
PM_10 = 1


AQI: List[Tuple[int, int]] = [
(0, 50),
Expand All @@ -24,6 +34,34 @@
(350.5, 500.4),
]

PM_10: List[Tuple[float, float]] = [
(0, 54),
(55, 154),
(155, 254),
(255, 354),
(355, 424),
(425, 504),
(505, 604),
]

AQI_LOOKUP_MAP: Dict[Pollutant, List[Tuple[float, float]]] = {Pollutant.PM_25: PM_25, Pollutant.PM_10: PM_10}


@dataclasses.dataclass(frozen=True)
class PollutantReading:
"""A reading for a given pollutant."""

reading: float
pollutant: Pollutant


@dataclasses.dataclass(frozen=True)
class EpaAqi:
"""An EPA AQI value, with the pollutant responsible for the value."""

reading: float
responsible_pollutant: Pollutant


def get_level_from_pm25(pm25: float) -> int:
"""Get the EPA level from a PM25 reading."""
Expand All @@ -33,14 +71,28 @@ def get_level_from_pm25(pm25: float) -> int:
raise ValueError("Invalid PM value")


def calculate_epa_aqi(pm_25_read: float) -> int:
"""Calculate the EPA AQI based on a PM25 reading."""
for i, pm_range in enumerate(PM_25):
if pm_range[0] <= pm_25_read <= pm_range[1]:
def calculate_epa_aqi(readings: List[PollutantReading]) -> Optional[EpaAqi]:
"""Calculate the EPA AQI from a list of pollutant readings.
The worst possible value will be reported.
"""
max_value: Optional[EpaAqi] = None
for reading in readings:
epa_value = calculate_epa_aqi_raw(reading)
if max_value is None or max_value.reading < epa_value:
max_value = EpaAqi(epa_value, reading.pollutant)
return max_value


def calculate_epa_aqi_raw(pollutant_reading: PollutantReading) -> int:
"""Calculate the EPA AQI based on a pollutant reading."""
ranges = AQI_LOOKUP_MAP[pollutant_reading.pollutant]
for i, pm_range in enumerate(ranges):
if pm_range[0] <= pollutant_reading.reading <= pm_range[1]:
aqi_low = AQI[i][0]
aqi_high = AQI[i][1]
pm_low = pm_range[0]
pm_high = pm_range[1]
epa = ((aqi_high - aqi_low) / (pm_high - pm_low)) * (pm_25_read - pm_low) + aqi_low
epa = ((aqi_high - aqi_low) / (pm_high - pm_low)) * (pollutant_reading.reading - pm_low) + aqi_low
return round(epa)
return -1
5 changes: 5 additions & 0 deletions aqimon/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ class Config:
usb_sleep_time_sec: int
sample_count_per_read: int

# EPA Properties
epa_lookback_minutes: int

# Server properties
server_port: int
server_host: str
Expand All @@ -29,6 +32,7 @@ class Config:
database_path=os.path.expanduser(DEFAULT_DB_PATH),
poll_frequency_sec=60 * 15, # Every 15 minutes
retention_minutes=60 * 24 * 7, # 1 week
epa_lookback_minutes=60 * 8, # 8 hours
reader_type="NOVAPM",
usb_path="/dev/ttyUSB0",
usb_sleep_time_sec=5,
Expand All @@ -50,4 +54,5 @@ def get_config_from_env() -> Config:
sample_count_per_read=int(os.environ.get("AQIMON_SAMPLE_COUNT_PER_READ", DEFAULT_CONFIG.sample_count_per_read)),
server_port=int(os.environ.get("AQIMON_SERVER_PORT", DEFAULT_CONFIG.server_port)),
server_host=os.environ.get("AQIMON_SERVER_HOST", DEFAULT_CONFIG.server_host),
epa_lookback_minutes=int(os.environ.get("AQIMON_EPA_LOOKBACK_MIN", DEFAULT_CONFIG.epa_lookback_minutes)),
)
181 changes: 159 additions & 22 deletions aqimon/database.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,200 @@
"""Database operations."""
from datetime import datetime, timedelta
from typing import Optional
from typing import Optional, List
import databases
from dataclasses import dataclass


async def get_latest_stats(dbconn: databases.Database):
@dataclass(frozen=True)
class AveragedRead:
"""An averaged read result.
Includes the number of reads that went into the average, as well as the oldest timestamp of those reads.
"""

avg_pm25: float
avg_pm10: float
count: int
oldest_read_time: datetime


@dataclass(frozen=True)
class ReadLogEntry:
"""A read log entry."""

event_time: datetime
pm25: float
pm10: float


@dataclass(frozen=True)
class EpaAqiLogEntry:
"""An EPA Aqi log entry."""

event_time: datetime
epa_aqi: float
pollutant: str
read_count: int
oldest_read_time: datetime


async def get_latest_read(dbconn: databases.Database) -> ReadLogEntry:
"""Get the most recent read from the database."""
result = await dbconn.fetch_one("SELECT * FROM aqi_log ORDER BY event_time DESC LIMIT 1")
result = await dbconn.fetch_one("SELECT event_time, pm25, pm10 FROM read_log ORDER BY event_time DESC LIMIT 1")
if result:
return result[0], result[1], result[2], result[3]
return ReadLogEntry(datetime.fromtimestamp(result[0]), result[1], result[2])
else:
return ReadLogEntry(datetime.now(), 0, 0)


async def get_latest_epa_aqi(dbconn: databases.Database) -> EpaAqiLogEntry:
"""Get the most recent EPA AQI from the database."""
result = await dbconn.fetch_one(
"SELECT event_time, epa_aqi, pollutant, read_count, oldest_read_time "
"FROM epa_aqi_log ORDER BY event_time DESC LIMIT 1"
)
if result:
return EpaAqiLogEntry(
event_time=datetime.fromtimestamp(result[0]),
epa_aqi=result[1],
pollutant=result[2],
read_count=result[3],
oldest_read_time=datetime.fromtimestamp(result[4]),
)
else:
return EpaAqiLogEntry(
event_time=datetime.now(), epa_aqi=0, pollutant="NA", read_count=0, oldest_read_time=datetime.now()
)


async def get_all_reads(dbconn: databases.Database, lookback: Optional[datetime]) -> List[ReadLogEntry]:
"""Retrieve all read stats, for a given time window.
If no window is specified, all results are returned.
"""
if lookback:
data = await dbconn.fetch_all(
"SELECT event_time, pm10, pm25 FROM read_log WHERE event_time >= :lookback ORDER BY event_time ASC",
values={"lookback": int(lookback.timestamp())},
)
else:
return 0, 0, 0, 0
data = await dbconn.fetch_all("SELECT event_time, pm10, pm25 FROM read_log ORDER BY event_time ASC")

return [ReadLogEntry(event_time=datetime.fromtimestamp(x[0]), pm10=x[1], pm25=x[2]) for x in data]

async def get_all_stats(dbconn: databases.Database, window_delta: Optional[timedelta]):

async def get_all_epa_aqis(dbconn: databases.Database, lookback: Optional[datetime]) -> List[EpaAqiLogEntry]:
"""Retrieve all read stats, for a given time window.
If no window is specified, all results are returned.
"""
if window_delta:
lookback = int((datetime.now() - window_delta).timestamp())
return await dbconn.fetch_all(
"SELECT * FROM aqi_log WHERE event_time >= :lookback ORDER BY event_time ASC",
values={"lookback": lookback},
if lookback:
data = await dbconn.fetch_all(
"SELECT event_time, epa_aqi, pollutant, read_count, oldest_read_time "
"FROM epa_aqi_log "
"WHERE event_time >= :lookback ORDER BY event_time ASC",
values={"lookback": int(lookback.timestamp())},
)
else:
return await dbconn.fetch_all("SELECT * FROM aqi_log ORDER BY event_time ASC")
data = await dbconn.fetch_all(
"SELECT event_time, epa_aqi, pollutant, read_count, oldest_read_time "
"FROM epa_aqi_log ORDER BY event_time ASC"
)

return [
EpaAqiLogEntry(
event_time=datetime.fromtimestamp(x[0]),
epa_aqi=x[1],
pollutant=x[2],
read_count=x[3],
oldest_read_time=datetime.fromtimestamp(x[4]),
)
for x in data
]


async def get_averaged_reads(dbconn: databases.Database, lookback_to: datetime) -> Optional[AveragedRead]:
"""Get the average read values, looking back to a certain time.
Note that the lookback will include one additional value outside of the window if it exists. This allows for us to
ensure full coverage of the lookback window.
"""
lookback = int(lookback_to.timestamp())
result = await dbconn.fetch_one(
"SELECT "
"AVG(pm25) as avg_pm25, AVG(pm10) as avg_pm10, COUNT(*) as count, MIN(event_time) as oldest_time "
"FROM read_log "
"WHERE (event_time >= :lookback) OR "
"(event_time = (SELECT MAX(event_time) FROM read_log WHERE event_time <= :lookback)) ORDER BY event_time ASC",
values={"lookback": lookback},
)

async def clean_old(dbconn: databases.Database, retention_minutes: int):
if result is None:
return None
else:
return AveragedRead(
avg_pm25=result[0],
avg_pm10=result[1],
count=result[2],
oldest_read_time=datetime.fromtimestamp(result[3]),
)


async def clean_old(dbconn: databases.Database, retention_minutes: int) -> None:
"""Remove expired database entries.
This is used to keep the database from going infinitely, and allows us to define a retention period.
"""
last_week = datetime.now() - timedelta(minutes=retention_minutes)
last_week_timestamp = int(last_week.timestamp())
await dbconn.execute(
"DELETE FROM aqi_log WHERE event_time < :last_week_timestamp",
"DELETE FROM read_log WHERE event_time < :last_week_timestamp",
values={"last_week_timestamp": last_week_timestamp},
)


async def add_entry(dbconn: databases.Database, event_time, epa_aqi_pm25, raw_pm25, raw_pm10):
"""Add a read entry to the database."""
async def add_epa_read(
dbconn: databases.Database,
event_time: datetime,
epa_aqi: float,
pollutant: str,
read_count: int,
oldest_read_time: datetime,
):
"""Add an EPA read entry to the database."""
formatted_time = int(event_time.timestamp())
formatted_oldest_read_time = int(oldest_read_time.timestamp())
await dbconn.execute(
query="INSERT INTO epa_aqi_log VALUES (:formatted_time, :epa_aqi, :pollutant, :read_count, :oldest_read_time)",
values={
"formatted_time": formatted_time,
"epa_aqi": epa_aqi,
"pollutant": pollutant,
"read_count": read_count,
"oldest_read_time": formatted_oldest_read_time,
},
)


async def add_read(dbconn: databases.Database, event_time: datetime, pm25: float, pm10: float):
"""Add a raw read entry to the database."""
formatted_time = int(event_time.timestamp())
await dbconn.execute(
query="INSERT INTO aqi_log VALUES (:formatted_time, :epa_aqi_pm25, :raw_pm25, :raw_pm10)",
query="INSERT INTO read_log VALUES (:formatted_time, :pm25, :pm10)",
values={
"formatted_time": formatted_time,
"epa_aqi_pm25": epa_aqi_pm25,
"raw_pm25": raw_pm25,
"raw_pm10": raw_pm10,
"pm25": pm25,
"pm10": pm10,
},
)


async def create_tables(dbconn: databases.Database):
"""Create database tables, if they don't already exist."""
await dbconn.execute("""CREATE TABLE IF NOT EXISTS read_log (event_time integer, pm25 real, pm10 real)""")
await dbconn.execute("""CREATE INDEX IF NOT EXISTS read_eventtime ON read_log (event_time)""")
await dbconn.execute(
"""CREATE TABLE IF NOT EXISTS aqi_log (event_time integer, epa_aqi_pm25 real, raw_pm25 real, raw_pm10 real)"""
"""CREATE TABLE IF NOT EXISTS epa_aqi_log
(event_time integer, epa_aqi real, pollutant text, read_count integer, oldest_read_time integer)"""
)
await dbconn.execute("""CREATE INDEX IF NOT EXISTS aqi_eventtime ON aqi_log (event_time)""")
await dbconn.execute("""CREATE INDEX IF NOT EXISTS eqpaqi_eventtime ON epa_aqi_log (event_time)""")
19 changes: 19 additions & 0 deletions aqimon/read/novapm.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,23 @@ def _read(self) -> AqiRead:
data = ser.read(10)
pmtwofive = int.from_bytes(data[2:4], byteorder="little") / 10
pmten = int.from_bytes(data[4:6], byteorder="little") / 10

checksum = data[8]
checksum_vals = sum([data[x] for x in range(2, 8)]) & 255

if data[0:1] != b"\xaa":
raise Exception("Incorrect header read.")

if data[9:10] != b"\xab":
raise Exception("Incorrect footer read.")

if checksum_vals != checksum:
raise Exception(f"Expected read checksum of {checksum}, but got {checksum_vals}")

if pmten > 999:
raise Exception("PM10 value out of range!")

if pmtwofive > 999:
raise Exception("PM2.5 value out of range!")

return AqiRead(pmtwofive, pmten)
Loading

0 comments on commit 8304083

Please sign in to comment.