-
-
Notifications
You must be signed in to change notification settings - Fork 32.1k
/
Copy pathcheck_config.py
282 lines (237 loc) · 9.63 KB
/
check_config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
"""Script to check the configuration file."""
from __future__ import annotations
import argparse
import asyncio
from collections import OrderedDict
from collections.abc import Callable, Mapping, Sequence
from glob import glob
import logging
import os
from typing import Any
from unittest.mock import patch
from homeassistant import core, loader
from homeassistant.config import get_default_config_dir
from homeassistant.config_entries import ConfigEntries
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import (
area_registry as ar,
device_registry as dr,
entity_registry as er,
issue_registry as ir,
)
from homeassistant.helpers.check_config import async_check_ha_config_file
from homeassistant.util.yaml import Secrets
import homeassistant.util.yaml.loader as yaml_loader
# mypy: allow-untyped-calls, allow-untyped-defs

# Extra package pin declared by this script; color() imports colorlog lazily.
# NOTE(review): presumably consumed by the script runner to install the
# dependency before run() is called - confirm against the caller.
REQUIREMENTS = ("colorlog==6.7.0",)

_LOGGER = logging.getLogger(__name__)

# Functions that check() patches while the configuration is loaded.
# Each entry maps a key to (dotted patch target, original callable).
# check() strips any "*" from the key and looks up a local function named
# "mock_<key>" as the side effect, so "load" and "load*" share mock_load
# while patching two different import locations.
MOCKS: dict[str, tuple[str, Callable]] = {
    "load": ("homeassistant.util.yaml.loader.load_yaml", yaml_loader.load_yaml),
    "load*": ("homeassistant.config.load_yaml", yaml_loader.load_yaml),
    "secrets": ("homeassistant.util.yaml.loader.secret_yaml", yaml_loader.secret_yaml),
}

# Patcher objects created by check(), keyed like MOCKS. This is module-level
# state: entries persist between check() calls.
PATCHES: dict[str, Any] = {}

# Color name used for section headings in the report printed by run().
C_HEAD = "bold"
# Key under which errors that carry no domain are collected.
ERROR_STR = "General Errors"
def color(the_color, *args, reset=None):
    """Wrap text in terminal color escape codes.

    With no positional text, only the escape sequence for ``the_color`` is
    returned (and ``reset`` must be left as ``None``). Otherwise the text
    pieces are joined with single spaces, prefixed with the color sequence
    and followed by the escape code named by ``reset`` (``"reset"`` when not
    given).

    Raises ValueError when a color name is not known to colorlog.
    """
    # Imported lazily so the module can be imported without colorlog present.
    # pylint: disable-next=import-outside-toplevel
    from colorlog.escape_codes import escape_codes, parse_colors

    try:
        if args:
            prefix = parse_colors(the_color)
            text = " ".join(args)
            return prefix + text + escape_codes[reset or "reset"]
        assert reset is None, "You cannot reset if nothing being printed"
        return parse_colors(the_color)
    except KeyError as err:
        raise ValueError(f"Invalid color {err!s} in {the_color}") from err
