-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
62 lines (48 loc) · 1.82 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from os import stat
import inotify.adapters
import subprocess
import paho.mqtt.client as mqtt
#from constants import SERVER, PORT, TIMEOUT
# Status codes published to the broker. The rest of this file refers to them
# by index: 0 = NOT_BUSY, 1 = BUSY, 2 = APP_EXIT.
statuses = ["NOT_BUSY", "BUSY", "APP_EXIT"]
# Single module-level MQTT client reused by every mqtt_send() call.
mqttclient = mqtt.Client()
def mqtt_send(current_status):
    """Publish ``current_status`` as a retained message on the status topic.

    The module-level ``mqttclient`` is (re)connected to the broker at
    localhost:1883 on every call, immediately before publishing.

    Args:
        current_status: Payload to publish; callers in this file pass one of
            the entries of ``statuses``.
    """
    status_topic = "openbusylight/statuscode"
    mqttclient.connect("localhost", port=1883, keepalive=60)
    # retain=True is passed so the broker is asked to keep the last status.
    mqttclient.publish(
        status_topic,
        payload=current_status,
        retain=True,
    )
def main_notifier(listt):
    """Publish a status change over MQTT and echo it to stdout.

    Args:
        listt: Indexable pair ``[source, status]``. Element 0 names where the
            change was detected; element 1 is the status to publish.
    """
    from_function, current_status = listt[0], listt[1]
    # Publish first, then log, so the console line follows the MQTT send.
    mqtt_send(current_status)
    print(f"{from_function}: {current_status}")
def cold_run_status_check(status_path="/proc/asound/card0/pcm0c/sub0/status"):
    """Report whether the sound device is already in use at start-up.

    Reads the status file once and looks for the text ``RUNNING`` anywhere in
    it. The outcome is announced through ``main_notifier`` under the source
    name "Cold Run".

    Args:
        status_path: File to inspect. Defaults to the path this function has
            always checked, so existing callers are unaffected.

    Returns:
        ``statuses[1]`` (busy) when the file contains ``RUNNING``, otherwise
        ``statuses[0]`` (not busy).
    """
    try:
        # Read the file directly instead of spawning `cat | grep`: the old
        # pipeline never waited on the `cat` child or closed its pipe, and it
        # depended on both external binaries being installed.
        with open(status_path, encoding="utf-8", errors="replace") as status_file:
            is_running = "RUNNING" in status_file.read()
    except OSError:
        # A missing or unreadable file produced "no match" in the old pipeline,
        # so it keeps meaning "not busy" here.
        is_running = False

    current_status = statuses[1] if is_running else statuses[0]
    main_notifier(["Cold Run", current_status])
    return current_status
def inotify_check():
    """Watch the PCM device node and notify on open/close events.

    Adds an inotify watch on ``/dev/snd/pcmC0D0c`` and iterates over the
    events it produces:

    * an event whose type list is exactly ``['IN_OPEN']`` reports
      ``statuses[1]`` (busy);
    * an event whose type list is exactly ``['IN_CLOSE_WRITE']`` reports
      ``statuses[0]`` (not busy);
    * every other event is ignored.

    NOTE(review): the event generator presumably blocks indefinitely, so this
    function is not expected to return -- confirm against the inotify library.
    """
    watcher = inotify.adapters.Inotify()
    watcher.add_watch('/dev/snd/pcmC0D0c')

    for _, type_names, path, _ in watcher.event_gen(yield_nones=False):
        # Only the two single-flag events below are of interest.
        if type_names == ['IN_OPEN']:
            new_status = statuses[1]
        elif type_names == ['IN_CLOSE_WRITE']:
            new_status = statuses[0]
        else:
            continue
        main_notifier([path, new_status])
def main():
    """Run the one-off start-up check, then hand over to the inotify watcher."""
    # Publish the current state once before any inotify events arrive.
    cold_run_status_check()
    # Then keep reporting changes as the device is opened and closed.
    inotify_check()
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # On Ctrl-C, publish the APP_EXIT status (statuses[2]) before quitting
        # so subscribers are told the application has stopped.
        mqtt_send(statuses[2])
        print("CtrlC Interrupt. Exiting...")