Skip to content

Commit

Permalink
Add a saver class and thread
Browse files Browse the repository at this point in the history
  • Loading branch information
agrare committed Apr 23, 2018
1 parent f116c7c commit 3ee682f
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,31 @@ class ManageIQ::Providers::Vmware::InfraManager::Inventory::Collector
include PropertyCollector
include Vmdb::Logging

def initialize(ems, run_once: false)
def initialize(ems, run_once: false, threaded: true)
@ems = ems
@inventory_cache = ems.class::Inventory::Cache.new
@run_once = run_once
@saver = ems.class::Inventory::Saver.new(:threaded => threaded)

self.exit_requested = false
end

def run
_log.info("Monitor updates thread started")

saver.start_thread

until exit_requested
monitor_updates
break if run_once
end

_log.info("Exiting...")
saver.stop_thread

_log.info("Monitor updates thread exited")
end

def stop
_log.info("Exit request received...")
_log.info("Monitor updates thread exiting...")
self.exit_requested = true
end

Expand All @@ -33,7 +39,7 @@ def monitor_updates

private

attr_reader :ems, :inventory_cache, :run_once
attr_reader :ems, :inventory_cache, :run_once, :saver
attr_accessor :exit_requested

def connect
Expand Down Expand Up @@ -115,7 +121,8 @@ def wait_for_updates(vim)

next if update_set.truncated

ManagerRefresh::SaveInventory.save_inventory(ems, persister.inventory_collections)
save_inventory(persister)

persister = nil
parser = nil

Expand Down Expand Up @@ -163,4 +170,8 @@ def process_object_update_modify(obj, change_set, _missing_set = [])
def process_object_update_leave(obj)
inventory_cache.delete_object(obj.class.wsdl_name, obj._ref)
end

def save_inventory(persister)
saver.queue_save_inventory(persister)
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
class ManageIQ::Providers::Vmware::InfraManager::Inventory::Saver
include Vmdb::Logging

def initialize(threaded: true)
@join_limit = 30
@queue = Queue.new
@should_exit = false
@threaded = threaded
@thread = nil
end

def start_thread
return unless threaded

@thread = Thread.new do
saver_thread
_log.info("Save inventory thread exiting")
end

_log.info("Save inventory thread started")
end

def stop_thread(wait: true)
return unless threaded

_log.info("Save inventory thread stopping...")

@should_exit = true
join_thread if wait
end

def queue_save_inventory(persister)
if threaded
ensure_saver_thread
queue.push(persister)
else
save_inventory(persister)
end
end

private

attr_reader :join_limit, :queue, :should_exit, :thread, :threaded

def saver_thread
loop do
while (persister = dequeue)
save_inventory(persister)
end

break if should_exit

sleep(5)
end
rescue => err
_log.warn(err)
_log.log_backtrace(err)
end

def join_thread
return unless thread&.alive?

unless thread.join(join_limit)
thread.kill
end
end

def ensure_saver_thread
return if thread&.alive?

_log.warn("Save inventory thread exited, restarting")
start_thread
end

def dequeue
queue.deq(:non_block => true)
rescue ThreadError
end

def save_inventory(persister)
ems = persister.manager
inventory = persister.inventory_collections

ManagerRefresh::SaveInventory.save_inventory(ems, inventory)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
ems.update_authentication(:default => {:userid => username, :password => password})
end
end
let(:collector) { described_class.new(ems, :run_once => true) }
let(:collector) { described_class.new(ems, :run_once => true, :threaded => false) }

context "#wait_for_updates" do
it "Performs a full refresh" do
2.times do
# All VIM API calls go to uri https://hostname/sdk so we have to match on the body
VCR.use_cassette(described_class.name.underscore, :match_requests_on => [:body]) do
collector.run
collector.monitor_updates

ems.reload

Expand Down

0 comments on commit 3ee682f

Please sign in to comment.