pipeline_status.py
#!/usr/bin/env python3

"""
Purpose
-------

This module collects pipeline run statistics (such as time, CPU and RAM
usage for each task) into a report JSON.

Expected input
--------------

- ``trace_file`` : *Trace file generated by nextflow*

Code documentation
------------------

"""

__version__ = "1.0.0"
__build__ = "16012018"
__template__ = "pipeline_status-nf"

import os
import json
import traceback

from os.path import join

from assemblerflow_utils.assemblerflow_base import get_logger, log_error

logger = get_logger(__file__)

LOG_STATS = ".pipeline_status.json"
# When nextflow renders this template into a .command.sh script, the
# variables below are populated with the run-specific values.
if __file__.endswith(".command.sh"):
    fastq_id = 'sample_id'
    TRACE_FILE = 'pipeline_stats.txt'
    WORKDIR = '${workflow.projectDir}'
def get_json_info(fields, header):
    """Maps the fields of a trace line to the corresponding header columns.

    Parameters
    ----------
    fields : list
        Tab-separated fields of a single nextflow trace line.
    header : list
        Column names from the first line of the trace file.

    Returns
    -------
    dict
        Dictionary with the header columns as keys and the trace fields as
        values.
    """

    json_dic = dict((x, y) for x, y in zip(header, fields))

    return json_dic
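
# Illustrative sketch of what get_json_info produces, assuming a trace header
# and a matching tab-separated line (the values below are hypothetical, not
# taken from a real run):
#
#   header = ["task_id", "process", "tag", "status"]
#   fields = ["12", "integrity_coverage", "sample_1 getStats", "COMPLETED"]
#   get_json_info(fields, header)
#   # -> {"task_id": "12", "process": "integrity_coverage",
#   #     "tag": "sample_1 getStats", "status": "COMPLETED"}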
def get_previous_stats(stats_path):
    """Loads the pipeline status JSON from a previous run, if it exists.

    Parameters
    ----------
    stats_path : str
        Path to the pipeline status JSON file.

    Returns
    -------
    dict
        Previously stored pipeline status data, or an empty dictionary when
        no status file exists yet.
    """

    logger.debug("Path to pipeline status data set to: {}".format(stats_path))

    if os.path.exists(stats_path):
        logger.debug("Existing pipeline status data found. Loading JSON.")
        with open(stats_path) as fh:
            stats_json = json.load(fh)
    else:
        logger.debug("No pipeline status data found.")
        stats_json = {}

    return stats_json
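
# Illustrative sketch of get_previous_stats behaviour (the path below is
# hypothetical): when the stats file does not exist yet the function returns
# an empty dict, otherwise it returns the previously stored task entries.
#
#   get_previous_stats("/work/sample_1.stats.json")
#   # -> {} on a first run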
def main(sample_id, trace_file, workdir):
    """Parses a nextflow trace file, searches for processes with a specific
    tag and writes a JSON report with the relevant information.

    The expected fields for the trace file are::

        0. task_id
        1. process
        2. tag
        3. status
        4. exit code
        5. start timestamp
        6. container
        7. cpus
        8. duration
        9. realtime
        10. queue
        11. cpu percentage
        12. memory percentage
        13. real memory size of the process
        14. virtual memory size of the process

    Parameters
    ----------
    sample_id : str
        Identifier of the sample whose stats file will be updated.
    trace_file : str
        Path to the nextflow trace file, relative to ``workdir``.
    workdir : str
        Working directory where the stats JSON is stored.
    """
    # Determine the path of the stored JSON for the sample_id
    stats_suffix = ".stats.json"
    stats_path = join(workdir, sample_id + stats_suffix)
    trace_path = join(workdir, trace_file)

    logger.info("Starting pipeline status routine")

    logger.debug("Checking for previous pipeline status data")
    stats_array = get_previous_stats(stats_path)
    logger.info("Stats JSON object set to: {}".format(stats_array))

    # Search for this substring in the tags field. Only lines with this
    # tag will be processed for the reports
    tag = " getStats"
    logger.debug("Tag variable set to: {}".format(tag))

    logger.info("Starting parsing of trace file: {}".format(trace_path))
    with open(trace_path) as fh:

        header = next(fh).strip().split()
        logger.debug("Header set to: {}".format(header))

        for line in fh:
            fields = line.strip().split("\t")

            # Check if tag substring is in the tag field of the nextflow trace
            if tag in fields[2] and fields[3] == "COMPLETED":
                logger.debug(
                    "Parsing trace line with COMPLETED status: {}".format(
                        line))
                current_json = get_json_info(fields, header)
                stats_array[fields[0]] = current_json
            else:
                logger.debug(
                    "Ignoring trace line without COMPLETED status"
                    " or stats specific tag: {}".format(line))

    with open(stats_path, "w") as fh, open(".report.json", "w") as rfh:
        fh.write(json.dumps(stats_array, separators=(",", ":")))
        rfh.write(json.dumps(stats_array, separators=(",", ":")))
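
# Illustrative sketch of the trace lines handled by main(), assuming the
# tab-separated nextflow trace format listed in the docstring above (the
# values are hypothetical). A line such as:
#
#   12<TAB>integrity_coverage<TAB>sample_1 getStats<TAB>COMPLETED<TAB>0<TAB>...
#
# matches both the " getStats" tag and the COMPLETED status, so it is stored
# in the stats JSON under its task_id:
#
#   stats_array["12"] = {"task_id": "12", "process": "integrity_coverage",
#                        "tag": "sample_1 getStats", "status": "COMPLETED",
#                        ...}
#
# Lines with other tags or a non-COMPLETED status are only logged and skipped.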
if __name__ == "__main__":

    try:
        main(fastq_id, TRACE_FILE, WORKDIR)
    except Exception:
        logger.error("Module exited unexpectedly with error:\\n{}".format(
            traceback.format_exc()))
        log_error()
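
# Minimal usage sketch outside of a rendered nextflow template, assuming the
# module is importable and the paths below exist (both are hypothetical):
#
#   from pipeline_status import main
#   main("sample_1", "pipeline_stats.txt", "/path/to/workdir")
#
# This would update /path/to/workdir/sample_1.stats.json and write the same
# JSON to .report.json in the current working directory.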