Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2v Throttling #18415

Merged
merged 18 commits into from
Feb 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 155 additions & 0 deletions app/models/infra_conversion_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
class InfraConversionJob < Job
# Creates the job after filling in polling defaults for any option the caller
# left unset. The passed-in options hash is updated in place.
def self.create_job(options)
  # TODO: from settings/user plan settings
  polling_defaults = {
    :conversion_polling_interval => 60,      # in seconds
    :poll_conversion_max         => 24 * 60, # i.e. default 24 hour (with 60s per-interval)
    :poll_post_stage_max         => 30       # i.e. default 30 minutes
  }
  polling_defaults.each { |key, default| options[key] ||= default }

  super(name, options)
end

#
# State-transition diagram:
# :poll_conversion :poll_post_stage
# * /-------------\ /---------------\
# | :initialize | | | |
# v :start v | v |
# waiting_to_start --------> running ------------------------------> post_conversion --/
# | | :start_post_stage |
# | :abort_job | :abort_job |
# \------------------------>| | :finish
# v |
# aborting --------------------------------->|
# :finish v
# finished
#

# Signal names used in load_transitions, aliased to existing handler methods.
alias initializing dispatch_start
alias finish       process_finished
alias abort_job    process_abort
alias cancel       process_cancel
alias error        process_error

# Defaults the job state to 'initialize' when none is set, then returns the
# transition table: each signal maps to a {current_state => next_state} hash.
# NOTE(review): '*' looks like a wildcard for "any state" - confirm against Job.
def load_transitions
  self.state = 'initialize' unless state

  any = '*'
  {
    initializing:     {'initialize' => 'waiting_to_start'},
    start:            {'waiting_to_start' => 'running'},
    poll_conversion:  {'running' => 'running'},
    start_post_stage: {'running' => 'post_conversion'},
    poll_post_stage:  {'post_conversion' => 'post_conversion'},
    finish:           {any => 'finished'},
    abort_job:        {any => 'aborting'},
    cancel:           {any => 'canceling'},
    error:            {any => any}
  }
end

def migration_task
@migration_task ||= target_entity
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The target_entity is known at initialization time, right? This looks like it could just be an attr_reader that is set in initialize.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is caching the query result of target_entity. Shall I rely on Rails to take care of caching and just do an alias_method :migration_task, :target_entity?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, no problem with the caching. I was saying that in initialize you could do @migration_task = target_entity and make migration_task an attr_reader, which is basically the behavior this method duplicates.

# valid states: %w(migrated pending finished active queued)
end

# Handler for the :start signal: runs the task's preflight check and, when it
# passes, queues the first :poll_conversion. Any error raised along the way is
# logged and turned into an abort with status 'error'.
def start
  # The three TransformationCleanup steps are not performed here:
  #   - kill v2v: ignored, no conversion host is there yet in the original automate-based logic
  #   - power_on: ignored
  #   - check_power_on: ignored
  task = migration_task
  task.preflight_check
  _log.info(prep_message("Preflight check passed, task.state=#{task.state}. continue ..."))
  queue_signal(:poll_conversion)
rescue => e
  failure = prep_message("Preflight check has failed: #{e}")
  _log.info(failure)
  abort_conversion(failure, 'error')
end

# Cancels the migration task first, then queues the :abort_job signal carrying
# the given message and status. Returns whatever queue_signal returns.
def abort_conversion(message, status)
  task = migration_task
  task.cancel
  queue_signal(:abort_job, message, status)
end

# Bumps the "<poll_type>_count" counter kept in context and reports whether it
# now exceeds the "<poll_type>_max" limit from options.
#
# poll_type - prefix of the counter/limit keys, e.g. :poll_conversion
#
# Returns true once the counter is greater than the limit, false otherwise.
def polling_timeout(poll_type)
  counter_key = :"#{poll_type}_count"
  limit_key   = :"#{poll_type}_max"

  attempts = (context[counter_key] || 0) + 1
  context[counter_key] = attempts
  attempts > options[limit_key]
end

