From 23c6e47b3aa7a2a8273606892bd2017e7303ae7b Mon Sep 17 00:00:00 2001 From: Ladislav Smola Date: Tue, 1 Aug 2017 09:35:01 +0200 Subject: [PATCH 01/14] Check if IC has serializable_keys? and transform them Check if IC has serializable_keys? and transform them, this saves some computing since we do not have to use :use_ar_object just because the model has: serialize --- .../manager_refresh/inventory_collection.rb | 8 ++++++++ .../saver/concurrent_safe_batch.rb | 17 ++++++++++++----- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/app/models/manager_refresh/inventory_collection.rb b/app/models/manager_refresh/inventory_collection.rb index 5e84ab8ccd3..2938e511cb1 100644 --- a/app/models/manager_refresh/inventory_collection.rb +++ b/app/models/manager_refresh/inventory_collection.rb @@ -911,6 +911,14 @@ def fixed_foreign_keys @fixed_foreign_keys_cache end + def serializable_keys?(all_attribute_keys) + return @serializable_keys_cache unless @serializable_keys_cache.nil? + + @serializable_keys_cache = all_attribute_keys.any? 
do |key| + model_class.type_for_attribute(key.to_s).respond_to?(:coder) + end + end + def base_class_name return "" unless model_class diff --git a/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb index 7e56cf2385e..9730f85e6d5 100644 --- a/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb +++ b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb @@ -32,7 +32,6 @@ def save!(association) next unless assert_distinct_relation(record) index = inventory_collection.object_index_with_keys(unique_index_keys, record) - inventory_object = inventory_objects_index.delete(index) hash = attributes_index.delete(index) @@ -51,7 +50,11 @@ def save!(association) record.assign_attributes(hash.except(:id, :type)) values_for_database(inventory_collection.model_class, all_attribute_keys, - record.attributes) + record.attributes.symbolize_keys) + elsif inventory_collection.serializable_keys?(all_attribute_keys) + values_for_database(inventory_collection.model_class, + all_attribute_keys, + hash.symbolize_keys) else hash.symbolize_keys end @@ -59,7 +62,7 @@ def save!(association) inventory_collection.store_updated_records(record) hash_for_update[:id] = record.id - hashes_for_update << hash_for_update.except(:type) + hashes_for_update << hash_for_update end end @@ -136,7 +139,11 @@ def create_records!(all_attribute_keys, batch, attributes_index) record = inventory_collection.model_class.new(attributes_index.delete(index)) values_for_database(inventory_collection.model_class, all_attribute_keys, - record.attributes) + record.attributes.symbolize_keys) + elsif inventory_collection.serializable_keys?(all_attribute_keys) + values_for_database(inventory_collection.model_class, + all_attribute_keys, + attributes_index.delete(index).symbolize_keys) else attributes_index.delete(index).symbolize_keys end @@ -165,7 +172,7 @@ def 
create_records!(all_attribute_keys, batch, attributes_index) def values_for_database(model_class, all_attribute_keys, attributes) all_attribute_keys.each_with_object({}) do |attribute_name, db_values| type = model_class.type_for_attribute(attribute_name.to_s) - raw_val = attributes[attribute_name.to_s] + raw_val = attributes[attribute_name] db_values[attribute_name] = type.type == :boolean ? type.cast(raw_val) : type.serialize(raw_val) end end From 374f1fd59ecac43db229c7544850d0fe22a60bf5 Mon Sep 17 00:00:00 2001 From: Ladislav Smola Date: Tue, 8 Aug 2017 14:39:46 +0200 Subject: [PATCH 02/14] Add TODOs for noticed possible bugs Add TODOs for noticed possible bugs --- app/models/manager_refresh/inventory_collection.rb | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/app/models/manager_refresh/inventory_collection.rb b/app/models/manager_refresh/inventory_collection.rb index 2938e511cb1..8d635f4356c 100644 --- a/app/models/manager_refresh/inventory_collection.rb +++ b/app/models/manager_refresh/inventory_collection.rb @@ -685,6 +685,8 @@ def stringify_reference(reference) end def manager_ref_to_cols + # TODO(lsmola) this should contain the polymorphic _type, otherwise the IC with polymorphic unique key will get + # conflicts # Convert attributes from unique key to actual db cols manager_ref.map do |ref| association_to_foreign_key_mapping[ref] || ref @@ -994,10 +996,12 @@ def db_collection_for_comparison end def db_collection_for_comparison_for(manager_uuids_set) + # TODO(lsmola) this should have the build_multi_selection_condition, like in the method above full_collection_for_comparison.where(manager_ref.first => manager_uuids_set) end def db_collection_for_comparison_for_complement_of(manager_uuids_set) + # TODO(lsmola) this should have the build_multi_selection_condition, like in the method above full_collection_for_comparison.where.not(manager_ref.first => manager_uuids_set) end From 8c8a9417c14d7c62d559fd913dc7a05c10aab972 Mon Sep 17 00:00:00 2001 From: 
Ladislav Smola Date: Tue, 8 Aug 2017 14:41:22 +0200 Subject: [PATCH 03/14] Use to_sym for individual keys instead of symbolize_keys! Use to_sym for individual keys instead of symbolize_keys! --- app/models/manager_refresh/inventory_object.rb | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/app/models/manager_refresh/inventory_object.rb b/app/models/manager_refresh/inventory_object.rb index 330dd9cae04..d95d70a8ae4 100644 --- a/app/models/manager_refresh/inventory_object.rb +++ b/app/models/manager_refresh/inventory_object.rb @@ -15,6 +15,8 @@ def initialize(inventory_collection, data) end def manager_uuid + # TODO(lsmola) should we have a separate function for uuid containing foreign keys? Probably yes, since it could + # speed up the ID fetching. id_with_keys(manager_ref) end @@ -50,36 +52,36 @@ def attributes(inventory_collection_scope = nil) data[key] = value.compact.map(&:load).compact # We can use built in _ids methods to assign array of ids into has_many relations. So e.g. 
the :key_pairs= # relation setter will become :key_pair_ids= - attributes_for_saving[key.to_s.singularize + "_ids"] = data[key].map(&:id).compact.uniq + attributes_for_saving[(key.to_s.singularize + "_ids").to_sym] = data[key].map(&:id).compact.uniq elsif loadable?(value) || inventory_collection_scope.association_to_foreign_key_mapping[key] # Lets fill also the original data, so other InventoryObject referring to this attribute gets the right # result data[key] = value.load if value.respond_to?(:load) if (foreign_key = inventory_collection_scope.association_to_foreign_key_mapping[key]) # We have an association to fill, lets fill also the :key, cause some other InventoryObject can refer to it - record_id = data[key].try(:id) - attributes_for_saving[foreign_key] = record_id + record_id = data[key].try(:id) + attributes_for_saving[foreign_key.to_sym] = record_id if (foreign_type = inventory_collection_scope.association_to_foreign_type_mapping[key]) # If we have a polymorphic association, we need to also fill a base class name, but we want to nullify it # if record_id is missing base_class = data[key].try(:base_class_name) || data[key].class.try(:base_class).try(:name) - attributes_for_saving[foreign_type] = record_id ? base_class : nil + attributes_for_saving[foreign_type.to_sym] = record_id ? base_class : nil end elsif data[key].kind_of?(::ManagerRefresh::InventoryObject) # We have an association to fill but not an Activerecord association, so e.g. Ancestry, lets just load # it here. 
This way of storing ancestry is ineffective in DB call count, but RAM friendly - attributes_for_saving[key] = data[key].base_class_name.constantize.find_by(:id => data[key].id) + attributes_for_saving[key.to_sym] = data[key].base_class_name.constantize.find_by(:id => data[key].id) else # We have a normal attribute to fill - attributes_for_saving[key] = data[key] + attributes_for_saving[key.to_sym] = data[key] end else - attributes_for_saving[key] = value + attributes_for_saving[key.to_sym] = value end end - attributes_for_saving.symbolize_keys + attributes_for_saving end def assign_attributes(attributes) From e6d0c4896adfb1b39d45718c98524ebca49f052e Mon Sep 17 00:00:00 2001 From: Ladislav Smola Date: Tue, 8 Aug 2017 14:43:06 +0200 Subject: [PATCH 04/14] Performance tweaks for the base saver Performance tweaks for the base saver, getting repeated logic computed in initializer. Getting record_key abstraction for fetching attributes of non AR records. --- .../save_collection/saver/base.rb | 53 ++++++++++++------- 1 file changed, 35 insertions(+), 18 deletions(-) diff --git a/app/models/manager_refresh/save_collection/saver/base.rb b/app/models/manager_refresh/save_collection/saver/base.rb index 7d2baa96099..5157a28fcb7 100644 --- a/app/models/manager_refresh/save_collection/saver/base.rb +++ b/app/models/manager_refresh/save_collection/saver/base.rb @@ -4,15 +4,30 @@ class Base include Vmdb::Logging include ManagerRefresh::SaveCollection::Saver::SqlHelper - attr_reader :inventory_collection + attr_reader :inventory_collection, :association def initialize(inventory_collection) @inventory_collection = inventory_collection # Private attrs + @primary_key = inventory_collection.model_class.primary_key + @arel_primary_key = inventory_collection.model_class.arel_attribute(primary_key) @unique_index_keys = inventory_collection.manager_ref_to_cols + @unique_index_keys_to_s = inventory_collection.manager_ref_to_cols.map(&:to_s) + @select_keys = [@primary_key.to_sym] + 
inventory_collection.manager_ref_to_cols @unique_db_primary_keys = Set.new @unique_db_indexes = Set.new + + # TODO(lsmola) do I need to reload every time? Also it should be enough to clear the associations. + inventory_collection.parent.reload if inventory_collection.parent + @association = inventory_collection.db_collection_for_comparison + + # Right not ApplicationRecordIterator in association is used for targeted refresh. Given the small amounts of + # records flowing through there, we probably don't need to optimize that association to fetch pure SQL. + @pure_sql_records_fetching = !inventory_collection.use_ar_object? && !@association.kind_of?(ManagerRefresh::ApplicationRecordIterator) + + @record_key_method = @pure_sql_records_fetching ? :pure_sql_record_key : :ar_record_key + @select_keys_indexes = @select_keys.each_with_object({}).with_index { |(key, obj), index| obj[key.to_s] = index } end def save_inventory_collection! @@ -22,16 +37,13 @@ def save_inventory_collection! # job. We want to do 1 :delete_complement job at 1 time, to keep to memory down. return delete_complement if inventory_collection.all_manager_uuids.present? - # TODO(lsmola) do I need to reload every time? Also it should be enough to clear the associations. - inventory_collection.parent.reload if inventory_collection.parent - association = inventory_collection.db_collection_for_comparison - save!(association) end private - attr_reader :unique_index_keys, :unique_db_primary_keys, :unique_db_indexes + attr_reader :unique_index_keys, :unique_index_keys_to_s, :select_keys, :unique_db_primary_keys, :unique_db_indexes, + :primary_key, :arel_primary_key, :record_key_method, :pure_sql_records_fetching, :select_keys_indexes def save!(association) attributes_index = {} @@ -62,7 +74,7 @@ def save!(association) delete_record!(record) if inventory_collection.delete_allowed? else # Record was found in the DB and sent for saving, we will be updating the DB. 
- update_record!(record, hash, inventory_object) if assert_referential_integrity(hash, inventory_object) + update_record!(record, hash, inventory_object) if assert_referential_integrity(hash) end end end @@ -77,7 +89,7 @@ def save!(association) inventory_objects_index.each do |index, inventory_object| hash = attributes_index.delete(index) - create_record!(hash, inventory_object) if assert_referential_integrity(hash, inventory_object) + create_record!(hash, inventory_object) if assert_referential_integrity(hash) end end end @@ -95,7 +107,12 @@ def inventory_collection_details end def batch_size - inventory_collection.batch_size + # We can do bigger batch size for the pure SQL, since it will be much smaller than loading AR objects + pure_sql_records_fetching ? 10_000 : inventory_collection.batch_size + end + + def record_key(record, key) + record.public_send(key) end def delete_complement @@ -133,7 +150,7 @@ def assert_unique_record(_record, _index) end def assert_distinct_relation(record) - if unique_db_primary_keys.include?(record.id) # Include on Set is O(1) + if unique_db_primary_keys.include?(record_key(record, primary_key)) # Include on Set is O(1) # Change the InventoryCollection's :association or :arel parameter to return distinct results. The :through # relations can return the same record multiple times. We don't want to do SELECT DISTINCT by default, since # it can be very slow. @@ -145,17 +162,17 @@ def assert_distinct_relation(record) raise("Please update :association or :arel for #{inventory_collection} to return a DISTINCT result. ") end else - unique_db_primary_keys << record.id + unique_db_primary_keys << record_key(record, primary_key) end true end - def assert_referential_integrity(hash, inventory_object) - inventory_object.inventory_collection.fixed_foreign_keys.each do |x| - next unless hash[x].blank? 
- subject = "#{inventory_object} of #{inventory_object.inventory_collection} because of missing foreign key #{x} for "\ "#{inventory_object.inventory_collection.parent.class.name}:"\ "#{inventory_object.inventory_collection.parent.try(:id)}" + subject = "#{hash} of #{inventory_collection} because of missing foreign key #{x} for "\ "#{inventory_collection.parent.class.name}:"\ "#{inventory_collection.parent.try(:id)}" if Rails.env.production? _log.warn("Referential integrity check violated, ignoring #{subject}") return false @@ -186,7 +203,7 @@ def assign_attributes_for_update!(hash, update_time) end def assign_attributes_for_create!(hash, create_time) - hash[:type] = inventory_collection.model_class.name if inventory_collection.supports_sti? && hash[:type].blank? + hash[:type] = inventory_collection.model_class.name if inventory_collection.supports_sti? && hash[:type].nil? hash[:created_on] = create_time if inventory_collection.supports_timestamps_on_variant? hash[:created_at] = create_time if inventory_collection.supports_timestamps_at_variant? assign_attributes_for_update!(hash, create_time) From 2894ecf8fb04049fdc74806d427447d18de4658e Mon Sep 17 00:00:00 2001 From: Ladislav Smola Date: Tue, 8 Aug 2017 14:47:51 +0200 Subject: [PATCH 05/14] Perf tweaks of the batch saver Perf tweaks of the batch saver: 1. Having iterator that fetches raw SQL without creating AR objects. 2. Select on the fetched data to reduce the mem size needed. 3. Bumping batch_size to 10k, since the objects are 10x smaller than AR objects. 
And few small tweaks in the core saving --- .../saver/concurrent_safe_batch.rb | 148 ++++++++++++------ 1 file changed, 104 insertions(+), 44 deletions(-) diff --git a/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb index 9730f85e6d5..1665bf10c01 100644 --- a/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb +++ b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb @@ -3,6 +3,50 @@ module Saver class ConcurrentSafeBatch < ManagerRefresh::SaveCollection::Saver::Base private + def record_key(record, key) + send(record_key_method, record, key) + end + + def ar_record_key(record, key) + record.public_send(key) + end + + def pure_sql_record_key(record, key) + record[select_keys_indexes[key]] + end + + def batch_size_for_persisting + 10_000 + end + + def batch_iterator(association) + if pure_sql_records_fetching + # Building fast iterator doing pure SQL query avoiding redundant creation of AR objects, responds to + # find_in_batches. For targeted refresh, the association can already be ApplicationRecordIterator. 
+ pure_sql_iterator = lambda do |&block| + primary_key_offset = nil + loop do + relation = association.select(*select_keys) + .order("#{primary_key} ASC") + .limit(batch_size) + # Using rails way of comparing primary key instead of offset + relation = relation.where(arel_primary_key.gt(primary_key_offset)) if primary_key_offset + records = get_connection.query(relation.to_sql) + last_record = records.last + block.call(records) + + break if records.size < batch_size + primary_key_offset = record_key(last_record, primary_key) + end + end + + ManagerRefresh::ApplicationRecordIterator.new(:iterator => pure_sql_iterator) + else + # Normal rails association where we will call find_in_batches + association + end + end + def save!(association) attributes_index = {} inventory_objects_index = {} @@ -22,18 +66,41 @@ def save!(association) all_attribute_keys += [:created_at, :updated_at] if inventory_collection.supports_timestamps_at_variant? _log.info("*************** PROCESSING #{inventory_collection} of size #{inventory_collection.size} *************") - hashes_for_update = [] + + # TODO(lsmola) needed only because UPDATE FROM VALUES needs a specific PG typecasting, remove when fixed in PG + collect_pg_types!(all_attribute_keys) + update_or_destroy_records!(batch_iterator(association), inventory_objects_index, attributes_index, all_attribute_keys) + + all_attribute_keys << :type if inventory_collection.supports_sti? + # Records that were not found in the DB but sent for saving, we will be creating these in the DB. + if inventory_collection.create_allowed? 
+ inventory_objects_index.each_slice(batch_size_for_persisting) do |batch| + create_records!(all_attribute_keys, batch, attributes_index) + end + end + _log.info("*************** PROCESSED #{inventory_collection}, "\ + "created=#{inventory_collection.created_records.count}, "\ + "updated=#{inventory_collection.updated_records.count}, "\ + "deleted=#{inventory_collection.deleted_records.count} *************") + rescue => e + _log.error("Error when saving #{inventory_collection} with #{inventory_collection_details}. Message: #{e.message}") + raise e + end + + def update_or_destroy_records!(records_batch_iterator, inventory_objects_index, attributes_index, all_attribute_keys) + hashes_for_update = [] records_for_destroy = [] - # Records that are in the DB, we will be updating or deleting them. - association.find_in_batches do |batch| + records_batch_iterator.find_in_batches(:batch_size => batch_size) do |batch| update_time = time_now + batch.each do |record| next unless assert_distinct_relation(record) - index = inventory_collection.object_index_with_keys(unique_index_keys, record) + # TODO(lsmola) unify this behavior with object_index_with_keys method in InventoryCollection + index = unique_index_keys_to_s.map { |attribute| record_key(record, attribute).to_s }.join(inventory_collection.stringify_joiner) inventory_object = inventory_objects_index.delete(index) - hash = attributes_index.delete(index) + hash = attributes_index[index] if inventory_object.nil? # Record was found in the DB but not sent for saving, that means it doesn't exist anymore and we should @@ -43,8 +110,8 @@ def save!(association) end else # Record was found in the DB and sent for saving, we will be updating the DB. - next unless assert_referential_integrity(hash, inventory_object) - inventory_object.id = record.id + next unless assert_referential_integrity(hash) + inventory_object.id = record_key(record, primary_key) hash_for_update = if inventory_collection.use_ar_object? 
record.assign_attributes(hash.except(:id, :type)) @@ -54,28 +121,28 @@ def save!(association) elsif inventory_collection.serializable_keys?(all_attribute_keys) values_for_database(inventory_collection.model_class, all_attribute_keys, - hash.symbolize_keys) + hash) else - hash.symbolize_keys + hash end assign_attributes_for_update!(hash_for_update, update_time) - inventory_collection.store_updated_records(record) + inventory_collection.store_updated_records([{:id => record_key(record, primary_key)}]) - hash_for_update[:id] = record.id + hash_for_update[:id] = inventory_object.id hashes_for_update << hash_for_update end end # Update in batches - if hashes_for_update.size >= batch_size + if hashes_for_update.size >= batch_size_for_persisting update_records!(all_attribute_keys, hashes_for_update) hashes_for_update = [] end # Destroy in batches - if records_for_destroy.size >= batch_size - destroy_records(records_for_destroy) + if records_for_destroy.size >= batch_size_for_persisting + destroy_records!(records_for_destroy) records_for_destroy = [] end end @@ -85,26 +152,11 @@ def save!(association) hashes_for_update = [] # Cleanup so GC can release it sooner # Destroy the last batch - destroy_records(records_for_destroy) + destroy_records!(records_for_destroy) records_for_destroy = [] # Cleanup so GC can release it sooner - - all_attribute_keys << :type if inventory_collection.supports_sti? - # Records that were not found in the DB but sent for saving, we will be creating these in the DB. - if inventory_collection.create_allowed? 
- inventory_objects_index.each_slice(batch_size) do |batch| - create_records!(all_attribute_keys, batch, attributes_index) - end - end - _log.info("*************** PROCESSED #{inventory_collection}, "\ - "created=#{inventory_collection.created_records.count}, "\ - "updated=#{inventory_collection.updated_records.count}, "\ - "deleted=#{inventory_collection.deleted_records.count} *************") - rescue => e - _log.error("Error when saving #{inventory_collection} with #{inventory_collection_details}. Message: #{e.message}") - raise e end - def destroy_records(records) + def destroy_records!(records) return false unless inventory_collection.delete_allowed? return if records.blank? @@ -112,13 +164,21 @@ def destroy_records(records) rails_delete = %i(destroy delete).include?(inventory_collection.delete_method) if !rails_delete && inventory_collection.model_class.respond_to?(inventory_collection.delete_method) # We have custom delete method defined on a class, that means it supports batch destroy - inventory_collection.model_class.public_send(inventory_collection.delete_method, records.map(&:id)) + # TODO(lsmola) store deleted records to IC + inventory_collection.model_class.public_send(inventory_collection.delete_method, records.map { |x| record_key(x, primary_key) }) else # We have either standard :destroy and :delete rails method, or custom instance level delete method # Note: The standard :destroy and :delete rails method can't be batched because of the hooks and cascade destroy ActiveRecord::Base.transaction do - records.each do |record| - delete_record!(record) + if pure_sql_records_fetching + # For pure SQL fetching, we need to get the AR objects again, so we can call destroy + inventory_collection.model_class.where(:id => records.map { |x| record_key(x, primary_key) }).find_each do |record| + delete_record!(record) + end + else + records.each do |record| + delete_record!(record) + end end end end @@ -126,14 +186,14 @@ def destroy_records(records) def 
update_records!(all_attribute_keys, hashes) return if hashes.blank? - - ActiveRecord::Base.connection.execute(build_update_query(all_attribute_keys, hashes)) + query = build_update_query(all_attribute_keys, hashes) + get_connection.execute(query) end def create_records!(all_attribute_keys, batch, attributes_index) indexed_inventory_objects = {} - hashes = [] - create_time = time_now + hashes = [] + create_time = time_now batch.each do |index, inventory_object| hash = if inventory_collection.use_ar_object? record = inventory_collection.model_class.new(attributes_index.delete(index)) @@ -150,7 +210,7 @@ def create_records!(all_attribute_keys, batch, attributes_index) assign_attributes_for_create!(hash, create_time) - next unless assert_referential_integrity(hash, inventory_object) + next unless assert_referential_integrity(hash) hashes << hash # Index on Unique Columns values, so we can easily fill in the :id later @@ -159,7 +219,7 @@ def create_records!(all_attribute_keys, batch, attributes_index) return if hashes.blank? - result = ActiveRecord::Base.connection.execute( + result = get_connection.execute( build_insert_query(all_attribute_keys, hashes) ) inventory_collection.store_created_records(result) @@ -171,8 +231,8 @@ def create_records!(all_attribute_keys, batch, attributes_index) def values_for_database(model_class, all_attribute_keys, attributes) all_attribute_keys.each_with_object({}) do |attribute_name, db_values| - type = model_class.type_for_attribute(attribute_name.to_s) - raw_val = attributes[attribute_name] + type = model_class.type_for_attribute(attribute_name.to_s) + raw_val = attributes[attribute_name] db_values[attribute_name] = type.type == :boolean ? type.cast(raw_val) : type.serialize(raw_val) end end @@ -183,11 +243,11 @@ def map_ids_to_inventory_objects(indexed_inventory_objects, all_attribute_keys, # we test if the number of results matches the expected batch size. 
Then if the counts do not match, the only # safe option is to query all the data from the DB, using the unique_indexes. The batch size will also not match # for every remainders(a last batch in a stream of batches) - if !supports_remote_data_timestamp?(all_attribute_keys) || result.count == batch_size + if !supports_remote_data_timestamp?(all_attribute_keys) || result.count == batch_size_for_persisting result.each do |inserted_record| key = unique_index_columns.map { |x| inserted_record[x.to_s] } inventory_object = indexed_inventory_objects[key] - inventory_object.id = inserted_record["id"] if inventory_object + inventory_object.id = inserted_record[primary_key] if inventory_object end else inventory_collection.model_class.where( From b32d820d4072918d98f80384b0f43306476d29cb Mon Sep 17 00:00:00 2001 From: Ladislav Smola Date: Tue, 8 Aug 2017 14:52:25 +0200 Subject: [PATCH 06/14] Perf tweaks in SQL mixin Perf tweaks in SQL mixin: 1. Store connection for the whole batch 2. Precalculate pg_types for faster type_casting --- .../save_collection/saver/sql_helper.rb | 52 +++++++++++++------ 1 file changed, 37 insertions(+), 15 deletions(-) diff --git a/app/models/manager_refresh/save_collection/saver/sql_helper.rb b/app/models/manager_refresh/save_collection/saver/sql_helper.rb index f533fd534f4..63d6034c89b 100644 --- a/app/models/manager_refresh/save_collection/saver/sql_helper.rb +++ b/app/models/manager_refresh/save_collection/saver/sql_helper.rb @@ -16,10 +16,13 @@ def build_insert_set_cols(key) end def build_insert_query(all_attribute_keys, hashes) + # Cache the connection for the batch + connection = get_connection + all_attribute_keys_array = all_attribute_keys.to_a table_name = inventory_collection.model_class.table_name values = hashes.map do |hash| - "(#{all_attribute_keys_array.map { |x| quote(hash[x], x) }.join(",")})" + "(#{all_attribute_keys_array.map { |x| quote(connection, hash[x], x) }.join(",")})" end.join(",") col_names = all_attribute_keys_array.map { 
|x| quote_column_name(x) }.join(",") @@ -64,17 +67,25 @@ def build_update_set_cols(key) end def quote_column_name(key) - ActiveRecord::Base.connection.quote_column_name(key) + get_connection.quote_column_name(key) + end + + def get_connection + # Cache the connection for the batch + ActiveRecord::Base.connection end def build_update_query(all_attribute_keys, hashes) + # Cache the connection for the batch + connection = get_connection + # We want to ignore type and create timestamps when updating all_attribute_keys_array = all_attribute_keys.to_a.delete_if { |x| %i(type created_at created_on).include?(x) } all_attribute_keys_array << :id table_name = inventory_collection.model_class.table_name - values = hashes.map do |hash| - "(#{all_attribute_keys_array.map { |x| quote(hash[x], x, inventory_collection) }.join(",")})" + values = hashes.map! do |hash| + "(#{all_attribute_keys_array.map { |x| quote(connection, hash[x], x, true) }.join(",")})" end.join(",") update_query = %{ @@ -106,35 +117,46 @@ def build_multi_selection_query(hashes) inventory_collection.build_multi_selection_condition(hashes, unique_index_columns) end - def quote(value, name = nil, used_inventory_collection = nil) + def quote(connection, value, name = nil, type_cast_for_pg = nil) # TODO(lsmola) needed only because UPDATE FROM VALUES needs a specific PG typecasting, remove when fixed in PG - if used_inventory_collection.nil? 
- ActiveRecord::Base.connection.quote(value) + if type_cast_for_pg + quote_and_pg_type_cast(connection, value, name) else - quote_and_pg_type_cast(value, name, used_inventory_collection) + connection.quote(value) end rescue TypeError => e _log.error("Can't quote value: #{value}, of :#{name} and #{inventory_collection}") raise e end - def quote_and_pg_type_cast(value, name, used_inventory_collection) + def quote_and_pg_type_cast(connection, value, name) pg_type_cast( - ActiveRecord::Base.connection.quote(value), - used_inventory_collection.model_class.columns_hash[name.to_s] - .try(:sql_type_metadata) - .try(:instance_values) - .try(:[], "sql_type") + connection.quote(value), + pg_type(name) ) end def pg_type_cast(value, sql_type) - if sql_type.blank? + if sql_type.nil? value else "#{value}::#{sql_type}" end end + + def pg_type(name) + @pg_types_cache[name] + end + + def collect_pg_types!(all_attribute_keys) + @pg_types_cache = {} + all_attribute_keys.each do |key| + @pg_types_cache[key] = inventory_collection.model_class.columns_hash[key.to_s] + .try(:sql_type_metadata) + .try(:instance_values) + .try(:[], "sql_type") + end + end end end end From 84556e44fdef61e0a57b8650dc611cbc28f5f6b1 Mon Sep 17 00:00:00 2001 From: Ladislav Smola Date: Tue, 8 Aug 2017 15:41:13 +0200 Subject: [PATCH 07/14] Fix bad comments Fix bad comments --- app/models/manager_refresh/save_collection/saver/base.rb | 4 ++-- .../save_collection/saver/concurrent_safe_batch.rb | 7 ++++--- .../manager_refresh/save_collection/saver/sql_helper.rb | 1 - 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/app/models/manager_refresh/save_collection/saver/base.rb b/app/models/manager_refresh/save_collection/saver/base.rb index 5157a28fcb7..b10d7c21b4b 100644 --- a/app/models/manager_refresh/save_collection/saver/base.rb +++ b/app/models/manager_refresh/save_collection/saver/base.rb @@ -22,8 +22,8 @@ def initialize(inventory_collection) inventory_collection.parent.reload if 
inventory_collection.parent @association = inventory_collection.db_collection_for_comparison - # Right not ApplicationRecordIterator in association is used for targeted refresh. Given the small amounts of - # records flowing through there, we probably don't need to optimize that association to fetch pure SQL. + # Right now ApplicationRecordIterator in association is used for targeted refresh. Given the small amount of + # records flowing through there, we probably don't need to optimize that association to fetch a pure SQL. @pure_sql_records_fetching = !inventory_collection.use_ar_object? && !@association.kind_of?(ManagerRefresh::ApplicationRecordIterator) @record_key_method = @pure_sql_records_fetching ? :pure_sql_record_key : :ar_record_key diff --git a/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb index 1665bf10c01..a6ff7124a01 100644 --- a/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb +++ b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb @@ -21,8 +21,9 @@ def batch_size_for_persisting def batch_iterator(association) if pure_sql_records_fetching - # Building fast iterator doing pure SQL query avoiding redundant creation of AR objects, responds to - # find_in_batches. For targeted refresh, the association can already be ApplicationRecordIterator. + # Building fast iterator doing pure SQL query and therefore avoiding redundant creation of AR objects. The + # iterator responds to find_in_batches, so it acts like the AR relation. For targeted refresh, the association + # can already be ApplicationRecordIterator, so we will skip that. 
pure_sql_iterator = lambda do |&block| primary_key_offset = nil loop do @@ -42,7 +43,7 @@ def batch_iterator(association) ManagerRefresh::ApplicationRecordIterator.new(:iterator => pure_sql_iterator) else - # Normal rails association where we will call find_in_batches + # Normal Rails relation where we can call find_in_batches association end end diff --git a/app/models/manager_refresh/save_collection/saver/sql_helper.rb b/app/models/manager_refresh/save_collection/saver/sql_helper.rb index 63d6034c89b..4389cd181fd 100644 --- a/app/models/manager_refresh/save_collection/saver/sql_helper.rb +++ b/app/models/manager_refresh/save_collection/saver/sql_helper.rb @@ -71,7 +71,6 @@ def quote_column_name(key) end def get_connection - # Cache the connection for the batch ActiveRecord::Base.connection end From 7e34e96ff700548fbfc5ff685d8df82233b961a3 Mon Sep 17 00:00:00 2001 From: Ladislav Smola Date: Tue, 8 Aug 2017 15:43:36 +0200 Subject: [PATCH 08/14] Fix rubocop issues Fix rubocop issues --- .../manager_refresh/save_collection/saver/sql_helper.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/app/models/manager_refresh/save_collection/saver/sql_helper.rb b/app/models/manager_refresh/save_collection/saver/sql_helper.rb index 4389cd181fd..c816856621e 100644 --- a/app/models/manager_refresh/save_collection/saver/sql_helper.rb +++ b/app/models/manager_refresh/save_collection/saver/sql_helper.rb @@ -151,9 +151,9 @@ def collect_pg_types!(all_attribute_keys) @pg_types_cache = {} all_attribute_keys.each do |key| @pg_types_cache[key] = inventory_collection.model_class.columns_hash[key.to_s] - .try(:sql_type_metadata) - .try(:instance_values) - .try(:[], "sql_type") + .try(:sql_type_metadata) + .try(:instance_values) + .try(:[], "sql_type") end end end From 59b789548944dfee0b2122402b02731e8a407707 Mon Sep 17 00:00:00 2001 From: Ladislav Smola Date: Wed, 9 Aug 2017 13:15:22 +0200 Subject: [PATCH 09/14] Correct symbol vs string keys Correct symbol vs string 
keys --- app/models/manager_refresh/save_collection/saver/base.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/models/manager_refresh/save_collection/saver/base.rb b/app/models/manager_refresh/save_collection/saver/base.rb index b10d7c21b4b..ff4556bccb8 100644 --- a/app/models/manager_refresh/save_collection/saver/base.rb +++ b/app/models/manager_refresh/save_collection/saver/base.rb @@ -12,9 +12,9 @@ def initialize(inventory_collection) # Private attrs @primary_key = inventory_collection.model_class.primary_key @arel_primary_key = inventory_collection.model_class.arel_attribute(primary_key) - @unique_index_keys = inventory_collection.manager_ref_to_cols + @unique_index_keys = inventory_collection.manager_ref_to_cols.map(&:to_sym) @unique_index_keys_to_s = inventory_collection.manager_ref_to_cols.map(&:to_s) - @select_keys = [@primary_key.to_sym] + inventory_collection.manager_ref_to_cols + @select_keys = [@primary_key] + @unique_index_keys_to_s @unique_db_primary_keys = Set.new @unique_db_indexes = Set.new From c80763d20798d418d586fbfbf1f51a5ebb6f4a32 Mon Sep 17 00:00:00 2001 From: Ladislav Smola Date: Wed, 9 Aug 2017 13:21:06 +0200 Subject: [PATCH 10/14] Move batch size to attributes with setter Move batch size to attributes with setter, so we avoid computing it multiple times. --- .../manager_refresh/save_collection/saver/base.rb | 13 ++++++------- .../save_collection/saver/concurrent_safe_batch.rb | 4 ---- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/app/models/manager_refresh/save_collection/saver/base.rb b/app/models/manager_refresh/save_collection/saver/base.rb index ff4556bccb8..c58c6ed1fcd 100644 --- a/app/models/manager_refresh/save_collection/saver/base.rb +++ b/app/models/manager_refresh/save_collection/saver/base.rb @@ -26,7 +26,10 @@ def initialize(inventory_collection) # records flowing through there, we probably don't need to optimize that association to fetch a pure SQL. 
@pure_sql_records_fetching = !inventory_collection.use_ar_object? && !@association.kind_of?(ManagerRefresh::ApplicationRecordIterator) - @record_key_method = @pure_sql_records_fetching ? :pure_sql_record_key : :ar_record_key + @batch_size_for_persisting = 10_000 + + @batch_size = @pure_sql_records_fetching ? @batch_size_for_persisting : inventory_collection.batch_size + @record_key_method = @pure_sql_records_fetching ? :pure_sql_record_key : :ar_record_key @select_keys_indexes = @select_keys.each_with_object({}).with_index { |(key, obj), index| obj[key.to_s] = index } end @@ -43,7 +46,8 @@ def save_inventory_collection! private attr_reader :unique_index_keys, :unique_index_keys_to_s, :select_keys, :unique_db_primary_keys, :unique_db_indexes, - :primary_key, :arel_primary_key, :record_key_method, :pure_sql_records_fetching, :select_keys_indexes + :primary_key, :arel_primary_key, :record_key_method, :pure_sql_records_fetching, :select_keys_indexes, + :batch_size, :batch_size_for_persisting def save!(association) attributes_index = {} @@ -106,11 +110,6 @@ def inventory_collection_details "strategy: #{inventory_collection.strategy}, saver_strategy: #{inventory_collection.saver_strategy}, targeted: #{inventory_collection.targeted?}" end - def batch_size - # We can do bigger batch size for the pure SQL, since it will be much smaller than loading AR objects - pure_sql_records_fetching ? 
10_000 : inventory_collection.batch_size - end - def record_key(record, key) record.public_send(key) end diff --git a/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb index a6ff7124a01..81842c5c129 100644 --- a/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb +++ b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb @@ -15,10 +15,6 @@ def pure_sql_record_key(record, key) record[select_keys_indexes[key]] end - def batch_size_for_persisting - 10_000 - end - def batch_iterator(association) if pure_sql_records_fetching # Building fast iterator doing pure SQL query and therefore avoiding redundant creation of AR objects. The From 7f09ddd877d27decc67fb4830ee3062c9760715e Mon Sep 17 00:00:00 2001 From: Ladislav Smola Date: Wed, 9 Aug 2017 13:50:55 +0200 Subject: [PATCH 11/14] Remove redundant symbolize_keys Remove redundant symbolize_keys --- .../save_collection/saver/concurrent_safe_batch.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb index 81842c5c129..f685610c005 100644 --- a/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb +++ b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb @@ -200,9 +200,9 @@ def create_records!(all_attribute_keys, batch, attributes_index) elsif inventory_collection.serializable_keys?(all_attribute_keys) values_for_database(inventory_collection.model_class, all_attribute_keys, - attributes_index.delete(index).symbolize_keys) + attributes_index.delete(index)) else - attributes_index.delete(index).symbolize_keys + attributes_index.delete(index) end assign_attributes_for_create!(hash, create_time) From 08ee63f715d190c273b5ed6359a89ddf1328745c Mon Sep 17 00:00:00 2001 From: Ladislav 
Smola Date: Wed, 9 Aug 2017 13:58:44 +0200 Subject: [PATCH 12/14] Use faster inventory_object.id call Use faster inventory_object.id call --- .../save_collection/saver/concurrent_safe_batch.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb index f685610c005..00c5e160640 100644 --- a/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb +++ b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb @@ -123,7 +123,7 @@ def update_or_destroy_records!(records_batch_iterator, inventory_objects_index, hash end assign_attributes_for_update!(hash_for_update, update_time) - inventory_collection.store_updated_records([{:id => record_key(record, primary_key)}]) + inventory_collection.store_updated_records([{:id => inventory_object.id}]) hash_for_update[:id] = inventory_object.id hashes_for_update << hash_for_update From 577e10f681c52e77878afe249045fdc8609899b4 Mon Sep 17 00:00:00 2001 From: Ladislav Smola Date: Wed, 9 Aug 2017 14:16:12 +0200 Subject: [PATCH 13/14] Store primary_key_value to avoid multiple method calls Store primary_key_value to avoid multiple method calls --- .../manager_refresh/save_collection/saver/base.rb | 8 ++++---- .../save_collection/saver/concurrent_safe_batch.rb | 10 ++++++---- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/app/models/manager_refresh/save_collection/saver/base.rb b/app/models/manager_refresh/save_collection/saver/base.rb index c58c6ed1fcd..d39713cedf3 100644 --- a/app/models/manager_refresh/save_collection/saver/base.rb +++ b/app/models/manager_refresh/save_collection/saver/base.rb @@ -66,7 +66,7 @@ def save!(association) association.find_each do |record| index = inventory_collection.object_index_with_keys(unique_index_keys, record) - next unless assert_distinct_relation(record) + next unless 
assert_distinct_relation(record.id) next unless assert_unique_record(record, index) inventory_object = inventory_objects_index.delete(index) @@ -148,8 +148,8 @@ def assert_unique_record(_record, _index) true end - def assert_distinct_relation(record) - if unique_db_primary_keys.include?(record_key(record, primary_key)) # Include on Set is O(1) + def assert_distinct_relation(primary_key_value) + if unique_db_primary_keys.include?(primary_key_value) # Include on Set is O(1) # Change the InventoryCollection's :association or :arel parameter to return distinct results. The :through # relations can return the same record multiple times. We don't want to do SELECT DISTINCT by default, since # it can be very slow. @@ -161,7 +161,7 @@ def assert_distinct_relation(record) raise("Please update :association or :arel for #{inventory_collection} to return a DISTINCT result. ") end else - unique_db_primary_keys << record_key(record, primary_key) + unique_db_primary_keys << primary_key_value end true end diff --git a/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb index 00c5e160640..37fbc44c4a3 100644 --- a/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb +++ b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb @@ -92,7 +92,9 @@ def update_or_destroy_records!(records_batch_iterator, inventory_objects_index, update_time = time_now batch.each do |record| - next unless assert_distinct_relation(record) + primary_key_value = record_key(record, primary_key) + + next unless assert_distinct_relation(primary_key_value) # TODO(lsmola) unify this behavior with object_index_with_keys method in InventoryCollection index = unique_index_keys_to_s.map { |attribute| record_key(record, attribute).to_s }.join(inventory_collection.stringify_joiner) @@ -108,7 +110,7 @@ def update_or_destroy_records!(records_batch_iterator, inventory_objects_index, else 
# Record was found in the DB and sent for saving, we will be updating the DB. next unless assert_referential_integrity(hash) - inventory_object.id = record_key(record, primary_key) + inventory_object.id = primary_key_value hash_for_update = if inventory_collection.use_ar_object? record.assign_attributes(hash.except(:id, :type)) @@ -123,9 +125,9 @@ def update_or_destroy_records!(records_batch_iterator, inventory_objects_index, hash end assign_attributes_for_update!(hash_for_update, update_time) - inventory_collection.store_updated_records([{:id => inventory_object.id}]) + inventory_collection.store_updated_records([{:id => primary_key_value}]) - hash_for_update[:id] = inventory_object.id + hash_for_update[:id] = primary_key_value hashes_for_update << hash_for_update end end From 60e14d189f4e1f9ec552139265afed623112ed63 Mon Sep 17 00:00:00 2001 From: Ladislav Smola Date: Thu, 10 Aug 2017 12:30:39 +0200 Subject: [PATCH 14/14] Use reorder for find_in_batches working properly Use reorder for find_in_batches working properly --- .../save_collection/saver/concurrent_safe_batch.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb index 37fbc44c4a3..aec05a35962 100644 --- a/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb +++ b/app/models/manager_refresh/save_collection/saver/concurrent_safe_batch.rb @@ -24,7 +24,7 @@ def batch_iterator(association) primary_key_offset = nil loop do relation = association.select(*select_keys) - .order("#{primary_key} ASC") + .reorder("#{primary_key} ASC") .limit(batch_size) # Using rails way of comparing primary key instead of offset relation = relation.where(arel_primary_key.gt(primary_key_offset)) if primary_key_offset