forked from ManageIQ/manageiq
-
Notifications
You must be signed in to change notification settings - Fork 1
/
miq_worker.rb
633 lines (504 loc) · 17.8 KB
/
miq_worker.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
require 'io/wait'
class MiqWorker < ApplicationRecord
# Shared behaviour mixed into every worker record.
include ContainerCommon
include SystemdCommon
include UuidMixin
# Callbacks: set the system uid once a record is instantiated; before the row is
# destroyed, error out tasks attached to processed messages and log what happens
# to this worker's queue messages.
after_initialize :set_system_uid
before_destroy :error_out_tasks_with_active_queue_message, :log_destroy_of_worker_messages
# The server row this worker belongs to.
belongs_to :miq_server
# MiqQueue rows that reference this worker through the polymorphic :handler association.
has_many :messages, :as => :handler, :class_name => 'MiqQueue'
# Handler messages whose state is "dequeue".
has_many :active_messages, -> { where(["state = ?", "dequeue"]) }, :as => :handler, :class_name => 'MiqQueue'
# Handler messages whose state is "ready".
has_many :ready_messages, -> { where(["state = ?", "ready"]) }, :as => :handler, :class_name => 'MiqQueue'
# Handler messages in any state other than "ready"; destroyed together with the worker.
has_many :processed_messages, -> { where.not(:state => "ready") }, :as => :handler, :class_name => 'MiqQueue', :dependent => :destroy
# Declares friendly_name and uri_or_queue_name (instance methods defined further down)
# as string-typed virtual columns.
virtual_column :friendly_name, :type => :string
virtual_column :uri_or_queue_name, :type => :string
# Query helpers for filtering by server id and by status.
scope :with_miq_server_id, ->(server_id) { where(:miq_server_id => server_id) }
scope :with_status, ->(status) { where(:status => status) }
# Class-level guid; the cached my_worker lookup finds the worker row with this guid.
class_attribute :my_guid
# Values stored in the worker's +status+ column.
STATUS_CREATING = 'creating'.freeze
STATUS_STARTING = 'starting'.freeze
STATUS_STARTED = 'started'.freeze
STATUS_READY = 'ready'.freeze
STATUS_WORKING = 'working'.freeze
STATUS_STOPPING = 'stopping'.freeze
STATUS_STOPPED = 'stopped'.freeze
STATUS_KILLED = 'killed'.freeze
STATUS_ABORTED = 'aborted'.freeze
# Status groupings used by the finders and predicates of this class.
# Frozen (like the strings above) so the shared lists cannot be mutated at runtime.
STATUSES_STARTING = [STATUS_CREATING, STATUS_STARTING].freeze
STATUSES_CURRENT = [STATUS_STARTED, STATUS_READY, STATUS_WORKING].freeze
STATUSES_STOPPED = [STATUS_STOPPED, STATUS_KILLED, STATUS_ABORTED].freeze
STATUSES_CURRENT_OR_STARTING = (STATUSES_CURRENT + STATUSES_STARTING).freeze
STATUSES_ALIVE = (STATUSES_CURRENT_OR_STARTING + [STATUS_STOPPING]).freeze
# Keys kept from the process info hash before it is written to the worker row
# (see the instance-level status_update).
PROCESS_INFO_FIELDS = %i[priority memory_usage percent_memory percent_cpu memory_size cpu_time proportional_set_size unique_set_size].freeze
PROCESS_TITLE_PREFIX = "MIQ:".freeze
# Stops all workers by delegating to +stop_workers+
# (presumably invoked when the process shuts down - confirm against callers).
def self.atShutdown
  stop_workers
end

# Writers for the per-class overrides that +workers+ and +rails_worker?+ read back.
class << self
  attr_writer :workers
  attr_writer :rails_worker
end

# Bundler groups; joined into the BUNDLER_GROUPS environment variable when a
# worker process is spawned.
def self.bundler_groups
  ["manageiq_default", "ui_dependencies"]
