Skip to content

Commit

Permalink
Process server monitor sync_config message by comparing timestamp fro…
Browse files Browse the repository at this point in the history
…m key store with last change timestamp
  • Loading branch information
gtanzillo committed Aug 16, 2017
1 parent 3be2350 commit 829ce96
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 4 deletions.
34 changes: 30 additions & 4 deletions app/models/miq_worker/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,12 @@ def do_work_loop
end

def heartbeat
now = Time.now.utc
# Heartbeats can be expensive, so do them only when needed
return if @last_hb.kind_of?(Time) && (@last_hb + worker_settings[:heartbeat_freq]) >= now

ENV["WORKER_HEARTBEAT_METHOD"] == "file" ? heartbeat_to_file : heartbeat_to_drb
@last_hb = now
do_heartbeat_work
rescue SystemExit, SignalException
raise
Expand All @@ -361,11 +366,7 @@ def heartbeat_to_drb
# without the oversight of MiqServer::WorkerManagement
return if skip_heartbeat?

now = Time.now.utc
# Heartbeats can be expensive, so do them only when needed
return if @last_hb.kind_of?(Time) && (@last_hb + worker_settings[:heartbeat_freq]) >= now
messages = worker_monitor_drb.worker_heartbeat(@worker.pid, @worker.class.name, @worker.queue_name)
@last_hb = now
messages.each { |msg, *args| process_message(msg, *args) }
rescue DRb::DRbError => err
do_exit("Error heartbeating to MiqServer because #{err.class.name}: #{err.message}", 1)
Expand All @@ -374,6 +375,31 @@ def heartbeat_to_drb
def heartbeat_to_file(timeout = nil)
timeout ||= worker_settings[:heartbeat_timeout] || Workers::MiqDefaults.heartbeat_timeout
File.write(@worker.heartbeat_file, (Time.now.utc + timeout).to_s)

get_messages.each { |msg, *args| process_message(msg, *args) }
end

def get_messages
messages = []
@my_last_config_change ||= Time.now.utc

last_config_change = server_last_change(:last_config_change)
if last_config_change && last_config_change > @my_last_config_change
_log.info("#{log_prefix} Configuration has changed, New TS: #{last_config_change}, Old TS: #{@my_last_config_change}")
messages << ["sync_config"]

@my_last_config_change = last_config_change
end

messages
end

def key_store
@key_store ||= Dalli::Client.new(MiqMemcached.server_address, :namespace => "server_monitor")
end

def server_last_change(key)
key_store.get(key)
end

def do_gc
Expand Down
31 changes: 31 additions & 0 deletions spec/models/miq_worker/runner_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,35 @@
expect(@worker_base.worker_monitor_drb).to eq 0
end
end

context "#get_messages" do
before do
allow_any_instance_of(MiqWorker::Runner).to receive(:worker_initialization)
@worker_base = MiqWorker::Runner.new
end

it "gets sync_config when last config change was recent" do
allow(@worker_base).to receive(:server_last_change).with(:last_config_change).and_return(1.minute.from_now.utc)

expect(@worker_base.get_messages).to eq([["sync_config"]])
end
end

context "#process_message" do
before do
allow(MiqServer).to receive(:my_zone).and_return("default")
@miq_server = EvmSpecHelper.local_miq_server
allow(@miq_server).to receive(:active_role).and_return("automate)")

@worker = FactoryGirl.create(:miq_worker, :miq_server_id => @miq_server.id, :type => "MiqGenericWorker")
@worker_base = MiqWorker::Runner.new(:guid => @worker.guid)
end

it "syncs roles and configuration" do
expect(@worker_base).to receive(:after_sync_active_roles)
expect(@worker_base).to receive(:after_sync_config)

@worker_base.send(:process_message, "sync_config")
end
end
end

0 comments on commit 829ce96

Please sign in to comment.