forked from ManageIQ/manageiq
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcapture.rb
237 lines (200 loc) · 10.3 KB
/
capture.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
module Metric::CiMixin::Capture
  # Provider-specific capture worker for this target. `self.class.parent`
  # resolves to the provider namespace (e.g. a provider's InfraManager), so
  # each provider supplies its own MetricsCapture implementation while this
  # mixin provides the shared orchestration.
  def perf_capture_object
    self.class.parent::MetricsCapture.new(self)
  end

  delegate :perf_collect_metrics, :to => :perf_capture_object

  # The ExtManagementSystem responsible for capturing metrics for this target:
  # the target itself when it *is* an EMS, otherwise its current EMS, falling
  # back to the previous EMS (for archived targets that respond to
  # :old_ext_management_system). Returns nil when none can be determined.
  def ems_for_capture_target
    if kind_of?(ExtManagementSystem)
      self
    elsif respond_to?(:ext_management_system) && ext_management_system.present?
      ext_management_system
    elsif respond_to?(:old_ext_management_system) && old_ext_management_system.present?
      old_ext_management_system
    end
  end

  # Splits [start_time, end_time] into consecutive capture windows no wider
  # than +threshold+, newest window first.
  #
  # Create an array of ordered pairs from start_time and end_time so that each
  # ordered pair is contained within the threshold. Then, reverse it so the
  # newest ordered pair is first:
  #   start_time = 2017/01/01 12:00:00, end_time = 2017/01/04 12:00:00
  #   [[interval_name, 2017-01-03 12:00:00 UTC, 2017-01-04 12:00:00 UTC],
  #    [interval_name, 2017-01-02 12:00:00 UTC, 2017-01-03 12:00:00 UTC],
  #    [interval_name, 2017-01-01 12:00:00 UTC, 2017-01-02 12:00:00 UTC]]
  #
  # Raises if start_time is after end_time.
  def split_capture_intervals(interval_name, start_time, end_time, threshold = 1.day)
    raise _("Start time must be earlier than End time") if start_time > end_time

    (start_time.utc..end_time.utc).step_value(threshold).each_cons(2).collect do |s_time, e_time|
      [interval_name, s_time, e_time]
    end.reverse
  end
  private :split_capture_intervals

  # Queues metric collection for this target on its EMS's metrics collector.
  #
  # options:
  #   :start_time / :end_time - capture window; end_time requires start_time
  #   :priority               - MiqQueue priority (defaults per interval name)
  #   :task_id                - MiqTask id notified via perf_capture_callback
  #   :zone                   - Zone (or zone name) the work should run in
  #
  # Raises ArgumentError for an unknown interval, an end_time without a
  # start_time, or a target with no EMS.
  def perf_capture_queue(interval_name, options = {})
    start_time = options[:start_time]
    end_time   = options[:end_time]
    priority   = options[:priority] || Metric::Capture.const_get("#{interval_name.upcase}_PRIORITY")
    task_id    = options[:task_id]
    zone       = options[:zone] || my_zone
    zone       = zone.name if zone.respond_to?(:name)
    ems        = ems_for_capture_target

    # Use the gettext form for these messages, consistent with perf_capture.
    unless Metric::Capture::VALID_CAPTURE_INTERVALS.include?(interval_name)
      raise ArgumentError, _("invalid interval_name '%{name}'") % {:name => interval_name}
    end
    raise ArgumentError, _("end_time cannot be specified if start_time is nil") if start_time.nil? && !end_time.nil?
    raise ArgumentError, _("target does not have an ExtManagementSystem") if ems.nil?

    start_time = start_time.utc unless start_time.nil?
    end_time   = end_time.utc unless end_time.nil?

    log_target = "#{self.class.name} name: [#{name}], id: [#{id}]"

    cb = nil
    if interval_name == 'historical'
      start_time = Metric::Capture.historical_start_time if start_time.nil?
      end_time   = Time.now.utc if end_time.nil?
    else
      start_time ||= last_perf_capture_on
      end_time   ||= Time.now.utc
      # Callback :args is a nested array so additional task ids can be
      # accumulated when an existing queue item is updated below.
      cb = {:class_name => self.class.name, :instance_id => id, :method_name => :perf_capture_callback, :args => [[task_id]]} if task_id
    end

    # Determine what items we should be queuing up
    items = [interval_name]
    items = split_capture_intervals(interval_name, start_time, end_time) if start_time

    # Queue up the actual items
    queue_item = {
      :class_name  => self.class.name,
      :instance_id => id,
      :role        => 'ems_metrics_collector',
      :queue_name  => ems.metrics_collector_queue_name,
      :zone        => zone,
      :state       => ['ready', 'dequeue'],
    }

    items.each do |item_interval, *start_and_end_time|
      # Should both interval name and args (dates) be part of uniqueness query?
      queue_item_options = queue_item.merge(:method_name => "perf_capture_#{item_interval}")
      queue_item_options[:args] = start_and_end_time if start_and_end_time.present?
      MiqQueue.put_or_update(queue_item_options) do |msg, qi|
        if msg.nil?
          # No matching message on the queue; enqueue a fresh one.
          qi[:priority] = priority
          qi.delete(:state)
          qi[:miq_callback] = cb if cb
          qi
        elsif msg.state == "ready" && (task_id || MiqQueue.higher_priority?(priority, msg.priority))
          qi[:priority] = priority
          # rerun the job (either with new task or higher priority)
          qi.delete(:state)
          if task_id
            existing_tasks = (((msg.miq_callback || {})[:args] || []).first) || []
            qi[:miq_callback] = cb.merge(:args => [existing_tasks + [task_id]])
          end
          qi
        else
          interval = qi[:method_name].sub("perf_capture_", "")
          _log.debug "Skipping capture of #{log_target} - Performance capture for interval #{interval} is still running"
          # NOTE: do not update the message queue
          nil
        end
      end
    end
  end

  def perf_capture_realtime(*args)
    perf_capture('realtime', *args)
  end

  def perf_capture_hourly(*args)
    perf_capture('hourly', *args)
  end

  def perf_capture_historical(*args)
    perf_capture('historical', *args)
  end

  # Collects metrics from the provider for the given interval and time range,
  # then hands the results to perf_process. Raises gap-detection alert events
  # when the returned data starts later than expected.
  def perf_capture(interval_name, start_time = nil, end_time = nil)
    unless Metric::Capture::VALID_CAPTURE_INTERVALS.include?(interval_name)
      raise ArgumentError, _("invalid interval_name '%{name}'") % {:name => interval_name}
    end
    raise ArgumentError, _("end_time cannot be specified if start_time is nil") if start_time.nil? && !end_time.nil?

    start_time = start_time.utc unless start_time.nil?
    end_time   = end_time.utc unless end_time.nil?

    log_header = "[#{interval_name}]"
    log_target = "#{self.class.name} name: [#{name}], id: [#{id}]"
    log_target << ", start_time: [#{start_time}]" unless start_time.nil?
    log_target << ", end_time: [#{end_time}]" unless end_time.nil?

    # Determine the start_time for capturing if not provided
    if interval_name == 'historical'
      start_time = Metric::Capture.historical_start_time if start_time.nil?
      # Historical data is always collected at hourly granularity.
      interval_name_for_capture = 'hourly'
    else
      start_time = last_perf_capture_on if start_time.nil?
      if start_time.nil? && interval_name == 'hourly'
        # For hourly on the first capture, we don't want to get all of the
        # historical data, so we shorten the query
        start_time = 4.hours.ago.utc
      end
      interval_name_for_capture = interval_name
    end

    # Determine the expected start time, so we can detect gaps or missing data
    expected_start_range = start_time
    # If we've changed power state within the last hour, the returned data
    # may not include all the data we'd expect
    expected_start_range = nil if respond_to?(:state_changed_on) && state_changed_on && state_changed_on > Time.now.utc - 1.hour

    unless expected_start_range.nil?
      # Shift the expected time for first item, since you may not get back an
      # item for the first timestamp.
      case interval_name
      when 'realtime' then expected_start_range += (1.minute / Metric::Capture::REALTIME_METRICS_PER_MINUTE)
      when 'hourly'   then expected_start_range += 1.hour
      end
      expected_start_range = expected_start_range.iso8601
    end

    _log.info "#{log_header} Capture for #{log_target}..."

    start_range = end_range = counters = counter_values = nil
    _, t = Benchmark.realtime_block(:total_time) do
      Benchmark.realtime_block(:capture_state) { perf_capture_state }

      counters_by_mor, counter_values_by_mor_and_ts = perf_collect_metrics(interval_name_for_capture, start_time, end_time)

      counters       = counters_by_mor[ems_ref] || {}
      counter_values = counter_values_by_mor_and_ts[ems_ref] || {}

      ts = counter_values.keys.sort
      start_range = ts.first
      end_range   = ts.last
    end

    _log.info "#{log_header} Capture for #{log_target}...Complete - Timings: #{t.inspect}"

    if start_range.nil?
      _log.info "#{log_header} Skipping processing for #{log_target} as no metrics were captured."
    else
      if expected_start_range && start_range > expected_start_range
        _log.warn "#{log_header} For #{log_target}, expected to get data as of [#{expected_start_range}], but got data as of [#{start_range}]."

        # Raise ems_performance_gap_detected alert event to enable notification.
        MiqEvent.raise_evm_alert_event_queue(ext_management_system, "ems_performance_gap_detected",
                                             :resource_class       => self.class.name,
                                             :resource_id          => id,
                                             :expected_start_range => expected_start_range,
                                             :start_range          => start_range
                                            )
      end
      perf_process(interval_name, start_range, end_range, counters, counter_values)
    end
  end

  # MiqQueue callback fired when a queued capture for this target finishes.
  # Marks this target complete on each task, updates percent-complete, and —
  # once every target of a task has reported in — finishes the task and queues
  # the rollup on the task's parent.
  def perf_capture_callback(task_ids, _status, _message, _result)
    tasks = MiqTask.where(:id => task_ids)
    tasks.each do |t|
      t.lock do |task|
        tkey = "#{self.class.name}:#{id}"
        task.context_data[:complete] << tkey
        task.pct_complete = (task.context_data[:complete].length.to_f / task.context_data[:targets].length.to_f) * 100

        if (task.context_data[:targets] - task.context_data[:complete]).empty?
          # Task is done, call the rollup on the parent
          task.state, task.status, task.message = [MiqTask::STATE_FINISHED, MiqTask::STATUS_OK, "Performance collection complete, #{task.context_data[:complete].length} out of #{task.context_data[:targets].length} collections completed"]

          # Splitting e.g. "ManageIQ::Providers::Openstack::InfraManager::EmsCluster:8" to class and id
          pclass, _, pid = task.context_data[:parent].rpartition(":")
          parent = pclass.constantize.find(pid)

          msg = "Queueing [#{task.context_data[:interval]}] rollup to #{parent.class.name} id: [#{parent.id}] for time range: [#{task.context_data[:start]} - #{task.context_data[:end]}]"
          _log.info "#{msg}..."
          parent.perf_rollup_range_queue(task.context_data[:start], task.context_data[:end], task.context_data[:interval])
          _log.info "#{msg}...Complete"
        else
          # BUGFIX: the original nested a second `task.message =` inside this
          # multiple assignment, assigning task.message twice; the inner
          # assignment was redundant and has been removed.
          task.state, task.status, task.message = [MiqTask::STATE_ACTIVE, MiqTask::STATUS_OK, "Performance collection active, #{task.context_data[:complete].length} out of #{task.context_data[:targets].length} collections completed"]
        end
        _log.info("Updating task id: [#{task.id}] #{task.message}")
        task.save!
      end
    end
  end

  # Snapshots the target's current state alongside the captured metrics.
  def perf_capture_state
    VimPerformanceState.capture(self)
  end

  # For UI to enable refresh of realtime charts on demand
  def perf_capture_realtime_now
    log_target = "#{self.class.name} name: [#{name}], id: [#{id}]"
    _log.info "Realtime capture requested for #{log_target}"

    perf_capture_queue('realtime', :priority => MiqQueue::HIGH_PRIORITY)
  end
end