end

# Must be overridden by subclasses; the base implementation always raises.
def self.kill_priority
  raise NotImplementedError, ".kill_priority must be implemented in a subclass"
end
# Number of workers of this class that should be running.
# Returns 0 when the required roles are not active. Otherwise returns the
# +workers+ override (invoked first when it is a Proc) or, when no override
# was set, the configured count.
def self.workers
  return 0 unless has_required_role?

  case @workers
  when Proc then @workers.call
  when nil  then workers_configured_count
  else @workers
  end
end

# Whether this is a Rails worker. Uses the +rails_worker+ override (invoked
# first when it is a Proc) and defaults to true when no override was set.
def self.rails_worker?
  case @rails_worker
  when Proc then @rails_worker.call
  when nil  then true
  else @rails_worker
  end
end
# A worker class is scalable when no maximum is configured or the maximum is above one.
def self.scalable?
  limit = maximum_workers_count
  limit.nil? || limit > 1
end

# Instance-level shortcut for the class-level answer.
def scalable?
  self.class.scalable?
end
# Configured worker count (the :count worker setting), capped at
# +maximum_workers_count+ when that is an Integer smaller than the setting.
def self.workers_configured_count
  configured = worker_settings[:count]
  cap = maximum_workers_count
  cap.kind_of?(Integer) && cap < configured ? cap : configured
end
# Union (without duplicates) of +leaf_subclasses+ and every descendant that
# answers truthily to +acts_as_sti_leaf_class?+.
def self.concrete_subclasses
  leaves = leaf_subclasses
  sti_leaves = descendants.select { |klass| klass.try(:acts_as_sti_leaf_class?) }
  leaves | sti_leaves
end
# Per-class configuration attributes:
# * default_queue_name - queue name given to new worker rows when none is passed (init_worker_object)
# * required_roles - roles checked by has_required_role?; may be a String, an Array or a Proc
# * maximum_workers_count - upper bound applied by workers_configured_count and scalable?
# * include_stopping_workers_on_synchronize - makes sync_workers count "stopping" workers too
# * worker_settings_paths - not read within this file; presumably consumed elsewhere (TODO confirm)
class_attribute :default_queue_name, :required_roles, :maximum_workers_count, :include_stopping_workers_on_synchronize, :worker_settings_paths
# Defaults for the attributes above.
self.include_stopping_workers_on_synchronize = false
self.required_roles = []
self.worker_settings_paths = []
# Scope used by the class-level finders. Reuses the current scope when it
# already filters on miq_server_id; otherwise restricts to the id of
# MiqServer.my_server (nil when there is no such server).
def self.server_scope
  active = current_scope
  already_filtered = active && active.where_values_hash.include?('miq_server_id')
  return active if already_filtered

  where(:miq_server_id => MiqServer.my_server&.id)
end
# Query condition selecting workers whose status is one of STATUSES_CURRENT
# (started, ready, working). Frozen so the shared constant cannot be altered by accident.
CONDITION_CURRENT = {:status => STATUSES_CURRENT}.freeze

# Workers in the server scope with a current status.
def self.find_current
  server_scope.where(CONDITION_CURRENT)
end

# Workers with a current status across +in_my_region+ (not limited to one server).
def self.find_current_in_my_region
  in_my_region.where(CONDITION_CURRENT)
end

# Workers in the server scope that are still being created or started.
def self.find_starting
  server_scope.where(:status => STATUSES_STARTING)
end

# Starting workers restricted to the types listed by MiqWorkerType.worker_class_names.
def self.find_all_starting
  find_starting.where(:type => MiqWorkerType.worker_class_names)
end

# Workers in the server scope that are current or starting.
def self.find_current_or_starting
  server_scope.where(:status => STATUSES_CURRENT_OR_STARTING)
end

# Workers in the server scope that are current, starting or stopping.
def self.find_alive
  server_scope.where(:status => STATUSES_ALIVE)
