Skip to content

Commit

Permalink
Merge pull request ManageIQ#17861 from Ladas/partial_row_updates_in_p…
Browse files Browse the repository at this point in the history
…arallel

Partial row updates in parallel
  • Loading branch information
agrare authored Aug 27, 2018
2 parents 6b17ea7 + 4f3c6dc commit a8b2145
Show file tree
Hide file tree
Showing 15 changed files with 961 additions and 218 deletions.
5 changes: 4 additions & 1 deletion app/models/manageiq/providers/inventory/persister.rb
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ def initialize_inventory_collections
# @return [Hash] entire Persister object serialized to hash
def to_hash
collections_data = collections.map do |_, collection|
next if collection.data.blank? && collection.targeted_scope.primary_references.blank? && collection.all_manager_uuids.nil?
next if collection.data.blank? &&
collection.targeted_scope.primary_references.blank? &&
collection.all_manager_uuids.nil? &&
collection.skeletal_primary_index.index_data.blank?

collection.to_hash
end.compact
Expand Down
145 changes: 145 additions & 0 deletions app/models/manager_refresh/inventory_collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class InventoryCollection

delegate :<<,
:build,
:build_partial,
:data,
:each,
:find_or_build,
Expand Down Expand Up @@ -570,6 +571,150 @@ def parallel_safe?
@parallel_safe_cache ||= %i(concurrent_safe concurrent_safe_batch).include?(saver_strategy)
end

# Tells whether model_class has a "type" column, which is what this collection treats as STI support.
# The answer (true or false) is computed once and then served from the cache.
#
# @return [Boolean] true if the model_class supports STI
def supports_sti?
  return @supports_sti_cache unless @supports_sti_cache.nil?

  @supports_sti_cache = model_class.column_names.include?("type")
end

# Tells whether the created_on column should be maintained: model_class must have the column and
# ActiveRecord timestamp recording must be switched on. A non-nil answer is cached.
#
# @return [Boolean] true if the model_class has created_on column and record_timestamps is enabled
def supports_created_on?
  return @supports_created_on_cache unless @supports_created_on_cache.nil?

  has_column = model_class.column_names.include?("created_on")
  @supports_created_on_cache = has_column && ActiveRecord::Base.record_timestamps
end

# Tells whether the updated_on column should be maintained: model_class must have the column and
# ActiveRecord timestamp recording must be switched on. A non-nil answer is cached.
#
# @return [Boolean] true if the model_class has updated_on column and record_timestamps is enabled
def supports_updated_on?
  return @supports_updated_on_cache unless @supports_updated_on_cache.nil?

  has_column = model_class.column_names.include?("updated_on")
  @supports_updated_on_cache = has_column && ActiveRecord::Base.record_timestamps
end

# Tells whether the created_at column should be maintained: model_class must have the column and
# ActiveRecord timestamp recording must be switched on. A non-nil answer is cached.
#
# @return [Boolean] true if the model_class has created_at column and record_timestamps is enabled
def supports_created_at?
  return @supports_created_at_cache unless @supports_created_at_cache.nil?

  has_column = model_class.column_names.include?("created_at")
  @supports_created_at_cache = has_column && ActiveRecord::Base.record_timestamps
end

# Tells whether the updated_at column should be maintained: model_class must have the column and
# ActiveRecord timestamp recording must be switched on. A non-nil answer is cached.
#
# @return [Boolean] true if the model_class has updated_at column and record_timestamps is enabled
def supports_updated_at?
  return @supports_updated_at_cache unless @supports_updated_at_cache.nil?

  has_column = model_class.column_names.include?("updated_at")
  @supports_updated_at_cache = has_column && ActiveRecord::Base.record_timestamps
end

# Tells whether model_class has a resource_timestamps_max column. The answer is cached with an explicit
# nil check, so a false result is memoized too (a plain `||=` would redo the lookup on every call).
#
# @return [Boolean] true if the model_class has resource_timestamps_max column
def supports_resource_timestamps_max?
  if @supports_resource_timestamps_max_cache.nil?
    @supports_resource_timestamps_max_cache = model_class.column_names.include?("resource_timestamps_max")
  end
  @supports_resource_timestamps_max_cache
end

# Tells whether model_class has a resource_timestamps column. The answer is cached with an explicit
# nil check, so a false result is memoized too (a plain `||=` would redo the lookup on every call).
#
# @return [Boolean] true if the model_class has resource_timestamps column
def supports_resource_timestamps?
  if @supports_resource_timestamps_cache.nil?
    @supports_resource_timestamps_cache = model_class.column_names.include?("resource_timestamps")
  end
  @supports_resource_timestamps_cache
