Skip to content

Commit

Permalink
Merge pull request #19956 from carbonin/dont_queue_ansible_runner_stuff
Browse files Browse the repository at this point in the history
Don't queue things that need to run on the same worker container

(cherry picked from commit 3b0c1af)
  • Loading branch information
Fryguy authored and simaishi committed Mar 20, 2020
1 parent 3637482 commit ce6f0c2
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 38 deletions.
91 changes: 65 additions & 26 deletions app/models/manageiq/providers/ansible_runner_workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def start
def pre_execute
verify_options
prepare_repository
queue_signal(:execute)
route_signal(:execute)
end

def launch_runner
Expand All @@ -40,42 +40,20 @@ def execute
response = launch_runner

if response.nil?
queue_signal(:abort, "Failed to run ansible #{execution_type}", "error")
route_signal(:abort, "Failed to run ansible #{execution_type}", "error")
else
context[:ansible_runner_response] = response.dump

started_on = Time.now.utc
update!(:context => context, :started_on => started_on)
miq_task.update!(:started_on => started_on)

queue_signal(:poll_runner)
route_signal(:poll_runner)
end
end

def poll_runner
response = Ansible::Runner::ResponseAsync.load(context[:ansible_runner_response])
if response.running?
if started_on + options[:timeout] < Time.now.utc
response.stop

queue_signal(:abort, "ansible #{execution_type} has been running longer than timeout", "error")
else
queue_signal(:poll_runner, :deliver_on => deliver_on)
end
else
result = response.response

context[:ansible_runner_return_code] = result.return_code
context[:ansible_runner_stdout] = result.parsed_stdout

if result.return_code != 0
set_status("ansible #{execution_type} failed", "error")
_log.warn("ansible #{execution_type} failed:\n#{result.parsed_stdout.join("\n")}")
else
set_status("ansible #{execution_type} completed with no errors", "ok")
end
queue_signal(:post_execute)
end
MiqEnvironment::Command.is_podified? ? wait_for_runner_process : wait_for_runner_process_async
end

def post_execute
Expand All @@ -91,6 +69,16 @@ def post_execute

protected

# Moves the job to its next state.
#
# In a podified deployment the signal is raised directly in the current process, because
# jobs of this type depend on filesystem state and cannot be picked up via the queue.
# Everywhere else the signal is queued (optionally delayed via +deliver_on+).
# Note that +deliver_on+ is ignored on the podified path.
def route_signal(*args, deliver_on: nil)
  return signal(*args) if MiqEnvironment::Command.is_podified?

  queue_signal(*args, :deliver_on => deliver_on)
end

def queue_signal(*args, deliver_on: nil)
role = options[:role] || "ems_operations"
priority = options[:priority] || MiqQueue::NORMAL_PRIORITY
Expand Down Expand Up @@ -121,6 +109,57 @@ def load_transitions

private

# Blocks in the current process until the runner stops or the job times out.
#
# Polling happens in a loop (sleeping options[:poll_interval] between checks) rather
# than by re-signalling, so very long jobs do not exhaust the stack limit.
# Once the runner is no longer running its response is handed to process_runner_result;
# if the timeout is exceeded first, handle_runner_timeout is invoked instead.
def wait_for_runner_process
  runner = runner_monitor

  while runner.running?
    return handle_runner_timeout(runner) if job_timeout_exceeded?

    sleep(options[:poll_interval])
  end

  process_runner_result(runner.response)
end

# Performs a single, non-blocking check on the runner.
#
# * runner finished      -> process its response
# * running, timed out   -> handle the timeout
# * running, within time -> queue another :poll_runner, delayed until deliver_on
def wait_for_runner_process_async
  runner = runner_monitor
  return process_runner_result(runner.response) unless runner.running?

  if job_timeout_exceeded?
    handle_runner_timeout(runner)
  else
    queue_signal(:poll_runner, :deliver_on => deliver_on)
  end
end

# Records the outcome of a finished runner and advances to :post_execute.
#
# The return code and parsed stdout are stored in the job context. A zero return code
# sets an "ok" status; anything else sets an "error" status and logs the parsed stdout
# as a warning.
def process_runner_result(result)
  context[:ansible_runner_return_code] = result.return_code
  context[:ansible_runner_stdout]      = result.parsed_stdout

  if result.return_code == 0
    set_status("ansible #{execution_type} completed with no errors", "ok")
  else
    set_status("ansible #{execution_type} failed", "error")
    _log.warn("ansible #{execution_type} failed:\n#{result.parsed_stdout.join("\n")}")
  end

  route_signal(:post_execute)
end

# Stops the given runner monitor and routes the job to :abort with a timeout error.
def handle_runner_timeout(monitor)
  monitor.stop

  timeout_message = "ansible #{execution_type} has been running longer than timeout"
  route_signal(:abort, timeout_message, "error")
end

