Skip to content

Commit

Permalink
Merge pull request #15247 from Ladas/concurent_batch_safe_saver
Browse files Browse the repository at this point in the history
Concurent safe batch saver
  • Loading branch information
agrare authored Jun 11, 2017
2 parents 28c2023 + 25840aa commit d62dd7d
Show file tree
Hide file tree
Showing 5 changed files with 282 additions and 76 deletions.
22 changes: 16 additions & 6 deletions app/models/manager_refresh/save_collection/saver/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module ManagerRefresh::SaveCollection
module Saver
class Base
include Vmdb::Logging
include ManagerRefresh::SaveCollection::Saver::SqlHelper

attr_reader :inventory_collection

Expand Down Expand Up @@ -31,6 +32,10 @@ def save_inventory_collection!

attr_reader :unique_index_keys, :unique_db_primary_keys

# Number of records that are accumulated before a batched DB operation is issued.
#
# @return [Integer] the batch size
def batch_size
  1_000
end

def delete_complement(inventory_collection)
return unless inventory_collection.delete_allowed?

Expand All @@ -55,6 +60,12 @@ def delete_complement(inventory_collection)
"#{all_manager_uuids_size}, deleted=#{deleted_counter} *************")
end

# Removes +record+ by invoking the delete method configured on the collection.
#
# @param inventory_collection [#delete_allowed?, #delete_method] collection the record belongs to
# @param record [Object] object responding to the collection's delete method
# @return [Boolean] true when the delete method was invoked, false when deleting is not allowed
def delete_record!(inventory_collection, record)
  if inventory_collection.delete_allowed?
    record.public_send(inventory_collection.delete_method)
    true
  else
    false
  end
end

def assert_distinct_relation(record)
if unique_db_primary_keys.include?(record.id) # Include on Set is O(1)
# Change the InventoryCollection's :association or :arel parameter to return distinct results. The :through
Expand All @@ -75,12 +86,11 @@ def assert_distinct_relation(record)

# Verifies that every fixed foreign key of the inventory object's collection has a value in +hash+.
#
# The first missing key is logged and the check stops there, so at most one message is written
# per call.
#
# @param hash [Hash] attributes about to be written, keyed by attribute name
# @param inventory_object [Object] object whose inventory_collection lists the fixed_foreign_keys
# @return [Boolean] true when all fixed foreign keys are present, false otherwise
def assert_referential_integrity(hash, inventory_object)
  inventory_object.inventory_collection.fixed_foreign_keys.each do |x|
    next unless hash[x].blank?

    _log.info("Ignoring #{inventory_object} because of missing foreign key #{x} for "\
              "#{inventory_object.inventory_collection.parent.class.name}:"\
              "#{inventory_object.inventory_collection.parent.id}")
    return false
  end
  true
end
Expand Down
86 changes: 25 additions & 61 deletions app/models/manager_refresh/save_collection/saver/concurrent_safe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,59 +19,42 @@ def save!(inventory_collection, association)
created_counter = 0
_log.info("*************** PROCESSING #{inventory_collection} of size #{inventory_collection_size} *************")
# Records that are in the DB, we will be updating or deleting them.
association.find_in_batches do |batch|
ActiveRecord::Base.transaction do
batch.each do |record|
next unless assert_distinct_relation(record)

index = inventory_collection.object_index_with_keys(unique_index_keys, record)

inventory_object = inventory_objects_index.delete(index)
hash = attributes_index.delete(index)

if inventory_object.nil?
# Record was found in the DB but not sent for saving, that means it doesn't exist anymore and we should
# delete it from the DB.
# TODO(lsmola) do a transaction for a batches of deletion
deleted_counter += 1 if delete_record!(inventory_collection, record)
else
# Record was found in the DB and sent for saving, we will be updating the DB.
update_record!(inventory_collection, record, hash, inventory_object)
end
ActiveRecord::Base.transaction do
association.find_each do |record|
next unless assert_distinct_relation(record)

index = inventory_collection.object_index_with_keys(unique_index_keys, record)

inventory_object = inventory_objects_index.delete(index)
hash = attributes_index.delete(index)

if inventory_object.nil?
# Record was found in the DB but not sent for saving, that means it doesn't exist anymore and we should
# delete it from the DB.
deleted_counter += 1 if delete_record!(inventory_collection, record)
else
# Record was found in the DB and sent for saving, we will be updating the DB.
update_record!(inventory_collection, record, hash, inventory_object)
end
end
end

