Skip to content

Commit

Permalink
Merge pull request ManageIQ#19666 from carbonin/always_use_file_based_heartbeat
Browse files (browse the repository at this point in the history)

Always use file based heartbeat
  • Loading branch information
Fryguy authored Jan 14, 2020
2 parents d8a1005 + fe52aad commit 776b041
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 129 deletions.
2 changes: 1 addition & 1 deletion app/models/miq_queue_worker_base/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def do_work

# Only for file based heartbeating
def heartbeat_message_timeout(message)
if ENV["WORKER_HEARTBEAT_METHOD"] == "file" && message.msg_timeout
if message.msg_timeout
timeout = worker_settings[:poll] + message.msg_timeout
heartbeat_to_file(timeout)
end
Expand Down
1 change: 0 additions & 1 deletion app/models/miq_server/worker_management/dequeue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ def get_queue_message_for_worker(w)
end

def get_queue_message(pid)
update_worker_last_heartbeat(pid)
@workers_lock.synchronize(:SH) do
w = @workers[pid]

Expand Down
17 changes: 0 additions & 17 deletions app/models/miq_server/worker_management/heartbeat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@ def worker_add_message(pid, item)
end unless @workers_lock.nil?
end

def update_worker_last_heartbeat(worker_pid)
@workers_lock.synchronize(:EX) do
@workers[worker_pid][:last_heartbeat] = Time.now.utc if @workers.key?(worker_pid)
end unless @workers_lock.nil?
end

def register_worker(worker_pid, worker_class, queue_name)
worker_class = worker_class.constantize if worker_class.kind_of?(String)

Expand Down Expand Up @@ -52,7 +46,6 @@ def message_for_worker(wid, message, *args)
worker_set_message(w, message, *args) unless w.nil?
end

# Get the latest heartbeat between the SQL and memory (updated via DRb)
def persist_last_heartbeat(w)
last_heartbeat = workers_last_heartbeat(w)

Expand All @@ -71,16 +64,6 @@ def clean_heartbeat_files
private

def workers_last_heartbeat(w)
ENV["WORKER_HEARTBEAT_METHOD"] == "file" ? workers_last_heartbeat_to_file(w) : workers_last_heartbeat_to_drb(w)
end

def workers_last_heartbeat_to_drb(w)
@workers_lock.synchronize(:SH) do
@workers.fetch_path(w.pid, :last_heartbeat)
end
end

def workers_last_heartbeat_to_file(w)
File.mtime(w.heartbeat_file).utc if File.exist?(w.heartbeat_file)
end
end
8 changes: 2 additions & 6 deletions app/models/miq_server/worker_management/monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def monitor_workers
# Clear the my_server cache so we can detect role and possibly other changes faster
self.class.my_server_clear_cache

resync_needed = sync_needed?
sync_monitor

# Sync the workers after sync'ing the child worker settings
sync_workers
Expand All @@ -32,8 +32,6 @@ def monitor_workers
persist_last_heartbeat(worker)
# Check the worker record for heartbeat timeouts
next unless validate_worker(worker)
# Tell the valid workers to sync config if needed
worker_set_message(worker, "sync_config") if resync_needed
end

do_system_limit_exceeded if self.kill_workers_due_to_resources_exhausted?
Expand Down Expand Up @@ -122,7 +120,7 @@ def do_system_limit_exceeded
end
end

def sync_needed?
def sync_monitor
@last_sync ||= Time.now.utc
sync_interval = @worker_monitor_settings[:sync_interval] || 30.minutes
sync_interval_reached = sync_interval.seconds.ago.utc > @last_sync
Expand Down Expand Up @@ -150,8 +148,6 @@ def sync_needed?

update_sync_timestamp(@last_sync)
end

resync_needed
end

def set_last_change(key, value)
Expand Down
41 changes: 19 additions & 22 deletions app/models/miq_worker/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,6 @@ def do_exit(message = nil, exit_code = 0)
exit exit_code
end

def message_sync_config(*_args)
_log.info("#{log_prefix} Synchronizing configuration...")
sync_config
_log.info("#{log_prefix} Synchronizing configuration complete...")
end

def sync_config
# Sync roles
@active_roles = MiqServer.my_active_roles(true)
Expand Down Expand Up @@ -326,7 +320,16 @@ def heartbeat
# Heartbeats can be expensive, so do them only when needed
return if @last_hb.kind_of?(Time) && (@last_hb + worker_settings[:heartbeat_freq]) >= now

ENV["WORKER_HEARTBEAT_METHOD"] == "file" ? heartbeat_to_file : heartbeat_to_drb
heartbeat_to_file

if config_out_of_date?
_log.info("#{log_prefix} Synchronizing configuration...")
sync_config
_log.info("#{log_prefix} Synchronizing configuration complete...")
end

process_messages_from_server unless MiqEnvironment::Command.is_podified?

