Skip to content

Commit

Permalink
Merge pull request #19673 from agrare/removal_of_miq_vim_broker_worker
Browse files Browse the repository at this point in the history
Completely remove the MiqVimBrokerWorker
  • Loading branch information
Fryguy authored Jan 27, 2020
2 parents d1f056d + 755d219 commit 18534ed
Show file tree
Hide file tree
Showing 15 changed files with 12 additions and 121 deletions.
3 changes: 0 additions & 3 deletions app/models/ext_management_system.rb
Original file line number Diff line number Diff line change
Expand Up @@ -526,9 +526,6 @@ def self.refresh_all_ems_timer

def self.refresh_ems(ems_ids, reload = false)
ems_ids = [ems_ids] unless ems_ids.kind_of?(Array)

ExtManagementSystem.where(:id => ems_ids).each { |ems| ems.reset_vim_cache_queue if ems.respond_to?(:reset_vim_cache_queue) } if reload

ems_ids = ems_ids.collect { |id| [ExtManagementSystem, id] }
EmsRefresh.queue_refresh(ems_ids)
end
Expand Down
13 changes: 0 additions & 13 deletions app/models/job_proxy_dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ def dispatch

# Skip work if there are no jobs to dispatch
if jobs_to_dispatch.length > 0
broker_available, = Benchmark.realtime_block(:miq_vim_broker_available) { MiqVimBrokerWorker.available_in_zone?(@zone) }
logged_broker_unavailable = false

Benchmark.realtime_block(:active_vm_scans) { active_vm_scans_by_zone }
Benchmark.realtime_block(:busy_proxies) { busy_proxies }
Benchmark.realtime_block(:busy_resources_for_embedded_scanning) { busy_resources_for_embedded_scanning }
Expand All @@ -52,16 +49,6 @@ def dispatch
next
end

if @vm.kind_of?(ManageIQ::Providers::Vmware::InfraManager::Vm) || @vm.kind_of?(ManageIQ::Providers::Vmware::InfraManager::Template)
unless broker_available
unless logged_broker_unavailable
_log.warn("Skipping dispatch because broker is currently unavailable")
logged_broker_unavailable = true
end
next
end
end

proxy = nil
if @all_busy_by_host_id_storage_id["#{@vm.host_id}_#{@vm.storage_id}"]
_log.debug("Skipping job id [#{job.id}] guid [#{job.guid}] for vm: [#{@vm.id}] in this dispatch since a prior job with the same host [#{@vm.host_id}] and storage [#{@vm.storage_id}] determined that all resources are busy.")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
class ManageIQ::Providers::BaseManager::MetricsCollectorWorker::Runner < ::MiqQueueWorkerBase::Runner
self.delay_startup_for_vim_broker = true # NOTE: For ems_metrics_collector role, TODO: only for VMware
end
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
class ManageIQ::Providers::BaseManager::RefreshWorker::Runner < ::MiqQueueWorkerBase::Runner
self.delay_startup_for_vim_broker = true # NOTE: For ems_inventory role, TODO: only for VMware

def after_initialize
@emss = ExtManagementSystem.find([@cfg[:ems_id]])
@emss.each do |ems|
Expand Down
4 changes: 0 additions & 4 deletions app/models/miq_alert.rb
Original file line number Diff line number Diff line change
Expand Up @@ -502,10 +502,6 @@ def self.ems_alarms(db, ems = nil)
msg = "Request to retrieve alarms timed out after #{to} seconds"
$log.warn(msg)
raise msg
rescue MiqException::MiqVimBrokerUnavailable
msg = "Unable to retrieve alarms, Management System Connection Broker is currently unavailable"
$log.warn(msg)
raise msg
rescue => err
$log.warn("'#{err.message}', attempting to retrieve alarms")
raise
Expand Down
1 change: 0 additions & 1 deletion app/models/miq_generic_worker/runner.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
class MiqGenericWorker::Runner < MiqQueueWorkerBase::Runner
self.delay_startup_for_vim_broker = true # NOTE: For ems_operations and smartstate roles
end
14 changes: 0 additions & 14 deletions app/models/miq_queue_worker_base/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,6 @@ def get_message
end
end

