Kill workers that don't stop after a configurable time #13805

Merged · 2 commits · Feb 14, 2017
8 changes: 6 additions & 2 deletions app/models/miq_server/worker_management/monitor.rb
@@ -89,7 +89,7 @@ def check_not_responding(class_name = nil)
     processed_workers = []
     miq_workers.each do |w|
       next unless class_name.nil? || (w.type == class_name)
-      next unless [:not_responding, :memory_exceeded].include?(worker_get_monitor_reason(w.pid))
+      next unless monitor_reason_not_responding?(w)
       next unless [:waiting_for_stop_before_restart, :waiting_for_stop].include?(worker_get_monitor_status(w.pid))
       processed_workers << w
       worker_not_responding(w)
@@ -99,6 +99,10 @@
     processed_workers.collect(&:id)
   end
 
+  def monitor_reason_not_responding?(w)
+    [Reason::NOT_RESPONDING, Reason::MEMORY_EXCEEDED].include?(worker_get_monitor_reason(w.pid)) || w.stopping_for_too_long?
+  end
+
   def do_system_limit_exceeded
     self.class.monitor_class_names_in_kill_order.each do |class_name|
       workers = class_name.constantize.find_current.to_a
@@ -109,7 +113,7 @@ def do_system_limit_exceeded
         msg = "#{w.format_full_log_msg} is being stopped because system resources exceeded threshold, it will be restarted once memory has freed up"
         _log.warn(msg)
         MiqEvent.raise_evm_event_queue_in_region(w.miq_server, "evm_server_memory_exceeded", :event_details => msg, :type => w.class.name)
-        restart_worker(w, :memory_exceeded)
+        restart_worker(w, Reason::MEMORY_EXCEEDED)
         break
       end
     end
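Taken together, the hunks above change check_not_responding so that a worker is processed either because the monitor already flagged it (not responding or memory exceeded) or because it has sat in 'stopping' longer than the configurable timeout. A minimal standalone sketch of that decision, with illustrative names and plain-Ruby stand-ins for the ManageIQ helpers:

# Standalone sketch (not ManageIQ code) of the decision monitor_reason_not_responding?
# encodes; names, signature, and the 600-second default are illustrative only.
NOT_RESPONDING  = :not_responding
MEMORY_EXCEEDED = :memory_exceeded

def not_responding?(monitor_reason, status, last_heartbeat, stopping_timeout = 600)
  flagged = [NOT_RESPONDING, MEMORY_EXCEEDED].include?(monitor_reason)
  # A 'stopping' worker stops heartbeating to the database, so an old
  # last_heartbeat means it has been stuck in 'stopping' at least that long.
  stuck_stopping = status == :stopping && last_heartbeat < Time.now - stopping_timeout
  flagged || stuck_stopping
end

not_responding?(:memory_exceeded, :started,  Time.now)        # => true, flagged by the monitor
not_responding?(nil,              :stopping, Time.now - 1200) # => true, stopping for 20 minutes
not_responding?(nil,              :stopping, Time.now - 60)   # => false, still within the timeout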
3 changes: 3 additions & 0 deletions app/models/miq_server/worker_management/monitor/reason.rb
@@ -1,6 +1,9 @@
 module MiqServer::WorkerManagement::Monitor::Reason
   extend ActiveSupport::Concern
 
+  MEMORY_EXCEEDED = :memory_exceeded
+  NOT_RESPONDING = :not_responding
+
   def worker_set_monitor_reason(pid, reason)
     @workers_lock.synchronize(:EX) do
       @workers[pid][:monitor_reason] = reason if @workers.key?(pid)
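The two constants above replace the bare :not_responding and :memory_exceeded symbols used by the monitor code and by the worker-validation hunk that follows. A short sketch of how they resolve; the unqualified Reason:: form in the diffs relies on the lexical nesting inside MiqServer::WorkerManagement::Monitor, while outside that namespace the full path is needed:

# Illustrative only; the values are the symbols defined in the diff above.
MiqServer::WorkerManagement::Monitor::Reason::NOT_RESPONDING   # => :not_responding
MiqServer::WorkerManagement::Monitor::Reason::MEMORY_EXCEEDED  # => :memory_exceeded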
@@ -14,7 +14,7 @@ def validate_worker(w)
       msg = "#{w.format_full_log_msg} has not responded in #{Time.now.utc - w.last_heartbeat} seconds, restarting worker"
       _log.error(msg)
       MiqEvent.raise_evm_event_queue(w.miq_server, "evm_worker_not_responding", :event_details => msg, :type => w.class.name)
-      restart_worker(w, :not_responding)
+      restart_worker(w, Reason::NOT_RESPONDING)
       return false
     end
 
8 changes: 8 additions & 0 deletions app/models/miq_worker.rb
@@ -411,6 +411,14 @@ def enabled_or_running?
     !is_stopped? || actually_running?
   end
 
+  def stopping_for_too_long?
+    # Note, a 'stopping' worker heartbeats in DRb but NOT to
+    # the database, so we can see how long it's been
+    # 'stopping' by checking the last_heartbeat.
+    stopping_timeout = self.class.worker_settings[:stopping_timeout] || 10.minutes
+    status == MiqWorker::STATUS_STOPPING && last_heartbeat < stopping_timeout.seconds.ago
+  end
+
   def validate_active_messages
     active_messages.each { |msg| msg.check_for_timeout(_log.prefix) }
   end
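The comparison in stopping_for_too_long? is plain time arithmetic on the database heartbeat. A quick standalone check, assuming the default :stopping_timeout of 10.minutes and ActiveSupport loaded:

require "active_support/all"

stopping_timeout = 10.minutes                   # default when the setting is absent
cutoff           = stopping_timeout.seconds.ago # oldest acceptable last_heartbeat

# A 'stopping' worker whose database last_heartbeat is 20 minutes old is past
# the cutoff and will be killed; one that entered 'stopping' 5 minutes ago is not.
20.minutes.ago < cutoff # => true,  stopping_for_too_long?
5.minutes.ago < cutoff  # => false, still within the grace period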
1 change: 1 addition & 0 deletions config/settings.yml
@@ -1192,6 +1192,7 @@
     :poll_method: :normal
     :restart_interval: 0.hours
     :starting_timeout: 10.minutes
+    :stopping_timeout: 10.minutes
   :embedded_ansible_worker:
     :poll: 10.seconds
     :memory_threshold: 0.megabytes
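Each worker class reads this value through the existing worker_settings accessor, and the || 10.minutes fallback in stopping_for_too_long? covers worker classes whose settings omit the key. A hypothetical rails console check (not from the PR; the exact return type of worker_settings may be a duration or an integer number of seconds):

MiqGenericWorker.worker_settings[:stopping_timeout] || 10.minutes
# => 10 minutes (600 seconds) with the stock settings.yml entry above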
34 changes: 34 additions & 0 deletions spec/models/miq_server/worker_management/monitor_spec.rb
@@ -0,0 +1,34 @@
+describe MiqServer::WorkerManagement::Monitor do
+  context "#check_not_responding" do
+    let(:server) { EvmSpecHelper.local_miq_server }
+    let(:worker) do
+      FactoryGirl.create(:miq_worker,
+                         :type           => "MiqGenericWorker",
+                         :miq_server     => server,
+                         :pid            => 12345,
+                         :last_heartbeat => 5.minutes.ago)
+    end
+
+    before do
+      server.setup_drb_variables
+      server.worker_add(worker.pid)
+    end
+
+    it "destroys an unresponsive 'stopping' worker" do
+      worker.update(:last_heartbeat => 20.minutes.ago)
+      server.stop_worker(worker)
+      server.check_not_responding
+      server.reload
+      expect(server.miq_workers).to be_empty
+      expect { worker.reload }.to raise_error(ActiveRecord::RecordNotFound)
+    end
+
+    it "monitors recently heartbeated 'stopping' workers" do
+      worker.update(:last_heartbeat => 1.minute.ago)
+      server.stop_worker(worker)
+      server.check_not_responding
+      server.reload
+      expect(server.miq_workers.first.id).to eq(worker.id)
+    end
+  end
+end
21 changes: 21 additions & 0 deletions spec/models/miq_worker_spec.rb
@@ -331,6 +331,27 @@ def check_has_required_role(worker_role_names, expected_result)
     expect(@worker.worker_options).to eq(:guid => @worker.guid)
   end
 
+  describe "#stopping_for_too_long?" do
+    subject { @worker.stopping_for_too_long? }
+
+    it "false if started" do
+      @worker.update(:status => described_class::STATUS_STARTED)
+      expect(subject).to be_falsey
+    end
+
+    it "true if stopping and not heartbeated recently" do
+      @worker.update(:status         => described_class::STATUS_STOPPING,
+                     :last_heartbeat => 30.minutes.ago)
+      expect(subject).to be_truthy
+    end
+
+    it "false if stopping and heartbeated recently" do
+      @worker.update(:status         => described_class::STATUS_STOPPING,
+                     :last_heartbeat => 1.minute.ago)
+      expect(subject).to be_falsey
+    end
+  end
+
   it "is_current? false when starting" do
     @worker.update_attribute(:status, described_class::STATUS_STARTING)
     expect(@worker.is_current?).not_to be_truthy