From 4e51332764d54820b6703f063f6aa3a08ee8fb11 Mon Sep 17 00:00:00 2001 From: Adam Grare Date: Wed, 25 Jul 2018 13:42:58 -0400 Subject: [PATCH] Add a state machine for long ansible operations This adds a Job state machine for long running async ansible operations. --- .../providers/ansible_operations_workflow.rb | 91 +++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 app/models/manageiq/providers/ansible_operations_workflow.rb diff --git a/app/models/manageiq/providers/ansible_operations_workflow.rb b/app/models/manageiq/providers/ansible_operations_workflow.rb new file mode 100644 index 000000000000..1937cb28816c --- /dev/null +++ b/app/models/manageiq/providers/ansible_operations_workflow.rb @@ -0,0 +1,91 @@ +class ManageIQ::Providers::AnsibleOperationsWorkflow < Job + def self.create_job(env_vars, extra_vars, playbook_path, timeout: 1.hour, poll_interval: 1.minute) + options = { + :env_vars => env_vars, + :extra_vars => extra_vars, + :playbook_path => playbook_path, + :timeout => timeout, + :poll_interval => poll_interval, + } + + super(name, options) + end + + def pre_playbook + # A step before running the playbook for any optional setup tasks + queue_signal(:run_playbook) + end + + def run_playbook + env_vars, extra_vars, playbook_path = options.values_at(:env_vars, :extra_vars, :playbook_path) + + pid = Ansible::Runner.run_async(env_vars, extra_vars, playbook_path) + if pid.nil? + queue_signal(:error) + else + context[:ansible_runner_pid] = pid + update_attributes!(:context => context) + + queue_signal(:poll_runner) + end + end + + def poll_runner + if Ansible::Runner.running?(context[:ansible_runner_pid]) + queue_signal(:poll_runner, :deliver_on => deliver_on) + else + queue_signal(:post_playbook) + end + end + + def post_playbook + # A step after running the playbook for any optional cleanup tasks + queue_signal(:finish) + end + + alias_method :initializing, :dispatch_start + alias_method :start, :pre_playbook + alias_method :finish, :process_finished + alias_method :abort_job, :process_abort + alias_method :cancel, :process_cancel + alias_method :error, :process_error + + protected + + def queue_signal(*args, deliver_on: nil) + role = options[:role] || "ems_operations" + priority = options[:priority] || MiqQueue::NORMAL_PRIORITY + + MiqQueue.put( + :class_name => self.class.name, + :method_name => "signal", + :instance_id => id, + :priority => priority, + :role => role, + :zone => zone, + :task_id => guid, + :args => args, + :deliver_on => deliver_on + ) + end + + def deliver_on + Time.now.utc + options[:poll_interval] + end + + def load_transitions + self.state ||= 'initialize' + + { + :initializing => {'initialize' => 'waiting_to_start'}, + :start => {'waiting_to_start' => 'pre_playbook'}, + :run_playbook => {'pre_playbook' => 'running'}, + :poll_runner => {'running' => 'running'}, + :post_playbook => {'running' => 'post_playbook'}, + :finish => {'*' => 'finished'}, + :abort_job => {'*' => 'aborting'}, + :cancel => {'*' => 'canceling'}, + :error => {'*' => '*'} + } + end +end