Skip to content

Commit

Permalink
Merge pull request #234 from agrare/streaming_refresh_refactoring
Browse files Browse the repository at this point in the history
Streaming refresh refactoring
  • Loading branch information
Ladas authored Apr 23, 2018
2 parents 0be2f13 + 6ddfae5 commit f116c7c
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,44 @@ class ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector
include PropertyCollector
include Vmdb::Logging

def initialize(ems)
def initialize(ems, run_once: false)
@ems = ems
@exit_requested = false
@inventory_cache = ems.class::Inventory::Cache.new
@run_once = run_once

self.exit_requested = false
end

def run
until exit_requested
vim = connect(ems.hostname, ems.authentication_userid, ems.authentication_password)

begin
wait_for_updates(vim)
rescue RbVmomi::Fault => err
_log.error("Caught exception #{err.message}")
_log.log_backtrace(err)
ensure
vim.close unless vim.nil?
vim = nil
end
monitor_updates
break if run_once
end

_log.info("Exiting...")
ensure
vim.serviceContent.sessionManager.Logout unless vim.nil?
end

def stop
_log.info("Exit request received...")
@exit_requested = true
self.exit_requested = true
end

def monitor_updates
vim = connect
wait_for_updates(vim)
ensure
disconnect(vim)
end

private

attr_reader :ems, :exit_requested, :inventory_cache
attr_reader :ems, :inventory_cache, :run_once
attr_accessor :exit_requested

def connect
host = ems.hostname
username, password = ems.auth_user_pwd

def connect(host, username, password)
_log.info("Connecting to #{username}@#{host}...")

vim_opts = {
Expand All @@ -60,7 +62,13 @@ def connect(host, username, password)
conn
end

def wait_for_updates(vim, run_once: false)
def disconnect(vim)
return if vim.nil?

vim.serviceContent.sessionManager.Logout
end

def wait_for_updates(vim)
property_filter = create_property_filter(vim)

# Return if we don't receive any updates for 60 seconds break
Expand All @@ -85,28 +93,26 @@ def wait_for_updates(vim, run_once: false)
# Save the new update set version
version = update_set.version

property_filter_update_set = update_set.filterSet
next if property_filter_update_set.blank?
next if update_set.filterSet.blank?

property_filter_update = update_set.filterSet.detect { |update| update.filter == property_filter }
next if property_filter_update.nil?

# After the initial UpdateSet switch to a targeted persister
persister ||= ems.class::Inventory::Persister::Targeted.new(ems)
parser ||= ems.class::Inventory::Parser.new(persister)

property_filter_update_set.each do |property_filter_update|
next if property_filter_update.filter != property_filter
object_update_set = property_filter_update.objectSet
next if object_update_set.blank?

object_update_set = property_filter_update.objectSet
next if object_update_set.blank?
_log.info("Processing #{object_update_set.count} updates...")

_log.info("Processing #{object_update_set.count} updates...")

process_object_update_set(object_update_set).each do |managed_object, props|
parser.parse(managed_object, props)
end

_log.info("Processing #{object_update_set.count} updates...Complete")
process_object_update_set(object_update_set).each do |managed_object, props|
parser.parse(managed_object, props)
end

_log.info("Processing #{object_update_set.count} updates...Complete")

next if update_set.truncated

ManagerRefresh::SaveInventory.save_inventory(ems, persister.inventory_collections)
Expand All @@ -121,7 +127,7 @@ def wait_for_updates(vim, run_once: false)
break if run_once
end
ensure
property_filter.DestroyPropertyFilter unless property_filter.nil?
destroy_property_filter(property_filter)
end

def process_object_update_set(object_update_set)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ def create_property_filter(vim)
vim.propertyCollector.CreateFilter(:spec => spec, :partialUpdates => true)
end

def destroy_property_filter(property_filter)
return if property_filter.nil?

property_filter.DestroyPropertyFilter
end

def full_traversal_object_spec(root)
traversal_spec = [
folder_to_child_entity,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,14 @@
ems.update_authentication(:default => {:userid => username, :password => password})
end
end
let(:collector) { described_class.new(ems) }
let(:collector) { described_class.new(ems, :run_once => true) }

context "#wait_for_updates" do
it "Performs a full refresh" do
2.times do
# All VIM API calls go to uri https://hostname/sdk so we have to match on the body
VCR.use_cassette(described_class.name.underscore, :match_requests_on => [:body]) do
vim = collector.send(:connect, ems.hostname, ems.authentication_userid, ems.authentication_password)
collector.send(:wait_for_updates, vim, :run_once => true)
vim.close
collector.run

ems.reload

Expand Down

0 comments on commit f116c7c

Please sign in to comment.