Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch saving strategy that does not require unique indexes #15627

17 changes: 12 additions & 5 deletions app/models/manager_refresh/inventory_collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class InventoryCollection
:inventory_object_attributes, :name, :saver_strategy, :parent_inventory_collections, :manager_uuids,
:skeletal_manager_uuids, :targeted_arel, :targeted, :manager_ref_allowed_nil, :use_ar_object,
:secondary_refs, :secondary_indexes, :created_records, :updated_records, :deleted_records,
:custom_reconnect_block
:custom_reconnect_block, :batch_extra_attributes

delegate :each, :size, :to => :to_a

Expand Down Expand Up @@ -321,6 +321,8 @@ class InventoryCollection
# Allowed saver strategies are:
# - :default => Using Rails saving methods, this way is not safe to run in multiple workers concurrently,
# since it will lead to non consistent data.
# - :batch => Using batch SQL queries, this way is not safe to run in multiple workers
# concurrently, since it will lead to non consistent data.
# - :concurrent_safe => This method is designed for concurrent saving. It uses atomic upsert to avoid
# data duplication and it uses timestamp based atomic checks to avoid new data being overwritten by the
# the old data.
Expand Down Expand Up @@ -368,14 +370,18 @@ class InventoryCollection
# @param use_ar_object [Boolean] True or False. Whether we need to initialize AR object as part of the saving
# it's needed if the model have special setters, serialize of columns, etc. This setting is relevant only
# for the batch saver strategy.
# @param batch_extra_attributes [Array] Array of symbols marking which extra attributes we want to store into the
# db. These extra attributes might be a product of :use_ar_object assignment and we need to specify them
# manually, if we want to use a batch saving strategy and we have models that populate attributes as a side
# effect.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Ladas do you have an example of needing this currently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for vm, when we change :raw_power_state, other attributes are set, so we need to have

:batch_extra_attributes => [:power_state, :state_changed_on, :previous_state],

hopefully this can be autodiscovered later, while passing the same specs

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 thanks

def initialize(model_class: nil, manager_ref: nil, association: nil, parent: nil, strategy: nil, saved: nil,
custom_save_block: nil, delete_method: nil, data_index: nil, data: nil, dependency_attributes: nil,
attributes_blacklist: nil, attributes_whitelist: nil, complete: nil, update_only: nil,
check_changed: nil, custom_manager_uuid: nil, custom_db_finder: nil, arel: nil, builder_params: {},
inventory_object_attributes: nil, unique_index_columns: nil, name: nil, saver_strategy: nil,
parent_inventory_collections: nil, manager_uuids: [], all_manager_uuids: nil, targeted_arel: nil,
targeted: nil, manager_ref_allowed_nil: nil, secondary_refs: {}, use_ar_object: nil,
custom_reconnect_block: nil)
custom_reconnect_block: nil, batch_extra_attributes: [])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like there are a ridiculous number of parameters to this initialize method. Sandy Metz recommends no more than 4. Feels like this is a bad pattern that can be refactored in future PRs. /cc @agrare

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chessbyte indeed, we are at the point where this should be a final list, that should cover all the crazy corner cases we have. :-)

So next will be to refactor these, part will got to the Settings, rest should be divided into more objects we will pass here(also consuming settings). There are already several areas we can group, e..g Saver, DatabaseLoader, AttributesBuilder, RecordsMatcher, etc.

