Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add mutex to metric #1644

Merged
1 commit merged into from
May 20, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 32 additions & 26 deletions cosmos/lib/cosmos/utilities/metric.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
# copyright holder

require 'cosmos/models/metric_model'
require 'thread'

module Cosmos
class Metric
Expand Down Expand Up @@ -48,6 +49,7 @@ def initialize(microservice:, scope:)
@scope = scope
@microservice = microservice
@size = 5000
@mutex = Mutex.new
end

def add_sample(name:, value:, labels:)
Expand All @@ -69,15 +71,17 @@ def add_sample(name:, value:, labels:)
# the value is added to @items and the count of the value is increased
# if the count of the values exceed the size of the array it sets the
# count back to zero and the array will over write older data.
key = "#{name}|" + labels.map { |k, v| "#{k}=#{v}" }.join(',')
if not @items.has_key?(key)
Logger.debug("new data for #{@scope}, #{key}")
@items[key] = { 'values' => Array.new(@size), 'count' => 0 }
@mutex.synchronize do
key = "#{name}|" + labels.map { |k, v| "#{k}=#{v}" }.join(',')
if not @items.has_key?(key)
Logger.debug("new data for #{@scope}, #{key}")
@items[key] = { 'values' => Array.new(@size), 'count' => 0 }
end
count = @items[key]['count']
# Logger.info("adding data for #{@scope}, #{count} #{key}, #{value}")
@items[key]['values'][count] = value
@items[key]['count'] = count + 1 >= @size ? 0 : count + 1
end
count = @items[key]['count']
# Logger.info("adding data for #{@scope}, #{count} #{key}, #{value}")
@items[key]['values'][count] = value
@items[key]['count'] = count + 1 >= @size ? 0 : count + 1
end

def percentile(sorted_values, percentile)
Expand Down Expand Up @@ -106,24 +110,26 @@ def output
# array. to store the array as the value with the metric name again joined
# with the @microservice and @scope.
Logger.debug("#{@microservice} #{@scope} sending metrics to redis, #{@items.length}") if @items.length > 0
@items.each do |key, values|
label_list = []
name, labels = key.split('|')
metric_labels = labels.nil? ? {} : labels.split(',').map { |x| x.split('=') }.map { |k, v| { k => v } }.reduce({}, :merge)
sorted_values = values['values'].compact.sort
for percentile_value in [10, 50, 90, 95, 99]
percentile_result = percentile(sorted_values, percentile_value)
labels = metric_labels.clone.merge({ 'scope' => @scope, 'microservice' => @microservice })
labels['percentile'] = percentile_value
labels['metric__value'] = percentile_result
label_list.append(labels)
end
begin
Logger.debug("sending metrics summary to redis key: #{@microservice}")
metric = MetricModel.new(name: @microservice, scope: @scope, metric_name: name, label_list: label_list)
metric.create(force: true)
rescue RuntimeError
Logger.error("failed attempt to update metric, #{key}, #{name} #{@scope}")
@mutex.synchronize do
@items.each do |key, values|
label_list = []
name, labels = key.split('|')
metric_labels = labels.nil? ? {} : labels.split(',').map { |x| x.split('=') }.map { |k, v| { k => v } }.reduce({}, :merge)
sorted_values = values['values'].compact.sort
for percentile_value in [10, 50, 90, 95, 99]
percentile_result = percentile(sorted_values, percentile_value)
labels = metric_labels.clone.merge({ 'scope' => @scope, 'microservice' => @microservice })
labels['percentile'] = percentile_value
labels['metric__value'] = percentile_result
label_list.append(labels)
end
begin
Logger.debug("sending metrics summary to redis key: #{@microservice}")
metric = MetricModel.new(name: @microservice, scope: @scope, metric_name: name, label_list: label_list)
metric.create(force: true)
rescue RuntimeError
Logger.error("failed attempt to update metric, #{key}, #{name} #{@scope}")
end
end
end
end
Expand Down