Skip to content

Commit

Permalink
Merge pull request #15612 from jrafanie/track_and_kill_embedded_ansib…
Browse files Browse the repository at this point in the history
…le_monitoring_thread

Track and kill embedded ansible monitoring thread
  • Loading branch information
Fryguy authored Jul 21, 2017
2 parents 7517674 + d493381 commit d6c4949
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 6 deletions.
33 changes: 30 additions & 3 deletions app/models/embedded_ansible_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ class EmbeddedAnsibleWorker < MiqWorker
self.required_roles = ['embedded_ansible']

def start_runner
Thread.new do
start_monitor_thread
nil # return no pid
end

def start_monitor_thread
t = Thread.new do
begin
self.class::Runner.start_worker(worker_options)
# TODO: return supervisord pid
Expand All @@ -17,13 +22,35 @@ def start_runner
Thread.exit
end
end
nil # return no pid

t[:worker_class] = self.class.name
t[:worker_id] = id
t
end

def kill
stop
thread = find_worker_thread_object

if thread == Thread.main
_log.warn("Cowardly refusing to kill the main thread.")
elsif thread.nil?
_log.info("The monitor thread for worker id: #{id} was not found, it must have already exited.")
else
_log.info("Exiting monitor thread...")
thread.exit
end
destroy
end

def find_worker_thread_object
Thread.list.detect do |t|
t[:worker_id] == id && t[:worker_class] == self.class.name
end
end

alias terminate kill
alias stop kill

def status_update
# don't monitor the memory/cpu usage of this process yet
# If we don't have a pid of a process we want to monitor,super will catch an Errno::ESRCH and abort the worker
Expand Down
5 changes: 2 additions & 3 deletions app/models/miq_worker/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ def find_worker_record

def starting_worker_record
find_worker_record
@worker.pid = Process.pid
@worker.status = "starting"
@worker.started_on = Time.now.utc
@worker.last_heartbeat = Time.now.utc
Expand All @@ -190,7 +189,7 @@ def started_worker_record
@worker.last_heartbeat = Time.now.utc
@worker.update_spid
@worker.save
$log.info("#{self.class.name} started. ID [#{@worker.id}], PID [#{Process.pid}], GUID [#{@worker.guid}], Zone [#{MiqServer.my_zone}], Role [#{MiqServer.my_role}]")
$log.info("#{self.class.name} started. ID [#{@worker.id}], PID [#{@worker.pid}], GUID [#{@worker.guid}], Zone [#{MiqServer.my_zone}], Role [#{MiqServer.my_role}]")
end

def reload_worker_record
Expand Down Expand Up @@ -293,7 +292,7 @@ def sync_config
sync_log_level
sync_worker_settings
sync_blacklisted_events
_log.info("ID [#{@worker.id}], PID [#{Process.pid}], GUID [#{@worker.guid}], Zone [#{@my_zone}], Active Roles [#{@active_roles.join(',')}], Assigned Roles [#{MiqServer.my_role}], Configuration:")
_log.info("ID [#{@worker.id}], PID [#{@worker.pid}], GUID [#{@worker.guid}], Zone [#{@my_zone}], Active Roles [#{@active_roles.join(',')}], Assigned Roles [#{MiqServer.my_role}], Configuration:")
$log.log_hashes(@worker_settings)
$log.info("---")
$log.log_hashes(@cfg)
Expand Down
50 changes: 50 additions & 0 deletions spec/models/embedded_ansible_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -134,5 +134,55 @@
subject.ensure_host(provider, api_connection)
end
end

describe "#start_monitor_thread" do
it "sets worker class and id in thread object" do
allow(Thread).to receive(:new).and_return({})
allow(described_class::Runner).to receive(:start_worker)
thread = subject.start_monitor_thread
expect(thread[:worker_class]).to eq subject.class.name
expect(thread[:worker_id]).to eq subject.id
end
end

describe "#find_worker_thread_object" do
it "returns the the thread matching the class and id of the worker" do
worker1 = {:worker_id => subject.id + 1, :worker_class => "SomeOtherWorker"}
worker2 = {:worker_id => subject.id, :worker_class => subject.class.name}
allow(Thread).to receive(:list).and_return([worker1, worker2])
expect(subject.find_worker_thread_object).to eq(worker2)
end

it "returns nil if nothing matches" do
worker1 = {:worker_id => subject.id + 1, :worker_class => subject.class.name}
worker2 = {:worker_id => subject.id + 2, :worker_class => subject.class.name}
allow(Thread).to receive(:list).and_return([worker1, worker2])
expect(subject.find_worker_thread_object).to be_nil
end
end

%w(kill stop terminate).each do |stop_method|
describe "##{stop_method}" do
it "exits the monitoring thread and destroys the worker row" do
thread_double = double
expect(thread_double).to receive(:exit)
allow(subject).to receive(:find_worker_thread_object).and_return(thread_double)
subject.public_send(stop_method)
expect { subject.reload }.to raise_error(ActiveRecord::RecordNotFound)
end

it "destroys the worker row but refuses to kill the main thread" do
allow(subject).to receive(:find_worker_thread_object).and_return(Thread.main)
subject.public_send(stop_method)
expect { subject.reload }.to raise_error(ActiveRecord::RecordNotFound)
end

it "destroys the worker row if the monitor thread is not found" do
allow(subject).to receive(:find_worker_thread_object).and_return(nil)
subject.public_send(stop_method)
expect { subject.reload }.to raise_error(ActiveRecord::RecordNotFound)
end
end
end
end
end

0 comments on commit d6c4949

Please sign in to comment.