end
# Whether the local server has at least one of the roles this worker class requires.
#
# +required_roles+ may be a String, an Array, or a Proc returning either.
# A blank value means "no role required" and yields true.
#
# Raises RuntimeError when the resolved roles are neither a String nor an Array;
# the message names the offending class.
def self.has_required_role?
  roles = required_roles.kind_of?(Proc) ? required_roles.call : required_roles
  return true if roles.blank?

  roles = Array(roles) if roles.kind_of?(String)
  # The previous message contained the literal text "self.required_roles.class.name"
  # instead of the actual class, so interpolate the real type here.
  raise _("Unexpected type: <%{type}>") % {:type => roles.class.name} unless roles.kind_of?(Array)

  roles.any? { |role| MiqServer.my_server.has_active_role?(role) }
end
# Asks the local server (MiqServer.my_server) whether it has enough resources
# to start another worker of this class.
def self.enough_resource_to_start_worker?
MiqServer.my_server.enough_resource_to_start_worker?(self)
end
# Brings the number of running workers in line with the desired count.
#
# Starts missing workers when the server has enough resources, or stops surplus
# workers taken from the end of the list. Returns a hash with the pids of
# started workers under :adds and of stopped workers under :deletes.
def self.sync_workers
  scope   = include_stopping_workers_on_synchronize ? find_alive : find_current_or_starting
  current = scope.length
  desired = workers
  result  = {:adds => [], :deletes => []}
  return result if current == desired

  _log.info("Workers are being synchronized: Current #: [#{current}], Desired #: [#{desired}]")

  if desired > current && enough_resource_to_start_worker?
    (desired - current).times { result[:adds] << start_worker.pid }
  elsif desired < current
    surplus = scope.to_a
    (current - desired).times do
      extra = surplus.pop
      result[:deletes] << extra.pid
      extra.stop
    end
  end

  result
end
# Settings key for this worker class (memoized), e.g. MiqGenericWorker => :generic_worker.
#
# * MiqWorker itself                      => :worker_base
# * parent module exposes a short_token   => :"<normalized_type>_<short_token underscored>"
#   (e.g. :generic_worker_infra, :generic_worker_vmware)
# * anything else                         => normalized_type as a symbol
def self.settings_name
  return @settings_name if @settings_name

  @settings_name =
    if self == MiqWorker
      :worker_base
    elsif (provider_token = module_parent.try(:short_token))
      :"#{normalized_type}_#{provider_token.underscore}"
    else
      normalized_type.to_sym
    end
end
# Memoized list of settings names for every class in this worker's ancestry that is
# MiqWorker or a subclass of it, ordered from MiqWorker down to this class.
def self.path_to_my_worker_settings
  @path_to_my_worker_settings ||= begin
    worker_classes = ancestors.grep(Class).select { |klass| klass <= MiqWorker }
    worker_classes.reverse.collect(&:settings_name)
  end
end
# Resolves this worker class's settings for the given server.
#
# miq_server - server whose settings are used; an empty hash is returned when it is nil/false.
# options    - :config overrides the server's settings as the source hash,
#              :raw is forwarded to fetch_worker_settings_from_options_hash.
def self.fetch_worker_settings_from_server(miq_server, options = {})
  return {} unless miq_server

  # TODO: commit bb15370a2131e5a8f02f63de334959685b68d620 added the conditional here
  # to prefer the passed in options before using the server settings for bug 998991,
  # bug 1004455, bug 1004459. We'll keep this logic for now.
  fetch_worker_settings_from_options_hash(options[:config] || miq_server.settings, options[:raw])
