Skip to content

Commit

Permalink
refactor queue_signal
Browse files Browse the repository at this point in the history
  • Loading branch information
jameswnl committed Jan 30, 2019
1 parent ec1470a commit 371e584
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 32 deletions.
14 changes: 14 additions & 0 deletions app/models/job/state_machine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,18 @@ def signal(signal, *args)
raise _("%{signal} is not permitted at state %{state}") % {:signal => signal, :state => state}
end
end

def queue_signal(*args, deliver_on: nil, role: "ems_operations", priority: MiqQueue::NORMAL_PRIORITY)
MiqQueue.put(
:class_name => self.class.name,
:method_name => "signal",
:instance_id => id,
:priority => priority,
:role => role,
:zone => zone,
:task_id => guid,
:args => args,
:deliver_on => deliver_on
)
end
end
13 changes: 1 addition & 12 deletions app/models/manageiq/providers/ems_refresh_workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,7 @@ def poll_refresh
def queue_signal(*args, deliver_on: nil)
role = options[:role] || "ems_operations"
priority = options[:priority] || MiqQueue::NORMAL_PRIORITY

MiqQueue.put(
:class_name => self.class.name,
:method_name => "signal",
:instance_id => id,
:priority => priority,
:role => role,
:zone => zone,
:task_id => guid,
:args => args,
:deliver_on => deliver_on
)
super(*args, deliver_on, role, priority)
end

alias_method :initializing, :dispatch_start
Expand Down
29 changes: 9 additions & 20 deletions app/models/manageiq/providers/infra_conversion_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ def abort_conversion(message, status)

def poll_conversion
# TODO: how much time should we wait before timing out?
self.message = "Getting conversion state"
message = "Getting conversion state"
_log.info(prep_message(message))

if migration_task.options[:virtv2v_wrapper].nil? || migration_task.options[:virtv2v_wrapper]['state_file'].nil?
self.message = "options[:virtv2v_wrapper]['state_file'] not available, continuing poll_conversion"
message = "options[:virtv2v_wrapper]['state_file'] not available, continuing poll_conversion"
_log.info(prep_message(message))
return queue_signal(:poll_conversion, :deliver_on => Time.now.utc + options[:conversion_polling_interval])
end
Expand All @@ -92,34 +92,34 @@ def poll_conversion
end

v2v_status = migration_task.options[:virtv2v_status]
self.message = "virtv2v_status=#{status}"
message = "virtv2v_status=#{status}"
_log.info(prep_message(message))

case v2v_status
when 'active'
queue_signal(:poll_conversion, :deliver_on => Time.now.utc + options[:conversion_polling_interval])
when 'failed'
self.message = "disk conversion failed"
message = "disk conversion failed"
abort_conversion(prep_message(message), 'error')
when 'succeeded'
self.message = "disk conversion succeeded"
message = "disk conversion succeeded"
_log.info(prep_message(message))
queue_signal(:start_post_stage)
else
self.message = prep_message("Unknown converstion status: #{v2v_status}")
message = prep_message("Unknown converstion status: #{v2v_status}")
abort_conversion(message, 'error')
end
end

def start_post_stage
# once we refactor Automate's PostTransformation into a job, we kick start it here
self.message = 'To wait for PostTransformation ...'
message = 'To wait for PostTransformation ...'
_log.info(prep_message("To start polling for PostTransformation stage"))
queue_signal(:poll_post_stage, :deliver_on => Time.now.utc + options[:conversion_polling_interval])
end

def poll_post_stage
self.message = "PostTransformation state=#{migration_task.state}, status=#{migration_task.status}"
message = "PostTransformation state=#{migration_task.state}, status=#{migration_task.status}"
_log.info(prep_message(message))
if migration_task.state == 'finished'
self.status = migration_task.status
Expand All @@ -132,18 +132,7 @@ def poll_post_stage
def queue_signal(*args, deliver_on: nil)
role = options[:role] || "ems_operations"
priority = options[:priority] || MiqQueue::NORMAL_PRIORITY

MiqQueue.put(
:class_name => self.class.name,
:method_name => "signal",
:instance_id => id,
:priority => priority,
:role => role,
:zone => zone,
:task_id => guid,
:args => args,
:deliver_on => deliver_on
)
super(*args, deliver_on, role, priority)
end

def prep_message(contents)
Expand Down

0 comments on commit 371e584

Please sign in to comment.