end

# Tells whether model_class has a resource_timestamp column. The answer is cached with an explicit
# nil check, so a false result is memoized too (a plain `||=` would redo the lookup on every call).
#
# @return [Boolean] true if the model_class has resource_timestamp column
def supports_resource_timestamp?
  if @supports_resource_timestamp_cache.nil?
    @supports_resource_timestamp_cache = model_class.column_names.include?("resource_timestamp")
  end
  @supports_resource_timestamp_cache
end

# Tells whether model_class has a resource_versions_max column. The answer is cached with an explicit
# nil check, so a false result is memoized too (a plain `||=` would redo the lookup on every call).
#
# @return [Boolean] true if the model_class has resource_versions_max column
def supports_resource_versions_max?
  if @supports_resource_versions_max_cache.nil?
    @supports_resource_versions_max_cache = model_class.column_names.include?("resource_versions_max")
  end
  @supports_resource_versions_max_cache
end

# Tells whether model_class has a resource_versions column. The answer is cached with an explicit
# nil check, so a false result is memoized too (a plain `||=` would redo the lookup on every call).
#
# @return [Boolean] true if the model_class has resource_versions column
def supports_resource_versions?
  if @supports_resource_versions_cache.nil?
    @supports_resource_versions_cache = model_class.column_names.include?("resource_versions")
  end
  @supports_resource_versions_cache
end

# Tells whether model_class has a resource_version column. The answer is cached with an explicit
# nil check, so a false result is memoized too (a plain `||=` would redo the lookup on every call).
#
# @return [Boolean] true if the model_class has resource_version column
def supports_resource_version?
  if @supports_resource_version_cache.nil?
    @supports_resource_version_cache = model_class.column_names.include?("resource_version")
  end
  @supports_resource_version_cache
end

# Column names (as symbols) of the unique index chosen by unique_index_for for unique_index_keys.
# The list is computed on first use and memoized.
#
# @return [Array<Symbol>] all columns that are part of the best fit unique index
def unique_index_columns
  @unique_index_columns ||= begin
    best_index = unique_index_for(unique_index_keys)
    best_index.columns.map(&:to_sym)
  end
end

# Symbolized manager_ref_to_cols; these are the keys handed to unique_index_for when looking
# for the fitting unique index. Memoized after the first call.
#
# @return [Array<Symbol>] manager_ref_to_cols converted to symbols
def unique_index_keys
  return @unique_index_keys if @unique_index_keys

  @unique_index_keys = manager_ref_to_cols.map(&:to_sym)
end

# Loads all unique indexes defined on the model's table and memoizes them.
#
# @raise [RuntimeError] if the table has no unique index at all
# @return [Array<ActiveRecord::ConnectionAdapters::IndexDefinition>] array of all unique indexes known to model
def unique_indexes
  # Serve the memoized list; without the explicit return the connection would be queried on every call
  return @unique_indexes_cache if @unique_indexes_cache

  found_indexes = model_class.connection.indexes(model_class.table_name).select(&:unique)

  if found_indexes.blank?
    raise "#{self} and its table #{model_class.table_name} must have a unique index defined, to"\
          " be able to use saver_strategy :concurrent_safe or :concurrent_safe_batch."
  end

  # Memoize only a validated, non-empty list so a failed lookup is re-checked (and re-raised) next time
  @unique_indexes_cache = found_indexes
end

# Finds an index that fits the list of columns (keys) the best, i.e. the unique index covering all
# the keys that has the fewest columns. Results are memoized per keys list.
#
# @param keys [Array<Symbol>]
# @raise [RuntimeError] if the unique index for the columns was not found
# @return [ActiveRecord::ConnectionAdapters::IndexDefinition] unique index fitting the keys
def unique_index_for(keys)
  @unique_index_for_keys_cache ||= {}
  # Explicit return is required, otherwise the memoized index would be looked up and then ignored
  return @unique_index_for_keys_cache[keys] if @unique_index_for_keys_cache[keys]

  # Find all unique indexes that are covering our keys
  uniq_key_candidates = unique_indexes.select { |i| (keys - i.columns.map(&:to_sym)).empty? }

  # Check the candidates (not the list of all unique indexes), so a missing covering index is reported
  # here instead of surfacing later as a nil result
  if uniq_key_candidates.blank?
    raise "#{self} and its table #{model_class.table_name} must have a unique index defined "\
          "covering columns #{keys} to be able to use saver_strategy :concurrent_safe or :concurrent_safe_batch."
  end

  # Take the unique index having the least number of columns
  @unique_index_for_keys_cache[keys] = uniq_key_candidates.min_by { |x| x.columns.count }
