-
-
Notifications
You must be signed in to change notification settings - Fork 443
/
Copy pathconfig.py
153 lines (128 loc) · 5.33 KB
/
config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
from inspect import stack
from pathlib import Path
import py_common.log as log
def get_config(default: str | None = None) -> "CustomConfig":
"""
Gets the config for the currently executing script, taking a default config as a fallback:
This allows scrapers to define their own configuration options in a way that lets them
persist across reinstalls
The default config must have the same format as a simple .ini config file consisting of
key-value pairs separated by an equals sign, and can optionally contain comments and blank lines
for readability
"""
config = CustomConfig(default)
if not default:
log.warning("No config specified")
return config
# Note: chained configs were removed until we find a use case for them
# The paths of every script in the callstack: in the above example this would be:
# this script the api script the site script
# "/scrapers/py_common/util.py", "/scrapers/api/scraper.py", "/scrapers/site/site.py"
# In a single script scraper this would just be:
# this script the site script
# "/scrapers/py_common/util.py", "/scrapers/site/site.py"
paths = [frame.filename for frame in stack() if not frame.filename.startswith("<")]
if len(paths) < 2:
log.warning(
"Expected at least 2 paths in the stack: "
"the current file and the script that called it"
)
log.warning("Not persisting config")
return config
# We can output the path of the script that called this function
# to help with debugging config issues
current_path = Path(paths[1]).absolute()
prefix = str(Path(current_path.parent.name, current_path.name))
configs = [Path(p).parent / ("config.ini") for p in paths][1:]
# See git history if you want the chained configs version
config_path = configs[0]
if not config_path.exists():
log.debug(f"[{prefix}] First run, creating default config at {config_path}")
config_path.write_text(str(config))
else:
log.debug(f"[{prefix}] Reading config from {config_path}")
config.update(config_path.read_text())
return config
class Chunk:
def __init__(self, raw: list[str]):
self.comments = []
self.key = self.value = None
for line in raw:
if not line or line.startswith("#"):
self.comments.append(line)
elif "=" in line:
key, value = [x.strip() for x in line.split("=", 1)]
if not key.isidentifier():
log.warning(f"Config key '{key}' is not a valid identifier")
self.key = key
self.value = self.__parse_value(value)
else:
log.warning(f"Ignoring invalid config line: {line}")
def __parse_value(self, value):
if value.lower() == "true":
return True
elif value.lower() == "false":
return False
elif "." in value:
try:
return float(value)
except ValueError:
return value
elif value.isdigit():
return int(value)
else:
return value
def chunkify(config_string):
chunks = []
current_chunk = []
if not config_string:
return chunks, current_chunk
for lineno, line in enumerate(config_string.strip().splitlines()):
line = line.strip()
current_chunk.append(line)
if "=" in line:
chunks.append(Chunk(current_chunk))
current_chunk = []
elif not line.startswith("#") and line:
log.warning(f"Ignoring invalid config line {lineno}: {line}")
return chunks, current_chunk
class CustomConfig:
"""
Custom config parser that stores comments associated with each key
Settings must be in the format:
```ini
# optional comment
key = value
```
"""
def __init__(self, config_string: str | None = None):
chunks, trailing_comments = chunkify(config_string)
self.config_dict = {chunk.key: chunk.value for chunk in reversed(chunks)}
self.comments = {chunk.key: chunk.comments for chunk in chunks}
self.trailing_comments = trailing_comments
def update(self, config_string: str):
new_chunks, new_trailing_comments = chunkify(config_string)
for chunk in new_chunks:
if chunk.key not in self.config_dict:
self.comments[chunk.key] = chunk.comments
self.config_dict[chunk.key] = chunk.value
for line in new_trailing_comments:
if line not in self.trailing_comments:
self.trailing_comments.append(line)
def __getattr__(self, name):
if name in self.config_dict:
return self.config_dict[name]
raise AttributeError(
f"'{type(self).__name__}' object has no attribute '{name}'"
)
def __getitem__(self, name):
return self.config_dict[name]
def __str__(self):
"Generate a string representation of the configuration"
lines = []
for key, value in reversed(self.config_dict.items()):
# Add comments associated with the key
lines.extend(self.comments[key])
lines.append(f"{key} = {value}")
lines.extend(reversed(self.trailing_comments))
return "\n".join(lines)