-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathilliplib.py
300 lines (261 loc) · 12.1 KB
/
illiplib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
"""
Interface module for Lutron Homeworks Illumination.
Author:
Daniel Dulitz (https://github.com/dulitz)
with interface and overall organization inspired by
upsert (https://github.com/upsert)
see http://www.lutron.com/TechnicalDocumentLibrary/HWI%20RS232%20Protocol.pdf
Note that Illumination only allows a small number of simultaneous telnet connections --
on the order of 1 or 2. Beyond that, it will close connections silently with no
error message. This shows up in our logs as many "Empty read from the bridge" messages
and fewer "connection opened" messages.
"""
import asyncio, logging, re
from enum import IntEnum
# String constants naming dictionary keys. None of them is referenced by the
# code visible in this file -- presumably they are exported for callers that
# describe devices/scenes with dicts (as liplib does); TODO confirm.
CONF_ID = "id"
CONF_NAME = "name"
CONF_TYPE = "type"
CONF_SCENE_ID = "scene_id"
CONF_AREA_NAME = "area_name"
CONF_BUTTONS = "buttons"
# Module-level logger used by IlluminationClient for all diagnostics.
_LOGGER = logging.getLogger(__name__)
class IlluminationClient:
    """Communicate with a Lutron Homeworks Illumination controller.

    The client speaks the HWI RS232 protocol over a telnet-style TCP stream.
    All methods that touch the stream are coroutines; reads are serialized by
    ``_read_lock`` and writes by ``_write_lock``.

    The controller accepts only a very small number of simultaneous
    connections, so this class is careful to close a socket whenever it
    abandons one (failed login, reconnect, logout).
    """

    READ_SIZE = 1024
    DEFAULT_USER = b"lutron"
    DEFAULT_PASSWORD = b"integration"
    LOGIN_PROMPT = b"LOGIN: "

    # The responses that we might see from Illumination include these,
    # which we handle explicitly in open():
    #   login successful
    #   Keypad button monitoring enabled
    # And these, generated by rst and rst2, which match OOB_RESPONSE_RE:
    #   Processor Time: 15:22
    #   Processor Time: 15:22:33
    OOB_RESPONSE_RE = re.compile(b'Processor ([^\r])*\r\n')

    # And these, which we handle in RESPONSE_RE:
    #   KBP, [01:06:12], 5  [also KBR, KBH, KBDT, DBP, DBR, DBH, DBDT, SVBP, SVBR, SVBH, SVBDT]
    #   DL, [01:04:02:06], 0.00
    #   KLS, [01:06:12], 000011100000000000000000
    #   GSS, [01:04:03], 1
    # Groups: 1 = command, 2 = address, 3 = first value,
    #         4 = optional ", value" wrapper, 5 = the optional value itself.
    RESPONSE_RE = re.compile(b'([A-Z]+), *\\[([0-9.:]+)\\], *([0-9.]+)(, *([0-9.]+))? *\r\n')

    # this we simply don't handle (yet) -- RESPONSE_RE will reject this
    #   SVS, [01:06:03], S, MOVING
    # There are other responses which we do not handle! If you send an unusual
    # command and elicit a response that causes us to flag an error, please
    # send a pull request.

    # We send RDL to request dimmer levels. We do not send the following, but nothing
    # is stopping you from sending them:
    #   FADEDIM to set dimmer levels
    #   FRPM, FV, GSS, or SVSS to set scenes using different scene controller add-ons
    #   CCOPULSE, CCOCLOSE, CCOOPEN to control contact closure outputs

    class Action(IntEnum):
        """Action numbers for the OUTPUT command in the Lutron Integration Protocol."""
        SET = 1       # Get or Set Zone Level
        RAISING = 2   # Start Raising
        LOWERING = 3  # Start Lowering
        STOP = 4      # Stop Raising/Lowering
        PRESET = 6    # SHADEGRP for Homeworks QS

    class Button(IntEnum):
        """Action numbers for the DEVICE command in the Lutron Integration Protocol."""
        PRESS = 3
        RELEASE = 4
        HOLD = 5       # not returned by Caseta or Radio Ra 2 Select
        DOUBLETAP = 6  # not returned by Caseta or Radio Ra 2 Select
        LEDSTATE = 9   # "Button" is a misnomer; this queries LED state

    class State(IntEnum):
        """Connection state values."""
        Closed = 1
        Opening = 2
        Opened = 3

    def __init__(self):
        """Initialize the client in the Closed state; no I/O is performed."""
        self._read_buffer = b""
        self._read_lock = asyncio.Lock()
        self._write_lock = asyncio.Lock()
        self._state = IlluminationClient.State.Closed
        self._host = None
        self._port = 23
        self._username = IlluminationClient.DEFAULT_USER
        self._password = IlluminationClient.DEFAULT_PASSWORD
        self.reader, self.writer = None, None

    def is_connected(self) -> bool:
        """Return True when the connection is fully opened and logged in."""
        return self._state == IlluminationClient.State.Opened

    def _close_writer(self):
        """Close and forget the current stream pair, if there is one."""
        writer = self.writer
        self.reader, self.writer = None, None
        if writer is None:
            return
        try:
            writer.close()
        except OSError as err:
            _LOGGER.debug(f'error closing connection to controller: {err}')

    async def open(self, host, port=23, username=DEFAULT_USER,
                   password=DEFAULT_PASSWORD):
        """Open a telnet connection to the controller, log in, enable monitoring.

        Does nothing unless the client is currently Closed. On any failure a
        warning is logged, the socket (if one was opened) is closed, and the
        state returns to Closed; nothing is raised for OSError. The connection
        parameters are remembered so read_raw() can reconnect.

        Args:
            host: controller host name or address.
            port: TCP port, 23 by default.
            username: login name as bytes.
            password: login password as bytes.
        """
        async with self._read_lock:
            async with self._write_lock:
                if self._state != IlluminationClient.State.Closed:
                    return
                self._state = IlluminationClient.State.Opening
                self._host = host
                self._port = port
                self._username = username
                self._password = password

                # We only get here in the Closed state, so any stream still
                # held is stale. Release it (the controller allows very few
                # sessions) and drop bytes that belonged to the old session.
                self._close_writer()
                self._read_buffer = b""

                def cleanup(err):
                    _LOGGER.warning(f'error opening connection to Illumination {host}:{port}: {err}')
                    self._close_writer()
                    self._state = IlluminationClient.State.Closed

                try:
                    # open connection
                    self.reader, self.writer = await asyncio.open_connection(host, port)

                    # do login
                    if await self._read_until(self.LOGIN_PROMPT) is False:
                        return cleanup('no login prompt')
                    self.writer.write(username + b',' + password + b'\r\n')
                    await self.writer.drain()
                    if await self._read_until(b'login successful\r\n') is False:
                        return cleanup('login failed')

                    for mon in [b'kbmon\r\n', b'dlmon\r\n', b'klmon\r\n', b'gsmon\r\n']:
                        self.writer.write(mon)  # turn on monitoring
                        await self.writer.drain()
                        if await self._read_until(b'monitoring enabled\r\n') is False:
                            return cleanup('set monitoring failed')
                except OSError as err:
                    # covers connect errors and a connection reset mid-handshake
                    return cleanup(err)
                except asyncio.CancelledError:
                    # never leave the state stuck at Opening
                    cleanup('cancelled')
                    raise

                _LOGGER.info(f'opened Homeworks Illumination connection {host}:{port}')
                self._state = IlluminationClient.State.Opened

    async def _read_until(self, value):
        """Read until a given value is reached. Value may be regex or bytes.

        Returns the regex match object (regex case) or the bytes up to and
        including ``value`` (bytes case). Everything up to the end of the hit
        is removed from the internal buffer. Returns False when the peer
        closes the connection or a read fails with OSError.
        """
        while True:
            if hasattr(value, "search"):
                # detected regular expression
                match = value.search(self._read_buffer)
                if match:
                    self._read_buffer = self._read_buffer[match.end():]
                    return match
            else:
                assert isinstance(value, bytes), value
                where = self._read_buffer.find(value)
                if where != -1:
                    end = where + len(value)
                    until = self._read_buffer[:end]
                    self._read_buffer = self._read_buffer[end:]
                    return until
            try:
                read_data = await self.reader.read(IlluminationClient.READ_SIZE)
                if not read_data:
                    # an empty read means EOF
                    _LOGGER.info('controller disconnected')
                    return False
                self._read_buffer += read_data
            except OSError as err:
                _LOGGER.warning(f'error reading from controller: {err}')
                return False

    # Maps raw Illumination button events onto (mode, action) pairs.
    RAWMAP = {
        'KBP': ('DEVICE', Button.PRESS),
        'KBR': ('DEVICE', Button.RELEASE),
        'KBH': ('DEVICE', Button.HOLD),
        'KBDT': ('DEVICE', Button.DOUBLETAP),
        'DBP': ('DEVICE', Button.PRESS),
        'DBR': ('DEVICE', Button.RELEASE),
        'DBH': ('DEVICE', Button.HOLD),
        'DBDT': ('DEVICE', Button.DOUBLETAP),
        'SVBP': ('DEVICE', Button.PRESS),
        'SVBR': ('DEVICE', Button.RELEASE),
        'SVBH': ('DEVICE', Button.HOLD),
        'SVBDT': ('DEVICE', Button.DOUBLETAP),
    }

    async def read(self):
        """Map the result of read_raw() to correspond to read() on liplib.

        Returns a 4-tuple:
          * ``(None, None, None, None)`` when nothing could be read;
          * ``('OUTPUT', address, Action.SET, level)`` for DL and GSS;
          * ``('DEVICE', address, button, Button.<event>)`` for the button
            events listed in RAWMAP;
          * the raw tuple unchanged for anything else (e.g. KLS).
        """
        a, b, c, d = await self.read_raw()
        if a is None:
            return a, b, c, d
        if a == 'DL' or a == 'GSS':
            return 'OUTPUT', b, IlluminationClient.Action.SET, c
        (newa, newd) = self.RAWMAP.get(a, (None, None))
        if newa is not None:
            if d is not None:
                _LOGGER.warning(f'unexpected final field in {a} {b} {c} {d} with {newa} {newd}')
            return newa, b, c, newd
        # we pass through this without change:
        #   KLS, [01:06:12], 000011100000000000000000
        # and read_raw() will have failed on this:
        #   SVS, [01:06:03], 1, MOVING
        return a, b, c, d

    async def read_raw(self):
        """Read one response and return it as a 4-tuple.

        Returns ``(command, address, value, extra)`` where ``command`` is a
        str, ``address`` is the bracketed address with the colons removed and
        parsed as int, ``value`` is a float, and ``extra`` is the optional
        trailing field as a str (or None). Returns four Nones when the client
        is not opened, when the line cannot be parsed, or when the connection
        dropped -- in the last case a reconnect is attempted first.
        """
        async with self._read_lock:
            if self._state != IlluminationClient.State.Opened:
                return None, None, None, None
            match = await self._read_until(IlluminationClient.RESPONSE_RE)
            if match is not False:
                # 1 = mode, 2 = integration number [address],
                # 3 = button number, 5 = value (group 4 also holds the
                # separating comma, so it must not be returned)
                extra = match.group(5)
                fourth = extra.decode('ascii') if extra else None
                try:
                    address = int(match.group(2).decode('ascii').replace(':', ''))
                    # NOTE(review): for KLS the LED string is squeezed through
                    # float(), which drops leading zeros -- confirm callers cope.
                    return (match.group(1).decode('ascii'),
                            address, float(match.group(3)),
                            fourth)
                except ValueError:
                    _LOGGER.warning(f'could not parse {match.group(0)}')
        # The read lock must be released here: open() acquires it itself.
        if match is False:
            # attempt to reconnect
            _LOGGER.info(f'reconnecting to controller {self._host}')
            self._state = IlluminationClient.State.Closed
            await self.open(self._host, self._port, self._username,
                            self._password)
        return None, None, None, None

    async def write(self, mode, integration, action, *args, value=None):
        """Write a command to the controller.

        For FADEDIM the line is ``FADEDIM[,value][,args...],address`` and
        ``action`` must equal 1. For every other mode it is
        ``MODE,address,action[,value][,args...]``. Arguments that are None
        are skipped. Silently does nothing when the client is not opened;
        OSError while sending is logged, not raised.
        """
        if hasattr(action, "value"):
            action = action.value
        async with self._write_lock:
            if self._state != IlluminationClient.State.Opened:
                return
            mode = mode.upper()
            address = self.to_illumination_address(integration)
            if mode == 'FADEDIM':
                data = f'{mode}'
                assert int(action) == 1, action
            else:  # e.g. DBP
                data = f'{mode},{address},{action}'
            if value is not None:
                data += f',{value}'
            for arg in args:
                if arg is not None:
                    data += f',{arg}'
            if mode == 'FADEDIM':
                # FADEDIM takes its address last
                data += f',{address}'
            try:
                self.writer.write((data + "\r\n").encode("ascii"))
                await self.writer.drain()
            except OSError as err:
                _LOGGER.warning(f'Error writing to the controller: {err}')

    async def query(self, mode, integration, action, *ignored):
        """Query a device to get its current state.

        Sends ``rkls`` for Button.LEDSTATE and ``rdl`` for Action.SET; any
        other action is logged and ignored. ``mode`` and ``ignored`` are only
        used for logging. Does nothing when the client is not opened.
        """
        if hasattr(action, "value"):
            action = action.value
        _LOGGER.debug(f"Sending query {mode}, integration {integration}, action {action}, ignoring {ignored}")
        async with self._write_lock:
            if self._state != IlluminationClient.State.Opened:
                return
            if action == IlluminationClient.Button.LEDSTATE:
                self.writer.write(f'rkls,{self.to_illumination_address(integration)}\r\n'.encode())
            elif action == IlluminationClient.Action.SET:
                self.writer.write(f'rdl,{self.to_illumination_address(integration)}\r\n'.encode())
            else:
                _LOGGER.warning(f'query(): unknown action number {action} for {integration}')
                return
            await self.writer.drain()

    def to_illumination_address(self, integration):
        """Format an integer address as colon-separated digit pairs.

        The decimal digits are left-padded with one zero to an even count and
        split in twos, e.g. ``10612`` -> ``'01:06:12'``.
        """
        s = str(integration)
        if len(s) % 2 == 1:
            s = f'0{s}'
        return ':'.join(s[i:i + 2] for i in range(0, len(s), 2))

    async def ping(self):
        """Ping the interface (``rst``) to keep the connection alive."""
        async with self._write_lock:
            if self._state != IlluminationClient.State.Opened:
                return
            self.writer.write(b"rst\r\n")
            await self.writer.drain()

    async def logout(self):
        """Send ``quit`` and close the connection to the controller.

        Does nothing when the client is not opened. The socket is closed and
        the state set to Closed even if sending ``quit`` fails; that failure
        still propagates to the caller.
        """
        async with self._write_lock:
            if self._state != IlluminationClient.State.Opened:
                return
            try:
                self.writer.write(b"quit\r\n")
                await self.writer.drain()
            finally:
                # release the controller's scarce session slot
                self._close_writer()
                self._state = IlluminationClient.State.Closed