def run(script_args: list) -> int:
    """Handle check config commandline script.

    Parses the command line, runs check() against the selected configuration
    directory and prints a colorized report: optionally the YAML files found
    (-f), the failed domains, the configuration of requested domains (-i) and
    the secrets that were read (-s).

    Returns the number of entries in the error mapping produced by check(),
    so 0 means the configuration loaded without reported errors.
    """
    parser = argparse.ArgumentParser(description="Check Home Assistant configuration.")
    parser.add_argument("--script", choices=["check_config"])
    parser.add_argument(
        "-c",
        "--config",
        default=get_default_config_dir(),
        help="Directory that contains the Home Assistant configuration",
    )
    parser.add_argument(
        "-i",
        "--info",
        nargs="?",
        default=None,
        const="all",
        help="Show a portion of the config",
    )
    parser.add_argument(
        "-f", "--files", action="store_true", help="Show used configuration files"
    )
    parser.add_argument(
        "-s", "--secrets", action="store_true", help="Show secret information"
    )

    # NOTE(review): script_args is not consulted here; parse_known_args()
    # without an argument reads sys.argv[1:], which is why "--script" is a
    # declared option. Presumably intentional so options placed before
    # "--script" are still seen - confirm before switching to script_args.
    args, unknown = parser.parse_known_args()
    if unknown:
        print(color("red", "Unknown arguments:", ", ".join(unknown)))

    config_dir = os.path.join(os.getcwd(), args.config)

    print(color("bold", "Testing configuration at", config_dir))

    res = check(config_dir, args.secrets)

    # check() only stores "components" when the configuration could be
    # loaded; after a fatal error the key is absent. Fall back to an empty
    # mapping so the report below still completes instead of raising KeyError.
    components = res.get("components", {})

    domain_info: list[str] = []
    if args.info:
        domain_info = args.info.split(",")

    if args.files:
        print(color(C_HEAD, "yaml files"), "(used /", color("red", "not used") + ")")
        # Files below <config>/deps are excluded from the listing.
        deps = os.path.join(config_dir, "deps")
        yaml_files = [
            f
            for f in glob(os.path.join(config_dir, "**/*.yaml"), recursive=True)
            if not f.startswith(deps)
        ]

        for yfn in sorted(yaml_files):
            # Files that were never loaded during the check are shown in red.
            the_color = "" if yfn in res["yaml_files"] else "red"
            print(color(the_color, "-", yfn))

    if res["except"]:
        print(color("bold_white", "Failed config"))
        for domain, config in res["except"].items():
            # Failed domains are also queued for the config dump below.
            domain_info.append(domain)
            print(" ", color("bold_red", domain + ":"), color("red", "", reset="red"))
            dump_dict(config, reset="red")
            print(color("reset"))

    if domain_info:
        if "all" in domain_info:
            print(color("bold_white", "Successful config (all)"))
            for domain, config in components.items():
                print(" ", color(C_HEAD, domain + ":"))
                dump_dict(config)
        else:
            print(color("bold_white", "Successful config (partial)"))
            for domain in domain_info:
                # The catch-all error bucket is not a real domain.
                if domain == ERROR_STR:
                    continue
                print(" ", color(C_HEAD, domain + ":"))
                dump_dict(components.get(domain))

    if args.secrets:
        # Maps each secret key to the file it was last seen in, to detect
        # the same key being defined in more than one secrets file.
        flatsecret: dict[str, str] = {}

        for sfn, sdict in res["secret_cache"].items():
            sss = []
            for skey in sdict:
                if skey in flatsecret:
                    _LOGGER.error(
                        "Duplicated secrets in files %s and %s", flatsecret[skey], sfn
                    )
                flatsecret[skey] = sfn
                # Secrets that were actually referenced are highlighted.
                sss.append(color("green", skey) if skey in res["secrets"] else skey)
            print(color(C_HEAD, "Secrets from", sfn + ":"), ", ".join(sss))

        print(color(C_HEAD, "Used Secrets:"))
        for skey, sval in res["secrets"].items():
            if sval is None:
                print(" -", skey + ":", color("red", "not found"))
                continue
            print(" -", skey + ":", sval)

    return len(res["except"])
