From ce6f0c2a297606190a367466bc59403001cd029f Mon Sep 17 00:00:00 2001 From: Jason Frey Date: Wed, 18 Mar 2020 16:00:08 -0400 Subject: [PATCH] Merge pull request #19956 from carbonin/dont_queue_ansible_runner_stuff Don't queue things that need to run on the same worker container (cherry picked from commit 3b0c1afe659408a2047aad85c1fc85e5280cf671) --- .../providers/ansible_runner_workflow.rb | 91 +++++++++++++------ .../ansible_playbook_workflow_spec.rb | 12 +-- .../providers/ansible_role_workflow_spec.rb | 77 ++++++++++++++-- 3 files changed, 142 insertions(+), 38 deletions(-) diff --git a/app/models/manageiq/providers/ansible_runner_workflow.rb b/app/models/manageiq/providers/ansible_runner_workflow.rb index df60c686c9e..0c036348921 100644 --- a/app/models/manageiq/providers/ansible_runner_workflow.rb +++ b/app/models/manageiq/providers/ansible_runner_workflow.rb @@ -29,7 +29,7 @@ def start def pre_execute verify_options prepare_repository - queue_signal(:execute) + route_signal(:execute) end def launch_runner @@ -40,7 +40,7 @@ def execute response = launch_runner if response.nil? - queue_signal(:abort, "Failed to run ansible #{execution_type}", "error") + route_signal(:abort, "Failed to run ansible #{execution_type}", "error") else context[:ansible_runner_response] = response.dump @@ -48,34 +48,12 @@ def execute update!(:context => context, :started_on => started_on) miq_task.update!(:started_on => started_on) - queue_signal(:poll_runner) + route_signal(:poll_runner) end end def poll_runner - response = Ansible::Runner::ResponseAsync.load(context[:ansible_runner_response]) - if response.running? - if started_on + options[:timeout] < Time.now.utc - response.stop - - queue_signal(:abort, "ansible #{execution_type} has been running longer than timeout", "error") - else - queue_signal(:poll_runner, :deliver_on => deliver_on) - end - else - result = response.response - - context[:ansible_runner_return_code] = result.return_code - context[:ansible_runner_stdout] = result.parsed_stdout - - if result.return_code != 0 - set_status("ansible #{execution_type} failed", "error") - _log.warn("ansible #{execution_type} failed:\n#{result.parsed_stdout.join("\n")}") - else - set_status("ansible #{execution_type} completed with no errors", "ok") - end - queue_signal(:post_execute) - end + MiqEnvironment::Command.is_podified? ? wait_for_runner_process : wait_for_runner_process_async end def post_execute @@ -91,6 +69,16 @@ def post_execute protected + # Continue in the current process if we're running in pods, or queue the message for the next worker otherwise + # We can't queue in pods as jobs of this type depend on filesystem state + def route_signal(*args, deliver_on: nil) + if MiqEnvironment::Command.is_podified? + signal(*args) + else + queue_signal(*args, :deliver_on => deliver_on) + end + end + def queue_signal(*args, deliver_on: nil) role = options[:role] || "ems_operations" priority = options[:priority] || MiqQueue::NORMAL_PRIORITY @@ -121,6 +109,57 @@ def load_transitions private + def wait_for_runner_process + monitor = runner_monitor + + # If we're running in pods loop so we don't exhaust the stack limit in very long jobs + loop do + break unless monitor.running? + return handle_runner_timeout(monitor) if job_timeout_exceeded? + + sleep options[:poll_interval] + end + + process_runner_result(monitor.response) + end + + def wait_for_runner_process_async + monitor = runner_monitor + + if monitor.running? + return handle_runner_timeout(monitor) if job_timeout_exceeded? + queue_signal(:poll_runner, :deliver_on => deliver_on) + else + process_runner_result(monitor.response) + end + end + + def process_runner_result(result) + context[:ansible_runner_return_code] = result.return_code + context[:ansible_runner_stdout] = result.parsed_stdout + + if result.return_code != 0 + set_status("ansible #{execution_type} failed", "error") + _log.warn("ansible #{execution_type} failed:\n#{result.parsed_stdout.join("\n")}") + else + set_status("ansible #{execution_type} completed with no errors", "ok") + end + route_signal(:post_execute) + end + + def handle_runner_timeout(monitor) + monitor.stop + route_signal(:abort, "ansible #{execution_type} has been running longer than timeout", "error") + end + + def job_timeout_exceeded? + started_on + options[:timeout] < Time.now.utc + end + + def runner_monitor + Ansible::Runner::ResponseAsync.load(context[:ansible_runner_response]) + end + def verify_options raise NotImplementedError, "must be implemented in a subclass" end diff --git a/spec/models/manageiq/providers/ansible_playbook_workflow_spec.rb b/spec/models/manageiq/providers/ansible_playbook_workflow_spec.rb index 7d46dcec10e..6fddd93bb25 100644 --- a/spec/models/manageiq/providers/ansible_playbook_workflow_spec.rb +++ b/spec/models/manageiq/providers/ansible_playbook_workflow_spec.rb @@ -109,7 +109,7 @@ context "with playbook_path" do it "succeeds" do expect_any_instance_of(ManageIQ::Providers::EmbeddedAnsible::AutomationManager::ConfigurationScriptSource).to_not receive(:checkout_git_repository) - expect(job).to receive(:queue_signal).with(:execute) + expect(job).to receive(:queue_signal).with(:execute, :deliver_on => nil) job.signal(:pre_execute) @@ -122,7 +122,7 @@ it "will checkout the git repository to a temp dir before proceeding" do expect_any_instance_of(ManageIQ::Providers::EmbeddedAnsible::AutomationManager::ConfigurationScriptSource).to receive(:checkout_git_repository) - expect(job).to receive(:queue_signal).with(:execute) + expect(job).to receive(:queue_signal).with(:execute, :deliver_on => nil) job.signal(:pre_execute) @@ -181,7 +181,7 @@ ] expect(Ansible::Runner).to receive(:run_async).with(*runner_options).and_return(response_async) - expect(job).to receive(:queue_signal).with(:poll_runner) + expect(job).to receive(:queue_signal).with(:poll_runner, :deliver_on => nil) job.signal(:execute) @@ -190,7 +190,7 @@ it "ansible-runner fails" do expect(Ansible::Runner).to receive(:run_async).and_return(nil) - expect(job).to receive(:queue_signal).with(:abort, "Failed to run ansible playbook", "error") + expect(job).to receive(:queue_signal).with(:abort, "Failed to run ansible playbook", "error", :deliver_on => nil) job.signal(:execute) end @@ -213,7 +213,7 @@ response = Ansible::Runner::Response.new(response_async.dump.merge(:return_code => 0)) expect(response_async).to receive(:response).and_return(response) - expect(job).to receive(:queue_signal).with(:post_execute) + expect(job).to receive(:queue_signal).with(:post_execute, :deliver_on => nil) job.signal(:poll_runner) end @@ -233,7 +233,7 @@ Timecop.travel(time) do expect(response_async).to receive(:running?).and_return(true) expect(response_async).to receive(:stop) - expect(job).to receive(:queue_signal).with(:abort, "ansible playbook has been running longer than timeout", "error") + expect(job).to receive(:queue_signal).with(:abort, "ansible playbook has been running longer than timeout", "error", :deliver_on => nil) job.signal(:poll_runner) end diff --git a/spec/models/manageiq/providers/ansible_role_workflow_spec.rb b/spec/models/manageiq/providers/ansible_role_workflow_spec.rb index 9203750bd42..acd942bfc8f 100644 --- a/spec/models/manageiq/providers/ansible_role_workflow_spec.rb +++ b/spec/models/manageiq/providers/ansible_role_workflow_spec.rb @@ -109,7 +109,7 @@ context "with roles_path" do it "succeeds" do expect_any_instance_of(ManageIQ::Providers::EmbeddedAnsible::AutomationManager::ConfigurationScriptSource).to_not receive(:checkout_git_repository) - expect(job).to receive(:queue_signal).with(:execute) + expect(job).to receive(:queue_signal).with(:execute, :deliver_on => nil) job.signal(:pre_execute) @@ -122,13 +122,21 @@ it "will checkout the git repository to a temp dir before proceeding" do expect_any_instance_of(ManageIQ::Providers::EmbeddedAnsible::AutomationManager::ConfigurationScriptSource).to receive(:checkout_git_repository) - expect(job).to receive(:queue_signal).with(:execute) + expect(job).to receive(:queue_signal).with(:execute, :deliver_on => nil) job.signal(:pre_execute) expect(job.options[:roles_path]).to start_with File.join(Dir.tmpdir, "ansible-runner-git") expect(job.options[:roles_path]).to end_with roles_relative_path end + + it "doesn't queue the next state when running in pods" do + allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true) + expect_any_instance_of(ManageIQ::Providers::EmbeddedAnsible::AutomationManager::ConfigurationScriptSource).to receive(:checkout_git_repository) + expect(job).to receive(:signal).with(:execute) + + job.pre_execute + end end context "without role_name" do @@ -179,19 +187,37 @@ ] expect(Ansible::Runner).to receive(:run_role_async).with(*runner_options).and_return(response_async) - expect(job).to receive(:queue_signal).with(:poll_runner) + expect(job).to receive(:queue_signal).with(:poll_runner, :deliver_on => nil) job.signal(:execute) expect(job.context[:ansible_runner_response]).to eq(response_async.dump) end + it "doesn't queue the next state when running in pods with a success response" do + allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true) + response_async = Ansible::Runner::ResponseAsync.new(:base_dir => "/path/to/results") + + expect(Ansible::Runner).to receive(:run_role_async).and_return(response_async) + expect(job).to receive(:signal).with(:poll_runner) + + job.execute + end + it "ansible-runner fails" do expect(Ansible::Runner).to receive(:run_role_async).and_return(nil) - expect(job).to receive(:queue_signal).with(:abort, "Failed to run ansible role", "error") + expect(job).to receive(:queue_signal).with(:abort, "Failed to run ansible role", "error", :deliver_on => nil) job.signal(:execute) end + + it "doesn't queue the next state when running in pods with a failure response" do + allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true) + expect(Ansible::Runner).to receive(:run_role_async).and_return(nil) + expect(job).to receive(:signal).with(:abort, "Failed to run ansible role", "error") + + job.execute + end end context "#current_job_timeout" do @@ -217,11 +243,22 @@ response = Ansible::Runner::Response.new(response_async.dump.merge(:return_code => 0)) expect(response_async).to receive(:response).and_return(response) - expect(job).to receive(:queue_signal).with(:post_execute) + expect(job).to receive(:queue_signal).with(:post_execute, :deliver_on => nil) job.signal(:poll_runner) end + it "doesn't queue the next state when running in pods and ansible-runner completed " do + allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true) + expect(response_async).to receive(:running?).and_return(false) + + response = Ansible::Runner::Response.new(response_async.dump.merge(:return_code => 0)) + expect(response_async).to receive(:response).and_return(response) + expect(job).to receive(:signal).with(:post_execute) + + job.poll_runner + end + it "ansible-runner still running" do now = Time.now.utc allow(Time).to receive(:now).and_return(now) @@ -231,18 +268,46 @@ job.signal(:poll_runner) end + it "if ansible-runner still runningin pods it loops until the job is done" do + allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true) + expect(response_async).to receive(:running?).and_return(true, false) + + # First loop, the job is still running so we sleep for the poll interval + expect(job).to receive(:sleep).with(1) + + # Second loop we get a response and signal the post_execute state + response = Ansible::Runner::Response.new(response_async.dump.merge(:return_code => 0)) + expect(response_async).to receive(:response).and_return(response) + expect(job).to receive(:signal).with(:post_execute) + + job.poll_runner + end + it "fails if the role has been running too long" do time = job.started_on + job.options[:timeout] + 5.minutes Timecop.travel(time) do expect(response_async).to receive(:running?).and_return(true) expect(response_async).to receive(:stop) - expect(job).to receive(:queue_signal).with(:abort, "ansible role has been running longer than timeout", "error") + expect(job).to receive(:queue_signal).with(:abort, "ansible role has been running longer than timeout", "error", :deliver_on => nil) job.signal(:poll_runner) end end + it "Doesn't queue abort state when the role times out and running in pods" do + allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true) + time = job.started_on + job.options[:timeout] + 5.minutes + + Timecop.travel(time) do + expect(response_async).to receive(:running?).and_return(true) + expect(response_async).to receive(:stop) + expect(job).to receive(:signal).with(:abort, "ansible role has been running longer than timeout", "error") + + job.poll_runner + end + end + context "deliver_on" do let(:options) { [{"ENV" => "VAR"}, {"arg1" => "val1"}, {:roles_path => "/path/to/role"}, :poll_interval => 5.minutes] }