forked from cms-sw/cms-bot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgetWorkflowStatsFromES.py
208 lines (170 loc) · 6.49 KB
/
getWorkflowStatsFromES.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import json, sys
from time import time
from es_utils import get_payload
from ROOT import *
'''
This program uses pyROOT; set up the CMS environment (cmsenv) before running it.
'''
def _format(s, **kwds):
return s % kwds
def getWorkflowStatsFromES(release='*', arch='*', lastNdays=7, page_size=0):
    """Fetch relval workflow stats documents from the ElasticSearch instance.

    release   -- release pattern for the ES query_string ('*' matches any)
    arch      -- architecture pattern ('*' matches any)
    lastNdays -- only hits whose @timestamp falls in the last N days
    page_size -- hits per page; a value < 1 means "ask ES for the total hit
                 count first, then fetch everything in a single page"

    Returns the list of raw ES hit objects collected from ALL fetched pages.
    """
    query_url = 'http://cmses-master02.cern.ch:9200/relvals_stats_*/_search'
    # NOTE(review): "filtered" query syntax targets an old (1.x/2.x) ES — confirm
    # against the server version before modernizing.
    query_datsets = """
{
"query": {
"filtered": {
"query": {
"bool": {
"should": [
{
"query_string": {
"query": "release:%(release_cycle)s AND architecture:%(architecture)s",
"lowercase_expanded_terms": false
}
}
]
}
},
"filter": {
"bool": {
"must": [
{
"range": {
"@timestamp": {
"from": %(start_time)s,
"to": %(end_time)s
}
}
}
]
}
}
}
},
"from": %(from)s,
"size": %(page_size)s
}
"""
    ent_from = 0
    json_out = []
    info_request = False
    queryInfo = {}
    # Time window in epoch milliseconds, matching the @timestamp field.
    queryInfo["end_time"] = int(time() * 1000)
    queryInfo["start_time"] = queryInfo["end_time"] - int(86400 * 1000 * lastNdays)
    queryInfo["architecture"] = arch
    queryInfo["release_cycle"] = release
    queryInfo["from"] = 0
    if page_size < 1:
        # First issue a tiny query just to learn the total hit count, then
        # re-query with page_size == total so everything fits in one page.
        info_request = True
        queryInfo["page_size"] = 2
    else:
        queryInfo["page_size"] = page_size
    total_hits = 0
    while True:
        queryInfo["from"] = ent_from
        es_data = get_payload(query_url, _format(query_datsets, **queryInfo))
        content = json.loads(es_data)
        content.pop("_shards", None)
        total_hits = content['hits']['total']
        if info_request:
            info_request = False
            queryInfo["page_size"] = total_hits
            continue
        hits = len(content['hits']['hits'])
        if hits == 0: break
        ent_from = ent_from + hits
        json_out.append(content)
        if ent_from >= total_hits:
            break
    # Bug fix: the original returned json_out[0]['hits']['hits'] — only the
    # FIRST page — even though every page was fetched, and it raised
    # IndexError when there were no hits at all. Concatenate all pages.
    all_hits = []
    for page in json_out:
        all_hits += page['hits']['hits']
    return all_hits
'''
Narrow an ElasticSearch result down to the fields of interest, supplied as a
list, keeping them in the given order.
'''
def filterElasticSearchResult(ES_result=None, list_of_fields=None):
    """Reduce raw ES hits to {timestamp: {workflow: {step: {field: value}}}}.

    ES_result      -- list of raw ES hit objects (each with a '_source' dict)
    list_of_fields -- the '_source' keys to keep for every step

    Hits with a non-zero exit_code are dropped.
    """
    final_struct = {}
    for element in ES_result:
        source_object = element['_source']
        # Bug fix: the original used `is not 0`, an identity test that only
        # works by CPython's small-int caching accident; use `!=` instead.
        if source_object['exit_code'] != 0:
            continue
        stamp = source_object['@timestamp']
        flow = source_object['workflow']
        step = source_object['step']
        step_data = {}
        for stat_item in list_of_fields:
            step_data[stat_item] = source_object[stat_item]
        final_struct.setdefault(stamp, {}).setdefault(flow, {})[step] = step_data
    return final_struct
