Skip to content

Commit

Permalink
Merge pull request #15897 from Ladas/optimize_speed_stabilize_the_bat…
Browse files Browse the repository at this point in the history
…ch_graph_refresh_memory_usage

Optimize speed and stabilize the batch graph refresh memory usage
  • Loading branch information
agrare authored Sep 26, 2017
2 parents 91d3ef4 + 7e5091f commit a373229
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 50 deletions.
8 changes: 0 additions & 8 deletions app/models/manager_refresh/inventory_collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -904,14 +904,6 @@ def fixed_foreign_keys
@fixed_foreign_keys_cache
end

def serializable_keys?(all_attribute_keys)
return @serializable_keys_cache unless @serializable_keys_cache.nil?

@serializable_keys_cache = all_attribute_keys.any? do |key|
model_class.type_for_attribute(key.to_s).respond_to?(:coder)
end
end

def base_class_name
return "" unless model_class

Expand Down
42 changes: 42 additions & 0 deletions app/models/manager_refresh/inventory_object.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,48 @@ def attributes(inventory_collection_scope = nil)
attributes_for_saving
end

def attributes_with_keys(inventory_collection_scope = nil, all_attribute_keys = [])
# We should explicitly pass a scope, since the inventory_object can be mapped to more InventoryCollections with
# different blacklist and whitelist. The generic code always passes a scope.
inventory_collection_scope ||= inventory_collection

attributes_for_saving = {}
# First transform the values
data.each do |key, value|
if !allowed?(inventory_collection_scope, key)
next
elsif loadable?(value) || inventory_collection_scope.association_to_foreign_key_mapping[key]
# Lets fill also the original data, so other InventoryObject referring to this attribute gets the right
# result
data[key] = value.load if value.respond_to?(:load)
if (foreign_key = inventory_collection_scope.association_to_foreign_key_mapping[key])
# We have an association to fill, lets fill also the :key, cause some other InventoryObject can refer to it
record_id = data[key].try(:id)
foreign_key_to_sym = foreign_key.to_sym
attributes_for_saving[foreign_key_to_sym] = record_id
all_attribute_keys << foreign_key_to_sym
if (foreign_type = inventory_collection_scope.association_to_foreign_type_mapping[key])
# If we have a polymorphic association, we need to also fill a base class name, but we want to nullify it
# if record_id is missing
base_class = data[key].try(:base_class_name) || data[key].class.try(:base_class).try(:name)
foreign_type_to_sym = foreign_type.to_sym
attributes_for_saving[foreign_type_to_sym] = record_id ? base_class : nil
all_attribute_keys << foreign_type_to_sym
end
else
# We have a normal attribute to fill
attributes_for_saving[key] = data[key]
all_attribute_keys << key
end
else
attributes_for_saving[key] = value
all_attribute_keys << key
end
end

attributes_for_saving
end

def assign_attributes(attributes)
attributes.each { |k, v| public_send("#{k}=", v) }
self
Expand Down
30 changes: 29 additions & 1 deletion app/models/manager_refresh/save_collection/saver/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,30 @@ def initialize(inventory_collection)
@batch_size = @pure_sql_records_fetching ? @batch_size_for_persisting : inventory_collection.batch_size
@record_key_method = @pure_sql_records_fetching ? :pure_sql_record_key : :ar_record_key
@select_keys_indexes = @select_keys.each_with_object({}).with_index { |(key, obj), index| obj[key.to_s] = index }
@pg_types = @model_class.attribute_names.each_with_object({}) do |key, obj|
obj[key.to_sym] = inventory_collection.model_class.columns_hash[key]
.try(:sql_type_metadata)
.try(:instance_values)
.try(:[], "sql_type")
end

@serializable_keys = @model_class.attribute_names.each_with_object({}) do |key, obj|
attribute_type = @model_class.type_for_attribute(key.to_s)
pg_type = @pg_types[key.to_sym]

if inventory_collection.use_ar_object?
# When using AR object, lets make sure we type.serialize(value) every value, so we have a slow but always
# working way driven by a configuration
obj[key.to_sym] = attribute_type
elsif attribute_type.respond_to?(:coder) ||
attribute_type.type == :int4range ||
pg_type == "text[]" ||
pg_type == "character varying[]"
# Identify columns that needs to be encoded by type.serialize(value), it's a costy operations so lets do
# do it only for columns we need it for.
obj[key.to_sym] = attribute_type
end
end
end

def save_inventory_collection!
Expand All @@ -47,7 +71,7 @@ def save_inventory_collection!

attr_reader :unique_index_keys, :unique_index_keys_to_s, :select_keys, :unique_db_primary_keys, :unique_db_indexes,
:primary_key, :arel_primary_key, :record_key_method, :pure_sql_records_fetching, :select_keys_indexes,
:batch_size, :batch_size_for_persisting, :model_class
:batch_size, :batch_size_for_persisting, :model_class, :serializable_keys, :pg_types

def save!(association)
attributes_index = {}
Expand Down Expand Up @@ -258,6 +282,10 @@ def supports_updated_at?
@supports_updated_at_cache
end

def serializable_keys?
@serializable_keys_bool_cache ||= !serializable_keys.blank?
end