# Records that were not found in the DB but sent for saving, we will be creating these in the DB.
if inventory_collection.create_allowed?
inventory_objects_index.each_slice(1000) do |batch|
ActiveRecord::Base.transaction do
batch.each do |index, inventory_object|
hash = attributes_index.delete(index)
create_record!(inventory_collection, hash, inventory_object)
created_counter += 1
end
ActiveRecord::Base.transaction do
inventory_objects_index.each do |index, inventory_object|
create_record!(inventory_collection, attributes_index.delete(index), inventory_object)
created_counter += 1
end
end
end
_log.info("*************** PROCESSED #{inventory_collection}, created=#{created_counter}, "\
"updated=#{inventory_collection_size - created_counter}, deleted=#{deleted_counter} *************")
end

# Removes +record+ by invoking the delete method configured on the collection.
#
# @param inventory_collection [#delete_allowed?, #delete_method] collection the record belongs to
# @param record [Object] object responding to the collection's delete method
# @return [Boolean] true when the delete method was invoked, false when deleting is not allowed
def delete_record!(inventory_collection, record)
  if inventory_collection.delete_allowed?
    record.public_send(inventory_collection.delete_method)
    true
  else
    false
  end
end

def update_record!(inventory_collection, record, hash, inventory_object)
record.assign_attributes(hash.except(:id, :type))

# TODO(lsmola) ignore all N:M relations, since we use pure SQL, all N:M needs to be modeled as a separate IC, or
# can we process those automatically? Using a convention? But still, it needs to be a separate IC, to have
# efficient saving.
hash.reject! { |_key, value| value.kind_of?(Array) }

if !inventory_object.inventory_collection.check_changed? || record.changed?
update_query = inventory_object.inventory_collection.model_class.where(:id => record.id)
if hash[:remote_data_timestamp]
Expand All @@ -88,31 +71,12 @@ def update_record!(inventory_collection, record, hash, inventory_object)
# Inserts one record for +inventory_object+ using raw SQL and stores the id returned by the
# connection's insert_sql on the inventory object.
#
# Returns early (nil) when assert_referential_integrity rejects the hash.
#
# NOTE(review): this body contains both a hand-built INSERT ... ON CONFLICT statement
# (insert_query) and a call to build_insert_query, and each one is passed to insert_sql, so as
# written two statements are issued and the first result_id is overwritten. This looks like the
# removed and the added side of a change shown together -- confirm which one is intended.
def create_record!(inventory_collection, hash, inventory_object)
return unless assert_referential_integrity(hash, inventory_object)

