-
-
Notifications
You must be signed in to change notification settings - Fork 31.7k
/
__init__.py
293 lines (252 loc) · 9.41 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
"""The go2rtc component."""
import logging
import shutil
from aiohttp.client_exceptions import ClientConnectionError, ServerConnectionError
from awesomeversion import AwesomeVersion
from go2rtc_client import Go2RtcRestClient
from go2rtc_client.exceptions import Go2RtcClientError, Go2RtcVersionError
from go2rtc_client.ws import (
Go2RtcWsClient,
ReceiveMessages,
WebRTCAnswer,
WebRTCCandidate,
WebRTCOffer,
WsError,
)
import voluptuous as vol
from webrtc_models import RTCIceCandidateInit
from homeassistant.components.camera import (
Camera,
CameraWebRTCProvider,
WebRTCAnswer as HAWebRTCAnswer,
WebRTCCandidate as HAWebRTCCandidate,
WebRTCError,
WebRTCMessage,
WebRTCSendMessage,
async_register_webrtc_provider,
)
from homeassistant.components.default_config import DOMAIN as DEFAULT_CONFIG_DOMAIN
from homeassistant.config_entries import SOURCE_SYSTEM, ConfigEntry
from homeassistant.const import CONF_URL, EVENT_HOMEASSISTANT_STOP
from homeassistant.core import Event, HomeAssistant, callback
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import (
config_validation as cv,
discovery_flow,
issue_registry as ir,
)
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.typing import ConfigType
from homeassistant.util.hass_dict import HassKey
from homeassistant.util.package import is_docker_env
from .const import (
CONF_DEBUG_UI,
DEBUG_UI_URL_MESSAGE,
DOMAIN,
HA_MANAGED_URL,
RECOMMENDED_VERSION,
)
from .server import Server
_LOGGER = logging.getLogger(__name__)
_SUPPORTED_STREAMS = frozenset(
(
"bubble",
"dvrip",
"expr",
"ffmpeg",
"gopro",
"homekit",
"http",
"https",
"httpx",
"isapi",
"ivideon",
"kasa",
"nest",
"onvif",
"roborock",
"rtmp",
"rtmps",
"rtmpx",
"rtsp",
"rtsps",
"rtspx",
"tapo",
"tcp",
"webrtc",
"webtorrent",
)
)
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.Schema(
{
vol.Exclusive(CONF_URL, DOMAIN, DEBUG_UI_URL_MESSAGE): cv.url,
vol.Exclusive(CONF_DEBUG_UI, DOMAIN, DEBUG_UI_URL_MESSAGE): cv.boolean,
}
)
},
extra=vol.ALLOW_EXTRA,
)
_DATA_GO2RTC: HassKey[str] = HassKey(DOMAIN)
_RETRYABLE_ERRORS = (ClientConnectionError, ServerConnectionError)
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up WebRTC."""
url: str | None = None
if DOMAIN not in config and DEFAULT_CONFIG_DOMAIN not in config:
await _remove_go2rtc_entries(hass)
return True
if not (configured_by_user := DOMAIN in config) or not (
url := config[DOMAIN].get(CONF_URL)
):
if not is_docker_env():
if not configured_by_user:
# Remove config entry if it exists
await _remove_go2rtc_entries(hass)
return True
_LOGGER.warning("Go2rtc URL required in non-docker installs")
return False
if not (binary := await _get_binary(hass)):
_LOGGER.error("Could not find go2rtc docker binary")
return False
# HA will manage the binary
server = Server(
hass, binary, enable_ui=config.get(DOMAIN, {}).get(CONF_DEBUG_UI, False)
)
try:
await server.start()
except Exception: # noqa: BLE001
_LOGGER.warning("Could not start go2rtc server", exc_info=True)
return False
async def on_stop(event: Event) -> None:
await server.stop()
hass.bus.async_listen(EVENT_HOMEASSISTANT_STOP, on_stop)
url = HA_MANAGED_URL
hass.data[_DATA_GO2RTC] = url
discovery_flow.async_create_flow(
hass, DOMAIN, context={"source": SOURCE_SYSTEM}, data={}
)
return True
async def _remove_go2rtc_entries(hass: HomeAssistant) -> None:
"""Remove go2rtc config entries, if any."""
for entry in hass.config_entries.async_entries(DOMAIN):
await hass.config_entries.async_remove(entry.entry_id)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up go2rtc from a config entry."""
url = hass.data[_DATA_GO2RTC]
# Validate the server URL
try:
client = Go2RtcRestClient(async_get_clientsession(hass), url)
version = await client.validate_server_version()
if version < AwesomeVersion(RECOMMENDED_VERSION):
ir.async_create_issue(
hass,
DOMAIN,
"recommended_version",
is_fixable=False,
is_persistent=False,
severity=ir.IssueSeverity.WARNING,
translation_key="recommended_version",
translation_placeholders={
"recommended_version": RECOMMENDED_VERSION,
"current_version": str(version),
},
)
except Go2RtcClientError as err:
if isinstance(err.__cause__, _RETRYABLE_ERRORS):
raise ConfigEntryNotReady(
f"Could not connect to go2rtc instance on {url}"
) from err
_LOGGER.warning("Could not connect to go2rtc instance on %s (%s)", url, err)
return False
except Go2RtcVersionError as err:
raise ConfigEntryNotReady(
f"The go2rtc server version is not supported, {err}"
) from err
except Exception as err: # noqa: BLE001
_LOGGER.warning("Could not connect to go2rtc instance on %s (%s)", url, err)
return False
provider = WebRTCProvider(hass, url)
async_register_webrtc_provider(hass, provider)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a go2rtc config entry."""
return True
async def _get_binary(hass: HomeAssistant) -> str | None:
"""Return the binary path if found."""
return await hass.async_add_executor_job(shutil.which, "go2rtc")
class WebRTCProvider(CameraWebRTCProvider):
"""WebRTC provider."""
def __init__(self, hass: HomeAssistant, url: str) -> None:
"""Initialize the WebRTC provider."""
self._hass = hass
self._url = url
self._session = async_get_clientsession(hass)
self._rest_client = Go2RtcRestClient(self._session, url)
self._sessions: dict[str, Go2RtcWsClient] = {}
@property
def domain(self) -> str:
"""Return the integration domain of the provider."""
return DOMAIN
@callback
def async_is_supported(self, stream_source: str) -> bool:
"""Return if this provider is supports the Camera as source."""
return stream_source.partition(":")[0] in _SUPPORTED_STREAMS
async def async_handle_async_webrtc_offer(
self,
camera: Camera,
offer_sdp: str,
session_id: str,
send_message: WebRTCSendMessage,
) -> None:
"""Handle the WebRTC offer and return the answer via the provided callback."""
self._sessions[session_id] = ws_client = Go2RtcWsClient(
self._session, self._url, source=camera.entity_id
)
if not (stream_source := await camera.stream_source()):
send_message(
WebRTCError("go2rtc_webrtc_offer_failed", "Camera has no stream source")
)
return
streams = await self._rest_client.streams.list()
if (stream := streams.get(camera.entity_id)) is None or not any(
stream_source == producer.url for producer in stream.producers
):
await self._rest_client.streams.add(
camera.entity_id,
[
stream_source,
# We are setting any ffmpeg rtsp related logs to debug
# Connection problems to the camera will be logged by the first stream
# Therefore setting it to debug will not hide any important logs
f"ffmpeg:{camera.entity_id}#audio=opus#query=log_level=debug",
],
)
@callback
def on_messages(message: ReceiveMessages) -> None:
"""Handle messages."""
value: WebRTCMessage
match message:
case WebRTCCandidate():
value = HAWebRTCCandidate(RTCIceCandidateInit(message.candidate))
case WebRTCAnswer():
value = HAWebRTCAnswer(message.sdp)
case WsError():
value = WebRTCError("go2rtc_webrtc_offer_failed", message.error)
send_message(value)
ws_client.subscribe(on_messages)
config = camera.async_get_webrtc_client_configuration()
await ws_client.send(WebRTCOffer(offer_sdp, config.configuration.ice_servers))
async def async_on_webrtc_candidate(
self, session_id: str, candidate: RTCIceCandidateInit
) -> None:
"""Handle the WebRTC candidate."""
if ws_client := self._sessions.get(session_id):
await ws_client.send(WebRTCCandidate(candidate.candidate))
else:
_LOGGER.debug("Unknown session %s. Ignoring candidate", session_id)
@callback
def async_close_session(self, session_id: str) -> None:
"""Close the session."""
ws_client = self._sessions.pop(session_id)
self._hass.async_create_task(ws_client.close())