diff --git a/app/models/manager_refresh/inventory_collection.rb b/app/models/manager_refresh/inventory_collection.rb index e386183ab23..05a2c28a020 100644 --- a/app/models/manager_refresh/inventory_collection.rb +++ b/app/models/manager_refresh/inventory_collection.rb @@ -10,7 +10,7 @@ class InventoryCollection :inventory_object_attributes, :name, :saver_strategy, :parent_inventory_collections, :manager_uuids, :skeletal_manager_uuids, :targeted_arel, :targeted, :manager_ref_allowed_nil, :use_ar_object, :secondary_refs, :secondary_indexes, :created_records, :updated_records, :deleted_records, - :custom_reconnect_block + :custom_reconnect_block, :batch_extra_attributes delegate :each, :size, :to => :to_a @@ -321,6 +321,8 @@ class InventoryCollection # Allowed saver strategies are: # - :default => Using Rails saving methods, this way is not safe to run in multiple workers concurrently, # since it will lead to non consistent data. + # - :batch => Using batch SQL queries, this way is not safe to run in multiple workers + # concurrently, since it will lead to non consistent data. # - :concurrent_safe => This method is designed for concurrent saving. It uses atomic upsert to avoid # data duplication and it uses timestamp based atomic checks to avoid new data being overwritten by the # the old data. @@ -368,6 +370,10 @@ class InventoryCollection # @param use_ar_object [Boolean] True or False. Whether we need to initialize AR object as part of the saving # it's needed if the model have special setters, serialize of columns, etc. This setting is relevant only # for the batch saver strategy. + # @param batch_extra_attributes [Array] Array of symbols marking which extra attributes we want to store into the + # db. These extra attributes might be a product of :use_ar_object assignment and we need to specify them + # manually, if we want to use a batch saving strategy and we have models that populate attributes as a side + # effect. def initialize(model_class: nil, manager_ref: nil, association: nil, parent: nil, strategy: nil, saved: nil, custom_save_block: nil, delete_method: nil, data_index: nil, data: nil, dependency_attributes: nil, attributes_blacklist: nil, attributes_whitelist: nil, complete: nil, update_only: nil, @@ -375,7 +381,7 @@ def initialize(model_class: nil, manager_ref: nil, association: nil, parent: nil inventory_object_attributes: nil, unique_index_columns: nil, name: nil, saver_strategy: nil, parent_inventory_collections: nil, manager_uuids: [], all_manager_uuids: nil, targeted_arel: nil, targeted: nil, manager_ref_allowed_nil: nil, secondary_refs: {}, use_ar_object: nil, - custom_reconnect_block: nil) + custom_reconnect_block: nil, batch_extra_attributes: []) @model_class = model_class @manager_ref = manager_ref || [:ems_ref] @secondary_refs = secondary_refs @@ -402,6 +408,7 @@ def initialize(model_class: nil, manager_ref: nil, association: nil, parent: nil @name = name || association || model_class.to_s.demodulize.tableize @saver_strategy = process_saver_strategy(saver_strategy) @use_ar_object = use_ar_object || false + @batch_extra_attributes = batch_extra_attributes @manager_ref_allowed_nil = manager_ref_allowed_nil || [] @@ -502,11 +509,11 @@ def process_saver_strategy(saver_strategy) return :default unless saver_strategy case saver_strategy - when :default, :concurrent_safe, :concurrent_safe_batch + when :default, :batch, :concurrent_safe, :concurrent_safe_batch saver_strategy else raise "Unknown InventoryCollection saver strategy: :#{saver_strategy}, allowed strategies are "\ - ":default, :concurrent_safe and :concurrent_safe_batch" + ":default, :batch, :concurrent_safe and :concurrent_safe_batch" end end @@ -996,7 +1003,7 @@ def full_collection_for_comparison # Returns array of records identities def records_identities(records) - records = [records] unless records.kind_of?(Array) + records = [records] unless records.respond_to?(:map) records.map { |record| record_identity(record) } end diff --git a/app/models/manager_refresh/save_collection/saver/batch.rb b/app/models/manager_refresh/save_collection/saver/batch.rb new file mode 100644 index 00000000000..fbe4af82256 --- /dev/null +++ b/app/models/manager_refresh/save_collection/saver/batch.rb @@ -0,0 +1,15 @@ +module ManagerRefresh::SaveCollection + module Saver + class Batch < ManagerRefresh::SaveCollection::Saver::ConcurrentSafeBatch + private + + def unique_index_columns + inventory_collection.manager_ref_to_cols.map(&:to_sym) + end + + def on_conflict_update + false + end + end + end +end diff --git a/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb index 2809e0003f4..ad1d9efa612 100644 --- a/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb +++ b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb @@ -6,7 +6,7 @@ class ConcurrentSafeBatch < ManagerRefresh::SaveCollection::Saver::Base def save!(inventory_collection, association) attributes_index = {} inventory_objects_index = {} - all_attribute_keys = Set.new + all_attribute_keys = Set.new + inventory_collection.batch_extra_attributes inventory_collection.each do |inventory_object| attributes = inventory_object.attributes(inventory_collection) @@ -58,7 +58,8 @@ def save!(inventory_collection, association) assign_attributes_for_update!(hash_for_update, inventory_collection, update_time) inventory_collection.store_updated_records(record) - hashes_for_update << hash_for_update.except(:id, :type) + hash_for_update[:id] = record.id + hashes_for_update << hash_for_update.except(:type) end end @@ -72,7 +73,6 @@ def save!(inventory_collection, association) # Destroy in batches if records_for_destroy.size >= batch_size destroy_records(records_for_destroy) - inventory_collection.store_deleted_records(records_for_destroy) records_for_destroy = [] end end @@ -83,7 +83,6 @@ def save!(inventory_collection, association) # Destroy the last batch destroy_records(records_for_destroy) - inventory_collection.store_deleted_records(records_for_destroy) records_for_destroy = [] # Cleanup so GC can release it sooner all_attribute_keys << :type if inventory_collection.supports_sti? @@ -135,7 +134,7 @@ def create_records!(inventory_collection, all_attribute_keys, batch, attributes_ hashes << hash # Index on Unique Columns values, so we can easily fill in the :id later - indexed_inventory_objects[inventory_collection.unique_index_columns.map { |x| hash[x] }] = inventory_object + indexed_inventory_objects[unique_index_columns.map { |x| hash[x] }] = inventory_object end return if hashes.blank? @@ -166,15 +165,15 @@ def map_ids_to_inventory_objects(inventory_collection, indexed_inventory_objects # for every remainders(a last batch in a stream of batches) if !supports_remote_data_timestamp?(all_attribute_keys) || result.count == batch_size result.each do |inserted_record| - key = inventory_collection.unique_index_columns.map { |x| inserted_record[x.to_s] } + key = unique_index_columns.map { |x| inserted_record[x.to_s] } inventory_object = indexed_inventory_objects[key] inventory_object.id = inserted_record["id"] if inventory_object end else inventory_collection.model_class.where( build_multi_selection_query(inventory_collection, hashes) - ).select(inventory_collection.unique_index_columns + [:id]).each do |inserted_record| - key = inventory_collection.unique_index_columns.map { |x| inserted_record.public_send(x) } + ).select(unique_index_columns + [:id]).each do |inserted_record| + key = unique_index_columns.map { |x| inserted_record.public_send(x) } inventory_object = indexed_inventory_objects[key] inventory_object.id = inserted_record.id if inventory_object end diff --git a/app/models/manager_refresh/save_collection/saver/sql_helper.rb b/app/models/manager_refresh/save_collection/saver/sql_helper.rb index fa1be08f312..f94ddac149e 100644 --- a/app/models/manager_refresh/save_collection/saver/sql_helper.rb +++ b/app/models/manager_refresh/save_collection/saver/sql_helper.rb @@ -1,6 +1,14 @@ module ManagerRefresh::SaveCollection module Saver module SqlHelper + def unique_index_columns + inventory_collection.unique_index_columns + end + + def on_conflict_update + true + end + # TODO(lsmola) all below methods should be rewritten to arel, but we need to first extend arel to be able to do # this def build_insert_set_cols(key) @@ -19,12 +27,17 @@ def build_insert_query(inventory_collection, all_attribute_keys, hashes) INSERT INTO #{table_name} (#{col_names}) VALUES #{values} - ON CONFLICT (#{inventory_collection.unique_index_columns.map { |x| quote_column_name(x) }.join(",")}) - DO - UPDATE - SET #{all_attribute_keys_array.map { |key| build_insert_set_cols(key) }.join(", ")} } + if on_conflict_update + insert_query += %{ + ON CONFLICT (#{unique_index_columns.map { |x| quote_column_name(x) }.join(",")}) + DO + UPDATE + SET #{all_attribute_keys_array.map { |key| build_insert_set_cols(key) }.join(", ")} + } + end + # TODO(lsmola) do we want to exclude the ems_id from the UPDATE clause? Otherwise it might be difficult to change # the ems_id as a cross manager migration, since ems_id should be there as part of the insert. The attempt of # changing ems_id could lead to putting it back by a refresh. @@ -39,11 +52,9 @@ def build_insert_query(inventory_collection, all_attribute_keys, hashes) } end - if inventory_collection.dependees.present? - insert_query += %{ - RETURNING id,#{inventory_collection.unique_index_columns.map { |x| quote_column_name(x) }.join(",")} - } - end + insert_query += %{ + RETURNING "id",#{unique_index_columns.map { |x| quote_column_name(x) }.join(",")} + } insert_query end @@ -59,17 +70,13 @@ def quote_column_name(key) def build_update_query(inventory_collection, all_attribute_keys, hashes) # We want to ignore type and create timestamps when updating all_attribute_keys_array = all_attribute_keys.to_a.delete_if { |x| %i(type created_at created_on).include?(x) } + all_attribute_keys_array << :id table_name = inventory_collection.model_class.table_name - used_unique_index_keys = inventory_collection.unique_index_columns & all_attribute_keys.to_a values = hashes.map do |hash| "(#{all_attribute_keys_array.map { |x| quote(hash[x], x, inventory_collection) }.join(",")})" end.join(",") - where_cond = used_unique_index_keys.map do |x| - "updated_values.#{quote_column_name(x)} = #{table_name}.#{quote_column_name(x)}" - end.join(" AND ") - update_query = %{ UPDATE #{table_name} SET @@ -78,7 +85,7 @@ def build_update_query(inventory_collection, all_attribute_keys, hashes) VALUES #{values} ) AS updated_values (#{all_attribute_keys_array.map { |x| quote_column_name(x) }.join(",")}) - WHERE #{where_cond} + WHERE updated_values.id = #{table_name}.id } # TODO(lsmola) do we want to exclude the ems_id from the UPDATE clause? Otherwise it might be difficult to change @@ -96,7 +103,7 @@ def build_update_query(inventory_collection, all_attribute_keys, hashes) end def build_multi_selection_query(inventory_collection, hashes) - inventory_collection.build_multi_selection_condition(hashes, inventory_collection.unique_index_columns) + inventory_collection.build_multi_selection_condition(hashes, unique_index_columns) end def quote(value, name = nil, inventory_collection = nil)