Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update driven refresh #186

Merged
merged 7 commits into from
Apr 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
class ManageIQ::Providers::Vmware::InfraManager::Inventory < ManagerRefresh::Inventory
require_nested :Cache
require_nested :Collector
require_nested :Parser
require_nested :Persister
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
class ManageIQ::Providers::Vmware::InfraManager::Inventory::Cache
def initialize
@data = Hash.new { |h, k| h[k] = {} }
end

def insert(obj, change_set = [])
props = data[obj.class.wsdl_name][obj._ref] = {}
process_change_set(props, change_set)
props
end

def delete(obj)
data[obj.class.wsdl_name].delete(obj._ref)
end

def update(obj, change_set)
props = data[obj.class.wsdl_name][obj._ref]
process_change_set(props, change_set) unless props.nil?
props
end

delegate :[], :keys, :to => :data

private

attr_reader :data

def process_change_set(props, change_set)
change_set.each do |prop_change|
process_prop_change(props, prop_change)
end
end

def process_prop_change(prop_hash, prop_change)
h, prop_str = hash_target(prop_hash, prop_change.name)
tag, key = tag_and_key(prop_str)

case prop_change.op
when "add"
h[tag] ||= []
h[tag] << prop_change.val
when "remove", "indirectRemove"
if key
a, i = get_array_entry(h[tag], key)
a.delete_at(i)
else
h.delete(tag)
end
when "assign"
if key
# TODO
raise "Array properties aren't supported yet"
else
h[tag] = prop_change.val
end
end
end

def hash_target(base_hash, key_string)
h = base_hash
prop_keys = split_prop_path(key_string)

prop_keys[0...-1].each do |key|
key, array_key = tag_and_key(key)
if array_key
array, idx = get_array_entry(h[key], array_key)
raise "hashTarget: Could not traverse tree through array element #{k}[#{array_key}] in #{key_string}" unless array
h = array[idx]
else
h[key] ||= {}
h = h[key]
end
end

return h, prop_keys[-1]
end

def split_prop_path(prop_path)
path_array = []
in_key = false
pc = ""

