Skip to content

Commit

Permalink
Add efficient delete_complement
Browse files Browse the repository at this point in the history
Add efficient delete_complement, which easily works for hundreds
of thousands of records involved
  • Loading branch information
Ladas committed Aug 27, 2018
1 parent a8b2145 commit cd76c80
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 30 deletions.
12 changes: 0 additions & 12 deletions app/models/manager_refresh/inventory_collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1089,18 +1089,6 @@ def db_collection_for_comparison_for(references)
full_collection_for_comparison.where(targeted_selection_for(references))
end

# Builds an ActiveRecord::Relation fetching the complement of the passed references from the DB,
# i.e. every record whose first manager_ref attribute is NOT in the given set
#
# @param manager_uuids_set [Array<String>] passed references
# @return [ActiveRecord::Relation] relation that can fetch complement of all the references from the DB
def db_collection_for_comparison_for_complement_of(manager_uuids_set)
  # TODO(lsmola) this should have the build_multi_selection_condition, like in the method above
  # TODO(lsmola) this query will be highly ineffective, we will try approach with updating a timestamp of all
  # records, then we can get list of all records that were not update. That would be equivalent to result of this
  # more effective query and without need of all manager_uuids
  scope        = full_collection_for_comparison
  excluded_key = manager_ref.first
  scope.where.not(excluded_key => manager_uuids_set)
end

# @return [ActiveRecord::Relation] relation that can fetch all the references from the DB
def full_collection_for_comparison
return arel unless arel.nil?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module ManagerRefresh
class InventoryCollection
class Serialization
delegate :all_manager_uuids,
:all_manager_uuids=,
:build,
:targeted_scope,
:data,
Expand Down Expand Up @@ -35,8 +36,7 @@ def from_hash(inventory_objects_data, available_inventory_collections)
skeletal_primary_index.build(hash_to_data(inventory_object_data, available_inventory_collections).symbolize_keys!)
end

# TODO(lsmola) add support for all_manager_uuids serialization
# self.all_manager_uuids = inventory_objects_data['all_manager_uuids']
self.all_manager_uuids = inventory_objects_data['all_manager_uuids']
end

# Serializes InventoryCollection's data storage into Array of Hashes
Expand Down
5 changes: 5 additions & 0 deletions app/models/manager_refresh/inventory_object.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ def manager_uuid
reference.stringified_reference
end

# @return [Hash] hash reference built from the :manager_ref keys, which can uniquely identify the
#   entity under a manager
def uuid
  identity_keys = reference.keys
  reference.full_reference.slice(*identity_keys)
end

# @return [ManagerRefresh::InventoryObject] returns self
def load
self
Expand Down
23 changes: 12 additions & 11 deletions app/models/manager_refresh/save_collection/saver/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -201,21 +201,22 @@ def delete_complement

_log.debug("Processing :delete_complement of #{inventory_collection} of size "\
"#{all_manager_uuids_size}...")
deleted_counter = 0

inventory_collection.db_collection_for_comparison_for_complement_of(
inventory_collection.all_manager_uuids
).find_in_batches do |batch|
ActiveRecord::Base.transaction do
batch.each do |record|
record.public_send(inventory_collection.delete_method)
deleted_counter += 1
end
end
query = complement_of!(inventory_collection.all_manager_uuids)

ids_of_non_active_entities = ActiveRecord::Base.connection.execute(query.to_sql).to_a
# TODO(lsmola) we should allow only archiving, which is done via update, then the batch can be bigger. Without
# batch archiving, this is a big bottleneck.
ids_of_non_active_entities.each_slice(1000) do |batch|
destroy_records!(batch, :id_extractor => :hash_primary_key_value)
end

_log.debug("Processing :delete_complement of #{inventory_collection} of size "\
"#{all_manager_uuids_size}, deleted=#{deleted_counter}...Complete")
"#{all_manager_uuids_size}, deleted=#{deleted_records.size}...Complete")
end

# Extracts the primary key value out of a row fetched as a Hash
#
# @param row [Hash] record attributes keyed by column name
# @return [Object] the value stored under the primary key column
def hash_primary_key_value(row)
  row[primary_key]
end

# Deletes/soft-deletes a given record
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,23 +263,23 @@ def format_value(attribute, value)
#
# @param records [Array<ApplicationRecord, Hash>] Records we want to delete. If we have only hashes, we need to
# to fetch ApplicationRecord objects from the DB
def destroy_records!(records)
def destroy_records!(records, id_extractor: :record_primary_key_value)
return false unless inventory_collection.delete_allowed?
return if records.blank?

# Is the delete_method rails standard deleting method?
rails_delete = %i(destroy delete).include?(inventory_collection.delete_method)
if !rails_delete && inventory_collection.model_class.respond_to?(inventory_collection.delete_method)
# We have custom delete method defined on a class, that means it supports batch destroy
inventory_collection.store_deleted_records(records.map { |x| {:id => record_key(x, primary_key)} })
inventory_collection.model_class.public_send(inventory_collection.delete_method, records.map { |x| record_key(x, primary_key) })
inventory_collection.store_deleted_records(records.map { |x| {:id => send(id_extractor, x) } })
inventory_collection.model_class.public_send(inventory_collection.delete_method, records.map { |x| send(id_extractor, x) })
else
# We have either standard :destroy and :delete rails method, or custom instance level delete method
# Note: The standard :destroy and :delete rails method can't be batched because of the hooks and cascade destroy
ActiveRecord::Base.transaction do
if pure_sql_records_fetching
if pure_sql_records_fetching || id_extractor != :record_primary_key_value
# For pure SQL fetching, we need to get the AR objects again, so we can call destroy
inventory_collection.model_class.where(:id => records.map { |x| record_key(x, primary_key) }).find_each do |record|
inventory_collection.model_class.where(:id => records.map { |x| send(id_extractor, x) }).find_each do |record|
delete_record!(record)
end
else
Expand All @@ -291,6 +291,10 @@ def destroy_records!(records)
end
end

