-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathraspberry_thing.py
189 lines (167 loc) · 8.07 KB
/
raspberry_thing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# To 'mock' the calls to grovepi script, change this line to 'import grovepi_mock as grovepi'
import base64
import threading
import time
# from grove_buzzer import Buzzer
# from grove_light_sensor import LightSensor
# from grove_temp_sensor import TemperatureHumiditySensor
# User and password needed for providing new sensor values
THING_USER = "ditto"
THING_PASSWORD = "ditto"
# The id of our raspberry Thing
THING_ID = "org.eclipse.ditto.example/raspberry"
# Message / Event paths.
THING_EVENT_TOPIC = THING_ID + "/things/twin/events"
THING_COMMAND_TOPIC = THING_ID + "/things/twin/commands"
THING_MESSAGE_TOPIC = THING_ID + "/things/live/messages"
THING_MODIFY_COMMAND_TOPIC = THING_COMMAND_TOPIC + "/modify"
TRUST_AGENT_PROPERTIES_PATH = "/features/TrustAgent/properties"
# ILLUMINANCE_SENSOR_PROPERTIES_PATH = "/features/IlluminanceSensor_0/properties"
# TEMPERATURE_SENSOR_PROPERTIES_PATH = "/features/TemperatureHumiditySensor_0/properties"
# ILLUMINANCE_SENSOR_SAMPLINGRATE_PATH = ILLUMINANCE_SENSOR_PROPERTIES_PATH + "/samplingRate"
# TEMPERATURE_SENSOR_SAMPLINGRATE_PATH = TEMPERATURE_SENSOR_PROPERTIES_PATH + "/samplingRate"
# BUZZER_ENABLE_MESSAGE_TOPIC = THING_MESSAGE_TOPIC + "/doEnable"
# BUZZER_ENABLE_MESSAGE_PATH = "/features/Buzzer_0/inbox/messages/doEnable"
FREQUENCY_CHANGE_EVENT_TOPIC = THING_ID + "/things/twin/events/modified"
# Digital Port D8 on the GrovePi+ is connected to the buzzer
# BUZZER_PORT = 8
# Analog Port A0 on the GrovePi+ is connected to the light sensor
# LIGHT_SENSOR_PORT = 0
# Digital Port D4 on the GrovePi+ is connected to the temp sensor
# TEMP_SENSOR_PORT = 4
def get_b64_auth():
"""
Get the base 64 auth string for the thin user.
"""
userpw_string = THING_USER + ":" + THING_PASSWORD
utf_str = userpw_string.encode("utf-8")
return base64.b64encode(utf_str).decode('utf-8')
class RaspberryThing:
def __init__(self):
pass
# self.buzzer = Buzzer(BUZZER_PORT)
# self.lightSensor = LightSensor(LIGHT_SENSOR_PORT)
# self.tempSensor = TemperatureHumiditySensor(TEMP_SENSOR_PORT)
def handle_websocket_message(self, message):
if message and 'topic' in message:
if message['topic'].startswith(THING_EVENT_TOPIC):
# handle event message
self.__handle_event(message)
elif message['topic'].startswith(THING_MESSAGE_TOPIC):
self.__handle_message(message)
# def create_illumination_change_message(self, illumination):
# """
# Create the modify message that is used to notify Ditto about a new sensor value.
# :return: The message as a json object.
# """
# return {
# "topic": THING_MODIFY_COMMAND_TOPIC,
# "path": ILLUMINANCE_SENSOR_PROPERTIES_PATH,
# "value": self.lightSensor.get_properties_json(illumination)
# }
# def create_temperature_change_message(self, temperature, humidity):
# """
# Create the modify message that is used to notify Ditto about a new sensor value.
# :return: The message as a json object.
# """
# return {
# "topic": THING_MODIFY_COMMAND_TOPIC,
# "path": TEMPERATURE_SENSOR_PROPERTIES_PATH,
# "value": self.tempSensor.get_properties_json(temperature, humidity)
# }
# def start_polling_illumination(self, callback):
# """
# this function will repeatedly queries illumination values and send them back to the callback.
# :param callback: callback function for new sensor values
# :return: None
# """
# threading._start_new_thread(self.__poll_illumination_values, (callback,))
# def start_polling_temperatures(self, callback):
# """
# this function will repeatedly queries temperature values and send them back to the callback.
# :param callback: callback function for new sensor values
# :return: None
# """
# threading._start_new_thread(self.__poll_temperature_values, (callback,))
# def __poll_illumination_values(self, callback):
# while True:
# try:
# # read sensor
# illumination = self.lightSensor.get_illumination()
# # call callback
# callback(illumination)
# except Exception:
# print('Error when providing illumination values. Trying again')
# finally:
# # wait for next read
# time.sleep(1 / self.lightSensor.get_sampling_rate())
# def __poll_temperature_values(self, callback):
# while True:
# try:
# # read sensor
# [temperature, illumination] = self.tempSensor.get_temperature_and_humidity()
# # call callback
# callback(temperature, illumination)
# except Exception:
# print('Error when providing illumination values. Trying again')
# finally:
# # wait for next read
# time.sleep(1 / self.tempSensor.get_sampling_rate())
def __handle_event(self, event):
if self.__is_illuminance_sampling_rate_change(event):
"""
update the frequency in which the sensor values of the thing should be queried
:param message: the modify event that should contain the new frequency
"""
samplingRate = event['value']
print('setting samplingRate of light sensor to {} Hz'.format(samplingRate))
self.lightSensor.set_sampling_rate(samplingRate)
elif self.__is_temperature_sampling_rate_change(event):
"""
update the frequency in which the sensor values of the thing should be queried
:param message: the modify event that should contain the new frequency
"""
samplingRate = event['value']
print('setting samplingRate of temperature sensor to {} Hz'.format(samplingRate))
self.tempSensor.set_sampling_rate(samplingRate)
def __handle_message(self, message):
if self.__is_buzzer_property_change(message):
"""
Activate/Deactivate the buzzer.
:param message: The message that should contain if the buzzer should be active oder inactive
"""
buzz = message['value']
print('{} buzzer'.format('enabling' if buzz else 'disabling'))
self.buzzer.set_enabled(buzz)
def __is_trust_agent_property_change(self, message):
"""
Check if the message is for changing the trust agent properties.
:return: True if it is such kind of a message
"""
return TRUST_AGENT_PROPERTIES_PATH == message['topic'] \
# and BUZZER_ENABLE_MESSAGE_PATH == message['path']
# def __is_buzzer_property_change(self, message):
# """
# Check if the message is for activating or deactivating the buzzer.
# :return: true if should be activated.
# """
# return BUZZER_ENABLE_MESSAGE_TOPIC == message['topic'] \
# and BUZZER_ENABLE_MESSAGE_PATH == message['path']
# def __is_illuminance_sampling_rate_change(self, message):
# """
# Check if the message is for changing the frequency in which sensor values are requested.
# :return: True if it is such kind of a message
# """
# return 'path' in message \
# and 'value' in message \
# and FREQUENCY_CHANGE_EVENT_TOPIC == message['topic'] \
# and ILLUMINANCE_SENSOR_SAMPLINGRATE_PATH == message['path']
# def __is_temperature_sampling_rate_change(self, message):
# """
# Check if the message is for changing the frequency in which sensor values are requested.
# :return: True if it is such kind of a message
# """
# return 'path' in message \
# and 'value' in message \
# and FREQUENCY_CHANGE_EVENT_TOPIC == message['topic'] \
# and TEMPERATURE_SENSOR_SAMPLINGRATE_PATH == message['path']