prop_path.split(//).each do |c|
case c
when "."
unless in_key
path_array << pc
pc = ""
next
end
when "["
in_key = true
when "]"
in_key = false
end
pc << c
end

path_array << pc unless pc.empty?
path_array
end

def tag_and_key(prop_str)
return prop_str.to_sym, nil unless prop_str.include?("[")

if prop_str =~ /([^\[]+)\[([^\]]+)\]/
tag, key = $1, $2
else
raise "tagAndKey: malformed property string #{prop_str}"
end
key = key[1...-1] if key[0, 1] == '"' && key[-1, 1] == '"'
return tag.to_sym, key
end

def get_array_entry(array, key)
return nil, nil unless array.kind_of?(Array)

array.each_index do |n|
array_entry = array[n]

entry_key = array_entry.respond_to?("key") ? array_entry.key : array_entry
case entry_key
when RbVmomi::BasicTypes::ManagedObject
return array, n if entry_key._ref == key
else
return array, n if entry_key.to_s == key
end
end
end
end
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
class ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector
include InventoryCache
include PropertyCollector
include Vmdb::Logging

attr_reader :ems, :exit_requested
private :ems, :exit_requested

def initialize(ems)
@ems = ems
@exit_requested = false
@inventory_cache = ems.class::Inventory::Cache.new
end

def run
Expand All @@ -18,7 +15,7 @@ def run
begin
wait_for_updates(vim)
rescue RbVmomi::Fault => err
_log.err("Caught exception #{err.message}")
_log.error("Caught exception #{err.message}")
_log.log_backtrace(err)
ensure
vim.close unless vim.nil?
Expand All @@ -38,6 +35,8 @@ def stop

private

attr_reader :ems, :exit_requested, :inventory_cache

def connect(host, username, password)
_log.info("Connecting to #{username}@#{host}...")

Expand Down Expand Up @@ -99,7 +98,13 @@ def wait_for_updates(vim, run_once: false)
object_update_set = property_filter_update.objectSet
next if object_update_set.blank?

process_object_update_set(object_update_set) { |obj, props| parser.parse(obj, props) }
_log.info("Processing #{object_update_set.count} updates...")

process_object_update_set(object_update_set).each do |managed_object, props|
parser.parse(managed_object, props)
end

_log.info("Processing #{object_update_set.count} updates...Complete")
end

next if update_set.truncated
Expand All @@ -119,77 +124,37 @@ def wait_for_updates(vim, run_once: false)
property_filter.DestroyPropertyFilter unless property_filter.nil?
end

def process_object_update_set(object_update_set, &block)
_log.info("Processing #{object_update_set.count} updates...")

object_update_set.each do |object_update|
process_object_update(object_update, &block)
def process_object_update_set(object_update_set)
object_update_set.map do |object_update|
process_object_update(object_update)
end

_log.info("Processing #{object_update_set.count} updates...Complete")
end

def process_object_update(object_update)
managed_object = object_update.obj

props =
case object_update.kind
when "enter", "modify"
process_object_update_modify(managed_object, object_update.changeSet)
when "enter"
process_object_update_enter(managed_object, object_update.changeSet, object_update.missingSet)
when "modify"
process_object_update_modify(managed_object, object_update.changeSet, object_update.missingSet)
when "leave"
process_object_update_leave(managed_object)
end

yield managed_object, props if block_given?

return managed_object, props
end

def process_object_update_modify(obj, change_set, _missing_set = [])
obj_type = obj.class.wsdl_name
obj_ref = obj._ref

props = inventory_cache[obj_type][obj_ref].dup

change_set.each do |property_change|
next if property_change.nil?

case property_change.op
when 'add'
process_property_change_add(props, property_change)
when 'remove', 'indirectRemove'
process_property_change_remove(props, property_change)
when 'assign'
process_property_change_assign(props, property_change)
end
end

update_inventory_cache(obj_type, obj_ref, props)

props
end

def process_object_update_leave(obj)
obj_type = obj.class.wsdl_name
obj_ref = obj._ref

inventory_cache[obj_type].delete(obj_ref)

nil
end

def process_property_change_add(props, property_change)
name = property_change.name

props[name] ||= []
props[name] << property_change.val
def process_object_update_enter(obj, change_set, _missing_set = [])
inventory_cache.insert(obj, change_set)
end

def process_property_change_remove(props, property_change)
props.delete(property_change.name)
def process_object_update_modify(obj, change_set, _missing_set = [])
inventory_cache.update(obj, change_set)
end

def process_property_change_assign(props, property_change)
props[property_change.name] = property_change.val
def process_object_update_leave(obj)
inventory_cache.delete_object(obj.class.wsdl_name, obj._ref)
end
end

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,9 @@ def parse_compute_resource(object, props)
cluster_hash = {
:ems_ref => object._ref,
:uid_ems => object._ref,
:name => CGI.unescape(props[:name]),
}

if props.include?("name")
cluster_hash[:name] = URI.decode(props["name"])
end

parse_compute_resource_summary(cluster_hash, props)
parse_compute_resource_das_config(cluster_hash, props)
parse_compute_resource_drs_config(cluster_hash, props)
Expand All @@ -53,13 +50,10 @@ def parse_datacenter(object, props)
:ems_ref => object._ref,
:uid_ems => object._ref,
:type => "Datacenter",
:name => CGI.unescape(props[:name]),
:ems_children => {},
}

if props.include?("name")
dc_hash[:name] = URI.decode(props["name"])
end

parse_datacenter_children(dc_hash, props)

persister.ems_folders.build(dc_hash)
Expand Down Expand Up @@ -102,13 +96,10 @@ def parse_folder(object, props)
:ems_ref => object._ref,
:uid_ems => object._ref,
:type => "EmsFolder",
:name => CGI.unescape(props[:name]),
:ems_children => {},
}

if props.include?("name")
folder_hash[:name] = URI.decode(props["name"])
end

parse_folder_children(folder_hash, props)

persister.ems_folders.build(folder_hash)
Expand Down Expand Up @@ -154,13 +145,10 @@ def parse_resource_pool(object, props)
rp_hash = {
:ems_ref => object._ref,
:uid_ems => object._ref,
:name => CGI.unescape(props[:name]),
:vapp => object.kind_of?(RbVmomi::VIM::VirtualApp),
}

if props.include?("name")
rp_hash[:name] = URI.decode(props["name"])
end

parse_resource_pool_memory_allocation(rp_hash, props)
parse_resource_pool_cpu_allocation(rp_hash, props)
parse_resource_pool_children(rp_hash, props)
Expand All @@ -173,16 +161,15 @@ def parse_storage_pod(object, props)
persister.ems_folders.manager_uuids << object._ref
return if props.nil?

name = props.fetch_path(:summary, :name)

pod_hash = {
:ems_ref => object._ref,
:uid_ems => object._ref,
:type => "StorageCluster",
:name => CGI.unescape(name),
}

if props.include?("summary.name")
pod_hash[:name] = URI.decode(props["summary.name"])
end

persister.ems_folders.build(pod_hash)
end

Expand Down
Loading