Skip to content

Commit

Permalink
Merge pull request #13950 from jrafanie/darga_backport_13805_13919
Browse files Browse the repository at this point in the history
[DARGA] Kill workers that don't stop after a configurable time
  • Loading branch information
simaishi authored Mar 10, 2017
2 parents b15926d + 2b5d162 commit cad0c87
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 36 deletions.
8 changes: 6 additions & 2 deletions app/models/miq_server/worker_management/monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def check_not_responding(class_name = nil)
processed_workers = []
miq_workers.each do |w|
next unless class_name.nil? || (w.type == class_name)
next unless [:not_responding, :memory_exceeded].include?(worker_get_monitor_reason(w.pid))
next unless monitor_reason_not_responding?(w)
next unless [:waiting_for_stop_before_restart, :waiting_for_stop].include?(worker_get_monitor_status(w.pid))
processed_workers << w
worker_not_responding(w)
Expand All @@ -99,6 +99,10 @@ def check_not_responding(class_name = nil)
processed_workers.collect(&:id)
end

# Tells check_not_responding whether worker +w+ should be handled as
# "not responding".
#
# @param w worker record; must respond to +pid+ and +stopping_for_too_long?+
# @return [Boolean] true when the monitor reason recorded for the worker's pid
#   is NOT_RESPONDING or MEMORY_EXCEEDED; otherwise whatever
#   +w.stopping_for_too_long?+ returns
def monitor_reason_not_responding?(w)
  restartable_reasons = [MiqServer::NOT_RESPONDING, MiqServer::MEMORY_EXCEEDED]
  return true if restartable_reasons.include?(worker_get_monitor_reason(w.pid))

  # No qualifying reason recorded: fall back to the worker's own
  # "stuck while stopping" check.
  w.stopping_for_too_long?
end

def do_system_limit_exceeded
self.class.monitor_class_names_in_kill_order.each do |class_name|
workers = class_name.constantize.find_current.to_a
Expand All @@ -109,7 +113,7 @@ def do_system_limit_exceeded
msg = "#{w.format_full_log_msg} is being stopped because system resources exceeded threshold, it will be restarted once memory has freed up"
_log.warn(msg)
MiqEvent.raise_evm_event_queue_in_region(w.miq_server, "evm_server_memory_exceeded", :event_details => msg, :type => w.class.name)
restart_worker(w, :memory_exceeded)
restart_worker(w, MiqServer::MEMORY_EXCEEDED)
break
end
end
Expand Down
3 changes: 3 additions & 0 deletions app/models/miq_server/worker_management/monitor/reason.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
module MiqServer::WorkerManagement::Monitor::Reason
extend ActiveSupport::Concern

MEMORY_EXCEEDED = :memory_exceeded
NOT_RESPONDING = :not_responding

def worker_set_monitor_reason(pid, reason)
@workers_lock.synchronize(:EX) do
@workers[pid][:monitor_reason] = reason if @workers.key?(pid)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def validate_worker(w)
msg = "#{w.format_full_log_msg} has not responded in #{Time.now.utc - w.last_heartbeat} seconds, restarting worker"
_log.error(msg)
MiqEvent.raise_evm_event_queue(w.miq_server, "evm_worker_not_responding", :event_details => msg, :type => w.class.name)
restart_worker(w, :not_responding)
restart_worker(w, MiqServer::NOT_RESPONDING)
return false
end

Expand Down
8 changes: 8 additions & 0 deletions app/models/miq_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,14 @@ def enabled_or_running?
!is_stopped? || actually_running?
end

# Reports whether this worker has been in the 'stopping' state for longer
# than its stopping timeout.
#
# The timeout is the class-level worker setting :stopping_timeout, falling
# back to 10 minutes when that setting is absent.
#
# A 'stopping' worker keeps heartbeating over DRb but no longer writes its
# heartbeat to the database, so the age of last_heartbeat shows how long the
# worker has been 'stopping'.
#
# @return [Boolean] true only when status is STATUS_STOPPING and
#   last_heartbeat is older than the timeout
def stopping_for_too_long?
  configured_timeout = self.class.worker_settings[:stopping_timeout]
  timeout = configured_timeout || 10.minutes
  return false unless status == MiqWorker::STATUS_STOPPING

  cutoff = timeout.seconds.ago
  last_heartbeat < cutoff
