-
Notifications
You must be signed in to change notification settings - Fork 12
/
juicebox_udpcupdater.py
201 lines (185 loc) · 7.71 KB
/
juicebox_udpcupdater.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import asyncio
import logging
import time
from const import (
ERROR_LOOKBACK_MIN,
MAX_ERROR_COUNT,
MAX_RETRY_ATTEMPT,
UDPC_UPDATE_CHECK_TIMEOUT,
)
from juicebox_telnet import JuiceboxTelnet
_LOGGER = logging.getLogger(__name__)
class JuiceboxUDPCUpdater:
def __init__(
self,
juicebox_host,
jpp_host,
udpc_port=8047,
telnet_timeout=None,
loglevel=None,
):
if loglevel is not None:
_LOGGER.setLevel(loglevel)
self._juicebox_host = juicebox_host
self._jpp_host = jpp_host
self._udpc_port = udpc_port
self._telnet_timeout = telnet_timeout
self._default_sleep_interval = 30
self._udpc_update_loop_task = None
self._telnet = None
self._error_count = 0
self._error_timestamp_list = []
async def start(self):
_LOGGER.info("Starting JuiceboxUDPCUpdater")
await self._connect()
async def close(self):
if self._telnet is not None:
await self._telnet.close()
self._telnet = None
async def _connect(self):
connect_attempt = 1
while (
self._telnet is None
and connect_attempt <= MAX_RETRY_ATTEMPT
and self._error_count < MAX_ERROR_COUNT
):
_LOGGER.debug(
f"Telnet connection attempt {connect_attempt} of {MAX_RETRY_ATTEMPT}"
)
connect_attempt += 1
self._telnet = JuiceboxTelnet(
self._juicebox_host,
loglevel=_LOGGER.getEffectiveLevel(),
timeout=self._telnet_timeout,
)
try:
await self._telnet.open()
except TimeoutError as e:
_LOGGER.warning(
"JuiceboxUDPCUpdater Telnet Timeout. Reconnecting. "
f"({e.__class__.__qualname__}: {e})"
)
await self._add_error()
await self._telnet.close()
self._telnet = None
pass
except ConnectionResetError as e:
_LOGGER.warning(
"JuiceboxUDPCUpdater Telnet Connection Error. Reconnecting. "
f"({e.__class__.__qualname__}: {e})"
)
await self._add_error()
await self._telnet.close()
self._telnet = None
pass
if self._telnet is None:
raise ChildProcessError("JuiceboxUDPCUpdater: Unable to connect to Telnet.")
if self._udpc_update_loop_task is None or self._udpc_update_loop_task.done():
self._udpc_update_loop_task = await self._udpc_update_loop()
self._loop.create_task(self._udpc_update_loop_task)
_LOGGER.info("JuiceboxUDPCUpdater Connected to Juicebox Telnet")
async def _udpc_update_loop(self):
_LOGGER.debug("Starting JuiceboxUDPCUpdater Loop")
while self._error_count < MAX_ERROR_COUNT:
sleep_interval = self._default_sleep_interval
if self._telnet is None:
_LOGGER.warning(
"JuiceboxUDPCUpdater Telnet Connection Lost. Reconnecting."
)
await self._connect()
continue
try:
async with asyncio.timeout(UDPC_UPDATE_CHECK_TIMEOUT):
sleep_interval = await self._udpc_update_handler(sleep_interval)
except TimeoutError as e:
_LOGGER.warning(
f"UDPC Update Check timeout after {UDPC_UPDATE_CHECK_TIMEOUT} sec. "
f"({e.__class__.__qualname__}: {e})"
)
await self._add_error()
await self._telnet.close()
self._telnet = None
sleep_interval = 3
await asyncio.sleep(sleep_interval)
raise ChildProcessError(
f"JuiceboxUDPCUpdater: More than {self._error_count} "
f"errors in the last {ERROR_LOOKBACK_MIN} min."
)
async def _udpc_update_handler(self, default_sleep_interval):
sleep_interval = default_sleep_interval
try:
_LOGGER.info("JuiceboxUDPCUpdater Check Starting")
connections = await self._telnet.get_udpc_list()
update_required = True
udpc_streams_to_close = {} # Key = Connection id, Value = list id
udpc_stream_to_update = 0
# _LOGGER.debug(f"connections: {connections}")
for i, connection in enumerate(connections):
if connection["type"] == "UDPC":
udpc_streams_to_close.update({int(connection["id"]): i})
if self._jpp_host not in connection["dest"]:
udpc_stream_to_update = int(connection["id"])
# _LOGGER.debug(f"udpc_streams_to_close: {udpc_streams_to_close}")
if udpc_stream_to_update == 0 and len(udpc_streams_to_close) > 0:
udpc_stream_to_update = int(max(udpc_streams_to_close, key=int))
_LOGGER.debug(f"Active UDPC Stream: {udpc_stream_to_update}")
for stream in list(udpc_streams_to_close):
if stream < udpc_stream_to_update:
udpc_streams_to_close.pop(stream, None)
if len(udpc_streams_to_close) == 0:
_LOGGER.info("UDPC IP not found, updating")
elif (
self._jpp_host
not in connections[udpc_streams_to_close[udpc_stream_to_update]]["dest"]
):
_LOGGER.info("UDPC IP incorrect, updating")
elif len(udpc_streams_to_close) == 1:
_LOGGER.info("UDPC IP correct")
update_required = False
if update_required:
for id in udpc_streams_to_close:
_LOGGER.debug(f"Closing UDPC stream: {id}")
await self._telnet.close_udpc_stream(id)
await self._telnet.write_udpc_stream(self._jpp_host, self._udpc_port)
await self._telnet.save_udpc()
_LOGGER.info("UDPC IP Saved")
except ConnectionResetError as e:
_LOGGER.warning(
"Telnet connection to JuiceBox lost. "
"Nothing to worry about unless this happens a lot. "
f"({e.__class__.__qualname__}: {e})"
)
await self._add_error()
await self._telnet.close()
self._telnet = None
sleep_interval = 3
except TimeoutError as e:
_LOGGER.warning(
"Telnet connection to JuiceBox has timed out. "
"Nothing to worry about unless this happens a lot. "
f"({e.__class__.__qualname__}: {e})"
)
await self._add_error()
await self._telnet.close()
self._telnet = None
sleep_interval = 3
except OSError as e:
_LOGGER.warning(
"Could not route Telnet connection to JuiceBox. "
"Nothing to worry about unless this happens a lot. "
f"({e.__class__.__qualname__}: {e})"
)
await self._add_error()
await self._telnet.close()
self._telnet = None
sleep_interval = 3
return sleep_interval
async def _add_error(self):
self._error_timestamp_list.append(time.time())
time_cutoff = time.time() - (ERROR_LOOKBACK_MIN * 60)
temp_list = list(
filter(lambda el: el > time_cutoff, self._error_timestamp_list)
)
self._error_timestamp_list = temp_list
self._error_count = len(self._error_timestamp_list)
_LOGGER.debug(f"Errors in last {ERROR_LOOKBACK_MIN} min: {self._error_count}")