-
Notifications
You must be signed in to change notification settings - Fork 1
/
launcher.py
executable file
·390 lines (304 loc) · 14 KB
/
launcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
"""
The porthouse launcher script can be used to launch a larger set of porthouse modules and monitor their state.
The launcher script takes a YAML configuration file as input. The file describes the modules to launch and their parameters.
The modules can be selectively launched from the launcher file using ``--include`` and ``--exclude`` command line arguments.
If no filter arguments are provided, all the modules defined in the file will be launched.
More about the launcher file can be read from `BLAA`<>.
"""
import os
import amqp
import yaml
import time
import argparse
import inspect
import re
from multiprocessing import Process, RLock
from importlib import import_module
import logging, logging.handlers
from functools import reduce
from urllib.parse import urlparse
from typing import Any, Dict, List, Optional, NoReturn, Tuple
from porthouse.core.config import load_globals
from porthouse.core.log.amqp_handler import AMQPLogHandler
from porthouse.core.amqp_tools import check_exchange_exists
class ModuleValidationError(RuntimeError):
    """Raised when a launch configuration or a module specification is malformed."""
class Launcher:
    """
    Module for launching multiple porthouse modules.

    Each selected module from the launch configuration is started in its own
    child process. The launcher then blocks until one of the processes dies
    (or the user interrupts it) and terminates the remaining ones.
    """

    log: logging.Logger

    def __init__(self,
                 cfg_file: str,
                 includes: Optional[List[str]] = None,
                 excludes: Optional[List[str]] = None,
                 declare_exchanges: bool = False,
                 debug: bool = False):
        """
        Load the launch configuration, connect to the broker and run the modules.

        Note that the constructor blocks: unless ``declare_exchanges`` is set,
        it returns only after the launched modules have been shut down.

        Args:
            cfg_file: Configuration file path
            includes: Modules to be included from the launcher module list. If none then all.
            excludes: Modules to be excluded from the launcher module list
            declare_exchanges: Only (re)declare the exchanges listed in the configuration and return
            debug: Enable global debugging

        Raises:
            ModuleValidationError: If the launch configuration is malformed.
        """
        self.threads = []
        self.debug = debug
        self.prefix = None
        self.rlock = RLock()
        self.globals = load_globals()

        # Read launch configuration and replace ${variable} references
        has_variables = re.compile(r'.*\$\{([^}^{]+)\}.*')
        match_single_variable = re.compile(r'\$\{([^}^{]+)\}')

        def variable_constructor(loader, node):
            """ Replace a ${variable} with corresponding value """
            def _get_value(match):
                # re.sub() requires a string replacement, so non-string
                # globals (e.g. numbers) are rendered with str(). Unknown
                # variables are left in the text untouched.
                return str(self.globals.get(match.group(1), match.group()))
            return match_single_variable.sub(_get_value, node.value)

        # NOTE: these are registered on the shared yaml.SafeLoader class, so
        # they also apply to any later yaml.safe_load() call in this process.
        yaml.add_implicit_resolver('!var', has_variables, None, yaml.SafeLoader)
        yaml.add_constructor('!var', variable_constructor, yaml.SafeLoader)

        with open(cfg_file, "r") as cfg_fd:
            cfg = yaml.safe_load(cfg_fd)

        self.validate_launch_specification(cfg)
        self.exchanges = cfg.get("exchanges", {})
        self.modules = cfg["modules"]

        # Connect to message broker
        amqp_url = urlparse(self.globals["amqp_url"])
        host = amqp_url.hostname
        if amqp_url.port is not None:
            # Honour an explicit port from the URL instead of silently
            # falling back to the library default.
            host = f"{host}:{amqp_url.port}"
        self.connection = amqp.Connection(host=host, userid=amqp_url.username, password=amqp_url.password)
        self.connection.connect()
        self.channel = self.connection.channel()

        # Setup logging (needed by both the declare and the launch path)
        self.create_log_handlers(self.globals["log_path"], cfg.get("name", "Launcher"), log_to_amqp=False)

        # Setup exchanges
        if declare_exchanges:
            self.declare_exchanges(self.exchanges.items())
            return

        self.log.info("Launching modules from %s!", cfg_file)

        # Checking that the exchanges are present is currently disabled:
        # self.check_exchanges(self.exchanges.items())

        try:
            # Setup modules
            self.setup_modules(self.modules, includes, excludes)
            self.wait()
        finally:
            # Always take the already started modules down, also when the
            # setup of a later module failed.
            self.log.critical("Core shutdown!")
            self.shutdown()

    def __del__(self):
        """
        Kill all modules
        """
        self.shutdown()

    def shutdown(self) -> None:
        """
        Terminate and join all child processes started by this launcher.

        Safe to call more than once and on a partially constructed instance.
        """
        processes = getattr(self, "threads", None) or []
        # Clear the list first so a repeated call has nothing left to do.
        self.threads = []
        for proc in processes:
            proc.terminate()
            proc.join()

    def create_log_handlers(self, log_path: str, module_name: str, log_to_amqp: bool = True, log_to_stderr: bool = True) -> None:
        """
        Create AMQP, file and stderr log handlers for logging.

        Args:
            log_path: Directory for log file
            module_name: Name used to identify module (logger name and log file stem)
            log_to_amqp: Attach an AMQP log handler bound to the launcher's channel
            log_to_stderr: Attach a stream handler (``logging.StreamHandler`` default stream)

        Raises:
            NotADirectoryError: If ``log_path`` is not an existing directory.
        """
        # Explicit check instead of ``assert`` so that it survives ``python -O``.
        if not os.path.isdir(log_path):
            raise NotADirectoryError(f"Log directory {log_path!r} does not exist")

        file_path = os.path.join(log_path, module_name + ".log")

        self.log = logging.getLogger(module_name)
        self.log.setLevel(logging.INFO)

        if log_to_amqp:
            # AMQP log handler
            amqp_handler = AMQPLogHandler(module_name, self.channel)
            amqp_handler.setLevel(logging.INFO)
            self.log.addHandler(amqp_handler)

        # File log handler (rotated at ~2 MB, five backups kept)
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        file_handler = logging.handlers.RotatingFileHandler(file_path, maxBytes=int(2e6), backupCount=5)
        file_handler.setFormatter(formatter)
        self.log.addHandler(file_handler)

        if log_to_stderr:
            # Console log handler
            stream_handler = logging.StreamHandler()
            stream_handler.setFormatter(formatter)
            self.log.addHandler(stream_handler)

    def setup_modules(self,
                      modules: List[Dict[str, Any]],
                      includes: Optional[List[str]] = None,
                      excludes: Optional[List[str]] = None) -> None:
        """
        Read the modules from the configure file and start them.

        Args:
            modules: List of module definitions for the setup
            includes: Modules to be included from the list. (If None all the modules will be loaded)
            excludes: Modules to be excluded from the list.

        Raises:
            ModuleValidationError: If a module definition is malformed.
            RuntimeError: If a module name is not of the form ``package.Class``.
        """
        self.log.info("Setup modules...")

        for module_def in modules:
            try:
                self.validate_module_specification(module_def)

                module = module_def.get('module', '')
                if not isinstance(module, str) or ("." not in module):
                    raise RuntimeError(f"Errornouse module name {module}")

                name = module_def.get("name", module)
                if not self._is_selected(name, includes, excludes):
                    continue

                params = self._build_params(module_def)

                # Create new process for the module. The process is recorded
                # only after a successful start so that shutdown() never
                # tries to terminate a process that was not started.
                proc = Process(target=self.worker, args=(module, params), daemon=True)
                proc.start()
                self.threads.append(proc)

            except Exception:
                # The definition may not even be a dict if validation failed.
                label = module_def.get("name") if isinstance(module_def, dict) else module_def
                self.log.error("Failed to start module \"%s\"", label)
                raise

    @staticmethod
    def _is_selected(name: str, includes: Optional[List[str]], excludes: Optional[List[str]]) -> bool:
        """ Tell whether a module name passes the include/exclude substring filters. """
        selected = True
        if includes is not None:  # Whitelist/including
            selected = any(inc in name for inc in includes)
        if excludes is not None:  # Blacklisting/excluding
            selected = selected and all(exc not in name for exc in excludes)
        return selected

    @staticmethod
    def _coerce_param_value(value: Any, type_name: Optional[str]) -> Any:
        """ Convert a parameter value according to its optional ``type`` field. """
        if type_name == "string":
            return str(value)
        if type_name == "integer":
            return int(value)
        if type_name == "float":
            return float(value)
        if type_name == "boolean":
            # YAML already parses ``true``/``false`` to bool, so only strings
            # need the textual comparison.
            if isinstance(value, str):
                return value.lower() == "true"
            return bool(value)
        return value

    def _build_params(self, module_def: Dict[str, Any]) -> Dict[str, Any]:
        """ Build the constructor keyword arguments for one module definition. """
        # Every module receives the globals, overridden by its own params.
        params = dict(self.globals)

        for param in module_def.get("params", []):
            value = param.get("value", None)
            if value is None:
                continue

            if isinstance(value, str) and value.startswith("GLOBAL:"):
                value = self.globals[value[len("GLOBAL:"):]]

            value = self._coerce_param_value(value, param.get("type"))
            if value is not None:
                params[param.get("name")] = value

        # Exchange prefixing!
        if self.prefix:
            if "prefix" in params:
                if not isinstance(params["prefix"], str):
                    raise TypeError(f"Parameter 'prefix' must be a string, got {type(params['prefix'])}")
                params["prefix"] = "%s.%s" % (self.prefix, params["prefix"])
            else:
                params["prefix"] = self.prefix

        if self.debug:
            params["debug"] = True

        return params

    def declare_exchanges(self, exchanges: List[Tuple[str, str]]) -> None:
        """
        Declare/redeclare all required AMQP exchanges.

        Each exchange is deleted first and then declared again as durable.

        Args:
            exchanges: Iterable of ``(exchange name, exchange type)`` pairs.

        Raises:
            ValueError: If deleting or declaring an exchange fails.
        """
        self.log.info("Declaring exchanges...")
        for exchange, etype in exchanges:
            self.log.debug("\t%s (%s)", exchange, etype)
            try:
                self.channel.exchange_delete(exchange)
                self.channel.exchange_declare(exchange=exchange, type=etype, durable=True, auto_delete=False)
            except Exception as exc:
                raise ValueError("Invalid exchange spec.") from exc

    def check_exchanges(self, exchanges: List[Tuple[str, str]]) -> None:
        """
        Check that all required AMQP exchanges exist.

        Args:
            exchanges: Iterable of ``(exchange name, exchange type)`` pairs.

        Raises:
            ValueError: If the check fails or reports a missing exchange.
        """
        self.log.info("Checking exchanges...")
        for exchange, etype in exchanges:
            self.log.debug("\t%s (%s)", exchange, etype)
            try:
                exists = check_exchange_exists(exchange, etype)
            except Exception as exc:
                raise ValueError(f"Exchange {exchange} not according to spec.") from exc
            if not exists:
                raise ValueError(f"Exchange {exchange} not according to spec.")

    def wait(self) -> None:
        """
        Wait until someone dies.

        Polls the child processes twice a second and returns as soon as any
        of them is no longer alive, or when interrupted with Ctrl-C.
        """
        try:
            while True:
                time.sleep(0.5)
                if any(not proc.is_alive() for proc in self.threads):
                    break
        except KeyboardInterrupt:
            pass

    @staticmethod
    def _required_arguments(init: Any) -> List[str]:
        """ List the argument names of ``init`` (excluding self) that have no default. """
        argspec = inspect.getfullargspec(init)
        positional = argspec.args[1:]
        # ``defaults`` belong to the LAST positional arguments, so the
        # required ones are the leading remainder.
        n_required = max(0, len(positional) - len(argspec.defaults or ()))
        kwonly_defaults = argspec.kwonlydefaults or {}
        keyword_only = [arg for arg in argspec.kwonlyargs if arg not in kwonly_defaults]
        return positional[:n_required] + keyword_only

    def worker(self, module: str, params: Dict[str, Any]) -> None:
        """
        Worker function to start the new module.

        Imports the module class, instantiates it with ``params`` and calls
        its ``run()`` method. Any failure is logged instead of propagated.

        Args:
            module: Dotted path of the module class (``package.Class``).
            params: Keyword arguments for the class constructor.
        """
        try:
            # Parse package, class etc. names
            package_name, class_name = module.rsplit('.', 1)
            module_name = params.get("module_name", class_name)  # Verbose name

            package = import_module(package_name)
            class_object = getattr(package, class_name)

            # Check that all the required arguments have been defined and
            # output an understandable error if not
            for arg in self._required_arguments(class_object.__init__):
                if arg not in params:
                    raise RuntimeError(f"Module {module_name} (class {class_name}) missing argument {arg!r}")

            with self.rlock:
                self.log.info("Starting %s (%s.%s)", module_name, package_name, class_name)

            # Call module class constructor
            instance = class_object(**params)

            # Start executing
            # TODO: support run() returning a coroutine
            instance.run()

            with self.rlock:
                self.log.info("Module %s (%s.%s) exited", module_name, package_name, class_name)

        except KeyboardInterrupt:
            # Ctrl-C is a normal way to stop; exit quietly.
            pass
        except Exception:
            with self.rlock:
                self.log.critical("%s crashed!", module, exc_info=True)

    def validate_module_specification(self, module_spec: dict) -> None:
        """
        Validate module specification

        Args:
            module_spec: One entry of the ``modules`` list.

        Raises:
            ModuleValidationError: If a field is missing or has a wrong type.
        """
        # (field name, expected type, required)
        REQUIRED_FIELDS = (
            ("name", str, False),
            ("module", str, True),
            ("params", list, False)
        )

        if not isinstance(module_spec, dict):
            raise ModuleValidationError(f"Module specification is not a dict but a {type(module_spec)}")

        for field_name, field_type, field_required in REQUIRED_FIELDS:
            if field_name not in module_spec:
                if not field_required:
                    continue
                raise ModuleValidationError(f"{field_name!r} missing from module specification {module_spec!r}")
            if not isinstance(module_spec[field_name], field_type):
                raise ModuleValidationError(f"{field_name!r} has wrong type. Expected {field_type!r} got {type(module_spec[field_name])}")

        for param in module_spec.get("params", []):
            if not isinstance(param, dict):
                raise ModuleValidationError(f"Parameter define {param!r} in is not a dict")
            if "name" not in param:
                raise ModuleValidationError(f"Parameter define {param!r} missing field 'name'")
            if "value" not in param:
                raise ModuleValidationError(f"Parameter define {param!r} missing field 'value'")

    def validate_launch_specification(self, launch_cfg: Dict) -> None:
        """
        Validate the top-level structure of the launch configuration.

        Args:
            launch_cfg: Parsed content of the launch configuration file.

        Raises:
            ModuleValidationError: If the configuration is not a dict, or if
                ``exchanges`` is not a dict, or if ``modules`` is not a list.
        """
        if not isinstance(launch_cfg, dict):
            raise ModuleValidationError(f"Module specification is not a dict but a {type(launch_cfg)}")

        if "exchanges" in launch_cfg:
            exchanges = launch_cfg["exchanges"]
            if not isinstance(exchanges, dict):
                raise ModuleValidationError(f"Exchange specification is not a dict but a {type(exchanges)}")

        if "modules" in launch_cfg:
            modules = launch_cfg["modules"]
            if not isinstance(modules, list):
                raise ModuleValidationError(f"Module specification is not a list but a {type(modules)}")