# True once the current UTC time is past started_on + options[:timeout].
def job_timeout_exceeded?
  deadline = started_on + options[:timeout]
  deadline < Time.now.utc
end

# Rebuilds the async runner response object from the dump stored in the job context
# under :ansible_runner_response.
def runner_monitor
  response_class = Ansible::Runner::ResponseAsync
  response_class.load(context[:ansible_runner_response])
end

# Hook for subclasses; this base implementation always raises.
#
# @raise [NotImplementedError] unconditionally
def verify_options
  raise(NotImplementedError, "must be implemented in a subclass")
end
Expand Down
12 changes: 6 additions & 6 deletions spec/models/manageiq/providers/ansible_playbook_workflow_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@
context "with playbook_path" do
it "succeeds" do
expect_any_instance_of(ManageIQ::Providers::EmbeddedAnsible::AutomationManager::ConfigurationScriptSource).to_not receive(:checkout_git_repository)
expect(job).to receive(:queue_signal).with(:execute)
expect(job).to receive(:queue_signal).with(:execute, :deliver_on => nil)

job.signal(:pre_execute)

Expand All @@ -122,7 +122,7 @@

it "will checkout the git repository to a temp dir before proceeding" do
expect_any_instance_of(ManageIQ::Providers::EmbeddedAnsible::AutomationManager::ConfigurationScriptSource).to receive(:checkout_git_repository)
expect(job).to receive(:queue_signal).with(:execute)
expect(job).to receive(:queue_signal).with(:execute, :deliver_on => nil)

job.signal(:pre_execute)

Expand Down Expand Up @@ -181,7 +181,7 @@
]

expect(Ansible::Runner).to receive(:run_async).with(*runner_options).and_return(response_async)
expect(job).to receive(:queue_signal).with(:poll_runner)
expect(job).to receive(:queue_signal).with(:poll_runner, :deliver_on => nil)

job.signal(:execute)

Expand All @@ -190,7 +190,7 @@

it "ansible-runner fails" do
expect(Ansible::Runner).to receive(:run_async).and_return(nil)
expect(job).to receive(:queue_signal).with(:abort, "Failed to run ansible playbook", "error")
expect(job).to receive(:queue_signal).with(:abort, "Failed to run ansible playbook", "error", :deliver_on => nil)

job.signal(:execute)
end
Expand All @@ -213,7 +213,7 @@

response = Ansible::Runner::Response.new(response_async.dump.merge(:return_code => 0))
expect(response_async).to receive(:response).and_return(response)
expect(job).to receive(:queue_signal).with(:post_execute)
expect(job).to receive(:queue_signal).with(:post_execute, :deliver_on => nil)

job.signal(:poll_runner)
end
Expand All @@ -233,7 +233,7 @@
Timecop.travel(time) do
expect(response_async).to receive(:running?).and_return(true)
expect(response_async).to receive(:stop)
expect(job).to receive(:queue_signal).with(:abort, "ansible playbook has been running longer than timeout", "error")
expect(job).to receive(:queue_signal).with(:abort, "ansible playbook has been running longer than timeout", "error", :deliver_on => nil)

job.signal(:poll_runner)
end
Expand Down
77 changes: 71 additions & 6 deletions spec/models/manageiq/providers/ansible_role_workflow_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@
context "with roles_path" do
it "succeeds" do
expect_any_instance_of(ManageIQ::Providers::EmbeddedAnsible::AutomationManager::ConfigurationScriptSource).to_not receive(:checkout_git_repository)
expect(job).to receive(:queue_signal).with(:execute)
expect(job).to receive(:queue_signal).with(:execute, :deliver_on => nil)

job.signal(:pre_execute)

Expand All @@ -122,13 +122,21 @@

it "will checkout the git repository to a temp dir before proceeding" do
expect_any_instance_of(ManageIQ::Providers::EmbeddedAnsible::AutomationManager::ConfigurationScriptSource).to receive(:checkout_git_repository)
expect(job).to receive(:queue_signal).with(:execute)
expect(job).to receive(:queue_signal).with(:execute, :deliver_on => nil)

job.signal(:pre_execute)

expect(job.options[:roles_path]).to start_with File.join(Dir.tmpdir, "ansible-runner-git")
expect(job.options[:roles_path]).to end_with roles_relative_path
end

# Podified path for pre_execute: the job should move on via a direct #signal(:execute)
# call rather than through queue_signal.
it "doesn't queue the next state when running in pods" do
# Force the podified branch
allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true)
expect_any_instance_of(ManageIQ::Providers::EmbeddedAnsible::AutomationManager::ConfigurationScriptSource).to receive(:checkout_git_repository)
expect(job).to receive(:signal).with(:execute)

# Call the state method directly so the only #signal the job receives is the expected one
job.pre_execute
end
end

context "without role_name" do
Expand Down Expand Up @@ -179,19 +187,37 @@
]