end

def validate_active_messages
active_messages.each { |msg| msg.check_for_timeout(_log.prefix) }
end
Expand Down
1 change: 1 addition & 0 deletions config/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1102,6 +1102,7 @@
:poll_method: :normal
:restart_interval: 0.hours
:starting_timeout: 10.minutes
:stopping_timeout: 10.minutes
:ems_refresh_core_worker:
:poll: 1.seconds
:memory_threshold: 400.megabytes
Expand Down
34 changes: 34 additions & 0 deletions spec/models/miq_server/worker_management/monitor_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Specs for MiqServer::WorkerManagement::Monitor#check_not_responding, focused
# on workers that have been told to stop.
describe MiqServer::WorkerManagement::Monitor do
context "#check_not_responding" do
let(:server) { EvmSpecHelper.local_miq_server }
# Worker record attached to the server under test; each example overrides
# last_heartbeat before stopping the worker.
let(:worker) do
FactoryGirl.create(:miq_worker,
:type => "MiqGenericWorker",
:miq_server => server,
:pid => 12345,
:last_heartbeat => 5.minutes.ago)
end

# Before every example: call setup_drb_variables on the server and register
# the worker's pid with it via worker_add.
before do
server.setup_drb_variables
server.worker_add(worker.pid)
end

# Heartbeat is 20 minutes old when the worker is stopped; after
# check_not_responding the server must have no workers and the worker's
# database row must be gone.
it "destroys an unresponsive 'stopping' worker" do
worker.update(:last_heartbeat => 20.minutes.ago)
server.stop_worker(worker)
server.check_not_responding
server.reload
expect(server.miq_workers).to be_empty
expect { worker.reload }.to raise_error(ActiveRecord::RecordNotFound)
end

# Heartbeat is only 1 minute old when the worker is stopped; the worker must
# still belong to the server after check_not_responding runs.
it "monitors recently heartbeated 'stopping' workers" do
worker.update(:last_heartbeat => 1.minute.ago)
server.stop_worker(worker)
server.check_not_responding
server.reload
expect(server.miq_workers.first.id).to eq(worker.id)
end
end
end
78 changes: 45 additions & 33 deletions spec/models/miq_server/worker_monitor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -351,48 +351,60 @@
end
end

context "with worker that is using a lot of memory" do
context "threshold validation" do
let(:worker) { FactoryGirl.create(:miq_worker, :miq_server_id => server.id, :pid => 42) }
let(:server) { @miq_server }

before(:each) do
@worker1 = FactoryGirl.create(:miq_worker, :miq_server_id => @miq_server.id, :memory_usage => 2.gigabytes, :pid => 42)
allow_any_instance_of(MiqServer).to receive(:get_time_threshold).and_return(2.minutes)
allow_any_instance_of(MiqServer).to receive(:get_memory_threshold).and_return(500.megabytes)
allow_any_instance_of(MiqServer).to receive(:get_restart_interval).and_return(0.hours)
@miq_server.setup_drb_variables
allow(server).to receive(:get_time_threshold).and_return(2.minutes)
allow(server).to receive(:get_memory_threshold).and_return(500.megabytes)
allow(server).to receive(:get_restart_interval).and_return(0.hours)
server.setup_drb_variables
end

it "should not trigger memory threshold if worker is creating" do
@worker1.status = MiqWorker::STATUS_CREATING
expect(@miq_server.validate_worker(@worker1)).to be_truthy
it "should mark not responding if not recently heartbeated" do
worker.update(:last_heartbeat => 20.minutes.ago)
expect(server.validate_worker(worker)).to be_falsey
expect(worker.reload.status).to eq(MiqWorker::STATUS_STOPPING)
end

it "should not trigger memory threshold if worker is starting" do
@worker1.status = MiqWorker::STATUS_STARTING
expect(@miq_server.validate_worker(@worker1)).to be_truthy
end
context "for excessive memory" do
before { worker.memory_usage = 2.gigabytes }

