Improve metrics saving #15976

Merged · 12 commits · Sep 26, 2017
@@ -102,7 +102,15 @@ def update_or_destroy_records!(records_batch_iterator, inventory_objects_index,
      next unless assert_distinct_relation(primary_key_value)

      # TODO(lsmola) unify this behavior with object_index_with_keys method in InventoryCollection
-     index = unique_index_keys_to_s.map { |key| record_key(record, key).to_s }.join(inventory_collection.stringify_joiner)
+     index = unique_index_keys_to_s.map do |attribute|
+       if attribute == "timestamp"
+         type = model_class.type_for_attribute(attribute)
+         type.cast(record_key(record, attribute)).utc.iso8601.to_s
+       else
+         record_key(record, attribute).to_s
+       end
+     end.join(inventory_collection.stringify_joiner)

      inventory_object = inventory_objects_index.delete(index)
      hash = attributes_index.delete(index)
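For context on the `timestamp` branch above: casting through the model's attribute type gives every record one canonical index key, however the timestamp arrives. A minimal sketch of the idea, assuming an ActiveRecord model with a datetime `timestamp` column (values illustrative):

```ruby
type = Metric.type_for_attribute("timestamp")

# Different representations of the same instant...
type.cast("2017-09-21 10:00:00 +0200").utc.iso8601 # => "2017-09-21T08:00:00Z"
type.cast(Time.utc(2017, 9, 21, 8)).utc.iso8601    # => "2017-09-21T08:00:00Z"

# ...collapse to one key, whereas plain #to_s would yield two different
# strings that never match in the index.
```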

@@ -122,9 +130,13 @@ def update_or_destroy_records!(records_batch_iterator, inventory_objects_index,
        values_for_database!(all_attribute_keys,
                             record.attributes.symbolize_keys)
      elsif serializable_keys?
+       # TODO(lsmola) hash data with current DB data to allow subset of data being sent,
+       # otherwise we would nullify the not sent attributes. Test e.g. on disks in cloud
        values_for_database!(all_attribute_keys,
                             hash)
      else
+       # TODO(lsmola) hash data with current DB data to allow subset of data being sent,
+       # otherwise we would nullify the not sent attributes. Test e.g. on disks in cloud
        hash
      end
      assign_attributes_for_update!(hash_for_update, update_time)
8 changes: 8 additions & 0 deletions app/models/metric/ci_mixin.rb
@@ -223,4 +223,12 @@ def get_daily_time_profile_in_my_region_from_tz(tz)
def log_target
"#{self.class.name} name: [#{name}], id: [#{id}]"
end

+  def log_specific_target(target)
+    "#{target.class.name} name: [#{target.name}], id: [#{target.id}]"
+  end
+
+  def log_specific_targets(targets)
+    targets.map { |target| log_specific_target(target) }.join(" | ")
+  end
end
11 changes: 10 additions & 1 deletion app/models/metric/ci_mixin/capture.rb
@@ -205,7 +205,16 @@ def perf_capture(interval_name, start_time = nil, end_time = nil)
:start_range => start_range
)
end
-      perf_process(interval_name, start_range, end_range, counters, counter_values)
+
+      # Convert to format allowing to send multiple resources at once
+      counters_data = {
+        [self.class.base_class.name, self.id] => {
+          :counters       => counters,
+          :counter_values => counter_values
+        }
+      }
+
+      perf_process(interval_name, start_range, end_range, counters_data)
    end
  end

Review thread on the `counters_data` hash:

@Ladas (Contributor, Author) commented on Sep 21, 2017:
    @agrare @kbrock so something like this can easily go through a queue, having multiple resources inside

A member replied:
    Agreed this will be perfect to queue perf_process for the metrics_processor_worker

@Ladas:
    cool 👍

    For 5.0, I would like to move to the generic Persistor format, so we can reuse inventory Persistors (and using grouping, we should be able to make generic worker a dedicated worker?)

A member replied:
    We are going over to using the queue before 5.0 - so would be nice to go over to the generic Persistor format within the next week or so here

@Ladas:
    @kbrock for 4.6, I think we will just keep this format. Generic Persistors are 5.0 thing, right @agrare ?

A member replied:
    I think this format is fine since we're going to be invoking perf_process directly in the metrics_processor_worker not some generic persister.
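To make the queueing discussion concrete: a hedged sketch of a payload carrying several resources and a hypothetical enqueue of perf_process for the metrics processor worker (the MiqQueue options shown are an assumption, not part of this PR):

```ruby
# Illustrative only: batching two VMs into a single perf_process payload.
counters_data = {
  ["VmOrTemplate", 101] => {:counters => {}, :counter_values => {}},
  ["VmOrTemplate", 102] => {:counters => {}, :counter_values => {}}
}

start_range = 1.hour.ago.utc.iso8601
end_range   = Time.now.utc.iso8601

# Hypothetical queued invocation; perf_process itself resolves the
# [class_name, id] keys back to ActiveRecord objects.
MiqQueue.put(
  :class_name  => "VmOrTemplate",
  :instance_id => 101,
  :method_name => "perf_process",
  :args        => ["realtime", start_range, end_range, counters_data],
  :role        => "ems_metrics_processor"
)
```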
218 changes: 153 additions & 65 deletions app/models/metric/ci_mixin/processing.rb
@@ -1,5 +1,5 @@
module Metric::CiMixin::Processing
-  def perf_process(interval_name, start_time, end_time, counters, counter_values)
+  def perf_process(interval_name, start_time, end_time, counters_data)
unless Metric::Capture::VALID_CAPTURE_INTERVALS.include?(interval_name)
raise ArgumentError, _("invalid interval_name '%{name}'") % {:name => interval_name}
end
@@ -10,93 +10,181 @@ def perf_process(interval_name, start_time, end_time, counter_values)
interval_name = 'hourly' if interval_name == 'historical'

affected_timestamps = []
-    _log.info("#{log_header} Processing for #{log_target}, for range [#{start_time} - #{end_time}]...")
+
+    transform_resources!(counters_data)
+    resources = counters_data.keys
+
+    _log.info("#{log_header} Processing for #{log_specific_targets(resources)}, for range [#{start_time} - #{end_time}]...")

dummy, t = Benchmark.realtime_block(:total_time) do
# Take the raw metrics and create hashes out of them
rt_rows = {}
-      Benchmark.realtime_block(:process_counter_values) do
-        counter_values.each do |ts, cv|
-          affected_timestamps << ts
-          ts = Metric::Helper.nearest_realtime_timestamp(ts) if interval_name == 'realtime'
-
-          col_vals = {}
-          cv.each do |counter_id, value|
-            counter = counters[counter_id]
-            next if counter.nil? || counter[:capture_interval_name] != interval_name
-
-            col = counter[:counter_key].to_sym
-            unless Metric.column_names_symbols.include?(col)
-              _log.debug("#{log_header} Column [#{col}] is not defined, skipping")
-              next
-            end
-
-            col_vals.store_path(col, counter[:instance], [value, counter])
-          end
-
-          col_vals.each do |col, values_by_instance|
-            # If there are multiple instances for a column, use the aggregate
-            # instance, if available, otherwise roll it up ourselves.
-            value, counter = values_by_instance[""]
-            if value.nil?
-              value = 0
-              counter = nil
-              values_by_instance.each_value { |v, c| value += v; counter = c }
-            end
-
-            # Create the hashes for the rows
-            rt = (rt_rows[ts] ||= {
-              :capture_interval_name => interval_name,
-              :capture_interval      => counter[:capture_interval],
-              :resource_name         => name,
-              :timestamp             => ts
-            })
-            rt[col], message = normalize_value(value, counter)
-            _log.warn("#{log_header} #{log_target} Timestamp: [#{ts}], Column [#{col}]: '#{message}'") if message
-          end
-        end
-      end
+      counters_data.each do |resource, data|
+        counters       = data[:counters]
+        counter_values = data[:counter_values]
+
+        Benchmark.realtime_block(:process_counter_values) do
+          counter_values.each do |ts, cv|
+            affected_timestamps << ts
+            ts = Metric::Helper.nearest_realtime_timestamp(ts) if interval_name == 'realtime'
+
+            col_vals = {}
+            cv.each do |counter_id, value|
+              counter = counters[counter_id]
+              next if counter.nil? || counter[:capture_interval_name] != interval_name
+
+              col = counter[:counter_key].to_sym
+              unless Metric.column_names_symbols.include?(col)
+                _log.debug("#{log_header} Column [#{col}] is not defined, skipping")
+                next
+              end
+
+              col_vals.store_path(col, counter[:instance], [value, counter])
+            end
+
+            col_vals.each do |col, values_by_instance|
+              # If there are multiple instances for a column, use the aggregate
+              # instance, if available, otherwise roll it up ourselves.
+              value, counter = values_by_instance[""]
+              if value.nil?
+                value = 0
+                counter = nil
+                values_by_instance.each_value do |v, c|
+                  value += v
+                  counter = c
+                end
+              end
+
+              # Create the hashes for the rows
+              rt = (rt_rows[ts] ||= {
+                :capture_interval_name => interval_name,
+                :capture_interval      => counter[:capture_interval],
+                :resource              => resource,
+                :resource_name         => resource.name,
+                :timestamp             => ts
+              })
+              rt[col], message = normalize_value(value, counter)
+              _log.warn("#{log_header} #{log_target} Timestamp: [#{ts}], Column [#{col}]: '#{message}'") if message
+            end
+          end
+        end
+      end

-      ActiveMetrics::Base.connection.write_multiple(
-        rt_rows.flat_map do |ts, rt|
-          rt.merge!(Metric::Processing.process_derived_columns(self, rt, interval_name == 'realtime' ? Metric::Helper.nearest_hourly_timestamp(ts) : nil))
-          rt.delete_nils
-          rt_tags   = rt.slice(:capture_interval_name, :capture_interval, :resource_name).symbolize_keys
-          rt_fields = rt.except(:capture_interval_name,
-                                :capture_interval,
-                                :resource_name,
-                                :timestamp,
-                                :instance_id,
-                                :class_name,
-                                :resource_type,
-                                :resource_id)
-
-          rt_fields.map do |k, v|
-            {
-              :timestamp   => ts,
-              :metric_name => k,
-              :value       => v,
-              :resource    => self,
-              :tags        => rt_tags
-            }
-          end
-        end
-      )
+      parameters = if ActiveMetrics::Base.connection.kind_of?(ActiveMetrics::ConnectionAdapters::MiqPostgresAdapter)
+                     # We can just pass original data to PG, with metrics grouped by timestamps, since that is how
+                     # we store to PG now. It will spare quite some memory and time to not convert it to row_per_metric
+                     # and then back to original format.
+                     transform_parameters_row_with_all_metrics(resources, interval_name, start_time, end_time, rt_rows)
+                   else
+                     transform_parameters_row_per_metric(resources, interval_name, start_time, end_time, rt_rows)
+                   end
+
+      ActiveMetrics::Base.connection.write_multiple(parameters)

-      update_attribute(:last_perf_capture_on, end_time) if last_perf_capture_on.nil? || last_perf_capture_on.utc.iso8601 < end_time
+      resources.each do |resource|
+        resource.update_attribute(:last_perf_capture_on, end_time) if resource.last_perf_capture_on.nil? || resource.last_perf_capture_on.utc.iso8601 < end_time
+      end

      # Raise <class>_perf_complete alert event if realtime so alerts can be evaluated.
-      MiqEvent.raise_evm_alert_event_queue(self, MiqEvent.event_name_for_target(self, "perf_complete"))
+      resources.each do |resource|
+        MiqEvent.raise_evm_alert_event_queue(resource, MiqEvent.event_name_for_target(resource, "perf_complete"))
+      end

-      perf_rollup_to_parents(interval_orig, start_time, end_time)
+      resources.each do |resource|
+        resource.perf_rollup_to_parents(interval_orig, start_time, end_time)
+      end
end
-    _log.info("#{log_header} Processing for #{log_target}, for range [#{start_time} - #{end_time}]...Complete - Timings: #{t.inspect}")
+    _log.info("#{log_header} Processing for #{log_specific_targets(resources)}, for range [#{start_time} - #{end_time}]...Complete - Timings: #{t.inspect}")

affected_timestamps
end

private

+  def transform_resources!(counters_data)
+    # Fetch ActiveRecord object by 1 query per Model
+    grouped_resource_refs = counters_data.keys.each_with_object({}) { |x, obj| (obj[x.first] ||= []) << x.second }
+    fetched_records = grouped_resource_refs.keys.each_with_object({}) do |x, obj|
+      x.constantize.where(:id => grouped_resource_refs[x]).each { |rec| obj[[x, rec.id]] = rec }
+    end
+    # Transforming [Class, id] that were sent via the counters_data into the ActiveRecord objects
+    counters_data.transform_keys! { |x| fetched_records[x] }
+  end
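A small illustration of the key transformation this helper performs (classes and ids made up):

```ruby
counters_data = {
  ["VmOrTemplate", 1] => {:counters => {}, :counter_values => {}},
  ["VmOrTemplate", 2] => {:counters => {}, :counter_values => {}},
  ["Host", 10]        => {:counters => {}, :counter_values => {}}
}

# grouped_resource_refs => {"VmOrTemplate" => [1, 2], "Host" => [10]},
# so records load with one query per model:
#   VmOrTemplate.where(:id => [1, 2]) and Host.where(:id => [10])
#
# After transform_resources! the keys are the loaded ActiveRecord objects:
#   {#<VmOrTemplate id: 1> => {...}, #<VmOrTemplate id: 2> => {...}, #<Host id: 10> => {...}}
```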

+  def transform_parameters_row_per_metric(_resources, interval_name, _start_time, _end_time, rt_rows)
+    rt_rows.flat_map do |ts, rt|
+      rt.merge!(Metric::Processing.process_derived_columns(rt[:resource], rt, interval_name == 'realtime' ? Metric::Helper.nearest_hourly_timestamp(ts) : nil))
+      rt.delete_nils
+      rt_tags   = rt.slice(:capture_interval_name, :capture_interval, :resource_name).symbolize_keys
+      rt_fields = rt.except(:capture_interval_name,
+                            :capture_interval,
+                            :resource_name,
+                            :timestamp,
+                            :instance_id,
+                            :class_name,
+                            :resource,
+                            :resource_type,
+                            :resource_id)
+
+      rt_fields.map do |k, v|
+        {
+          :timestamp   => ts,
+          :metric_name => k,
+          :value       => v,
+          :resource    => rt[:resource],
+          :tags        => rt_tags
+        }
+      end
+    end
+  end
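For orientation, a sketch of the row-per-metric shape this method emits, one hash per metric column per timestamp (resource and values illustrative):

```ruby
vm   = VmOrTemplate.find(101) # hypothetical resource
tags = {:capture_interval_name => "realtime", :capture_interval => 20, :resource_name => "example-vm"}

# One rt_rows entry such as
#   {"2017-09-21T08:00:00Z" => {:cpu_usage_rate_average => 42.0, :mem_usage_absolute_average => 51.2}}
# fans out into one write_multiple parameter per metric:
[
  {:timestamp => "2017-09-21T08:00:00Z", :metric_name => :cpu_usage_rate_average,     :value => 42.0, :resource => vm, :tags => tags},
  {:timestamp => "2017-09-21T08:00:00Z", :metric_name => :mem_usage_absolute_average, :value => 51.2, :resource => vm, :tags => tags}
]
```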

+  def transform_parameters_row_with_all_metrics(resources, interval_name, start_time, end_time, rt_rows)
+    obj_perfs, = Benchmark.realtime_block(:db_find_prev_perfs) do
+      Metric::Finders.find_all_by_range(resources, start_time, end_time, interval_name).find_each.each_with_object({}) do |p, h|
+        data, = Benchmark.realtime_block(:get_attributes) do
+          # TODO(lsmola) calling .attributes takes more time than actually saving all the samples, try to fetch pure
+          # arrays from the PG
+          p.attributes.delete_nils
+        end
+        h.store_path([p.resource_type, p.resource_id, p.capture_interval_name, p.timestamp.utc.iso8601], data.symbolize_keys)
+      end
+    end
+
+    Benchmark.realtime_block(:preload_vim_performance_state_for_ts) do
+      # Make sure we preload all vim_performance_state_for_ts to avoid n+1 queries
+      condition = if start_time.nil?
+                    nil
+                  elsif start_time == end_time
+                    {:timestamp => start_time}
+                  elsif end_time.nil?
+                    VimPerformanceState.arel_table[:timestamp].gteq(start_time)
+                  else
+                    {:timestamp => start_time..end_time}
+                  end
+
+      resources.each { |r| r.preload_vim_performance_state_for_ts_iso8601(condition) }
+    end
+
+    Benchmark.realtime_block(:process_perfs) do
+      rt_rows.each do |ts, rt|
+        rt[:resource_id]   = rt[:resource].id
+        rt[:resource_type] = rt[:resource].class.base_class.name
+
+        if (perf = obj_perfs.fetch_path([rt[:resource_type], rt[:resource_id], interval_name, ts]))
+          rt.reverse_merge!(perf)
+          rt.delete(:id) # Remove protected attributes
+        end
+
+        rt.merge!(Metric::Processing.process_derived_columns(rt[:resource], rt, interval_name == 'realtime' ? Metric::Helper.nearest_hourly_timestamp(ts) : nil))
+      end
+    end
+    # Assign nil so GC can clean it up
+    obj_perfs = nil
+
+    return resources, interval_name, start_time, end_time, rt_rows
+  end
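By contrast, the PG path keeps the grouped shape: one row per timestamp with all metric columns inline, which is what the MiqPostgresAdapter ultimately stores. A sketch of one such rt_rows entry (values illustrative):

```ruby
vm = VmOrTemplate.find(101) # hypothetical resource

rt_rows = {
  "2017-09-21T08:00:00Z" => {
    :capture_interval_name      => "realtime",
    :capture_interval           => 20,
    :resource                   => vm, # ActiveRecord object assigned by transform_resources!
    :resource_type              => "VmOrTemplate",
    :resource_id                => 101,
    :timestamp                  => "2017-09-21T08:00:00Z",
    :cpu_usage_rate_average     => 42.0,
    :mem_usage_absolute_average => 51.2
  }
}
```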

def normalize_value(value, counter)
return counter[:rollup] == 'latest' ? nil : 0 if value < 0
value = value.to_f * counter[:precision]
Expand Down
7 changes: 7 additions & 0 deletions app/models/metric/ci_mixin/state_finders.rb
@@ -5,6 +5,8 @@ def vim_performance_state_for_ts(ts)
@states_by_ts ||= {}
state = @states_by_ts[ts]
if state.nil?
+      # TODO: vim_performance_states.loaded? works only when doing resource.vim_performance_states.all, not when loading
+      # a subset based on available timestamps
if vim_performance_states.loaded?
# Look for requested time in cache
t = ts.to_time(:utc)
@@ -18,6 +20,7 @@ def vim_performance_state_for_ts(ts)
else
state = vim_performance_states.find_by(:timestamp => ts)
end
+      # TODO: index perf_capture_state to avoid fetching it every time for a missing ts and hour
state ||= perf_capture_state
@states_by_ts[ts] = state
end
@@ -29,6 +32,10 @@ def preload_vim_performance_state_for_ts(conditions = {})
@states_by_ts = vim_performance_states.where(conditions).index_by(&:timestamp)
end

+  def preload_vim_performance_state_for_ts_iso8601(conditions = {})
+    @states_by_ts = vim_performance_states.where(conditions).index_by { |x| x.timestamp.utc.iso8601 }
+  end
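The iso8601 variant exists because the batched PG path above keys rt_rows and the previously fetched perfs by iso8601 strings, so the state cache must use the same key form. A minimal sketch, assuming a model that includes this mixin (time range illustrative):

```ruby
# index_by(&:timestamp) keys the cache with Time objects;
# the iso8601 variant keys it with the strings used during processing.
states = vim_performance_states.where(:timestamp => 1.hour.ago..Time.now)

states.index_by(&:timestamp)                    # {2017-09-21 08:00:00 UTC => #<VimPerformanceState ...>}
states.index_by { |s| s.timestamp.utc.iso8601 } # {"2017-09-21T08:00:00Z" => #<VimPerformanceState ...>}
```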

def hosts_from_vim_performance_state_for_ts(ts)
vim_performance_state_for_ts(ts).hosts
end