end

# Lists the columns this collection handles internally: the timestamp columns from
# internal_timestamp_columns followed by :type and the resource version/timestamp columns, each included
# only when the matching supports_*? predicate is true. Memoized after the first successful computation.
#
# @return [Array<Symbol>] internal column names supported by the model_class
def internal_columns
  return @internal_columns if @internal_columns

  # Build into a local (and a copy of the timestamp list) so that an exception raised by any probe below
  # cannot leave a half-filled list memoized
  columns = internal_timestamp_columns.dup
  columns << :type if supports_sti?
  columns << :resource_timestamps_max if supports_resource_timestamps_max?
  columns << :resource_timestamps if supports_resource_timestamps?
  columns << :resource_timestamp if supports_resource_timestamp?
  columns << :resource_versions_max if supports_resource_versions_max?
  columns << :resource_versions if supports_resource_versions?
  columns << :resource_version if supports_resource_version?

  @internal_columns = columns
end

# Lists the timestamp columns (:created_on, :created_at, :updated_on, :updated_at, in that order) for which
# the matching supports_*? predicate is true. Memoized after the first successful computation.
#
# @return [Array<Symbol>] supported timestamp column names
def internal_timestamp_columns
  return @internal_timestamp_columns if @internal_timestamp_columns

  # Build into a local so that an exception raised by any probe below cannot leave a half-filled
  # list memoized
  columns = []
  columns << :created_on if supports_created_on?
  columns << :created_at if supports_created_at?
  columns << :updated_on if supports_updated_on?
  columns << :updated_at if supports_updated_at?

  @internal_timestamp_columns = columns
end

# Concatenation of unique_index_columns and internal_columns, memoized after the first call.
#
# @return [Array<Symbol>] unique index columns followed by internal columns
def base_columns
  return @base_columns if @base_columns

  @base_columns = unique_index_columns + internal_columns
end

# @return [Boolean] true if no more data will be added to this InventoryCollection object, that usually happens
# after the parsing step is finished
def data_collection_finalized?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ def build(hash)
inventory_object
end

# Finds of builds a new InventoryObject with incomplete data.
#
# @param hash [Hash] Hash that needs to contain attributes defined in :manager_ref of the
# InventoryCollection
# @return [ManagerRefresh::InventoryObject] Found or built InventoryObject object
def build_partial(hash)
skeletal_primary_index.build(hash)
end

# Returns array of built InventoryObject objects
#
# @return [Array<ManagerRefresh::InventoryObject>] Array of built InventoryObject objects
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ def reindex!
end
end

# Returns the values stored in the index, without their index keys.
#
# @return [Array] Returns index data
def index_data
index.values
end

# Find value based on index_value
#
# @param _index_value [String] a index_value of the InventoryObject we search for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ class Data < ManagerRefresh::InventoryCollection::Index::Type::Base
def find(index_value)
# Direct lookup in the underlying index; the result of index[] is returned as is
index[index_value]
end

# Deletes the entry stored under index_value from the index and returns it
#
# @param index_value [String] a index_value of the InventoryObject we want to remove
# @return [InventoryObject|nil] Returns the removed value or nil (assuming index behaves like a Hash,
#         whose delete yields nil for an unknown key)
def delete(index_value)
index.delete(index_value)
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def initialize(inventory_collection, index_name, attribute_names, primary_index)

# Emptiness check and iteration helpers are forwarded straight to the underlying index object
delegate :blank?,
:each,
:each_value,
:to => :index

# Find value based on index_value
Expand All @@ -36,21 +37,39 @@ def delete(index_value)
index.delete(index_value)
end

# Takes value from primary_index and inserts it to skeletal index. The InventoryObject is removed from
# primary_index, its data get the version attributes filled in, and it is stored in this index under
# the same index_value. Nothing happens when primary_index has no value for index_value.
#
# @param index_value [String] a index_value of the InventoryObject we search for
# @return [InventoryObject|nil] Returns the moved value, or nil when nothing was found in primary_index
def skeletonize_primary_index(index_value)
  moved_object = primary_index.delete(index_value)

  if moved_object
    fill_versions!(moved_object.data)
    index[index_value] = moved_object
  end
