diff --git a/app/models/manageiq/providers/inventory/persister.rb b/app/models/manageiq/providers/inventory/persister.rb
index f2b01248cb5..92f2db107b1 100644
--- a/app/models/manageiq/providers/inventory/persister.rb
+++ b/app/models/manageiq/providers/inventory/persister.rb
@@ -94,7 +94,10 @@ def initialize_inventory_collections
   # @return [Hash] entire Persister object serialized to hash
   def to_hash
     collections_data = collections.map do |_, collection|
-      next if collection.data.blank? && collection.targeted_scope.primary_references.blank? && collection.all_manager_uuids.nil?
+      next if collection.data.blank? &&
+              collection.targeted_scope.primary_references.blank? &&
+              collection.all_manager_uuids.nil? &&
+              collection.skeletal_primary_index.index_data.blank?
 
       collection.to_hash
     end.compact
diff --git a/app/models/manager_refresh/inventory_collection.rb b/app/models/manager_refresh/inventory_collection.rb
index 5e1620dc601..0b69d7b0ed4 100644
--- a/app/models/manager_refresh/inventory_collection.rb
+++ b/app/models/manager_refresh/inventory_collection.rb
@@ -82,6 +82,7 @@ class InventoryCollection
     delegate :<<,
              :build,
+             :build_partial,
              :data,
              :each,
              :find_or_build,
@@ -570,6 +571,150 @@ def parallel_safe?
      @parallel_safe_cache ||= %i(concurrent_safe concurrent_safe_batch).include?(saver_strategy)
    end
 
+    # @return [Boolean] true if the model_class supports STI
+    def supports_sti?
+      @supports_sti_cache = model_class.column_names.include?("type") if @supports_sti_cache.nil?
+      @supports_sti_cache
+    end
+
+    # @return [Boolean] true if the model_class has created_on column
+    def supports_created_on?
+      if @supports_created_on_cache.nil?
+        @supports_created_on_cache = (model_class.column_names.include?("created_on") && ActiveRecord::Base.record_timestamps)
+      end
+      @supports_created_on_cache
+    end
+
+    # @return [Boolean] true if the model_class has updated_on column
+    def supports_updated_on?
+      if @supports_updated_on_cache.nil?
+        @supports_updated_on_cache = (model_class.column_names.include?("updated_on") && ActiveRecord::Base.record_timestamps)
+      end
+      @supports_updated_on_cache
+    end
+
+    # @return [Boolean] true if the model_class has created_at column
+    def supports_created_at?
+      if @supports_created_at_cache.nil?
+        @supports_created_at_cache = (model_class.column_names.include?("created_at") && ActiveRecord::Base.record_timestamps)
+      end
+      @supports_created_at_cache
+    end
+
+    # @return [Boolean] true if the model_class has updated_at column
+    def supports_updated_at?
+      if @supports_updated_at_cache.nil?
+        @supports_updated_at_cache = (model_class.column_names.include?("updated_at") && ActiveRecord::Base.record_timestamps)
+      end
+      @supports_updated_at_cache
+    end
+
+    # @return [Boolean] true if the model_class has resource_timestamps_max column
+    def supports_resource_timestamps_max?
+      @supports_resource_timestamps_max_cache ||= model_class.column_names.include?("resource_timestamps_max")
+    end
+
+    # @return [Boolean] true if the model_class has resource_timestamps column
+    def supports_resource_timestamps?
+      @supports_resource_timestamps_cache ||= model_class.column_names.include?("resource_timestamps")
+    end
+
+    # @return [Boolean] true if the model_class has resource_timestamp column
+    def supports_resource_timestamp?
+      @supports_resource_timestamp_cache ||= model_class.column_names.include?("resource_timestamp")
+    end
+
+    # @return [Boolean] true if the model_class has resource_versions_max column
+    def supports_resource_versions_max?
+      @supports_resource_versions_max_cache ||= model_class.column_names.include?("resource_versions_max")
+    end
+
+    # @return [Boolean] true if the model_class has resource_versions column
+    def supports_resource_versions?
+      @supports_resource_versions_cache ||= model_class.column_names.include?("resource_versions")
+    end
+
+    # @return [Boolean] true if the model_class has resource_version column
+    def supports_resource_version?
+      @supports_resource_version_cache ||= model_class.column_names.include?("resource_version")
+    end
+
+    # @return [Array] all columns that are part of the best fit unique index
+    def unique_index_columns
+      return @unique_index_columns if @unique_index_columns
+
+      @unique_index_columns = unique_index_for(unique_index_keys).columns.map(&:to_sym)
+    end
+
+    def unique_index_keys
+      @unique_index_keys ||= manager_ref_to_cols.map(&:to_sym)
+    end
+
+    # @return [Array] array of all unique indexes known to model
+    def unique_indexes
+      return @unique_indexes_cache if @unique_indexes_cache
+
+      @unique_indexes_cache = model_class.connection.indexes(model_class.table_name).select(&:unique)
+
+      if @unique_indexes_cache.blank?
+        raise "#{self} and its table #{model_class.table_name} must have a unique index defined, to"\
+              " be able to use saver_strategy :concurrent_safe or :concurrent_safe_batch."
+      end
+
+      @unique_indexes_cache
+    end
+
+    # Finds an index that fits the list of columns (keys) the best
+    #
+    # @param keys [Array]
+    # @raise [Exception] if the unique index for the columns was not found
+    # @return [ActiveRecord::ConnectionAdapters::IndexDefinition] unique index fitting the keys
+    def unique_index_for(keys)
+      @unique_index_for_keys_cache ||= {}
+      return @unique_index_for_keys_cache[keys] if @unique_index_for_keys_cache[keys]
+
+      # Find all unique indexes that cover our keys
+      uniq_key_candidates = unique_indexes.each_with_object([]) { |i, obj| obj << i if (keys - i.columns.map(&:to_sym)).empty? }
+
+      if uniq_key_candidates.blank?
+        raise "#{self} and its table #{model_class.table_name} must have a unique index defined "\
+              "covering columns #{keys} to be able to use saver_strategy :concurrent_safe or :concurrent_safe_batch."
+      end
+
+      # Take the unique key having the least number of columns
+      @unique_index_for_keys_cache[keys] = uniq_key_candidates.min_by { |x| x.columns.count }
+    end
+
+    def internal_columns
+      return @internal_columns if @internal_columns
+
+      @internal_columns = [] + internal_timestamp_columns
+      @internal_columns << :type if supports_sti?
+      @internal_columns << :resource_timestamps_max if supports_resource_timestamps_max?
+      @internal_columns << :resource_timestamps if supports_resource_timestamps?
+      @internal_columns << :resource_timestamp if supports_resource_timestamp?
+      @internal_columns << :resource_versions_max if supports_resource_versions_max?
+      @internal_columns << :resource_versions if supports_resource_versions?
+      @internal_columns << :resource_version if supports_resource_version?
+      @internal_columns
+    end
+
+    def internal_timestamp_columns
+      return @internal_timestamp_columns if @internal_timestamp_columns
+
+      @internal_timestamp_columns = []
+      @internal_timestamp_columns << :created_on if supports_created_on?
+      @internal_timestamp_columns << :created_at if supports_created_at?
+      @internal_timestamp_columns << :updated_on if supports_updated_on?
+      @internal_timestamp_columns << :updated_at if supports_updated_at?
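+
+      # Illustrative result (assuming a model with all four audit columns):
+      # [:created_on, :created_at, :updated_on, :updated_at]; a model missing
+      # a column simply skips it.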
+
+      @internal_timestamp_columns
+    end
+
+    def base_columns
+      @base_columns ||= unique_index_columns + internal_columns
+    end
+
     # @return [Boolean] true if no more data will be added to this InventoryCollection object, that usually happens
     # after the parsing step is finished
     def data_collection_finalized?
diff --git a/app/models/manager_refresh/inventory_collection/data_storage.rb b/app/models/manager_refresh/inventory_collection/data_storage.rb
index 14bacf7056c..19d305151db 100644
--- a/app/models/manager_refresh/inventory_collection/data_storage.rb
+++ b/app/models/manager_refresh/inventory_collection/data_storage.rb
@@ -106,6 +106,15 @@ def build(hash)
       inventory_object
     end
 
+    # Finds or builds a new InventoryObject with incomplete data.
+    #
+    # @param hash [Hash] Hash that needs to contain attributes defined in :manager_ref of the
+    #        InventoryCollection
+    # @return [ManagerRefresh::InventoryObject] Found or built InventoryObject object
+    def build_partial(hash)
+      skeletal_primary_index.build(hash)
+    end
+
     # Returns array of built InventoryObject objects
     #
     # @return [Array] Array of built InventoryObject objects
diff --git a/app/models/manager_refresh/inventory_collection/index/type/base.rb b/app/models/manager_refresh/inventory_collection/index/type/base.rb
index 9b9f3b659ab..6b1a7438298 100644
--- a/app/models/manager_refresh/inventory_collection/index/type/base.rb
+++ b/app/models/manager_refresh/inventory_collection/index/type/base.rb
@@ -35,6 +35,11 @@ def reindex!
           end
         end
 
+        # @return [Array] Returns index data
+        def index_data
+          index.values
+        end
+
         # Find value based on index_value
         #
         # @param _index_value [String] a index_value of the InventoryObject we search for
diff --git a/app/models/manager_refresh/inventory_collection/index/type/data.rb b/app/models/manager_refresh/inventory_collection/index/type/data.rb
index af5c7505483..2b598bb25b8 100644
--- a/app/models/manager_refresh/inventory_collection/index/type/data.rb
+++ b/app/models/manager_refresh/inventory_collection/index/type/data.rb
@@ -9,6 +9,14 @@ class Data < ManagerRefresh::InventoryCollection::Index::Type::Base
         def find(index_value)
           index[index_value]
         end
+
+        # Deletes and returns the value at the index_value
+        #
+        # @param index_value [String] an index_value of the InventoryObject we search for
+        # @return [InventoryObject|nil] Returns found value or nil
+        def delete(index_value)
+          index.delete(index_value)
+        end
       end
     end
   end
diff --git a/app/models/manager_refresh/inventory_collection/index/type/skeletal.rb b/app/models/manager_refresh/inventory_collection/index/type/skeletal.rb
index 34441016ce9..933fc0a3acf 100644
--- a/app/models/manager_refresh/inventory_collection/index/type/skeletal.rb
+++ b/app/models/manager_refresh/inventory_collection/index/type/skeletal.rb
@@ -18,6 +18,7 @@ def initialize(inventory_collection, index_name, attribute_names, primary_index)
 
         delegate :blank?,
                  :each,
+                 :each_value,
                  :to => :index
 
         # Find value based on index_value
@@ -36,21 +37,39 @@ def delete(index_value)
           index.delete(index_value)
         end
 
-        # Builds index record with skeletal InventoryObject and returns it, or returns nil if it's already present
-        # in primary_index or skeletal_primary_index
+        # Takes a value from the primary_index and moves it to the skeletal index
+        #
+        # @param index_value [String] an index_value of the InventoryObject we search for
+        # @return [InventoryObject|nil] Returns found value or nil
+        def skeletonize_primary_index(index_value)
+          inventory_object = primary_index.delete(index_value)
+          return unless inventory_object
+
+          fill_versions!(inventory_object.data)
+
+          index[index_value] = inventory_object
+        end
+
+        # Builds an index record with a skeletal InventoryObject and returns it, or returns the existing
+        # InventoryObject found in the primary_index or skeletal_primary_index, with the new attributes assigned.
         #
         # @param attributes [Hash] Skeletal data of the index, must contain unique index keys and everything else
         #        needed for creating the record in the Database
-        # @return [InventoryObject|nil] Returns built value or nil
+        # @return [InventoryObject] Returns built InventoryObject or existing InventoryObject with new attributes
+        #         assigned
         def build(attributes)
           attributes = {}.merge!(default_values).merge!(attributes)
+          fill_versions!(attributes)
 
           # If the primary index is already filled, we don't want populate skeletal index
           uuid = ::ManagerRefresh::InventoryCollection::Reference.build_stringified_reference(attributes, named_ref)
-          return if primary_index.find(uuid)
+          if (inventory_object = primary_index.find(uuid))
+            return inventory_object.assign_attributes(attributes)
+          end
 
           # Return if skeletal index already exists
-          return if index[uuid]
+          if (inventory_object = index[uuid])
+            return inventory_object.assign_attributes(attributes)
+          end
 
           # We want to populate a new skeletal index
           inventory_object = new_inventory_object(attributes)
@@ -60,6 +79,33 @@ def build(attributes)
         private
 
         attr_reader :primary_index
+
+        # Add versions columns into the passed attributes
+        #
+        # @param attributes [Hash] Attributes we want to extend with version related attributes
+        def fill_versions!(attributes)
+          if inventory_collection.supports_resource_timestamps_max? && attributes[:resource_timestamp]
+            fill_specific_version_attr(:resource_timestamps, :resource_timestamp, attributes)
+          elsif inventory_collection.supports_resource_versions_max? && attributes[:resource_version]
+            fill_specific_version_attr(:resource_versions, :resource_version, attributes)
+          end
+        end
+
+        # Add specific versions columns into the passed attributes
+        #
+        # @param partial_row_version_attr [Symbol] Attr name for partial rows, allowed values are
+        #        [:resource_timestamps, :resource_versions]
+        # @param full_row_version_attr [Symbol] Attr name for full rows, allowed values are
+        #        [:resource_timestamp, :resource_version]
+        # @param attributes [Hash] Attributes we want to extend with version related attributes
+        def fill_specific_version_attr(partial_row_version_attr, full_row_version_attr, attributes)
+          # We have to symbolize, since serializing the persister turns these keys into strings
+          (attributes[partial_row_version_attr] ||= {}).symbolize_keys!
+
+          (attributes.keys - inventory_collection.base_columns).each do |key|
+            attributes[partial_row_version_attr][key] ||= attributes[full_row_version_attr]
+          end
+        end
       end
     end
   end
diff --git a/app/models/manager_refresh/inventory_collection/serialization.rb b/app/models/manager_refresh/inventory_collection/serialization.rb
index d3fc99d5df1..7efd77acdf6 100644
--- a/app/models/manager_refresh/inventory_collection/serialization.rb
+++ b/app/models/manager_refresh/inventory_collection/serialization.rb
@@ -8,6 +8,7 @@ class Serialization
              :inventory_object_lazy?,
              :inventory_object?,
              :name,
+             :skeletal_primary_index,
              :to => :inventory_collection
 
     attr_reader :inventory_collection
@@ -30,6 +31,10 @@ def from_hash(inventory_objects_data, available_inventory_collections)
         build(hash_to_data(inventory_object_data, available_inventory_collections).symbolize_keys!)
       end
 
+      inventory_objects_data['partial_data'].each do |inventory_object_data|
+        skeletal_primary_index.build(hash_to_data(inventory_object_data, available_inventory_collections).symbolize_keys!)
+      end
+
       # TODO(lsmola) add support for all_manager_uuids serialization
       # self.all_manager_uuids = inventory_objects_data['all_manager_uuids']
     end
@@ -43,7 +48,8 @@ def to_hash
         # TODO(lsmola) we do not support nested references here, should we?
         :manager_uuids     => targeted_scope.primary_references.values.map(&:full_reference),
         :all_manager_uuids => all_manager_uuids,
-        :data              => data.map { |x| data_to_hash(x.data) }
+        :data              => data.map { |x| data_to_hash(x.data) },
+        :partial_data      => skeletal_primary_index.index_data.map { |x| data_to_hash(x.data) },
       }
     end
diff --git a/app/models/manager_refresh/inventory_object.rb b/app/models/manager_refresh/inventory_object.rb
index 13234ece9be..2395e026310 100644
--- a/app/models/manager_refresh/inventory_object.rb
+++ b/app/models/manager_refresh/inventory_object.rb
@@ -137,7 +137,26 @@ def attributes_with_keys(inventory_collection_scope = nil, all_attribute_keys =
     # @param attributes [Hash] attributes we want to assign
     # @return [ManagerRefresh::InventoryObject] self
     def assign_attributes(attributes)
-      attributes.each { |k, v| public_send("#{k}=", v) }
+      attributes.each do |k, v|
+        # We don't want timestamps or resource versions to be overwritten here, since those are driving the conditions
+        next if %i(resource_timestamps resource_timestamps_max resource_timestamp).include?(k)
+        next if %i(resource_versions resource_versions_max resource_version).include?(k)
+
+        if data[:resource_timestamp] && attributes[:resource_timestamp]
+          assign_only_newest(:resource_timestamp, :resource_timestamps, attributes, data, k, v)
+        elsif data[:resource_version] && attributes[:resource_version]
+          assign_only_newest(:resource_version, :resource_versions, attributes, data, k, v)
+        else
+          public_send("#{k}=", v)
+        end
+      end
+
+      if attributes[:resource_timestamp]
+        assign_full_row_version_attr(:resource_timestamp, attributes, data)
+      elsif attributes[:resource_version]
+        assign_full_row_version_attr(:resource_version, attributes, data)
+      end
+
       self
     end
 
@@ -180,6 +199,64 @@ def self.add_attributes(inventory_object_attributes)
 
     private
 
+    # Assigns a value based on the version attributes. If versions are specified, it assigns the attribute only
+    # if it's newer than the existing attribute.
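+    #
+    # For illustration (hypothetical values): if the incoming partial timestamp for :name is newer than
+    # data[:resource_timestamps][:name], the incoming :name wins; if it is older, the incoming value is dropped.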
+    #
+    # @param full_row_version_attr [Symbol] Attr name for full rows, allowed values are
+    #        [:resource_timestamp, :resource_version]
+    # @param partial_row_version_attr [Symbol] Attr name for partial rows, allowed values are
+    #        [:resource_timestamps, :resource_versions]
+    # @param attributes [Hash] New attributes we are assigning
+    # @param data [Hash] Existing attributes of the InventoryObject
+    # @param k [Symbol] Name of the attribute we are assigning
+    # @param v [Object] Value of the attribute we are assigning
+    def assign_only_newest(full_row_version_attr, partial_row_version_attr, attributes, data, k, v)
+      # If timestamps are in play, we will set only attributes that are newer
+      specific_attr_timestamp = attributes[partial_row_version_attr].try(:[], k)
+      specific_data_timestamp = data[partial_row_version_attr].try(:[], k)
+
+      assign = if !specific_attr_timestamp
+                 # Data have no timestamp, we will ignore the check
+                 true
+               elsif specific_attr_timestamp && !specific_data_timestamp
+                 # Data specific timestamp is nil and we have new specific timestamp
+                 if data.key?(k)
+                   if attributes[full_row_version_attr] >= data[full_row_version_attr]
+                     # We can save if the full timestamp is bigger, if the data already contains the attribute
+                     true
+                   end
+                 else
+                   # Data do not contain the attribute, so we are saving the newest
+                   true
+                 end
+               elsif specific_attr_timestamp > specific_data_timestamp
+                 # both partial timestamps are there, newer must be bigger
+                 true
+               end
+
+      if assign
+        public_send("#{k}=", v) # Attribute is newer than current one, lets use it
+        (data[partial_row_version_attr] ||= {})[k] = specific_attr_timestamp if specific_attr_timestamp # and set the latest timestamp
+      end
+    end
+
+    # Assigns attribute representing version of the whole row
+    #
+    # @param full_row_version_attr [Symbol] Attr name for full rows, allowed values are
+    #        [:resource_timestamp, :resource_version]
+    # @param attributes [Hash] New attributes we are assigning
+    # @param data [Hash] Existing attributes of the InventoryObject
+    def assign_full_row_version_attr(full_row_version_attr, attributes, data)
+      if attributes[full_row_version_attr] && data[full_row_version_attr]
+        # If both timestamps are present, store the bigger one
+        data[full_row_version_attr] = attributes[full_row_version_attr] if attributes[full_row_version_attr] > data[full_row_version_attr]
+      elsif attributes[full_row_version_attr] && !data[full_row_version_attr]
+        # We are assigning timestamp that was missing
+        data[full_row_version_attr] = attributes[full_row_version_attr]
+      end
+    end
+
     # Return true passed key representing a getter is an association
     #
     # @param inventory_collection_scope [ManagerRefresh::InventoryCollection]
diff --git a/app/models/manager_refresh/save_collection/saver/base.rb b/app/models/manager_refresh/save_collection/saver/base.rb
index 8965f8c3ba2..a072c0139b7 100644
--- a/app/models/manager_refresh/save_collection/saver/base.rb
+++ b/app/models/manager_refresh/save_collection/saver/base.rb
@@ -14,9 +14,10 @@ def initialize(inventory_collection)
         # Private attrs
         @model_class = inventory_collection.model_class
         @table_name = @model_class.table_name
+        @q_table_name = get_connection.quote_table_name(@table_name)
         @primary_key = @model_class.primary_key
         @arel_primary_key = @model_class.arel_attribute(@primary_key)
-        @unique_index_keys = inventory_collection.manager_ref_to_cols.map(&:to_sym)
+        @unique_index_keys = inventory_collection.unique_index_keys
         @unique_index_keys_to_s = inventory_collection.manager_ref_to_cols.map(&:to_s)
         @select_keys = [@primary_key] + @unique_index_keys_to_s
         @unique_db_primary_keys = Set.new
@@ -51,6 +52,7 @@ def initialize(inventory_collection)
             @deserializable_keys[key.to_sym] = attribute_type
           elsif attribute_type.respond_to?(:coder) ||
                 attribute_type.type == :int4range ||
+                attribute_type.type == :jsonb ||
                 pg_type == "text[]" ||
                 pg_type == "character varying[]"
             # Identify columns that needs to be encoded by type.serialize(value), it's a costy operations so lets do
@@ -90,6 +92,8 @@ def save_inventory_collection!
       # @return [Hash] modified hash from parameter attributes with casted values
       def values_for_database!(all_attribute_keys, attributes)
         all_attribute_keys.each do |key|
+          next unless attributes.key?(key)
+
           if (type = serializable_keys[key])
             attributes[key] = type.serialize(attributes[key])
           end
@@ -97,11 +101,25 @@ def save_inventory_collection!
         attributes
       end
 
+      def transform_to_hash!(all_attribute_keys, hash)
+        if inventory_collection.use_ar_object?
+          record = inventory_collection.model_class.new(hash)
+          values_for_database!(all_attribute_keys,
+                               record.attributes.slice(*record.changed_attributes.keys).symbolize_keys)
+        elsif serializable_keys?
+          values_for_database!(all_attribute_keys,
+                               hash)
+        else
+          hash
+        end
+      end
+
       private
 
       attr_reader :unique_index_keys, :unique_index_keys_to_s, :select_keys, :unique_db_primary_keys, :unique_db_indexes, :primary_key, :arel_primary_key, :record_key_method, :pure_sql_records_fetching, :select_keys_indexes,
-                  :batch_size, :batch_size_for_persisting, :model_class, :serializable_keys, :deserializable_keys, :pg_types, :table_name
+                  :batch_size, :batch_size_for_persisting, :model_class, :serializable_keys, :deserializable_keys, :pg_types, :table_name,
+                  :q_table_name
 
       # Saves the InventoryCollection
       #
@@ -275,6 +293,7 @@ def time_now
      # @param hash [Hash] data hash
      # @param update_time [Time] data hash
      def assign_attributes_for_update!(hash, update_time)
+        hash[:type] = model_class.name if supports_sti? && hash[:type].nil?
        hash[:updated_on] = update_time if supports_updated_on?
        hash[:updated_at] = update_time if supports_updated_at?
      end
@@ -284,24 +303,13 @@ def assign_attributes_for_update!(hash, update_time)
      # @param hash [Hash] data hash
      # @param create_time [Time] data hash
      def assign_attributes_for_create!(hash, create_time)
-        hash[:type] = model_class.name if supports_sti? && hash[:type].nil?
        hash[:created_on] = create_time if supports_created_on?
        hash[:created_at] = create_time if supports_created_at?
        assign_attributes_for_update!(hash, create_time)
      end
 
-      # @return [Array] array of all unique indexes known to model
-      def unique_indexes
-        @unique_indexes_cache if @unique_indexes_cache
-
-        @unique_indexes_cache = model_class.connection.indexes(model_class.table_name).select(&:unique)
-
-        if @unique_indexes_cache.blank?
-          raise "#{inventory_collection} and its table #{model_class.table_name} must have a unique index defined, to"\
-                " be able to use saver_strategy :concurrent_safe or :concurrent_safe_batch."
-        end
-
-        @unique_indexes_cache
+      def internal_columns
+        @internal_columns ||= inventory_collection.internal_columns
      end
 
      # Finds an index that fits the list of columns (keys) the best
      #
@@ -310,64 +318,44 @@ def unique_indexes
      # @param keys [Array]
      # @raise [Exception] if the unique index for the columns was not found
      # @return [ActiveRecord::ConnectionAdapters::IndexDefinition] unique index fitting the keys
      def unique_index_for(keys)
-        @unique_index_for_keys_cache ||= {}
-        @unique_index_for_keys_cache[keys] if @unique_index_for_keys_cache[keys]
-
-        # Find all uniq indexes that that are covering our keys
-        uniq_key_candidates = unique_indexes.each_with_object([]) { |i, obj| obj << i if (keys - i.columns.map(&:to_sym)).empty? }
-
-        if @unique_indexes_cache.blank?
-          raise "#{inventory_collection} and its table #{model_class.table_name} must have a unique index defined "\
-                "covering columns #{keys} to be able to use saver_strategy :concurrent_safe or :concurrent_safe_batch."
-        end
-
-        # Take the uniq key having the least number of columns
-        @unique_index_for_keys_cache[keys] = uniq_key_candidates.min_by { |x| x.columns.count }
+        inventory_collection.unique_index_for(keys)
      end
 
      # @return [Array] all columns that are part of the best fit unique index
      def unique_index_columns
-        return @unique_index_columns if @unique_index_columns
+        @unique_index_columns ||= inventory_collection.unique_index_columns
+      end
 
-        @unique_index_columns = unique_index_for(unique_index_keys).columns.map(&:to_sym)
+      # @return [Array] all columns that are part of the best fit unique index, as strings
+      def unique_index_columns_to_s
+        return @unique_index_columns_to_s if @unique_index_columns_to_s
+
+        @unique_index_columns_to_s = unique_index_columns.map(&:to_s)
      end
 
      # @return [Boolean] true if the model_class supports STI
      def supports_sti?
-        @supports_sti_cache = model_class.column_names.include?("type") if @supports_sti_cache.nil?
-        @supports_sti_cache
+        @supports_sti_cache ||= inventory_collection.supports_sti?
      end
 
      # @return [Boolean] true if the model_class has created_on column
      def supports_created_on?
-        if @supports_created_on_cache.nil?
-          @supports_created_on_cache = (model_class.column_names.include?("created_on") && ActiveRecord::Base.record_timestamps)
-        end
-        @supports_created_on_cache
+        @supports_created_on_cache ||= inventory_collection.supports_created_on?
      end
 
      # @return [Boolean] true if the model_class has updated_on column
      def supports_updated_on?
-        if @supports_updated_on_cache.nil?
-          @supports_updated_on_cache = (model_class.column_names.include?("updated_on") && ActiveRecord::Base.record_timestamps)
-        end
-        @supports_updated_on_cache
+        @supports_updated_on_cache ||= inventory_collection.supports_updated_on?
      end
 
      # @return [Boolean] true if the model_class has created_at column
      def supports_created_at?
-        if @supports_created_at_cache.nil?
-          @supports_created_at_cache = (model_class.column_names.include?("created_at") && ActiveRecord::Base.record_timestamps)
-        end
-        @supports_created_at_cache
+        @supports_created_at_cache ||= inventory_collection.supports_created_at?
      end
 
      # @return [Boolean] true if the model_class has updated_at column
      def supports_updated_at?
-        if @supports_updated_at_cache.nil?
-          @supports_updated_at_cache = (model_class.column_names.include?("updated_at") && ActiveRecord::Base.record_timestamps)
-        end
-        @supports_updated_at_cache
+        @supports_updated_at_cache ||= inventory_collection.supports_updated_at?
      end
 
      # @return [Boolean] true if any serializable keys are present
@@ -375,9 +363,24 @@ def serializable_keys?
        @serializable_keys_bool_cache ||= serializable_keys.present?
      end
 
-      # @return [Boolean] true if the model_class has remote_data_timestamp column
+      # @return [Boolean] true if the passed attribute keys contain the resource_timestamp column
      def supports_remote_data_timestamp?(all_attribute_keys)
-        all_attribute_keys.include?(:remote_data_timestamp) # include? on Set is O(1)
+        all_attribute_keys.include?(:resource_timestamp) # include? on Set is O(1)
+      end
+
+      # @return [Boolean] true if the passed attribute keys contain the resource_version column
+      def supports_remote_data_version?(all_attribute_keys)
+        all_attribute_keys.include?(:resource_version) # include? on Set is O(1)
+      end
+
+      # @return [Boolean] true if the model_class has resource_timestamps_max column
+      def supports_resource_timestamps_max?
+        @supports_resource_timestamps_max_cache ||= inventory_collection.supports_resource_timestamps_max?
+      end
+
+      # @return [Boolean] true if the model_class has resource_versions_max column
+      def supports_resource_versions_max?
+        @supports_resource_versions_max_cache ||= inventory_collection.supports_resource_versions_max?
      end
    end
  end
diff --git a/app/models/manager_refresh/save_collection/saver/concurrent_safe.rb b/app/models/manager_refresh/save_collection/saver/concurrent_safe.rb
index 4c6c0bf832d..c6db87142e3 100644
--- a/app/models/manager_refresh/save_collection/saver/concurrent_safe.rb
+++ b/app/models/manager_refresh/save_collection/saver/concurrent_safe.rb
@@ -13,7 +13,7 @@ class ConcurrentSafe < ManagerRefresh::SaveCollection::Saver::Base
      #        key value
      def update_record!(record, hash, inventory_object)
        assign_attributes_for_update!(hash, time_now)
-        record.assign_attributes(hash.except(:id, :type))
+        record.assign_attributes(hash.except(:id))
 
        if !inventory_object.inventory_collection.check_changed? || record.changed?
          update_query = inventory_object.inventory_collection.model_class.where(:id => record.id)
@@ -39,6 +39,7 @@ def create_record!(hash, inventory_object)
        data = inventory_collection.model_class.new(hash).attributes.symbolize_keys
 
        # TODO(lsmola) abstract common behavior into base class
+        all_attribute_keys << :type if supports_sti?
        all_attribute_keys << :created_at if supports_created_at?
        all_attribute_keys << :updated_at if supports_updated_at?
        all_attribute_keys << :created_on if supports_created_on?
diff --git a/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb
index b6c76e196e2..9e984aebe03 100644
--- a/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb
+++ b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb
@@ -92,6 +92,7 @@ def save!(association)
        all_attribute_keys << :updated_at if supports_updated_at?
        all_attribute_keys << :created_on if supports_created_on?
        all_attribute_keys << :updated_on if supports_updated_on?
+        all_attribute_keys << :type if supports_sti?
 
        _log.debug("Processing #{inventory_collection} of size #{inventory_collection.size}...")
 
@@ -103,7 +104,6 @@ def save!(association)
            inventory_collection.custom_reconnect_block&.call(inventory_collection, inventory_objects_index, attributes_index)
          end
 
-          all_attribute_keys << :type if supports_sti?
          # Records that were not found in the DB but sent for saving, we will be creating these in the DB.
          if inventory_collection.create_allowed?
            on_conflict = inventory_collection.parallel_safe? ? :do_update : nil
@@ -117,21 +117,7 @@ def save!(association)
            attributes_index = nil if inventory_collection.parallel_safe?
-            # We will create also remaining skeletal records
-            skeletal_attributes_index        = {}
-            skeletal_inventory_objects_index = {}
-
-            inventory_collection.skeletal_primary_index.each_value do |inventory_object|
-              attributes = inventory_object.attributes_with_keys(inventory_collection, all_attribute_keys)
-              index      = build_stringified_reference(attributes, unique_index_keys)
-
-              skeletal_attributes_index[index]        = attributes
-              skeletal_inventory_objects_index[index] = inventory_object
-            end
-
-            skeletal_inventory_objects_index.each_slice(batch_size_for_persisting) do |batch|
-              create_records!(all_attribute_keys, batch, skeletal_attributes_index, :on_conflict => :do_nothing)
-            end
+            create_or_update_partial_records(all_attribute_keys)
          end
        end
        _log.debug("Processing #{inventory_collection}, "\
@@ -155,6 +141,7 @@ def save!(association)
      def update_or_destroy_records!(records_batch_iterator, inventory_objects_index, attributes_index, all_attribute_keys)
        hashes_for_update   = []
        records_for_destroy = []
+        indexed_inventory_objects = {}
 
        records_batch_iterator.find_in_batches(:batch_size => batch_size) do |batch|
          update_time = time_now
@@ -164,22 +151,7 @@ def update_or_destroy_records!(records_batch_iterator, inventory_objects_index,
 
            next unless assert_distinct_relation(primary_key_value)
 
-            # Incoming values are in SQL string form.
-            # TODO(lsmola) unify this behavior with object_index_with_keys method in InventoryCollection
-            # TODO(lsmola) maybe we can drop the whole pure sql fetching, since everything will be targeted refresh
-            # with streaming refresh? Maybe just metrics and events will not be, but those should be upsert only
-            index = unique_index_keys_to_s.map do |attribute|
-              value = record_key(record, attribute)
-              if attribute == "timestamp"
-                # TODO: can this be covered by @deserializable_keys?
-                type = model_class.type_for_attribute(attribute)
-                type.cast(value).utc.iso8601.to_s
-              elsif (type = deserializable_keys[attribute.to_sym])
-                type.deserialize(value).to_s
-              else
-                value.to_s
-              end
-            end.join("__")
+            index = db_columns_index(record)
 
            inventory_object = inventory_objects_index.delete(index)
            hash             = attributes_index.delete(index)
@@ -195,8 +167,23 @@ def update_or_destroy_records!(records_batch_iterator, inventory_objects_index,
            next unless assert_referential_integrity(hash)
 
            inventory_object.id = primary_key_value
+            if inventory_collection.parallel_safe? &&
+               (supports_remote_data_timestamp?(all_attribute_keys) || supports_remote_data_version?(all_attribute_keys))
+
+              version_attr, max_version_attr = if supports_remote_data_timestamp?(all_attribute_keys)
+                                                 [:resource_timestamp, :resource_timestamps_max]
+                                               elsif supports_remote_data_version?(all_attribute_keys)
+                                                 [:resource_version, :resource_versions_max]
+                                               end
+
+              next if skeletonize_or_skip_record(record.try(version_attr) || record.try(:[], version_attr),
+                                                 hash[version_attr],
+                                                 record.try(max_version_attr) || record.try(:[], max_version_attr),
+                                                 inventory_object)
+            end
+
            hash_for_update = if inventory_collection.use_ar_object?
-                                record.assign_attributes(hash.except(:id, :type))
+                                record.assign_attributes(hash.except(:id))
                                values_for_database!(all_attribute_keys,
                                                     record.attributes.symbolize_keys)
                              elsif serializable_keys?
@@ -212,15 +199,17 @@ def update_or_destroy_records!(records_batch_iterator, inventory_objects_index,
            assign_attributes_for_update!(hash_for_update, update_time)
 
            hash_for_update[:id] = primary_key_value
+            indexed_inventory_objects[index] = inventory_object
            hashes_for_update << hash_for_update
          end
        end
 
        # Update in batches
        if hashes_for_update.size >= batch_size_for_persisting
-          update_records!(all_attribute_keys, hashes_for_update)
+          update_records!(all_attribute_keys, hashes_for_update, indexed_inventory_objects)
 
          hashes_for_update = []
+          indexed_inventory_objects = {}
        end
 
        # Destroy in batches
@@ -231,7 +220,7 @@ def update_or_destroy_records!(records_batch_iterator, inventory_objects_index,
        end
 
        # Update the last batch
-        update_records!(all_attribute_keys, hashes_for_update)
+        update_records!(all_attribute_keys, hashes_for_update, indexed_inventory_objects)
        hashes_for_update = [] # Cleanup so GC can release it sooner
 
        # Destroy the last batch
@@ -239,6 +228,36 @@ def update_or_destroy_records!(records_batch_iterator, inventory_objects_index,
        records_for_destroy = [] # Cleanup so GC can release it sooner
      end
 
+      def db_columns_index(record, pure_sql: false)
+        # Incoming values are in SQL string form.
+        # TODO(lsmola) unify this behavior with object_index_with_keys method in InventoryCollection
+        # TODO(lsmola) maybe we can drop the whole pure sql fetching, since everything will be targeted refresh
+        # with streaming refresh? Maybe just metrics and events will not be, but those should be upsert only
+        # TODO(lsmola) taking ^ in account, we can't drop pure sql, since that is returned by batch insert and
+        # update queries
+        unique_index_keys_to_s.map do |attribute|
+          value = if pure_sql
+                    record[attribute]
+                  else
+                    record_key(record, attribute)
+                  end
+
+          format_value(attribute, value)
+        end.join("__")
+      end
+
+      def format_value(attribute, value)
+        if attribute == "timestamp"
+          # TODO: can this be covered by @deserializable_keys?
+          type = model_class.type_for_attribute(attribute)
+          type.cast(value).utc.iso8601.to_s
+        elsif (type = deserializable_keys[attribute.to_sym])
+          type.deserialize(value).to_s
+        else
+          value.to_s
+        end
+      end
+
      # Deletes or soft-deletes records. If the model_class supports a custom class delete method, we will use it for
      # batch soft-delete.
      #
@@ -276,11 +295,172 @@ def destroy_records!(records)
      #
      # @param hashes [Array] data used for building a batch update sql query
      # @param all_attribute_keys [Array] Array of all columns we will be saving into each table row
-      def update_records!(all_attribute_keys, hashes)
+      def update_records!(all_attribute_keys, hashes, indexed_inventory_objects)
        return if hashes.blank?
-        inventory_collection.store_updated_records(hashes)
+
+        unless inventory_collection.parallel_safe?
+          # We need to update the stored records before we save it, since hashes are modified
+          inventory_collection.store_updated_records(hashes)
+        end
+
        query = build_update_query(all_attribute_keys, hashes)
-        get_connection.execute(query)
+        result = get_connection.execute(query)
+
+        if inventory_collection.parallel_safe?
+          # We will check for timestamp clashes of full row update and we will fallback to skeletal update
+          inventory_collection.store_updated_records(result)
+
+          skeletonize_ignored_records!(indexed_inventory_objects, result)
+        end
+
+        result
+      end
+
+      # Takes the result from an update or upsert of rows. The records that were not saved will be turned into
+      # skeletal records and we will save them attribute by attribute.
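+      #
+      # For example (hypothetical counts): if a batch of 3 rows is sent but the query returns only 2, the
+      # remaining row lost a version race; its InventoryObject is moved to the skeletal index and saved later,
+      # column by column.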
+      #
+      # @param hash [Hash{String => InventoryObject}] Hash with indexed data we want to save
+      # @param result [Array] Result from the DB containing the data that were actually saved
+      # @param all_unique_columns [Boolean] True if the index is composed of all columns of the unique index. False
+      #        if the index is just made from manager_ref turned into DB column names.
+      def skeletonize_ignored_records!(hash, result, all_unique_columns: false)
+        updated = if all_unique_columns
+                    result.map { |x| unique_index_columns_to_s.map { |key| x[key] } }
+                  else
+                    result.map { |x| db_columns_index(x, :pure_sql => true) }
+                  end
+
+        updated.each { |x| hash.delete(x) }
+
+        # Now let's skeletonize all inventory_objects that were not saved by the update or upsert. Old rows that
+        # can't be saved are not being sent here. We have only rows that are new, but became old as we sent the
+        # query (so another parallel process saved the data in the meantime), or rows where some attributes are
+        # newer than the whole row being sent.
+        hash.each_key do |db_index|
+          inventory_collection.skeletal_primary_index.skeletonize_primary_index(hash[db_index].manager_uuid)
+        end
+      end
+
+      # Saves partial records using upsert, taking records from skeletal_primary_index. This is used both for
+      # skeletal precreate as well as for saving partial rows.
+      #
+      # @param all_attribute_keys [Set] Superset of all keys of all records being saved
+      def create_or_update_partial_records(all_attribute_keys)
+        skeletal_attributes_index        = {}
+        skeletal_inventory_objects_index = {}
+
+        inventory_collection.skeletal_primary_index.each_value do |inventory_object|
+          attributes = inventory_object.attributes_with_keys(inventory_collection, all_attribute_keys)
+          index      = build_stringified_reference(attributes, unique_index_keys)
+
+          skeletal_attributes_index[index]        = attributes
+          skeletal_inventory_objects_index[index] = inventory_object
+        end
+
+        if supports_remote_data_timestamp?(all_attribute_keys)
+          all_attribute_keys << :resource_timestamps
+          all_attribute_keys << :resource_timestamps_max
+        elsif supports_remote_data_version?(all_attribute_keys)
+          all_attribute_keys << :resource_versions
+          all_attribute_keys << :resource_versions_max
+        end
+
+        indexed_inventory_objects = {}
+        hashes = []
+        create_time = time_now
+
+        skeletal_inventory_objects_index.each do |index, inventory_object|
+          hash = skeletal_attributes_index.delete(index)
+          # Partial create or update must never set a timestamp for the whole row
+          timestamps = if supports_remote_data_timestamp?(all_attribute_keys) && supports_resource_timestamps_max?
+                         assign_partial_row_version_attributes!(:resource_timestamp,
+                                                                :resource_timestamps,
+                                                                :resource_timestamps_max,
+                                                                hash,
+                                                                all_attribute_keys)
+                       elsif supports_remote_data_version?(all_attribute_keys) && supports_resource_versions_max?
+                         assign_partial_row_version_attributes!(:resource_version,
+                                                                :resource_versions,
+                                                                :resource_versions_max,
+                                                                hash,
+                                                                all_attribute_keys)
+                       end
+          # Transform hash to DB format
+          hash = transform_to_hash!(all_attribute_keys, hash)
+
+          assign_attributes_for_create!(hash, create_time)
+
+          next unless assert_referential_integrity(hash)
+
+          hash[:__non_serialized_versions] = timestamps # store non serialized timestamps for the partial updates
+          hashes << hash
+          # Index on Unique Columns values, so we can easily fill in the :id later
+          indexed_inventory_objects[unique_index_columns.map { |x| hash[x] }] = inventory_object
+        end
+
+        return if hashes.blank?
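+
+        # What follows is a two-phase save: an INSERT .. ON CONFLICT DO NOTHING first pre-creates any missing
+        # rows, then each non-base column is upserted separately with ON CONFLICT DO UPDATE, guarded by the
+        # per-column versions.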
+
+        # First, let's try to create all partial records
+        hashes.each_slice(batch_size_for_persisting) do |batch|
+          result = create_partial!(all_attribute_keys,
+                                   batch,
+                                   :on_conflict => :do_nothing)
+          inventory_collection.store_created_records(result)
+        end
+
+        # We need only skeletal records with timestamp. We can't save the ones without timestamp, because e.g. skeletal
+        # precreate would be updating records with default values that are not correct.
+        pre_filtered = hashes.select { |x| x[:resource_timestamps_max] || x[:resource_versions_max] }
+
+        results = {}
+        # TODO(lsmola) we don't need to process rows that were saved by the create -> on conflict do nothing
+        (all_attribute_keys - inventory_collection.base_columns).each do |column_name|
+          filtered = pre_filtered.select { |x| x.key?(column_name) }
+
+          filtered.each_slice(batch_size_for_persisting) do |batch|
+            # We need to set correct timestamps_max for this particular attribute, based on what is in timestamps
+            if supports_remote_data_timestamp?(all_attribute_keys)
+              batch.each { |x| x[:resource_timestamps_max] = x[:__non_serialized_versions][column_name] if x[:__non_serialized_versions][column_name] }
+            elsif supports_remote_data_version?(all_attribute_keys)
+              batch.each { |x| x[:resource_versions_max] = x[:__non_serialized_versions][column_name] if x[:__non_serialized_versions][column_name] }
+            end
+
+            result = create_partial!(inventory_collection.base_columns + [column_name],
+                                     batch,
+                                     :on_conflict => :do_update,
+                                     :column_name => column_name)
+            result.each do |res|
+              results[res["id"]] = res
+            end
+          end
+        end
+
+        inventory_collection.store_updated_records(results.values)
+
+        # TODO(lsmola) we need to move here the hash loading ar object etc. otherwise the lazy_find with key will not
+        # be correct
+        if inventory_collection.dependees.present?
+          # We need to get primary keys of the created objects, but only if there are dependees that would use them
+          map_ids_to_inventory_objects(indexed_inventory_objects,
+                                       all_attribute_keys,
+                                       hashes,
+                                       nil,
+                                       :on_conflict => :do_nothing)
+        end
+      end
+
+      # Batch upserts one data column of the row, plus the internal columns
+      #
+      # @param all_attribute_keys [Array] Array of all columns we will be saving into each table row
+      # @param hashes [Array] Array of InventoryObject objects we will be inserting
+      #        into the DB
+      # @param on_conflict [Symbol, NilClass] defines behavior on conflict with unique index constraint, allowed values
+      #        are :do_update, :do_nothing, nil
+      # @param column_name [Symbol] Name of the data column we will be upserting
+      def create_partial!(all_attribute_keys, hashes, on_conflict: nil, column_name: nil)
+        get_connection.execute(
+          build_insert_query(all_attribute_keys, hashes, :on_conflict => on_conflict, :mode => :partial, :column_name => column_name)
+        )
+      end
+
      # Batch inserts records using attributes_index data. With on_conflict option using :do_update, this method
@@ -299,14 +479,14 @@ def create_records!(all_attribute_keys, batch, attributes_index, on_conflict: ni
        create_time = time_now
        batch.each do |index, inventory_object|
          hash = if inventory_collection.use_ar_object?
-                   record = inventory_collection.model_class.new(attributes_index.delete(index))
+                   record = inventory_collection.model_class.new(attributes_index[index])
                   values_for_database!(all_attribute_keys,
                                        record.attributes.symbolize_keys)
                 elsif serializable_keys?
                   values_for_database!(all_attribute_keys,
-                                        attributes_index.delete(index))
+                                        attributes_index[index])
                 else
-                   attributes_index.delete(index)
+                   attributes_index[index]
                 end
 
          assign_attributes_for_create!(hash, create_time)
@@ -321,12 +501,41 @@ def create_records!(all_attribute_keys, batch, attributes_index, on_conflict: ni
        return if hashes.blank?
 
        result = get_connection.execute(
-          build_insert_query(all_attribute_keys, hashes, :on_conflict => on_conflict)
+          build_insert_query(all_attribute_keys, hashes, :on_conflict => on_conflict, :mode => :full)
        )
-        inventory_collection.store_created_records(result)
+
+        if inventory_collection.parallel_safe?
+          # We've done upsert, so records were either created or updated. We can recognize that by checking if
+          # created and updated timestamps are the same
+          created_attr = "created_on" if inventory_collection.supports_created_on?
+          created_attr ||= "created_at" if inventory_collection.supports_created_at?
+          updated_attr = "updated_on" if inventory_collection.supports_updated_on?
+          updated_attr ||= "updated_at" if inventory_collection.supports_updated_at?
+
+          if created_attr && updated_attr
+            created, updated = result.to_a.partition { |x| x[created_attr] == x[updated_attr] }
+            inventory_collection.store_created_records(created)
+            inventory_collection.store_updated_records(updated)
+          else
+            # The record doesn't have both created and updated attrs, so we'll take all as created
+            inventory_collection.store_created_records(result)
+          end
+        else
+          # We've done just insert, so all records were created
+          inventory_collection.store_created_records(result)
+        end
+
        if inventory_collection.dependees.present?
          # We need to get primary keys of the created objects, but only if there are dependees that would use them
-          map_ids_to_inventory_objects(indexed_inventory_objects, all_attribute_keys, hashes, result, :on_conflict => on_conflict)
+          map_ids_to_inventory_objects(indexed_inventory_objects,
+                                       all_attribute_keys,
+                                       hashes,
+                                       result,
+                                       :on_conflict => on_conflict)
+        end
+
+        if inventory_collection.parallel_safe?
+          skeletonize_ignored_records!(indexed_inventory_objects, result, :all_unique_columns => true)
        end
      end
 
@@ -342,6 +551,8 @@ def create_records!(all_attribute_keys, batch, attributes_index, on_conflict: ni
      #        are :do_update, :do_nothing, nil
      def map_ids_to_inventory_objects(indexed_inventory_objects, all_attribute_keys, hashes, result, on_conflict:)
        if on_conflict == :do_nothing
+          # TODO(lsmola) is the comment below still accurate? We will update some partial rows, the actual skeletal
+          # precreate will still do nothing.
          # For ON CONFLICT DO NOTHING, we need to always fetch the records plus the attribute_references. This path
          # applies only for skeletal precreate.
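          # (ON CONFLICT DO NOTHING suppresses RETURNING data for conflicting rows, so their ids are not in the
          # query result and have to be fetched here.)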
          inventory_collection.model_class.where(
@@ -390,6 +601,29 @@ def map_ids_to_inventory_objects(indexed_inventory_objects, all_attribute_keys,
          end
        end
      end
+
+      def skeletonize_or_skip_record(record_version, hash_version, record_versions_max, inventory_object)
+        # Skip updating this record, because it is old
+        return true if record_version && hash_version && record_version >= hash_version
+
+        # Some column has a bigger version than the whole row, we need to store the row partially
+        if record_versions_max && hash_version && record_versions_max > hash_version
+          inventory_collection.skeletal_primary_index.skeletonize_primary_index(inventory_object.manager_uuid)
+          return true
+        end
+
+        false
+      end
+
+      def assign_partial_row_version_attributes!(full_row_version_attr, partial_row_version_attr,
+                                                 partial_row_version_attr_max, hash, all_attribute_keys)
+        hash[partial_row_version_attr_max] = hash.delete(full_row_version_attr)
+
+        if hash[partial_row_version_attr].present?
+          # Let's keep only what we save, since when we build the skeletal object, we can set more
+          hash[partial_row_version_attr] = hash[partial_row_version_attr].slice(*all_attribute_keys)
+        end
+      end
    end
  end
end
diff --git a/app/models/manager_refresh/save_collection/saver/default.rb b/app/models/manager_refresh/save_collection/saver/default.rb
index cd414954855..19de0bd5468 100644
--- a/app/models/manager_refresh/save_collection/saver/default.rb
+++ b/app/models/manager_refresh/save_collection/saver/default.rb
@@ -10,7 +10,7 @@ class Default < ManagerRefresh::SaveCollection::Saver::Base
      # @param inventory_object [ManagerRefresh::InventoryObject] InventoryObject instance where we will store primary
      #        key value
      def update_record!(record, hash, inventory_object)
-        record.assign_attributes(hash.except(:id, :type))
+        record.assign_attributes(hash.except(:id))
        if !inventory_collection.check_changed? || record.changed?
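          # (with check_changed? enabled, unchanged records skip the save entirely)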
          record.save
          inventory_collection.store_updated_records(record)
diff --git a/app/models/manager_refresh/save_collection/saver/sql_helper.rb b/app/models/manager_refresh/save_collection/saver/sql_helper.rb
index 28cba2d664a..15858923c75 100644
--- a/app/models/manager_refresh/save_collection/saver/sql_helper.rb
+++ b/app/models/manager_refresh/save_collection/saver/sql_helper.rb
@@ -4,80 +4,11 @@ module SqlHelper
      # TODO(lsmola) all below methods should be rewritten to arel, but we need to first extend arel to be able to do
      # this
 
-      # Builds ON CONFLICT UPDATE updating branch for one column identified by the passed key
-      #
-      # @param key [Symbol] key that is column name
-      # @return [String] SQL clause for upserting one column
-      def build_insert_set_cols(key)
-        "#{quote_column_name(key)} = EXCLUDED.#{quote_column_name(key)}"
-      end
-
-      # @param all_attribute_keys [Array] Array of all columns we will be saving into each table row
-      # @param hashes [Array] data used for building a batch insert sql query
-      # @param on_conflict [Symbol, NilClass] defines behavior on conflict with unique index constraint, allowed values
-      #        are :do_update, :do_nothing, nil
-      def build_insert_query(all_attribute_keys, hashes, on_conflict: nil)
-        # Cache the connection for the batch
-        connection = get_connection
-
-        # Make sure we don't send a primary_key for INSERT in any form, it could break PG sequencer
-        all_attribute_keys_array = all_attribute_keys.to_a - [primary_key.to_s, primary_key.to_sym]
-        values = hashes.map do |hash|
-          "(#{all_attribute_keys_array.map { |x| quote(connection, hash[x], x) }.join(",")})"
-        end.join(",")
-        col_names = all_attribute_keys_array.map { |x| quote_column_name(x) }.join(",")
-
-        insert_query = %{
-          INSERT INTO #{table_name} (#{col_names})
-          VALUES
-            #{values}
-        }
-
-        if inventory_collection.parallel_safe?
-          if on_conflict == :do_nothing
-            insert_query += %{
-              ON CONFLICT DO NOTHING
-            }
-          elsif on_conflict == :do_update
-            index_where_condition = unique_index_for(unique_index_keys).where
-            where_to_sql          = index_where_condition ? "WHERE #{index_where_condition}" : ""
-
-            insert_query += %{
-              ON CONFLICT (#{unique_index_columns.map { |x| quote_column_name(x) }.join(",")}) #{where_to_sql}
-              DO
-                UPDATE
-                  SET #{all_attribute_keys_array.map { |key| build_insert_set_cols(key) }.join(", ")}
-            }
-          end
-        end
-
-        # TODO(lsmola) do we want to exclude the ems_id from the UPDATE clause? Otherwise it might be difficult to change
-        # the ems_id as a cross manager migration, since ems_id should be there as part of the insert. The attempt of
-        # changing ems_id could lead to putting it back by a refresh.
-        # TODO(lsmola) should we add :deleted => false to the update clause? That should handle a reconnect, without a
-        # a need to list :deleted anywhere in the parser. We just need to check that a model has the :deleted attribute
-
-        # This conditional will avoid rewriting new data by old data. But we want it only when remote_data_timestamp is a
-        # part of the data, since for the fake records, we just want to update ems_ref.
-        if supports_remote_data_timestamp?(all_attribute_keys)
-          insert_query += %{
-            WHERE EXCLUDED.remote_data_timestamp IS NULL OR (EXCLUDED.remote_data_timestamp > #{table_name}.remote_data_timestamp)
-          }
-        end
-
-        insert_query += %{
-          RETURNING "id",#{unique_index_columns.map { |x| quote_column_name(x) }.join(",")}
-        }
+      extend ActiveSupport::Concern
 
-        insert_query
-      end
-
-      # Builds update clause for one column identified by the passed key
-      #
-      # @param key [Symbol] key that is column name
-      # @return [String] SQL clause for updating one column
-      def build_update_set_cols(key)
-        "#{quote_column_name(key)} = updated_values.#{quote_column_name(key)}"
+      included do
+        include SqlHelperUpsert
+        include SqlHelperUpdate
      end
 
      # Returns quoted column name
@@ -92,47 +23,6 @@ def get_connection
        ActiveRecord::Base.connection
      end
 
-      # Build batch update query
-      #
-      # @param all_attribute_keys [Array] Array of all columns we will be saving into each table row
-      # @param hashes [Array] data used for building a batch update sql query
-      def build_update_query(all_attribute_keys, hashes)
-        # Cache the connection for the batch
-        connection = get_connection
-
-        # We want to ignore type and create timestamps when updating
-        all_attribute_keys_array = all_attribute_keys.to_a.delete_if { |x| %i(type created_at created_on).include?(x) }
-        all_attribute_keys_array << :id
-
-        values = hashes.map! do |hash|
-          "(#{all_attribute_keys_array.map { |x| quote(connection, hash[x], x, true) }.join(",")})"
-        end.join(",")
-
-        update_query = %{
-          UPDATE #{table_name}
-            SET
-              #{all_attribute_keys_array.map { |key| build_update_set_cols(key) }.join(",")}
-          FROM (
-            VALUES
-              #{values}
-          ) AS updated_values (#{all_attribute_keys_array.map { |x| quote_column_name(x) }.join(",")})
-          WHERE updated_values.id = #{table_name}.id
-        }
-
-        # TODO(lsmola) do we want to exclude the ems_id from the UPDATE clause? Otherwise it might be difficult to change
-        # the ems_id as a cross manager migration, since ems_id should be there as part of the insert. The attempt of
-        # changing ems_id could lead to putting it back by a refresh.
-
-        # This conditional will avoid rewriting new data by old data. But we want it only when remote_data_timestamp is a
-        # part of the data, since for the fake records, we just want to update ems_ref.
-        if supports_remote_data_timestamp?(all_attribute_keys)
-          update_query += %{
-            AND (updated_values.remote_data_timestamp IS NULL OR (updated_values.remote_data_timestamp > #{table_name}.remote_data_timestamp))
-          }
-        end
-        update_query
-      end
-
      # Builds a multiselection conditions like (table1.a = a1 AND table2.b = b1) OR (table1.a = a2 AND table2.b = b2)
      #
      # @param hashes [Array] data we want to use for the query
diff --git a/app/models/manager_refresh/save_collection/saver/sql_helper_update.rb b/app/models/manager_refresh/save_collection/saver/sql_helper_update.rb
new file mode 100644
index 00000000000..9b62bec87da
--- /dev/null
+++ b/app/models/manager_refresh/save_collection/saver/sql_helper_update.rb
@@ -0,0 +1,120 @@
+module ManagerRefresh::SaveCollection
+  module Saver
+    module SqlHelperUpdate
+      # Builds update clause for one column identified by the passed key
+      #
+      # @param key [Symbol] key that is column name
+      # @return [String] SQL clause for updating one column
+      def build_update_set_cols(key)
+        "#{quote_column_name(key)} = updated_values.#{quote_column_name(key)}"
+      end
+
+      # Build batch update query
+      #
+      # @param all_attribute_keys [Array] Array of all columns we will be saving into each table row
+      # @param hashes [Array] data used for building a batch update sql query
+      def build_update_query(all_attribute_keys, hashes)
+        _log.debug("Building update query for #{inventory_collection} of size #{inventory_collection.size}...")
+        # Cache the connection for the batch
+        connection = get_connection
+
+        # We want to ignore create timestamps when updating
+        all_attribute_keys_array = all_attribute_keys.to_a.delete_if { |x| %i(created_at created_on).include?(x) }
+        all_attribute_keys_array << :id
+
+        # If there is no version attribute, the version conditions will be ignored
+        version_attribute = if inventory_collection.parallel_safe? && supports_remote_data_timestamp?(all_attribute_keys)
+                              :resource_timestamp
+                            elsif inventory_collection.parallel_safe? && supports_remote_data_version?(all_attribute_keys)
+                              :resource_version
+                            end
+
+        update_query = update_query_beginning(all_attribute_keys_array)
+        update_query += update_query_reset_version_columns(version_attribute)
+        update_query += update_query_from_values(hashes, all_attribute_keys_array, connection)
+        update_query += update_query_version_conditions(version_attribute)
+        update_query += update_query_returning
+
+        _log.debug("Building update query for #{inventory_collection} of size #{inventory_collection.size}...Complete")
+
+        update_query
+      end
+
+      private
+
+      def update_query_beginning(all_attribute_keys_array)
+        <<-SQL
+          UPDATE #{table_name}
+            SET
+              #{all_attribute_keys_array.map { |key| build_update_set_cols(key) }.join(",")}
+        SQL
+      end
+
+      def update_query_reset_version_columns(version_attribute)
+        if version_attribute
+          attr_partial     = version_attribute.to_s.pluralize # Changes resource_version/timestamp to resource_versions/timestamps
+          attr_partial_max = "#{attr_partial}_max"
+
+          # Quote the column names
+          attr_partial     = quote_column_name(attr_partial)
+          attr_partial_max = quote_column_name(attr_partial_max)
+
+          # Full row update will reset the partial update timestamps
+          <<-SQL
+            , #{attr_partial} = '{}', #{attr_partial_max} = NULL
+          SQL
+        else
+          ""
+        end
+      end
+
+      def update_query_from_values(hashes, all_attribute_keys_array, connection)
+        values = hashes.map! do |hash|
+
+      private
+
+      def update_query_beginning(all_attribute_keys_array)
+        <<-SQL
+          UPDATE #{table_name}
+          SET
+            #{all_attribute_keys_array.map { |key| build_update_set_cols(key) }.join(",")}
+        SQL
+      end
+
+      def update_query_reset_version_columns(version_attribute)
+        if version_attribute
+          attr_partial = version_attribute.to_s.pluralize # Changes resource_version/timestamp to resource_versions/timestamps
+          attr_partial_max = "#{attr_partial}_max"
+
+          # Quote the column names
+          attr_partial = quote_column_name(attr_partial)
+          attr_partial_max = quote_column_name(attr_partial_max)
+
+          # Full row update will reset the partial update timestamps
+          <<-SQL
+            , #{attr_partial} = '{}', #{attr_partial_max} = NULL
+          SQL
+        else
+          ""
+        end
+      end
+
+      def update_query_from_values(hashes, all_attribute_keys_array, connection)
+        values = hashes.map! do |hash|
+          "(#{all_attribute_keys_array.map { |x| quote(connection, hash[x], x, true) }.join(",")})"
+        end.join(",")
+
+        <<-SQL
+          FROM (
+            VALUES
+              #{values}
+          ) AS updated_values (#{all_attribute_keys_array.map { |x| quote_column_name(x) }.join(",")})
+          WHERE updated_values.id = #{q_table_name}.id
+        SQL
+      end
+
+      def update_query_version_conditions(version_attribute)
+        if version_attribute
+          # This condition prevents old data from overwriting newer data. We want it only when version_attribute is
+          # a part of the data, since for the fake records, we just want to update the ems_ref.
+          attr_partial = version_attribute.to_s.pluralize # Changes resource_version/timestamp to resource_versions/timestamps
+          attr_partial_max = "#{attr_partial}_max"
+
+          # Quote the column names
+          attr_full = quote_column_name(version_attribute)
+          attr_partial_max = quote_column_name(attr_partial_max)
+
+          <<-SQL
+            AND (
+              updated_values.#{attr_full} IS NULL OR (
+                (#{q_table_name}.#{attr_full} IS NULL OR updated_values.#{attr_full} > #{q_table_name}.#{attr_full}) AND
+                (#{q_table_name}.#{attr_partial_max} IS NULL OR updated_values.#{attr_full} >= #{q_table_name}.#{attr_partial_max})
+              )
+            )
+          SQL
+        else
+          ""
+        end
+      end
+
+      def update_query_returning
+        if inventory_collection.parallel_safe?
+          <<-SQL
+            RETURNING updated_values.#{quote_column_name("id")}, #{unique_index_columns.map { |x| "updated_values.#{quote_column_name(x)}" }.join(",")}
+          SQL
+        else
+          ""
+        end
+      end
+    end
+  end
+end
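When a version attribute is present, the builders above append a guard so that a full row is written only if it is newer than both the row's last full save (resource_timestamp) and its newest partial write (resource_timestamps_max), and the full write also resets the per-column bookkeeping. A sketch of a complete rendered query, assuming a "vms" table:

    UPDATE vms
    SET
      "ems_ref" = updated_values."ems_ref","name" = updated_values."name",
      "resource_timestamp" = updated_values."resource_timestamp","id" = updated_values."id"
      , "resource_timestamps" = '{}', "resource_timestamps_max" = NULL
    FROM (
      VALUES
        ('vm-1', 'vm_name_1', '2018-02-01 10:00:00'::timestamp, 10)
    ) AS updated_values ("ems_ref","name","resource_timestamp","id")
    WHERE updated_values.id = vms.id
      AND (
        updated_values."resource_timestamp" IS NULL OR (
          (vms."resource_timestamp" IS NULL OR updated_values."resource_timestamp" > vms."resource_timestamp") AND
          (vms."resource_timestamps_max" IS NULL OR updated_values."resource_timestamp" >= vms."resource_timestamps_max")
        )
      )
    RETURNING updated_values."id", updated_values."ems_ref"
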
diff --git a/app/models/manager_refresh/save_collection/saver/sql_helper_upsert.rb b/app/models/manager_refresh/save_collection/saver/sql_helper_upsert.rb
new file mode 100644
index 00000000000..f940219d1f6
--- /dev/null
+++ b/app/models/manager_refresh/save_collection/saver/sql_helper_upsert.rb
@@ -0,0 +1,196 @@
+module ManagerRefresh::SaveCollection
+  module Saver
+    module SqlHelperUpsert
+      # Builds the ON CONFLICT UPDATE branch for one column identified by the passed key
+      #
+      # @param key [Symbol] key that is column name
+      # @return [String] SQL clause for upserting one column
+      def build_insert_set_cols(key)
+        "#{quote_column_name(key)} = EXCLUDED.#{quote_column_name(key)}"
+      end
+
+      # @param all_attribute_keys [Array] Array of all columns we will be saving into each table row
+      # @param hashes [Array] data used for building a batch insert sql query
+      # @param mode [Symbol] Mode for saving; allowed values are [:full, :partial]. :full saves all columns
+      #        of a row, :partial saves only a few columns, i.e. a partial row.
+      # @param on_conflict [Symbol, NilClass] defines behavior on conflict with unique index constraint, allowed values
+      #        are :do_update, :do_nothing, nil
+      # @param column_name [Symbol, NilClass] the one column being saved when mode is :partial
+      def build_insert_query(all_attribute_keys, hashes, on_conflict: nil, mode:, column_name: nil)
+        _log.debug("Building insert query for #{inventory_collection} of size #{inventory_collection.size}...")
+
+        # Cache the connection for the batch
+        connection = get_connection
+        # Ignore versioning columns that are set separately
+        ignore_cols = mode == :partial ? [:resource_timestamp, :resource_version] : []
+        # Make sure we don't send a primary_key for INSERT in any form, since it could break the PG sequence
+        all_attribute_keys_array = all_attribute_keys.to_a - [primary_key.to_s, primary_key.to_sym] - ignore_cols
+
+        insert_query = insert_query_insert_values(hashes, all_attribute_keys_array, connection)
+        insert_query += insert_query_on_conflict_behavior(all_attribute_keys, on_conflict, mode, ignore_cols, column_name)
+        insert_query += insert_query_returning
+
+        _log.debug("Building insert query for #{inventory_collection} of size #{inventory_collection.size}...Complete")
+
+        insert_query
+      end
+
+      private
+
+      def insert_query_insert_values(hashes, all_attribute_keys_array, connection)
+        values = hashes.map do |hash|
+          "(#{all_attribute_keys_array.map { |x| quote(connection, hash[x], x) }.join(",")})"
+        end.join(",")
+
+        col_names = all_attribute_keys_array.map { |x| quote_column_name(x) }.join(",")
+
+        <<-SQL
+          INSERT INTO #{q_table_name} (#{col_names})
+            VALUES
+              #{values}
+        SQL
+      end
+
+      def insert_query_on_conflict_behavior(all_attribute_keys, on_conflict, mode, ignore_cols, column_name)
+        return "" unless inventory_collection.parallel_safe?
+
+        insert_query_on_conflict = insert_query_on_conflict_do(on_conflict)
+        if on_conflict == :do_update
+          insert_query_on_conflict += insert_query_on_conflict_update(all_attribute_keys, mode, ignore_cols, column_name)
+        end
+        insert_query_on_conflict
+      end
+
+      def insert_query_on_conflict_do(on_conflict)
+        if on_conflict == :do_nothing
+          <<-SQL
+            ON CONFLICT DO NOTHING
+          SQL
+        elsif on_conflict == :do_update
+          index_where_condition = unique_index_for(unique_index_keys).where
+          where_to_sql = index_where_condition ? "WHERE #{index_where_condition}" : ""
+
+          <<-SQL
+            ON CONFLICT (#{unique_index_columns.map { |x| quote_column_name(x) }.join(",")}) #{where_to_sql}
+              DO
+                UPDATE
+          SQL
+        end
+      end
+
+      def insert_query_on_conflict_update(all_attribute_keys, mode, ignore_cols, column_name)
+        if mode == :partial
+          ignore_cols += [:resource_timestamps, :resource_timestamps_max, :resource_versions, :resource_versions_max]
+        end
+        ignore_cols += [:created_on, :created_at] # Let's not change the created timestamps in the update clause
+
+        # If there is no version attribute, the update part will be ignored below
+        version_attribute = if supports_remote_data_timestamp?(all_attribute_keys)
+                              :resource_timestamp
+                            elsif supports_remote_data_version?(all_attribute_keys)
+                              :resource_version
+                            end
+
+        # TODO(lsmola) should we add :deleted => false to the update clause? That should handle a reconnect, without
+        # a need to list :deleted anywhere in the parser. We just need to check that a model has the :deleted attribute
+        query = <<-SQL
+          SET #{(all_attribute_keys - ignore_cols).map { |key| build_insert_set_cols(key) }.join(", ")}
+        SQL
+
+        # This condition prevents old data from overwriting newer data. We want it only when
+        # remote_data_timestamp is a part of the data.
+        query += insert_query_on_conflict_update_mode(mode, version_attribute, column_name) if version_attribute
+        query
+      end
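+
+      # Illustration only (not used by the builders): a :full mode upsert against a
+      # hypothetical "vms" table with a unique index on "ems_ref" renders roughly as the
+      # query below; the trailing reset and WHERE guard come from full_update_condition
+      # that follows:
+      #
+      #   INSERT INTO vms ("ems_ref","name","resource_timestamp")
+      #     VALUES
+      #       ('vm-1','vm_name_1','2018-02-01 10:00:00')
+      #   ON CONFLICT ("ems_ref")
+      #     DO
+      #       UPDATE
+      #   SET "ems_ref" = EXCLUDED."ems_ref", "name" = EXCLUDED."name", "resource_timestamp" = EXCLUDED."resource_timestamp"
+      #   , "resource_timestamps" = '{}', "resource_timestamps_max" = NULL
+      #   WHERE EXCLUDED."resource_timestamp" IS NULL OR (
+      #     (vms."resource_timestamp" IS NULL OR EXCLUDED."resource_timestamp" > vms."resource_timestamp") AND
+      #     (vms."resource_timestamps_max" IS NULL OR EXCLUDED."resource_timestamp" >= vms."resource_timestamps_max")
+      #   )
+      #   RETURNING "id","ems_ref"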
+
+      def insert_query_on_conflict_update_mode(mode, version_attribute, column_name)
+        if mode == :full
+          full_update_condition(version_attribute)
+        elsif mode == :partial
+          raise "Column name must be provided" unless column_name
+          partial_update_condition(version_attribute, column_name)
+        end
+      end
+
+      def full_update_condition(attr_full)
+        attr_partial = attr_full.to_s.pluralize # Changes resource_version/timestamp to resource_versions/timestamps
+        attr_partial_max = "#{attr_partial}_max"
+
+        # Quote the column names
+        attr_full = quote_column_name(attr_full)
+        attr_partial = quote_column_name(attr_partial)
+        attr_partial_max = quote_column_name(attr_partial_max)
+
+        <<-SQL
+          , #{attr_partial} = '{}', #{attr_partial_max} = NULL
+
+          WHERE EXCLUDED.#{attr_full} IS NULL OR (
+            (#{q_table_name}.#{attr_full} IS NULL OR EXCLUDED.#{attr_full} > #{q_table_name}.#{attr_full}) AND
+            (#{q_table_name}.#{attr_partial_max} IS NULL OR EXCLUDED.#{attr_full} >= #{q_table_name}.#{attr_partial_max})
+          )
+        SQL
+      end
+
+      def partial_update_condition(attr_full, column_name)
+        attr_partial = attr_full.to_s.pluralize # Changes resource_version/timestamp to resource_versions/timestamps
+        attr_partial_max = "#{attr_partial}_max"
+        cast = if attr_full == :resource_timestamp
+                 "timestamp"
+               elsif attr_full == :resource_version
+                 "integer"
+               end
+
+        # Quote the column names
+        attr_full = quote_column_name(attr_full)
+        attr_partial = quote_column_name(attr_partial)
+        attr_partial_max = quote_column_name(attr_partial_max)
+        column_name = get_connection.quote_string(column_name.to_s)
+        q_table_name = get_connection.quote_table_name(table_name)
+
+        <<-SQL
+          #{insert_query_set_jsonb_version(cast, attr_partial, attr_partial_max, column_name)}
+          , #{attr_partial_max} = greatest(#{q_table_name}.#{attr_partial_max}::#{cast}, EXCLUDED.#{attr_partial_max}::#{cast})
+          WHERE EXCLUDED.#{attr_partial_max} IS NULL OR (
+            (#{q_table_name}.#{attr_full} IS NULL OR EXCLUDED.#{attr_partial_max} > #{q_table_name}.#{attr_full}) AND (
+              (#{q_table_name}.#{attr_partial}->>'#{column_name}')::#{cast} IS NULL OR
+              EXCLUDED.#{attr_partial_max}::#{cast} > (#{q_table_name}.#{attr_partial}->>'#{column_name}')::#{cast}
+            )
+          )
+        SQL
+      end
+
+      def insert_query_set_jsonb_version(cast, attr_partial, attr_partial_max, column_name)
+        if cast == "integer"
+          # If we have an integer value, we don't want to encapsulate the value in ""
+          <<-SQL
+            , #{attr_partial} = #{q_table_name}.#{attr_partial} || ('{"#{column_name}": ' || EXCLUDED.#{attr_partial_max}::#{cast} || '}')::jsonb
+          SQL
+        else
+          <<-SQL
+            , #{attr_partial} = #{q_table_name}.#{attr_partial} || ('{"#{column_name}": "' || EXCLUDED.#{attr_partial_max}::#{cast} || '"}')::jsonb
+          SQL
+        end
+      end
+
+      def insert_query_returning
+        <<-SQL
+          RETURNING "id",#{unique_index_columns.map { |x| quote_column_name(x) }.join(",")}
+                    #{insert_query_returning_timestamps}
+        SQL
+      end
+
+      def insert_query_returning_timestamps
+        if inventory_collection.parallel_safe?
+          # For upsert, we'll also return the created and updated timestamps, so we can recognize what was created
+          # and what was updated
+          if inventory_collection.internal_timestamp_columns.present?
+            <<-SQL
+              , #{inventory_collection.internal_timestamp_columns.map { |x| quote_column_name(x) }.join(",")}
+            SQL
+          end
+        else
+          ""
+        end
+      end
+    end
+  end
+end
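For a partial save, the upsert versions each column separately: resource_timestamps keeps a per-column jsonb map and resource_timestamps_max the newest of those values. A sketch of a partial upsert of the "name" column, assuming a "vms" table with a unique index on "ems_ref" and a model keeping created_on/updated_on:

    INSERT INTO vms ("ems_ref","name","resource_timestamps_max")
      VALUES
        ('vm-1','updated_name','2018-02-01 10:00:00')
    ON CONFLICT ("ems_ref")
      DO
        UPDATE
    SET "ems_ref" = EXCLUDED."ems_ref", "name" = EXCLUDED."name"
    , "resource_timestamps" = vms."resource_timestamps" || ('{"name": "' || EXCLUDED."resource_timestamps_max"::timestamp || '"}')::jsonb
    , "resource_timestamps_max" = greatest(vms."resource_timestamps_max"::timestamp, EXCLUDED."resource_timestamps_max"::timestamp)
    WHERE EXCLUDED."resource_timestamps_max" IS NULL OR (
      (vms."resource_timestamp" IS NULL OR EXCLUDED."resource_timestamps_max" > vms."resource_timestamp") AND (
        (vms."resource_timestamps"->>'name')::timestamp IS NULL OR
        EXCLUDED."resource_timestamps_max"::timestamp > (vms."resource_timestamps"->>'name')::timestamp
      )
    )
    RETURNING "id","ems_ref"
      , "created_on","updated_on"

The guard admits the column write only when it is newer than both the row's last full save and that column's own last partial write.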