# Handler for the :poll_conversion signal: checks the disk conversion once and
# decides what to queue next.
#
# - aborts when polling_timeout(:poll_conversion) reports the limit is exceeded
# - re-queues itself, delayed by options[:conversion_polling_interval], while the
#   wrapper state file is not recorded yet or the status is 'active'
# - queues :start_post_stage when the status is 'succeeded'
# - aborts with status 'error' on 'failed', on any other status, or when
#   refreshing the conversion state raises
def poll_conversion
  return abort_conversion("Polling times out", 'error') if polling_timeout(:poll_conversion)

  _log.info(prep_message("Getting conversion state"))

  unless migration_task.options.fetch_path(:virtv2v_wrapper, 'state_file')
    _log.info(prep_message("Virt v2v state file not available, continuing poll_conversion"))
    return queue_signal(:poll_conversion, :deliver_on => Time.now.utc + options[:conversion_polling_interval])
  end

  begin
    migration_task.get_conversion_state # migration_task.options will be updated
  rescue => exception
    _log.log_backtrace(exception)
    return abort_conversion("Conversion error: #{exception}", 'error')
  end

  v2v_status = migration_task.options[:virtv2v_status]
  _log.info(prep_message("virtv2v_status=#{v2v_status}"))

  case v2v_status
  when 'active'
    queue_signal(:poll_conversion, :deliver_on => Time.now.utc + options[:conversion_polling_interval])
  when 'failed'
    abort_conversion(prep_message("disk conversion failed"), 'error')
  when 'succeeded'
    _log.info(prep_message("disk conversion succeeded"))
    queue_signal(:start_post_stage)
  else
    # Anything else is unexpected; stop rather than poll forever.
    abort_conversion(prep_message("Unknown conversion status: #{v2v_status}"), 'error')
  end
end

# Handler for the :start_post_stage signal: logs that the job now waits for
# post-transformation progress and schedules the first :poll_post_stage one
# polling interval from now.
def start_post_stage
  # once we refactor Automate's PostTransformation into a job, we kick start it here
  _log.info(prep_message("To wait for Post-Transformation progress"))
  next_poll_at = Time.now.utc + options[:conversion_polling_interval]
  queue_signal(:poll_post_stage, :deliver_on => next_poll_at)
end

# Handler for the :poll_post_stage signal. Aborts when the polling limit is
# exceeded; otherwise logs the task's state/status and either finishes the job
# (copying the task status onto the job) once the task state is 'finished',
# or schedules another poll one polling interval from now.
def poll_post_stage
  return abort_conversion("Polling times out", 'error') if polling_timeout(:poll_post_stage)

  task = migration_task
  _log.info(prep_message("PostTransformation state=#{task.state}, status=#{task.status}"))

  unless task.state == 'finished'
    return queue_signal(:poll_post_stage, :deliver_on => Time.now.utc + options[:conversion_polling_interval])
  end

  self.status = task.status
  queue_signal(:finish)
end

# Puts a "signal" call for this job on MiqQueue.
#
# args       - signal name followed by any arguments for the handler
# deliver_on - optional time passed through as the queue item's :deliver_on
#
# Returns the result of MiqQueue.put.
def queue_signal(*args, deliver_on: nil)
  MiqQueue.put(
    class_name:  self.class.name,
    method_name: 'signal',
    instance_id: id,
    role:        'ems_operations',
    zone:        zone,
    task_id:     guid,
    args:        args,
    deliver_on:  deliver_on
  )
end

# Prefixes a log/abort message with the ids of the migration task and of this job.
def prep_message(contents)
  prefix = "MiqRequestTask id=#{migration_task.id}, InfraConversionJob id=#{id}."
  "#{prefix} #{contents}"
end
end
5 changes: 5 additions & 0 deletions app/models/job_proxy_dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def initialize

def dispatch
_dummy, t = Benchmark.realtime_block(:total_time) do
Benchmark.realtime_block(:v2v_dispatching) { dispatch_v2v_migrations }
Benchmark.realtime_block(:container_dispatching) { dispatch_container_scan_jobs }
jobs_to_dispatch, = Benchmark.realtime_block(:pending_vm_jobs) { pending_jobs }
Benchmark.current_realtime[:vm_jobs_to_dispatch_count] = jobs_to_dispatch.length
Expand Down Expand Up @@ -98,6 +99,10 @@ def container_image_scan_class
ManageIQ::Providers::Kubernetes::ContainerManager::Scanning::Job
end

# Hands v2v migration dispatching over to InfraConversionThrottler.
# Called from #dispatch inside the :v2v_dispatching benchmark block.
def dispatch_v2v_migrations
InfraConversionThrottler.start_conversions
end

def dispatch_container_scan_jobs
jobs_by_ems, = Benchmark.realtime_block(:pending_container_jobs) { pending_container_jobs }
Benchmark.current_realtime[:container_jobs_to_dispatch_count] = jobs_by_ems.values.reduce(0) { |m, o| m + o.length }
Expand Down
12 changes: 12 additions & 0 deletions app/models/service_template_transformation_plan_request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,16 @@ def update_request_status
Notification.create(:type => "transformation_plan_request_failed", :options => {:plan_name => description}, :subject => self)
end
end