@last_hb = now
do_heartbeat_work
rescue SystemExit, SignalException
Expand All @@ -335,41 +338,35 @@ def heartbeat
do_exit("Error heartbeating because #{err.class.name}: #{err.message}\n#{err.backtrace.join('\n')}", 1)
end

def heartbeat_to_drb
# Disable heartbeat check. Useful if a worker is running in isolation
# without the oversight of MiqServer::WorkerManagement
return if skip_heartbeat?

def process_messages_from_server
worker_monitor_drb.register_worker(@worker.pid, @worker.class.name, @worker.queue_name)
worker_monitor_drb.update_worker_last_heartbeat(@worker.pid)

worker_monitor_drb.worker_get_messages(@worker.pid).each do |msg, *args|
process_message(msg, *args)
end
rescue DRb::DRbError => err
do_exit("Error heartbeating to MiqServer because #{err.class.name}: #{err.message}", 1)
do_exit("Error processing messages from MiqServer because #{err.class.name}: #{err.message}", 1)
end

def heartbeat_to_file(timeout = nil)
# Disable heartbeat check. Useful if a worker is running in isolation
# without the oversight of MiqServer::WorkerManagement
return if skip_heartbeat?

timeout ||= worker_settings[:heartbeat_timeout] || Workers::MiqDefaults.heartbeat_timeout
File.write(@worker.heartbeat_file, (Time.now.utc + timeout).to_s)

get_messages.each { |msg, *args| process_message(msg, *args) }
end

def get_messages
messages = []
def config_out_of_date?
@my_last_config_change ||= Time.now.utc

last_config_change = server_last_change(:last_config_change)
if last_config_change && last_config_change > @my_last_config_change
_log.info("#{log_prefix} Configuration has changed, New TS: #{last_config_change}, Old TS: #{@my_last_config_change}")
messages << ["sync_config"]

@my_last_config_change = last_config_change
return true
end

messages
false
end

def key_store
Expand Down
1 change: 1 addition & 0 deletions lib/workers/bin/run_single_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def all_role_names
$log.info("Starting #{worker.class.name} with runner options #{runner_options}")
worker.class::Runner.new(runner_options).tap(&:setup_sigterm_trap).start
ensure
FileUtils.rm_f(worker.heartbeat_file)
$log.info("Deleting worker record for #{worker.class.name}, id #{worker.id}")
worker.delete
end
Expand Down
109 changes: 47 additions & 62 deletions spec/models/miq_server/worker_management/heartbeat_spec.rb
Original file line number Diff line number Diff line change
@@ -1,84 +1,69 @@
describe MiqServer::WorkerManagement::Heartbeat do
context "#persist_last_heartbeat" do
let(:miq_server) { EvmSpecHelper.local_miq_server.tap(&:setup_drb_variables) }
let(:pid) { 1234 }
let(:worker) { FactoryBot.create(:miq_worker, :miq_server_id => miq_server.id, :pid => pid) }

it "sets initial and subsequent heartbeats" do
2.times do
t = Time.now.utc
Timecop.freeze(t) do
miq_server.update_worker_last_heartbeat(pid)
miq_server.persist_last_heartbeat(worker)
end

expect(worker.reload.last_heartbeat).to be_within(1.second).of(t)
end
end
let(:miq_server) { EvmSpecHelper.local_miq_server }
let(:worker) { FactoryBot.create(:miq_worker, :miq_server_id => miq_server.id) }

# Iterating by 5 each time to allow enough spacing to be more than 1 second
# apart when using be_within(x).of(t)
context "when using file based heartbeating" do
let!(:first_heartbeat) { Time.now.utc }
let!(:heartbeat_file) { "/path/to/worker.hb" }

around do |example|
ENV["WORKER_HEARTBEAT_METHOD"] = "file"
ENV["WORKER_HEARTBEAT_FILE"] = heartbeat_file
example.run
ENV.delete("WORKER_HEARTBEAT_METHOD")
ENV.delete("WORKER_HEARTBEAT_FILE")
end
let!(:first_heartbeat) { Time.now.utc }
let!(:heartbeat_file) { "/path/to/worker.hb" }

context "with an existing heartbeat file" do
it "sets initial and subsequent heartbeats" do
expect(File).to receive(:exist?).with(heartbeat_file).and_return(true, true)
expect(File).to receive(:mtime).with(heartbeat_file).and_return(first_heartbeat, first_heartbeat + 5)
around do |example|
ENV["WORKER_HEARTBEAT_METHOD"] = "file"
ENV["WORKER_HEARTBEAT_FILE"] = heartbeat_file
example.run
ENV.delete("WORKER_HEARTBEAT_METHOD")
ENV.delete("WORKER_HEARTBEAT_FILE")
end

[0, 5].each do |i|
Timecop.freeze(first_heartbeat) do
miq_server.persist_last_heartbeat(worker)
end
context "with an existing heartbeat file" do
it "sets initial and subsequent heartbeats" do
expect(File).to receive(:exist?).with(heartbeat_file).and_return(true, true)
expect(File).to receive(:mtime).with(heartbeat_file).and_return(first_heartbeat, first_heartbeat + 5)