expect(Ansible::Runner).to receive(:run_role_async).with(*runner_options).and_return(response_async)
expect(job).to receive(:queue_signal).with(:poll_runner)
expect(job).to receive(:queue_signal).with(:poll_runner, :deliver_on => nil)

job.signal(:execute)

expect(job.context[:ansible_runner_response]).to eq(response_async.dump)
end

# Podified path for execute when the runner launches successfully: the job should
# receive #signal(:poll_runner) directly.
it "doesn't queue the next state when running in pods with a success response" do
# Force the podified branch
allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true)
response_async = Ansible::Runner::ResponseAsync.new(:base_dir => "/path/to/results")

# A non-nil async response stands in for a successful launch
expect(Ansible::Runner).to receive(:run_role_async).and_return(response_async)
expect(job).to receive(:signal).with(:poll_runner)

job.execute
end

it "ansible-runner fails" do
expect(Ansible::Runner).to receive(:run_role_async).and_return(nil)
expect(job).to receive(:queue_signal).with(:abort, "Failed to run ansible role", "error")
expect(job).to receive(:queue_signal).with(:abort, "Failed to run ansible role", "error", :deliver_on => nil)

job.signal(:execute)
end

# Podified path for execute when the runner fails to launch: the job should
# receive #signal(:abort, ...) directly.
it "doesn't queue the next state when running in pods with a failure response" do
# Force the podified branch
allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true)
# A nil response stands in for a failed launch
expect(Ansible::Runner).to receive(:run_role_async).and_return(nil)
expect(job).to receive(:signal).with(:abort, "Failed to run ansible role", "error")

job.execute
end
end

context "#current_job_timeout" do
Expand All @@ -217,11 +243,22 @@

response = Ansible::Runner::Response.new(response_async.dump.merge(:return_code => 0))
expect(response_async).to receive(:response).and_return(response)
expect(job).to receive(:queue_signal).with(:post_execute)
expect(job).to receive(:queue_signal).with(:post_execute, :deliver_on => nil)

job.signal(:poll_runner)
end

# Podified path for poll_runner when the runner has already finished with return code 0:
# the job should receive #signal(:post_execute) directly.
it "doesn't queue the next state when running in pods and ansible-runner completed " do
# Force the podified branch
allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true)
# Runner reports it is no longer running on the first check
expect(response_async).to receive(:running?).and_return(false)

# Build a completed response with a zero return code from the async response's dump
response = Ansible::Runner::Response.new(response_async.dump.merge(:return_code => 0))
expect(response_async).to receive(:response).and_return(response)
expect(job).to receive(:signal).with(:post_execute)

job.poll_runner
end

it "ansible-runner still running" do
now = Time.now.utc
allow(Time).to receive(:now).and_return(now)
Expand All @@ -231,18 +268,46 @@
job.signal(:poll_runner)
end

# Podified path for poll_runner while the runner is still going: the job should sleep
# once, re-check, and then signal :post_execute when the runner has finished.
it "if ansible-runner still runningin pods it loops until the job is done" do
# Force the podified branch
allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true)
# running? answers true on the first check and false on the second
expect(response_async).to receive(:running?).and_return(true, false)

# First loop, the job is still running so we sleep for the poll interval
# (sleep is stubbed on the job, so the example does not actually wait)
expect(job).to receive(:sleep).with(1)

# Second loop we get a response and signal the post_execute state
response = Ansible::Runner::Response.new(response_async.dump.merge(:return_code => 0))
expect(response_async).to receive(:response).and_return(response)
expect(job).to receive(:signal).with(:post_execute)

job.poll_runner
end

it "fails if the role has been running too long" do
time = job.started_on + job.options[:timeout] + 5.minutes

Timecop.travel(time) do
expect(response_async).to receive(:running?).and_return(true)
expect(response_async).to receive(:stop)
expect(job).to receive(:queue_signal).with(:abort, "ansible role has been running longer than timeout", "error")
expect(job).to receive(:queue_signal).with(:abort, "ansible role has been running longer than timeout", "error", :deliver_on => nil)

job.signal(:poll_runner)
end
end

# Podified path for poll_runner when the job has exceeded its timeout: the runner should
# be stopped and the job should receive #signal(:abort, ...) directly.
it "Doesn't queue abort state when the role times out and running in pods" do
# Force the podified branch
allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true)
# A point in time five minutes past the job's timeout
time = job.started_on + job.options[:timeout] + 5.minutes

Timecop.travel(time) do
# Runner is still running when the timeout check happens
expect(response_async).to receive(:running?).and_return(true)
expect(response_async).to receive(:stop)
expect(job).to receive(:signal).with(:abort, "ansible role has been running longer than timeout", "error")

job.poll_runner
end
end

context "deliver_on" do
let(:options) { [{"ENV" => "VAR"}, {"arg1" => "val1"}, {:roles_path => "/path/to/role"}, :poll_interval => 5.minutes] }

Expand Down

0 comments on commit ce6f0c2

Please sign in to comment.