# Creates one InfraConversionJob per request task, targeting that task, and
# records the new job's id in the task options (saved with save!).
def post_create_request_tasks
  miq_request_tasks.each do |task|
    target = {:target_class => task.class.name, :target_id => task.id}
    conversion_job = InfraConversionJob.create_job(target)

    task.options[:infra_conversion_job_id] = conversion_job.id
    task.save!
  end
end
end
77 changes: 47 additions & 30 deletions app/models/service_template_transformation_plan_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ def post_ansible_playbook_service_template
end

def update_transformation_progress(progress)
options[:progress] = (options[:progress] || {}).merge(progress)
save
update_options(:progress => (options[:progress] || {}).merge(progress))
end

def task_finished
Expand All @@ -52,13 +51,12 @@ def task_active
# This method returns true if all mappings are ok. It also preload
# virtv2v_disks and network_mappings in task options
def preflight_check
raise 'OSP destination and source power_state is off' if destination_ems.emstype == 'openstack' && source.power_state == 'off'
update_options(:source_vm_power_state => source.power_state) # This will determine power_state of destination_vm
jameswnl marked this conversation as resolved.
Show resolved Hide resolved
destination_cluster
virtv2v_disks
network_mappings
raise if destination_ems.emstype == 'openstack' && source.power_state == 'off'
true
rescue
false
update_attributes(:state => 'migrate')
end

def source_cluster
Expand All @@ -85,19 +83,13 @@ def transformation_type

def virtv2v_disks
return options[:virtv2v_disks] if options[:virtv2v_disks].present?

options[:virtv2v_disks] = calculate_virtv2v_disks
save!

update_options(:virtv2v_disks => calculate_virtv2v_disks)
options[:virtv2v_disks]
end

def network_mappings
return options[:network_mappings] if options[:network_mappings].present?

options[:network_mappings] = calculate_network_mappings
save!

update_options(:network_mappings => calculate_network_mappings)
options[:network_mappings]
end

Expand Down Expand Up @@ -148,18 +140,23 @@ def transformation_log_queue(userid = nil)
end

_log.info("Queuing the download of transformation log for #{description} with ID [#{id}]")
options = {:userid => userid, :action => 'transformation_log'}
task_options = {:userid => userid, :action => 'transformation_log'}
queue_options = {:class_name => self.class,
:method_name => 'transformation_log',
:instance_id => id,
:priority => MiqQueue::HIGH_PRIORITY,
:args => [],
:zone => conversion_host.resource.my_zone}
MiqTask.generic_action_with_callback(options, queue_options)
MiqTask.generic_action_with_callback(task_options, queue_options)
end

# Looks up the Job whose id is stored under options[:infra_conversion_job_id].
def infra_conversion_job
  job_id = options[:infra_conversion_job_id]
  Job.find(job_id)
end

# Flags this task as cancel-requested, then cancels its InfraConversionJob.
# Returns the result of the job's cancel call.
def cancel
  requested = MiqRequestTask::CANCEL_STATUS_REQUESTED
  update_attributes(:cancelation_status => requested)
  infra_conversion_job.cancel
end

def canceling
Expand All @@ -176,49 +173,65 @@ def conversion_options
destination_cluster = transformation_destination(source_cluster)
destination_storage = transformation_destination(source_storage)