end
# Builds the effective settings for this worker class from a settings hash.
#
# Walks options_hash[:workers] along path_to_my_worker_settings, merging each
# level's :defaults and finally the innermost section itself. Entries equal to
# Vmdb::Settings::RESET_VALUE are removed (from the source sections as well, since
# delete_if works in place). Unless raw is exactly true the merged result is
# passed through normalize_settings!.
#
# Returns {} when there is no :workers key or its value is nil.
# Raises when a section on the path is missing.
def self.fetch_worker_settings_from_options_hash(options_hash, raw = false)
  return {} unless options_hash.key?(:workers)

  merged = {}
  section = options_hash[:workers]
  return merged if section.nil?

  path_to_my_worker_settings.each do |section_key|
    section = section[section_key]
    raise _("Missing config section %{section_name}") % {:section_name => section_key} if section.nil?

    defaults = section[:defaults]
    next if defaults.nil?

    defaults.delete_if { |_k, v| v == Vmdb::Settings::RESET_VALUE }
    merged.merge!(defaults)
  end

  section.delete_if { |_k, v| v == Vmdb::Settings::RESET_VALUE }
  merged.merge!(section)

  normalize_settings!(merged) unless raw == true
  merged
end
# Normalizes string values of a settings hash in place:
#
# * strings for which +number_with_method?+ is true (e.g. "1.seconds", "10.megabytes")
#   are replaced by +to_i_with_method+
# * purely numeric strings ("15", "1.5") are replaced by their integer value
#   (to_i, so "1.5" becomes 1)
# * values encrypted with ManageIQ::Password are decrypted
#
# settings - hash to normalize; modified in place.
# recurse  - when true, nested hashes are normalized as well.
#
# Returns the settings hash.
def self.normalize_settings!(settings, recurse: false)
  settings.each_key do |k|
    v = settings[k]
    if v.kind_of?(Hash) && recurse
      normalize_settings!(v, :recurse => true)
    elsif v.kind_of?(String)
      if v.number_with_method?
        settings[k] = v.to_i_with_method
      elsif v.match?(/\A\d+(\.\d+)?\z/) # case where int/float saved as string
        # The dot is escaped on purpose: an unescaped "." matches any character,
        # which turned values such as "12:30" or "1x5" into integers.
        settings[k] = v.to_i
      elsif ManageIQ::Password.encrypted?(v)
        settings[k] = ManageIQ::Password.decrypt(v)
      end
    end
  end
end
# Settings for this worker's class, resolved against the server this worker row belongs to.
def worker_settings(options = {})
self.class.fetch_worker_settings_from_server(miq_server, options)
end
# Memoized result of Workers::MiqDefaults.heartbeat_file for this worker's guid.
def heartbeat_file
@heartbeat_file ||= Workers::MiqDefaults.heartbeat_file(guid)
end
# Settings for this worker class, resolved against the local server (MiqServer.my_server).
def self.worker_settings(options = {})
fetch_worker_settings_from_server(MiqServer.my_server, options)
end
# Intentionally a no-op at this level.
def self.reload_worker_settings
# By default worker settings are reloaded periodically by the worker runner
end
# Starts +workers+ new workers, provided the required roles are active.
def self.start_workers
  workers.times { start_worker } if has_required_role?
end

# Asks every worker in the server scope to stop.
def self.stop_workers
  server_scope.each { |worker| worker.stop }
end

# Asks every current worker to restart.
def self.restart_workers
  find_current.each { |worker| worker.restart }
end

# Refreshes the status of every current worker.
def self.status_update
  find_current.each { |worker| worker.status_update }
end

# Logs the status of every current worker at the given level.
def self.log_status(level = :info)
  find_current.each do |worker|
    worker.log_status(level)
  end
end
# Builds (without saving) a new worker in the server scope.
#
# Only the first argument is used, and only when it is a Hash of attributes.
# The hash is filled in place with the default queue name (when one is set and
# none was given), the "creating" status and the current UTC time as last heartbeat.
def self.init_worker_object(*params)
  attrs = params.first
  attrs = {} unless attrs.kind_of?(Hash)

  if !attrs.key?(:queue_name) && !default_queue_name.nil?
    attrs[:queue_name] = default_queue_name
  end
  attrs[:status] = STATUS_CREATING
  attrs[:last_heartbeat] = Time.now.utc

  server_scope.new(attrs)
