Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store created updated and deleted records #15603

Merged
33 changes: 32 additions & 1 deletion app/models/manager_refresh/inventory_collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class InventoryCollection
:custom_db_finder, :check_changed, :arel, :builder_params, :loaded_references, :db_data_index,
:inventory_object_attributes, :name, :saver_strategy, :parent_inventory_collections, :manager_uuids,
:skeletal_manager_uuids, :targeted_arel, :targeted, :manager_ref_allowed_nil, :use_ar_object,
:secondary_refs, :secondary_indexes
:secondary_refs, :secondary_indexes, :created_records, :updated_records, :deleted_records

delegate :each, :size, :to => :to_a

Expand Down Expand Up @@ -394,12 +394,28 @@ def initialize(model_class: nil, manager_ref: nil, association: nil, parent: nil
@db_data_index = nil
@data_collection_finalized = false

@created_records = []
@updated_records = []
@deleted_records = []

blacklist_attributes!(attributes_blacklist) if attributes_blacklist.present?
whitelist_attributes!(attributes_whitelist) if attributes_whitelist.present?

validate_inventory_collection!
end

# Remembers the identities of records that were just created.
#
# @param records [Object] a single record or a collection of records; passed untouched to records_identities
# @return [Array] the new content of @created_records
def store_created_records(records)
  identities = records_identities(records)
  # Builds a fresh array instead of appending in place, so an array handed out earlier stays as it was.
  @created_records = @created_records + identities
end

# Remembers the identities of records that were just updated.
#
# @param records [Object] a single record or a collection of records; passed untouched to records_identities
# @return [Array] the new content of @updated_records
def store_updated_records(records)
  identities = records_identities(records)
  # Builds a fresh array instead of appending in place, so an array handed out earlier stays as it was.
  @updated_records = @updated_records + identities
end

# Remembers the identities of records that were just deleted.
#
# @param records [Object] a single record or a collection of records; passed untouched to records_identities
# @return [Array] the new content of @deleted_records
def store_deleted_records(records)
  identities = records_identities(records)
  # Builds a fresh array instead of appending in place, so an array handed out earlier stays as it was.
  @deleted_records = @deleted_records + identities
end

# Returns `data` unchanged. The `each` and `size` methods are delegated to this method
# (`delegate :each, :size, :to => :to_a`), so they operate on whatever `data` returns.
def to_a
data
end
Expand Down Expand Up @@ -947,6 +963,21 @@ def full_collection_for_comparison

attr_writer :attributes_blacklist, :attributes_whitelist, :db_data_index, :parent_inventory_collections

# Returns array of records identities, one identity (see record_identity) per record.
#
# Accepts either a single record or a collection of records. Anything that responds to `map` is treated as
# a collection, not only an Array: store_created_records is also fed the raw result of
# `ActiveRecord::Base.connection.execute`, which is presumably an adapter-specific Enumerable rather than an
# Array (TODO confirm for the adapters in use). A Hash is the exception: it is a single record (see the
# `[:id]` / `["id"]` lookups in record_identity), so it is wrapped instead of being iterated pair by pair.
#
# @param records [Object] one record, or an enumerable of records
# @return [Array] identities in the iteration order of the input
def records_identities(records)
  single_record = records.kind_of?(Hash) || !records.respond_to?(:map)
  records = [records] if single_record
  records.map { |record| record_identity(record) }
end

# Returns a hash with a simple record identity; at the moment the hash holds a single key, `:id`.
#
# The id is looked up, in this order, as record[:id], record["id"] and record.id, each through `try`
# (presumably ActiveSupport's Object#try, which yields nil when the receiver does not respond to the
# method - TODO confirm). The first truthy value wins.
#
# Raises a RuntimeError when no lookup produced a non-blank value.
def record_identity(record)
identity = record.try(:[], :id) || record.try(:[], "id") || record.try(:id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks overly complicated, it seems like the caller has a better idea of how to get the id out of what its storing than this. Would it make more sense to pass in the id for the created/updated/deleted records?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, I would rather have it here, since I can later easily add more params, like manager_ref

# An identity without an id is useless to consumers of created/updated/deleted_records, so fail loudly.
raise "Cannot obtain identity of the #{record}" if identity.blank?
{
:id => identity
}
end

# Finds manager_uuid in the DB. Using a configured strategy we cache obtained data in the db_data_index, so the
# same find will not hit database twice. Also if we use lazy_links and this is called when
# data_collection_finalized?, we load all data from the DB, referenced by lazy_links, in one query.
Expand Down
59 changes: 57 additions & 2 deletions app/models/manager_refresh/save_collection/saver/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def initialize(inventory_collection)
# Private attrs
@unique_index_keys = inventory_collection.manager_ref_to_cols
@unique_db_primary_keys = Set.new
@unique_db_indexes = Set.new
end

def save_inventory_collection!
Expand All @@ -30,7 +31,57 @@ def save_inventory_collection!

private

attr_reader :unique_index_keys, :unique_db_primary_keys
attr_reader :unique_index_keys, :unique_db_primary_keys, :unique_db_indexes

# Synchronizes the records of the given association with the content of the inventory collection, one
# record at a time.
#
# Inventory objects and their attribute hashes are first indexed by manager_uuid. Each record yielded by
# association.find_each is then matched against that index: a match goes to update_record!, a record with
# no match goes to delete_record! (only when delete_allowed?). Objects still left in the index afterwards
# were not found in the DB and go to create_record! (only when create_allowed?).
#
# The update/delete pass and the create pass each run in their own ActiveRecord::Base.transaction.
# update_record!, create_record!, assert_distinct_relation and assert_referential_integrity are not
# defined in this method; they are expected from the concrete saver or elsewhere in this class.
#
# @param inventory_collection the collection being saved
# @param association object responding to find_each (presumably an ActiveRecord relation - TODO confirm)
def save!(inventory_collection, association)
attributes_index = {}
inventory_objects_index = {}
inventory_collection.each do |inventory_object|
attributes = inventory_object.attributes(inventory_collection)
index = inventory_object.manager_uuid

attributes_index[index] = attributes
inventory_objects_index[index] = inventory_object
end

_log.info("*************** PROCESSING #{inventory_collection} of size #{inventory_collection.size} *************")
# Records that are in the DB, we will be updating or deleting them.
ActiveRecord::Base.transaction do
association.find_each do |record|
index = inventory_collection.object_index_with_keys(unique_index_keys, record)

# Either assertion returning a falsy value skips the record entirely (no update, no delete).
next unless assert_distinct_relation(record)
next unless assert_unique_record(record, index)

# `delete` removes the entry from both indexes, so only unmatched objects remain for the create pass.
inventory_object = inventory_objects_index.delete(index)
hash = attributes_index.delete(index)

if inventory_object.nil?
# Record was found in the DB but not sent for saving, that means it doesn't exist anymore and we should
# delete it from the DB.
delete_record!(inventory_collection, record) if inventory_collection.delete_allowed?
else
# Record was found in the DB and sent for saving, we will be updating the DB.
update_record!(inventory_collection, record, hash, inventory_object) if assert_referential_integrity(hash, inventory_object)
end
end
end

# Records that were not found in the DB but sent for saving, we will be creating these in the DB.
if inventory_collection.create_allowed?
ActiveRecord::Base.transaction do
inventory_objects_index.each do |index, inventory_object|
hash = attributes_index.delete(index)

create_record!(inventory_collection, hash, inventory_object) if assert_referential_integrity(hash, inventory_object)
end
end
end
# The figures below are the sizes of the collection's created/updated/deleted record lists.
_log.info("*************** PROCESSED #{inventory_collection}, "\
"created=#{inventory_collection.created_records.count}, "\
"updated=#{inventory_collection.updated_records.count}, "\
"deleted=#{inventory_collection.deleted_records.count} *************")
end

def batch_size
inventory_collection.batch_size
Expand Down Expand Up @@ -61,8 +112,12 @@ def delete_complement(inventory_collection)
end

# Deletes one DB record through the collection's configured delete method and remembers its identity.
#
# @param inventory_collection the collection the record belongs to; supplies delete_allowed?, delete_method
#   and store_deleted_records
# @param record the record to delete
# @return [false, Object] false when deleting is not allowed for the collection, otherwise the return
#   value of store_deleted_records
def delete_record!(inventory_collection, record)
  if inventory_collection.delete_allowed?
    # The delete method is configurable per collection, hence the dynamic dispatch.
    record.public_send(inventory_collection.delete_method)
    inventory_collection.store_deleted_records(record)
  else
    false
  end
end

# Hook consulted by save! for every record found in the DB; a falsy return value makes save! skip that
# record. This base implementation ignores both arguments and accepts every record.
def assert_unique_record(_record, _index)
# TODO(lsmola) can go away once we indexed our DB with unique indexes
true
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,55 +3,6 @@ module Saver
class ConcurrentSafe < ManagerRefresh::SaveCollection::Saver::Base
private

# NOTE(review): this looks like the pre-change side of the diff hunk (header "-3,55 +3,6"); the
# consolidated replacement appears to be the save! of the Base saver - confirm before relying on it.
#
# Indexes inventory objects and their attributes by manager_uuid, updates or deletes the records yielded
# by association.find_each inside one transaction, then creates the remaining objects inside another one.
# Created and deleted records are tallied in local counters. The logged "updated" figure is not counted;
# it is derived as collection size minus created_counter.
def save!(inventory_collection, association)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for consolidating the save! methods now

attributes_index = {}
inventory_objects_index = {}
inventory_collection.each do |inventory_object|
attributes = inventory_object.attributes(inventory_collection)
index = inventory_object.manager_uuid

attributes_index[index] = attributes
inventory_objects_index[index] = inventory_object
end

inventory_collection_size = inventory_collection.size
deleted_counter = 0
created_counter = 0
_log.info("*************** PROCESSING #{inventory_collection} of size #{inventory_collection_size} *************")
# Records that are in the DB, we will be updating or deleting them.
ActiveRecord::Base.transaction do
association.find_each do |record|
next unless assert_distinct_relation(record)

index = inventory_collection.object_index_with_keys(unique_index_keys, record)

inventory_object = inventory_objects_index.delete(index)
hash = attributes_index.delete(index)

if inventory_object.nil?
# Record was found in the DB but not sent for saving, that means it doesn't exist anymore and we should
# delete it from the DB.
deleted_counter += 1 if delete_record!(inventory_collection, record)
else
# Record was found in the DB and sent for saving, we will be updating the DB.
update_record!(inventory_collection, record, hash, inventory_object)
end
end
end

# Records that were not found in the DB but sent for saving, we will be creating these in the DB.
if inventory_collection.create_allowed?
ActiveRecord::Base.transaction do
inventory_objects_index.each do |index, inventory_object|
create_record!(inventory_collection, attributes_index.delete(index), inventory_object)
# Incremented for every object handed to create_record!, whatever that call returns.
created_counter += 1
end
end
end
_log.info("*************** PROCESSED #{inventory_collection}, created=#{created_counter}, "\
"updated=#{inventory_collection_size - created_counter}, deleted=#{deleted_counter} *************")
end

def update_record!(inventory_collection, record, hash, inventory_object)
assign_attributes_for_update!(hash, inventory_collection, time_now)
record.assign_attributes(hash.except(:id, :type))
Expand All @@ -64,14 +15,13 @@ def update_record!(inventory_collection, record, hash, inventory_object)
end

update_query.update_all(hash)
inventory_collection.store_updated_records(record)
end

inventory_object.id = record.id
end

def create_record!(inventory_collection, hash, inventory_object)
return unless assert_referential_integrity(hash, inventory_object)

all_attribute_keys = hash.keys
hash = inventory_collection.model_class.new(hash).attributes.symbolize_keys
assign_attributes_for_create!(hash, inventory_collection, time_now)
Expand All @@ -80,6 +30,7 @@ def create_record!(inventory_collection, hash, inventory_object)
build_insert_query(inventory_collection, all_attribute_keys, [hash])
)
inventory_object.id = result_id
inventory_collection.store_created_records(inventory_object)
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ def save!(inventory_collection, association)
all_attribute_keys += [:created_on, :updated_on] if inventory_collection.supports_timestamps_on_variant?
all_attribute_keys += [:created_at, :updated_at] if inventory_collection.supports_timestamps_at_variant?

inventory_collection_size = inventory_collection.size
deleted_counter = 0
created_counter = 0
updated_counter = 0
_log.info("*************** PROCESSING #{inventory_collection} of size #{inventory_collection_size} *************")
_log.info("*************** PROCESSING #{inventory_collection} of size #{inventory_collection.size} *************")
hashes_for_update = []
records_for_destroy = []

Expand All @@ -45,7 +41,6 @@ def save!(inventory_collection, association)
# delete it from the DB.
if inventory_collection.delete_allowed?
records_for_destroy << record
deleted_counter += 1
end
else
# Record was found in the DB and sent for saving, we will be updating the DB.
Expand All @@ -69,21 +64,21 @@ def save!(inventory_collection, association)
# Update in batches
if hashes_for_update.size >= batch_size
update_records!(inventory_collection, all_attribute_keys, hashes_for_update)
updated_counter += hashes_for_update.count
inventory_collection.store_updated_records(hashes_for_update)

hashes_for_update = []
end

# Destroy in batches
if records_for_destroy.size >= batch_size
destroy_records(records_for_destroy)
inventory_collection.store_deleted_records(records_for_destroy)
records_for_destroy = []
end
end

# Update the last batch
update_records!(inventory_collection, all_attribute_keys, hashes_for_update)
updated_counter += hashes_for_update.count
hashes_for_update = [] # Cleanup so GC can release it sooner

# Destroy the last batch
Expand All @@ -95,11 +90,12 @@ def save!(inventory_collection, association)
if inventory_collection.create_allowed?
inventory_objects_index.each_slice(batch_size) do |batch|
create_records!(inventory_collection, all_attribute_keys, batch, attributes_index)
created_counter += batch.size
end
end
_log.info("*************** PROCESSED #{inventory_collection}, created=#{created_counter}, "\
"updated=#{updated_counter}, deleted=#{deleted_counter} *************")
_log.info("*************** PROCESSED #{inventory_collection}, "\
"created=#{inventory_collection.created_records.count}, "\
"updated=#{inventory_collection.updated_records.count}, "\
"deleted=#{inventory_collection.deleted_records.count} *************")
end

def destroy_records(records)
Expand Down Expand Up @@ -146,6 +142,7 @@ def create_records!(inventory_collection, all_attribute_keys, batch, attributes_
result = ActiveRecord::Base.connection.execute(
build_insert_query(inventory_collection, all_attribute_keys, hashes)
)
inventory_collection.store_created_records(result)
if inventory_collection.dependees.present?
# We need to get primary keys of the created objects, but only if there are dependees that would use them
map_ids_to_inventory_objects(inventory_collection, indexed_inventory_objects, all_attribute_keys, hashes, result)
Expand Down
86 changes: 20 additions & 66 deletions app/models/manager_refresh/save_collection/saver/default.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,83 +3,37 @@ module Saver
class Default < ManagerRefresh::SaveCollection::Saver::Base
private

# NOTE(review): this looks like the pre-change side of the diff hunk (header "-3,83 +3,37"); the
# consolidated replacement appears to be the save! of the Base saver - confirm before relying on it.
#
# Indexes inventory objects and their attributes by manager_uuid, updates or deletes the records yielded
# by association.find_each inside one transaction, then creates the remaining objects inside another one.
# A local Set of already seen indexes is used to detect duplicate DB records. Created and deleted records
# are tallied in local counters. The logged "updated" figure is not counted; it is derived as collection
# size minus created_counter.
def save!(inventory_collection, association)
attributes_index = {}
inventory_objects_index = {}
inventory_collection.each do |inventory_object|
attributes = inventory_object.attributes(inventory_collection)
index = inventory_object.manager_uuid

attributes_index[index] = attributes
inventory_objects_index[index] = inventory_object
end

unique_db_indexes = Set.new

inventory_collection_size = inventory_collection.size
deleted_counter = 0
created_counter = 0
_log.info("*************** PROCESSING #{inventory_collection} of size #{inventory_collection_size} *************")
# Records that are in the DB, we will be updating or deleting them.
ActiveRecord::Base.transaction do
association.find_each do |record|
next unless assert_distinct_relation(record)

index = inventory_collection.object_index_with_keys(unique_index_keys, record)

# TODO(lsmola) can go away once we indexed our DB with unique indexes
if unique_db_indexes.include?(index) # Include on Set is O(1)
# We have a duplicate in the DB, destroy it. A find_each method does automatically .order(:id => :asc)
# so we always keep the oldest record in the case of duplicates.
_log.warn("A duplicate record was detected and destroyed, inventory_collection: "\
"'#{inventory_collection}', record: '#{record}', duplicate_index: '#{index}'")
record.destroy
else
unique_db_indexes << index
end

# Note: there is no `next` after destroying a duplicate, so the destroyed record still flows into the
# delete/update branch below.
inventory_object = inventory_objects_index.delete(index)
hash = attributes_index.delete(index)

if inventory_object.nil?
# Record was found in the DB but not sent for saving, that means it doesn't exist anymore and we should
# delete it from the DB.
deleted_counter += 1 if delete_record!(inventory_collection, record)
else
# Record was found in the DB and sent for saving, we will be updating the DB.
update_record!(inventory_collection, record, hash, inventory_object)
end
end
end

# Records that were not found in the DB but sent for saving, we will be creating these in the DB.
if inventory_collection.create_allowed?
ActiveRecord::Base.transaction do
inventory_objects_index.each do |index, inventory_object|
hash = attributes_index.delete(index)
create_record!(inventory_collection, hash, inventory_object)
# Incremented for every object handed to create_record!, whatever that call returns.
created_counter += 1
end
end
end
_log.info("*************** PROCESSED #{inventory_collection}, created=#{created_counter}, "\
"updated=#{inventory_collection_size - created_counter}, deleted=#{deleted_counter} *************")
end

# Updates an existing DB record with the attributes sent for saving and links the inventory object to it.
#
# The record is saved at most once, and the update is remembered in the same branch as the save. The
# block previously also carried a stand-alone `record.save if ...` line ahead of this conditional. That
# line saved the record a second time, and when check_changed? was enabled the record was no longer
# changed by the time the conditional ran, so the update was never stored.
#
# @param inventory_collection the collection being saved; supplies check_changed? and store_updated_records
# @param record the DB record to update
# @param hash [Hash] attributes to assign; :id and :type are never assigned
# @param inventory_object the inventory object matching the record; receives the record's id
def update_record!(inventory_collection, record, hash, inventory_object)
  record.assign_attributes(hash.except(:id, :type))

  # With check_changed? enabled, untouched records are neither saved nor reported as updated.
  if !inventory_collection.check_changed? || record.changed?
    record.save
    inventory_collection.store_updated_records(record)
  end

  inventory_object.id = record.id
end

# Creates a new DB record from the attributes sent for saving, remembers it as created and links the
# inventory object to it.
#
# Nothing is created when assert_referential_integrity rejects the attributes.
#
# @param inventory_collection the collection being saved; supplies model_class and store_created_records
# @param hash [Hash] attributes for the new record; :id is never passed to create!
# @param inventory_object the inventory object being persisted; receives the new record's id
# @return [Object, nil] the id of the created record, or nil when nothing was created
def create_record!(inventory_collection, hash, inventory_object)
  if assert_referential_integrity(hash, inventory_object)
    created = inventory_collection.model_class.create!(hash.except(:id))
    inventory_collection.store_created_records(created)

    inventory_object.id = created.id
  end
end

# Tells save! whether the given DB record is the first one seen for its index.
#
# The first record seen for an index is registered in unique_db_indexes and accepted. Any later record
# with the same index is logged, destroyed and rejected.
#
# @param record the DB record being processed
# @param index the record's index built from the unique index keys
# @return [Boolean] true for the first record of an index, false for a destroyed duplicate
def assert_unique_record(record, index)
  # TODO(lsmola) can go away once we indexed our DB with unique indexes
  unless unique_db_indexes.include?(index) # Include on Set is O(1)
    unique_db_indexes << index
    return true
  end

  # We have a duplicate in the DB, destroy it. A find_each method does automatically .order(:id => :asc)
  # so we always keep the oldest record in the case of duplicates.
  _log.warn("A duplicate record was detected and destroyed, inventory_collection: "\
            "'#{inventory_collection}', record: '#{record}', duplicate_index: '#{index}'")
  record.destroy
  false
end
end
end
end
Loading