options = {
results = {
:source_disks => virtv2v_disks.map { |disk| disk[:path] },
:network_mappings => network_mappings
}

options.merge!(send("conversion_options_source_provider_#{source_ems.emstype}_#{source_transport_method}", source_storage))
options.merge!(send("conversion_options_destination_provider_#{destination_ems.emstype}", destination_cluster, destination_storage))
results.merge!(send("conversion_options_source_provider_#{source_ems.emstype}_#{source_transport_method}", source_storage))
results.merge!(send("conversion_options_destination_provider_#{destination_ems.emstype}", destination_cluster, destination_storage))
end

def update_options(opts)
with_lock do
jameswnl marked this conversation as resolved.
Show resolved Hide resolved
# Automate is updating this options hash (various keys) as well, using with_lock.
options.merge!(opts)
update_attributes(:options => options)
end
options
end

def run_conversion
start_timestamp = Time.now.utc.strftime('%Y-%m-%d %H:%M:%S')
options[:virtv2v_wrapper] = conversion_host.run_conversion(conversion_options)
options[:virtv2v_started_on] = start_timestamp
options[:virtv2v_status] = 'active'
updates = {}
updates[:virtv2v_wrapper] = conversion_host.run_conversion(conversion_options)
updates[:virtv2v_started_on] = start_timestamp
updates[:virtv2v_status] = 'active'
_log.info("InfraConversionJob run_conversion to update_options: #{updates}")
update_options(updates)
end

def get_conversion_state
updates = {}
virtv2v_state = conversion_host.get_conversion_state(options[:virtv2v_wrapper]['state_file'])
updated_disks = virtv2v_disks
if virtv2v_state['finished'].nil?
updated_disks.each do |disk|
matching_disks = virtv2v_state['disks'].select { |d| d['path'] == disk[:path] }
raise "No disk matches '#{disk[:path]}'. Aborting." if matching_disks.length.zero?
raise "More than one disk matches '#{disk[:path]}'. Aborting." if matching_disks.length > 1
raise "More than one disk matches '#{disk[:path]}'. Aborting." if matching_disks.length > 1
disk[:percent] = matching_disks.first['progress']
end
else
options[:virtv2v_finished_on] = Time.now.utc.strftime('%Y-%m-%d %H:%M:%S')
updates[:virtv2v_finished_on] = Time.now.utc.strftime('%Y-%m-%d %H:%M:%S')
if virtv2v_state['failed']
options[:virtv2v_status] = 'failed'
updates[:virtv2v_status] = 'failed'
raise "Disks transformation failed."
else
options[:virtv2v_status] = 'succeeded'
updates[:virtv2v_status] = 'succeeded'
updated_disks.each { |d| d[:percent] = 100 }
end
end
options[:virtv2v_disks] = updated_disks
end
updates[:virtv2v_disks] = updated_disks
ensure
_log.info("InfraConversionJob get_conversion_state to update_options: #{updates}")
update_options(updates)
end

def kill_virtv2v(signal = 'TERM')
virtv2v_state = conversion_host.get_conversion_state(options[:virtv2v_wrapper]['state_file'])
conversion_host.kill_process(virtv2v_state['pid'].to_s, signal)
return false if options[:virtv2v_started_on].blank? || options[:virtv2v_finished_on].present? || options[:virtv2v_wrapper].blank?
return false unless options[:virtv2v_wrapper]['pid']
conversion_host.kill_process(options[:virtv2v_wrapper]['pid'], signal)
end

private
Expand Down Expand Up @@ -322,4 +335,8 @@ def conversion_options_destination_provider_openstack(cluster, storage)
:osp_security_groups_ids => [destination_security_group.ems_ref]
}
end

# Returns the parent's valid states plus 'migrate'.
#
# A new array is built on every call so the list returned by the parent is
# never modified; appending to it in place would grow it on each call if the
# parent hands out a shared array, and would raise if that array is frozen.
def valid_states
  [*super, 'migrate']
end
end
24 changes: 24 additions & 0 deletions lib/infra_conversion_throttler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Starts pending InfraConversionJobs while keeping the number of running
# conversions per destination EMS under a configurable limit.
class InfraConversionThrottler
  # Limit used when the EMS has no 'Max Transformation Runners' custom value.
  DEFAULT_EMS_MAX_RUNNERS = 10

  # For each destination EMS that has pending jobs, computes the free slots
  # (limit minus active tasks across all of its conversion hosts) and, while
  # slots remain, assigns each job the eligible conversion host with the
  # fewest active tasks and signals the job to :start.
  def self.start_conversions
    pending_conversion_jobs.each do |ems, jobs|
      running = ems.conversion_hosts.inject(0) { |sum, ch| sum + ch.active_tasks.size }
      slots = (ems.miq_custom_get('Max Transformation Runners') || DEFAULT_EMS_MAX_RUNNERS).to_i - running
      jobs.each do |job|
        # Check the budget first so hosts are not evaluated when nothing can start.
        break if slots <= 0

        # Re-evaluated per job: the previous assignment changes the host load.
        host = ems.conversion_hosts.select(&:eligible?).min_by { |ch| ch.active_tasks.size }
        break if host.nil?

        job.migration_task.update_attributes!(:conversion_host => host)
        job.queue_signal(:start)
        _log.info("Pending InfraConversionJob: id=#{job.id} signaled to start")
        slots -= 1
      end
    end
  end

  # Returns the jobs in state 'waiting_to_start', grouped by the destination
  # EMS of their migration task. The jobs are loaded once and reused for both
  # the logged count and the grouping.
  def self.pending_conversion_jobs
    pending = InfraConversionJob.where(:state => 'waiting_to_start').to_a
    _log.info("Pending InfraConversionJob: #{pending.size}")
    pending.group_by { |job| job.migration_task.destination_ems }
  end
end
7 changes: 7 additions & 0 deletions spec/factories/job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
FactoryBot.define do
  # Base Job factory; the name is derived from the sequence number through
  # seq_padded_for_sorting.
  factory(:job) do
    sequence(:name) do |n|
      "job_#{seq_padded_for_sorting(n)}"
    end
  end

  # InfraConversionJob factory, inheriting its definition from :job.
  factory(:infra_conversion_job, :parent => :job)
end
Loading