def check(config_dir, secrets=False):
    """Load the configuration with the YAML loaders patched to record usage.

    The functions listed in MOCKS are patched for the duration of the load so
    that every YAML file read, and (when ``secrets`` is true) every secret
    resolved, is recorded.

    Returns a dict with these keys:
      "yaml_files"   - file names passed to the YAML loader
      "secrets"      - secret name -> resolved value (None when lookup failed)
      "except"       - domain -> list of error messages / offending config
      "secret_cache" - cache taken from the Secrets helper, keys as str
      "components"   - result of the config check; only set when loading
                       did not raise
    """
    logging.getLogger("homeassistant.loader").setLevel(logging.CRITICAL)

    res: dict[str, Any] = {
        "yaml_files": OrderedDict(),  # yaml_files loaded
        "secrets": OrderedDict(),  # secret cache and secrets loaded
        "except": OrderedDict(),  # exceptions raised (with config)
        # "components" is added further down once the config has been loaded
        "secret_cache": {},
    }

    def mock_load(filename, secrets=None):
        """Record the file name, then defer to the real YAML loader."""
        res["yaml_files"][filename] = True
        return MOCKS["load"][1](filename, secrets)

    def mock_secrets(ldr, node):
        """Resolve a secret through the real function and record the outcome."""
        try:
            val = MOCKS["secrets"][1](ldr, node)
        except HomeAssistantError:
            val = None
        res["secrets"][node.value] = val
        return val

    # A "*" in a MOCKS key is dropped before the lookup, which lets a single
    # side effect be attached to several patch targets.
    side_effects = {"load": mock_load, "secrets": mock_secrets}
    for name, (target, _original) in MOCKS.items():
        if name == "secrets" and not secrets:
            continue
        PATCHES[name] = patch(target, side_effect=side_effects[name.replace("*", "")])

    # Activate every registered patch
    for patcher in PATCHES.values():
        patcher.start()

    if secrets:
        # Re-register so that !secret resolves to the patched function
        yaml_loader.add_constructor("!secret", yaml_loader.secret_yaml)

    def secrets_proxy(*args):
        """Build the real Secrets object and expose its cache through res."""
        instance = Secrets(*args)
        res["secret_cache"] = instance._cache  # pylint: disable=protected-access
        return instance

    try:
        with patch.object(yaml_loader, "Secrets", secrets_proxy):
            res["components"] = asyncio.run(async_check_config(config_dir))
        res["secret_cache"] = {
            str(path): cached for path, cached in res["secret_cache"].items()
        }
        for problem in res["components"].errors:
            bucket = res["except"].setdefault(problem.domain or ERROR_STR, [])
            bucket.append(problem.message)
            if problem.config:
                bucket.append(problem.config)

    except Exception as err:  # pylint: disable=broad-except
        print(color("red", "Fatal error while loading config:"), str(err))
        res["except"].setdefault(ERROR_STR, []).append(str(err))
    finally:
        # Deactivate every patch again
        for patcher in PATCHES.values():
            patcher.stop()
        if secrets:
            # Re-register so that !secret resolves to the original function
            yaml_loader.add_constructor("!secret", yaml_loader.secret_yaml)

    return res
async def async_check_config(config_dir):
    """Create a throwaway Home Assistant instance and check its config file.

    The instance is set up with the loader, an empty set of config entries and
    the area, device, entity and issue registries before the configuration
    file is checked. The instance is stopped before the result is returned.
    """
    hass = core.HomeAssistant(config_dir)
    loader.async_setup(hass)
    hass.config_entries = ConfigEntries(hass, {})

    # Same order as before: area, device, then entity registry.
    for registry in (ar, dr, er):
        await registry.async_load(hass)
    await ir.async_load(hass, read_only=True)

    result = await async_check_ha_config_file(hass)
    await hass.async_stop(force=True)
    return result
def line_info(obj, **kwargs):
    """Return a colored "[source file:line]" marker for a config object.

    Objects without a ``__config_file__`` attribute yield a plain "?".
    Keyword arguments are forwarded to color().
    """
    if not hasattr(obj, "__config_file__"):
        return "?"
    location = f"[source {obj.__config_file__}:{obj.__line__ or '?'}]"
    return color("cyan", location, **kwargs)
def dump_dict(layer, indent_count=3, listi=False, **kwargs):
    """Display a dict.

    A friendly version of print yaml_loader.yaml.dump(config).

    layer: the mapping, sequence or scalar string to print; anything else
        (for example None) prints nothing.
    indent_count: number of spaces used to indent this level.
    listi: True when this layer is an item of a list, so that its first
        line carries a "-" marker.
    kwargs: forwarded to line_info() (and from there to color()) at every
        nesting level.
    """

    def sort_dict_key(val):
        """Return the dict key for sorting; "platform" always sorts first."""
        key = str(val[0]).lower()
        return "0" if key == "platform" else key

    indent_str = indent_count * " "
    if listi or isinstance(layer, list):
        indent_str = indent_str[:-1] + "-"

    # str and bytes are Sequences too; print them as one scalar instead of
    # falling through to the sequence branch and emitting one line per
    # character.
    if isinstance(layer, (str, bytes)):
        print(indent_str, layer)
        return

    if isinstance(layer, Mapping):
        for key, value in sorted(layer.items(), key=sort_dict_key):
            if isinstance(value, (dict, list)):
                print(indent_str, str(key) + ":", line_info(value, **kwargs))
                # Forward kwargs so nested source markers use the same
                # reset color as the top level.
                dump_dict(value, indent_count + 2, **kwargs)
            else:
                print(indent_str, str(key) + ":", value)
            # Only the first entry of a list item carries the "-" marker.
            indent_str = indent_count * " "
    if isinstance(layer, Sequence):
        for i in layer:
            if isinstance(i, dict):
                dump_dict(i, indent_count + 2, True, **kwargs)
            else:
                print(" ", indent_str, i)