-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueue_jobs
executable file
·156 lines (122 loc) · 5.62 KB
/
queue_jobs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#!/usr/bin/env python3
# queue_jobs
import argparse
import os
import subprocess
import time
import math
import sys
import glob
# Command-line interface definition.
# NOTE: argparse's default help formatter re-wraps text, so the "\n" sequences
# embedded in the description/help strings are not shown as line breaks.
parser = argparse.ArgumentParser(
    description=(
        "Creates a queue of jobs to submit to the cluster and throttles their submission.\n"
        "Useful for systems where a number of jobs have high I/O demands, or where the user has\n"
        "limited disk space on the cluster and cannot support too many jobs running concurrently."
    )
)
parser.add_argument(
    "scriptlistf",
    type=str,
    help=(
        "File containing the list of scripts to run, one per line. Use '-' for stdin.\n"
        "Can be a directory and all .sh files in that directory will be used as the set of jobs to run."
    ),
)
# With nargs='?': option absent -> '' (ignore nothing); bare "-i" -> '#';
# "-i PREFIX" -> PREFIX.
parser.add_argument(
    "-i",
    "--ignore",
    type=str,
    nargs='?',
    const='#',
    default='',
    help=(
        "Ignore lines beginning with a prefix. [default: #]\n"
        "String must be specified if -i is the last option given."
    ),
)
parser.add_argument(
    "-o",
    "--outdir",
    type=str,
    default=".",
    help="Directory for job output files. [default: current dir]",
)
parser.add_argument(
    "-m",
    "--maxrun",
    type=int,
    default=1,
    help="Maximum number of running jobs. [default: 1]",
)
# Kept as a string because it may be given as "mm:ss"; converted to seconds later.
parser.add_argument(
    "-r",
    "--refresh",
    type=str,
    default="60",
    help="Amount of time to wait between checking for open space in the queue (seconds or mm:ss) [default: 60]",
)
parser.add_argument(
    "-A",
    "--count_all",
    action="store_true",
    help=(
        "Consider the user's entire job queue (i.e. count jobs being run even if not by this "
        "instance of queue_jobs) when deciding whether to submit new jobs"
    ),
)
parser.add_argument(
    "-s",
    "--silent",
    action="store_true",
    help="Don't print information as jobs are submitted.",
)
args = parser.parse_args()
# parse input file list and make queue
def _read_job_list(lines, ignore_prefix):
    """Return the job script paths found in an iterable of text lines.

    Lines starting with ``ignore_prefix`` are dropped (an empty prefix disables
    this filter).  Surrounding whitespace is stripped, and lines that are empty
    after stripping are skipped so they never reach the scheduler as an empty
    script path.
    """
    jobs = []
    for line in lines:
        if ignore_prefix and line.startswith(ignore_prefix):
            continue
        entry = line.strip()
        if entry:
            jobs.append(entry)
    return jobs


if os.path.isdir(args.scriptlistf):
    # glob returns matches in arbitrary order; sort so that the submission
    # order is reproducible between runs.
    jobfile_queue = sorted(glob.glob(os.path.join(args.scriptlistf, "*.sh")))
elif args.scriptlistf == "-":
    # Read the already-open stdin stream directly: open() only accepts a path
    # or a file descriptor, not a file object.
    jobfile_queue = _read_job_list(sys.stdin, args.ignore)
else:
    with open(args.scriptlistf, 'r') as slf:
        jobfile_queue = _read_job_list(slf, args.ignore)
# parse refresh time
# The value is a colon-separated string: "ss", "mm:ss" or "hh:mm:ss".
# Reversing the fields puts seconds first, so the field at position p is
# worth 60**p seconds.
rtimesplit = args.refresh.split(":")[::-1]
if len(rtimesplit) > 3:
    print("Refresh time may not be given in terms of days.")
    sys.exit(1)
try:
    refresh_time = sum(int(field) * (60 ** power) for power, field in enumerate(rtimesplit))
except ValueError:
    # Non-numeric or empty field: report it instead of dumping a traceback.
    print("Refresh time must be given in seconds or as mm:ss (got '{}').".format(args.refresh))
    sys.exit(1)
if refresh_time < 0:
    # time.sleep() rejects negative durations, so fail early with a clear message.
    print("Refresh time may not be negative.")
    sys.exit(1)
if refresh_time > 24 * 60 * 60:
    print("Refresh time may not be longer than one day.")
    sys.exit(1)
# create output dir if needed
# The current directory ('.') always exists, so it is skipped.
if args.outdir != '.':
    if os.path.exists(args.outdir):
        if not os.path.isdir(args.outdir):
            print("outdir provided is not a directory.")
            sys.exit(2)
    else:
        try:
            # makedirs also creates missing parent directories, and
            # exist_ok tolerates another process creating the directory
            # between the existence check above and this call.
            os.makedirs(args.outdir, exist_ok=True)
        except OSError:
            # Only filesystem errors are handled here; KeyboardInterrupt and
            # SystemExit are deliberately left to propagate.
            print("Unable to create output directory {}.".format(args.outdir))
            sys.exit(2)
# timestamp helper used in the progress messages
def nowstr():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    timestamp_format = "%Y-%m-%d %H:%M:%S"
    current_local = time.localtime()
    return time.strftime(timestamp_format, current_local)
