Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't queue things that need to run on the same worker container #19956

Merged
merged 5 commits into from
Mar 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 65 additions & 26 deletions app/models/manageiq/providers/ansible_runner_workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def start
def pre_execute
verify_options
prepare_repository
queue_signal(:execute)
route_signal(:execute)
end

def launch_runner
Expand All @@ -40,42 +40,20 @@ def execute
response = launch_runner

if response.nil?
queue_signal(:abort, "Failed to run ansible #{execution_type}", "error")
route_signal(:abort, "Failed to run ansible #{execution_type}", "error")
else
context[:ansible_runner_response] = response.dump

started_on = Time.now.utc
update!(:context => context, :started_on => started_on)
miq_task.update!(:started_on => started_on)

queue_signal(:poll_runner)
route_signal(:poll_runner)
end
end

def poll_runner
response = Ansible::Runner::ResponseAsync.load(context[:ansible_runner_response])
if response.running?
if started_on + options[:timeout] < Time.now.utc
response.stop

queue_signal(:abort, "ansible #{execution_type} has been running longer than timeout", "error")
else
queue_signal(:poll_runner, :deliver_on => deliver_on)
end
else
result = response.response

context[:ansible_runner_return_code] = result.return_code
context[:ansible_runner_stdout] = result.parsed_stdout

if result.return_code != 0
set_status("ansible #{execution_type} failed", "error")
_log.warn("ansible #{execution_type} failed:\n#{result.parsed_stdout.join("\n")}")
else
set_status("ansible #{execution_type} completed with no errors", "ok")
end
queue_signal(:post_execute)
end
MiqEnvironment::Command.is_podified? ? wait_for_runner_process : wait_for_runner_process_async
end

def post_execute
Expand All @@ -91,6 +69,16 @@ def post_execute

protected

# Continue in the current process if we're running in pods, or queue the message for the next worker otherwise
# We can't queue in pods as jobs of this type depend on filesystem state
def route_signal(*args, deliver_on: nil)
Fryguy marked this conversation as resolved.
Show resolved Hide resolved
if MiqEnvironment::Command.is_podified?
signal(*args)
else
queue_signal(*args, :deliver_on => deliver_on)
end
end

def queue_signal(*args, deliver_on: nil)
role = options[:role] || "ems_operations"
priority = options[:priority] || MiqQueue::NORMAL_PRIORITY
Expand Down Expand Up @@ -121,6 +109,57 @@ def load_transitions

private

def wait_for_runner_process
monitor = runner_monitor

# If we're running in pods loop so we don't exhaust the stack limit in very long jobs
loop do
break unless monitor.running?
return handle_runner_timeout(monitor) if job_timeout_exceeded?

sleep options[:poll_interval]
end

process_runner_result(monitor.response)
end

def wait_for_runner_process_async
monitor = runner_monitor

if monitor.running?
return handle_runner_timeout(monitor) if job_timeout_exceeded?
queue_signal(:poll_runner, :deliver_on => deliver_on)
else
process_runner_result(monitor.response)
end
end

def process_runner_result(result)
context[:ansible_runner_return_code] = result.return_code
context[:ansible_runner_stdout] = result.parsed_stdout

if result.return_code != 0
set_status("ansible #{execution_type} failed", "error")
_log.warn("ansible #{execution_type} failed:\n#{result.parsed_stdout.join("\n")}")
else
set_status("ansible #{execution_type} completed with no errors", "ok")
end
route_signal(:post_execute)
end

def handle_runner_timeout(monitor)
monitor.stop
route_signal(:abort, "ansible #{execution_type} has been running longer than timeout", "error")
end

def job_timeout_exceeded?
started_on + options[:timeout] < Time.now.utc
end

def runner_monitor
Ansible::Runner::ResponseAsync.load(context[:ansible_runner_response])
end