it "should trigger memory threshold if worker is started" do
@worker1.status = MiqWorker::STATUS_STARTED
expect(@miq_server).to receive(:worker_set_monitor_status).with(@worker1.pid, :waiting_for_stop_before_restart).once
@miq_server.validate_worker(@worker1)
end
it "should not trigger memory threshold if worker is creating" do
worker.status = MiqWorker::STATUS_CREATING
expect(server.validate_worker(worker)).to be_truthy
end

it "should trigger memory threshold if worker is ready" do
@worker1.status = MiqWorker::STATUS_READY
expect(@miq_server).to receive(:worker_set_monitor_status).with(@worker1.pid, :waiting_for_stop_before_restart).once
@miq_server.validate_worker(@worker1)
end
it "should not trigger memory threshold if worker is starting" do
worker.status = MiqWorker::STATUS_STARTING
expect(server.validate_worker(worker)).to be_truthy
end

it "should trigger memory threshold if worker is working" do
@worker1.status = MiqWorker::STATUS_WORKING
expect(@miq_server).to receive(:worker_set_monitor_status).with(@worker1.pid, :waiting_for_stop_before_restart).once
@miq_server.validate_worker(@worker1)
end
it "should trigger memory threshold if worker is started" do
worker.status = MiqWorker::STATUS_STARTED
expect(server).to receive(:worker_set_monitor_status).with(worker.pid, :waiting_for_stop_before_restart).once
server.validate_worker(worker)
end

it "should trigger memory threshold if worker is ready" do
worker.status = MiqWorker::STATUS_READY
expect(server).to receive(:worker_set_monitor_status).with(worker.pid, :waiting_for_stop_before_restart).once
server.validate_worker(worker)
end

it "should trigger memory threshold if worker is working" do
worker.status = MiqWorker::STATUS_WORKING
expect(server).to receive(:worker_set_monitor_status).with(worker.pid, :waiting_for_stop_before_restart).once
server.validate_worker(worker)
end

it "should return proper message on heartbeat" do
@worker1.status = MiqWorker::STATUS_READY
expect(@miq_server.worker_heartbeat(@worker1.pid)).to eq([])
@miq_server.validate_worker(@worker1) # Validation will populate message
expect(@miq_server.worker_heartbeat(@worker1.pid)).to eq([['exit']])
it "should return proper message on heartbeat" do
worker.status = MiqWorker::STATUS_READY
expect(server.worker_heartbeat(worker.pid)).to eq([])
server.validate_worker(worker) # Validation will populate message
expect(server.worker_heartbeat(worker.pid)).to eq([['exit']])
end
end
end
end
Expand Down
21 changes: 21 additions & 0 deletions spec/models/miq_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,27 @@ def check_has_required_role(worker_role_names, expected_result)
expect(@worker.worker_options).to eq(:guid => @worker.guid)
end

# Examples for MiqWorker#stopping_for_too_long?, exercised through @worker
# (set up outside this group).
describe "#stopping_for_too_long?" do
# Evaluated lazily, so each example updates @worker first and then reads
# the predicate through subject.
subject { @worker.stopping_for_too_long? }

# A worker whose status is not 'stopping' is never reported as stuck.
it "false if started" do
@worker.update(:status => described_class::STATUS_STARTED)
expect(subject).to be_falsey
end

# 'stopping' with a 30-minute-old heartbeat is expected to be reported.
it "true if stopping and not heartbeated recently" do
@worker.update(:status => described_class::STATUS_STOPPING,
:last_heartbeat => 30.minutes.ago)
expect(subject).to be_truthy
end

# 'stopping' with a 1-minute-old heartbeat is expected NOT to be reported.
it "false if stopping and heartbeated recently" do
@worker.update(:status => described_class::STATUS_STOPPING,
:last_heartbeat => 1.minute.ago)
expect(subject).to be_falsey
end
end

it "is_current? false when starting" do
@worker.update_attribute(:status, described_class::STATUS_STARTING)
expect(@worker.is_current?).not_to be_truthy
Expand Down

0 comments on commit cad0c87

Please sign in to comment.