From 928af2468135c4f645583d838f7dfa118bd0d57f Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Thu, 19 Dec 2019 18:08:22 -0500 Subject: [PATCH] PAUSED: Use `git resume` to continue working. --- app/models/miq_queue_worker_base/runner.rb | 2 +- .../miq_server/worker_management/dequeue.rb | 1 - .../miq_server/worker_management/heartbeat.rb | 17 -------- .../miq_server/worker_management/monitor.rb | 6 +-- app/models/miq_worker/runner.rb | 40 ++++++++----------- lib/workers/bin/run_single_worker.rb | 1 + 6 files changed, 20 insertions(+), 47 deletions(-) diff --git a/app/models/miq_queue_worker_base/runner.rb b/app/models/miq_queue_worker_base/runner.rb index 11fe2c4a60b0..a960e2733866 100644 --- a/app/models/miq_queue_worker_base/runner.rb +++ b/app/models/miq_queue_worker_base/runner.rb @@ -160,7 +160,7 @@ def do_work # Only for file based heartbeating def heartbeat_message_timeout(message) - if ENV["WORKER_HEARTBEAT_METHOD"] == "file" && message.msg_timeout + if message.msg_timeout timeout = worker_settings[:poll] + message.msg_timeout heartbeat_to_file(timeout) end diff --git a/app/models/miq_server/worker_management/dequeue.rb b/app/models/miq_server/worker_management/dequeue.rb index 211dfdc5fb72..c373095fc72c 100644 --- a/app/models/miq_server/worker_management/dequeue.rb +++ b/app/models/miq_server/worker_management/dequeue.rb @@ -47,7 +47,6 @@ def get_queue_message_for_worker(w) end def get_queue_message(pid) - update_worker_last_heartbeat(pid) @workers_lock.synchronize(:SH) do w = @workers[pid] diff --git a/app/models/miq_server/worker_management/heartbeat.rb b/app/models/miq_server/worker_management/heartbeat.rb index 36790e4a6220..baaa17583b06 100644 --- a/app/models/miq_server/worker_management/heartbeat.rb +++ b/app/models/miq_server/worker_management/heartbeat.rb @@ -10,12 +10,6 @@ def worker_add_message(pid, item) end unless @workers_lock.nil? end - def update_worker_last_heartbeat(worker_pid) - @workers_lock.synchronize(:EX) do - @workers[worker_pid][:last_heartbeat] = Time.now.utc if @workers.key?(worker_pid) - end unless @workers_lock.nil? - end - def register_worker(worker_pid, worker_class, queue_name) worker_class = worker_class.constantize if worker_class.kind_of?(String) @@ -52,7 +46,6 @@ def message_for_worker(wid, message, *args) worker_set_message(w, message, *args) unless w.nil? end - # Get the latest heartbeat between the SQL and memory (updated via DRb) def persist_last_heartbeat(w) last_heartbeat = workers_last_heartbeat(w) @@ -71,16 +64,6 @@ def clean_heartbeat_files private def workers_last_heartbeat(w) - ENV["WORKER_HEARTBEAT_METHOD"] == "file" ? workers_last_heartbeat_to_file(w) : workers_last_heartbeat_to_drb(w) - end - - def workers_last_heartbeat_to_drb(w) - @workers_lock.synchronize(:SH) do - @workers.fetch_path(w.pid, :last_heartbeat) - end - end - - def workers_last_heartbeat_to_file(w) File.mtime(w.heartbeat_file).utc if File.exist?(w.heartbeat_file) end end diff --git a/app/models/miq_server/worker_management/monitor.rb b/app/models/miq_server/worker_management/monitor.rb index 1b86ae7b866a..79b2172a9115 100644 --- a/app/models/miq_server/worker_management/monitor.rb +++ b/app/models/miq_server/worker_management/monitor.rb @@ -15,7 +15,7 @@ def monitor_workers # Clear the my_server cache so we can detect role and possibly other changes faster self.class.my_server_clear_cache - resync_needed = sync_needed? + sync_needed? # Sync the workers after sync'ing the child worker settings sync_workers @@ -32,8 +32,6 @@ def monitor_workers persist_last_heartbeat(worker) # Check the worker record for heartbeat timeouts next unless validate_worker(worker) - # Tell the valid workers to sync config if needed - worker_set_message(worker, "sync_config") if resync_needed end do_system_limit_exceeded if self.kill_workers_due_to_resources_exhausted? @@ -150,8 +148,6 @@ def sync_needed? update_sync_timestamp(@last_sync) end - - resync_needed end def set_last_change(key, value) diff --git a/app/models/miq_worker/runner.rb b/app/models/miq_worker/runner.rb index 5c422299948f..90190b9754d7 100644 --- a/app/models/miq_worker/runner.rb +++ b/app/models/miq_worker/runner.rb @@ -238,13 +238,8 @@ def do_exit(message = nil, exit_code = 0) exit exit_code end - def message_sync_config(*_args) - _log.info("#{log_prefix} Synchronizing configuration...") - sync_config - _log.info("#{log_prefix} Synchronizing configuration complete...") - end - def sync_config + _log.info("#{log_prefix} Synchronizing configuration...") # Sync roles @active_roles = MiqServer.my_active_roles(true) after_sync_active_roles @@ -263,6 +258,7 @@ def sync_config $log.log_hashes(@cfg) @worker.release_db_connection if @worker.respond_to?(:release_db_connection) + _log.info("#{log_prefix} Synchronizing configuration complete...") end def sync_log_level @@ -326,7 +322,11 @@ def heartbeat # Heartbeats can be expensive, so do them only when needed return if @last_hb.kind_of?(Time) && (@last_hb + worker_settings[:heartbeat_freq]) >= now - ENV["WORKER_HEARTBEAT_METHOD"] == "file" ? heartbeat_to_file : heartbeat_to_drb + heartbeat_to_file + + sync_config if config_out_of_date? + process_messages_from_server unless MiqEnvironment::Command.is_podified? + @last_hb = now do_heartbeat_work rescue SystemExit, SignalException @@ -335,41 +335,35 @@ def heartbeat do_exit("Error heartbeating because #{err.class.name}: #{err.message}\n#{err.backtrace.join('\n')}", 1) end - def heartbeat_to_drb + def heartbeat_to_file(timeout = nil) # Disable heartbeat check. Useful if a worker is running in isolation # without the oversight of MiqServer::WorkerManagement return if skip_heartbeat? - worker_monitor_drb.register_worker(@worker.pid, @worker.class.name, @worker.queue_name) - worker_monitor_drb.update_worker_last_heartbeat(@worker.pid) + timeout ||= worker_settings[:heartbeat_timeout] || Workers::MiqDefaults.heartbeat_timeout + File.write(@worker.heartbeat_file, (Time.now.utc + timeout).to_s) + end + def process_messages_from_server + worker_monitor_drb.register_worker(@worker.pid, @worker.class.name, @worker.queue_name) worker_monitor_drb.worker_get_messages(@worker.pid).each do |msg, *args| process_message(msg, *args) end rescue DRb::DRbError => err - do_exit("Error heartbeating to MiqServer because #{err.class.name}: #{err.message}", 1) + do_exit("Error processing messages from MiqServer because #{err.class.name}: #{err.message}", 1) end - def heartbeat_to_file(timeout = nil) - timeout ||= worker_settings[:heartbeat_timeout] || Workers::MiqDefaults.heartbeat_timeout - File.write(@worker.heartbeat_file, (Time.now.utc + timeout).to_s) - - get_messages.each { |msg, *args| process_message(msg, *args) } - end - - def get_messages - messages = [] + def config_out_of_date? @my_last_config_change ||= Time.now.utc last_config_change = server_last_change(:last_config_change) if last_config_change && last_config_change > @my_last_config_change _log.info("#{log_prefix} Configuration has changed, New TS: #{last_config_change}, Old TS: #{@my_last_config_change}") - messages << ["sync_config"] - @my_last_config_change = last_config_change + return true end - messages + false end def key_store diff --git a/lib/workers/bin/run_single_worker.rb b/lib/workers/bin/run_single_worker.rb index 9d2adddb4e9f..45ff67f55e91 100755 --- a/lib/workers/bin/run_single_worker.rb +++ b/lib/workers/bin/run_single_worker.rb @@ -112,6 +112,7 @@ def all_role_names $log.info("Starting #{worker.class.name} with runner options #{runner_options}") worker.class::Runner.new(runner_options).tap(&:setup_sigterm_trap).start ensure + FileUtils.rm_f(worker.heartbeat_file) $log.info("Deleting worker record for #{worker.class.name}, id #{worker.id}") worker.delete end