-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathbaseDeployer.py
30 lines (24 loc) · 960 Bytes
/
baseDeployer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import abc
from extraConfigRunner import ExtraConfigRunner
from clustersConfig import ClustersConfig, ExtraConfigArgs
import host
from typing import Optional
from concurrent.futures import Future
import timer
import collections
class BaseDeployer(abc.ABC):
    """Shared base class for deployers driven by a ``ClustersConfig``.

    Holds the cluster configuration, an ``ExtraConfigRunner`` built from
    it, a string-keyed dict of futures that is passed to every
    extra-config run, and the tuple of step names supplied by the caller.
    """

    def __init__(self, cc: ClustersConfig, steps: list[str]):
        """Store the configuration and set up the extra-config runner.

        Args:
            cc: Cluster configuration; also used to build the
                ``ExtraConfigRunner``.
            steps: Step names; kept as a tuple in ``self.steps``.
        """
        self._cc = cc
        # One dict instance is handed to every ExtraConfigRunner.run() call.
        self._futures: dict[str, Future[Optional[host.Result]]] = {}
        self._extra_config = ExtraConfigRunner(cc)
        self.steps = tuple(steps)

    def _prepost_config(self, to_run: ExtraConfigArgs) -> None:
        """Run a single extra-config entry with the shared futures dict."""
        runner = self._extra_config
        runner.run(to_run, self._futures)

    def _preconfig(self) -> None:
        """Run every entry of ``self._cc.preconfig`` in iteration order."""
        for config_args in self._cc.preconfig:
            self._prepost_config(config_args)

    def _postconfig(self) -> None:
        """Run every entry of ``self._cc.postconfig`` in iteration order."""
        for config_args in self._cc.postconfig:
            self._prepost_config(config_args)

    def _empty_timers(self) -> dict[str, timer.StopWatch]:
        """Return a new mapping that creates a ``timer.StopWatch`` per missing key."""

        def new_stopwatch():
            # Looked up at call time, exactly when a missing key is accessed.
            return timer.StopWatch()

        return collections.defaultdict(new_stopwatch)