end

# Builds a worker via init_worker_object and saves it, raising if the save fails.
def self.create_worker_record(*params)
  record = init_worker_object(*params)
  record.save!
  record
end

# Creates a worker and starts it. Containerized workers are only instantiated,
# all others are saved before being started. Returns the worker.
def self.start_worker(*params)
  worker =
    if containerized_worker?
      init_worker_object(*params)
    else
      create_worker_record(*params)
    end
  worker.start
  worker
end
# Registers a cached +my_worker+ lookup through cache_with_timeout: the block finds
# the worker in the server scope whose guid equals +my_guid+.
cache_with_timeout(:my_worker) { server_scope.find_by(:guid => my_guid) }
# Runs status_update on MiqWorker itself, regardless of the subclass it is called on.
def self.status_update_all
MiqWorker.status_update
end
# Runs log_status on MiqWorker itself, regardless of the subclass it is called on.
def self.log_status_all(level = :info)
MiqWorker.log_status(level)
end
# True when MiqEnvironment::Command reports a podified environment.
def self.containerized_worker?
MiqEnvironment::Command.is_podified?
end
# Instance-level shortcut for the class-level answer.
def containerized_worker?
self.class.containerized_worker?
end
# True when MiqEnvironment::Command reports systemd support.
def self.systemd_worker?
MiqEnvironment::Command.supports_systemd?
end
# Instance-level shortcut for the class-level answer.
def systemd_worker?
self.class.systemd_worker?
end
# Starts the worker using the mechanism that fits the environment:
# systemd first, then containers, otherwise a spawned process.
# Returns whatever the chosen starter returns.
def start_runner
  return start_systemd_worker if systemd_worker?
  return start_runner_via_container if containerized_worker?

  start_runner_via_spawn
end

# Container start-up: creates the container objects for this worker.
def start_runner_via_container
  create_container_objects
end
# Builds the shell command used to run a single worker of this class.
#
# guid   - worker guid (required; ArgumentError when missing).
# ems_id - optional id or Array of ids; arrays are joined with commas.
#
# The command runs the runner script with the current Ruby, is prefixed with
# "nice" when the APPLIANCE environment variable is set, and ends with the class name.
def self.build_command_line(guid, ems_id = nil)
  raise ArgumentError, "No guid provided" unless guid

  require 'awesome_spawn'

  base = "#{Gem.ruby} #{runner_script}"
  base = "nice -n #{nice_increment} #{base}" if ENV["APPLIANCE"]

  switches = {:guid => guid, :heartbeat => nil}
  if ems_id
    switches[:ems_id] = ems_id.kind_of?(Array) ? ems_id.join(",") : ems_id
  end

  built = AwesomeSpawn::CommandLineBuilder.new.build(base, switches)
  "#{built} #{name}"
end

# Path of the single-worker runner script; raises when the file does not exist.
def self.runner_script
  ManageIQ.root.join("lib/workers/bin/run_single_worker.rb").tap do |path|
    raise "script not found: #{path}" unless File.exist?(path)
  end
end

# Command line for this particular worker, built from its worker_options.
def command_line
  worker_guid, ems_id = worker_options.values_at(:guid, :ems_id)
  self.class.build_command_line(worker_guid, ems_id)
end
# Spawns the worker as a detached child process.
#
# The child gets BUNDLER_GROUPS set from the class's bundler groups and has its
# stdout and stderr appended to log/evm.log. Returns the child's pid.
def start_runner_via_spawn
  child_env = {"BUNDLER_GROUPS" => self.class.bundler_groups.join(",")}
  child_pid = Kernel.spawn(
    child_env,
    command_line,
    [:out, :err] => [Rails.root.join("log/evm.log"), "a"]
  )
  # Detach so the child is reaped without this process waiting on it.
  Process.detach(child_pid)
  child_pid
