From 06b74657badf35f0ffef9d392e08483ebea41cec Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Mon, 16 Mar 2020 14:55:46 -0400 Subject: [PATCH 1/5] Only queue ansible runner ops when not in pods This commit adds an intermediate method (#route_signal) to determine if a call should be queued or not. When running in containers, each generic worker is a separate container so we can't queue anything between checking out the playbook repository and cleaning it up. If we do, it might end up executing on a container that doesn't have the repo checked out or isn't running the ansible runner process. We want to continue queueing these operations on appliances as the previous reasoning doesn't apply (we will always queue for a worker on the same server) and we still need to handle ansible playbooks that might run longer than the timeout for a single queue message. For now these long-running playbooks won't have a solution on pods, but shorter ones will work. --- .../providers/ansible_runner_workflow.rb | 21 ++++-- .../providers/ansible_role_workflow_spec.rb | 73 +++++++++++++++++-- 2 files changed, 82 insertions(+), 12 deletions(-) diff --git a/app/models/manageiq/providers/ansible_runner_workflow.rb b/app/models/manageiq/providers/ansible_runner_workflow.rb index df60c686c9e..cd5ae773a34 100644 --- a/app/models/manageiq/providers/ansible_runner_workflow.rb +++ b/app/models/manageiq/providers/ansible_runner_workflow.rb @@ -29,7 +29,7 @@ def start def pre_execute verify_options prepare_repository - queue_signal(:execute) + route_signal(:execute) end def launch_runner @@ -40,7 +40,7 @@ def execute response = launch_runner if response.nil? 
- queue_signal(:abort, "Failed to run ansible #{execution_type}", "error") + route_signal(:abort, "Failed to run ansible #{execution_type}", "error") else context[:ansible_runner_response] = response.dump @@ -48,7 +48,7 @@ def execute update!(:context => context, :started_on => started_on) miq_task.update!(:started_on => started_on) - queue_signal(:poll_runner) + route_signal(:poll_runner) end end @@ -58,9 +58,9 @@ def poll_runner if started_on + options[:timeout] < Time.now.utc response.stop - queue_signal(:abort, "ansible #{execution_type} has been running longer than timeout", "error") + route_signal(:abort, "ansible #{execution_type} has been running longer than timeout", "error") else - queue_signal(:poll_runner, :deliver_on => deliver_on) + route_signal(:poll_runner, :deliver_on => deliver_on) end else result = response.response @@ -74,7 +74,7 @@ def poll_runner else set_status("ansible #{execution_type} completed with no errors", "ok") end - queue_signal(:post_execute) + route_signal(:post_execute) end end @@ -91,6 +91,15 @@ def post_execute protected + def route_signal(*args, deliver_on: nil) + if MiqEnvironment::Command.is_podified? 
+ sleep(deliver_on - Time.now.utc) if deliver_on + signal(*args) + else + queue_signal(*args, :deliver_on => deliver_on) + end + end + def queue_signal(*args, deliver_on: nil) role = options[:role] || "ems_operations" priority = options[:priority] || MiqQueue::NORMAL_PRIORITY diff --git a/spec/models/manageiq/providers/ansible_role_workflow_spec.rb b/spec/models/manageiq/providers/ansible_role_workflow_spec.rb index 9203750bd42..7b53d87e5ad 100644 --- a/spec/models/manageiq/providers/ansible_role_workflow_spec.rb +++ b/spec/models/manageiq/providers/ansible_role_workflow_spec.rb @@ -109,7 +109,7 @@ context "with roles_path" do it "succeeds" do expect_any_instance_of(ManageIQ::Providers::EmbeddedAnsible::AutomationManager::ConfigurationScriptSource).to_not receive(:checkout_git_repository) - expect(job).to receive(:queue_signal).with(:execute) + expect(job).to receive(:queue_signal).with(:execute, :deliver_on => nil) job.signal(:pre_execute) @@ -122,13 +122,21 @@ it "will checkout the git repository to a temp dir before proceeding" do expect_any_instance_of(ManageIQ::Providers::EmbeddedAnsible::AutomationManager::ConfigurationScriptSource).to receive(:checkout_git_repository) - expect(job).to receive(:queue_signal).with(:execute) + expect(job).to receive(:queue_signal).with(:execute, :deliver_on => nil) job.signal(:pre_execute) expect(job.options[:roles_path]).to start_with File.join(Dir.tmpdir, "ansible-runner-git") expect(job.options[:roles_path]).to end_with roles_relative_path end + + it "doesn't queue the next state when running in pods" do + allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true) + expect_any_instance_of(ManageIQ::Providers::EmbeddedAnsible::AutomationManager::ConfigurationScriptSource).to receive(:checkout_git_repository) + expect(job).to receive(:signal).with(:execute) + + job.pre_execute + end end context "without role_name" do @@ -179,19 +187,37 @@ ] expect(Ansible::Runner).to 
receive(:run_role_async).with(*runner_options).and_return(response_async) - expect(job).to receive(:queue_signal).with(:poll_runner) + expect(job).to receive(:queue_signal).with(:poll_runner, :deliver_on => nil) job.signal(:execute) expect(job.context[:ansible_runner_response]).to eq(response_async.dump) end + it "doesn't queue the next state when running in pods with a success response" do + allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true) + response_async = Ansible::Runner::ResponseAsync.new(:base_dir => "/path/to/results") + + expect(Ansible::Runner).to receive(:run_role_async).and_return(response_async) + expect(job).to receive(:signal).with(:poll_runner) + + job.execute + end + it "ansible-runner fails" do expect(Ansible::Runner).to receive(:run_role_async).and_return(nil) - expect(job).to receive(:queue_signal).with(:abort, "Failed to run ansible role", "error") + expect(job).to receive(:queue_signal).with(:abort, "Failed to run ansible role", "error", :deliver_on => nil) job.signal(:execute) end + + it "doesn't queue the next state when running in pods with a failure response" do + allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true) + expect(Ansible::Runner).to receive(:run_role_async).and_return(nil) + expect(job).to receive(:signal).with(:abort, "Failed to run ansible role", "error") + + job.execute + end end context "#current_job_timeout" do @@ -217,11 +243,22 @@ response = Ansible::Runner::Response.new(response_async.dump.merge(:return_code => 0)) expect(response_async).to receive(:response).and_return(response) - expect(job).to receive(:queue_signal).with(:post_execute) + expect(job).to receive(:queue_signal).with(:post_execute, :deliver_on => nil) job.signal(:poll_runner) end + it "doesn't queue the next state when running in pods and ansible-runner completed " do + allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true) + expect(response_async).to receive(:running?).and_return(false) + + 
response = Ansible::Runner::Response.new(response_async.dump.merge(:return_code => 0)) + expect(response_async).to receive(:response).and_return(response) + expect(job).to receive(:signal).with(:post_execute) + + job.poll_runner + end + it "ansible-runner still running" do now = Time.now.utc allow(Time).to receive(:now).and_return(now) @@ -231,18 +268,42 @@ job.signal(:poll_runner) end + it "doesn't queue the next state when running in pods and ansible-runner still running" do + allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true) + now = Time.now.utc + allow(Time).to receive(:now).and_return(now) + expect(response_async).to receive(:running?).and_return(true) + expect(job).to receive(:sleep).with(1) + expect(job).to receive(:signal).with(:poll_runner) + + job.poll_runner + end + it "fails if the role has been running too long" do time = job.started_on + job.options[:timeout] + 5.minutes Timecop.travel(time) do expect(response_async).to receive(:running?).and_return(true) expect(response_async).to receive(:stop) - expect(job).to receive(:queue_signal).with(:abort, "ansible role has been running longer than timeout", "error") + expect(job).to receive(:queue_signal).with(:abort, "ansible role has been running longer than timeout", "error", :deliver_on => nil) job.signal(:poll_runner) end end + it "Doesn't queue abort state when the role times out and running in pods" do + allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true) + time = job.started_on + job.options[:timeout] + 5.minutes + + Timecop.travel(time) do + expect(response_async).to receive(:running?).and_return(true) + expect(response_async).to receive(:stop) + expect(job).to receive(:signal).with(:abort, "ansible role has been running longer than timeout", "error") + + job.poll_runner + end + end + context "deliver_on" do let(:options) { [{"ENV" => "VAR"}, {"arg1" => "val1"}, {:roles_path => "/path/to/role"}, :poll_interval => 5.minutes] } From 
ae011bc35a895aa85f24e472e4678b45a704f8cc Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Tue, 17 Mar 2020 10:22:15 -0400 Subject: [PATCH 2/5] Fix ansible playbook workflow specs --- .../providers/ansible_playbook_workflow_spec.rb | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/spec/models/manageiq/providers/ansible_playbook_workflow_spec.rb b/spec/models/manageiq/providers/ansible_playbook_workflow_spec.rb index 7d46dcec10e..6fddd93bb25 100644 --- a/spec/models/manageiq/providers/ansible_playbook_workflow_spec.rb +++ b/spec/models/manageiq/providers/ansible_playbook_workflow_spec.rb @@ -109,7 +109,7 @@ context "with playbook_path" do it "succeeds" do expect_any_instance_of(ManageIQ::Providers::EmbeddedAnsible::AutomationManager::ConfigurationScriptSource).to_not receive(:checkout_git_repository) - expect(job).to receive(:queue_signal).with(:execute) + expect(job).to receive(:queue_signal).with(:execute, :deliver_on => nil) job.signal(:pre_execute) @@ -122,7 +122,7 @@ it "will checkout the git repository to a temp dir before proceeding" do expect_any_instance_of(ManageIQ::Providers::EmbeddedAnsible::AutomationManager::ConfigurationScriptSource).to receive(:checkout_git_repository) - expect(job).to receive(:queue_signal).with(:execute) + expect(job).to receive(:queue_signal).with(:execute, :deliver_on => nil) job.signal(:pre_execute) @@ -181,7 +181,7 @@ ] expect(Ansible::Runner).to receive(:run_async).with(*runner_options).and_return(response_async) - expect(job).to receive(:queue_signal).with(:poll_runner) + expect(job).to receive(:queue_signal).with(:poll_runner, :deliver_on => nil) job.signal(:execute) @@ -190,7 +190,7 @@ it "ansible-runner fails" do expect(Ansible::Runner).to receive(:run_async).and_return(nil) - expect(job).to receive(:queue_signal).with(:abort, "Failed to run ansible playbook", "error") + expect(job).to receive(:queue_signal).with(:abort, "Failed to run ansible playbook", "error", :deliver_on => nil) 
job.signal(:execute) end @@ -213,7 +213,7 @@ response = Ansible::Runner::Response.new(response_async.dump.merge(:return_code => 0)) expect(response_async).to receive(:response).and_return(response) - expect(job).to receive(:queue_signal).with(:post_execute) + expect(job).to receive(:queue_signal).with(:post_execute, :deliver_on => nil) job.signal(:poll_runner) end @@ -233,7 +233,7 @@ Timecop.travel(time) do expect(response_async).to receive(:running?).and_return(true) expect(response_async).to receive(:stop) - expect(job).to receive(:queue_signal).with(:abort, "ansible playbook has been running longer than timeout", "error") + expect(job).to receive(:queue_signal).with(:abort, "ansible playbook has been running longer than timeout", "error", :deliver_on => nil) job.signal(:poll_runner) end From b82fc875a1a10f2356e849d330d8e1efa32d2707 Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Tue, 17 Mar 2020 14:47:24 -0400 Subject: [PATCH 3/5] Don't recurse into poll_runner when in pods, loop instead For a sufficiently long-running job, we could exceed the stack size threshold, so this commit implements a loop in poll_runner that is only used when we're running in pods and waiting for the ansible runner process to finish. --- .../providers/ansible_runner_workflow.rb | 53 ++++++++++++------- .../providers/ansible_role_workflow_spec.rb | 14 +++-- 2 files changed, 42 insertions(+), 25 deletions(-) diff --git a/app/models/manageiq/providers/ansible_runner_workflow.rb b/app/models/manageiq/providers/ansible_runner_workflow.rb index cd5ae773a34..3af2c584f88 100644 --- a/app/models/manageiq/providers/ansible_runner_workflow.rb +++ b/app/models/manageiq/providers/ansible_runner_workflow.rb @@ -53,28 +53,40 @@ def execute end def poll_runner - response = Ansible::Runner::ResponseAsync.load(context[:ansible_runner_response]) - if response.running? 
- if started_on + options[:timeout] < Time.now.utc - response.stop - - route_signal(:abort, "ansible #{execution_type} has been running longer than timeout", "error") + loop do + response = Ansible::Runner::ResponseAsync.load(context[:ansible_runner_response]) + if response.running? + if started_on + options[:timeout] < Time.now.utc + response.stop + + route_signal(:abort, "ansible #{execution_type} has been running longer than timeout", "error") + else + if MiqEnvironment::Command.is_podified? + # If we're running in pods loop so we don't exhaust the stack limit in very long jobs + sleep options[:poll_interval] + next + else + queue_signal(:poll_runner, :deliver_on => deliver_on) + end + end else - route_signal(:poll_runner, :deliver_on => deliver_on) + result = response.response + + context[:ansible_runner_return_code] = result.return_code + context[:ansible_runner_stdout] = result.parsed_stdout + + if result.return_code != 0 + set_status("ansible #{execution_type} failed", "error") + _log.warn("ansible #{execution_type} failed:\n#{result.parsed_stdout.join("\n")}") + else + set_status("ansible #{execution_type} completed with no errors", "ok") + end + route_signal(:post_execute) end - else - result = response.response - - context[:ansible_runner_return_code] = result.return_code - context[:ansible_runner_stdout] = result.parsed_stdout - if result.return_code != 0 - set_status("ansible #{execution_type} failed", "error") - _log.warn("ansible #{execution_type} failed:\n#{result.parsed_stdout.join("\n")}") - else - set_status("ansible #{execution_type} completed with no errors", "ok") - end - route_signal(:post_execute) + # Break out of the loop when we've either queued a message + # or, if we're running in pods, the job has finished + break end end @@ -91,9 +103,10 @@ def post_execute protected + # Continue in the current process if we're running in pods, or queue the message for the next worker otherwise + # We can't queue in pods as jobs of this type depend on 
filesystem state def route_signal(*args, deliver_on: nil) if MiqEnvironment::Command.is_podified? - sleep(deliver_on - Time.now.utc) if deliver_on signal(*args) else queue_signal(*args, :deliver_on => deliver_on) diff --git a/spec/models/manageiq/providers/ansible_role_workflow_spec.rb b/spec/models/manageiq/providers/ansible_role_workflow_spec.rb index 7b53d87e5ad..acd942bfc8f 100644 --- a/spec/models/manageiq/providers/ansible_role_workflow_spec.rb +++ b/spec/models/manageiq/providers/ansible_role_workflow_spec.rb @@ -268,13 +268,17 @@ job.signal(:poll_runner) end - it "doesn't queue the next state when running in pods and ansible-runner still running" do + it "if ansible-runner still runningin pods it loops until the job is done" do allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true) - now = Time.now.utc - allow(Time).to receive(:now).and_return(now) - expect(response_async).to receive(:running?).and_return(true) + expect(response_async).to receive(:running?).and_return(true, false) + + # First loop, the job is still running so we sleep for the poll interval expect(job).to receive(:sleep).with(1) - expect(job).to receive(:signal).with(:poll_runner) + + # Second loop we get a response and signal the post_execute state + response = Ansible::Runner::Response.new(response_async.dump.merge(:return_code => 0)) + expect(response_async).to receive(:response).and_return(response) + expect(job).to receive(:signal).with(:post_execute) job.poll_runner end From 4949950fc88cd19f1045a3d8b35863b01f1e9e49 Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Wed, 18 Mar 2020 13:35:46 -0400 Subject: [PATCH 4/5] Create a method for loading the runner async result This also renames the local variable from response to monitor because this is really the object that is responsible for checking on the runner process.
Also `result = response.response` makes me cringe --- .../manageiq/providers/ansible_runner_workflow.rb | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/app/models/manageiq/providers/ansible_runner_workflow.rb b/app/models/manageiq/providers/ansible_runner_workflow.rb index 3af2c584f88..302ff9fb3ef 100644 --- a/app/models/manageiq/providers/ansible_runner_workflow.rb +++ b/app/models/manageiq/providers/ansible_runner_workflow.rb @@ -54,10 +54,10 @@ def execute def poll_runner loop do - response = Ansible::Runner::ResponseAsync.load(context[:ansible_runner_response]) - if response.running? + monitor = runner_monitor + if monitor.running? if started_on + options[:timeout] < Time.now.utc - response.stop + monitor.stop route_signal(:abort, "ansible #{execution_type} has been running longer than timeout", "error") else @@ -70,7 +70,7 @@ def poll_runner end end else - result = response.response + result = monitor.response context[:ansible_runner_return_code] = result.return_code context[:ansible_runner_stdout] = result.parsed_stdout @@ -143,6 +143,10 @@ def load_transitions private + def runner_monitor + Ansible::Runner::ResponseAsync.load(context[:ansible_runner_response]) + end + def verify_options raise NotImplementedError, "must be implemented in a subclass" end From d0135bb676fe535b481e7b7150f0646af2b3178f Mon Sep 17 00:00:00 2001 From: Nick Carboni Date: Wed, 18 Mar 2020 14:08:40 -0400 Subject: [PATCH 5/5] Cleanup AnsibleRunnerWorkflow#poll_runner method This breaks up the #poll_runner method into smaller, more easily comprehensible parts, and specifically only implements a loop in the pods-specific method. 
--- .../providers/ansible_runner_workflow.rb | 83 +++++++++++-------- 1 file changed, 48 insertions(+), 35 deletions(-) diff --git a/app/models/manageiq/providers/ansible_runner_workflow.rb b/app/models/manageiq/providers/ansible_runner_workflow.rb index 302ff9fb3ef..0c036348921 100644 --- a/app/models/manageiq/providers/ansible_runner_workflow.rb +++ b/app/models/manageiq/providers/ansible_runner_workflow.rb @@ -53,41 +53,7 @@ def execute end def poll_runner - loop do - monitor = runner_monitor - if monitor.running? - if started_on + options[:timeout] < Time.now.utc - monitor.stop - - route_signal(:abort, "ansible #{execution_type} has been running longer than timeout", "error") - else - if MiqEnvironment::Command.is_podified? - # If we're running in pods loop so we don't exhaust the stack limit in very long jobs - sleep options[:poll_interval] - next - else - queue_signal(:poll_runner, :deliver_on => deliver_on) - end - end - else - result = monitor.response - - context[:ansible_runner_return_code] = result.return_code - context[:ansible_runner_stdout] = result.parsed_stdout - - if result.return_code != 0 - set_status("ansible #{execution_type} failed", "error") - _log.warn("ansible #{execution_type} failed:\n#{result.parsed_stdout.join("\n")}") - else - set_status("ansible #{execution_type} completed with no errors", "ok") - end - route_signal(:post_execute) - end - - # Break out of the loop when we've either queued a message - # or, if we're running in pods, the job has finished - break - end + MiqEnvironment::Command.is_podified? ? wait_for_runner_process : wait_for_runner_process_async end def post_execute @@ -143,6 +109,53 @@ def load_transitions private + def wait_for_runner_process + monitor = runner_monitor + + # If we're running in pods loop so we don't exhaust the stack limit in very long jobs + loop do + break unless monitor.running? + return handle_runner_timeout(monitor) if job_timeout_exceeded? 
+ + sleep options[:poll_interval] + end + + process_runner_result(monitor.response) + end + + def wait_for_runner_process_async + monitor = runner_monitor + + if monitor.running? + return handle_runner_timeout(monitor) if job_timeout_exceeded? + queue_signal(:poll_runner, :deliver_on => deliver_on) + else + process_runner_result(monitor.response) + end + end + + def process_runner_result(result) + context[:ansible_runner_return_code] = result.return_code + context[:ansible_runner_stdout] = result.parsed_stdout + + if result.return_code != 0 + set_status("ansible #{execution_type} failed", "error") + _log.warn("ansible #{execution_type} failed:\n#{result.parsed_stdout.join("\n")}") + else + set_status("ansible #{execution_type} completed with no errors", "ok") + end + route_signal(:post_execute) + end + + def handle_runner_timeout(monitor) + monitor.stop + route_signal(:abort, "ansible #{execution_type} has been running longer than timeout", "error") + end + + def job_timeout_exceeded? + started_on + options[:timeout] < Time.now.utc + end + def runner_monitor Ansible::Runner::ResponseAsync.load(context[:ansible_runner_response]) end