def message_delivery_suspended?
self.class.delay_queue_delivery_for_vim_broker? && !MiqVimBrokerWorker.available?
end

def deliver_queue_message(msg, &block)
reset_poll_escalate if poll_method == :sleep_poll_escalate

Expand All @@ -118,18 +114,9 @@ def deliver_queue_message(msg, &block)

msg.delivered(status, message, result) unless status == MiqQueue::STATUS_RETRY
do_exit("Exiting worker due to timeout error", 1) if status == MiqQueue::STATUS_TIMEOUT
rescue MiqException::MiqVimBrokerUnavailable
_log.error("#{log_prefix} VimBrokerWorker is not available. Requeueing message...")
msg.unget
ensure
$_miq_worker_current_msg = nil # to avoid log messages inadvertantly prefixed by previous task_id
Thread.current[:tracking_label] = nil
#
# This tells the broker to release any memory being held on behalf of this process
# and reset the global broker handle ($vim_broker_client).
# This is a NOOP if global broker handle is not set.
#
clean_broker_connection
end
end

Expand All @@ -149,7 +136,6 @@ def do_work
loop do
heartbeat
break if thresholds_exceeded?
break if message_delivery_suspended?
msg = get_message
break if msg.nil?
deliver_message(msg)
Expand Down
14 changes: 0 additions & 14 deletions app/models/miq_server/server_smart_proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,6 @@ module MiqServer::ServerSmartProxy

SMART_ROLES = %w(smartproxy smartstate).freeze

module ClassMethods
# Called from VM scan job as well as scan_sync_vm

def use_broker_for_embedded_proxy?(type = nil)
result = ManageIQ::Providers::Vmware::InfraManager.use_vim_broker? &&
::Settings.coresident_miqproxy[:use_vim_broker]
return result if type.blank? || !result

# Check for a specific type (host/ems) if passed
# Default use_vim_broker is true for ems type
::Settings.coresident_miqproxy["use_vim_broker_#{type}"] == true
end
end

def is_a_proxy?
self.has_role?(:SmartProxy)
end
Expand Down
1 change: 0 additions & 1 deletion app/models/miq_server/worker_management/monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ def worker_not_responding(w)
_log.warn(msg)
MiqEvent.raise_evm_event_queue(w.miq_server, "evm_worker_killed", :event_details => msg, :type => w.class.name)
w.kill
MiqVimBrokerWorker.cleanup_for_pid(w.pid)
end

def sync_workers
Expand Down
2 changes: 0 additions & 2 deletions app/models/miq_smart_proxy_worker/runner.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
class MiqSmartProxyWorker::Runner < MiqQueueWorkerBase::Runner
self.delay_startup_for_vim_broker = true # NOTE: For smartproxy role

def do_before_work_loop
@tid = start_heartbeat_thread
end
Expand Down
43 changes: 0 additions & 43 deletions app/models/miq_worker/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,29 +90,6 @@ def worker_monitor_drb
end
end

###############################
# VimBrokerWorker Methods
###############################

def self.delay_startup_for_vim_broker?
!!@delay_startup_for_vim_broker
end

class << self
attr_writer :delay_startup_for_vim_broker
end

def self.delay_queue_delivery_for_vim_broker?
!!@delay_queue_delivery_for_vim_broker
end

class << self
attr_writer :delay_queue_delivery_for_vim_broker

alias require_vim_broker? delay_queue_delivery_for_vim_broker?
alias require_vim_broker= delay_queue_delivery_for_vim_broker=
end

