-
Notifications
You must be signed in to change notification settings - Fork 9
/
solarflow-topic-mapper.py
149 lines (126 loc) · 5.32 KB
/
solarflow-topic-mapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import random, json, time, logging, sys, getopt, os
import string
from paho.mqtt import client as mqtt_client
from functools import reduce
FORMAT = '%(asctime)s:%(levelname)s: %(message)s'
logging.basicConfig(stream=sys.stdout, level="INFO", format=FORMAT)
log = logging.getLogger("")
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("requests").setLevel(logging.WARNING)
sf_device_id = os.environ.get('SF_DEVICE_ID',None)
sf_product_id = os.environ.get('SF_PRODUCT_ID',"73bkTV")
mqtt_user = os.environ.get('MQTT_USER',None)
mqtt_pwd = os.environ.get('MQTT_PWD',None)
mqtt_host = os.environ.get('MQTT_HOST',None)
mqtt_port = os.environ.get('MQTT_PORT',1883)
report_topic = None
smartmeter_topic = os.environ.get('SMARTMETER_TOPIC',"/tele/E220/SENSOR")
devices = set()
def deep_get(dictionary, keys, default=None):
return reduce(lambda d, key: d.get(key, default) if isinstance(d, dict) else default, keys.split("."), dictionary)
def on_message(client, userdata, msg):
global devices
global sf_product_id
if sf_product_id in msg.topic:
device_id = msg.topic.split('/')[2]
devices.add(device_id)
payload = json.loads(msg.payload.decode())
if "properties" in payload:
props = payload["properties"]
for prop, val in props.items():
client.publish(f'solarflow-hub/{device_id}/telemetry/{prop}',val)
if "packData" in payload:
packdata = payload["packData"]
if len(packdata) > 0:
for pack in packdata:
sn = pack.pop('sn')
for prop, val in pack.items():
client.publish(f'solarflow-hub/{device_id}/telemetry/batteries/{sn}/{prop}',val)
if msg.topic == smartmeter_topic:
payload = json.loads(msg.payload.decode())
value = None
if type(payload) is float or type(payload) is int:
value = payload
else:
try:
value = deep_get(payload,"MT175.P",None)
value = deep_get(payload,"Power.Power_curr",value)
except:
log.error(f'Could not get value from specified topic payload: {sys.exc_info()}')
if value:
client.publish(f'solarflow-hub/control/smartmeter',int(value))
def on_connect(client, userdata, flags, rc):
if rc == 0:
log.info("Connected to MQTT Broker!")
subscribe(client)
else:
log.error("Failed to connect, return code %d\n", rc)
def connect_mqtt() -> mqtt_client:
id = ''.join(random.choices(string.ascii_lowercase, k=5))
client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION1,f'zen-topic-remap-{id}')
if mqtt_user is not None and mqtt_pwd is not None:
client.username_pw_set(mqtt_user, mqtt_pwd)
client.on_connect = on_connect
client.connect(mqtt_host, mqtt_port)
return client
def subscribe(client: mqtt_client):
client.subscribe(report_topic)
client.subscribe(smartmeter_topic)
client.on_message = on_message
#client.publish(f'iot/{sf_product_id}/{sf_device_id}/properties/read','{"properties": ["getAll"]}')
def run():
global devices
global sf_product_id
client = connect_mqtt()
client.loop_start()
while True:
for device in devices:
client.publish(f'iot/{sf_product_id}/{device}/properties/read','{"properties": ["getAll"]}')
time.sleep(60)
def main(argv):
global mqtt_host, mqtt_port, mqtt_user, mqtt_pwd
global smartmeter_topic
global sf_device_id
global sf_product_id
global report_topic
opts, args = getopt.getopt(argv,"hb:p:u:s:d:s:",["broker=","port=","user=","password=","device=","smartmeter="])
for opt, arg in opts:
if opt == '-h':
log.info('solarflow-control.py -b <MQTT Broker Host> -p <MQTT Broker Port>')
sys.exit()
elif opt in ("-b", "--broker"):
mqtt_host = arg
elif opt in ("-p", "--port"):
mqtt_port = arg
elif opt in ("-u", "--user"):
mqtt_user = arg
elif opt in ("-s", "--password"):
mqtt_pwd = arg
elif opt in ("-d", "--device"):
sf_device_id = arg
elif opt in ("-s", "--smartmeter"):
smartmeter_topic = arg
if mqtt_host is None:
log.error("You need to provide a local MQTT broker (environment variable MQTT_HOST or option --broker)!")
sys.exit(0)
else:
log.info(f'MQTT Host: {mqtt_host}:{mqtt_port}')
if mqtt_user is None or mqtt_pwd is None:
log.info(f'MQTT User is not set, assuming authentication not needed')
else:
log.info(f'MQTT User: {mqtt_user}/{mqtt_pwd}')
if smartmeter_topic is None:
log.info(f'Smartmeter is not set or configured. For steering the SolarFlow it makes sense to set this value --smartmeter=')
else:
smartmeter_topic = f'/{smartmeter_topic}'
log.info(f'Smartmeter topic: {smartmeter_topic}')
if sf_device_id is None:
log.error(f'You need to provide a SF_DEVICE_ID (environment variable SF_DEVICE_ID or option --device)!')
sys.exit()
else:
log.info(f'Solarflow Hub: {sf_product_id}/{sf_device_id}')
report_topic = f'/{sf_product_id}/+/properties/report'
log.info(f'Reporting topic: {report_topic}')
run()
if __name__ == '__main__':
main(sys.argv[1:])