end
# Starts the worker's runner and records the outcome.
#
# Stores the result of start_runner as pid, saves the row unless the worker is
# containerized or run by systemd, queues an "evm_worker_start" event against the
# worker's server (or the local server when it has none) and logs the same message.
# Returns self.
def start
  self.pid = start_runner
  save unless containerized_worker? || systemd_worker?

  details = "Worker started: ID [#{id}], PID [#{pid}], GUID [#{guid}]"
  event_target = miq_server || MiqServer.my_server
  MiqEvent.raise_evm_event_queue(event_target, "evm_worker_start", :event_details => details, :type => self.class.name)
  _log.info(details)

  self
end
# Requests a stop by handing this worker to the server's worker manager
# (stop_worker_queue).
def stop
miq_server.worker_manager.stop_worker_queue(self)
end
# Let the worker monitor start a new worker
alias restart stop
# Kills the worker's process (or container objects) and then destroys the row.
def kill
kill_process
destroy
end
# Queues a 'kill' call for this worker on the 'miq_server' queue of its own server
# and zone, unless such a queue item already exists (put_unless_exists).
#
# kill needs to be done by the worker's orchestrator pod / server process
# TODO: Note, stop is async through the queue, while kill is sync. Should kill be async too?
# Also, this looks a lot like MiqServer#stop_worker_queue except stop_worker is called on the server row whereas
# we're calling kill on the worker row.
def kill_async
MiqQueue.put_unless_exists(
:class_name => self.class.name,
:instance_id => id,
:method_name => 'kill',
:queue_name => 'miq_server',
:server_guid => miq_server.guid,
:zone => miq_server.my_zone
)
end
# Forcefully terminates the worker.
#
# Containerized workers have their container objects deleted. Otherwise, when a
# pid is known, the process is sent signal 9 and this method polls until
# is_alive? turns false. Errors while killing are logged as warnings, not raised.
def kill_process
  if containerized_worker?
    delete_container_objects
    return
  end
  return if pid.nil?

  begin
    _log.info("Killing worker: ID [#{id}], PID [#{pid}], GUID [#{guid}], status [#{status}]")
    Process.kill(9, pid)
    # Poll until the worker is no longer reported alive.
    sleep(0.01) while is_alive?
  rescue Errno::ESRCH
    _log.warn("Worker ID [#{id}] PID [#{pid}] GUID [#{guid}] has been killed")
  rescue => err
    _log.warn("Worker ID [#{id}] PID [#{pid}] GUID [#{guid}] has been killed, but with the following error: #{err}")
  end
end
# Status is started, ready or working.
def is_current?
STATUSES_CURRENT.include?(status)
end
# Status is one of STATUSES_ALIVE (current, starting or stopping) and the process is actually running.
def is_alive?
STATUSES_ALIVE.include?(status) && actually_running?
end
# Status is stopped, killed or aborted.
def is_stopped?
STATUSES_STOPPED.include?(status)
end
# Status is current or still starting up.
def current_or_starting?
STATUSES_CURRENT_OR_STARTING.include?(status)
end
# Status is exactly "started".
def started?
STATUS_STARTED == status
end
# Asks MiqProcess whether the recorded pid is a worker process.
def actually_running?
MiqProcess.is_worker?(pid)
end
# True unless the worker is in a stopped status and its process is gone.
def enabled_or_running?
!is_stopped? || actually_running?
end
# Whether the worker has been in the 'stopping' status for longer than allowed.
#
# The allowance is the :stopping_timeout worker setting (falling back to
# Workers::MiqDefaults.stopping_timeout) measured from the last heartbeat plus
# the timeout of the message currently being worked on, if any.
def stopping_for_too_long?
  # Note, a 'stopping' worker heartbeats in DRb but NOT to
  # the database, so we can see how long it's been
  # 'stopping' by checking the last_heartbeat.
  allowed = self.class.worker_settings[:stopping_timeout] || Workers::MiqDefaults.stopping_timeout
  return false unless status == MiqWorker::STATUS_STOPPING

  expected_finish = last_heartbeat + current_timeout.to_i
  expected_finish < allowed.seconds.ago