@model_class = model_class
@manager_ref = manager_ref || [:ems_ref]
@secondary_refs = secondary_refs
Expand All @@ -402,6 +408,7 @@ def initialize(model_class: nil, manager_ref: nil, association: nil, parent: nil
@name = name || association || model_class.to_s.demodulize.tableize
@saver_strategy = process_saver_strategy(saver_strategy)
@use_ar_object = use_ar_object || false
@batch_extra_attributes = batch_extra_attributes

@manager_ref_allowed_nil = manager_ref_allowed_nil || []

Expand Down Expand Up @@ -502,11 +509,11 @@ def process_saver_strategy(saver_strategy)
return :default unless saver_strategy

case saver_strategy
when :default, :concurrent_safe, :concurrent_safe_batch
when :default, :batch, :concurrent_safe, :concurrent_safe_batch
saver_strategy
else
raise "Unknown InventoryCollection saver strategy: :#{saver_strategy}, allowed strategies are "\
":default, :concurrent_safe and :concurrent_safe_batch"
":default, :batch, :concurrent_safe and :concurrent_safe_batch"
end
end

Expand Down Expand Up @@ -996,7 +1003,7 @@ def full_collection_for_comparison

# Returns array of records identities
def records_identities(records)
records = [records] unless records.kind_of?(Array)
records = [records] unless records.respond_to?(:map)
records.map { |record| record_identity(record) }
end

Expand Down
15 changes: 15 additions & 0 deletions app/models/manager_refresh/save_collection/saver/batch.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module ManagerRefresh::SaveCollection
module Saver
class Batch < ManagerRefresh::SaveCollection::Saver::ConcurrentSafeBatch
private

def unique_index_columns
inventory_collection.manager_ref_to_cols.map(&:to_sym)
end

def on_conflict_update
false
Copy link
Member

@agrare agrare Jul 25, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we do on_conflict_update with this strategy?

Scratch that, needs unique indexes https://www.postgresql.org/docs/9.5/static/sql-insert.html#SQL-ON-CONFLICT

end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class ConcurrentSafeBatch < ManagerRefresh::SaveCollection::Saver::Base
def save!(inventory_collection, association)
attributes_index = {}
inventory_objects_index = {}
all_attribute_keys = Set.new
all_attribute_keys = Set.new + inventory_collection.batch_extra_attributes

inventory_collection.each do |inventory_object|
attributes = inventory_object.attributes(inventory_collection)
Expand Down Expand Up @@ -58,7 +58,8 @@ def save!(inventory_collection, association)
assign_attributes_for_update!(hash_for_update, inventory_collection, update_time)
inventory_collection.store_updated_records(record)

hashes_for_update << hash_for_update.except(:id, :type)
hash_for_update[:id] = record.id
hashes_for_update << hash_for_update.except(:type)
end
end

Expand All @@ -72,7 +73,6 @@ def save!(inventory_collection, association)
# Destroy in batches
if records_for_destroy.size >= batch_size
destroy_records(records_for_destroy)
inventory_collection.store_deleted_records(records_for_destroy)
records_for_destroy = []
end
end
Expand All @@ -83,7 +83,6 @@ def save!(inventory_collection, association)

# Destroy the last batch
destroy_records(records_for_destroy)
inventory_collection.store_deleted_records(records_for_destroy)
records_for_destroy = [] # Cleanup so GC can release it sooner

all_attribute_keys << :type if inventory_collection.supports_sti?
Expand Down Expand Up @@ -135,7 +134,7 @@ def create_records!(inventory_collection, all_attribute_keys, batch, attributes_

hashes << hash
# Index on Unique Columns values, so we can easily fill in the :id later
indexed_inventory_objects[inventory_collection.unique_index_columns.map { |x| hash[x] }] = inventory_object
indexed_inventory_objects[unique_index_columns.map { |x| hash[x] }] = inventory_object
end

return if hashes.blank?
Expand Down Expand Up @@ -166,15 +165,15 @@ def map_ids_to_inventory_objects(inventory_collection, indexed_inventory_objects
# for every remainders(a last batch in a stream of batches)
if !supports_remote_data_timestamp?(all_attribute_keys) || result.count == batch_size
result.each do |inserted_record|
key = inventory_collection.unique_index_columns.map { |x| inserted_record[x.to_s] }
key = unique_index_columns.map { |x| inserted_record[x.to_s] }
inventory_object = indexed_inventory_objects[key]
inventory_object.id = inserted_record["id"] if inventory_object
end
else
inventory_collection.model_class.where(
build_multi_selection_query(inventory_collection, hashes)
).select(inventory_collection.unique_index_columns + [:id]).each do |inserted_record|
key = inventory_collection.unique_index_columns.map { |x| inserted_record.public_send(x) }
).select(unique_index_columns + [:id]).each do |inserted_record|
key = unique_index_columns.map { |x| inserted_record.public_send(x) }
inventory_object = indexed_inventory_objects[key]
inventory_object.id = inserted_record.id if inventory_object
end
Expand Down
39 changes: 23 additions & 16 deletions app/models/manager_refresh/save_collection/saver/sql_helper.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
module ManagerRefresh::SaveCollection
module Saver
module SqlHelper
def unique_index_columns
inventory_collection.unique_index_columns
end

def on_conflict_update
true
end

# TODO(lsmola) all below methods should be rewritten to arel, but we need to first extend arel to be able to do
# this
def build_insert_set_cols(key)
Expand All @@ -19,12 +27,17 @@ def build_insert_query(inventory_collection, all_attribute_keys, hashes)
INSERT INTO #{table_name} (#{col_names})
VALUES
#{values}
ON CONFLICT (#{inventory_collection.unique_index_columns.map { |x| quote_column_name(x) }.join(",")})
DO
UPDATE
SET #{all_attribute_keys_array.map { |key| build_insert_set_cols(key) }.join(", ")}
}

if on_conflict_update
insert_query += %{
ON CONFLICT (#{unique_index_columns.map { |x| quote_column_name(x) }.join(",")})
DO
UPDATE
SET #{all_attribute_keys_array.map { |key| build_insert_set_cols(key) }.join(", ")}
}
end

# TODO(lsmola) do we want to exclude the ems_id from the UPDATE clause? Otherwise it might be difficult to change
# the ems_id as a cross manager migration, since ems_id should be there as part of the insert. The attempt of
# changing ems_id could lead to putting it back by a refresh.
Expand All @@ -39,11 +52,9 @@ def build_insert_query(inventory_collection, all_attribute_keys, hashes)
}
end

if inventory_collection.dependees.present?
insert_query += %{
RETURNING id,#{inventory_collection.unique_index_columns.map { |x| quote_column_name(x) }.join(",")}
}
end
insert_query += %{
RETURNING "id",#{unique_index_columns.map { |x| quote_column_name(x) }.join(",")}
}

insert_query
end
Expand All @@ -59,17 +70,13 @@ def quote_column_name(key)
def build_update_query(inventory_collection, all_attribute_keys, hashes)
# We want to ignore type and create timestamps when updating
all_attribute_keys_array = all_attribute_keys.to_a.delete_if { |x| %i(type created_at created_on).include?(x) }
all_attribute_keys_array << :id
table_name = inventory_collection.model_class.table_name
used_unique_index_keys = inventory_collection.unique_index_columns & all_attribute_keys.to_a

values = hashes.map do |hash|
"(#{all_attribute_keys_array.map { |x| quote(hash[x], x, inventory_collection) }.join(",")})"
end.join(",")

where_cond = used_unique_index_keys.map do |x|
"updated_values.#{quote_column_name(x)} = #{table_name}.#{quote_column_name(x)}"
end.join(" AND ")

update_query = %{
UPDATE #{table_name}
SET
Expand All @@ -78,7 +85,7 @@ def build_update_query(inventory_collection, all_attribute_keys, hashes)
VALUES
#{values}
) AS updated_values (#{all_attribute_keys_array.map { |x| quote_column_name(x) }.join(",")})
WHERE #{where_cond}
WHERE updated_values.id = #{table_name}.id
}

# TODO(lsmola) do we want to exclude the ems_id from the UPDATE clause? Otherwise it might be difficult to change
Expand All @@ -96,7 +103,7 @@ def build_update_query(inventory_collection, all_attribute_keys, hashes)
end

def build_multi_selection_query(inventory_collection, hashes)
inventory_collection.build_multi_selection_condition(hashes, inventory_collection.unique_index_columns)
inventory_collection.build_multi_selection_condition(hashes, unique_index_columns)
end

def quote(value, name = nil, inventory_collection = nil)
Expand Down