diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index cba51ff349f2..c6ad13169c6b 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -52790,10 +52790,40 @@ Openmetrics module +*`openmetrics.help`*:: ++ +-- +Brief description of the MetricFamily + + +type: keyword + +-- + +*`openmetrics.type`*:: ++ +-- +Metric type + + +type: keyword + +-- + +*`openmetrics.unit`*:: ++ +-- +Metric unit + + +type: keyword + +-- + *`openmetrics.labels.*`*:: + -- -Prometheus metric labels +Openmetrics metric labels type: object @@ -52803,7 +52833,27 @@ type: object *`openmetrics.metrics.*`*:: + -- -Prometheus metric +Openmetrics metric + + +type: object + +-- + +*`openmetrics.exemplar.*`*:: ++ +-- +Openmetrics exemplars + + +type: object + +-- + +*`openmetrics.exemplar.labels.*`*:: ++ +-- +Openmetrics metric exemplar labels type: object diff --git a/metricbeat/docs/modules/openmetrics/collector.asciidoc b/metricbeat/docs/modules/openmetrics/collector.asciidoc index 06000e772915..2b184916aa92 100644 --- a/metricbeat/docs/modules/openmetrics/collector.asciidoc +++ b/metricbeat/docs/modules/openmetrics/collector.asciidoc @@ -9,6 +9,7 @@ beta[] include::../../../module/openmetrics/collector/_meta/docs.asciidoc[] +This is a default metricset. If the host module is unconfigured, this metricset is enabled by default. ==== Fields diff --git a/metricbeat/helper/openmetrics/label.go b/metricbeat/helper/openmetrics/label.go new file mode 100644 index 000000000000..0f0a69054dde --- /dev/null +++ b/metricbeat/helper/openmetrics/label.go @@ -0,0 +1,59 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package openmetrics + +// LabelMap defines the mapping from OpenMetrics label to a Metricbeat field +type LabelMap interface { + // GetField returns the resulting field name + GetField() string + + // IsKey returns true if the label is a key label + IsKey() bool +} + +// Label maps a OpenMetrics label to a Metricbeat field +func Label(field string) LabelMap { + return &commonLabel{ + field: field, + key: false, + } +} + +// KeyLabel maps a OpenMetrics label to a Metricbeat field. The label is flagged as key. +// Metrics with the same tuple of key labels will be grouped in the same event. +func KeyLabel(field string) LabelMap { + return &commonLabel{ + field: field, + key: true, + } +} + +type commonLabel struct { + field string + key bool +} + +// GetField returns the resulting field name +func (l *commonLabel) GetField() string { + return l.field +} + +// IsKey returns true if the label is a key label +func (l *commonLabel) IsKey() bool { + return l.key +} diff --git a/metricbeat/helper/openmetrics/metric.go b/metricbeat/helper/openmetrics/metric.go new file mode 100644 index 000000000000..4907ab59c9eb --- /dev/null +++ b/metricbeat/helper/openmetrics/metric.go @@ -0,0 +1,495 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package openmetrics + +import ( + "fmt" + "math" + "strconv" + "strings" + "time" + + "github.com/elastic/beats/v7/libbeat/common" +) + +// MetricMap defines the mapping from Openmetrics metric to a Metricbeat field +type MetricMap interface { + // GetOptions returns the list of metric options + GetOptions() []MetricOption + + // GetField returns the resulting field name + GetField() string + + // GetValue returns the resulting value + GetValue(m *OpenMetric) interface{} + GetNilValue() interface{} + + // GetConfiguration returns the configuration for the metric + GetConfiguration() Configuration +} + +// Configuration for mappings that needs extended treatment +type Configuration struct { + // StoreNonMappedLabels indicates if labels found at the metric that are + // not found at the label map should be part of the resulting event. + // This setting should be used when the label name is not known beforehand + StoreNonMappedLabels bool + // NonMappedLabelsPlacement is used when StoreNonMappedLabels is set to true, and + // defines the key path at the event under which to store the dynamically found labels. + // This key path will be added to the events that match this metric along with a subset of + // key/value pairs will be created under it, one for each non mapped label found. + // + // Example: + // + // given a metric family in a Openmetrics resource in the form: + // metric1{label1="value1",label2="value2"} 1 + // and not mapping labels but using this entry on a the MetricMap definition: + // "metric1": ExtendedInfoMetric(Configuration{StoreNonMappedLabels: true, NonMappedLabelsPlacement: "mypath"}), + // would output an event that contains a metricset field as follows + // "mypath": {"label1":"value1","label2":"value2"} + // + NonMappedLabelsPlacement string + // MetricProcessing options are a set of functions that will be + // applied to metrics after they are retrieved + MetricProcessingOptions []MetricOption + // ExtraFields is used to add fields to the + // event where this metric is included + ExtraFields common.MapStr +} + +// MetricOption adds settings to Metric objects behavior +type MetricOption interface { + // Process a tuple of field, value and labels from a metric, return the same tuple updated + Process(field string, value interface{}, labels common.MapStr) (string, interface{}, common.MapStr) +} + +// OpFilterMap only processes metrics matching the given filter +func OpFilterMap(label string, filterMap map[string]string) MetricOption { + return opFilterMap{ + label: label, + filterMap: filterMap, + } +} + +// OpLowercaseValue lowercases the value if it's a string +func OpLowercaseValue() MetricOption { + return opLowercaseValue{} +} + +// OpUnixTimestampValue parses a value into a Unix timestamp +func OpUnixTimestampValue() MetricOption { + return opUnixTimestampValue{} +} + +// OpMultiplyBuckets multiplies bucket labels in histograms, useful to change units +func OpMultiplyBuckets(multiplier float64) MetricOption { + return opMultiplyBuckets{ + multiplier: multiplier, + } +} + +// OpSetSuffix extends the field's name with the given suffix if the value of the metric +// is numeric (and not histogram or quantile), otherwise does nothing +func OpSetNumericMetricSuffix(suffix string) MetricOption { + return opSetNumericMetricSuffix{ + suffix: suffix, + } +} + +// Metric directly maps a Openmetrics metric to a Metricbeat field +func Metric(field string, options ...MetricOption) MetricMap { + return &commonMetric{ + field: field, + config: Configuration{MetricProcessingOptions: options}, + } +} + +// KeywordMetric maps a Openmetrics metric to a Metricbeat field, stores the +// given keyword when source metric value is 1 +func KeywordMetric(field, keyword string, options ...MetricOption) MetricMap { + return &keywordMetric{ + commonMetric{ + field: field, + config: Configuration{MetricProcessingOptions: options}, + }, + keyword, + } +} + +// BooleanMetric maps a Openmetrics metric to a Metricbeat field of bool type +func BooleanMetric(field string, options ...MetricOption) MetricMap { + return &booleanMetric{ + commonMetric{ + field: field, + config: Configuration{MetricProcessingOptions: options}, + }, + } +} + +// LabelMetric maps a Openmetrics metric to a Metricbeat field, stores the value +// of a given label on it if the gauge value is 1 +func LabelMetric(field, label string, options ...MetricOption) MetricMap { + return &labelMetric{ + commonMetric{ + field: field, + config: Configuration{MetricProcessingOptions: options}, + }, + label, + } +} + +// InfoMetric obtains info labels from the given metric and puts them +// into events matching all the key labels present in the metric +func InfoMetric(options ...MetricOption) MetricMap { + return &infoMetric{ + commonMetric{ + config: Configuration{MetricProcessingOptions: options}, + }, + } +} + +// ExtendedInfoMetric obtains info labels from the given metric and puts them +// into events matching all the key labels present in the metric +func ExtendedInfoMetric(configuration Configuration) MetricMap { + return &infoMetric{ + commonMetric{ + config: configuration, + }, + } +} + +// ExtendedMetric is a metric item that allows extended behaviour +// through configuration +func ExtendedMetric(field string, configuration Configuration) MetricMap { + return &commonMetric{ + field: field, + config: configuration, + } +} + +type commonMetric struct { + field string + config Configuration +} + +// GetOptions returns the list of metric options +func (m *commonMetric) GetOptions() []MetricOption { + return m.config.MetricProcessingOptions +} + +// GetField returns the resulting field name +func (m *commonMetric) GetField() string { + return m.field +} + +// GetConfiguration returns the configuration for the metric +func (m *commonMetric) GetConfiguration() Configuration { + return m.config +} +func (m *commonMetric) GetNilValue() interface{} { + return nil +} + +// GetValue returns the resulting value +func (m *commonMetric) GetValue(metric *OpenMetric) interface{} { + info := metric.GetInfo() + if info != nil { + if info.HasValidValue() { + return info.GetValue() + } + } + + stateset := metric.GetStateset() + if stateset != nil { + if stateset.HasValidValue() { + return stateset.GetValue() + } + } + + unknown := metric.GetUnknown() + if unknown != nil { + if !math.IsNaN(unknown.GetValue()) && !math.IsInf(unknown.GetValue(), 0) { + return int64(unknown.GetValue()) + } + } + + counter := metric.GetCounter() + if counter != nil { + if !math.IsNaN(counter.GetValue()) && !math.IsInf(counter.GetValue(), 0) { + return int64(counter.GetValue()) + } + } + + gauge := metric.GetGauge() + if gauge != nil { + if !math.IsNaN(gauge.GetValue()) && !math.IsInf(gauge.GetValue(), 0) { + return gauge.GetValue() + } + } + + summary := metric.GetSummary() + if summary != nil { + value := common.MapStr{} + if !math.IsNaN(summary.GetSampleSum()) && !math.IsInf(summary.GetSampleSum(), 0) { + value["sum"] = summary.GetSampleSum() + value["count"] = summary.GetSampleCount() + } + + quantiles := summary.GetQuantile() + percentileMap := common.MapStr{} + for _, quantile := range quantiles { + if !math.IsNaN(quantile.GetValue()) && !math.IsInf(quantile.GetValue(), 0) { + key := strconv.FormatFloat(100*quantile.GetQuantile(), 'f', -1, 64) + percentileMap[key] = quantile.GetValue() + } + } + + if len(percentileMap) != 0 { + value["percentile"] = percentileMap + } + + return value + } + + histogram := metric.GetHistogram() + if histogram != nil { + value := common.MapStr{} + if !math.IsNaN(histogram.GetSampleSum()) && !math.IsInf(histogram.GetSampleSum(), 0) { + value["sum"] = histogram.GetSampleSum() + value["count"] = histogram.GetSampleCount() + } + + buckets := histogram.GetBucket() + bucketMap := common.MapStr{} + for _, bucket := range buckets { + if bucket.GetCumulativeCount() != uint64(math.NaN()) && bucket.GetCumulativeCount() != uint64(math.Inf(0)) { + key := strconv.FormatFloat(bucket.GetUpperBound(), 'f', -1, 64) + bucketMap[key] = bucket.GetCumulativeCount() + } + } + + if len(bucketMap) != 0 { + value["bucket"] = bucketMap + } + + return value + } + + gaugehistogram := metric.GetGaugeHistogram() + if gaugehistogram != nil { + value := common.MapStr{} + if !math.IsNaN(gaugehistogram.GetSampleSum()) && !math.IsInf(gaugehistogram.GetSampleSum(), 0) { + value["gsum"] = gaugehistogram.GetSampleSum() + value["gcount"] = gaugehistogram.GetSampleCount() + } + + buckets := gaugehistogram.GetBucket() + bucketMap := common.MapStr{} + for _, bucket := range buckets { + if bucket.GetCumulativeCount() != uint64(math.NaN()) && bucket.GetCumulativeCount() != uint64(math.Inf(0)) { + key := strconv.FormatFloat(bucket.GetUpperBound(), 'f', -1, 64) + bucketMap[key] = bucket.GetCumulativeCount() + } + } + + if len(bucketMap) != 0 { + value["bucket"] = bucketMap + } + + return value + } + + // Other types are not supported here + return nil +} + +type keywordMetric struct { + commonMetric + keyword string +} + +// GetValue returns the resulting value +func (m *keywordMetric) GetValue(metric *OpenMetric) interface{} { + if gauge := metric.GetGauge(); gauge != nil && gauge.GetValue() == 1 { + return m.keyword + } + return nil +} + +type booleanMetric struct { + commonMetric +} + +// GetValue returns the resulting value +func (m *booleanMetric) GetValue(metric *OpenMetric) interface{} { + if gauge := metric.GetGauge(); gauge != nil { + return gauge.GetValue() == 1 + } + return nil +} + +type labelMetric struct { + commonMetric + label string +} + +// GetValue returns the resulting value +func (m *labelMetric) GetValue(metric *OpenMetric) interface{} { + if gauge := metric.GetGauge(); gauge != nil && gauge.GetValue() == 1 { + return getLabel(metric, m.label) + } + return nil +} + +func getLabel(metric *OpenMetric, name string) string { + for _, label := range metric.GetLabel() { + if label.Name == name { + return label.Value + } + } + return "" +} + +type infoMetric struct { + commonMetric +} + +// GetValue returns the resulting value +func (m *infoMetric) GetValue(metric *OpenMetric) interface{} { + return "" +} + +// GetField returns the resulting field name +func (m *infoMetric) GetField() string { + return "" +} + +type opFilterMap struct { + label string + filterMap map[string]string +} + +// Called by the Openmetrics helper to apply extra options on retrieved metrics +// Check whether the value of the specified label is allowed and, if yes, return the metric via the specified mapped field +// Else, if the specified label does not match the filter, return nil +// This is useful in cases where multiple Metricbeat fields need to be defined per Openmetrics metric, based on label values +func (o opFilterMap) Process(field string, value interface{}, labels common.MapStr) (string, interface{}, common.MapStr) { + for k, v := range o.filterMap { + if labels[o.label] == k { + return fmt.Sprintf("%v.%v", field, v), value, labels + } + } + return "", nil, nil +} + +type opLowercaseValue struct{} + +// Process will lowercase the given value if it's a string +func (o opLowercaseValue) Process(field string, value interface{}, labels common.MapStr) (string, interface{}, common.MapStr) { + if val, ok := value.(string); ok { + value = strings.ToLower(val) + } + return field, value, labels +} + +type opMultiplyBuckets struct { + multiplier float64 +} + +// Process will multiply the bucket labels if it is an histogram with numeric labels +func (o opMultiplyBuckets) Process(field string, value interface{}, labels common.MapStr) (string, interface{}, common.MapStr) { + histogram, ok := value.(common.MapStr) + if !ok { + return field, value, labels + } + bucket, ok := histogram["bucket"].(common.MapStr) + if !ok { + return field, value, labels + } + sum, ok := histogram["sum"].(float64) + if !ok { + return field, value, labels + } + multiplied := common.MapStr{} + for k, v := range bucket { + if f, err := strconv.ParseFloat(k, 64); err == nil { + key := strconv.FormatFloat(f*o.multiplier, 'f', -1, 64) + multiplied[key] = v + } else { + multiplied[k] = v + } + } + histogram["bucket"] = multiplied + histogram["sum"] = sum * o.multiplier + return field, histogram, labels +} + +type opSetNumericMetricSuffix struct { + suffix string +} + +// Process will extend the field's name with the given suffix +func (o opSetNumericMetricSuffix) Process(field string, value interface{}, labels common.MapStr) (string, interface{}, common.MapStr) { + _, ok := value.(float64) + if !ok { + return field, value, labels + } + field = fmt.Sprintf("%v.%v", field, o.suffix) + return field, value, labels +} + +type opUnixTimestampValue struct { +} + +// Process converts a value in seconds into an unix time +func (o opUnixTimestampValue) Process(field string, value interface{}, labels common.MapStr) (string, interface{}, common.MapStr) { + return field, common.Time(time.Unix(int64(value.(float64)), 0)), labels +} + +// OpLabelKeyPrefixRemover removes prefix from label keys +func OpLabelKeyPrefixRemover(prefix string) MetricOption { + return opLabelKeyPrefixRemover{prefix} +} + +// opLabelKeyPrefixRemover is a metric option processor that removes a prefix from the key of a label set +type opLabelKeyPrefixRemover struct { + Prefix string +} + +// Process modifies the labels map, removing a prefix when found at keys of the labels set. +// For each label, if the key is found a new key will be created hosting the same value and the +// old key will be deleted. +// Fields, values and not prefixed labels will remain unmodified. +func (o opLabelKeyPrefixRemover) Process(field string, value interface{}, labels common.MapStr) (string, interface{}, common.MapStr) { + renameKeys := []string{} + for k := range labels { + if len(k) < len(o.Prefix) { + continue + } + if k[:6] == o.Prefix { + renameKeys = append(renameKeys, k) + } + } + + for i := range renameKeys { + v := labels[renameKeys[i]] + delete(labels, renameKeys[i]) + labels[renameKeys[i][len(o.Prefix):]] = v + } + return "", value, labels +} diff --git a/metricbeat/helper/openmetrics/module.go b/metricbeat/helper/openmetrics/module.go new file mode 100644 index 000000000000..fac374ee1b42 --- /dev/null +++ b/metricbeat/helper/openmetrics/module.go @@ -0,0 +1,61 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package openmetrics + +import ( + "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/beats/v7/metricbeat/mb/parse" +) + +const ( + defaultScheme = "http" + defaultPath = "/metrics" +) + +var ( + // HostParser validates OpenMetrics URLs + HostParser = parse.URLHostParserBuilder{ + DefaultScheme: defaultScheme, + DefaultPath: defaultPath, + }.Build() +) + +// MetricSetBuilder returns a builder function for a new OpenMetrics metricset using the given mapping +func MetricSetBuilder(mapping *MetricsMapping) func(base mb.BaseMetricSet) (mb.MetricSet, error) { + return func(base mb.BaseMetricSet) (mb.MetricSet, error) { + openmetrics, err := NewOpenMetricsClient(base) + if err != nil { + return nil, err + } + return &openmetricsMetricSet{ + BaseMetricSet: base, + openmetrics: openmetrics, + mapping: mapping, + }, nil + } +} + +type openmetricsMetricSet struct { + mb.BaseMetricSet + openmetrics OpenMetrics + mapping *MetricsMapping +} + +func (m *openmetricsMetricSet) Fetch(r mb.ReporterV2) error { + return m.openmetrics.ReportProcessedMetrics(m.mapping, r) +} diff --git a/metricbeat/helper/openmetrics/openmetrics.go b/metricbeat/helper/openmetrics/openmetrics.go new file mode 100644 index 000000000000..9e4abc6428b3 --- /dev/null +++ b/metricbeat/helper/openmetrics/openmetrics.go @@ -0,0 +1,1000 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package openmetrics + +import ( + "compress/gzip" + "fmt" + "io" + "io/ioutil" + "math" + "mime" + "net/http" + "regexp" + "strconv" + "strings" + "time" + + "github.com/prometheus/common/model" + + "github.com/prometheus/prometheus/pkg/exemplar" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" + "github.com/prometheus/prometheus/pkg/timestamp" + + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/metricbeat/helper" + "github.com/elastic/beats/v7/metricbeat/mb" +) + +const acceptHeader = `application/openmetrics-text; version=1.0.0; charset=utf-8,text/plain` + +var errNameLabelMandatory = fmt.Errorf("missing metric name (%s label)", labels.MetricName) + +type Gauge struct { + Value *float64 +} + +func (m *Gauge) GetValue() float64 { + if m != nil && m.Value != nil { + return *m.Value + } + return 0 +} + +type Info struct { + Value *int64 +} + +func (m *Info) GetValue() int64 { + if m != nil && m.Value != nil { + return *m.Value + } + return 0 +} +func (m *Info) HasValidValue() bool { + return m != nil && *m.Value == 1 +} + +type Stateset struct { + Value *int64 +} + +func (m *Stateset) GetValue() int64 { + if m != nil && m.Value != nil { + return *m.Value + } + return 0 +} +func (m *Stateset) HasValidValue() bool { + return m != nil && (*m.Value == 0 || *m.Value == 1) +} + +type Counter struct { + Value *float64 +} + +func (m *Counter) GetValue() float64 { + if m != nil && m.Value != nil { + return *m.Value + } + return 0 +} + +type Quantile struct { + Quantile *float64 + Value *float64 + Exemplar *exemplar.Exemplar +} + +func (m *Quantile) GetQuantile() float64 { + if m != nil && m.Quantile != nil { + return *m.Quantile + } + return 0 +} + +func (m *Quantile) GetValue() float64 { + if m != nil && m.Value != nil { + return *m.Value + } + return 0 +} + +type Summary struct { + SampleCount *uint64 + SampleSum *float64 + Quantile []*Quantile +} + +func (m *Summary) GetSampleCount() uint64 { + if m != nil && m.SampleCount != nil { + return *m.SampleCount + } + return 0 +} + +func (m *Summary) GetSampleSum() float64 { + if m != nil && m.SampleSum != nil { + return *m.SampleSum + } + return 0 +} + +func (m *Summary) GetQuantile() []*Quantile { + if m != nil { + return m.Quantile + } + return nil +} + +type Unknown struct { + Value *float64 +} + +func (m *Unknown) GetValue() float64 { + if m != nil && m.Value != nil { + return *m.Value + } + return 0 +} + +type Bucket struct { + CumulativeCount *uint64 + UpperBound *float64 + Exemplar *exemplar.Exemplar +} + +func (m *Bucket) GetCumulativeCount() uint64 { + if m != nil && m.CumulativeCount != nil { + return *m.CumulativeCount + } + return 0 +} + +func (m *Bucket) GetUpperBound() float64 { + if m != nil && m.UpperBound != nil { + return *m.UpperBound + } + return 0 +} + +type Histogram struct { + SampleCount *uint64 + SampleSum *float64 + Bucket []*Bucket + IsGaugeHistogram bool +} + +func (m *Histogram) GetSampleCount() uint64 { + if m != nil && m.SampleCount != nil { + return *m.SampleCount + } + return 0 +} + +func (m *Histogram) GetSampleSum() float64 { + if m != nil && m.SampleSum != nil { + return *m.SampleSum + } + return 0 +} + +func (m *Histogram) GetBucket() []*Bucket { + if m != nil { + return m.Bucket + } + return nil +} + +type OpenMetric struct { + Label []*labels.Label + Exemplar *exemplar.Exemplar + Name *string + Gauge *Gauge + Counter *Counter + Info *Info + Stateset *Stateset + Summary *Summary + Unknown *Unknown + Histogram *Histogram + TimestampMs *int64 +} + +func (m *OpenMetric) GetName() *string { + if m != nil { + return m.Name + } + return nil +} + +func (m *OpenMetric) GetLabel() []*labels.Label { + if m != nil { + return m.Label + } + return nil +} + +func (m *OpenMetric) GetGauge() *Gauge { + if m != nil { + return m.Gauge + } + return nil +} + +func (m *OpenMetric) GetCounter() *Counter { + if m != nil { + return m.Counter + } + return nil +} + +func (m *OpenMetric) GetInfo() *Info { + if m != nil { + return m.Info + } + return nil +} + +func (m *OpenMetric) GetStateset() *Stateset { + if m != nil { + return m.Stateset + } + return nil +} + +func (m *OpenMetric) GetSummary() *Summary { + if m != nil { + return m.Summary + } + return nil +} + +func (m *OpenMetric) GetUnknown() *Unknown { + if m != nil { + return m.Unknown + } + return nil +} + +func (m *OpenMetric) GetHistogram() *Histogram { + if m != nil && m.Histogram != nil && !m.Histogram.IsGaugeHistogram { + return m.Histogram + } + return nil +} + +func (m *OpenMetric) GetGaugeHistogram() *Histogram { + if m != nil && m.Histogram != nil && m.Histogram.IsGaugeHistogram { + return m.Histogram + } + return nil +} + +func (m *OpenMetric) GetTimestampMs() int64 { + if m != nil && m.TimestampMs != nil { + return *m.TimestampMs + } + return 0 +} + +type OpenMetricFamily struct { + Name *string + Help *string + Type textparse.MetricType + Unit *string + Metric []*OpenMetric +} + +func (m *OpenMetricFamily) GetName() string { + if m != nil && m.Name != nil { + return *m.Name + } + return "" +} +func (m *OpenMetricFamily) GetUnit() string { + if m != nil && *m.Unit != "" { + return *m.Unit + } + return "" +} + +func (m *OpenMetricFamily) GetMetric() []*OpenMetric { + if m != nil { + return m.Metric + } + return nil +} + +// OpenMetrics helper retrieves openmetrics formatted metrics +// This interface needs to use TextParse +type OpenMetrics interface { + // GetFamilies requests metric families from openmetrics endpoint and returns them + GetFamilies() ([]*OpenMetricFamily, error) + + GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapStr, error) + + ProcessMetrics(families []*OpenMetricFamily, mapping *MetricsMapping) ([]common.MapStr, error) + + ReportProcessedMetrics(mapping *MetricsMapping, r mb.ReporterV2) error +} + +type openmetrics struct { + httpfetcher + logger *logp.Logger +} + +type httpfetcher interface { + FetchResponse() (*http.Response, error) +} + +// NewOpenMetricsClient creates new openmetrics helper +func NewOpenMetricsClient(base mb.BaseMetricSet) (OpenMetrics, error) { + httpclient, err := helper.NewHTTP(base) + if err != nil { + return nil, err + } + + httpclient.SetHeaderDefault("Accept", acceptHeader) + httpclient.SetHeaderDefault("Accept-Encoding", "gzip") + return &openmetrics{httpclient, base.Logger()}, nil +} + +// GetFamilies requests metric families from openmetrics endpoint and returns them +func (p *openmetrics) GetFamilies() ([]*OpenMetricFamily, error) { + var reader io.Reader + + resp, err := p.FetchResponse() + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.Header.Get("Content-Encoding") == "gzip" { + greader, err := gzip.NewReader(resp.Body) + if err != nil { + return nil, err + } + defer greader.Close() + reader = greader + } else { + reader = resp.Body + } + + if resp.StatusCode > 399 { + bodyBytes, err := ioutil.ReadAll(reader) + if err == nil { + p.logger.Debug("error received from openmetrics endpoint: ", string(bodyBytes)) + } + return nil, fmt.Errorf("unexpected status code %d from server", resp.StatusCode) + } + + contentType := getContentType(resp.Header) + if contentType == "" { + return nil, fmt.Errorf("Invalid format for response of response") + } + + appendTime := time.Now().Round(0) + b, err := ioutil.ReadAll(reader) + families, err := parseMetricFamilies(b, contentType, appendTime) + + return families, nil +} + +const ( + suffixInfo = "_info" + suffixTotal = "_total" + suffixGCount = "_gcount" + suffixGSum = "_gsum" + suffixCount = "_count" + suffixSum = "_sum" + suffixBucket = "_bucket" +) + +func isInfo(name string) bool { + return len(name) > 5 && name[len(name)-5:] == suffixInfo +} + +// Counters have _total suffix +func isTotal(name string) bool { + return len(name) > 6 && name[len(name)-6:] == suffixTotal +} + +func isGCount(name string) bool { + return len(name) > 7 && name[len(name)-7:] == suffixGCount +} + +func isGSum(name string) bool { + return len(name) > 5 && name[len(name)-5:] == suffixGSum +} + +func isCount(name string) bool { + return len(name) > 6 && name[len(name)-6:] == suffixCount +} + +func isSum(name string) bool { + return len(name) > 4 && name[len(name)-4:] == suffixSum +} + +func isBucket(name string) bool { + return len(name) > 7 && name[len(name)-7:] == suffixBucket +} + +func summaryMetricName(name string, s float64, qv string, lbls string, t *int64, summariesByName map[string]map[string]*OpenMetric) (string, *OpenMetric) { + var summary = &Summary{} + var quantile = []*Quantile{} + var quant = &Quantile{} + + switch { + case isCount(name): + u := uint64(s) + summary.SampleCount = &u + name = name[:len(name)-6] + case isSum(name): + summary.SampleSum = &s + name = name[:len(name)-4] + default: + f, err := strconv.ParseFloat(qv, 64) + if err != nil { + f = -1 + } + quant.Quantile = &f + quant.Value = &s + } + + _, k := summariesByName[name] + if !k { + summariesByName[name] = make(map[string]*OpenMetric) + } + metric, ok := summariesByName[name][lbls] + if !ok { + metric = &OpenMetric{} + metric.Name = &name + metric.Summary = summary + metric.Summary.Quantile = quantile + summariesByName[name][lbls] = metric + } + if metric.Summary.SampleSum == nil && summary.SampleSum != nil { + metric.Summary.SampleSum = summary.SampleSum + } else if metric.Summary.SampleCount == nil && summary.SampleCount != nil { + metric.Summary.SampleCount = summary.SampleCount + } else if quant.Quantile != nil { + metric.Summary.Quantile = append(metric.Summary.Quantile, quant) + } + + return name, metric +} + +func histogramMetricName(name string, s float64, qv string, lbls string, t *int64, isGaugeHistogram bool, e *exemplar.Exemplar, histogramsByName map[string]map[string]*OpenMetric) (string, *OpenMetric) { + var histogram = &Histogram{} + var bucket = []*Bucket{} + var bkt = &Bucket{} + + switch { + case isCount(name): + u := uint64(s) + histogram.SampleCount = &u + name = name[:len(name)-6] + case isSum(name): + histogram.SampleSum = &s + name = name[:len(name)-4] + case isGaugeHistogram && isGCount(name): + u := uint64(s) + histogram.SampleCount = &u + name = name[:len(name)-7] + case isGaugeHistogram && isGSum(name): + histogram.SampleSum = &s + name = name[:len(name)-5] + default: + if isBucket(name) { + name = name[:len(name)-7] + } + f, err := strconv.ParseFloat(qv, 64) + if err != nil { + f = math.MaxUint64 + } + cnt := uint64(s) + bkt.UpperBound = &f + bkt.CumulativeCount = &cnt + + if e != nil { + if !e.HasTs { + e.Ts = *t + } + bkt.Exemplar = e + } + } + + _, k := histogramsByName[name] + if !k { + histogramsByName[name] = make(map[string]*OpenMetric) + } + metric, ok := histogramsByName[name][lbls] + if !ok { + metric = &OpenMetric{} + metric.Name = &name + metric.Histogram = histogram + metric.Histogram.Bucket = bucket + histogramsByName[name][lbls] = metric + } + if metric.Histogram.SampleSum == nil && histogram.SampleSum != nil { + metric.Histogram.SampleSum = histogram.SampleSum + } else if metric.Histogram.SampleCount == nil && histogram.SampleCount != nil { + metric.Histogram.SampleCount = histogram.SampleCount + } else if bkt.UpperBound != nil { + metric.Histogram.Bucket = append(metric.Histogram.Bucket, bkt) + } + + return name, metric +} + +func parseMetricFamilies(b []byte, contentType string, ts time.Time) ([]*OpenMetricFamily, error) { + var ( + parser = textparse.New(b, contentType) + defTime = timestamp.FromTime(ts) + metricFamiliesByName = map[string]*OpenMetricFamily{} + summariesByName = map[string]map[string]*OpenMetric{} + histogramsByName = map[string]map[string]*OpenMetric{} + fam *OpenMetricFamily + mt = textparse.MetricTypeUnknown + ) + var err error + +loop: + for { + var ( + et textparse.Entry + ok bool + e exemplar.Exemplar + ) + if et, err = parser.Next(); err != nil { + if err == io.EOF { + err = nil + } + break + } + switch et { + case textparse.EntryType: + buf, t := parser.Type() + s := string(buf) + fam, ok = metricFamiliesByName[s] + if !ok { + fam = &OpenMetricFamily{Name: &s, Type: t} + metricFamiliesByName[s] = fam + } + mt = t + continue + case textparse.EntryHelp: + buf, t := parser.Help() + s := string(buf) + h := string(t) + fam, ok = metricFamiliesByName[s] + if !ok { + fam = &OpenMetricFamily{Name: &s, Help: &h, Type: textparse.MetricTypeUnknown} + metricFamiliesByName[s] = fam + } + fam.Help = &h + continue + case textparse.EntryUnit: + buf, t := parser.Unit() + s := string(buf) + u := string(t) + fam, ok = metricFamiliesByName[s] + if !ok { + fam = &OpenMetricFamily{Name: &s, Unit: &u, Type: textparse.MetricTypeUnknown} + metricFamiliesByName[string(buf)] = fam + } + fam.Unit = &u + continue + case textparse.EntryComment: + continue + default: + } + + t := defTime + _, tp, v := parser.Series() + + var ( + lset labels.Labels + mets string + ) + + mets = parser.Metric(&lset) + + if !lset.Has(labels.MetricName) { + err = errNameLabelMandatory + break loop + } + + var lbls strings.Builder + lbls.Grow(len(mets)) + var labelPairs = []*labels.Label{} + for _, l := range lset.Copy() { + if l.Name == labels.MetricName { + continue + } + + if l.Name != model.QuantileLabel && l.Name != labels.BucketLabel { // quantile and le are special labels handled below + + lbls.WriteString(l.Name) + lbls.WriteString(l.Value) + } + n := l.Name + v := l.Value + + labelPairs = append(labelPairs, &labels.Label{ + Name: n, + Value: v, + }) + } + + var metric *OpenMetric + + metricName := lset.Get(labels.MetricName) + var lookupMetricName string + var exm *exemplar.Exemplar + + // Suffixes - https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#suffixes + switch mt { + case textparse.MetricTypeCounter: + var counter = &Counter{Value: &v} + mn := lset.Get(labels.MetricName) + metric = &OpenMetric{Name: &mn, Counter: counter, Label: labelPairs} + if isTotal(metricName) { // Remove suffix _total, get lookup metricname + lookupMetricName = metricName[:len(metricName)-6] + } + break + case textparse.MetricTypeGauge: + var gauge = &Gauge{Value: &v} + metric = &OpenMetric{Name: &metricName, Gauge: gauge, Label: labelPairs} + lookupMetricName = metricName + break + case textparse.MetricTypeInfo: + value := int64(v) + var info = &Info{Value: &value} + metric = &OpenMetric{Name: &metricName, Info: info, Label: labelPairs} + lookupMetricName = metricName + break + case textparse.MetricTypeSummary: + lookupMetricName, metric = summaryMetricName(metricName, v, lset.Get(model.QuantileLabel), lbls.String(), &t, summariesByName) + metric.Label = labelPairs + if !isSum(metricName) { + continue + } + metricName = lookupMetricName + break + case textparse.MetricTypeHistogram: + if hasExemplar := parser.Exemplar(&e); hasExemplar { + exm = &e + } + lookupMetricName, metric = histogramMetricName(metricName, v, lset.Get(labels.BucketLabel), lbls.String(), &t, false, exm, histogramsByName) + metric.Label = labelPairs + if !isSum(metricName) { + continue + } + metricName = lookupMetricName + break + case textparse.MetricTypeGaugeHistogram: + if hasExemplar := parser.Exemplar(&e); hasExemplar { + exm = &e + } + lookupMetricName, metric = histogramMetricName(metricName, v, lset.Get(labels.BucketLabel), lbls.String(), &t, true, exm, histogramsByName) + metric.Label = labelPairs + metric.Histogram.IsGaugeHistogram = true + if !isGSum(metricName) { + continue + } + metricName = lookupMetricName + break + case textparse.MetricTypeStateset: + value := int64(v) + var stateset = &Stateset{Value: &value} + metric = &OpenMetric{Name: &metricName, Stateset: stateset, Label: labelPairs} + lookupMetricName = metricName + break + case textparse.MetricTypeUnknown: + var unknown = &Unknown{Value: &v} + metric = &OpenMetric{Name: &metricName, Unknown: unknown, Label: labelPairs} + lookupMetricName = metricName + break + default: + lookupMetricName = metricName + } + + fam, ok = metricFamiliesByName[lookupMetricName] + if !ok { + fam = &OpenMetricFamily{Type: mt} + metricFamiliesByName[lookupMetricName] = fam + } + + fam.Name = &metricName + + if hasExemplar := parser.Exemplar(&e); hasExemplar && mt != textparse.MetricTypeHistogram && metric != nil { + if !e.HasTs { + e.Ts = t + } + metric.Exemplar = &e + } + + if tp != nil && metric != nil { + t = *tp + metric.TimestampMs = &t + } + + fam.Metric = append(fam.Metric, metric) + } + + families := make([]*OpenMetricFamily, 0, len(metricFamiliesByName)) + for _, v := range metricFamiliesByName { + if v.Metric != nil { + families = append(families, v) + } + } + return families, nil +} + +// MetricsMapping defines mapping settings for OpenMetrics metrics, to be used with `GetProcessedMetrics` +type MetricsMapping struct { + // Metrics translates from openmetrics metric name to Metricbeat fields + Metrics map[string]MetricMap + + // Namespace for metrics managed by this mapping + Namespace string + + // Labels translate from openmetrics label names to Metricbeat fields + Labels map[string]LabelMap + + // ExtraFields adds the given fields to all events coming from `GetProcessedMetrics` + ExtraFields map[string]string +} + +func (p *openmetrics) ProcessMetrics(families []*OpenMetricFamily, mapping *MetricsMapping) ([]common.MapStr, error) { + + eventsMap := map[string]common.MapStr{} + infoMetrics := []*infoMetricData{} + for _, family := range families { + for _, metric := range family.GetMetric() { + m, ok := mapping.Metrics[family.GetName()] + if m == nil || !ok { + // Ignore unknown metrics + continue + } + + field := m.GetField() + value := m.GetValue(metric) + + // Ignore retrieval errors (bad conf) + if value == nil { + continue + } + + storeAllLabels := false + labelsLocation := "" + var extraFields common.MapStr + if m != nil { + c := m.GetConfiguration() + storeAllLabels = c.StoreNonMappedLabels + labelsLocation = c.NonMappedLabelsPlacement + extraFields = c.ExtraFields + } + + // Apply extra options + allLabels := getLabels(metric) + for _, option := range m.GetOptions() { + field, value, allLabels = option.Process(field, value, allLabels) + } + + // Convert labels + labels := common.MapStr{} + keyLabels := common.MapStr{} + for k, v := range allLabels { + if l, ok := mapping.Labels[k]; ok { + if l.IsKey() { + keyLabels.Put(l.GetField(), v) + } else { + labels.Put(l.GetField(), v) + } + } else if storeAllLabels { + // if label for this metric is not found at the label mappings but + // it is configured to store any labels found, make it so + labels.Put(labelsLocation+"."+k, v) + } + } + + // if extra fields have been added through metric configuration + // add them to labels. + // + // not considering these extra fields to be keylabels as that case + // have not appeared yet + for k, v := range extraFields { + labels.Put(k, v) + } + + // Keep a info document if it's an infoMetric + if _, ok = m.(*infoMetric); ok { + labels.DeepUpdate(keyLabels) + infoMetrics = append(infoMetrics, &infoMetricData{ + Labels: keyLabels, + Meta: labels, + }) + continue + } + + if field != "" { + event := getEvent(eventsMap, keyLabels) + update := common.MapStr{} + update.Put(field, value) + // value may be a mapstr (for histograms and summaries), do a deep update to avoid smashing existing fields + event.DeepUpdate(update) + + event.DeepUpdate(labels) + } + } + } + + // populate events array from values in eventsMap + events := make([]common.MapStr, 0, len(eventsMap)) + for _, event := range eventsMap { + // Add extra fields + for k, v := range mapping.ExtraFields { + event[k] = v + } + events = append(events, event) + } + + // fill info from infoMetrics + for _, info := range infoMetrics { + for _, event := range events { + found := true + for k, v := range info.Labels.Flatten() { + value, err := event.GetValue(k) + if err != nil || v != value { + found = false + break + } + } + + // fill info from this metric + if found { + event.DeepUpdate(info.Meta) + } + } + } + + return events, nil +} + +func (p *openmetrics) GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapStr, error) { + families, err := p.GetFamilies() + if err != nil { + return nil, err + } + return p.ProcessMetrics(families, mapping) +} + +// infoMetricData keeps data about an infoMetric +type infoMetricData struct { + Labels common.MapStr + Meta common.MapStr +} + +func (p *openmetrics) ReportProcessedMetrics(mapping *MetricsMapping, r mb.ReporterV2) error { + events, err := p.GetProcessedMetrics(mapping) + if err != nil { + return errors.Wrap(err, "error getting processed metrics") + } + for _, event := range events { + r.Event(mb.Event{ + MetricSetFields: event, + Namespace: mapping.Namespace, + }) + } + + return nil +} + +func getEvent(m map[string]common.MapStr, labels common.MapStr) common.MapStr { + hash := labels.String() + res, ok := m[hash] + if !ok { + res = labels + m[hash] = res + } + return res +} + +func getLabels(metric *OpenMetric) common.MapStr { + labels := common.MapStr{} + for _, label := range metric.GetLabel() { + if label.Name != "" && label.Value != "" { + labels.Put(label.Name, label.Value) + } + } + return labels +} + +// CompilePatternList compiles a pattern list and returns the list of the compiled patterns +func CompilePatternList(patterns *[]string) ([]*regexp.Regexp, error) { + var compiledPatterns []*regexp.Regexp + compiledPatterns = []*regexp.Regexp{} + if patterns != nil { + for _, pattern := range *patterns { + r, err := regexp.Compile(pattern) + if err != nil { + return nil, errors.Wrapf(err, "compiling pattern '%s'", pattern) + } + compiledPatterns = append(compiledPatterns, r) + } + return compiledPatterns, nil + } + return []*regexp.Regexp{}, nil +} + +// MatchMetricFamily checks if the given family/metric name matches any of the given patterns +func MatchMetricFamily(family string, matchMetrics []*regexp.Regexp) bool { + for _, checkMetric := range matchMetrics { + matched := checkMetric.MatchString(family) + if matched { + return true + } + } + return false +} + +const ( + TextVersion = "0.0.4" + OpenMetricsType = `application/openmetrics-text` + + // The Content-Type values for the different wire protocols. + FmtUnknown string = `` + FmtText string = `text/plain; version=` + TextVersion + `; charset=utf-8` +) + +const ( + hdrContentType = "Content-Type" +) + +func getContentType(h http.Header) string { + ct := h.Get(hdrContentType) + + mediatype, params, err := mime.ParseMediaType(ct) + if err != nil { + return FmtUnknown + } + + const textType = "text/plain" + + switch mediatype { + case OpenMetricsType: + if e, ok := params["encoding"]; ok && e != "delimited" { + return FmtUnknown + } + return OpenMetricsType + + case textType: + if v, ok := params["version"]; ok && v != TextVersion { + return FmtUnknown + } + return FmtText + } + + return FmtUnknown +} diff --git a/metricbeat/helper/openmetrics/openmetrics_test.go b/metricbeat/helper/openmetrics/openmetrics_test.go new file mode 100644 index 000000000000..5ebf1903c0f5 --- /dev/null +++ b/metricbeat/helper/openmetrics/openmetrics_test.go @@ -0,0 +1,1093 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package openmetrics + +import ( + "bytes" + "compress/gzip" + "io/ioutil" + "net/http" + "sort" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" +) + +const ( + openMetricsTestSamples = `# TYPE first_metric gauge +first_metric{label1="value1",label2="value2",label3="Value3",label4="FOO"} 1 +# TYPE second_metric gauge +second_metric{label1="value1",label3="othervalue"} 0 +# TYPE summary_metric summary +summary_metric{quantile="0.5"} 29735 +summary_metric{quantile="0.9"} 47103 +summary_metric{quantile="0.99"} 50681 +summary_metric_sum 234892394 +summary_metric_count 44000 +# TYPE histogram_metric histogram +histogram_metric_bucket{le="1000"} 1 +histogram_metric_bucket{le="10000"} 1 +histogram_metric_bucket{le="100000"} 1 +histogram_metric_bucket{le="1e+06"} 1 +histogram_metric_bucket{le="1e+08"} 1 +histogram_metric_bucket{le="1e+09"} 1 +histogram_metric_bucket{le="+Inf"} 1 +histogram_metric_sum 117 +histogram_metric_count 1 +# TYPE histogram_decimal_metric histogram +histogram_decimal_metric_bucket{le="0.001"} 1 +histogram_decimal_metric_bucket{le="0.01"} 1 +histogram_decimal_metric_bucket{le="0.1"} 2 +histogram_decimal_metric_bucket{le="1"} 3 +histogram_decimal_metric_bucket{le="+Inf"} 5 +histogram_decimal_metric_sum 4.31 +histogram_decimal_metric_count 5 +# TYPE gaugehistogram_metric gaugehistogram +gaugehistogram_metric_bucket{le="0.01"} 20.0 +gaugehistogram_metric_bucket{le="0.1"} 25.0 +gaugehistogram_metric_bucket{le="1"} 34.0 +gaugehistogram_metric_bucket{le="10"} 34.0 +gaugehistogram_metric_bucket{le="+Inf"} 42.0 +gaugehistogram_metric_gcount 42.0 +gaugehistogram_metric_gsum 3289.3 +gaugehistogram_metric_created 1520430000.123 +# TYPE target info +target_info 1 +# TYPE target_with_labels info +target_with_labels_info{env="prod",hostname="myhost"} 1 +` + + openMetricsGaugeKeyLabel = `# TYPE metrics_one_count_total gauge +metrics_one_count_total{name="jane",surname="foster"} 1 +metrics_one_count_total{name="john",surname="williams"} 2 +metrics_one_count_total{name="jahn",surname="baldwin",age="30"} 3 +` + + openMetricsGaugeKeyLabelWithNaNInf = `# TYPE metrics_one_count_errors gauge +metrics_one_count_errors{name="jane",surname="foster"} 0 +# TYPE metrics_one_count_total gauge +metrics_one_count_total{name="jane",surname="foster"} NaN +metrics_one_count_total{name="foo",surname="bar"} +Inf +metrics_one_count_total{name="john",surname="williams"} -Inf +metrics_one_count_total{name="jahn",surname="baldwin",age="30"} 3 +` + + openMetricsCounterKeyLabel = `# TYPE metrics_one_count_total counter +metrics_one_count_total{name="jane",surname="foster"} 1 +metrics_one_count_total{name="john",surname="williams"} 2 +metrics_one_count_total{name="jahn",surname="baldwin",age="30"} 3 +` + + openMetricsCounterKeyLabelWithNaNInf = `# TYPE metrics_one_count_errors counter +metrics_one_count_errors{name="jane",surname="foster"} 1 +# TYPE metrics_one_count_total counter +metrics_one_count_total{name="jane",surname="foster"} NaN +metrics_one_count_total{name="john",surname="williams"} +Inf +metrics_one_count_total{name="jahn",surname="baldwin",age="30"} 3 + +` + + openMetricsHistogramKeyLabel = `# TYPE metrics_one_midichlorians histogram +metrics_one_midichlorians_bucket{rank="youngling",alive="yes",le="2000"} 52 +metrics_one_midichlorians_bucket{rank="youngling",alive="yes",le="4000"} 70 +metrics_one_midichlorians_bucket{rank="youngling",alive="yes",le="8000"} 78 +metrics_one_midichlorians_bucket{rank="youngling",alive="yes",le="16000"} 84 +metrics_one_midichlorians_bucket{rank="youngling",alive="yes",le="32000"} 86 +metrics_one_midichlorians_bucket{rank="youngling",alive="yes",le="+Inf"} 86 +metrics_one_midichlorians_sum{rank="youngling",alive="yes"} 1000001 +metrics_one_midichlorians_count{rank="youngling",alive="yes"} 86 +metrics_one_midichlorians_bucket{rank="padawan",alive="yes",le="2000"} 16 +metrics_one_midichlorians_bucket{rank="padawan",alive="yes",le="4000"} 20 +metrics_one_midichlorians_bucket{rank="padawan",alive="yes",le="8000"} 23 +metrics_one_midichlorians_bucket{rank="padawan",alive="yes",le="16000"} 27 +metrics_one_midichlorians_bucket{rank="padawan",alive="yes",le="32000"} 27 +metrics_one_midichlorians_bucket{rank="padawan",alive="yes",le="+Inf"} 28 +metrics_one_midichlorians_sum{rank="padawan",alive="yes"} 800001 +metrics_one_midichlorians_count{rank="padawan",alive="yes"} 28 +` + + openMetricsHistogramKeyLabelWithNaNInf = `# TYPE metrics_one_midichlorians histogram +metrics_one_midichlorians_bucket{rank="youngling",alive="yes",le="2000"} NaN +metrics_one_midichlorians_bucket{rank="youngling",alive="yes",le="4000"} +Inf +metrics_one_midichlorians_bucket{rank="youngling",alive="yes",le="8000"} -Inf +metrics_one_midichlorians_bucket{rank="youngling",alive="yes",le="16000"} 84 +metrics_one_midichlorians_bucket{rank="youngling",alive="yes",le="32000"} 86 +metrics_one_midichlorians_bucket{rank="youngling",alive="yes",le="+Inf"} 86 +metrics_one_midichlorians_sum{rank="youngling",alive="yes"} 1000001 +metrics_one_midichlorians_count{rank="youngling",alive="yes"} 86 +` + + openMetricsSummaryKeyLabel = `# TYPE metrics_force_propagation_ms summary +metrics_force_propagation_ms{kind="jedi",quantile="0"} 35 +metrics_force_propagation_ms{kind="jedi",quantile="0.25"} 22 +metrics_force_propagation_ms{kind="jedi",quantile="0.5"} 7 +metrics_force_propagation_ms{kind="jedi",quantile="0.75"} 20 +metrics_force_propagation_ms{kind="jedi",quantile="1"} 30 +metrics_force_propagation_ms_sum{kind="jedi"} 89 +metrics_force_propagation_ms_count{kind="jedi"} 651 +metrics_force_propagation_ms{kind="sith",quantile="0"} 30 +metrics_force_propagation_ms{kind="sith",quantile="0.25"} 20 +metrics_force_propagation_ms{kind="sith",quantile="0.5"} 12 +metrics_force_propagation_ms{kind="sith",quantile="0.75"} 21 +metrics_force_propagation_ms{kind="sith",quantile="1"} 29 +metrics_force_propagation_ms_sum{kind="sith"} 112 +metrics_force_propagation_ms_count{kind="sith"} 711 +` + + openMetricsSummaryKeyLabelWithNaNInf = `# TYPE metrics_force_propagation_ms summary +metrics_force_propagation_ms{kind="jedi",quantile="0"} NaN +metrics_force_propagation_ms{kind="jedi",quantile="0.25"} +Inf +metrics_force_propagation_ms{kind="jedi",quantile="0.5"} -Inf +metrics_force_propagation_ms{kind="jedi",quantile="0.75"} 20 +metrics_force_propagation_ms{kind="jedi",quantile="1"} 30 +metrics_force_propagation_ms_sum{kind="jedi"} 50 +metrics_force_propagation_ms_count{kind="jedi"} 651 +` + + openMetricsGaugeLabeled = `# TYPE metrics_that_inform_labels gauge +metrics_that_inform_labels{label1="I am 1",label2="I am 2"} 1 +metrics_that_inform_labels{label1="I am 1",label3="I am 3"} 1 +# TYPE metrics_that_use_labels gauge +metrics_that_use_labels{label1="I am 1"} 20 +` + openMetricsStateset = `# TYPE enable_category stateset +enable_category{category="shoes"} 0 +enable_category{category="collectibles"} 1 +` +) + +type mockFetcher struct { + response string +} + +var _ = httpfetcher(&mockFetcher{}) + +// FetchResponse returns an HTTP response but for the Body, which +// returns the mockFetcher.Response contents +func (m mockFetcher) FetchResponse() (*http.Response, error) { + body := bytes.NewBuffer(nil) + writer := gzip.NewWriter(body) + writer.Write([]byte(m.response)) + writer.Close() + + return &http.Response{ + StatusCode: 200, + Header: http.Header{ + "Content-Encoding": []string{"gzip"}, + "Content-Type": []string{"application/openmetrics-text"}, + }, + Body: ioutil.NopCloser(body), + }, nil +} + +func TestOpenMetrics(t *testing.T) { + + p := &openmetrics{mockFetcher{response: openMetricsTestSamples}, logp.NewLogger("test")} + + tests := []struct { + mapping *MetricsMapping + msg string + expected []common.MapStr + }{ + { + msg: "Simple field map", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "first_metric": Metric("first.metric"), + }, + }, + expected: []common.MapStr{ + common.MapStr{ + "first": common.MapStr{ + "metric": 1.0, + }, + }, + }, + }, + { + msg: "Simple field map with labels", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "first_metric": Metric("first.metric"), + }, + Labels: map[string]LabelMap{ + "label1": Label("labels.label1"), + "label2": Label("labels.label2"), + }, + }, + expected: []common.MapStr{ + common.MapStr{ + "first": common.MapStr{ + "metric": 1.0, + }, + "labels": common.MapStr{ + "label1": "value1", + "label2": "value2", + }, + }, + }, + }, + { + msg: "Several metrics", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "first_metric": Metric("first.metric"), + "second_metric": Metric("second.metric"), + }, + Labels: map[string]LabelMap{ + "label3": KeyLabel("labels.label3"), + }, + }, + expected: []common.MapStr{ + common.MapStr{ + "first": common.MapStr{ + "metric": 1.0, + }, + "labels": common.MapStr{ + "label3": "Value3", + }, + }, + common.MapStr{ + "second": common.MapStr{ + "metric": 0.0, + }, + "labels": common.MapStr{ + "label3": "othervalue", + }, + }, + }, + }, + { + msg: "Grouping by key labels", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "first_metric": Metric("first.metric"), + "second_metric": Metric("second.metric"), + }, + Labels: map[string]LabelMap{ + "label1": KeyLabel("labels.label1"), + "label2": Label("labels.label2"), + }, + }, + expected: []common.MapStr{ + common.MapStr{ + "first": common.MapStr{ + "metric": 1.0, + }, + "second": common.MapStr{ + "metric": 0.0, + }, + "labels": common.MapStr{ + "label1": "value1", + "label2": "value2", + }, + }, + }, + }, + { + msg: "Keyword metrics", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "first_metric": KeywordMetric("first.metric", "works"), + "second_metric": KeywordMetric("second.metric", "itsnot"), + }, + Labels: map[string]LabelMap{ + "label1": KeyLabel("labels.label1"), + }, + }, + expected: []common.MapStr{ + common.MapStr{ + "first": common.MapStr{ + "metric": "works", + }, + "labels": common.MapStr{ + "label1": "value1", + }, + }, + }, + }, + { + msg: "Boolean metrics", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "first_metric": BooleanMetric("first.metric"), + "second_metric": BooleanMetric("second.metric"), + }, + Labels: map[string]LabelMap{ + "label1": KeyLabel("labels.label1"), + }, + }, + expected: []common.MapStr{ + common.MapStr{ + "first": common.MapStr{ + "metric": true, + }, + "second": common.MapStr{ + "metric": false, + }, + "labels": common.MapStr{ + "label1": "value1", + }, + }, + }, + }, + { + msg: "Label metrics", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "first_metric": LabelMetric("first.metric", "label3"), + }, + Labels: map[string]LabelMap{ + "label1": Label("labels.label1"), + }, + }, + expected: []common.MapStr{ + common.MapStr{ + "first": common.MapStr{ + "metric": "Value3", + }, + "labels": common.MapStr{ + "label1": "value1", + }, + }, + }, + }, + { + msg: "Label metrics, lowercase", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "first_metric": LabelMetric("first.metric", "label4", OpLowercaseValue()), + }, + Labels: map[string]LabelMap{ + "label1": Label("labels.label1"), + }, + }, + expected: []common.MapStr{ + common.MapStr{ + "first": common.MapStr{ + "metric": "foo", + }, + "labels": common.MapStr{ + "label1": "value1", + }, + }, + }, + }, + { + msg: "Label metrics, filter", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "first_metric": LabelMetric("first.metric", "label4", OpFilterMap( + "label1", + map[string]string{"value1": "foo"}, + )), + }, + Labels: map[string]LabelMap{ + "label1": Label("labels.label1"), + }, + }, + expected: []common.MapStr{ + common.MapStr{ + "first": common.MapStr{ + "metric": common.MapStr{ + "foo": "FOO", + }, + }, + "labels": common.MapStr{ + "label1": "value1", + }, + }, + }, + }, + { + msg: "Label metrics, filter", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "first_metric": LabelMetric("first.metric", "label4", OpLowercaseValue(), OpFilterMap( + "foo", + map[string]string{"Filtered": "filtered"}, + )), + }, + Labels: map[string]LabelMap{ + "label1": Label("labels.label1"), + }, + }, + expected: []common.MapStr{}, + }, + { + msg: "Summary metric", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "summary_metric": Metric("summary.metric"), + }, + }, + expected: []common.MapStr{ + common.MapStr{ + "summary": common.MapStr{ + "metric": common.MapStr{ + "sum": 234892394.0, + "count": uint64(44000), + "percentile": common.MapStr{ + "50": 29735.0, + "90": 47103.0, + "99": 50681.0, + }, + }, + }, + }, + }, + }, + { + msg: "Histogram metric", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "histogram_metric": Metric("histogram.metric"), + }, + }, + expected: []common.MapStr{ + common.MapStr{ + "histogram": common.MapStr{ + "metric": common.MapStr{ + "count": uint64(1), + "bucket": common.MapStr{ + "1000000000": uint64(1), + "+Inf": uint64(1), + "1000": uint64(1), + "10000": uint64(1), + "100000": uint64(1), + "1000000": uint64(1), + "100000000": uint64(1), + }, + "sum": 117.0, + }, + }, + }, + }, + }, + { + msg: "Histogram decimal metric", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "histogram_decimal_metric": Metric("histogram.metric", OpMultiplyBuckets(1000)), + }, + }, + expected: []common.MapStr{ + common.MapStr{ + "histogram": common.MapStr{ + "metric": common.MapStr{ + "count": uint64(5), + "bucket": common.MapStr{ + "1": uint64(1), + "10": uint64(1), + "100": uint64(2), + "1000": uint64(3), + "+Inf": uint64(5), + }, + "sum": 4310.0, + }, + }, + }, + }, + }, + { + msg: "Gauge histogram metric", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "gaugehistogram_metric": Metric("gaugehistogram.metric"), + }, + }, + expected: []common.MapStr{ + common.MapStr{ + "gaugehistogram": common.MapStr{ + "metric": common.MapStr{ + "gcount": uint64(42), + "bucket": common.MapStr{ + "0.01": uint64(20), + "0.1": uint64(25), + "1": uint64(34), + "10": uint64(34), + "+Inf": uint64(42), + }, + "gsum": 3289.3, + }, + }, + }, + }, + }, + { + msg: "Info metric", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "target_info": Metric("target_info.metric"), + }, + }, + expected: []common.MapStr{ + common.MapStr{ + "target_info": common.MapStr{ + "metric": int64(1), + }, + }, + }, + }, + { + msg: "Info metric with labels", + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "target_with_labels_info": Metric("target_with_labels_info.metric"), + }, + Labels: map[string]LabelMap{ + "env": Label("labels.env"), + "hostname": Label("labels.hostname"), + }, + }, + expected: []common.MapStr{ + common.MapStr{ + "target_with_labels_info": common.MapStr{ + "metric": int64(1), + }, + "labels": common.MapStr{ + "env": "prod", + "hostname": "myhost", + }, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.msg, func(t *testing.T) { + reporter := &mbtest.CapturingReporterV2{} + p.ReportProcessedMetrics(test.mapping, reporter) + assert.Nil(t, reporter.GetErrors(), test.msg) + // Sort slice to avoid randomness + res := reporter.GetEvents() + sort.Slice(res, func(i, j int) bool { + return res[i].MetricSetFields.String() < res[j].MetricSetFields.String() + }) + assert.Equal(t, len(test.expected), len(res)) + for j, ev := range res { + assert.Equal(t, test.expected[j], ev.MetricSetFields, test.msg) + } + }) + } +} + +func TestOpenMetricsKeyLabels(t *testing.T) { + + testCases := []struct { + testName string + openmetricsResponse string + mapping *MetricsMapping + expectedEvents []common.MapStr + }{ + { + testName: "Test gauge with KeyLabel", + openmetricsResponse: openMetricsGaugeKeyLabel, + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "metrics_one_count_total": Metric("metrics.one.count"), + }, + Labels: map[string]LabelMap{ + "name": KeyLabel("metrics.one.labels.name"), + "surname": KeyLabel("metrics.one.labels.surname"), + "age": KeyLabel("metrics.one.labels.age"), + }, + }, + expectedEvents: []common.MapStr{ + common.MapStr{ + "metrics": common.MapStr{ + "one": common.MapStr{ + "count": 1.0, + "labels": common.MapStr{ + "name": "jane", + "surname": "foster", + }, + }, + }, + }, + common.MapStr{ + "metrics": common.MapStr{ + "one": common.MapStr{ + "count": 2.0, + "labels": common.MapStr{ + "name": "john", + "surname": "williams", + }, + }, + }, + }, + common.MapStr{ + "metrics": common.MapStr{ + "one": common.MapStr{ + "count": 3.0, + "labels": common.MapStr{ + "name": "jahn", + "surname": "baldwin", + "age": "30", + }, + }, + }, + }, + }, + }, + + { + testName: "Test gauge with KeyLabel With NaN Inf", + openmetricsResponse: openMetricsGaugeKeyLabelWithNaNInf, + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "metrics_one_count_errors": Metric("metrics.one.count"), + "metrics_one_count_total": Metric("metrics.one.count"), + }, + Labels: map[string]LabelMap{ + "name": KeyLabel("metrics.one.labels.name"), + "surname": KeyLabel("metrics.one.labels.surname"), + "age": KeyLabel("metrics.one.labels.age"), + }, + }, + expectedEvents: []common.MapStr{ + common.MapStr{ + "metrics": common.MapStr{ + "one": common.MapStr{ + "count": 0.0, + "labels": common.MapStr{ + "name": "jane", + "surname": "foster", + }, + }, + }, + }, + common.MapStr{ + "metrics": common.MapStr{ + "one": common.MapStr{ + "count": 3.0, + "labels": common.MapStr{ + "name": "jahn", + "surname": "baldwin", + "age": "30", + }, + }, + }, + }, + }, + }, + + { + testName: "Test counter with KeyLabel", + openmetricsResponse: openMetricsCounterKeyLabel, + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "metrics_one_count_total": Metric("metrics.one.count"), + }, + Labels: map[string]LabelMap{ + "name": KeyLabel("metrics.one.labels.name"), + "surname": KeyLabel("metrics.one.labels.surname"), + "age": KeyLabel("metrics.one.labels.age"), + }, + }, + expectedEvents: []common.MapStr{ + common.MapStr{ + "metrics": common.MapStr{ + "one": common.MapStr{ + "count": int64(1), + "labels": common.MapStr{ + "name": "jane", + "surname": "foster", + }, + }, + }, + }, + common.MapStr{ + "metrics": common.MapStr{ + "one": common.MapStr{ + "count": int64(2), + "labels": common.MapStr{ + "name": "john", + "surname": "williams", + }, + }, + }, + }, + common.MapStr{ + "metrics": common.MapStr{ + "one": common.MapStr{ + "count": int64(3), + "labels": common.MapStr{ + "name": "jahn", + "surname": "baldwin", + "age": "30", + }, + }, + }, + }, + }, + }, + + { + testName: "Test counter with KeyLabel With NaN Inf", + openmetricsResponse: openMetricsCounterKeyLabelWithNaNInf, + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "metrics_one_count_errors": Metric("metrics.one.count"), + "metrics_one_count_total": Metric("metrics.one.count"), + }, + Labels: map[string]LabelMap{ + "name": KeyLabel("metrics.one.labels.name"), + "surname": KeyLabel("metrics.one.labels.surname"), + "age": KeyLabel("metrics.one.labels.age"), + }, + }, + expectedEvents: []common.MapStr{ + common.MapStr{ + "metrics": common.MapStr{ + "one": common.MapStr{ + "count": int64(1), + "labels": common.MapStr{ + "name": "jane", + "surname": "foster", + }, + }, + }, + }, + common.MapStr{ + "metrics": common.MapStr{ + "one": common.MapStr{ + "count": int64(3), + "labels": common.MapStr{ + "name": "jahn", + "surname": "baldwin", + "age": "30", + }, + }, + }, + }, + }, + }, + + { + testName: "Test histogram with KeyLabel", + openmetricsResponse: openMetricsHistogramKeyLabel, + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "metrics_one_midichlorians": Metric("metrics.one.midichlorians"), + }, + Labels: map[string]LabelMap{ + "rank": KeyLabel("metrics.one.midichlorians.rank"), + "alive": KeyLabel("metrics.one.midichlorians.alive"), + }, + }, + expectedEvents: []common.MapStr{ + common.MapStr{ + "metrics": common.MapStr{ + "one": common.MapStr{ + "midichlorians": common.MapStr{ + "count": uint64(86), + "sum": 1000001.0, + "bucket": common.MapStr{ + "2000": uint64(52), + "4000": uint64(70), + "8000": uint64(78), + "16000": uint64(84), + "32000": uint64(86), + "+Inf": uint64(86), + }, + + "rank": "youngling", + "alive": "yes", + }, + }, + }, + }, + common.MapStr{ + "metrics": common.MapStr{ + "one": common.MapStr{ + "midichlorians": common.MapStr{ + "count": uint64(28), + "sum": 800001.0, + "bucket": common.MapStr{ + "2000": uint64(16), + "4000": uint64(20), + "8000": uint64(23), + "16000": uint64(27), + "32000": uint64(27), + "+Inf": uint64(28), + }, + "rank": "padawan", + "alive": "yes", + }, + }, + }, + }, + }, + }, + + { + testName: "Test histogram with KeyLabel With NaN Inf", + openmetricsResponse: openMetricsHistogramKeyLabelWithNaNInf, + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "metrics_one_midichlorians": Metric("metrics.one.midichlorians"), + }, + Labels: map[string]LabelMap{ + "rank": KeyLabel("metrics.one.midichlorians.rank"), + "alive": KeyLabel("metrics.one.midichlorians.alive"), + }, + }, + expectedEvents: []common.MapStr{ + common.MapStr{ + "metrics": common.MapStr{ + "one": common.MapStr{ + "midichlorians": common.MapStr{ + "count": uint64(86), + "sum": 1000001.0, + "bucket": common.MapStr{ + "16000": uint64(84), + "32000": uint64(86), + "+Inf": uint64(86), + }, + + "rank": "youngling", + "alive": "yes", + }, + }, + }, + }, + }, + }, + + { + testName: "Test summary with KeyLabel", + openmetricsResponse: openMetricsSummaryKeyLabel, + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "metrics_force_propagation_ms": Metric("metrics.force.propagation.ms"), + }, + Labels: map[string]LabelMap{ + "kind": KeyLabel("metrics.force.propagation.ms.labels.kind"), + }, + }, + expectedEvents: []common.MapStr{ + common.MapStr{ + "metrics": common.MapStr{ + "force": common.MapStr{ + "propagation": common.MapStr{ + "ms": common.MapStr{ + "count": uint64(651), + "sum": 89.0, + "percentile": common.MapStr{ + "0": 35.0, + "25": 22.0, + "50": 7.0, + "75": 20.0, + "100": 30.0, + }, + "labels": common.MapStr{ + "kind": "jedi", + }, + }, + }, + }, + }, + }, + common.MapStr{ + "metrics": common.MapStr{ + "force": common.MapStr{ + "propagation": common.MapStr{ + "ms": common.MapStr{ + "count": uint64(711), + "sum": 112.0, + "percentile": common.MapStr{ + "0": 30.0, + "25": 20.0, + "50": 12.0, + "75": 21.0, + "100": 29.0, + }, + "labels": common.MapStr{ + "kind": "sith", + }, + }, + }, + }, + }, + }, + }, + }, + + { + testName: "Test summary with KeyLabel With NaN Inf", + openmetricsResponse: openMetricsSummaryKeyLabelWithNaNInf, + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "metrics_force_propagation_ms": Metric("metrics.force.propagation.ms"), + }, + Labels: map[string]LabelMap{ + "kind": KeyLabel("metrics.force.propagation.ms.labels.kind"), + }, + }, + expectedEvents: []common.MapStr{ + common.MapStr{ + "metrics": common.MapStr{ + "force": common.MapStr{ + "propagation": common.MapStr{ + "ms": common.MapStr{ + "count": uint64(651), + "sum": 50.0, + "percentile": common.MapStr{ + "75": 20.0, + "100": 30.0, + }, + "labels": common.MapStr{ + "kind": "jedi", + }, + }, + }, + }, + }, + }, + }, + }, + + { + testName: "Test gauge InfoMetrics using ExtendedInfoMetric", + openmetricsResponse: openMetricsGaugeLabeled, + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "metrics_that_inform_labels": ExtendedInfoMetric(Configuration{StoreNonMappedLabels: true, NonMappedLabelsPlacement: "metrics.other_labels"}), + "metrics_that_use_labels": Metric("metrics.value"), + }, + Labels: map[string]LabelMap{ + "label1": KeyLabel("metrics.label1"), + }, + }, + expectedEvents: []common.MapStr{ + common.MapStr{ + "metrics": common.MapStr{ + "value": 20.0, + "label1": "I am 1", + "other_labels": common.MapStr{ + "label2": "I am 2", + "label3": "I am 3", + }, + }, + }, + }, + }, + { + testName: "Test gauge InfoMetrics using ExtendedInfoMetric and extra fields", + openmetricsResponse: openMetricsGaugeLabeled, + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "metrics_that_inform_labels": ExtendedInfoMetric(Configuration{ + StoreNonMappedLabels: true, + NonMappedLabelsPlacement: "metrics.other_labels", + ExtraFields: common.MapStr{ + "metrics.extra.field1": "extra1", + "metrics.extra.field2": "extra2", + }}), + "metrics_that_use_labels": Metric("metrics.value"), + }, + Labels: map[string]LabelMap{ + "label1": KeyLabel("metrics.label1"), + }, + }, + expectedEvents: []common.MapStr{ + common.MapStr{ + "metrics": common.MapStr{ + "value": 20.0, + "label1": "I am 1", + "other_labels": common.MapStr{ + "label2": "I am 2", + "label3": "I am 3", + }, + "extra": common.MapStr{ + "field1": "extra1", + "field2": "extra2", + }, + }, + }, + }, + }, + { + testName: "Stateset metric with labels", + openmetricsResponse: openMetricsStateset, + mapping: &MetricsMapping{ + Metrics: map[string]MetricMap{ + "enable_category": Metric("metrics.count"), + }, + Labels: map[string]LabelMap{ + "category": KeyLabel("metrics.labels.category"), + }, + }, + expectedEvents: []common.MapStr{ + common.MapStr{ + "metrics": common.MapStr{ + "count": int64(0), + "labels": common.MapStr{ + "category": "shoes", + }, + }, + }, + common.MapStr{ + "metrics": common.MapStr{ + "count": int64(1), + "labels": common.MapStr{ + "category": "collectibles", + }, + }, + }, + }, + }, + } + + for _, tc := range testCases { + r := &mbtest.CapturingReporterV2{} + p := &openmetrics{mockFetcher{response: tc.openmetricsResponse}, logp.NewLogger("test")} + p.ReportProcessedMetrics(tc.mapping, r) + if !assert.Nil(t, r.GetErrors(), + "error reporting/processing metrics, at %q", tc.testName) { + continue + } + + events := r.GetEvents() + if !assert.Equal(t, len(tc.expectedEvents), len(events), + "number of returned events doesn't match expected, at %q", tc.testName) { + continue + } + + // Sort slices of received and expeected to avoid unmatching + sort.Slice(events, func(i, j int) bool { + return events[i].MetricSetFields.String() < events[j].MetricSetFields.String() + }) + sort.Slice(tc.expectedEvents, func(i, j int) bool { + return tc.expectedEvents[i].String() < tc.expectedEvents[j].String() + }) + + for i := range events { + if !assert.Equal(t, tc.expectedEvents[i], events[i].MetricSetFields, + "mismatch at event #%d, at %q", i, tc.testName) { + + continue + } + } + } +} diff --git a/metricbeat/mb/testing/testdata.go b/metricbeat/mb/testing/testdata.go index 6e398e8e8f96..e5b853d570ea 100644 --- a/metricbeat/mb/testing/testdata.go +++ b/metricbeat/mb/testing/testdata.go @@ -43,6 +43,7 @@ import ( const ( expectedExtension = "-expected.json" + applicationJson = "application/json" ) // DataConfig is the configuration for testdata tests @@ -75,6 +76,9 @@ type DataConfig struct { // URL of the endpoint that must be tested depending on each module URL string + // ContentType of the data being returned by server + ContentType string `yaml:"content_type"` + // Suffix is the extension of the source file with the input contents. Defaults to `json`, `plain` is also a common use. Suffix string @@ -107,9 +111,10 @@ type DataConfig struct { func defaultDataConfig() DataConfig { return DataConfig{ - Path: ".", - WritePath: ".", - Suffix: "json", + Path: ".", + WritePath: ".", + Suffix: "json", + ContentType: applicationJson, } } @@ -189,7 +194,7 @@ func TestMetricsetFieldsDocumented(t *testing.T, metricSet mb.MetricSet, events func runTest(t *testing.T, file string, module, metricSetName string, config DataConfig) { // starts a server serving the given file under the given url - s := server(t, file, config.URL) + s := server(t, file, config.URL, config.ContentType) defer s.Close() moduleConfig := getConfig(module, metricSetName, s.URL, config) @@ -440,7 +445,7 @@ func getConfig(module, metricSet, url string, config DataConfig) map[string]inte } // server starts a server with a mock output -func server(t *testing.T, path string, url string) *httptest.Server { +func server(t *testing.T, path string, url string, contentType string) *httptest.Server { body, err := ioutil.ReadFile(path) if err != nil { @@ -455,7 +460,7 @@ func server(t *testing.T, path string, url string) *httptest.Server { } if r.URL.Path+query == url { - w.Header().Set("Content-Type", "application/json;") + w.Header().Set("Content-Type", contentType) w.WriteHeader(200) w.Write(body) } else { diff --git a/metricbeat/module/openmetrics/_meta/fields.yml b/metricbeat/module/openmetrics/_meta/fields.yml index c83c99f13630..12592096f5e0 100644 --- a/metricbeat/module/openmetrics/_meta/fields.yml +++ b/metricbeat/module/openmetrics/_meta/fields.yml @@ -7,18 +7,41 @@ fields: - name: openmetrics type: group + release: beta description: > `openmetrics` contains metrics from endpoints that are following Openmetrics format. fields: # Order is important here, labels will match first, the rest are double + - name: help + type: keyword + description: > + Brief description of the MetricFamily + - name: type + type: keyword + description: > + Metric type + - name: unit + type: keyword + description: > + Metric unit - name: labels.* type: object object_type: keyword description: > - Prometheus metric labels + Openmetrics metric labels - name: metrics.* type: object object_type: double object_type_mapping_type: "*" description: > - Prometheus metric + Openmetrics metric + - name: exemplar.* + type: object + object_type: keyword + description: > + Openmetrics exemplars + - name: exemplar.labels.* + type: object + object_type: keyword + description: > + Openmetrics metric exemplar labels diff --git a/metricbeat/module/openmetrics/collector/_meta/data.json b/metricbeat/module/openmetrics/collector/_meta/data.json index d20778fec4a1..668e6a887512 100644 --- a/metricbeat/module/openmetrics/collector/_meta/data.json +++ b/metricbeat/module/openmetrics/collector/_meta/data.json @@ -15,11 +15,11 @@ "job": "openmetrics" }, "metrics": { - "node_network_carrier": 0 + "up": 1 } }, "service": { "address": "127.0.0.1:55555", "type": "openmetrics" } -} \ No newline at end of file +} diff --git a/metricbeat/module/openmetrics/collector/_meta/samelabeltestdata/config.yml b/metricbeat/module/openmetrics/collector/_meta/samelabeltestdata/config.yml new file mode 100644 index 000000000000..a8369b90cf22 --- /dev/null +++ b/metricbeat/module/openmetrics/collector/_meta/samelabeltestdata/config.yml @@ -0,0 +1,8 @@ +type: http +url: "/metrics" +content_type: "application/openmetrics-text" +suffix: plain +remove_fields_from_comparison: ["openmetrics.labels.instance"] +module: + enable_exemplars: false + enable_metadata: false diff --git a/metricbeat/module/openmetrics/collector/_meta/samelabeltestdata/docs.plain b/metricbeat/module/openmetrics/collector/_meta/samelabeltestdata/docs.plain new file mode 100644 index 000000000000..d5f0fd96fab9 --- /dev/null +++ b/metricbeat/module/openmetrics/collector/_meta/samelabeltestdata/docs.plain @@ -0,0 +1,6 @@ +# HELP net_conntrack_listener_conn_accepted Total number of connections opened to the listener of a given name. +# TYPE net_conntrack_listener_conn_accepted counter +net_conntrack_listener_conn_accepted_total{listener_name="http"} 3 +# HELP net_conntrack_listener_conn_closed Total number of connections closed that were made to the listener of a given name. +# TYPE net_conntrack_listener_conn_closed counter +net_conntrack_listener_conn_closed_total{listener_name="http"} 0 diff --git a/metricbeat/module/openmetrics/collector/_meta/samelabeltestdata/docs.plain-expected.json b/metricbeat/module/openmetrics/collector/_meta/samelabeltestdata/docs.plain-expected.json new file mode 100644 index 000000000000..e1244391ba74 --- /dev/null +++ b/metricbeat/module/openmetrics/collector/_meta/samelabeltestdata/docs.plain-expected.json @@ -0,0 +1,52 @@ +[ + { + "event": { + "dataset": "openmetrics.collector", + "duration": 115000, + "module": "openmetrics" + }, + "metricset": { + "name": "collector", + "period": 10000 + }, + "openmetrics": { + "labels": { + "instance": "127.0.0.1:55922", + "job": "openmetrics" + }, + "metrics": { + "up": 1 + } + }, + "service": { + "address": "127.0.0.1:55555", + "type": "openmetrics" + } + }, + { + "event": { + "dataset": "openmetrics.collector", + "duration": 115000, + "module": "openmetrics" + }, + "metricset": { + "name": "collector", + "period": 10000 + }, + "openmetrics": { + "labels": { + "instance": "127.0.0.1:55922", + "job": "openmetrics", + "listener_name": "http" + }, + "metrics": { + "net_conntrack_listener_conn_accepted_total": 3, + "net_conntrack_listener_conn_closed_total": 0 + } + }, + "service": { + "address": "127.0.0.1:55555", + "type": "openmetrics" + } + } +] diff --git a/metricbeat/module/openmetrics/collector/_meta/testdata/config.yml b/metricbeat/module/openmetrics/collector/_meta/testdata/config.yml index a5d8ee128afe..37f3a8443ac5 100644 --- a/metricbeat/module/openmetrics/collector/_meta/testdata/config.yml +++ b/metricbeat/module/openmetrics/collector/_meta/testdata/config.yml @@ -1,4 +1,8 @@ type: http url: "/metrics" +content_type: "application/openmetrics-text" suffix: plain remove_fields_from_comparison: ["openmetrics.labels.instance"] +module: + enable_exemplars: true + enable_metadata: true diff --git a/metricbeat/module/openmetrics/collector/_meta/testdata/docs.plain b/metricbeat/module/openmetrics/collector/_meta/testdata/docs.plain index 47c3b38aedbe..d5f0fd96fab9 100644 --- a/metricbeat/module/openmetrics/collector/_meta/testdata/docs.plain +++ b/metricbeat/module/openmetrics/collector/_meta/testdata/docs.plain @@ -1,11 +1,6 @@ -# HELP node_network_carrier carrier value of /sys/class/net/. -# TYPE node_network_carrier gauge -node_network_carrier{device="br-0cb306323b90"} 0 -node_network_carrier{device="br-10229e3512d9"} 0 -node_network_carrier{device="br-210476dc4ef8"} 0 -node_network_carrier{device="br-33d819d5f834"} 0 -node_network_carrier{device="br-38425a39f36b"} 0 -node_network_carrier{device="br-38feb0aad6ab"} 0 -node_network_carrier{device="br-3a285aa5e58c"} 0 -node_network_carrier{device="br-425cb4c454a6"} 0 -node_network_carrier{device="br-4e623477470e"} 0 +# HELP net_conntrack_listener_conn_accepted Total number of connections opened to the listener of a given name. +# TYPE net_conntrack_listener_conn_accepted counter +net_conntrack_listener_conn_accepted_total{listener_name="http"} 3 +# HELP net_conntrack_listener_conn_closed Total number of connections closed that were made to the listener of a given name. +# TYPE net_conntrack_listener_conn_closed counter +net_conntrack_listener_conn_closed_total{listener_name="http"} 0 diff --git a/metricbeat/module/openmetrics/collector/_meta/testdata/docs.plain-expected.json b/metricbeat/module/openmetrics/collector/_meta/testdata/docs.plain-expected.json index 16f5001ba5e6..04dd247087df 100644 --- a/metricbeat/module/openmetrics/collector/_meta/testdata/docs.plain-expected.json +++ b/metricbeat/module/openmetrics/collector/_meta/testdata/docs.plain-expected.json @@ -11,137 +11,13 @@ }, "openmetrics": { "labels": { - "device": "br-10229e3512d9", - "instance": "127.0.0.1:50135", + "instance": "127.0.0.1:55922", "job": "openmetrics" }, - "metrics": { - "node_network_carrier": 0 - } - }, - "service": { - "address": "127.0.0.1:55555", - "type": "openmetrics" - } - }, - { - "event": { - "dataset": "openmetrics.collector", - "duration": 115000, - "module": "openmetrics" - }, - "metricset": { - "name": "collector", - "period": 10000 - }, - "openmetrics": { - "labels": { - "device": "br-425cb4c454a6", - "instance": "127.0.0.1:50135", - "job": "openmetrics" - }, - "metrics": { - "node_network_carrier": 0 - } - }, - "service": { - "address": "127.0.0.1:55555", - "type": "openmetrics" - } - }, - { - "event": { - "dataset": "openmetrics.collector", - "duration": 115000, - "module": "openmetrics" - }, - "metricset": { - "name": "collector", - "period": 10000 - }, - "openmetrics": { - "labels": { - "device": "br-38425a39f36b", - "instance": "127.0.0.1:50135", - "job": "openmetrics" - }, - "metrics": { - "node_network_carrier": 0 - } - }, - "service": { - "address": "127.0.0.1:55555", - "type": "openmetrics" - } - }, - { - "event": { - "dataset": "openmetrics.collector", - "duration": 115000, - "module": "openmetrics" - }, - "metricset": { - "name": "collector", - "period": 10000 - }, - "openmetrics": { - "labels": { - "instance": "127.0.0.1:50135", - "job": "prometheus" - }, "metrics": { "up": 1 - } - }, - "service": { - "address": "127.0.0.1:55555", - "type": "openmetrics" - } - }, - { - "event": { - "dataset": "openmetrics.collector", - "duration": 115000, - "module": "openmetrics" - }, - "metricset": { - "name": "collector", - "period": 10000 - }, - "openmetrics": { - "labels": { - "device": "br-33d819d5f834", - "instance": "127.0.0.1:50135", - "job": "openmetrics" - }, - "metrics": { - "node_network_carrier": 0 - } - }, - "service": { - "address": "127.0.0.1:55555", - "type": "openmetrics" - } - }, - { - "event": { - "dataset": "openmetrics.collector", - "duration": 115000, - "module": "openmetrics" - }, - "metricset": { - "name": "collector", - "period": 10000 - }, - "openmetrics": { - "labels": { - "device": "br-4e623477470e", - "instance": "127.0.0.1:50135", - "job": "openmetrics" }, - "metrics": { - "node_network_carrier": 0 - } + "type":"gauge" }, "service": { "address": "127.0.0.1:55555", @@ -159,39 +35,16 @@ "period": 10000 }, "openmetrics": { + "help": "Total number of connections opened to the listener of a given name.", "labels": { - "device": "br-210476dc4ef8", - "instance": "127.0.0.1:50135", - "job": "openmetrics" + "instance": "127.0.0.1:55922", + "job": "openmetrics", + "listener_name": "http" }, "metrics": { - "node_network_carrier": 0 - } - }, - "service": { - "address": "127.0.0.1:55555", - "type": "openmetrics" - } - }, - { - "event": { - "dataset": "openmetrics.collector", - "duration": 115000, - "module": "openmetrics" - }, - "metricset": { - "name": "collector", - "period": 10000 - }, - "openmetrics": { - "labels": { - "device": "br-0cb306323b90", - "instance": "127.0.0.1:50135", - "job": "openmetrics" + "net_conntrack_listener_conn_accepted_total": 3 }, - "metrics": { - "node_network_carrier": 0 - } + "type":"counter" }, "service": { "address": "127.0.0.1:55555", @@ -209,43 +62,20 @@ "period": 10000 }, "openmetrics": { + "help": "Total number of connections closed that were made to the listener of a given name.", "labels": { - "device": "br-38feb0aad6ab", - "instance": "127.0.0.1:50135", - "job": "openmetrics" + "instance": "127.0.0.1:55922", + "job": "openmetrics", + "listener_name": "http" }, "metrics": { - "node_network_carrier": 0 - } - }, - "service": { - "address": "127.0.0.1:55555", - "type": "openmetrics" - } - }, - { - "event": { - "dataset": "openmetrics.collector", - "duration": 115000, - "module": "openmetrics" - }, - "metricset": { - "name": "collector", - "period": 10000 - }, - "openmetrics": { - "labels": { - "device": "br-3a285aa5e58c", - "instance": "127.0.0.1:50135", - "job": "openmetrics" + "net_conntrack_listener_conn_closed_total": 0 }, - "metrics": { - "node_network_carrier": 0 - } + "type":"counter" }, "service": { "address": "127.0.0.1:55555", "type": "openmetrics" } } -] \ No newline at end of file +] diff --git a/metricbeat/module/openmetrics/collector/_meta/testdata/openmetrics-features.plain b/metricbeat/module/openmetrics/collector/_meta/testdata/openmetrics-features.plain new file mode 100644 index 000000000000..eb046b4b71d6 --- /dev/null +++ b/metricbeat/module/openmetrics/collector/_meta/testdata/openmetrics-features.plain @@ -0,0 +1,23 @@ +# HELP my_counter_last_increment_timestamp_milliseconds When my_counter was last incremented +# TYPE my_counter_last_increment_timestamp_milliseconds gauge +# UNIT my_counter_last_increment_timestamp_milliseconds milliseconds +my_counter_last_increment_timestamp_milliseconds 123 +# TYPE disk_errors counter +# HELP disk_errors Count total disk errors +disk_errors_total{type="netapp"} 17.0 1520879607.789 +# TYPE app info +app_info{name="open metrics collector",version="6.3.9"} 1 +# TYPE collector info +collector_info{name="metrics collector",version="8.2.7"} 1 1622329674 +# TYPE enable_category stateset +enable_category{category="shoes"} 0 +enable_category{category="shirts"} 1 +enable_category{category="shades"} 0 +# TYPE connection_errors unknown +connection_errors 42 +# TYPE cnt_rulefires_deployment counter +cnt_rulefires_deployment_total 66666.0 # {trace_id="KOO5S4vxi0o"} 0.67 +# TYPE process_cpu_seconds counter +# UNIT process_cpu_seconds seconds +# HELP process_cpu_seconds Total user and system CPU time spent in seconds. Exemplar with timestamp and labels. +process_cpu_seconds_total{entity="controller",build="8.2.7"} 11111 1622301927 # {trace_id="0d482-ac43e-d9320-debfe"} 17.0 1622302012 diff --git a/metricbeat/module/openmetrics/collector/_meta/testdata/openmetrics-features.plain-expected.json b/metricbeat/module/openmetrics/collector/_meta/testdata/openmetrics-features.plain-expected.json new file mode 100644 index 000000000000..ce7febdc874c --- /dev/null +++ b/metricbeat/module/openmetrics/collector/_meta/testdata/openmetrics-features.plain-expected.json @@ -0,0 +1,305 @@ +[ + { + "event": { + "dataset": "openmetrics.collector", + "duration": 115000, + "module": "openmetrics" + }, + "metricset": { + "name": "collector", + "period": 10000 + }, + "openmetrics": { + "help": "When my_counter was last incremented", + "labels": { + "instance": "127.0.0.1:55922", + "job": "openmetrics" + }, + "metrics": { + "my_counter_last_increment_timestamp_milliseconds": 123 + }, + "type":"gauge", + "unit":"milliseconds" + }, + "service": { + "address": "127.0.0.1:55555", + "type": "openmetrics" + } + }, + { + "event": { + "dataset": "openmetrics.collector", + "duration": 115000, + "module": "openmetrics" + }, + "metricset": { + "name": "collector", + "period": 10000 + }, + "openmetrics": { + "labels": { + "instance": "127.0.0.1:55922", + "job": "openmetrics" + }, + "metrics": { + "up": 1 + }, + "type":"gauge" + }, + "service": { + "address": "127.0.0.1:55555", + "type": "openmetrics" + } + }, + { + "event": { + "dataset": "openmetrics.collector", + "duration": 115000, + "module": "openmetrics" + }, + "metricset": { + "name": "collector", + "period": 10000 + }, + "openmetrics": { + "help": "Count total disk errors", + "labels": { + "instance": "127.0.0.1:55922", + "job": "openmetrics", + "type": "netapp" + }, + "metrics": { + "disk_errors_total": 17 + }, + "type":"counter" + }, + "service": { + "address": "127.0.0.1:55555", + "type": "openmetrics" + } + }, + { + "event": { + "dataset": "openmetrics.collector", + "duration": 115000, + "module": "openmetrics" + }, + "metricset": { + "name": "collector", + "period": 10000 + }, + "openmetrics": { + "labels": { + "instance": "127.0.0.1:55922", + "job": "openmetrics", + "name": "open metrics collector", + "version": "6.3.9" + }, + "metrics": { + "app_info": 1 + }, + "type":"info" + }, + "service": { + "address": "127.0.0.1:55555", + "type": "openmetrics" + } + }, + { + "event": { + "dataset": "openmetrics.collector", + "duration": 115000, + "module": "openmetrics" + }, + "metricset": { + "name": "collector", + "period": 10000 + }, + "openmetrics": { + "labels": { + "instance": "127.0.0.1:55922", + "job": "openmetrics", + "name": "metrics collector", + "version": "8.2.7" + }, + "metrics": { + "collector_info": 1 + }, + "type":"info" + }, + "service": { + "address": "127.0.0.1:55555", + "type": "openmetrics" + } + }, + { + "event": { + "dataset": "openmetrics.collector", + "duration": 115000, + "module": "openmetrics" + }, + "metricset": { + "name": "collector", + "period": 10000 + }, + "openmetrics": { + "labels": { + "instance": "127.0.0.1:55922", + "job": "openmetrics", + "category": "shoes" + }, + "metrics": { + "enable_category": 0 + }, + "type":"stateset" + }, + "service": { + "address": "127.0.0.1:55555", + "type": "openmetrics" + } + }, + { + "event": { + "dataset": "openmetrics.collector", + "duration": 115000, + "module": "openmetrics" + }, + "metricset": { + "name": "collector", + "period": 10000 + }, + "openmetrics": { + "labels": { + "instance": "127.0.0.1:55922", + "job": "openmetrics", + "category": "shirts" + }, + "metrics": { + "enable_category": 1 + }, + "type":"stateset" + }, + "service": { + "address": "127.0.0.1:55555", + "type": "openmetrics" + } + }, + { + "event": { + "dataset": "openmetrics.collector", + "duration": 115000, + "module": "openmetrics" + }, + "metricset": { + "name": "collector", + "period": 10000 + }, + "openmetrics": { + "labels": { + "instance": "127.0.0.1:55922", + "job": "openmetrics", + "category": "shades" + }, + "metrics": { + "enable_category": 0 + }, + "type":"stateset" + }, + "service": { + "address": "127.0.0.1:55555", + "type": "openmetrics" + } + }, + { + "event": { + "dataset": "openmetrics.collector", + "duration": 115000, + "module": "openmetrics" + }, + "metricset": { + "name": "collector", + "period": 10000 + }, + "openmetrics": { + "labels": { + "instance": "127.0.0.1:55922", + "job": "openmetrics" + }, + "metrics": { + "connection_errors": 42 + }, + "type":"unknown" + }, + "service": { + "address": "127.0.0.1:55555", + "type": "openmetrics" + } + }, + { + "event": { + "dataset": "openmetrics.collector", + "duration": 115000, + "module": "openmetrics" + }, + "metricset": { + "name": "collector", + "period": 10000 + }, + "openmetrics": { + "exemplar": { + "cnt_rulefires_deployment_total":0.67, + "labels": { + "trace_id":"KOO5S4vxi0o" + } + }, + "labels": { + "instance": "127.0.0.1:55922", + "job": "openmetrics" + }, + "metrics": { + "cnt_rulefires_deployment_total": 66666 + }, + "type":"counter" + }, + "service": { + "address": "127.0.0.1:55555", + "type": "openmetrics" + } + }, + { + "event": { + "dataset": "openmetrics.collector", + "duration": 115000, + "module": "openmetrics" + }, + "metricset": { + "name": "collector", + "period": 10000 + }, + "openmetrics": { + "exemplar": { + "labels": { + "trace_id": "0d482-ac43e-d9320-debfe" + }, + "process_cpu_seconds_total": 17, + "timestamp": 1622302012000 + }, + "help": "Total user and system CPU time spent in seconds. Exemplar with timestamp and labels.", + "labels": { + "instance": "127.0.0.1:55922", + "job": "openmetrics", + "entity": "controller", + "build": "8.2.7" + }, + "metrics": { + "process_cpu_seconds_total": 11111 + }, + "type":"counter", + "unit":"seconds" + }, + "service": { + "address": "127.0.0.1:55555", + "type": "openmetrics" + } + } +] diff --git a/metricbeat/module/openmetrics/collector/collector.go b/metricbeat/module/openmetrics/collector/collector.go index 6ce25dc90688..ab54dca1cfa5 100644 --- a/metricbeat/module/openmetrics/collector/collector.go +++ b/metricbeat/module/openmetrics/collector/collector.go @@ -18,9 +18,16 @@ package collector import ( + "regexp" + + "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" + + "github.com/elastic/beats/v7/libbeat/common" + p "github.com/elastic/beats/v7/metricbeat/helper/openmetrics" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" - "github.com/elastic/beats/v7/metricbeat/module/prometheus/collector" ) const ( @@ -29,14 +36,246 @@ const ( ) var ( + // HostParser parses a OpenMetrics endpoint URL hostParser = parse.URLHostParserBuilder{ DefaultScheme: defaultScheme, DefaultPath: defaultPath, + PathConfigKey: "metrics_path", }.Build() + + upMetricName = "up" + upMetricType = textparse.MetricTypeGauge + upMetricInstanceLabel = "instance" + upMetricJobLabel = "job" + upMetricJobValue = "openmetrics" ) func init() { mb.Registry.MustAddMetricSet("openmetrics", "collector", - collector.MetricSetBuilder("openmetrics", collector.DefaultPromEventsGeneratorFactory), - mb.WithHostParser(hostParser)) + MetricSetBuilder("openmetrics", DefaultOpenMetricsEventsGeneratorFactory), + mb.WithHostParser(hostParser), + mb.DefaultMetricSet(), + ) +} + +// OpenMetricsEventsGenerator converts a OpenMetrics metric family into a OpenMetricEvent list +type OpenMetricsEventsGenerator interface { + // Start must be called before using the generator + Start() + + // converts a OpenMetrics metric family into a list of OpenMetricsEvents + GenerateOpenMetricsEvents(mf *p.OpenMetricFamily) []OpenMetricEvent + + // Stop must be called when the generator won't be used anymore + Stop() +} + +// OpenMetricsEventsGeneratorFactory creates a OpenMetricsEventsGenerator when instanciating a metricset +type OpenMetricsEventsGeneratorFactory func(ms mb.BaseMetricSet) (OpenMetricsEventsGenerator, error) + +// MetricSet for fetching openmetrics data +type MetricSet struct { + mb.BaseMetricSet + openmetrics p.OpenMetrics + includeMetrics []*regexp.Regexp + excludeMetrics []*regexp.Regexp + namespace string + openMetricsEventsGen OpenMetricsEventsGenerator + host string + eventGenStarted bool + enableExemplars bool + enableMetadata bool +} + +// MetricSetBuilder returns a builder function for a new OpenMetrics metricset using +// the given namespace and event generator +func MetricSetBuilder(namespace string, genFactory OpenMetricsEventsGeneratorFactory) func(base mb.BaseMetricSet) (mb.MetricSet, error) { + return func(base mb.BaseMetricSet) (mb.MetricSet, error) { + config := defaultConfig + if err := base.Module().UnpackConfig(&config); err != nil { + return nil, err + } + openmetrics, err := p.NewOpenMetricsClient(base) + if err != nil { + return nil, err + } + + openMetricsEventsGen, err := genFactory(base) + if err != nil { + return nil, err + } + + ms := &MetricSet{ + BaseMetricSet: base, + openmetrics: openmetrics, + namespace: namespace, + openMetricsEventsGen: openMetricsEventsGen, + eventGenStarted: false, + enableExemplars: config.EnableExemplars, + enableMetadata: config.EnableMetadata, + } + // store host here to use it as a pointer when building `up` metric + ms.host = ms.Host() + ms.excludeMetrics, err = p.CompilePatternList(config.MetricsFilters.ExcludeMetrics) + if err != nil { + return nil, errors.Wrapf(err, "unable to compile exclude patterns") + } + ms.includeMetrics, err = p.CompilePatternList(config.MetricsFilters.IncludeMetrics) + if err != nil { + return nil, errors.Wrapf(err, "unable to compile include patterns") + } + + return ms, nil + } +} + +// Fetch fetches data and reports it +func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { + if !m.eventGenStarted { + m.openMetricsEventsGen.Start() + m.eventGenStarted = true + } + + families, err := m.openmetrics.GetFamilies() + eventList := map[textparse.MetricType]map[string]common.MapStr{} + if err != nil { + // send up event only + families = append(families, m.upMetricFamily(0.0)) + + // set the error to report it after sending the up event + err = errors.Wrap(err, "unable to decode response from openmetrics endpoint") + } else { + // add up event to the list + families = append(families, m.upMetricFamily(1.0)) + } + + for _, family := range families { + if m.skipFamily(family) { + continue + } + openMetricsEvents := m.openMetricsEventsGen.GenerateOpenMetricsEvents(family) + + for _, openMetricEvent := range openMetricsEvents { + var labelsHash string + if m.enableMetadata { + labelsHash = openMetricEvent.MetaDataHash() + } else { + labelsHash = openMetricEvent.LabelsHash() + } + if _, ok := eventList[openMetricEvent.Type]; !ok { + eventList[openMetricEvent.Type] = make(map[string]common.MapStr) + } + if _, ok := eventList[openMetricEvent.Type][labelsHash]; !ok { + eventList[openMetricEvent.Type][labelsHash] = common.MapStr{} + + // Add default instance label if not already there + if exists, _ := openMetricEvent.Labels.HasKey(upMetricInstanceLabel); !exists { + openMetricEvent.Labels.Put(upMetricInstanceLabel, m.Host()) + } + // Add default job label if not already there + if exists, _ := openMetricEvent.Labels.HasKey("job"); !exists { + openMetricEvent.Labels.Put("job", m.Module().Name()) + } + // Add labels + if len(openMetricEvent.Labels) > 0 { + eventList[openMetricEvent.Type][labelsHash]["labels"] = openMetricEvent.Labels + } + } + + if m.enableMetadata { + if openMetricEvent.Help != "" { + eventList[openMetricEvent.Type][labelsHash]["help"] = openMetricEvent.Help + } + if openMetricEvent.Type != "" { + eventList[openMetricEvent.Type][labelsHash]["type"] = openMetricEvent.Type + } + if openMetricEvent.Unit != "" { + eventList[openMetricEvent.Type][labelsHash]["unit"] = openMetricEvent.Unit + } + } + + if m.enableExemplars && len(openMetricEvent.Exemplars) > 0 { + eventList[openMetricEvent.Type][labelsHash]["exemplar"] = openMetricEvent.Exemplars + } + // Accumulate metrics in the event + eventList[openMetricEvent.Type][labelsHash].DeepUpdate(openMetricEvent.Data) + } + } + + // Report events + for _, e := range eventList { + for _, ev := range e { + isOpen := reporter.Event(mb.Event{ + RootFields: common.MapStr{m.namespace: ev}, + }) + if !isOpen { + break + } + } + } + + return err +} + +// Close stops the metricset +func (m *MetricSet) Close() error { + if m.eventGenStarted { + m.openMetricsEventsGen.Stop() + } + return nil +} + +func (m *MetricSet) upMetricFamily(value float64) *p.OpenMetricFamily { + gauge := p.Gauge{ + Value: &value, + } + label1 := labels.Label{ + Name: upMetricInstanceLabel, + Value: m.host, + } + label2 := labels.Label{ + Name: upMetricJobLabel, + Value: m.Module().Name(), + } + metric := p.OpenMetric{ + Gauge: &gauge, + Label: []*labels.Label{&label1, &label2}, + } + return &p.OpenMetricFamily{ + Name: &upMetricName, + Type: textparse.MetricType(upMetricType), + Metric: []*p.OpenMetric{&metric}, + } +} + +func (m *MetricSet) skipFamily(family *p.OpenMetricFamily) bool { + if family == nil || family.Name == nil { + return false + } + return m.skipFamilyName(*family.Name) +} + +func (m *MetricSet) skipFamilyName(family string) bool { + // example: + // include_metrics: + // - node_* + // exclude_metrics: + // - node_disk_* + // + // This would mean that we want to keep only the metrics that start with node_ prefix but + // are not related to disk so we exclude node_disk_* metrics from them. + + // if include_metrics are defined, check if this metric should be included + if len(m.includeMetrics) > 0 { + if !p.MatchMetricFamily(family, m.includeMetrics) { + return true + } + } + // now exclude the metric if it matches any of the given patterns + if len(m.excludeMetrics) > 0 { + if p.MatchMetricFamily(family, m.excludeMetrics) { + return true + } + } + return false } diff --git a/metricbeat/module/openmetrics/collector/collector_test.go b/metricbeat/module/openmetrics/collector/collector_test.go index 06a48f15a0b7..dd7986766128 100644 --- a/metricbeat/module/openmetrics/collector/collector_test.go +++ b/metricbeat/module/openmetrics/collector/collector_test.go @@ -23,6 +23,15 @@ package collector import ( "testing" + "github.com/golang/protobuf/proto" + prometheuslabels "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/metricbeat/helper/openmetrics" + "github.com/elastic/beats/v7/metricbeat/mb" + mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" _ "github.com/elastic/beats/v7/metricbeat/module/openmetrics" @@ -31,3 +40,361 @@ import ( func TestData(t *testing.T) { mbtest.TestDataFiles(t, "openmetrics", "collector") } + +func TestSameLabels(t *testing.T) { + dataConfig := mbtest.ReadDataConfig(t, "_meta/samelabeltestdata/config.yml") + mbtest.TestDataFilesWithConfig(t, "openmetrics", "collector", dataConfig) +} +func TestGetOpenMetricsEventsFromMetricFamily(t *testing.T) { + labels := common.MapStr{ + "handler": "query", + } + tests := []struct { + Family *openmetrics.OpenMetricFamily + Event []OpenMetricEvent + }{ + { + Family: &openmetrics.OpenMetricFamily{ + Name: proto.String("http_request_duration_microseconds"), + Help: proto.String("foo"), + Type: textparse.MetricTypeCounter, + Metric: []*openmetrics.OpenMetric{ + { + Name: proto.String("http_request_duration_microseconds_total"), + Label: []*prometheuslabels.Label{ + { + Name: "handler", + Value: "query", + }, + }, + Counter: &openmetrics.Counter{ + Value: proto.Float64(10), + }, + }, + }, + }, + Event: []OpenMetricEvent{ + { + Data: common.MapStr{ + "metrics": common.MapStr{ + "http_request_duration_microseconds_total": float64(10), + }, + }, + Help: "foo", + Type: textparse.MetricTypeCounter, + Labels: labels, + Exemplars: common.MapStr{}, + }, + }, + }, + { + Family: &openmetrics.OpenMetricFamily{ + Name: proto.String("http_request_duration_microseconds"), + Help: proto.String("foo"), + Type: textparse.MetricTypeGauge, + Metric: []*openmetrics.OpenMetric{ + { + Gauge: &openmetrics.Gauge{ + Value: proto.Float64(10), + }, + }, + }, + }, + Event: []OpenMetricEvent{ + { + Data: common.MapStr{ + "metrics": common.MapStr{ + "http_request_duration_microseconds": float64(10), + }, + }, + Help: "foo", + Type: textparse.MetricTypeGauge, + Labels: common.MapStr{}, + }, + }, + }, + { + Family: &openmetrics.OpenMetricFamily{ + Name: proto.String("http_request_duration_microseconds"), + Help: proto.String("foo"), + Type: textparse.MetricTypeSummary, + Metric: []*openmetrics.OpenMetric{ + { + Summary: &openmetrics.Summary{ + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(10), + Quantile: []*openmetrics.Quantile{ + { + Quantile: proto.Float64(0.99), + Value: proto.Float64(10), + }, + }, + }, + }, + }, + }, + Event: []OpenMetricEvent{ + { + Data: common.MapStr{ + "metrics": common.MapStr{ + "http_request_duration_microseconds_count": uint64(10), + "http_request_duration_microseconds_sum": float64(10), + }, + }, + Help: "foo", + Type: textparse.MetricTypeSummary, + Labels: common.MapStr{}, + }, + { + Data: common.MapStr{ + "metrics": common.MapStr{ + "http_request_duration_microseconds": float64(10), + }, + }, + Labels: common.MapStr{ + "quantile": "0.99", + }, + }, + }, + }, + { + Family: &openmetrics.OpenMetricFamily{ + Name: proto.String("http_request_duration_microseconds"), + Help: proto.String("foo"), + Type: textparse.MetricTypeHistogram, + Metric: []*openmetrics.OpenMetric{ + { + Histogram: &openmetrics.Histogram{ + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(10), + Bucket: []*openmetrics.Bucket{ + { + UpperBound: proto.Float64(0.99), + CumulativeCount: proto.Uint64(10), + }, + }, + }, + }, + }, + }, + Event: []OpenMetricEvent{ + { + Data: common.MapStr{ + "metrics": common.MapStr{ + "http_request_duration_microseconds_count": uint64(10), + "http_request_duration_microseconds_sum": float64(10), + }, + }, + Help: "foo", + Type: textparse.MetricTypeHistogram, + Labels: common.MapStr{}, + }, + { + Data: common.MapStr{ + "metrics": common.MapStr{ + "http_request_duration_microseconds_bucket": uint64(10), + }, + }, + Labels: common.MapStr{"le": "0.99"}, + Exemplars: common.MapStr{}, + }, + }, + }, + { + Family: &openmetrics.OpenMetricFamily{ + Name: proto.String("http_request_duration_microseconds"), + Help: proto.String("foo"), + Type: textparse.MetricTypeUnknown, + Metric: []*openmetrics.OpenMetric{ + { + Label: []*prometheuslabels.Label{ + { + Name: "handler", + Value: "query", + }, + }, + Unknown: &openmetrics.Unknown{ + Value: proto.Float64(10), + }, + }, + }, + }, + Event: []OpenMetricEvent{ + { + Data: common.MapStr{ + "metrics": common.MapStr{ + "http_request_duration_microseconds": float64(10), + }, + }, + Help: "foo", + Type: textparse.MetricTypeUnknown, + Labels: labels, + }, + }, + }, + } + + p := openmetricEventGenerator{} + for _, test := range tests { + event := p.GenerateOpenMetricsEvents(test.Family) + assert.Equal(t, test.Event, event) + } +} + +func TestSkipMetricFamily(t *testing.T) { + testFamilies := []*openmetrics.OpenMetricFamily{ + { + Name: proto.String("http_request_duration_microseconds_a_a_in"), + Help: proto.String("foo"), + Type: textparse.MetricTypeCounter, + Metric: []*openmetrics.OpenMetric{ + { + Label: []*prometheuslabels.Label{ + { + Name: "handler", + Value: "query", + }, + }, + Counter: &openmetrics.Counter{ + Value: proto.Float64(10), + }, + }, + }, + }, + { + Name: proto.String("http_request_duration_microseconds_a_b_in"), + Help: proto.String("foo"), + Type: textparse.MetricTypeCounter, + Metric: []*openmetrics.OpenMetric{ + { + Label: []*prometheuslabels.Label{ + { + Name: "handler", + Value: "query", + }, + }, + Counter: &openmetrics.Counter{ + Value: proto.Float64(10), + }, + }, + }, + }, + { + Name: proto.String("http_request_duration_microseconds_b_in"), + Help: proto.String("foo"), + Type: textparse.MetricTypeGauge, + Metric: []*openmetrics.OpenMetric{ + { + Gauge: &openmetrics.Gauge{ + Value: proto.Float64(10), + }, + }, + }, + }, + { + Name: proto.String("http_request_duration_microseconds_c_in"), + Help: proto.String("foo"), + Type: textparse.MetricTypeSummary, + Metric: []*openmetrics.OpenMetric{ + { + Summary: &openmetrics.Summary{ + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(10), + Quantile: []*openmetrics.Quantile{ + { + Quantile: proto.Float64(0.99), + Value: proto.Float64(10), + }, + }, + }, + }, + }, + }, + { + Name: proto.String("http_request_duration_microseconds_d_in"), + Help: proto.String("foo"), + Type: textparse.MetricTypeHistogram, + Metric: []*openmetrics.OpenMetric{ + { + Histogram: &openmetrics.Histogram{ + SampleCount: proto.Uint64(10), + SampleSum: proto.Float64(10), + Bucket: []*openmetrics.Bucket{ + { + UpperBound: proto.Float64(0.99), + CumulativeCount: proto.Uint64(10), + }, + }, + }, + }, + }, + }, + { + Name: proto.String("http_request_duration_microseconds_e_in"), + Help: proto.String("foo"), + Type: textparse.MetricTypeUnknown, + Metric: []*openmetrics.OpenMetric{ + { + Label: []*prometheuslabels.Label{ + { + Name: "handler", + Value: "query", + }, + }, + Unknown: &openmetrics.Unknown{ + Value: proto.Float64(10), + }, + }, + }, + }, + } + + ms := &MetricSet{ + BaseMetricSet: mb.BaseMetricSet{}, + } + + // test with no filters + ms.includeMetrics, _ = openmetrics.CompilePatternList(&[]string{}) + ms.excludeMetrics, _ = openmetrics.CompilePatternList(&[]string{}) + metricsToKeep := 0 + for _, testFamily := range testFamilies { + if !ms.skipFamily(testFamily) { + metricsToKeep++ + } + } + assert.Equal(t, metricsToKeep, len(testFamilies)) + + // test with only one include filter + ms.includeMetrics, _ = openmetrics.CompilePatternList(&[]string{"http_request_duration_microseconds_a_*"}) + ms.excludeMetrics, _ = openmetrics.CompilePatternList(&[]string{}) + metricsToKeep = 0 + for _, testFamily := range testFamilies { + if !ms.skipFamily(testFamily) { + metricsToKeep++ + } + } + assert.Equal(t, metricsToKeep, 2) + + // test with only one exclude filter + ms.includeMetrics, _ = openmetrics.CompilePatternList(&[]string{""}) + ms.excludeMetrics, _ = openmetrics.CompilePatternList(&[]string{"http_request_duration_microseconds_a_*"}) + metricsToKeep = 0 + for _, testFamily := range testFamilies { + if !ms.skipFamily(testFamily) { + metricsToKeep++ + } + } + assert.Equal(t, len(testFamilies)-2, metricsToKeep) + + // test with one include and one exclude + ms.includeMetrics, _ = openmetrics.CompilePatternList(&[]string{"http_request_duration_microseconds_a_*"}) + ms.excludeMetrics, _ = openmetrics.CompilePatternList(&[]string{"http_request_duration_microseconds_a_b_*"}) + metricsToKeep = 0 + for _, testFamily := range testFamilies { + if !ms.skipFamily(testFamily) { + metricsToKeep++ + } + } + assert.Equal(t, 1, metricsToKeep) + +} diff --git a/metricbeat/module/openmetrics/collector/config.go b/metricbeat/module/openmetrics/collector/config.go new file mode 100644 index 000000000000..0e5a9884db49 --- /dev/null +++ b/metricbeat/module/openmetrics/collector/config.go @@ -0,0 +1,42 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package collector + +type metricsetConfig struct { + MetricsFilters MetricFilters `config:"metrics_filters" yaml:"metrics_filters,omitempty"` + EnableExemplars bool `config:"enable_exemplars" yaml:"enable_exemplars,omitempty"` + EnableMetadata bool `config:"enable_metadata" yaml:"enable_metadata,omitempty"` +} + +type MetricFilters struct { + IncludeMetrics *[]string `config:"include" yaml:"include,omitempty"` + ExcludeMetrics *[]string `config:"exclude" yaml:"exclude,omitempty"` +} + +var defaultConfig = metricsetConfig{ + MetricsFilters: MetricFilters{ + IncludeMetrics: nil, + ExcludeMetrics: nil}, + EnableExemplars: false, + EnableMetadata: false, +} + +func (c *metricsetConfig) Validate() error { + // validate configuration here + return nil +} diff --git a/metricbeat/module/openmetrics/collector/data.go b/metricbeat/module/openmetrics/collector/data.go new file mode 100644 index 000000000000..a3b83ccd8182 --- /dev/null +++ b/metricbeat/module/openmetrics/collector/data.go @@ -0,0 +1,289 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package collector + +import ( + "math" + "strconv" + + "github.com/prometheus/prometheus/pkg/textparse" + + p "github.com/elastic/beats/v7/metricbeat/helper/openmetrics" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/metricbeat/helper/labelhash" + "github.com/elastic/beats/v7/metricbeat/mb" +) + +// OpenMetricEvent stores a set of one or more metrics with the same labels +type OpenMetricEvent struct { + Data common.MapStr + Labels common.MapStr + Help string + Type textparse.MetricType + Unit string + Exemplars common.MapStr +} + +// LabelsHash returns a repeatable string that is unique for the set of labels in this event +func (p *OpenMetricEvent) LabelsHash() string { + return labelhash.LabelHash(p.Labels) +} +func (p *OpenMetricEvent) MetaDataHash() string { + m := common.MapStr{} + m.DeepUpdate(p.Labels) + if len(p.Help) > 0 { + m["help"] = p.Help + } + if len(p.Type) > 0 { + m["type"] = p.Type + } + if len(p.Unit) > 0 { + m["unit"] = p.Unit + } + return labelhash.LabelHash(m) +} + +// DefaultOpenMetricEventsGeneratorFactory returns the default OpenMetrics events generator +func DefaultOpenMetricsEventsGeneratorFactory(ms mb.BaseMetricSet) (OpenMetricsEventsGenerator, error) { + return &openmetricEventGenerator{}, nil +} + +type openmetricEventGenerator struct{} + +func (p *openmetricEventGenerator) Start() {} +func (p *openmetricEventGenerator) Stop() {} + +// Default openmetricEventsGenerator stores all OpenMetrics metrics using +// only double field type in Elasticsearch. +func (p *openmetricEventGenerator) GenerateOpenMetricsEvents(mf *p.OpenMetricFamily) []OpenMetricEvent { + var events []OpenMetricEvent + + name := *mf.Name + metrics := mf.Metric + help := "" + unit := "" + if mf.Help != nil { + help = *mf.Help + } + if mf.Unit != nil { + unit = *mf.Unit + } + + for _, metric := range metrics { + labels := common.MapStr{} + mn := metric.GetName() + + if len(metric.Label) != 0 { + for _, label := range metric.Label { + if label.Name != "" && label.Value != "" { + labels[label.Name] = label.Value + } + } + } + + exemplars := common.MapStr{} + if metric.Exemplar != nil { + exemplars = common.MapStr{*mn: metric.Exemplar.Value} + if metric.Exemplar.HasTs { + exemplars.Put("timestamp", metric.Exemplar.Ts) + } + for _, label := range metric.Exemplar.Labels { + if label.Name != "" && label.Value != "" { + exemplars.Put("labels."+label.Name, label.Value) + } + } + } + + counter := metric.GetCounter() + if counter != nil { + if !math.IsNaN(counter.GetValue()) && !math.IsInf(counter.GetValue(), 0) { + events = append(events, OpenMetricEvent{ + Type: textparse.MetricTypeCounter, + Help: help, + Unit: unit, + Data: common.MapStr{ + "metrics": common.MapStr{ + *mn: counter.GetValue(), + }, + }, + Labels: labels, + Exemplars: exemplars, + }) + } + } + + gauge := metric.GetGauge() + if gauge != nil { + if !math.IsNaN(gauge.GetValue()) && !math.IsInf(gauge.GetValue(), 0) { + events = append(events, OpenMetricEvent{ + Type: textparse.MetricTypeGauge, + Help: help, + Unit: unit, + Data: common.MapStr{ + "metrics": common.MapStr{ + name: gauge.GetValue(), + }, + }, + Labels: labels, + }) + } + } + + info := metric.GetInfo() + if info != nil { + if info.HasValidValue() { + events = append(events, OpenMetricEvent{ + Type: textparse.MetricTypeInfo, + Data: common.MapStr{ + "metrics": common.MapStr{ + name: info.GetValue(), + }, + }, + Labels: labels, + }) + } + } + + stateset := metric.GetStateset() + if stateset != nil { + if stateset.HasValidValue() { + events = append(events, OpenMetricEvent{ + Type: textparse.MetricTypeStateset, + Data: common.MapStr{ + "metrics": common.MapStr{ + name: stateset.GetValue(), + }, + }, + Labels: labels, + }) + } + } + + summary := metric.GetSummary() + if summary != nil { + if !math.IsNaN(summary.GetSampleSum()) && !math.IsInf(summary.GetSampleSum(), 0) { + events = append(events, OpenMetricEvent{ + Type: textparse.MetricTypeSummary, + Help: help, + Unit: unit, + Data: common.MapStr{ + "metrics": common.MapStr{ + name + "_sum": summary.GetSampleSum(), + name + "_count": summary.GetSampleCount(), + }, + }, + Labels: labels, + }) + } + + for _, quantile := range summary.GetQuantile() { + if math.IsNaN(quantile.GetValue()) || math.IsInf(quantile.GetValue(), 0) { + continue + } + + quantileLabels := labels.Clone() + quantileLabels["quantile"] = strconv.FormatFloat(quantile.GetQuantile(), 'f', -1, 64) + events = append(events, OpenMetricEvent{ + Data: common.MapStr{ + "metrics": common.MapStr{ + name: quantile.GetValue(), + }, + }, + Labels: quantileLabels, + }) + } + } + + histogram := metric.GetHistogram() + if histogram != nil { + if !math.IsNaN(histogram.GetSampleSum()) && !math.IsInf(histogram.GetSampleSum(), 0) { + var sum = "_sum" + var count = "_count" + var typ = textparse.MetricTypeHistogram + if histogram.IsGaugeHistogram { + sum = "_gsum" + count = "_gcount" + typ = textparse.MetricTypeGaugeHistogram + } + + events = append(events, OpenMetricEvent{ + Type: typ, + Help: help, + Unit: unit, + Data: common.MapStr{ + "metrics": common.MapStr{ + name + sum: histogram.GetSampleSum(), + name + count: histogram.GetSampleCount(), + }, + }, + Labels: labels, + }) + } + + for _, bucket := range histogram.GetBucket() { + if bucket.GetCumulativeCount() == uint64(math.NaN()) || bucket.GetCumulativeCount() == uint64(math.Inf(0)) { + continue + } + + if bucket.Exemplar != nil { + exemplars = common.MapStr{name: bucket.Exemplar.Value} + if bucket.Exemplar.HasTs { + exemplars.Put("timestamp", bucket.Exemplar.Ts) + } + for _, label := range bucket.Exemplar.Labels { + if label.Name != "" && label.Value != "" { + exemplars.Put("labels."+label.Name, label.Value) + } + } + } + + bucketLabels := labels.Clone() + bucketLabels["le"] = strconv.FormatFloat(bucket.GetUpperBound(), 'f', -1, 64) + + events = append(events, OpenMetricEvent{ + Data: common.MapStr{ + "metrics": common.MapStr{ + name + "_bucket": bucket.GetCumulativeCount(), + }, + }, + Labels: bucketLabels, + Exemplars: exemplars, + }) + } + } + + unknown := metric.GetUnknown() + if unknown != nil { + if !math.IsNaN(unknown.GetValue()) && !math.IsInf(unknown.GetValue(), 0) { + events = append(events, OpenMetricEvent{ + Type: textparse.MetricTypeUnknown, + Help: help, + Unit: unit, + Data: common.MapStr{ + "metrics": common.MapStr{ + name: unknown.GetValue(), + }, + }, + Labels: labels, + }) + } + } + } + return events +} diff --git a/metricbeat/module/openmetrics/fields.go b/metricbeat/module/openmetrics/fields.go index b54ac18df5f1..27c738406f98 100644 --- a/metricbeat/module/openmetrics/fields.go +++ b/metricbeat/module/openmetrics/fields.go @@ -32,5 +32,5 @@ func init() { // AssetOpenmetrics returns asset data. // This is the base64 encoded zlib format compressed contents of module/openmetrics. func AssetOpenmetrics() string { - return "eJycUsFuqzAQvPMVI94hUpTkAzi8X8i7P1WJwQu4sb3WelGUv68I0JK0PbRznGHYmZH3uNCtAieKgVRckwtAnXqqsDl+sJsCEPJkMlWoSU0BWMqNuKSOY4W/BQCsHAhsB08FkEnVxS5X+F/2qqncoczZly8F0DryNld38x7RBHoOM0JviSp0wkOamS9ujzivvGc0HNW4mLFEaoUDKNrELmqG9kZhhNCy93x1sXso0LIEo4f55+ukI/7gKJYELsOFxKImKnoS2sGbmnzG1XmPYLTp0TrJuoP2BKE8HbU81Pd9JiztJ/Nh+y4s9bl+pUZX9EScJvVCtyuLXcnfTDTin3Ag7WlYppmvfgozT/HjNE/dHtRTMCm52M2fltvyl6FXaR+f5lsAAAD//0qh20E=" + return "eJzElL2O1EAQhHM/RckEJ5327gEcEBCQoXsAhO7Gdttudv7U09bit0f+W+w1BCwIKmzPVH1TM/ITzjQUCJG8IxWuUgYoq6UCDy8/pg8ZIGTJJCpQkpoMqClVwlE5+ALvMwDY7IALdW8pAxKpsm9Tgc95pxrzE/KUbP4lAxomW6di2vwEbxzdwozSIVKBVkIfl8kty6if8Ix62/i9oQpeDfuEFbOR4EC+joG9JmhnFEYITbA2XNi3u0M1QZzR58V8Sz/qHV6kJgEnsItB1HhFR0InWFOSTbiwtXBGqw4NS9ITtCMIpTm0Dn05dTZrbaQjG6/DtY4zDZcg9Wb+iwJGfRCmZrsAoZmiP00H+2gc2+GQOwb9We5svzda3XvP+lfcd0ar+1z48+MhIZRfqdoGz4PXO/J3z31mmWMPNMuq38a5eRC7r6/OxMi+XZbmj/m91Adc+kYuWiP/rr418VjdleU/3eiaf7za/U/oewAAAP//WDeHdA==" }