# Extracts the primary key value out of an ApplicationRecord-like object (or Hash, via record_key)
#
# @param rec [ApplicationRecord, Hash] record we want the primary key value of
# @return [Object] the value stored under the primary key column
def record_primary_key_value(rec)
  record_key(rec, primary_key)
end

# Batch updates existing records
#
# @param hashes [Array<Hash>] data used for building a batch update sql query
Expand Down
61 changes: 61 additions & 0 deletions app/models/manager_refresh/save_collection/saver/sql_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,67 @@ def pg_type_cast(value, sql_type)
"#{value}::#{sql_type}"
end
end

# Builds an Arel query selecting the :id of every DB record whose manager_ref identity is NOT among
# the passed manager_uuids (the complement), done efficiently via a VALUES-based CTE.
#
# Why not the simpler forms of multiselect:
# If we use "(col1, col2) IN [(a,e), (b,f), (b,e)]" it's not great, just with 10k batch, we see
# *** ActiveRecord::StatementInvalid Exception: PG::StatementTooComplex: ERROR: stack depth limit exceeded
# HINT: Increase the configuration parameter "max_stack_depth" (currently 2048kB), after ensuring the
# platform's stack depth limit is adequate.
#
# If we use "(col1 = a AND col2 = e) OR (col1 = b AND col2 = f) OR (col1 = b AND col2 = e)" with 10k batch, it
# takes about 6s and consumes 300MB, with 100k it takes ~1h and consumes 3GB in the PostgreSQL process
#
# The best way seems to be using a CTE, where the list of values we want to match is turned into a 'table'
# and we just do a RIGHT OUTER JOIN to get the complement of the given identifiers. Tested on getting the
# complement of 100k items, using 2 cols (:ems_ref and :uid_ems) from a total of 150k rows, it takes ~1s
# and 350MB in the PostgreSQL process.
#
# @param manager_uuids [Array<Hash>] Array with manager_uuids of entities. The keys have to match
#   inventory_collection.manager_ref.
#   NOTE(review): the original doc also allowed Array<String> when manager_ref.size == 1, but the code
#   below calls .to_h on the first element and hash[x] on the rest, which does not do what is described
#   for plain Strings — confirm before passing anything other than Hashes.
# @return [Arel::SelectManager] Arel for getting complement of uuids. This method destructively modifies
#   the passed manager_uuids (shift + map!) to spare some memory.
def complement_of!(manager_uuids)
  # Example of building the input from the DB (for manual testing in a console):
  # manager_uuids = inventory_collection.full_collection_for_comparison.select(:id, :ems_ref, :uid_ems).limit(100000).to_a.map {|x| {"ems_ref" => x.ems_ref, "uid_ems" => x.uid_ems} }

  connection = ActiveRecord::Base.connection
  all_attribute_keys = inventory_collection.manager_ref
  all_attribute_keys_array = inventory_collection.manager_ref.map(&:to_s)
  # Quoted column names for the VALUES-table column list
  all_attribute_keys_array_q = all_attribute_keys_array.map { |x| quote_column_name(x) }

  # For PostgreSQL, only the first row of a VALUES list needs (and should carry) the type casts;
  # the remaining rows inherit the column types from it
  first_value = manager_uuids.shift.to_h
  first_value = "(#{all_attribute_keys_array.map { |x| quote(connection, first_value[x], x, true) }.join(",")})"

  # Rest of the rows, rendered without the type cast (map! mutates the argument to spare memory)
  values = manager_uuids.map! do |hash|
    "(#{all_attribute_keys_array.map { |x| quote(connection, hash[x], x, false) }.join(",")})"
  end.join(",")

  values = [first_value, values].join(",")
  # The 'table' of currently-active identifiers, built from inline VALUES
  active_entities_query = <<-SQL
    SELECT *
    FROM (VALUES #{values}) AS active_entities_table(#{all_attribute_keys_array_q.join(",")})
  SQL

  active_entities = Arel::Table.new(:active_entities)
  active_entities_cte = Arel::Nodes::As.new(active_entities, Arel.sql("(#{active_entities_query})"))
  # CTE with all existing rows (id + identity columns) for this collection
  all_entities = Arel::Table.new(:all_entities)
  all_entities_cte = Arel::Nodes::As.new(
    all_entities,
    Arel.sql("(#{inventory_collection.full_collection_for_comparison.select(:id, *all_attribute_keys_array).to_sql})")
  )

  # Join on the full manager_ref identity; rows with no active counterpart have NULL active columns
  join_condition = all_attribute_keys.map { |key| active_entities[key].eq(all_entities[key]) }.inject(:and)
  where_condition = all_attribute_keys.map { |key| active_entities[key].eq(nil) }.inject(:and)

  # RIGHT OUTER JOIN keeps all DB rows; the WHERE keeps only those missing from active_entities,
  # i.e. the complement of the passed manager_uuids
  active_entities
    .project(all_entities[:id])
    .join(all_entities, Arel::Nodes::RightOuterJoin)
    .on(join_condition)
    .with(active_entities_cte, all_entities_cte)
    .where(where_condition)
end
end
end
end

0 comments on commit cd76c80

Please sign in to comment.