-
Notifications
You must be signed in to change notification settings - Fork 1
/
esp32_sgp30.py
100 lines (87 loc) · 3.62 KB
/
esp32_sgp30.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import time
from micropython import const
# I2C address used by SGP30.__init__ when the caller does not supply one.
_SGP30_DEFAULT_I2C_ADDR = const(0x58)
# Feature-set words the driver recognises; the value read back from the
# sensor during initialisation is checked against this tuple.
_SGP30_FEATURESETS = (0x0020, 0x0022)
# Parameters of the CRC-8 computed by SGP30.generate_crc: the polynomial
# XOR-ed in on overflow and the starting value of the checksum.
_SGP30_CRC8_POLYNOMIAL = const(0x31)
_SGP30_CRC8_INIT = const(0xFF)
# Number of data bytes per reply word; each word is followed by one CRC byte.
_SGP30_WORD_LEN = const(2)
class SGP30:
    """
    A driver for the SGP30 gas sensor.

    :param i2c: The I2C object to use. This is the only required parameter.
        It must provide ``writeto(addr, buf)`` and ``readfrom_into(addr, buf)``.
    :param int address: (optional) The I2C address of the device.
    """

    def __init__(self, i2c, address=_SGP30_DEFAULT_I2C_ADDR):
        """Initialize the sensor, get the serial # and verify that we found a proper SGP30.

        :raises RuntimeError: ``'SGP30 Not detected'`` if the reported feature
            set is not listed in ``_SGP30_FEATURESETS``; ``'CRC Error'`` if a
            reply word fails its checksum.
        """
        self._i2c = i2c
        self._addr = address
        # The serial number is read back as three 16-bit words.
        self.serial = self._i2c_read_words_from_cmd(
            command=[0x36, 0x82], reply_size=3, delay=0.01
        )
        featureset = self._i2c_read_words_from_cmd(
            command=[0x20, 0x2F], reply_size=1, delay=0.01
        )
        # _SGP30_FEATURESETS is a tuple of accepted values, so membership is
        # the right test: comparing the int word to the tuple with ``!=`` is
        # always true and would reject every sensor.
        if featureset[0] not in _SGP30_FEATURESETS:
            raise RuntimeError('SGP30 Not detected')
        self.initialise_indoor_air_quality()

    def total_organic_compound(self):
        """Total Volatile Organic Compound in parts per billion.

        Triggers a fresh measurement on every call.
        """
        return self.indoor_air_quality[1]

    def baseline_total_organic_compound(self):
        """Total Volatile Organic Compound baseline value.

        Reads the baseline from the sensor on every call.
        """
        return self.indoor_air_quality_baseline[1]

    def co2_equivalent(self):
        """Carbon Dioxide Equivalent in parts per million.

        Triggers a fresh measurement on every call.
        """
        return self.indoor_air_quality[0]

    def baseline_co2_equivalent(self):
        """Carbon Dioxide Equivalent baseline value.

        Reads the baseline from the sensor on every call.
        """
        return self.indoor_air_quality_baseline[0]

    def baseline_co2_equivilant(self):
        """Carbon Dioxide Equivalent baseline value.

        Misspelled legacy name, kept so existing callers continue to work;
        it returns the same value as :meth:`baseline_co2_equivalent`.
        """
        return self.baseline_co2_equivalent()

    def initialise_indoor_air_quality(self):
        """Initialize the IAQ algorithm."""
        # Command only: no reply words are read back.
        self._i2c_read_words_from_cmd(command=[0x20, 0x03], reply_size=0, delay=0.01)

    @property
    def indoor_air_quality(self):
        """Measure the CO2eq and TVOC.

        :return: ``[co2eq, tvoc]`` as two 16-bit integers.
        :raises RuntimeError: ``'CRC Error'`` if a reply word fails its checksum.
        """
        return self._i2c_read_words_from_cmd(command=[0x20, 0x08], reply_size=2, delay=0.05)

    @property
    def indoor_air_quality_baseline(self):
        """Get the IAQ algorithm baseline for CO2eq and TVOC.

        :return: ``[co2eq_baseline, tvoc_baseline]`` as two 16-bit integers.
        :raises RuntimeError: ``'CRC Error'`` if a reply word fails its checksum.
        """
        return self._i2c_read_words_from_cmd(command=[0x20, 0x15], reply_size=2, delay=0.01)

    def set_indoor_air_quality_baseline(self, co2_equivalent, total_volatile_organic_compounds):
        """Set the previously recorded IAQ algorithm baseline for CO2eq and TVOC.

        :param int co2_equivalent: CO2eq baseline, 0..0xFFFF.
        :param int total_volatile_organic_compounds: TVOC baseline, 0..0xFFFF.
        :raises RuntimeError: ``'Invalid baseline'`` if both values are zero.
        :raises ValueError: if either value does not fit in 16 bits.
        """
        if co2_equivalent == 0 and total_volatile_organic_compounds == 0:
            raise RuntimeError('Invalid baseline')
        payload = []
        # The words go on the wire TVOC first, then CO2eq -- the reverse of
        # the order in which the baseline getter returns them.
        for value in (total_volatile_organic_compounds, co2_equivalent):
            # Each value is split into exactly two bytes below; anything
            # outside 16 bits would produce an invalid byte and a wrong CRC.
            if not 0 <= value <= 0xFFFF:
                raise ValueError('Baseline values must be in the range 0..0xFFFF')
            word = [value >> 8, value & 0xFF]
            word.append(self.generate_crc(word))
            payload += word
        self._i2c_read_words_from_cmd(command=[0x20, 0x1E] + payload, reply_size=0, delay=0.01)

    # Low level command functions

    def _i2c_read_words_from_cmd(self, command, reply_size, delay):
        """Run an SGP command query, get a reply and CRC results if necessary.

        :param command: iterable of byte values written to the device.
        :param int reply_size: number of 16-bit words expected back; 0 means
            the command has no reply.
        :param delay: seconds to wait between writing the command and reading.
        :return: list of ``reply_size`` integers, or ``None`` when
            ``reply_size`` is 0.
        :raises RuntimeError: ``'CRC Error'`` if any word fails its checksum.
        """
        self._i2c.writeto(self._addr, bytes(command))
        time.sleep(delay)
        if not reply_size:
            return None
        # Every reply word is _SGP30_WORD_LEN data bytes followed by one CRC byte.
        stride = _SGP30_WORD_LEN + 1
        raw = bytearray(reply_size * stride)
        self._i2c.readfrom_into(self._addr, raw)
        words = []
        for offset in range(0, len(raw), stride):
            word = raw[offset:offset + _SGP30_WORD_LEN]
            if self.generate_crc(word) != raw[offset + _SGP30_WORD_LEN]:
                raise RuntimeError('CRC Error')
            # Words are transmitted most-significant byte first.
            words.append((word[0] << 8) | word[1])
        return words

    def generate_crc(self, data):
        """8-bit CRC algorithm for checking data.

        Processes each byte MSB-first, starting from ``_SGP30_CRC8_INIT`` and
        XOR-ing in ``_SGP30_CRC8_POLYNOMIAL`` whenever the top bit is shifted
        out.

        :param data: iterable of byte values.
        :return: the checksum as an integer in 0..255.
        """
        crc = _SGP30_CRC8_INIT
        for byte in data:
            crc ^= byte
            for _ in range(8):
                # Mask after every shift so the running value stays 8 bits wide.
                if crc & 0x80:
                    crc = ((crc << 1) ^ _SGP30_CRC8_POLYNOMIAL) & 0xFF
                else:
                    crc = (crc << 1) & 0xFF
        return crc & 0xFF