end
# Runs the timeout check on every message this worker is currently handling.
def validate_active_messages
  active_messages.each do |queue_msg|
    queue_msg.check_for_timeout(_log.prefix)
  end
end

# Marks every message this worker is currently handling as delivered in error.
def clean_active_messages
  active_messages.each do |queue_msg|
    _log.warn("Message id: [#{queue_msg.id}] Setting state to 'error'")
    queue_msg.delivered_in_error('Clean Active Messages')
  end
end
# after_initialize callback: systemd workers use their unit name as system uid.
private def set_system_uid
  return unless systemd_worker?

  self.system_uid = unit_name
end

# before_destroy callback: flags the task of every processed message that has one
# as finished with an error, since its handler is going away.
private def error_out_tasks_with_active_queue_message
  message = "Task Handler: [#{friendly_name}] ID [#{id}] has been deleted!"
  with_task = processed_messages.includes(:miq_task).where.not(:miq_task_id => nil)
  # Note, this is done synchronously from destroy because workers have 1 message they're currently "handling"
  # and each message can only have 1 task, so this should be very fast even for many workers.
  with_task.each do |queue_msg|
    queue_msg.miq_task.update_status(MiqTask::STATE_FINISHED, MiqTask::STATUS_ERROR, message)
  end
end
# before_destroy callback: detaches "ready" messages from this worker (handler set
# to nil) and logs the processed messages that are about to be destroyed with it.
# Every step is best effort: the inline `rescue nil` keeps a failure on one
# message from aborting the rest.
def log_destroy_of_worker_messages
ready_messages.each do |m|
_log.warn("Nullifying: #{MiqQueue.format_full_log_msg(m)}") rescue nil
m.update(:handler_id => nil, :handler_type => nil) rescue nil
end
processed_messages.each do |m|
_log.warn("Destroying: #{MiqQueue.format_full_log_msg(m)}") rescue nil
end
end
# Refreshes the process statistics stored on this worker row.
#
# Does nothing in a podified environment. When the process no longer exists the
# worker is marked as aborted; any other error while reading the process info is
# only logged. On success the info is reduced to PROCESS_INFO_FIELDS, :priority is
# stored as :os_priority, and the row is updated (raising if that update fails).
def status_update
  return if MiqEnvironment::Command.is_podified?

  begin
    process_info = MiqProcess.processInfo(pid)
  rescue Errno::ESRCH
    update(:status => STATUS_ABORTED)
    _log.warn("No such process [#{friendly_name}] with PID=[#{pid}], aborting worker.")
  rescue => error
    _log.warn("Unexpected error: #{error.message}, while requesting process info for [#{friendly_name}] with PID=[#{pid}]")
  else
    # Ensure the hash only contains the values we want to store in the table
    process_info.slice!(*PROCESS_INFO_FIELDS)
    process_info[:os_priority] = process_info.delete(:priority)
    update!(process_info)
  end
end
# Logs a one-line summary of this worker (ids, last heartbeat and the stored
# process statistics) at the given log level.
def log_status(level = :info)
  summary = "[#{friendly_name}] Worker ID [#{id}], PID [#{pid}], GUID [#{guid}], " \
            "Last Heartbeat [#{last_heartbeat}], " \
            "Process Info: Memory Usage [#{memory_usage}], Memory Size [#{memory_size}], " \
            "Proportional Set Size: [#{proportional_set_size}], Unique Set Size: [#{unique_set_size}], " \
            "Memory % [#{percent_memory}], CPU Time [#{cpu_time}], CPU % [#{percent_cpu}], " \
            "Priority [#{os_priority}]"
  _log.send(level, summary)