# execute an external command and return its cleaned-up stdout
def shell_output(cmd_lst):
    """Run ``cmd_lst`` and return its standard output as text.

    The output is decoded as UTF-8, every double-quote character is removed,
    and leading/trailing whitespace is stripped.  A non-zero exit status
    raises ``subprocess.CalledProcessError``.
    """
    raw_bytes = subprocess.check_output(cmd_lst)
    decoded = raw_bytes.decode("utf-8")
    # splitting on '"' and re-joining drops every double quote
    unquoted = "".join(decoded.split('"'))
    return unquoted.strip()
# run squeue to get listing of job IDs for this user
def get_all_jobids():
    """Return the IDs of all jobs currently in the user's queue.

    Runs ``squeue`` without a header line, restricted to the current user and
    printing only the job ID column.

    Returns:
        A list of job ID strings; an empty list when the user has no jobs.
    """
    try:
        user = os.getlogin()
    except OSError:
        # os.getlogin() needs a controlling terminal and fails under e.g.
        # nohup, cron or inside a batch job; fall back to the environment /
        # password database lookup.
        import getpass
        user = getpass.getuser()
    listing = shell_output(['squeue', '-h', '-u', user, '-o', '"%i"'])
    # "".split("\n") yields [''], which would be counted as one running job;
    # keep only non-empty entries.
    return [jobid.strip() for jobid in listing.split("\n") if jobid.strip()]
# submit a single job
def submit_job(jobfile):
    """Submit ``jobfile`` with ``sbatch`` and return the new job's ID.

    The job's output is directed to ``<outdir>/<basename of jobfile>.out``.
    Increments the module-level ``submit_count`` and, unless ``--silent`` was
    given, prints a progress line with the counter, a timestamp, the script
    path and the job ID.

    Args:
        jobfile: Path of the batch script to submit.

    Returns:
        The job ID as a string, comparable with the IDs listed by ``squeue``.

    Raises:
        subprocess.CalledProcessError: if ``sbatch`` exits with an error.
    """
    global submit_count
    outfn = os.path.join(args.outdir, os.path.basename(jobfile) + ".out")
    sbatch_reply = shell_output(['sbatch', '--parsable', '-o', outfn, jobfile])
    # With --parsable sbatch prints "jobid" or "jobid;cluster"; keep only the
    # ID so that it matches what squeue reports.
    jobid = sbatch_reply.split(";", 1)[0]
    submit_count += 1
    if not args.silent:
        print(("["+job_count_fmt+"/{}]\t{}\t{}\t{}").format(submit_count, total_jobcount, nowstr(), jobfile, jobid))
    return jobid
#####################
# Module-level state shared with submit_job().
submit_count = 0          # number of jobs submitted so far
running_jobids = []       # IDs of jobs submitted by this instance and still queued/running
total_jobcount = len(jobfile_queue)
# Field width for the running job number = number of digits in the total.
# (Unlike ceil(log10(n)), this is defined for n == 0 and is wide enough when
# n is an exact power of ten.)
job_count_fmt = "{{:{}d}}".format(len(str(total_jobcount)))  # correct spacing for job number output
if not args.silent:
    print("Queueing {} jobs at {} with parameters maxrun={}, refresh={} ({} s), outdir={}.".format(total_jobcount, nowstr(), str(args.maxrun)+("" if not args.count_all else " [count all]"), args.refresh, refresh_time, args.outdir))
# Main throttling loop: on every pass, refresh the view of the scheduler
# queue, submit as many pending scripts as the limit allows, then sleep.
# Ctrl+C opens a small prompt to change the limit or quit.
while len(jobfile_queue) > 0:
    try:
        # update running job IDs
        # (empty entries are dropped so that an empty squeue listing is not
        # counted as one running job)
        curr_jobs = [jobid for jobid in get_all_jobids() if jobid]
        curr_jobids = set(curr_jobs)  # set for O(1) membership tests
        running_jobids = [x for x in running_jobids if x in curr_jobids]
        # run new jobs if space available
        if args.count_all:
            available_jobs = args.maxrun - len(curr_jobs)
        else:
            available_jobs = args.maxrun - len(running_jobids)
        while available_jobs > 0:
            # submit new job and store its ID
            new_jobid = submit_job(jobfile_queue.pop(0))
            running_jobids.append(new_jobid)
            # if no more files to submit, die
            if len(jobfile_queue) == 0:
                sys.exit(0)
            available_jobs -= 1
        # flush output
        sys.stdout.flush()
        # wait to check again
        time.sleep(refresh_time)
    except KeyboardInterrupt:
        try:
            print("""
Keyboard interrupt received.
Type a number and press Enter to change the number of simultaneous jobs.
Press Enter with no input to quit but keep running jobs alive.
Press Ctrl+C again to quit and kill running jobs.""", file=sys.stderr)
            try:
                args.maxrun = int(input())
            except (ValueError, EOFError):
                # Enter pressed (or stdin exhausted/closed, e.g. when the job
                # list was piped in) - keep jobs running
                print("Exiting without killing running jobs.", file=sys.stderr)
                print("** Keyboard interrupt received; will keep remaining {} jobs running".format(len(running_jobids)))
                sys.exit(1)
            if not args.silent:
                print("** maxrun changed to {}".format(args.maxrun))
        except KeyboardInterrupt:
            # Ctrl+C pressed again - kill running jobs
            print("\nKilling remaining jobs.", file=sys.stderr)
            print("** Keyboard interrupt received; killing last {} jobs... ".format(len(running_jobids)), end='')
            for jobid in running_jobids:
                subprocess.call(["scancel", str(jobid)])
            print("done.")
            sys.exit(2)