end

# Builds index record with skeletal InventoryObject and returns it. Or it returns existing InventoryObject
# that is found in primary_index or skeletal_primary_index.
#
# @param attributes [Hash] Skeletal data of the index, must contain unique index keys and everything else
# needed for creating the record in the Database
# @return [InventoryObject|nil] Returns built value or nil
# @return [InventoryObject] Returns built InventoryObject or existing InventoryObject with new attributes
# assigned
def build(attributes)
attributes = {}.merge!(default_values).merge!(attributes)
fill_versions!(attributes)

# If the primary index is already filled, we don't want populate skeletal index
uuid = ::ManagerRefresh::InventoryCollection::Reference.build_stringified_reference(attributes, named_ref)
return if primary_index.find(uuid)
if (inventory_object = primary_index.find(uuid))
return inventory_object.assign_attributes(attributes)
end

# Return if skeletal index already exists
return if index[uuid]
if (inventory_object = index[uuid])
return inventory_object.assign_attributes(attributes)
end

# We want to populate a new skeletal index
inventory_object = new_inventory_object(attributes)
Expand All @@ -60,6 +79,33 @@ def build(attributes)
private

# Reader for the primary index that this class consults and moves records out of
attr_reader :primary_index

# Add versions columns into the passed attributes. Timestamp based versioning takes precedence: it is
# used when the collection supports resource_timestamps_max and attributes carry :resource_timestamp.
# Otherwise version based versioning is used when the collection supports resource_versions_max and
# attributes carry :resource_version. When neither applies, attributes are left untouched.
#
# @param attributes [Hash] Attributes we want to extend with version related attributes
def fill_versions!(attributes)
  if inventory_collection.supports_resource_timestamps_max? && attributes[:resource_timestamp]
    return fill_specific_version_attr(:resource_timestamps, :resource_timestamp, attributes)
  end

  return unless inventory_collection.supports_resource_versions_max? && attributes[:resource_version]

  fill_specific_version_attr(:resource_versions, :resource_version, attributes)
end

# Add specific versions columns into the passed attributes. Every attribute key that is not one of the
# collection's base_columns gets an entry in the partial row version hash, defaulting to the full row
# version unless an entry for that key already exists.
#
# @param partial_row_version_attr [Symbol] Attr name for partial rows, allowed values are
#        [:resource_timestamps, :resource_versions]
# @param full_row_version_attr [Symbol] Attr name for full rows, allowed values are
#        [:resource_timestamp, :resource_version]
# @param attributes [Hash] Attributes we want to extend with version related attributes
def fill_specific_version_attr(partial_row_version_attr, full_row_version_attr, attributes)
  attributes[partial_row_version_attr] ||= {}
  per_attribute_versions = attributes[partial_row_version_attr]
  # We have to symbolize, since serializing persistor makes these strings
  per_attribute_versions.symbolize_keys!

  full_row_version = attributes[full_row_version_attr]
  versioned_keys = attributes.keys - inventory_collection.base_columns
  versioned_keys.each { |key| per_attribute_versions[key] ||= full_row_version }
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class Serialization
:inventory_object_lazy?,
:inventory_object?,
:name,
:skeletal_primary_index,
:to => :inventory_collection

attr_reader :inventory_collection
Expand All @@ -30,6 +31,10 @@ def from_hash(inventory_objects_data, available_inventory_collections)
build(hash_to_data(inventory_object_data, available_inventory_collections).symbolize_keys!)
end

inventory_objects_data['partial_data'].each do |inventory_object_data|
skeletal_primary_index.build(hash_to_data(inventory_object_data, available_inventory_collections).symbolize_keys!)
end

# TODO(lsmola) add support for all_manager_uuids serialization
# self.all_manager_uuids = inventory_objects_data['all_manager_uuids']
end
Expand All @@ -43,7 +48,8 @@ def to_hash
# TODO(lsmola) we do not support nested references here, should we?
:manager_uuids => targeted_scope.primary_references.values.map(&:full_reference),
:all_manager_uuids => all_manager_uuids,
:data => data.map { |x| data_to_hash(x.data) }
:data => data.map { |x| data_to_hash(x.data) },
:partial_data => skeletal_primary_index.index_data.map { |x| data_to_hash(x.data) },
}
end

Expand Down
Loading

0 comments on commit a8b2145

Please sign in to comment.