def verify_options
raise NotImplementedError, "must be implemented in a subclass"
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@
context "with playbook_path" do
it "succeeds" do
expect_any_instance_of(ManageIQ::Providers::EmbeddedAnsible::AutomationManager::ConfigurationScriptSource).to_not receive(:checkout_git_repository)
expect(job).to receive(:queue_signal).with(:execute)
expect(job).to receive(:queue_signal).with(:execute, :deliver_on => nil)

job.signal(:pre_execute)

Expand All @@ -122,7 +122,7 @@

it "will checkout the git repository to a temp dir before proceeding" do
expect_any_instance_of(ManageIQ::Providers::EmbeddedAnsible::AutomationManager::ConfigurationScriptSource).to receive(:checkout_git_repository)
expect(job).to receive(:queue_signal).with(:execute)
expect(job).to receive(:queue_signal).with(:execute, :deliver_on => nil)

job.signal(:pre_execute)

Expand Down Expand Up @@ -181,7 +181,7 @@
]

expect(Ansible::Runner).to receive(:run_async).with(*runner_options).and_return(response_async)
expect(job).to receive(:queue_signal).with(:poll_runner)
expect(job).to receive(:queue_signal).with(:poll_runner, :deliver_on => nil)

job.signal(:execute)

Expand All @@ -190,7 +190,7 @@

it "ansible-runner fails" do
expect(Ansible::Runner).to receive(:run_async).and_return(nil)
expect(job).to receive(:queue_signal).with(:abort, "Failed to run ansible playbook", "error")
expect(job).to receive(:queue_signal).with(:abort, "Failed to run ansible playbook", "error", :deliver_on => nil)

job.signal(:execute)
end
Expand All @@ -213,7 +213,7 @@

response = Ansible::Runner::Response.new(response_async.dump.merge(:return_code => 0))
expect(response_async).to receive(:response).and_return(response)
expect(job).to receive(:queue_signal).with(:post_execute)
expect(job).to receive(:queue_signal).with(:post_execute, :deliver_on => nil)

job.signal(:poll_runner)
end
Expand All @@ -233,7 +233,7 @@
Timecop.travel(time) do
expect(response_async).to receive(:running?).and_return(true)
expect(response_async).to receive(:stop)
expect(job).to receive(:queue_signal).with(:abort, "ansible playbook has been running longer than timeout", "error")
expect(job).to receive(:queue_signal).with(:abort, "ansible playbook has been running longer than timeout", "error", :deliver_on => nil)

job.signal(:poll_runner)
end
Expand Down
77 changes: 71 additions & 6 deletions spec/models/manageiq/providers/ansible_role_workflow_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@
context "with roles_path" do
it "succeeds" do
expect_any_instance_of(ManageIQ::Providers::EmbeddedAnsible::AutomationManager::ConfigurationScriptSource).to_not receive(:checkout_git_repository)
expect(job).to receive(:queue_signal).with(:execute)
expect(job).to receive(:queue_signal).with(:execute, :deliver_on => nil)

job.signal(:pre_execute)

Expand All @@ -122,13 +122,21 @@

it "will checkout the git repository to a temp dir before proceeding" do
expect_any_instance_of(ManageIQ::Providers::EmbeddedAnsible::AutomationManager::ConfigurationScriptSource).to receive(:checkout_git_repository)
expect(job).to receive(:queue_signal).with(:execute)
expect(job).to receive(:queue_signal).with(:execute, :deliver_on => nil)

job.signal(:pre_execute)

expect(job.options[:roles_path]).to start_with File.join(Dir.tmpdir, "ansible-runner-git")
expect(job.options[:roles_path]).to end_with roles_relative_path
end

it "doesn't queue the next state when running in pods" do
allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true)
expect_any_instance_of(ManageIQ::Providers::EmbeddedAnsible::AutomationManager::ConfigurationScriptSource).to receive(:checkout_git_repository)
expect(job).to receive(:signal).with(:execute)

job.pre_execute
end
end

context "without role_name" do
Expand Down Expand Up @@ -179,19 +187,37 @@
]