def start
prepare
run
Expand All @@ -128,7 +105,6 @@ def prepare
set_database_application_name
ObjectSpace.garbage_collect
started_worker_record
do_delay_startup_for_vim_broker if self.class.delay_startup_for_vim_broker? && MiqVimBrokerWorker.workers > 0
do_before_work_loop
self
end
Expand Down Expand Up @@ -272,16 +248,6 @@ def do_work
raise NotImplementedError, _("must be implemented in a subclass")
end

def do_delay_startup_for_vim_broker
_log.info("#{log_prefix} Checking that VIM Broker has started before doing work")
loop do
break if MiqVimBrokerWorker.available?
heartbeat
sleep 3
end
_log.info("#{log_prefix} Starting work since VIM Broker has started")
end

def do_work_loop
warn_about_heartbeat_skipping if skip_heartbeat?
loop do
Expand Down Expand Up @@ -477,15 +443,6 @@ def process_message(message, *args)
end
end

def clean_broker_connection
if $vim_broker_client
$vim_broker_client.releaseSession(Process.pid)
$vim_broker_client = nil
end
rescue => err
_log.info("#{log_prefix} Releasing any broker connections for pid: [#{Process.pid}], ERROR: #{err.message}")
end

def process_title
type = @worker.abbreviated_class_name
title = "#{MiqWorker::PROCESS_TITLE_PREFIX} #{type} id: #{@worker.id}"
Expand Down
8 changes: 2 additions & 6 deletions app/models/vm_or_template.rb
Original file line number Diff line number Diff line change
Expand Up @@ -438,12 +438,8 @@ def self.task_arguments(options)
private_class_method :task_arguments

def powerops_callback(task_id, status, msg, result, queue_item)
if queue_item.last_exception.kind_of?(MiqException::MiqVimBrokerUnavailable)
queue_item.requeue(:deliver_on => 1.minute.from_now.utc)
else
task = MiqTask.find_by(:id => task_id)
task.queue_callback("Finished", status, msg, result) if task
end
task = MiqTask.find_by(:id => task_id)
task.queue_callback("Finished", status, msg, result) if task
end

# override
Expand Down
2 changes: 0 additions & 2 deletions spec/models/job_proxy_dispatcher_embedded_scan_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ def assert_at_most_x_scan_jobs_per_y_resource(x_scans, y_resource)

context "and a scan job for each vm" do
before do
allow(MiqVimBrokerWorker).to receive(:available_in_zone?).and_return(true)

@jobs = @vms.collect(&:raw_scan)
end

Expand Down
5 changes: 0 additions & 5 deletions spec/models/job_proxy_dispatcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@
context "with a vm without a storage" do
before do
# Test a vm without a storage (ie, removed from VC but retained in the VMDB)
allow(MiqVimBrokerWorker).to receive(:available_in_zone?).and_return(true)
@vm = @vms.first
@vm.storage = nil
@vm.save
Expand All @@ -103,7 +102,6 @@
context "with a Microsoft vm without a storage" do
before do
# Test a Microsoft vm without a storage
allow(MiqVimBrokerWorker).to receive(:available_in_zone?).and_return(true)
@vm = @vms.first
@vm.storage = nil
@vm.vendor = "microsoft"
Expand All @@ -119,7 +117,6 @@
context "with a Microsoft vm with a Microsoft storage" do
before do
# Test a Microsoft vm without a storage
allow(MiqVimBrokerWorker).to receive(:available_in_zone?).and_return(true)
@vm = @vms.first
@vm.storage.store_type = "CSVFS"
@vm.vendor = "microsoft"
Expand All @@ -135,7 +132,6 @@
context "with a Microsoft vm with an invalid storage" do
before do
# Test a Microsoft vm without a storage
allow(MiqVimBrokerWorker).to receive(:available_in_zone?).and_return(true)
@vm = @vms.first
@vm.storage.store_type = "XFS"
@vm.vendor = "microsoft"
Expand All @@ -151,7 +147,6 @@

