-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcode.py
210 lines (173 loc) · 6.2 KB
/
code.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
from time import sleep, time, localtime, monotonic
from ulab.numpy import mean
from random import choice
from rtc import RTC
from wifi import radio
from socketpool import SocketPool
from ssl import create_default_context
import board
import neopixel
from adafruit_led_animation.animation.solid import Solid
from adafruit_led_animation.animation.rainbowchase import RainbowChase
from adafruit_requests import Session
# Get wifi details and more from a secrets.py file
try:
import settings
except ImportError:
print("WiFi secrets are kept in settings.py, please add them there!")
raise
class HTTPRequestError(Exception):
"""Exception used when making a HTTP request"""
pass
def init():
"""Initialize the system"""
global requests
connect_to_wifi()
requests = Session(SocketPool(radio), create_default_context())
_now = update_clock()
pixels.fill(settings.BLACK)
pixels.show()
return _now.tm_mday
def deque(iterable, value, max_values=10):
"""Simple implementation of collections.deque"""
iterable.append(value)
if len(iterable) > max_values:
iterable.pop()
def connect_to_wifi():
"""Establish connection to WiFi network"""
radio.hostname = settings.WIFI_HOSTNAME
print("My MAC addr:", [hex(i) for i in radio.mac_address])
print("Available WiFi networks:")
pixels.fill(settings.AQUA)
pixels.show()
print("Connecting to %s ..." % settings.WIFI_SSID)
pixels.fill(settings.JADE)
pixels.show()
radio.connect(settings.WIFI_SSID, settings.WIFI_PASSWORD)
print("Connected to %s!" % settings.WIFI_SSID)
pixels.fill(settings.PURPLE)
pixels.show()
print("My IP address is", radio.ipv4_address)
def http_request(method, url, retry_attempts: int = 0, *args, **kwargs):
"""Execute a HTTP Request"""
print(f"{method}: {url}")
kwargs.setdefault('timeout', 1)
_now = monotonic()
try:
resp = requests.request(method, url, *args, **kwargs)
except (RuntimeError, ) as ex:
if retry_attempts < settings.MAX_RETRY_ATTEMPTS:
print(f"{ex}: sleep {settings.SLEEP_INTERVAL}s")
sleep(settings.SLEEP_INTERVAL)
return http_request(method=method, url=url, retry_attempts=retry_attempts+1, *args, **kwargs)
raise ex
deque(rolling_request_duration, monotonic() - _now)
if 429 == resp.status_code >= 500:
if retry_attempts < settings.MAX_RETRY_ATTEMPTS:
sleep(settings.SLEEP_INTERVAL)
return http_request(method=method, url=url, retry_attempts=retry_attempts+1, *args, **kwargs)
if 400 <= resp.status_code > 500:
print(f"Unhandled status_code for url: {url}")
raise HTTPRequestError(resp.content)
return resp
def update_clock():
"""Fetch and set the microcontroller's current UTC time"""
_now = None
try:
print("Updating time from Adafruit IO")
data = http_request(
method='GET',
url="https://io.adafruit.com/api/v2/time/seconds"
).text
print(f"Timestamp: {data}")
_now = localtime(int(data) + settings.TZ_OFFSET * 3600)
clock.datetime = _now
except (HTTPRequestError, OverflowError, ) as ex:
print(ex)
return _now
def get_user_status():
"""Fetch the users status"""
_status = None
try:
data = http_request(
method='GET',
url=f"{settings.WEBEX_BASE_URL}/people/{settings.WEBEX_USER_ID}",
headers={
'Authorization': f'Bearer {settings.WEBEX_BOT_TOKEN}'
}
).json()
_status = data["status"].upper()
print(f"Status: {_status}")
except (HTTPRequestError, ) as ex:
print(ex)
except (KeyError, AttributeError, ) as ex:
print(f"Unable to retrieve status!")
print(ex)
return _status
def animate(sequence):
"""Get an animation object"""
if sequence.animation == 'Solid':
s = Solid(pixels, sequence.color)
elif sequence.animation == 'RainbowChase':
s = RainbowChase(pixels, sequence.speed, size=2, spacing=3, reverse=True)
else:
s = None
print(f"Unknown animation: {sequence.animation}")
return s
def main():
global clock, pixels, rolling_request_duration, timer_check_status, current_status, prev_status, today, seq, requests
# Setup global variables
clock = RTC()
try:
pixels = neopixel.NeoPixel(
getattr(board, settings.PIXEL_PIN), settings.PIXEL_NUM, brightness=settings.PIXEL_BRIGHTNESS, auto_write=False
)
except ValueError:
pixels.deinit()
pixels = neopixel.NeoPixel(
getattr(board, settings.PIXEL_PIN), settings.PIXEL_NUM, brightness=settings.PIXEL_BRIGHTNESS, auto_write=False
)
rolling_request_duration = []
timer_check_status = 0
current_status = ''
prev_status = ''
today = None
seq = None
requests = None
# Start system
today = init()
while True:
loop()
def loop():
global current_status, prev_status, today, timer_check_status, seq
if isinstance(seq, Solid) and seq.colors[-1] == settings.BLACK:
sleep(settings.SLEEP_INTERVAL - mean(rolling_request_duration))
ts_now = time()
now = localtime(ts_now)
# update the clock every day
if now.tm_mday != today:
today = update_clock().tm_mday
# don't query during after-hours
if settings.HOUR_START_OF_DAY > now.tm_hour > settings.HOUR_END_OF_DAY:
print("Outside business hours, will not query")
return
# check if there's a new status every so often
if timer_check_status < ts_now:
timer_check_status = ts_now + settings.CHECK_STATUS_EVERY_N_SEC
current_status = get_user_status()
if not hasattr(settings, f'STATUS_{current_status}'):
print(f"Unknown status: {current_status}")
return
if current_status != prev_status:
seq = animate(sequence=getattr(settings, f'STATUS_{current_status}'))
prev_status = current_status
if hasattr(seq, 'animate'):
if hasattr(seq, 'reverse'):
seq.reverse = round(monotonic() / 60, 0) % 2 == 0
seq.animate()
while True:
try:
main()
except Exception as ex:
print(ex)
sleep(10)