forked from ManageIQ/manageiq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsql_helper_upsert.rb
196 lines (168 loc) · 8.78 KB
/
sql_helper_upsert.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
module ManagerRefresh::SaveCollection
module Saver
module SqlHelperUpsert
# Builds ON CONFLICT UPDATE updating branch for one column identified by the passed key
#
# @param key [Symbol] key that is column name
# @return [String] SQL clause for upserting one column
def build_insert_set_cols(key)
"#{quote_column_name(key)} = EXCLUDED.#{quote_column_name(key)}"
end
# @param all_attribute_keys [Array<Symbol>] Array of all columns we will be saving into each table row
# @param hashes [Array<Hash>] data used for building a batch insert sql query
# @param mode [Symbol] Mode for saving, allowed values are [:full, :partial], :full is when we save all
# columns of a row, :partial is when we save only few columns, so a partial row.
# @param on_conflict [Symbol, NilClass] defines behavior on conflict with unique index constraint, allowed values
# are :do_update, :do_nothing, nil
def build_insert_query(all_attribute_keys, hashes, on_conflict: nil, mode:, column_name: nil)
_log.debug("Building insert query for #{inventory_collection} of size #{inventory_collection.size}...")
# Cache the connection for the batch
connection = get_connection
# Ignore versioning columns that are set separately
ignore_cols = mode == :partial ? [:resource_timestamp, :resource_version] : []
# Make sure we don't send a primary_key for INSERT in any form, it could break PG sequencer
all_attribute_keys_array = all_attribute_keys.to_a - [primary_key.to_s, primary_key.to_sym] - ignore_cols
insert_query = insert_query_insert_values(hashes, all_attribute_keys_array, connection)
insert_query += insert_query_on_conflict_behavior(all_attribute_keys, on_conflict, mode, ignore_cols, column_name)
insert_query += insert_query_returning
_log.debug("Building insert query for #{inventory_collection} of size #{inventory_collection.size}...Complete")
insert_query
end
private
def insert_query_insert_values(hashes, all_attribute_keys_array, connection)
values = hashes.map do |hash|
"(#{all_attribute_keys_array.map { |x| quote(connection, hash[x], x) }.join(",")})"
end.join(",")
col_names = all_attribute_keys_array.map { |x| quote_column_name(x) }.join(",")
<<-SQL
INSERT INTO #{q_table_name} (#{col_names})
VALUES
#{values}
SQL
end
def insert_query_on_conflict_behavior(all_attribute_keys, on_conflict, mode, ignore_cols, column_name)
return "" unless inventory_collection.parallel_safe?
insert_query_on_conflict = insert_query_on_conflict_do(on_conflict)
if on_conflict == :do_update
insert_query_on_conflict += insert_query_on_conflict_update(all_attribute_keys, mode, ignore_cols, column_name)
end
insert_query_on_conflict
end
def insert_query_on_conflict_do(on_conflict)
if on_conflict == :do_nothing
<<-SQL
ON CONFLICT DO NOTHING
SQL
elsif on_conflict == :do_update
index_where_condition = unique_index_for(unique_index_keys).where
where_to_sql = index_where_condition ? "WHERE #{index_where_condition}" : ""
<<-SQL
ON CONFLICT (#{unique_index_columns.map { |x| quote_column_name(x) }.join(",")}) #{where_to_sql}
DO
UPDATE
SQL
end
end
def insert_query_on_conflict_update(all_attribute_keys, mode, ignore_cols, column_name)
if mode == :partial
ignore_cols += [:resource_timestamps, :resource_timestamps_max, :resource_versions, :resource_versions_max]
end
ignore_cols += [:created_on, :created_at] # Lets not change created for the update clause
# If there is not version attribute, the update part will be ignored below
version_attribute = if supports_remote_data_timestamp?(all_attribute_keys)
:resource_timestamp
elsif supports_remote_data_version?(all_attribute_keys)
:resource_version
end
# TODO(lsmola) should we add :deleted => false to the update clause? That should handle a reconnect, without a
# a need to list :deleted anywhere in the parser. We just need to check that a model has the :deleted attribute
query = <<-SQL
SET #{(all_attribute_keys - ignore_cols).map { |key| build_insert_set_cols(key) }.join(", ")}
SQL
# This conditional will make sure we are avoiding rewriting new data by old data. But we want it only when
# remote_data_timestamp is a part of the data.
query += insert_query_on_conflict_update_mode(mode, version_attribute, column_name) if version_attribute
query
end
def insert_query_on_conflict_update_mode(mode, version_attribute, column_name)
if mode == :full
full_update_condition(version_attribute)
elsif mode == :partial
raise "Column name must be provided" unless column_name
partial_update_condition(version_attribute, column_name)
end
end
def full_update_condition(attr_full)
attr_partial = attr_full.to_s.pluralize # Changes resource_version/timestamp to resource_versions/timestamps
attr_partial_max = "#{attr_partial}_max"
# Quote the column names
attr_full = quote_column_name(attr_full)
attr_partial = quote_column_name(attr_partial)
attr_partial_max = quote_column_name(attr_partial_max)
<<-SQL
, #{attr_partial} = '{}', #{attr_partial_max} = NULL
WHERE EXCLUDED.#{attr_full} IS NULL OR (
(#{q_table_name}.#{attr_full} IS NULL OR EXCLUDED.#{attr_full} > #{q_table_name}.#{attr_full}) AND
(#{q_table_name}.#{attr_partial_max} IS NULL OR EXCLUDED.#{attr_full} >= #{q_table_name}.#{attr_partial_max})
)
SQL
end
def partial_update_condition(attr_full, column_name)
attr_partial = attr_full.to_s.pluralize # Changes resource_version/timestamp to resource_versions/timestamps
attr_partial_max = "#{attr_partial}_max"
cast = if attr_full == :resource_timestamp
"timestamp"
elsif attr_full == :resource_version
"integer"
end
# Quote the column names
attr_full = quote_column_name(attr_full)
attr_partial = quote_column_name(attr_partial)
attr_partial_max = quote_column_name(attr_partial_max)
column_name = get_connection.quote_string(column_name.to_s)
q_table_name = get_connection.quote_table_name(table_name)
<<-SQL
#{insert_query_set_jsonb_version(cast, attr_partial, attr_partial_max, column_name)}
, #{attr_partial_max} = greatest(#{q_table_name}.#{attr_partial_max}::#{cast}, EXCLUDED.#{attr_partial_max}::#{cast})
WHERE EXCLUDED.#{attr_partial_max} IS NULL OR (
(#{q_table_name}.#{attr_full} IS NULL OR EXCLUDED.#{attr_partial_max} > #{q_table_name}.#{attr_full}) AND (
(#{q_table_name}.#{attr_partial}->>'#{column_name}')::#{cast} IS NULL OR
EXCLUDED.#{attr_partial_max}::#{cast} > (#{q_table_name}.#{attr_partial}->>'#{column_name}')::#{cast}
)
)
SQL
end
def insert_query_set_jsonb_version(cast, attr_partial, attr_partial_max, column_name)
if cast == "integer"
# If we have integer value, we don't want to encapsulate the value in ""
<<-SQL
, #{attr_partial} = #{q_table_name}.#{attr_partial} || ('{"#{column_name}": ' || EXCLUDED.#{attr_partial_max}::#{cast} || '}')::jsonb
SQL
else
<<-SQL
, #{attr_partial} = #{q_table_name}.#{attr_partial} || ('{"#{column_name}": "' || EXCLUDED.#{attr_partial_max}::#{cast} || '"}')::jsonb
SQL
end
end
def insert_query_returning
<<-SQL
RETURNING "id",#{unique_index_columns.map { |x| quote_column_name(x) }.join(",")}
#{insert_query_returning_timestamps}
SQL
end
def insert_query_returning_timestamps
if inventory_collection.parallel_safe?
# For upsert, we'll return also created and updated timestamps, so we can recognize what was created and what
# updated
if inventory_collection.internal_timestamp_columns.present?
<<-SQL
, #{inventory_collection.internal_timestamp_columns.map { |x| quote_column_name(x) }.join(",")}
SQL
end
else
""
end
end
end
end
end