end
# Timeout (msg_timeout) of the first message this worker is currently handling,
# or nil when it is handling none.
def current_timeout
  active_messages.first.try(:msg_timeout)
end
# The worker's uri when it has one, otherwise its queue name.
def uri_or_queue_name
uri || queue_name
end
# Human readable worker name: the normalized type, titleized.
def friendly_name
normalized_type.titleize
end
# Instances answer normalized_type through their class.
delegate :normalized_type, :to => :class
# Class name without a leading "ManageIQ::Providers::" namespace.
def self.abbreviated_class_name
  provider_prefix = /^ManageIQ::Providers::/
  name.sub(provider_prefix, "")
end

# Instance-level shortcut for the class-level answer.
def abbreviated_class_name
  self.class.abbreviated_class_name
end

# Abbreviated class name with the first "Miq" and the first "Worker" removed.
def self.minimal_class_name
  without_miq = abbreviated_class_name.sub("Miq", "")
  without_miq.sub("Worker", "")
end

# Instance-level shortcut for the class-level answer.
def minimal_class_name
  self.class.minimal_class_name
end
# Pipe-separated identifier for this worker's database connection, truncated to
# 64 characters: "MIQ", the process id, the compressed ids of the worker's server,
# the worker and the local server's zone, the minimal class name and the zone name.
def database_application_name
  zone = MiqServer.my_server.zone
  fields = [
    "MIQ",
    Process.pid,
    miq_server.compressed_id,
    compressed_id,
    zone.compressed_id,
    minimal_class_name,
    zone.name
  ]
  fields.join("|").truncate(64)
end
# Log-friendly description including the class, id, pid and guid.
def format_full_log_msg
"Worker [#{self.class}] with ID: [#{id}], PID: [#{pid}], GUID: [#{guid}]"
end
# Log-friendly description with the id only.
def format_short_log_msg
"Worker ID: [#{id}]"
end
# Releases the connection back to the ActiveRecord pool, if connected.
def self.release_db_connection
ActiveRecord::Base.connection_pool.release_connection if ActiveRecord::Base.connected?
end
# Stamps last_heartbeat with the current UTC time (via update_attribute).
def update_heartbeat
update_attribute(:last_heartbeat, Time.now.utc)
end
# Memoized path to this class's settings: :workers followed by the settings
# names from path_to_my_worker_settings. Can be replaced through the writer below.
def self.config_settings_path
  @config_settings_path ||= [:workers, *path_to_my_worker_settings]
end

class << self
  attr_writer :config_settings_path
end
# Assigns sql_spid (defaulting to the current connection's spid) without saving.
def update_spid(spid = ActiveRecord::Base.connection.spid)
  self.sql_spid = spid
end

# Assigns sql_spid and saves, but only when the value actually changes.
# Returns nil when nothing changed, otherwise the result of save.
def update_spid!(spid = ActiveRecord::Base.connection.spid)
  return if sql_spid == spid

  self.sql_spid = spid
  save
end

# Options used to build this worker's command line.
def worker_options
  {:guid => guid}
end
# Memoized underscored type name: top-level classes drop a leading "Miq"
# (MiqGenericWorker => "generic_worker"), namespaced classes drop their namespace.
def self.normalized_type
  @normalized_type ||= begin
    base_name = module_parent == Object ? name.sub(/^Miq/, '') : name.demodulize
    base_name.underscore
  end
end
# Increment passed to "nice -n": the :nice_delta worker setting as a string when
# it is an Integer, otherwise "10".
def self.nice_increment
  delta = worker_settings[:nice_delta]
  return "10" unless delta.kind_of?(Integer)

  delta.to_s
end
# Translated display name; n_ picks the singular or plural form based on +number+.
def self.display_name(number = 1)
n_('Worker', 'Workers', number)
end
# nice_increment (defined above) is only used when building the command line,
# so it is hidden from outside callers.
private_class_method :nice_increment
end