expect(worker.reload.last_heartbeat).to be_within(1.second).of(first_heartbeat + i)
[0, 5].each do |i|
Timecop.freeze(first_heartbeat) do
miq_server.persist_last_heartbeat(worker)
end

expect(worker.reload.last_heartbeat).to be_within(1.second).of(first_heartbeat + i)
end
end
end

context "with a missing heartbeat file" do
it "sets initial heartbeat only" do
expect(File).to receive(:exist?).with(heartbeat_file).and_return(false).exactly(4).times
expect(File).to receive(:mtime).with(heartbeat_file).never

# This has different results first iteration of the loop compared to
# the rest:
# 1. Sets the initial heartbeat
# 2. Doesn't update the worker's last_heartbeat value after that
#
# So the result from the database should not change after the first
# iteration of the loop
[0, 5, 10, 15].each do |i|
Timecop.freeze(first_heartbeat + i) do
miq_server.persist_last_heartbeat(worker)
end
context "with a missing heartbeat file" do
it "sets initial heartbeat only" do
expect(File).to receive(:exist?).with(heartbeat_file).and_return(false).exactly(4).times
expect(File).to receive(:mtime).with(heartbeat_file).never

expect(worker.reload.last_heartbeat).to be_within(1.second).of(first_heartbeat)
# This has different results first iteration of the loop compared to
# the rest:
# 1. Sets the initial heartbeat
# 2. Doesn't update the worker's last_heartbeat value after that
#
# So the result from the database should not change after the first
# iteration of the loop
[0, 5, 10, 15].each do |i|
Timecop.freeze(first_heartbeat + i) do
miq_server.persist_last_heartbeat(worker)
end

expect(worker.reload.last_heartbeat).to be_within(1.second).of(first_heartbeat)
end
end
end

context "with a missing heartbeat file on the first validate" do
it "sets initial heartbeat default, and updates the heartbeat from the file second" do
expect(File).to receive(:exist?).with(heartbeat_file).and_return(false, true)
expect(File).to receive(:mtime).with(heartbeat_file).and_return(first_heartbeat + 5)

[0, 5].each do |i|
Timecop.freeze(first_heartbeat) do
miq_server.persist_last_heartbeat(worker)
end
context "with a missing heartbeat file on the first validate" do
it "sets initial heartbeat default, and updates the heartbeat from the file second" do
expect(File).to receive(:exist?).with(heartbeat_file).and_return(false, true)
expect(File).to receive(:mtime).with(heartbeat_file).and_return(first_heartbeat + 5)

expect(worker.reload.last_heartbeat).to be_within(1.second).of(first_heartbeat + i)
[0, 5].each do |i|
Timecop.freeze(first_heartbeat) do
miq_server.persist_last_heartbeat(worker)
end

expect(worker.reload.last_heartbeat).to be_within(1.second).of(first_heartbeat + i)
end
end
end
Expand Down
30 changes: 10 additions & 20 deletions spec/models/miq_worker/runner_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,34 +27,24 @@
end
end

context "#get_messages" do
context "#config_out_of_date?" do
before do
allow_any_instance_of(MiqWorker::Runner).to receive(:worker_initialization)
@worker_base = MiqWorker::Runner.new
end

it "gets sync_config when last config change was recent" do
allow(@worker_base).to receive(:server_last_change).with(:last_config_change).and_return(1.minute.from_now.utc)

expect(@worker_base.get_messages).to eq([["sync_config"]])
end
end

context "#process_message" do
before do
allow(MiqServer).to receive(:my_zone).and_return("default")
@miq_server = EvmSpecHelper.local_miq_server
allow(@miq_server).to receive(:active_role).and_return("automate)")

@worker = FactoryBot.create(:miq_worker, :miq_server_id => @miq_server.id, :type => "MiqGenericWorker")
@worker_base = MiqWorker::Runner.new(:guid => @worker.guid)
it "returns true for the first call and false for subsequent calls" do
expect(@worker_base).to receive(:server_last_change).with(:last_config_change).thrice.and_return(1.minute.from_now.utc)
expect(@worker_base.config_out_of_date?).to be_truthy
expect(@worker_base.config_out_of_date?).to be_falsey
expect(@worker_base.config_out_of_date?).to be_falsey
end

it "syncs roles and configuration" do
expect(@worker_base).to receive(:after_sync_active_roles)
expect(@worker_base).to receive(:after_sync_config)
it "returns true when last config change was updated" do
expect(@worker_base).to receive(:server_last_change).with(:last_config_change).twice.and_return(1.minute.ago.utc, 1.minute.from_now.utc)

@worker_base.send(:process_message, "sync_config")
expect(@worker_base.config_out_of_date?).to be_falsey
expect(@worker_base.config_out_of_date?).to be_truthy
end
end

Expand Down

0 comments on commit 776b041

Please sign in to comment.