'''
Going deeper in this context :), this function
1. takes two objects already filtered from the ES query result, and
2. for each step key present in both objects, computes the difference
   between their metric values.
'''
def compareMetrics(firstObject=None, secondObject=None, workflow=None, stepnum=None):
    """Compare two filtered result objects ({stamp: {wf: {step: {field: value}}}}).

    For every (stamp, workflow, step, field) present in BOTH objects:
      - rss_* fields: difference converted from bytes to MiB; entries where
        the second value is 0 are skipped (zero sometimes appears even for
        successful runs)
      - every other field ('time', 'cpu_avg', ...): raw difference

    workflow -- optional workflow number; only compare that workflow
    stepnum  -- optional step identifier; only compare that step
    Returns {field: [difference, ...]}.
    """
    # Discover the metric field names from the first step found.
    fields = []
    for stamp in firstObject:
        for wf in firstObject[stamp]:
            for step in firstObject[stamp][wf]:
                fields = list(firstObject[stamp][wf][step].keys())
                break
            break
        break
    comparison_results = {}
    for f in fields:
        comparison_results[f] = []
    for stamp in firstObject:
        for wf in firstObject[stamp]:
            if workflow and float(wf) != float(workflow):
                continue
            for step in firstObject[stamp][wf]:
                if stepnum and str(stepnum) != str(step):
                    continue
                for field in firstObject[stamp][wf][step]:
                    if stamp in secondObject and wf in secondObject[stamp] \
                       and step in secondObject[stamp][wf] \
                       and field in secondObject[stamp][wf][step]:
                        first_metric = firstObject[stamp][wf][step][field]
                        second_metric = secondObject[stamp][wf][step][field]
                        # Bug fix: the original tests were
                        #   `field is 'time' or 'cpu_avg'` and
                        #   `field is 'rss_max' or 'rss_avg' or ...`
                        # which are ALWAYS true (a non-empty string literal in
                        # an `or` is truthy), so every field silently got the
                        # rss-style treatment. Use real membership tests.
                        if field in ('rss_max', 'rss_avg', 'rss_75', 'rss_25'):
                            # Zero sometimes shows up even when exit_code is 0.
                            if second_metric == 0:
                                continue
                            # bytes -> MiB
                            difference = (first_metric - second_metric) / 1048576.0
                        else:
                            # 'time', 'cpu_avg' and any other metric: raw diff.
                            difference = first_metric - second_metric
                        comparison_results[field].append(difference)
    return comparison_results
if __name__ == "__main__":
opts = None
release = None
fields = ['time', 'rss_max', 'cpu_avg', 'rss_75' , 'rss_25' , 'rss_avg' ]
arch = 'slc6_amd64_gcc630'
days = int(sys.argv[5])
page_size = 0
limit = 20
release_one = sys.argv[1]
release_two = sys.argv[2]
archone = sys.argv[3]
archtwo = sys.argv[4]
wf_n = None
step_n = None
if len(sys.argv) > 6: wf_n = sys.argv[6]
if len(sys.argv) > 7: step_n = sys.argv[7]
print wf_n, step_n
json_out_first = getWorkflowStatsFromES(release_one, archone, days, page_size)
json_out_second = getWorkflowStatsFromES(release_two, archtwo, days, page_size)
filtered_first = filterElasticSearchResult(json_out_first, fields)
filtered_second = filterElasticSearchResult(json_out_second, fields)
comp_results = compareMetrics(filtered_first, filtered_second, wf_n, step_n)
print json.dumps(comp_results, indent=2, sort_keys=True, separators=(',', ': '))
for hist in comp_results:
histo = TH1F(hist, hist, 100000, -5000, 5000)
for i in comp_results[hist]:
histo.Fill(i)
histo.SaveAs(hist+".root")