context "with jobs, a default smartproxy for repo scanning" do
before do
allow(MiqVimBrokerWorker).to receive(:available?).and_return(true)
@repo_proxy = @proxies.last
if @repo_proxy
@repo_proxy.name = "repo_proxy"
Expand Down
20 changes: 10 additions & 10 deletions spec/models/miq_action_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
tenant = FactoryBot.create(:tenant)
group = FactoryBot.create(:miq_group, :tenant => tenant)
@user = FactoryBot.create(:user, :userid => "test", :miq_groups => [group])
@vm = FactoryBot.create(:vm_vmware, :evm_owner => @user, :miq_group => group)
@vm = FactoryBot.create(:vm_infra, :evm_owner => @user, :miq_group => group)
@action = FactoryBot.create(:miq_action)
expect(@action).not_to be_nil
@action.options = {:ae_request => "test_custom_automation"}
Expand Down Expand Up @@ -64,9 +64,9 @@

context "#action_evm_event" do
it "for Vm" do
ems = FactoryBot.create(:ems_vmware)
host = FactoryBot.create(:host_vmware)
vm = FactoryBot.create(:vm_vmware, :host => host, :ext_management_system => ems)
ems = FactoryBot.create(:ems_infra)
host = FactoryBot.create(:host)
vm = FactoryBot.create(:vm_infra, :host => host, :ext_management_system => ems)
action = FactoryBot.create(:miq_action)
res = action.action_evm_event(action, vm, :policy => FactoryBot.create(:miq_policy))

Expand All @@ -86,7 +86,7 @@

context "#raise_automation_event" do
before do
@vm = FactoryBot.create(:vm_vmware)
@vm = FactoryBot.create(:vm_infra)
allow(@vm).to receive(:my_zone).and_return("vm_zone")
FactoryBot.create(:miq_event_definition, :name => "raise_automation_event")
FactoryBot.create(:miq_event_definition, :name => "vm_start")
Expand Down Expand Up @@ -156,9 +156,9 @@
context "#action_vm_stop" do
before do
@zone = FactoryBot.create(:zone)
@ems = FactoryBot.create(:ems_vmware, :zone => @zone)
@host = FactoryBot.create(:host_vmware)
@vm = FactoryBot.create(:vm_vmware, :host => @host, :ext_management_system => @ems)
@ems = FactoryBot.create(:ems_infra, :zone => @zone)
@host = FactoryBot.create(:host)
@vm = FactoryBot.create(:vm_infra, :host => @host, :ext_management_system => @ems)
@action = FactoryBot.create(:miq_action, :name => "vm_stop")
end

Expand Down Expand Up @@ -186,7 +186,7 @@

context "#action_vm_retire" do
before do
@vm = FactoryBot.create(:vm_vmware)
@vm = FactoryBot.create(:vm_infra)
allow(@vm).to receive(:my_zone).and_return("vm_zone")
@event = FactoryBot.create(:miq_event_definition, :name => "assigned_company_tag")
@action = FactoryBot.create(:miq_action, :name => "vm_retire")
Expand Down Expand Up @@ -509,7 +509,7 @@ def stub_csv(data)
let(:tenant) { FactoryBot.create(:tenant) }
let(:group) { FactoryBot.create(:miq_group, :tenant => tenant) }
let(:user) { FactoryBot.create(:user, :userid => "test", :miq_groups => [group]) }
let(:vm) { FactoryBot.create(:vm_vmware, :evm_owner => user, :miq_group => group, :hardware => hardware) }
let(:vm) { FactoryBot.create(:vm_infra, :evm_owner => user, :miq_group => group, :hardware => hardware) }
let(:action) { FactoryBot.create(:miq_action, :name => "run_ansible_playbook", :options => action_options) }
let(:stap) { FactoryBot.create(:service_template_ansible_playbook) }
let(:ip1) { "1.1.1.94" }
Expand Down

0 comments on commit 18534ed

Please sign in to comment.