# Fill in the STI type column when the collection supports STI and no type was supplied.
hash[:type] = inventory_collection.model_class.name if inventory_collection.supports_sti? && hash[:type].blank?
table_name = inventory_object.inventory_collection.model_class.table_name
# Upsert keyed on the collection's unique index columns; on conflict every inserted column is
# overwritten with the incoming (EXCLUDED) value.
insert_query = %{
INSERT INTO #{table_name} (#{hash.keys.join(", ")})
VALUES
(
#{hash.values.map { |x| ActiveRecord::Base.connection.quote(x) }.join(", ")}
)
ON CONFLICT (#{inventory_object.inventory_collection.unique_index_columns.join(", ")})
DO
UPDATE
SET #{hash.keys.map { |x| "#{x} = EXCLUDED.#{x}" }.join(", ")}
}
# TODO(lsmola) do we want to exclude the ems_id from the UPDATE clause? Otherwise it might be difficult to change
# the ems_id as a cross manager migration, since ems_id should be there as part of the insert. The attempt of
# changing ems_id could lead to putting it back by a refresh.
# Remember the keys that were actually supplied before the hash is replaced by the model's full attribute set.
all_attribute_keys = hash.keys
hash = inventory_collection.model_class.new(hash).attributes.symbolize_keys

# This conditional will avoid rewriting new data by old data. But we want it only when remote_data_timestamp is a
# part of the data, since for the fake records, we just want to update ems_ref.
if hash[:remote_data_timestamp].present?
insert_query += %{
WHERE EXCLUDED.remote_data_timestamp IS NULL OR (EXCLUDED.remote_data_timestamp > #{table_name}.remote_data_timestamp)
}
end
result_id = ActiveRecord::Base.connection.insert_sql(insert_query)
result_id = ActiveRecord::Base.connection.insert_sql(
build_insert_query(inventory_collection, all_attribute_keys, [hash])
)
inventory_object.id = result_id
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,137 @@ module Saver
class ConcurrentSafeBatch < ManagerRefresh::SaveCollection::Saver::Base
private

def save!(_inventory_collection, _association)
raise "saver_strategy :concurent_safe_batch is not implemented"
# Saves the inventory collection using batched operations.
#
# The method works in three phases:
# 1. index the incoming inventory objects and their attributes by manager_uuid,
# 2. walk the existing records of +association+ in batches, queueing each record either for an
#    update (it was sent for saving) or for deletion (it was not sent and deleting is allowed),
#    and flushing a queue whenever it reaches batch_size,
# 3. create everything that was sent for saving but not found among the existing records.
#
# @param inventory_collection [Object] collection of inventory objects to save
# @param association [Object] relation responding to find_in_batches that yields existing records
def save!(inventory_collection, association)
  attributes_index        = {}
  inventory_objects_index = {}
  all_attribute_keys      = Set.new

  inventory_collection.each do |inventory_object|
    attributes = inventory_object.attributes(inventory_collection)
    index      = inventory_object.manager_uuid

    attributes_index[index]        = attributes
    inventory_objects_index[index] = inventory_object
    all_attribute_keys.merge(attributes.keys)
  end

  inventory_collection_size = inventory_collection.size
  deleted_counter = 0
  created_counter = 0
  updated_counter = 0
  _log.info("*************** PROCESSING #{inventory_collection} of size #{inventory_collection_size} *************")
  hashes_for_update   = []
  records_for_destroy = []

  # Records that are in the DB, we will be updating or deleting them.
  association.find_in_batches do |batch|
    batch.each do |record|
      next unless assert_distinct_relation(record)

      index = inventory_collection.object_index_with_keys(unique_index_keys, record)

      # Removing the entry from both indexes leaves only the to-be-created objects behind.
      inventory_object = inventory_objects_index.delete(index)
      hash             = attributes_index.delete(index)

      if inventory_object.nil?
        # Record was found in the DB but not sent for saving, that means it doesn't exist anymore and we should
        # delete it from the DB.
        if inventory_collection.delete_allowed?
          records_for_destroy << record
          deleted_counter += 1
        end
      else
        # Record was found in the DB and sent for saving, we will be updating the DB.
        next unless assert_referential_integrity(hash, inventory_object)
        inventory_object.id = record.id

        record.assign_attributes(hash.except(:id, :type))
        if !inventory_collection.check_changed? || record.changed?
          hashes_for_update << record.attributes.symbolize_keys
        end
      end
    end

    # Update in batches
    if hashes_for_update.size >= batch_size
      update_records!(inventory_collection, all_attribute_keys, hashes_for_update)
      updated_counter += hashes_for_update.size

      hashes_for_update = []
    end

    # Destroy in batches. The queued records live in records_for_destroy; passing any other name
    # here would raise NameError as soon as the queue fills up.
    if records_for_destroy.size >= batch_size
      destroy_records(records_for_destroy)
      records_for_destroy = []
    end
  end

  # Update the last batch
  update_records!(inventory_collection, all_attribute_keys, hashes_for_update)
  updated_counter += hashes_for_update.size
  hashes_for_update = [] # Cleanup so GC can release it sooner

  # Destroy the last batch
  destroy_records(records_for_destroy)
  records_for_destroy = [] # Cleanup so GC can release it sooner

  all_attribute_keys << :type if inventory_collection.supports_sti?
  # Records that were not found in the DB but sent for saving, we will be creating these in the DB.
  if inventory_collection.create_allowed?
    inventory_objects_index.each_slice(batch_size) do |batch|
      create_records!(inventory_collection, all_attribute_keys, batch, attributes_index)
      created_counter += batch.size
    end
  end
  _log.info("*************** PROCESSED #{inventory_collection}, created=#{created_counter}, "\
            "updated=#{updated_counter}, deleted=#{deleted_counter} *************")
end

# Deletes the given records one by one inside a single transaction, using delete_record! with
# this saver's inventory_collection.
#
# @param records [Array] records queued for deletion
def destroy_records(records)
  # TODO(lsmola) we need at least batch disconnect. Batch destroy probably won't be possible because of the
  # :dependent => :destroy.
  ActiveRecord::Base.transaction do
    records.each { |doomed| delete_record!(inventory_collection, doomed) }
  end
end

# Issues one batched UPDATE statement, built by build_update_query, for the given attribute hashes.
# Does nothing when +hashes+ is blank.
#
# @param inventory_collection [Object] collection being saved
# @param all_attribute_keys [Enumerable] attribute keys passed on to the query builder
# @param hashes [Array<Hash>] attribute hashes of the records to update
def update_records!(inventory_collection, all_attribute_keys, hashes)
  unless hashes.blank?
    connection = ActiveRecord::Base.connection
    connection.execute(build_update_query(inventory_collection, all_attribute_keys, hashes))
  end
end

# Creates the records of one batch with a single statement built by build_insert_query and then
# maps the ids of the stored records back onto the inventory objects.
#
# Each consumed entry is removed from +attributes_index+. Entries that fail
# assert_referential_integrity are left out; when nothing remains, no statement is executed.
#
# @param inventory_collection [Object] collection being saved
# @param all_attribute_keys [Enumerable] attribute keys passed on to the query builder
# @param batch [Array<Array>] pairs of [index, inventory_object]
# @param attributes_index [Hash] attributes of the inventory objects keyed by index
def create_records!(inventory_collection, all_attribute_keys, batch, attributes_index)
  objects_by_unique_key = {}
  rows = []

  batch.each do |index, inventory_object|
    attributes = attributes_index.delete(index)
    row = inventory_collection.model_class.new(attributes).attributes.symbolize_keys

    if assert_referential_integrity(row, inventory_object)
      rows << row
      # Index on Unique Columns values, so we can easily fill in the :id later
      unique_key = inventory_collection.unique_index_columns.map { |column| row[column] }
      objects_by_unique_key[unique_key] = inventory_object
    end
  end

  unless rows.blank?
    insert_query = build_insert_query(inventory_collection, all_attribute_keys, rows)
    ActiveRecord::Base.connection.execute(insert_query)
    # TODO(lsmola) we need to do the mapping only if this IC has dependents/dependees
    map_ids_to_inventory_objects(inventory_collection, objects_by_unique_key, rows)
  end
end

# Looks up the records matching +hashes+ (selection built by build_multi_selection_query) and
# copies each record's id onto the inventory object indexed under the same unique column values.
# Records without a matching inventory object are ignored.
#
# @param inventory_collection [Object] collection providing model_class and unique_index_columns
# @param indexed_inventory_objects [Hash] inventory objects keyed by an array of unique column values
# @param hashes [Array<Hash>] attribute hashes used to build the selection
def map_ids_to_inventory_objects(inventory_collection, indexed_inventory_objects, hashes)
  stored_records = inventory_collection.model_class.where(
    build_multi_selection_query(inventory_collection, hashes)
  )

  stored_records.find_each do |inserted_record|
    unique_key = inventory_collection.unique_index_columns.map { |column| inserted_record.public_send(column) }
    matched = indexed_inventory_objects[unique_key]
    next unless matched

    matched.id = inserted_record.id
  end
end
end
end
Expand Down
8 changes: 1 addition & 7 deletions app/models/manager_refresh/save_collection/saver/default.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def save!(inventory_collection, association)
inventory_objects_index[index] = inventory_object
end

unique_db_indexes = Set.new
unique_db_indexes = Set.new

inventory_collection_size = inventory_collection.size
deleted_counter = 0
Expand Down Expand Up @@ -66,12 +66,6 @@ def save!(inventory_collection, association)
"updated=#{inventory_collection_size - created_counter}, deleted=#{deleted_counter} *************")
end

# Removes +record+ by invoking the delete method configured on the collection.
#
# @param inventory_collection [#delete_allowed?, #delete_method] collection the record belongs to
# @param record [Object] object responding to the collection's delete method
# @return [Boolean] true when the delete method was invoked, false when deleting is not allowed
def delete_record!(inventory_collection, record)
  if inventory_collection.delete_allowed?
    record.public_send(inventory_collection.delete_method)
    true
  else
    false
  end
end

def update_record!(inventory_collection, record, hash, inventory_object)
record.assign_attributes(hash.except(:id, :type))
record.save if !inventory_collection.check_changed? || record.changed?
Expand Down
Loading

0 comments on commit d62dd7d

Please sign in to comment.