def setup_parser(parser: argparse.ArgumentParser) -> None:
    """
    Register the launcher's command line arguments on the given parser.

    Args:
        parser: Argument parser (or sub-parser) to add the arguments to.
    """
    cfg_argument = parser.add_argument('cfg', help='Configuration file')

    # argcomplete only provides shell tab-completion; the launcher itself
    # works without it, so a missing package must not break the CLI.
    try:
        from argcomplete import FilesCompleter
    except ImportError:
        pass
    else:
        cfg_argument.completer = FilesCompleter("*.yaml")

    parser.add_argument('--declare_exchanges', action='store_true',
                        help='Declare exchanges')
    parser.add_argument('-d', '--debug', action='store_true',
                        help='Enable debug features')
    parser.add_argument('--include', nargs='*',
                        help='Modules to be included from the configuration')
    parser.add_argument('--exclude', nargs='*',
                        help='Modules to be excluded from the configuration')
def main(parser: argparse.ArgumentParser, args: argparse.Namespace) -> None:
    """
    Run the launcher with the parsed command line arguments.

    Args:
        parser: Argument parser the arguments were parsed with (not used here).
        args: Parsed arguments as registered by ``setup_parser``.
    """
    launcher_options = {
        "cfg_file": args.cfg,
        "includes": args.include,
        "excludes": args.exclude,
        "declare_exchanges": args.declare_exchanges,
        "debug": args.debug,
    }
    # Constructing the launcher does all the work; it blocks until shutdown.
    Launcher(**launcher_options)