def supports_remote_data_timestamp?(all_attribute_keys)
all_attribute_keys.include?(:remote_data_timestamp) # include? on Set is O(1)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ def save!(association)
all_attribute_keys = Set.new + inventory_collection.batch_extra_attributes

inventory_collection.each do |inventory_object|
attributes = inventory_object.attributes(inventory_collection)
index = inventory_object.manager_uuid
attributes = inventory_object.attributes_with_keys(inventory_collection, all_attribute_keys)
# TODO(lsmola) unify this behavior with object_index_with_keys method in InventoryCollection
index = unique_index_keys.map { |key| attributes[key].to_s }.join(inventory_collection.stringify_joiner)

# Interesting fact: not building attributes_index and using only inventory_objects_index doesn't do much
# of a difference, since the most objects inside are shared.
attributes_index[index] = attributes
inventory_objects_index[index] = inventory_object
all_attribute_keys.merge(attributes_index[index].keys)
end

all_attribute_keys << :created_at if supports_created_at?
Expand All @@ -65,8 +67,6 @@ def save!(association)

_log.info("*************** PROCESSING #{inventory_collection} of size #{inventory_collection.size} *************")

# TODO(lsmola) needed only because UPDATE FROM VALUES needs a specific PG typecasting, remove when fixed in PG
collect_pg_types!(all_attribute_keys)
update_or_destroy_records!(batch_iterator(association), inventory_objects_index, attributes_index, all_attribute_keys)

unless inventory_collection.custom_reconnect_block.nil?
Expand Down Expand Up @@ -102,9 +102,9 @@ def update_or_destroy_records!(records_batch_iterator, inventory_objects_index,
next unless assert_distinct_relation(primary_key_value)

# TODO(lsmola) unify this behavior with object_index_with_keys method in InventoryCollection
index = unique_index_keys_to_s.map { |attribute| record_key(record, attribute).to_s }.join(inventory_collection.stringify_joiner)
index = unique_index_keys_to_s.map { |key| record_key(record, key).to_s }.join(inventory_collection.stringify_joiner)
inventory_object = inventory_objects_index.delete(index)
hash = attributes_index[index]
hash = attributes_index.delete(index)

if inventory_object.nil?
# Record was found in the DB but not sent for saving, that means it doesn't exist anymore and we should
Expand All @@ -119,13 +119,11 @@ def update_or_destroy_records!(records_batch_iterator, inventory_objects_index,

hash_for_update = if inventory_collection.use_ar_object?
record.assign_attributes(hash.except(:id, :type))
values_for_database(inventory_collection.model_class,
all_attribute_keys,
record.attributes.symbolize_keys)
elsif inventory_collection.serializable_keys?(all_attribute_keys)
values_for_database(inventory_collection.model_class,
all_attribute_keys,
hash)
values_for_database!(all_attribute_keys,
record.attributes.symbolize_keys)
elsif serializable_keys?
values_for_database!(all_attribute_keys,
hash)
else
hash
end
Expand Down Expand Up @@ -201,13 +199,11 @@ def create_records!(all_attribute_keys, batch, attributes_index)
batch.each do |index, inventory_object|
hash = if inventory_collection.use_ar_object?
record = inventory_collection.model_class.new(attributes_index.delete(index))
values_for_database(inventory_collection.model_class,
all_attribute_keys,
record.attributes.symbolize_keys)
elsif inventory_collection.serializable_keys?(all_attribute_keys)
values_for_database(inventory_collection.model_class,
all_attribute_keys,
attributes_index.delete(index))
values_for_database!(all_attribute_keys,
record.attributes.symbolize_keys)
elsif serializable_keys?
values_for_database!(all_attribute_keys,
attributes_index.delete(index))
else
attributes_index.delete(index)
end
Expand All @@ -233,12 +229,13 @@ def create_records!(all_attribute_keys, batch, attributes_index)
end
end

def values_for_database(model_class, all_attribute_keys, attributes)
all_attribute_keys.each_with_object({}) do |attribute_name, db_values|
type = model_class.type_for_attribute(attribute_name.to_s)
raw_val = attributes[attribute_name]
db_values[attribute_name] = type.type == :boolean ? type.cast(raw_val) : type.serialize(raw_val)
def values_for_database!(all_attribute_keys, attributes)
all_attribute_keys.each do |key|
if (type = serializable_keys[key])
attributes[key] = type.serialize(attributes[key])
end
end
attributes
end

def map_ids_to_inventory_objects(indexed_inventory_objects, all_attribute_keys, hashes, result)
Expand Down
16 changes: 1 addition & 15 deletions app/models/manager_refresh/save_collection/saver/sql_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def quote(connection, value, name = nil, type_cast_for_pg = nil)
def quote_and_pg_type_cast(connection, value, name)
pg_type_cast(
connection.quote(value),
pg_type(name)
pg_types[name]
)
end

Expand All @@ -138,20 +138,6 @@ def pg_type_cast(value, sql_type)
"#{value}::#{sql_type}"
end
end

def pg_type(name)
@pg_types_cache[name]
end

def collect_pg_types!(all_attribute_keys)
@pg_types_cache = {}
all_attribute_keys.each do |key|
@pg_types_cache[key] = inventory_collection.model_class.columns_hash[key.to_s]
.try(:sql_type_metadata)
.try(:instance_values)
.try(:[], "sql_type")
end
end
end
end
end

0 comments on commit a373229

Please sign in to comment.