-
Notifications
You must be signed in to change notification settings - Fork 1
/
batch.py
72 lines (59 loc) · 1.67 KB
/
batch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import logging
import multiprocessing
import time
import sys
import signal
from worker import work
# Root logging: records at INFO and above are written to bmt.log, each line
# tagged with the timestamp and the id of the process that emitted it.
logging.basicConfig(
    level=logging.INFO,
    filename='bmt.log',
    format='%(asctime)s|%(process)d|%(message)s',
)

# The 'master' logger additionally echoes WARNING-and-above records through a
# StreamHandler (stderr by default), using a shorter line format.
logger = logging.getLogger('master')
formatter = logging.Formatter('%(asctime)s - %(message)s')
ch = logging.StreamHandler()
ch.setFormatter(formatter)
ch.setLevel(logging.WARNING)
logger.addHandler(ch)

# Shared module state.
finish = False        # set to True by the signal handler to request shutdown
contexts = []         # (Process, index) pairs, filled by initialize()
server_index = 0      # not referenced anywhere else in this file
now_workers = 0       # number of worker slots that have been started so far
max_workers = 100     # default worker count; main() overwrites it

# Ramp-up pacing: start `rampup_step` workers, then wait this many seconds.
rampup_step = 1
rampup_time_interval = 3 * 60
def handler(signum, frame):
    """Signal callback that requests shutdown.

    Logs that a signal arrived and raises the module-level ``finish`` flag.
    Both arguments are supplied by the ``signal`` machinery and are unused.
    """
    global finish
    logger.info('got a signal')
    finish = True
def initialize():
    """Install the shutdown signal handlers and prepare the worker processes.

    Creates ``max_workers`` ``multiprocessing.Process`` objects, each targeting
    ``work`` with its index passed as a string, and appends them to
    ``contexts`` as ``(process, index)`` pairs. The processes are only
    created here; they are not started.
    """
    # Use max_workers rather than the script-only global `concurrency`, which
    # does not exist when this module is imported instead of run directly.
    logger.info('initialize master with %d processes', max_workers)

    # NOTE(review): with a fork-based start method, processes started later
    # presumably inherit these handlers - confirm that `work` copes with that.
    signal.signal(signal.SIGTERM, handler)
    signal.signal(signal.SIGINT, handler)

    for i in range(max_workers):
        p = multiprocessing.Process(target=work, args=(str(i),))
        contexts.append((p, i))
def start(from_nu, to_nu):
    """Start the prepared workers in the slice ``contexts[from_nu:to_nu]``.

    Each worker's index is logged just before its process is started.
    """
    for proc, index in contexts[from_nu:to_nu]:
        logger.info('%s process start', index)
        proc.start()
def main(concurrency):
    """Ramp worker processes up to ``concurrency`` and run until signalled.

    Sets ``max_workers`` to ``concurrency``, calls ``initialize()``, then
    loops: every ``rampup_time_interval`` seconds up to ``rampup_step`` more
    workers are started until ``max_workers`` are running. Once the ``finish``
    flag is set (by the signal handler), every started worker is terminated
    and joined, and the process exits with status 0.

    Args:
        concurrency: total number of worker processes to ramp up to.

    Raises:
        SystemExit: always, with code 0, after a shutdown was requested.
    """
    global max_workers, now_workers
    max_workers = concurrency
    initialize()
    logger.info('start loop!')
    while True:
        if finish:
            # Not inside an `except` block, so logger.exception() would attach
            # a meaningless "NoneType: None" traceback. WARNING keeps the
            # message visible on the console handler.
            logger.warning('finish jobs')
            started = contexts[:now_workers]
            for proc, _ in started:
                proc.terminate()
            for proc, _ in started:
                proc.join()
            sys.exit(0)

        logger.info('now:%d max:%d', now_workers, max_workers)
        if now_workers < max_workers:
            # Clamp the step so the counter never runs past max_workers.
            first_new = now_workers
            now_workers += min(rampup_step, max_workers - now_workers)
            logger.warning('rampup %d/%d', now_workers, max_workers)
            start(first_new, now_workers)

        # time.sleep() resumes after a signal handler returns (PEP 475), so a
        # single long sleep would delay shutdown by up to a whole interval.
        # Sleep in short slices and re-check the flag instead.
        deadline = time.monotonic() + rampup_time_interval
        while not finish:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(1.0, remaining))
if __name__ == "__main__":
    # Script entry point: the first command-line argument is the number of
    # worker processes to ramp up to.
    if len(sys.argv) < 2:
        sys.exit('usage: %s <concurrency>' % sys.argv[0])
    try:
        concurrency = int(sys.argv[1])
    except ValueError:
        sys.exit('concurrency must be an integer, got %r' % sys.argv[1])
    main(concurrency)