-
-
Notifications
You must be signed in to change notification settings - Fork 32.1k
/
Copy path__init__.py
331 lines (264 loc) · 11.1 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
"""The Search integration."""
from __future__ import annotations
from collections import defaultdict, deque
import logging
from typing import Any
import voluptuous as vol
from homeassistant.components import automation, group, person, script, websocket_api
from homeassistant.components.homeassistant import scene
from homeassistant.core import HomeAssistant, callback, split_entity_id
from homeassistant.helpers import (
config_validation as cv,
device_registry as dr,
entity_registry as er,
)
from homeassistant.helpers.entity import (
EntityInfo,
entity_sources as get_entity_sources,
)
from homeassistant.helpers.typing import ConfigType
DOMAIN = "search"
_LOGGER = logging.getLogger(__name__)
CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Search component."""
websocket_api.async_register_command(hass, websocket_search_related)
return True
@websocket_api.websocket_command(
{
vol.Required("type"): "search/related",
vol.Required("item_type"): vol.In(
(
"area",
"automation",
"automation_blueprint",
"config_entry",
"device",
"entity",
"group",
"person",
"scene",
"script",
"script_blueprint",
)
),
vol.Required("item_id"): str,
}
)
@callback
def websocket_search_related(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict[str, Any],
) -> None:
"""Handle search."""
searcher = Searcher(
hass,
dr.async_get(hass),
er.async_get(hass),
get_entity_sources(hass),
)
connection.send_result(
msg["id"], searcher.async_search(msg["item_type"], msg["item_id"])
)
class Searcher:
"""Find related things.
Few rules:
Scenes, scripts, automations and config entries will only be expanded if they are
the entry point. They won't be expanded if we process them. This is because they
turn the results into garbage.
"""
# These types won't be further explored. Config entries + Output types.
DONT_RESOLVE = {
"area",
"automation",
"automation_blueprint",
"config_entry",
"group",
"scene",
"script",
"script_blueprint",
}
# These types exist as an entity and so need cleanup in results
EXIST_AS_ENTITY = {"automation", "group", "person", "scene", "script"}
def __init__(
self,
hass: HomeAssistant,
device_reg: dr.DeviceRegistry,
entity_reg: er.EntityRegistry,
entity_sources: dict[str, EntityInfo],
) -> None:
"""Search results."""
self.hass = hass
self._device_reg = device_reg
self._entity_reg = entity_reg
self._sources = entity_sources
self.results: defaultdict[str, set[str]] = defaultdict(set)
self._to_resolve: deque[tuple[str, str]] = deque()
@callback
def async_search(self, item_type, item_id):
"""Find results."""
_LOGGER.debug("Searching for %s/%s", item_type, item_id)
self.results[item_type].add(item_id)
self._to_resolve.append((item_type, item_id))
while self._to_resolve:
search_type, search_id = self._to_resolve.popleft()
getattr(self, f"_resolve_{search_type}")(search_id)
# Clean up entity_id items, from the general "entity" type result,
# that are also found in the specific entity domain type.
for result_type in self.EXIST_AS_ENTITY:
self.results["entity"] -= self.results[result_type]
# Remove entry into graph from search results.
to_remove_item_type = item_type
if item_type == "entity":
domain = split_entity_id(item_id)[0]
if domain in self.EXIST_AS_ENTITY:
to_remove_item_type = domain
self.results[to_remove_item_type].remove(item_id)
# Filter out empty sets.
return {key: val for key, val in self.results.items() if val}
@callback
def _add_or_resolve(self, item_type, item_id):
"""Add an item to explore."""
if item_id in self.results[item_type]:
return
self.results[item_type].add(item_id)
if item_type not in self.DONT_RESOLVE:
self._to_resolve.append((item_type, item_id))
@callback
def _resolve_area(self, area_id) -> None:
"""Resolve an area."""
for device in dr.async_entries_for_area(self._device_reg, area_id):
self._add_or_resolve("device", device.id)
for entity_entry in er.async_entries_for_area(self._entity_reg, area_id):
self._add_or_resolve("entity", entity_entry.entity_id)
for entity_id in script.scripts_with_area(self.hass, area_id):
self._add_or_resolve("entity", entity_id)
for entity_id in automation.automations_with_area(self.hass, area_id):
self._add_or_resolve("entity", entity_id)
@callback
def _resolve_automation(self, automation_entity_id) -> None:
"""Resolve an automation.
Will only be called if automation is an entry point.
"""
for entity in automation.entities_in_automation(
self.hass, automation_entity_id
):
self._add_or_resolve("entity", entity)
for device in automation.devices_in_automation(self.hass, automation_entity_id):
self._add_or_resolve("device", device)
for area in automation.areas_in_automation(self.hass, automation_entity_id):
self._add_or_resolve("area", area)
if blueprint := automation.blueprint_in_automation(
self.hass, automation_entity_id
):
self._add_or_resolve("automation_blueprint", blueprint)
@callback
def _resolve_automation_blueprint(self, blueprint_path) -> None:
"""Resolve an automation blueprint.
Will only be called if blueprint is an entry point.
"""
for entity_id in automation.automations_with_blueprint(
self.hass, blueprint_path
):
self._add_or_resolve("automation", entity_id)
@callback
def _resolve_config_entry(self, config_entry_id) -> None:
"""Resolve a config entry.
Will only be called if config entry is an entry point.
"""
for device_entry in dr.async_entries_for_config_entry(
self._device_reg, config_entry_id
):
self._add_or_resolve("device", device_entry.id)
for entity_entry in er.async_entries_for_config_entry(
self._entity_reg, config_entry_id
):
self._add_or_resolve("entity", entity_entry.entity_id)
@callback
def _resolve_device(self, device_id) -> None:
"""Resolve a device."""
device_entry = self._device_reg.async_get(device_id)
# Unlikely entry doesn't exist, but let's guard for bad data.
if device_entry is not None:
if device_entry.area_id:
self._add_or_resolve("area", device_entry.area_id)
for config_entry_id in device_entry.config_entries:
self._add_or_resolve("config_entry", config_entry_id)
# We do not resolve device_entry.via_device_id because that
# device is not related data-wise inside HA.
for entity_entry in er.async_entries_for_device(self._entity_reg, device_id):
self._add_or_resolve("entity", entity_entry.entity_id)
for entity_id in script.scripts_with_device(self.hass, device_id):
self._add_or_resolve("entity", entity_id)
for entity_id in automation.automations_with_device(self.hass, device_id):
self._add_or_resolve("entity", entity_id)
@callback
def _resolve_entity(self, entity_id) -> None:
"""Resolve an entity."""
# Extra: Find automations and scripts that reference this entity.
for entity in scene.scenes_with_entity(self.hass, entity_id):
self._add_or_resolve("entity", entity)
for entity in group.groups_with_entity(self.hass, entity_id):
self._add_or_resolve("entity", entity)
for entity in automation.automations_with_entity(self.hass, entity_id):
self._add_or_resolve("entity", entity)
for entity in script.scripts_with_entity(self.hass, entity_id):
self._add_or_resolve("entity", entity)
for entity in person.persons_with_entity(self.hass, entity_id):
self._add_or_resolve("entity", entity)
# Find devices
entity_entry = self._entity_reg.async_get(entity_id)
if entity_entry is not None:
if entity_entry.device_id:
self._add_or_resolve("device", entity_entry.device_id)
if entity_entry.config_entry_id is not None:
self._add_or_resolve("config_entry", entity_entry.config_entry_id)
else:
source = self._sources.get(entity_id)
if source is not None and "config_entry" in source:
self._add_or_resolve("config_entry", source["config_entry"])
domain = split_entity_id(entity_id)[0]
if domain in self.EXIST_AS_ENTITY:
self._add_or_resolve(domain, entity_id)
@callback
def _resolve_group(self, group_entity_id) -> None:
"""Resolve a group.
Will only be called if group is an entry point.
"""
for entity_id in group.get_entity_ids(self.hass, group_entity_id):
self._add_or_resolve("entity", entity_id)
@callback
def _resolve_person(self, person_entity_id) -> None:
"""Resolve a person.
Will only be called if person is an entry point.
"""
for entity in person.entities_in_person(self.hass, person_entity_id):
self._add_or_resolve("entity", entity)
@callback
def _resolve_scene(self, scene_entity_id) -> None:
"""Resolve a scene.
Will only be called if scene is an entry point.
"""
for entity in scene.entities_in_scene(self.hass, scene_entity_id):
self._add_or_resolve("entity", entity)
@callback
def _resolve_script(self, script_entity_id) -> None:
"""Resolve a script.
Will only be called if script is an entry point.
"""
for entity in script.entities_in_script(self.hass, script_entity_id):
self._add_or_resolve("entity", entity)
for device in script.devices_in_script(self.hass, script_entity_id):
self._add_or_resolve("device", device)
for area in script.areas_in_script(self.hass, script_entity_id):
self._add_or_resolve("area", area)
if blueprint := script.blueprint_in_script(self.hass, script_entity_id):
self._add_or_resolve("script_blueprint", blueprint)
@callback
def _resolve_script_blueprint(self, blueprint_path) -> None:
"""Resolve a script blueprint.
Will only be called if blueprint is an entry point.
"""
for entity_id in script.scripts_with_blueprint(self.hass, blueprint_path):
self._add_or_resolve("script", entity_id)