From 63816f42aa17f7a6ce4d0ed324448da913f78831 Mon Sep 17 00:00:00 2001 From: Joe Rafaniello Date: Wed, 19 Jul 2017 16:36:56 -0400 Subject: [PATCH 1/2] Delegate the pid knowledge to the worker row. Each worker can implement their own way to invoke processes, so let them choose and persist their pid value; don't assume we can use Process.pid. https://bugzilla.redhat.com/show_bug.cgi?id=1469307 https://bugzilla.redhat.com/show_bug.cgi?id=1468898 --- app/models/miq_worker/runner.rb | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/app/models/miq_worker/runner.rb b/app/models/miq_worker/runner.rb index 36971c80754..a90cdd7f1a4 100644 --- a/app/models/miq_worker/runner.rb +++ b/app/models/miq_worker/runner.rb @@ -176,7 +176,6 @@ def find_worker_record def starting_worker_record find_worker_record - @worker.pid = Process.pid @worker.status = "starting" @worker.started_on = Time.now.utc @worker.last_heartbeat = Time.now.utc @@ -190,7 +189,7 @@ def started_worker_record @worker.last_heartbeat = Time.now.utc @worker.update_spid @worker.save - $log.info("#{self.class.name} started. ID [#{@worker.id}], PID [#{Process.pid}], GUID [#{@worker.guid}], Zone [#{MiqServer.my_zone}], Role [#{MiqServer.my_role}]") + $log.info("#{self.class.name} started. 
ID [#{@worker.id}], PID [#{@worker.pid}], GUID [#{@worker.guid}], Zone [#{MiqServer.my_zone}], Role [#{MiqServer.my_role}]") end def reload_worker_record @@ -293,7 +292,7 @@ def sync_config sync_log_level sync_worker_settings sync_blacklisted_events - _log.info("ID [#{@worker.id}], PID [#{Process.pid}], GUID [#{@worker.guid}], Zone [#{@my_zone}], Active Roles [#{@active_roles.join(',')}], Assigned Roles [#{MiqServer.my_role}], Configuration:") + _log.info("ID [#{@worker.id}], PID [#{@worker.pid}], GUID [#{@worker.guid}], Zone [#{@my_zone}], Active Roles [#{@active_roles.join(',')}], Assigned Roles [#{MiqServer.my_role}], Configuration:") $log.log_hashes(@worker_settings) $log.info("---") $log.log_hashes(@cfg) From d49338133fe451db9d46df0252ed5c4f3b9825b8 Mon Sep 17 00:00:00 2001 From: Joe Rafaniello Date: Wed, 19 Jul 2017 16:38:48 -0400 Subject: [PATCH 2/2] Tag the monitor thread with the worker info so we can kill it later. When the server starts the monitor thread, store the worker class and id in the thread object so the server can then kill that thread if required later. Implement stop/kill/terminate in the same way: look for the thread containing the worker's class and id in Thread.list, exit it, and destroy the worker row. 
https://bugzilla.redhat.com/show_bug.cgi?id=1469307 https://bugzilla.redhat.com/show_bug.cgi?id=1468898 --- app/models/embedded_ansible_worker.rb | 33 ++++++++++++-- spec/models/embedded_ansible_worker_spec.rb | 50 +++++++++++++++++++++ 2 files changed, 80 insertions(+), 3 deletions(-) diff --git a/app/models/embedded_ansible_worker.rb b/app/models/embedded_ansible_worker.rb index 05b49e7c924..f9f052006d8 100644 --- a/app/models/embedded_ansible_worker.rb +++ b/app/models/embedded_ansible_worker.rb @@ -5,7 +5,12 @@ class EmbeddedAnsibleWorker < MiqWorker self.required_roles = ['embedded_ansible'] def start_runner - Thread.new do + start_monitor_thread + nil # return no pid + end + + def start_monitor_thread + t = Thread.new do begin self.class::Runner.start_worker(worker_options) # TODO: return supervisord pid @@ -17,13 +22,35 @@ def start_runner Thread.exit end end - nil # return no pid + + t[:worker_class] = self.class.name + t[:worker_id] = id + t end def kill - stop + thread = find_worker_thread_object + + if thread == Thread.main + _log.warn("Cowardly refusing to kill the main thread.") + elsif thread.nil? 
+ _log.info("The monitor thread for worker id: #{id} was not found, it must have already exited.") + else + _log.info("Exiting monitor thread...") + thread.exit + end + destroy end + def find_worker_thread_object + Thread.list.detect do |t| + t[:worker_id] == id && t[:worker_class] == self.class.name + end + end + + alias terminate kill + alias stop kill + def status_update # don't monitor the memory/cpu usage of this process yet # If we don't have a pid of a process we want to monitor,super will catch an Errno::ESRCH and abort the worker diff --git a/spec/models/embedded_ansible_worker_spec.rb b/spec/models/embedded_ansible_worker_spec.rb index d28bf87cdd2..08cf3b9853e 100644 --- a/spec/models/embedded_ansible_worker_spec.rb +++ b/spec/models/embedded_ansible_worker_spec.rb @@ -134,5 +134,55 @@ subject.ensure_host(provider, api_connection) end end + + describe "#start_monitor_thread" do + it "sets worker class and id in thread object" do + allow(Thread).to receive(:new).and_return({}) + allow(described_class::Runner).to receive(:start_worker) + thread = subject.start_monitor_thread + expect(thread[:worker_class]).to eq subject.class.name + expect(thread[:worker_id]).to eq subject.id + end + end + + describe "#find_worker_thread_object" do + it "returns the the thread matching the class and id of the worker" do + worker1 = {:worker_id => subject.id + 1, :worker_class => "SomeOtherWorker"} + worker2 = {:worker_id => subject.id, :worker_class => subject.class.name} + allow(Thread).to receive(:list).and_return([worker1, worker2]) + expect(subject.find_worker_thread_object).to eq(worker2) + end + + it "returns nil if nothing matches" do + worker1 = {:worker_id => subject.id + 1, :worker_class => subject.class.name} + worker2 = {:worker_id => subject.id + 2, :worker_class => subject.class.name} + allow(Thread).to receive(:list).and_return([worker1, worker2]) + expect(subject.find_worker_thread_object).to be_nil + end + end + + %w(kill stop terminate).each do 
|stop_method| + describe "##{stop_method}" do + it "exits the monitoring thread and destroys the worker row" do + thread_double = double + expect(thread_double).to receive(:exit) + allow(subject).to receive(:find_worker_thread_object).and_return(thread_double) + subject.public_send(stop_method) + expect { subject.reload }.to raise_error(ActiveRecord::RecordNotFound) + end + + it "destroys the worker row but refuses to kill the main thread" do + allow(subject).to receive(:find_worker_thread_object).and_return(Thread.main) + subject.public_send(stop_method) + expect { subject.reload }.to raise_error(ActiveRecord::RecordNotFound) + end + + it "destroys the worker row if the monitor thread is not found" do + allow(subject).to receive(:find_worker_thread_object).and_return(nil) + subject.public_send(stop_method) + expect { subject.reload }.to raise_error(ActiveRecord::RecordNotFound) + end + end + end end end