diff --git a/app/models/embedded_ansible_worker.rb b/app/models/embedded_ansible_worker.rb index 05b49e7c924..f9f052006d8 100644 --- a/app/models/embedded_ansible_worker.rb +++ b/app/models/embedded_ansible_worker.rb @@ -5,7 +5,12 @@ class EmbeddedAnsibleWorker < MiqWorker self.required_roles = ['embedded_ansible'] def start_runner - Thread.new do + start_monitor_thread + nil # return no pid + end + + def start_monitor_thread + t = Thread.new do begin self.class::Runner.start_worker(worker_options) # TODO: return supervisord pid @@ -17,13 +22,35 @@ def start_runner Thread.exit end end - nil # return no pid + + t[:worker_class] = self.class.name + t[:worker_id] = id + t end def kill - stop + thread = find_worker_thread_object + + if thread == Thread.main + _log.warn("Cowardly refusing to kill the main thread.") + elsif thread.nil? + _log.info("The monitor thread for worker id: #{id} was not found, it must have already exited.") + else + _log.info("Exiting monitor thread...") + thread.exit + end + destroy end + def find_worker_thread_object + Thread.list.detect do |t| + t[:worker_id] == id && t[:worker_class] == self.class.name + end + end + + alias terminate kill + alias stop kill + def status_update # don't monitor the memory/cpu usage of this process yet # If we don't have a pid of a process we want to monitor,super will catch an Errno::ESRCH and abort the worker diff --git a/app/models/miq_worker/runner.rb b/app/models/miq_worker/runner.rb index 36971c80754..a90cdd7f1a4 100644 --- a/app/models/miq_worker/runner.rb +++ b/app/models/miq_worker/runner.rb @@ -176,7 +176,6 @@ def find_worker_record def starting_worker_record find_worker_record - @worker.pid = Process.pid @worker.status = "starting" @worker.started_on = Time.now.utc @worker.last_heartbeat = Time.now.utc @@ -190,7 +189,7 @@ def started_worker_record @worker.last_heartbeat = Time.now.utc @worker.update_spid @worker.save - $log.info("#{self.class.name} started. ID [#{@worker.id}], PID [#{Process.pid}], GUID [#{@worker.guid}], Zone [#{MiqServer.my_zone}], Role [#{MiqServer.my_role}]") + $log.info("#{self.class.name} started. ID [#{@worker.id}], PID [#{@worker.pid}], GUID [#{@worker.guid}], Zone [#{MiqServer.my_zone}], Role [#{MiqServer.my_role}]") end def reload_worker_record @@ -293,7 +292,7 @@ def sync_config sync_log_level sync_worker_settings sync_blacklisted_events - _log.info("ID [#{@worker.id}], PID [#{Process.pid}], GUID [#{@worker.guid}], Zone [#{@my_zone}], Active Roles [#{@active_roles.join(',')}], Assigned Roles [#{MiqServer.my_role}], Configuration:") + _log.info("ID [#{@worker.id}], PID [#{@worker.pid}], GUID [#{@worker.guid}], Zone [#{@my_zone}], Active Roles [#{@active_roles.join(',')}], Assigned Roles [#{MiqServer.my_role}], Configuration:") $log.log_hashes(@worker_settings) $log.info("---") $log.log_hashes(@cfg) diff --git a/spec/models/embedded_ansible_worker_spec.rb b/spec/models/embedded_ansible_worker_spec.rb index d28bf87cdd2..08cf3b9853e 100644 --- a/spec/models/embedded_ansible_worker_spec.rb +++ b/spec/models/embedded_ansible_worker_spec.rb @@ -134,5 +134,55 @@ subject.ensure_host(provider, api_connection) end end + + describe "#start_monitor_thread" do + it "sets worker class and id in thread object" do + allow(Thread).to receive(:new).and_return({}) + allow(described_class::Runner).to receive(:start_worker) + thread = subject.start_monitor_thread + expect(thread[:worker_class]).to eq subject.class.name + expect(thread[:worker_id]).to eq subject.id + end + end + + describe "#find_worker_thread_object" do + it "returns the the thread matching the class and id of the worker" do + worker1 = {:worker_id => subject.id + 1, :worker_class => "SomeOtherWorker"} + worker2 = {:worker_id => subject.id, :worker_class => subject.class.name} + allow(Thread).to receive(:list).and_return([worker1, worker2]) + expect(subject.find_worker_thread_object).to eq(worker2) + end + + it "returns nil if nothing matches" do + worker1 = {:worker_id => subject.id + 1, :worker_class => subject.class.name} + worker2 = {:worker_id => subject.id + 2, :worker_class => subject.class.name} + allow(Thread).to receive(:list).and_return([worker1, worker2]) + expect(subject.find_worker_thread_object).to be_nil + end + end + + %w(kill stop terminate).each do |stop_method| + describe "##{stop_method}" do + it "exits the monitoring thread and destroys the worker row" do + thread_double = double + expect(thread_double).to receive(:exit) + allow(subject).to receive(:find_worker_thread_object).and_return(thread_double) + subject.public_send(stop_method) + expect { subject.reload }.to raise_error(ActiveRecord::RecordNotFound) + end + + it "destroys the worker row but refuses to kill the main thread" do + allow(subject).to receive(:find_worker_thread_object).and_return(Thread.main) + subject.public_send(stop_method) + expect { subject.reload }.to raise_error(ActiveRecord::RecordNotFound) + end + + it "destroys the worker row if the monitor thread is not found" do + allow(subject).to receive(:find_worker_thread_object).and_return(nil) + subject.public_send(stop_method) + expect { subject.reload }.to raise_error(ActiveRecord::RecordNotFound) + end + end + end end end