expect(Ansible::Runner).to receive(:run_role_async).with(*runner_options).and_return(response_async)
expect(job).to receive(:queue_signal).with(:poll_runner)
expect(job).to receive(:queue_signal).with(:poll_runner, :deliver_on => nil)

job.signal(:execute)

expect(job.context[:ansible_runner_response]).to eq(response_async.dump)
end

it "doesn't queue the next state when running in pods with a success response" do
allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true)
response_async = Ansible::Runner::ResponseAsync.new(:base_dir => "/path/to/results")

expect(Ansible::Runner).to receive(:run_role_async).and_return(response_async)
expect(job).to receive(:signal).with(:poll_runner)

job.execute
end

it "ansible-runner fails" do
expect(Ansible::Runner).to receive(:run_role_async).and_return(nil)
expect(job).to receive(:queue_signal).with(:abort, "Failed to run ansible role", "error")
expect(job).to receive(:queue_signal).with(:abort, "Failed to run ansible role", "error", :deliver_on => nil)

job.signal(:execute)
end

it "doesn't queue the next state when running in pods with a failure response" do
allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true)
expect(Ansible::Runner).to receive(:run_role_async).and_return(nil)
expect(job).to receive(:signal).with(:abort, "Failed to run ansible role", "error")

job.execute
end
end

context "#current_job_timeout" do
Expand All @@ -217,11 +243,22 @@

response = Ansible::Runner::Response.new(response_async.dump.merge(:return_code => 0))
expect(response_async).to receive(:response).and_return(response)
expect(job).to receive(:queue_signal).with(:post_execute)
expect(job).to receive(:queue_signal).with(:post_execute, :deliver_on => nil)

job.signal(:poll_runner)
end

it "doesn't queue the next state when running in pods and ansible-runner completed " do
allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true)
expect(response_async).to receive(:running?).and_return(false)

response = Ansible::Runner::Response.new(response_async.dump.merge(:return_code => 0))
expect(response_async).to receive(:response).and_return(response)
expect(job).to receive(:signal).with(:post_execute)

job.poll_runner
end

it "ansible-runner still running" do
now = Time.now.utc
allow(Time).to receive(:now).and_return(now)
Expand All @@ -231,18 +268,46 @@
job.signal(:poll_runner)
end

it "if ansible-runner still runningin pods it loops until the job is done" do
allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true)
expect(response_async).to receive(:running?).and_return(true, false)

# First loop, the job is still running so we sleep for the poll interval
expect(job).to receive(:sleep).with(1)

# Second loop we get a response and signal the post_execute state
response = Ansible::Runner::Response.new(response_async.dump.merge(:return_code => 0))
expect(response_async).to receive(:response).and_return(response)
expect(job).to receive(:signal).with(:post_execute)

job.poll_runner
end

it "fails if the role has been running too long" do
time = job.started_on + job.options[:timeout] + 5.minutes

Timecop.travel(time) do
expect(response_async).to receive(:running?).and_return(true)
expect(response_async).to receive(:stop)
expect(job).to receive(:queue_signal).with(:abort, "ansible role has been running longer than timeout", "error")
expect(job).to receive(:queue_signal).with(:abort, "ansible role has been running longer than timeout", "error", :deliver_on => nil)

job.signal(:poll_runner)
end
end

it "Doesn't queue abort state when the role times out and running in pods" do
allow(MiqEnvironment::Command).to receive(:is_podified?).and_return(true)
time = job.started_on + job.options[:timeout] + 5.minutes

Timecop.travel(time) do
expect(response_async).to receive(:running?).and_return(true)
expect(response_async).to receive(:stop)
expect(job).to receive(:signal).with(:abort, "ansible role has been running longer than timeout", "error")

job.poll_runner
end
end

context "deliver_on" do
let(:options) { [{"ENV" => "VAR"}, {"arg1" => "val1"}, {:roles_path => "/path/to/role"}, :poll_interval => 5.minutes] }

Expand Down