-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsnifferbuddyreadings_code.py
78 lines (68 loc) · 2.82 KB
/
snifferbuddyreadings_code.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#############################################################################
# SPDX-FileCopyrightText: 2023 Margaret Johnson
#
# SPDX-License-Identifier: MIT
#
#############################################################################
from logginghandler import LoggingHandler
import json
class SnifferBuddyReadings:
    """Simplified view of a single SnifferBuddy MQTT reading.

    The constructor parses the JSON payload and keeps only the core
    properties in ``self.dict``.  Check ``valid_packet`` before reading any
    property: when the payload cannot be parsed the error is logged,
    ``valid_packet`` is set to ``False`` and ``self.dict`` is never assigned.
    """

    # Sensor names accepted as the first key of a SnifferBuddy payload.
    _KNOWN_SENSOR_NAMES = frozenset({"scd30", "scd40", "scd41"})

    # Keys copied from the sensor's sub-dictionary into ``self.dict``.
    _CORE_KEYS = (
        "name",
        "version",
        "co2",
        "humidity",
        "light",
        "temperature",
        "unit",
        "vpd",
    )

    def __init__(self, mqtt_payload):
        """Parse *mqtt_payload* and build the simplified readings dict.

        Args:
            mqtt_payload: JSON text in a form accepted by ``json.loads``.

        No exception escapes the constructor; any failure is logged and
        reported through ``valid_packet``.
        """
        # Set up logging. LoggingHandler gives stack trace information.
        self.logger = LoggingHandler()
        self.valid_packet = True
        # The air quality sensor's name is expected in the mqtt message. If it
        # isn't there, we assume this payload is not from a SnifferBuddy.
        try:
            # Convert the JSON string into a dictionary.
            mqtt_payload_dict = json.loads(mqtt_payload)
            self.dict = self._make_dict(mqtt_payload_dict)
        except Exception as e:
            self.logger.error(f"Could not identify the air quality sensor. Error: {e}")
            self.valid_packet = False

    def _find_sensor_name_in_payload(self, mqtt_payload_dict) -> str:
        """Return the sensor name, i.e. the first key of *mqtt_payload_dict*.

        Raises:
            NameError: if the payload is empty or its first key is not one of
                the known sensor names.
        """
        # A default avoids a message-less StopIteration on an empty payload;
        # None is never a known sensor name, so it falls through to the raise.
        sensor_name_in_mqtt = next(iter(mqtt_payload_dict), None)
        if sensor_name_in_mqtt in self._KNOWN_SENSOR_NAMES:
            return sensor_name_in_mqtt
        # Sorted so the message is the same on every run.
        sensor_names_str = ", ".join(sorted(self._KNOWN_SENSOR_NAMES))
        raise NameError(
            f"Could not find a valid sensor. Current valid sensors include {sensor_names_str}"
        )

    def _make_dict(self, mqtt_payload_dict) -> dict:
        """Reduce the parsed payload to the core properties of the reading.

        Raises:
            NameError: if no known sensor name is found in the payload.
            KeyError: if the sensor data lacks one of the core properties.
        """
        # Let a lookup failure propagate: the caller logs it with the real
        # cause, rather than this method continuing with an unbound name.
        sensor_name = self._find_sensor_name_in_payload(mqtt_payload_dict)
        # Simplify the dictionary to the core properties.
        sensor_data = mqtt_payload_dict[sensor_name]
        return {key: sensor_data[key] for key in self._CORE_KEYS}

    @property
    def temperature(self) -> float:
        """The ``temperature`` value of the reading."""
        return self.dict["temperature"]

    @property
    def humidity(self) -> float:
        """The ``humidity`` value of the reading."""
        return self.dict["humidity"]

    @property
    def co2(self) -> float:
        """The ``co2`` value of the reading."""
        return self.dict["co2"]

    @property
    def vpd(self) -> float:
        """The ``vpd`` value of the reading."""
        return self.dict["vpd"]

    @property
    def light_level(self) -> str:
        """The ``light`` value of the reading."""
        return self.dict["light"]