This repository has been archived by the owner on Mar 2, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__init__.py
executable file
·133 lines (109 loc) · 4.2 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""The broadlink component."""
import asyncio
from base64 import b64decode, b64encode
from binascii import unhexlify
from datetime import timedelta
import logging
import re
import socket
import voluptuous as vol
from homeassistant.const import CONF_HOST
from homeassistant.core import callback
import homeassistant.helpers.config_validation as cv
from homeassistant.util.dt import utcnow
from .const import CONF_PACKET, DOMAIN, SERVICE_LEARN, SERVICE_SEND
_LOGGER = logging.getLogger(__name__)
DEFAULT_RETRY = 3
def data_packet(value):
"""Decode a data packet given for broadlink."""
value = cv.string(value)
extra = len(value) % 4
if extra > 0:
value = value + ("=" * (4 - extra))
return b64decode(value)
def hostname(value):
"""Validate a hostname."""
host = str(value).lower()
if len(host) > 253:
raise ValueError
if host[-1] == ".":
host = host[:-1]
allowed = re.compile(r"(?!-)[a-z\d-]{1,63}(?<!-)$")
if not all(allowed.match(elem) for elem in host.split(".")):
raise ValueError
return host
def mac_address(value):
"""Validate and coerce a 48-bit MAC address."""
mac = str(value).lower()
if len(mac) == 17:
mac = mac[0:2] + mac[3:5] + mac[6:8] + mac[9:11] + mac[12:14] + mac[15:17]
elif len(mac) == 14:
mac = mac[0:2] + mac[2:4] + mac[5:7] + mac[7:9] + mac[10:12] + mac[12:14]
elif len(mac) != 12:
raise ValueError
return unhexlify(mac)
SERVICE_SEND_SCHEMA = vol.Schema(
{
vol.Required(CONF_HOST): cv.string,
vol.Required(CONF_PACKET): vol.All(cv.ensure_list, [data_packet]),
}
)
SERVICE_LEARN_SCHEMA = vol.Schema({vol.Required(CONF_HOST): cv.string})
@callback
def async_setup_service(hass, host, device):
"""Register a device for given host for use in services."""
hass.data.setdefault(DOMAIN, {})[host] = device
if hass.services.has_service(DOMAIN, SERVICE_LEARN):
return
async def _learn_command(call):
"""Learn a packet from remote."""
device = hass.data[DOMAIN][call.data[CONF_HOST]]
for retry in range(DEFAULT_RETRY):
try:
await hass.async_add_executor_job(device.enter_learning)
break
except (socket.timeout, ValueError):
try:
await hass.async_add_executor_job(device.auth)
except socket.timeout:
if retry == DEFAULT_RETRY - 1:
_LOGGER.error("Failed to enter learning mode")
return
_LOGGER.info("Press the key you want Home Assistant to learn")
start_time = utcnow()
while (utcnow() - start_time) < timedelta(seconds=20):
packet = await hass.async_add_executor_job(device.check_data)
if packet:
data = b64encode(packet).decode("utf8")
log_msg = f"Received packet is: {data}"
_LOGGER.info(log_msg)
hass.components.persistent_notification.async_create(
log_msg, title="Broadlink switch"
)
return
await asyncio.sleep(1)
_LOGGER.error("No signal was received")
hass.components.persistent_notification.async_create(
"No signal was received", title="Broadlink switch"
)
hass.services.async_register(
DOMAIN, SERVICE_LEARN, _learn_command, schema=SERVICE_LEARN_SCHEMA
)
async def _send_packet(call):
"""Send a packet."""
device = hass.data[DOMAIN][call.data[CONF_HOST]]
packets = call.data[CONF_PACKET]
for packet in packets:
for retry in range(DEFAULT_RETRY):
try:
await hass.async_add_executor_job(device.send_data, packet)
break
except (socket.timeout, ValueError):
try:
await hass.async_add_executor_job(device.auth)
except socket.timeout:
if retry == DEFAULT_RETRY - 1:
_LOGGER.error("Failed to send packet to device")
hass.services.async_register(
DOMAIN, SERVICE_SEND, _send_packet, schema=SERVICE_SEND_SCHEMA
)