diff --git a/app/models/miq_worker/runner.rb b/app/models/miq_worker/runner.rb index d70bfe6747b..bf6a59a97f0 100644 --- a/app/models/miq_worker/runner.rb +++ b/app/models/miq_worker/runner.rb @@ -348,7 +348,12 @@ def do_work_loop end def heartbeat + now = Time.now.utc + # Heartbeats can be expensive, so do them only when needed + return if @last_hb.kind_of?(Time) && (@last_hb + worker_settings[:heartbeat_freq]) >= now + ENV["WORKER_HEARTBEAT_METHOD"] == "file" ? heartbeat_to_file : heartbeat_to_drb + @last_hb = now do_heartbeat_work rescue SystemExit, SignalException raise @@ -361,11 +366,7 @@ def heartbeat_to_drb # without the oversight of MiqServer::WorkerManagement return if skip_heartbeat? - now = Time.now.utc - # Heartbeats can be expensive, so do them only when needed - return if @last_hb.kind_of?(Time) && (@last_hb + worker_settings[:heartbeat_freq]) >= now messages = worker_monitor_drb.worker_heartbeat(@worker.pid, @worker.class.name, @worker.queue_name) - @last_hb = now messages.each { |msg, *args| process_message(msg, *args) } rescue DRb::DRbError => err do_exit("Error heartbeating to MiqServer because #{err.class.name}: #{err.message}", 1) @@ -374,6 +375,31 @@ def heartbeat_to_drb def heartbeat_to_file(timeout = nil) timeout ||= worker_settings[:heartbeat_timeout] || Workers::MiqDefaults.heartbeat_timeout File.write(@worker.heartbeat_file, (Time.now.utc + timeout).to_s) + + get_messages.each { |msg, *args| process_message(msg, *args) } + end + + def get_messages + messages = [] + @my_last_config_change ||= Time.now.utc + + last_config_change = server_last_change(:last_config_change) + if last_config_change && last_config_change > @my_last_config_change + _log.info("#{log_prefix} Configuration has changed, New TS: #{last_config_change}, Old TS: #{@my_last_config_change}") + messages << ["sync_config"] + + @my_last_config_change = last_config_change + end + + messages + end + + def key_store + @key_store ||= Dalli::Client.new(MiqMemcached.server_address, :namespace => "server_monitor") + end + + def server_last_change(key) + key_store.get(key) end def do_gc diff --git a/spec/models/miq_worker/runner_spec.rb b/spec/models/miq_worker/runner_spec.rb index e013f64122b..0ed477ae510 100644 --- a/spec/models/miq_worker/runner_spec.rb +++ b/spec/models/miq_worker/runner_spec.rb @@ -38,4 +38,35 @@ expect(@worker_base.worker_monitor_drb).to eq 0 end end + + context "#get_messages" do + before do + allow_any_instance_of(MiqWorker::Runner).to receive(:worker_initialization) + @worker_base = MiqWorker::Runner.new + end + + it "gets sync_config when last config change was recent" do + allow(@worker_base).to receive(:server_last_change).with(:last_config_change).and_return(1.minute.from_now.utc) + + expect(@worker_base.get_messages).to eq([["sync_config"]]) + end + end + + context "#process_message" do + before do + allow(MiqServer).to receive(:my_zone).and_return("default") + @miq_server = EvmSpecHelper.local_miq_server + allow(@miq_server).to receive(:active_role).and_return("automate)") + + @worker = FactoryGirl.create(:miq_worker, :miq_server_id => @miq_server.id, :type => "MiqGenericWorker") + @worker_base = MiqWorker::Runner.new(:guid => @worker.guid) + end + + it "syncs roles and configuration" do + expect(@worker_base).to receive(:after_sync_active_roles) + expect(@worker_base).to receive(:after_sync_config) + + @worker_base.send(:process_message, "sync_config") + end + end end