From 151f6147f51624490fc6f40cc43effd03497f460 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Thu, 5 May 2022 16:30:49 -0700 Subject: [PATCH 1/2] Remove PData prefix from prometheus internal struct names Signed-off-by: Bogdan Drutu --- ...util_pdata_test.go => metricsutil_test.go} | 14 +- .../prometheusreceiver/internal/ocastore.go | 8 +- .../internal/otlp_metricfamily.go | 70 +++---- .../internal/otlp_metricfamily_test.go | 12 +- .../internal/otlp_metrics_adjuster.go | 69 ++++--- .../internal/otlp_metrics_adjuster_test.go | 182 +++++++++--------- .../internal/otlp_metricsbuilder.go | 26 +-- .../internal/otlp_metricsbuilder_test.go | 62 +++--- .../internal/otlp_transaction.go | 34 ++-- .../internal/otlp_transaction_test.go | 14 +- .../internal/prom_to_otlp.go | 4 +- .../internal/prom_to_otlp_test.go | 2 +- .../metrics_receiver_helper_test.go | 2 +- 13 files changed, 244 insertions(+), 255 deletions(-) rename receiver/prometheusreceiver/internal/{metricsutil_pdata_test.go => metricsutil_test.go} (80%) diff --git a/receiver/prometheusreceiver/internal/metricsutil_pdata_test.go b/receiver/prometheusreceiver/internal/metricsutil_test.go similarity index 80% rename from receiver/prometheusreceiver/internal/metricsutil_pdata_test.go rename to receiver/prometheusreceiver/internal/metricsutil_test.go index 5b9300099390..440ca55c7b41 100644 --- a/receiver/prometheusreceiver/internal/metricsutil_pdata_test.go +++ b/receiver/prometheusreceiver/internal/metricsutil_test.go @@ -23,7 +23,7 @@ type kv struct { Key, Value string } -func distPointPdata(ts pcommon.Timestamp, bounds []float64, counts []uint64) *pmetric.HistogramDataPoint { +func distPoint(ts pcommon.Timestamp, bounds []float64, counts []uint64) *pmetric.HistogramDataPoint { hdp := pmetric.NewHistogramDataPoint() hdp.SetExplicitBounds(bounds) hdp.SetBucketCounts(counts) @@ -42,7 +42,7 @@ func distPointPdata(ts pcommon.Timestamp, bounds []float64, counts []uint64) *pm return &hdp } -func cumulativeDistMetricPdata(name string, kvp []*kv, startTs pcommon.Timestamp, points ...*pmetric.HistogramDataPoint) *pmetric.Metric { +func cumulativeDistMetric(name string, kvp []*kv, startTs pcommon.Timestamp, points ...*pmetric.HistogramDataPoint) *pmetric.Metric { metric := pmetric.NewMetric() metric.SetName(name) metric.SetDataType(pmetric.MetricDataTypeHistogram) @@ -63,7 +63,7 @@ func cumulativeDistMetricPdata(name string, kvp []*kv, startTs pcommon.Timestamp return &metric } -func doublePointPdata(ts pcommon.Timestamp, value float64) *pmetric.NumberDataPoint { +func doublePoint(ts pcommon.Timestamp, value float64) *pmetric.NumberDataPoint { ndp := pmetric.NewNumberDataPoint() ndp.SetTimestamp(ts) ndp.SetDoubleVal(value) @@ -71,7 +71,7 @@ func doublePointPdata(ts pcommon.Timestamp, value float64) *pmetric.NumberDataPo return &ndp } -func gaugeMetricPdata(name string, kvp []*kv, startTs pcommon.Timestamp, points ...*pmetric.NumberDataPoint) *pmetric.Metric { +func gaugeMetric(name string, kvp []*kv, startTs pcommon.Timestamp, points ...*pmetric.NumberDataPoint) *pmetric.Metric { metric := pmetric.NewMetric() metric.SetName(name) metric.SetDataType(pmetric.MetricDataTypeGauge) @@ -89,7 +89,7 @@ func gaugeMetricPdata(name string, kvp []*kv, startTs pcommon.Timestamp, points return &metric } -func summaryPointPdata(ts pcommon.Timestamp, count uint64, sum float64, quantiles, values []float64) *pmetric.SummaryDataPoint { +func summaryPoint(ts pcommon.Timestamp, count uint64, sum float64, quantiles, values []float64) *pmetric.SummaryDataPoint { sdp := pmetric.NewSummaryDataPoint() sdp.SetTimestamp(ts) sdp.SetCount(count) @@ -103,7 +103,7 @@ func summaryPointPdata(ts pcommon.Timestamp, count uint64, sum float64, quantile return &sdp } -func summaryMetricPdata(name string, kvp []*kv, startTs pcommon.Timestamp, points ...*pmetric.SummaryDataPoint) *pmetric.Metric { +func summaryMetric(name string, kvp []*kv, startTs pcommon.Timestamp, points ...*pmetric.SummaryDataPoint) *pmetric.Metric { metric := pmetric.NewMetric() metric.SetName(name) metric.SetDataType(pmetric.MetricDataTypeSummary) @@ -121,7 +121,7 @@ func summaryMetricPdata(name string, kvp []*kv, startTs pcommon.Timestamp, point return &metric } -func sumMetricPdata(name string, kvp []*kv, startTs pcommon.Timestamp, points ...*pmetric.NumberDataPoint) *pmetric.Metric { +func sumMetric(name string, kvp []*kv, startTs pcommon.Timestamp, points ...*pmetric.NumberDataPoint) *pmetric.Metric { metric := pmetric.NewMetric() metric.SetName(name) metric.SetDataType(pmetric.MetricDataTypeSum) diff --git a/receiver/prometheusreceiver/internal/ocastore.go b/receiver/prometheusreceiver/internal/ocastore.go index fdf660d2549b..cfd68d1c0d6e 100644 --- a/receiver/prometheusreceiver/internal/ocastore.go +++ b/receiver/prometheusreceiver/internal/ocastore.go @@ -32,7 +32,7 @@ type OcaStore struct { ctx context.Context sink consumer.Metrics - jobsMap *JobsMapPdata + jobsMap *JobsMap useStartTimeMetric bool startTimeMetricRegex string receiverID config.ComponentID @@ -51,9 +51,9 @@ func NewOcaStore( startTimeMetricRegex string, receiverID config.ComponentID, externalLabels labels.Labels) *OcaStore { - var jobsMap *JobsMapPdata + var jobsMap *JobsMap if !useStartTimeMetric { - jobsMap = NewJobsMapPdata(gcInterval) + jobsMap = NewJobsMap(gcInterval) } return &OcaStore{ ctx: ctx, @@ -68,7 +68,7 @@ func NewOcaStore( } func (o *OcaStore) Appender(ctx context.Context) storage.Appender { - return newTransactionPdata( + return newTransaction( ctx, &txConfig{ jobsMap: o.jobsMap, diff --git a/receiver/prometheusreceiver/internal/otlp_metricfamily.go b/receiver/prometheusreceiver/internal/otlp_metricfamily.go index 919d2b1ed21e..f23b617570a4 100644 --- a/receiver/prometheusreceiver/internal/otlp_metricfamily.go +++ b/receiver/prometheusreceiver/internal/otlp_metricfamily.go @@ -28,9 +28,9 @@ import ( "go.uber.org/zap" ) -type metricFamilyPdata struct { +type metricFamily struct { mtype pmetric.MetricDataType - groups map[string]*metricGroupPdata + groups map[string]*metricGroup name string mc MetadataCache droppedTimeseries int @@ -40,11 +40,11 @@ type metricFamilyPdata struct { groupOrders map[string]int } -// metricGroupPdata, represents a single metric of a metric family. for example a histogram metric is usually represent by +// metricGroup, represents a single metric of a metric family. for example a histogram metric is usually represent by // a couple data complexValue (buckets and count/sum), a group of a metric family always share a same set of tags. for // simple types like counter and gauge, each data point is a group of itself -type metricGroupPdata struct { - family *metricFamilyPdata +type metricGroup struct { + family *metricFamily ts int64 ls labels.Labels count float64 @@ -57,16 +57,16 @@ type metricGroupPdata struct { var pdataStaleFlags = pmetric.NewMetricDataPointFlags(pmetric.MetricDataPointFlagNoRecordedValue) -func newMetricFamilyPdata(metricName string, mc MetadataCache, logger *zap.Logger) *metricFamilyPdata { +func newMetricFamily(metricName string, mc MetadataCache, logger *zap.Logger) *metricFamily { metadata, familyName := metadataForMetric(metricName, mc) - mtype := convToPdataMetricType(metadata.Type) + mtype := convToMetricType(metadata.Type) if mtype == pmetric.MetricDataTypeNone { logger.Debug(fmt.Sprintf("Unknown-typed metric : %s %+v", metricName, metadata)) } - return &metricFamilyPdata{ + return &metricFamily{ mtype: mtype, - groups: make(map[string]*metricGroupPdata), + groups: make(map[string]*metricGroup), name: familyName, mc: mc, droppedTimeseries: 0, @@ -80,9 +80,9 @@ func newMetricFamilyPdata(metricName string, mc MetadataCache, logger *zap.Logge // updateLabelKeys is used to store all the label keys of a same metric family in observed order. since prometheus // receiver removes any label with empty value before feeding it to an appender, in order to figure out all the labels // from the same metric family we will need to keep track of what labels have ever been observed. -func (mf *metricFamilyPdata) updateLabelKeys(ls labels.Labels) { +func (mf *metricFamily) updateLabelKeys(ls labels.Labels) { for _, l := range ls { - if isUsefulLabelPdata(mf.mtype, l.Name) { + if isUsefulLabel(mf.mtype, l.Name) { if _, ok := mf.labelKeys[l.Name]; !ok { mf.labelKeys[l.Name] = true // use insertion sort to maintain order @@ -97,8 +97,8 @@ func (mf *metricFamilyPdata) updateLabelKeys(ls labels.Labels) { } // includesMetric returns true if the metric is part of the family -func (mf *metricFamilyPdata) includesMetric(metricName string) bool { - if mf.isCumulativeTypePdata() { +func (mf *metricFamily) includesMetric(metricName string) bool { + if mf.isCumulativeType() { // If it is a merged family type, then it should match the // family name when suffixes are trimmed. return normalizeMetricName(metricName) == mf.name @@ -108,18 +108,18 @@ func (mf *metricFamilyPdata) includesMetric(metricName string) bool { return metricName == mf.name } -func (mf *metricFamilyPdata) getGroupKey(ls labels.Labels) string { +func (mf *metricFamily) getGroupKey(ls labels.Labels) string { mf.updateLabelKeys(ls) return dpgSignature(mf.labelKeysOrdered, ls) } -func (mg *metricGroupPdata) sortPoints() { +func (mg *metricGroup) sortPoints() { sort.Slice(mg.complexValue, func(i, j int) bool { return mg.complexValue[i].boundary < mg.complexValue[j].boundary }) } -func (mg *metricGroupPdata) toDistributionPoint(orderedLabelKeys []string, dest *pmetric.HistogramDataPointSlice) bool { +func (mg *metricGroup) toDistributionPoint(orderedLabelKeys []string, dest *pmetric.HistogramDataPointSlice) bool { if !mg.hasCount || len(mg.complexValue) == 0 { return false } @@ -127,7 +127,7 @@ func (mg *metricGroupPdata) toDistributionPoint(orderedLabelKeys []string, dest mg.sortPoints() // for OCAgent Proto, the bounds won't include +inf - // TODO: (@odeke-em) should we also check OpenTelemetry Pdata for bucket bounds? + // TODO: (@odeke-em) should we also check OpenTelemetry for bucket bounds? bounds := make([]float64, len(mg.complexValue)-1) bucketCounts := make([]uint64, len(mg.complexValue)) @@ -164,11 +164,11 @@ func (mg *metricGroupPdata) toDistributionPoint(orderedLabelKeys []string, dest // The timestamp MUST be in retrieved from milliseconds and converted to nanoseconds. tsNanos := pdataTimestampFromMs(mg.ts) - if mg.family.isCumulativeTypePdata() { + if mg.family.isCumulativeType() { point.SetStartTimestamp(tsNanos) // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp } point.SetTimestamp(tsNanos) - populateAttributesPdata(orderedLabelKeys, mg.ls, point.Attributes()) + populateAttributes(orderedLabelKeys, mg.ls, point.Attributes()) return true } @@ -178,7 +178,7 @@ func pdataTimestampFromMs(timeAtMs int64) pcommon.Timestamp { return pcommon.NewTimestampFromTime(time.Unix(secs, ns)) } -func (mg *metricGroupPdata) toSummaryPoint(orderedLabelKeys []string, dest *pmetric.SummaryDataPointSlice) bool { +func (mg *metricGroup) toSummaryPoint(orderedLabelKeys []string, dest *pmetric.SummaryDataPointSlice) bool { // expecting count to be provided, however, in the following two cases, they can be missed. // 1. data is corrupted // 2. ignored by startValue evaluation @@ -216,19 +216,19 @@ func (mg *metricGroupPdata) toSummaryPoint(orderedLabelKeys []string, dest *pmet // The timestamp MUST be in retrieved from milliseconds and converted to nanoseconds. tsNanos := pdataTimestampFromMs(mg.ts) point.SetTimestamp(tsNanos) - if mg.family.isCumulativeTypePdata() { + if mg.family.isCumulativeType() { point.SetStartTimestamp(tsNanos) // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp } - populateAttributesPdata(orderedLabelKeys, mg.ls, point.Attributes()) + populateAttributes(orderedLabelKeys, mg.ls, point.Attributes()) return true } -func (mg *metricGroupPdata) toNumberDataPoint(orderedLabelKeys []string, dest *pmetric.NumberDataPointSlice) bool { +func (mg *metricGroup) toNumberDataPoint(orderedLabelKeys []string, dest *pmetric.NumberDataPointSlice) bool { var startTsNanos pcommon.Timestamp tsNanos := pdataTimestampFromMs(mg.ts) // gauge/undefined types have no start time. - if mg.family.isCumulativeTypePdata() { + if mg.family.isCumulativeType() { startTsNanos = tsNanos // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp } @@ -240,12 +240,12 @@ func (mg *metricGroupPdata) toNumberDataPoint(orderedLabelKeys []string, dest *p } else { point.SetDoubleVal(mg.value) } - populateAttributesPdata(orderedLabelKeys, mg.ls, point.Attributes()) + populateAttributes(orderedLabelKeys, mg.ls, point.Attributes()) return true } -func populateAttributesPdata(orderedKeys []string, ls labels.Labels, dest pcommon.Map) { +func populateAttributes(orderedKeys []string, ls labels.Labels, dest pcommon.Map) { src := ls.Map() for _, key := range orderedKeys { if src[key] == "" { @@ -257,18 +257,18 @@ func populateAttributesPdata(orderedKeys []string, ls labels.Labels, dest pcommo } // Purposefully being referenced to avoid lint warnings about being "unused". -var _ = (*metricFamilyPdata)(nil).updateLabelKeys +var _ = (*metricFamily)(nil).updateLabelKeys -func (mf *metricFamilyPdata) isCumulativeTypePdata() bool { +func (mf *metricFamily) isCumulativeType() bool { return mf.mtype == pmetric.MetricDataTypeSum || mf.mtype == pmetric.MetricDataTypeHistogram || mf.mtype == pmetric.MetricDataTypeSummary } -func (mf *metricFamilyPdata) loadMetricGroupOrCreate(groupKey string, ls labels.Labels, ts int64) *metricGroupPdata { +func (mf *metricFamily) loadMetricGroupOrCreate(groupKey string, ls labels.Labels, ts int64) *metricGroup { mg, ok := mf.groups[groupKey] if !ok { - mg = &metricGroupPdata{ + mg = &metricGroup{ family: mf, ts: ts, ls: ls, @@ -281,7 +281,7 @@ func (mf *metricFamilyPdata) loadMetricGroupOrCreate(groupKey string, ls labels. return mg } -func (mf *metricFamilyPdata) Add(metricName string, ls labels.Labels, t int64, v float64) error { +func (mf *metricFamily) Add(metricName string, ls labels.Labels, t int64, v float64) error { groupKey := mf.getGroupKey(ls) mg := mf.loadMetricGroupOrCreate(groupKey, ls, t) switch mf.mtype { @@ -297,7 +297,7 @@ func (mf *metricFamilyPdata) Add(metricName string, ls labels.Labels, t int64, v mg.count = v mg.hasCount = true default: - boundary, err := getBoundaryPdata(mf.mtype, ls) + boundary, err := getBoundary(mf.mtype, ls) if err != nil { mf.droppedTimeseries++ return err @@ -312,15 +312,15 @@ func (mf *metricFamilyPdata) Add(metricName string, ls labels.Labels, t int64, v } // getGroups to return groups in insertion order -func (mf *metricFamilyPdata) getGroups() []*metricGroupPdata { - groups := make([]*metricGroupPdata, len(mf.groupOrders)) +func (mf *metricFamily) getGroups() []*metricGroup { + groups := make([]*metricGroup, len(mf.groupOrders)) for k, v := range mf.groupOrders { groups[v] = mf.groups[k] } return groups } -func (mf *metricFamilyPdata) ToMetricPdata(metrics *pmetric.MetricSlice) (int, int) { +func (mf *metricFamily) ToMetric(metrics *pmetric.MetricSlice) (int, int) { metric := pmetric.NewMetric() metric.SetDataType(mf.mtype) metric.SetName(mf.name) diff --git a/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go b/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go index d359043e75aa..f63e883df5b6 100644 --- a/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go +++ b/receiver/prometheusreceiver/internal/otlp_metricfamily_test.go @@ -162,7 +162,7 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - mp := newMetricFamilyPdata(tt.metricName, mc, zap.NewNop()) + mp := newMetricFamily(tt.metricName, mc, zap.NewNop()) for _, tv := range tt.scrapes { var lbls labels.Labels if tv.extraLabel.Name != "" { @@ -178,7 +178,7 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) { require.NotNil(t, mp.groups[groupKey], "Expecting the groupKey to have a value given key:: "+groupKey) sl := pmetric.NewMetricSlice() - mp.ToMetricPdata(&sl) + mp.ToMetric(&sl) require.Equal(t, 1, sl.Len(), "Exactly one metric expected") metric := sl.At(0) @@ -378,7 +378,7 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - mp := newMetricFamilyPdata(tt.name, mc, zap.NewNop()) + mp := newMetricFamily(tt.name, mc, zap.NewNop()) for _, lbs := range tt.labelsScrapes { for _, scrape := range lbs.scrapes { require.NoError(t, mp.Add(scrape.metric, lbs.labels.Copy(), scrape.at, scrape.value)) @@ -394,7 +394,7 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) { require.NotNil(t, mp.groups[groupKey], "Expecting the groupKey to have a value given key:: "+groupKey) sl := pmetric.NewMetricSlice() - mp.ToMetricPdata(&sl) + mp.ToMetric(&sl) require.Equal(t, 1, sl.Len(), "Exactly one metric expected") metric := sl.At(0) @@ -467,7 +467,7 @@ func TestMetricGroupData_toNumberDataUnitTest(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - mp := newMetricFamilyPdata(tt.metricKind, mc, zap.NewNop()) + mp := newMetricFamily(tt.metricKind, mc, zap.NewNop()) for _, tv := range tt.scrapes { require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value)) } @@ -477,7 +477,7 @@ func TestMetricGroupData_toNumberDataUnitTest(t *testing.T) { require.NotNil(t, mp.groups[groupKey], "Expecting the groupKey to have a value given key:: "+groupKey) sl := pmetric.NewMetricSlice() - mp.ToMetricPdata(&sl) + mp.ToMetric(&sl) require.Equal(t, 1, sl.Len(), "Exactly one metric expected") metric := sl.At(0) diff --git a/receiver/prometheusreceiver/internal/otlp_metrics_adjuster.go b/receiver/prometheusreceiver/internal/otlp_metrics_adjuster.go index 3f5bbcf9d75b..db0c0623e146 100644 --- a/receiver/prometheusreceiver/internal/otlp_metrics_adjuster.go +++ b/receiver/prometheusreceiver/internal/otlp_metrics_adjuster.go @@ -61,7 +61,7 @@ import ( // timeseriesinfo contains the information necessary to adjust from the initial point and to detect // resets. -type timeseriesinfoPdata struct { +type timeseriesinfo struct { mark bool initial *pmetric.Metric previous *pmetric.Metric @@ -69,21 +69,21 @@ type timeseriesinfoPdata struct { // timeseriesMap maps from a timeseries instance (metric * label values) to the timeseries info for // the instance. -type timeseriesMapPdata struct { +type timeseriesMap struct { sync.RWMutex // The mutex is used to protect access to the member fields. It is acquired for the entirety of // AdjustMetricSlice() and also acquired by gc(). mark bool - tsiMap map[string]*timeseriesinfoPdata + tsiMap map[string]*timeseriesinfo } // Get the timeseriesinfo for the timeseries associated with the metric and label values. -func (tsm *timeseriesMapPdata) get(metric *pmetric.Metric, kv pcommon.Map) *timeseriesinfoPdata { +func (tsm *timeseriesMap) get(metric *pmetric.Metric, kv pcommon.Map) *timeseriesinfo { // This should only be invoked be functions called (directly or indirectly) by AdjustMetricSlice(). // The lock protecting tsm.tsiMap is acquired there. name := metric.Name() - sig := getTimeseriesSignaturePdata(name, kv) + sig := getTimeseriesSignature(name, kv) if metric.DataType() == pmetric.MetricDataTypeHistogram { // There are 2 types of Histograms whose aggregation temporality needs distinguishing: // * CumulativeHistogram @@ -93,7 +93,7 @@ func (tsm *timeseriesMapPdata) get(metric *pmetric.Metric, kv pcommon.Map) *time } tsi, ok := tsm.tsiMap[sig] if !ok { - tsi = ×eriesinfoPdata{} + tsi = ×eriesinfo{} tsm.tsiMap[sig] = tsi } tsm.mark = true @@ -102,7 +102,7 @@ func (tsm *timeseriesMapPdata) get(metric *pmetric.Metric, kv pcommon.Map) *time } // Create a unique timeseries signature consisting of the metric name and label values. -func getTimeseriesSignaturePdata(name string, kv pcommon.Map) string { +func getTimeseriesSignature(name string, kv pcommon.Map) string { labelValues := make([]string, 0, kv.Len()) kv.Sort().Range(func(_ string, attrValue pcommon.Value) bool { value := attrValue.StringVal() @@ -115,7 +115,7 @@ func getTimeseriesSignaturePdata(name string, kv pcommon.Map) string { } // Remove timeseries that have aged out. -func (tsm *timeseriesMapPdata) gc() { +func (tsm *timeseriesMap) gc() { tsm.Lock() defer tsm.Unlock() // this shouldn't happen under the current gc() strategy @@ -132,28 +132,28 @@ func (tsm *timeseriesMapPdata) gc() { tsm.mark = false } -func newTimeseriesMapPdata() *timeseriesMapPdata { - return ×eriesMapPdata{mark: true, tsiMap: map[string]*timeseriesinfoPdata{}} +func newTimeseriesMap() *timeseriesMap { + return ×eriesMap{mark: true, tsiMap: map[string]*timeseriesinfo{}} } -// JobsMapPdata maps from a job instance to a map of timeseriesPdata instances for the job. -type JobsMapPdata struct { +// JobsMap maps from a job instance to a map of timeseries instances for the job. +type JobsMap struct { sync.RWMutex // The mutex is used to protect access to the member fields. It is acquired for most of // get() and also acquired by gc(). gcInterval time.Duration lastGC time.Time - jobsMap map[string]*timeseriesMapPdata + jobsMap map[string]*timeseriesMap } -// NewJobsMap creates a new (empty) JobsMapPdata. -func NewJobsMapPdata(gcInterval time.Duration) *JobsMapPdata { - return &JobsMapPdata{gcInterval: gcInterval, lastGC: time.Now(), jobsMap: make(map[string]*timeseriesMapPdata)} +// NewJobsMap creates a new (empty) JobsMap. +func NewJobsMap(gcInterval time.Duration) *JobsMap { + return &JobsMap{gcInterval: gcInterval, lastGC: time.Now(), jobsMap: make(map[string]*timeseriesMap)} } // Remove jobs and timeseries that have aged out. -func (jm *JobsMapPdata) gc() { +func (jm *JobsMap) gc() { jm.Lock() defer jm.Unlock() // once the structure is locked, confirm that gc() is still necessary @@ -161,7 +161,7 @@ func (jm *JobsMapPdata) gc() { for sig, tsm := range jm.jobsMap { tsm.RLock() tsmNotMarked := !tsm.mark - // take a read lock here, no need to get a full lock as we have a lock on the JobsMapPdata + // take a read lock here, no need to get a full lock as we have a lock on the JobsMap tsm.RUnlock() if tsmNotMarked { delete(jm.jobsMap, sig) @@ -174,7 +174,7 @@ func (jm *JobsMapPdata) gc() { } } -func (jm *JobsMapPdata) maybeGC() { +func (jm *JobsMap) maybeGC() { // speculatively check if gc() is necessary, recheck once the structure is locked jm.RLock() defer jm.RUnlock() @@ -183,7 +183,7 @@ func (jm *JobsMapPdata) maybeGC() { } } -func (jm *JobsMapPdata) get(job, instance string) *timeseriesMapPdata { +func (jm *JobsMap) get(job, instance string) *timeseriesMap { sig := job + ":" + instance // a read locke is taken here as we will not need to modify jobsMap if the target timeseriesMap is available. jm.RLock() @@ -201,22 +201,22 @@ func (jm *JobsMapPdata) get(job, instance string) *timeseriesMapPdata { if ok2 { return tsm2 } - tsm2 = newTimeseriesMapPdata() + tsm2 = newTimeseriesMap() jm.jobsMap[sig] = tsm2 return tsm2 } -// MetricsAdjusterPdata takes a map from a metric instance to the initial point in the metrics instance +// MetricsAdjuster takes a map from a metric instance to the initial point in the metrics instance // and provides AdjustMetricSlice, which takes a sequence of metrics and adjust their start times based on // the initial points. -type MetricsAdjusterPdata struct { - tsm *timeseriesMapPdata +type MetricsAdjuster struct { + tsm *timeseriesMap logger *zap.Logger } // NewMetricsAdjuster is a constructor for MetricsAdjuster. -func NewMetricsAdjusterPdata(tsm *timeseriesMapPdata, logger *zap.Logger) *MetricsAdjusterPdata { - return &MetricsAdjusterPdata{ +func NewMetricsAdjuster(tsm *timeseriesMap, logger *zap.Logger) *MetricsAdjuster { + return &MetricsAdjuster{ tsm: tsm, logger: logger, } @@ -225,7 +225,7 @@ func NewMetricsAdjusterPdata(tsm *timeseriesMapPdata, logger *zap.Logger) *Metri // AdjustMetricSlice takes a sequence of metrics and adjust their start times based on the initial and // previous points in the timeseriesMap. // Returns the total number of timeseries that had reset start times. -func (ma *MetricsAdjusterPdata) AdjustMetricSlice(metricL *pmetric.MetricSlice) int { +func (ma *MetricsAdjuster) AdjustMetricSlice(metricL *pmetric.MetricSlice) int { resets := 0 // The lock on the relevant timeseriesMap is held throughout the adjustment process to ensure that // nothing else can modify the data used for adjustment. @@ -241,7 +241,7 @@ func (ma *MetricsAdjusterPdata) AdjustMetricSlice(metricL *pmetric.MetricSlice) // AdjustMetrics takes a sequence of metrics and adjust their start times based on the initial and // previous points in the timeseriesMap. // Returns the total number of timeseries that had reset start times. -func (ma *MetricsAdjusterPdata) AdjustMetrics(metrics *pmetric.Metrics) int { +func (ma *MetricsAdjuster) AdjustMetrics(metrics *pmetric.Metrics) int { resets := 0 // The lock on the relevant timeseriesMap is held throughout the adjustment process to ensure that // nothing else can modify the data used for adjustment. @@ -261,7 +261,7 @@ func (ma *MetricsAdjusterPdata) AdjustMetrics(metrics *pmetric.Metrics) int { } // Returns the number of timeseries with reset start times. -func (ma *MetricsAdjusterPdata) adjustMetric(metric *pmetric.Metric) int { +func (ma *MetricsAdjuster) adjustMetric(metric *pmetric.Metric) int { switch metric.DataType() { case pmetric.MetricDataTypeGauge: // gauges don't need to be adjusted so no additional processing is necessary @@ -272,7 +272,7 @@ func (ma *MetricsAdjusterPdata) adjustMetric(metric *pmetric.Metric) int { } // Returns the number of timeseries that had reset start times. -func (ma *MetricsAdjusterPdata) adjustMetricPoints(metric *pmetric.Metric) int { +func (ma *MetricsAdjuster) adjustMetricPoints(metric *pmetric.Metric) int { switch dataType := metric.DataType(); dataType { case pmetric.MetricDataTypeGauge: return ma.adjustMetricGauge(metric) @@ -295,7 +295,7 @@ func (ma *MetricsAdjusterPdata) adjustMetricPoints(metric *pmetric.Metric) int { // Returns true if 'current' was adjusted and false if 'current' is an the initial occurrence or a // reset of the timeseries. -func (ma *MetricsAdjusterPdata) adjustMetricGauge(current *pmetric.Metric) (resets int) { +func (ma *MetricsAdjuster) adjustMetricGauge(current *pmetric.Metric) (resets int) { currentPoints := current.Gauge().DataPoints() for i := 0; i < currentPoints.Len(); i++ { @@ -343,7 +343,7 @@ func (ma *MetricsAdjusterPdata) adjustMetricGauge(current *pmetric.Metric) (rese return } -func (ma *MetricsAdjusterPdata) adjustMetricHistogram(current *pmetric.Metric) (resets int) { +func (ma *MetricsAdjuster) adjustMetricHistogram(current *pmetric.Metric) (resets int) { histogram := current.Histogram() if histogram.AggregationTemporality() != pmetric.MetricAggregationTemporalityCumulative { // Only dealing with CumulativeDistributions. @@ -408,7 +408,7 @@ func (ma *MetricsAdjusterPdata) adjustMetricHistogram(current *pmetric.Metric) ( return } -func (ma *MetricsAdjusterPdata) adjustMetricSum(current *pmetric.Metric) (resets int) { +func (ma *MetricsAdjuster) adjustMetricSum(current *pmetric.Metric) (resets int) { currentPoints := current.Sum().DataPoints() for i := 0; i < currentPoints.Len(); i++ { @@ -462,7 +462,7 @@ func (ma *MetricsAdjusterPdata) adjustMetricSum(current *pmetric.Metric) (resets return } -func (ma *MetricsAdjusterPdata) adjustMetricSummary(current *pmetric.Metric) (resets int) { +func (ma *MetricsAdjuster) adjustMetricSummary(current *pmetric.Metric) (resets int) { currentPoints := current.Summary().DataPoints() for i := 0; i < currentPoints.Len(); i++ { @@ -506,7 +506,6 @@ func (ma *MetricsAdjusterPdata) adjustMetricSummary(current *pmetric.Metric) (re if (currentSummary.Count() != 0 && previousSummary.Count() != 0 && currentSummary.Count() < previousSummary.Count()) || - (currentSummary.Sum() != 0 && previousSummary.Sum() != 0 && currentSummary.Sum() < previousSummary.Sum()) { diff --git a/receiver/prometheusreceiver/internal/otlp_metrics_adjuster_test.go b/receiver/prometheusreceiver/internal/otlp_metrics_adjuster_test.go index 4fd78fcdfce5..cb04ad18d584 100644 --- a/receiver/prometheusreceiver/internal/otlp_metrics_adjuster_test.go +++ b/receiver/prometheusreceiver/internal/otlp_metrics_adjuster_test.go @@ -41,7 +41,7 @@ var ( ) func Test_gauge_pdata(t *testing.T) { - script := []*metricsAdjusterTestPdata{ + script := []*metricsAdjusterTest{ { "Gauge: round 1 - gauge not adjusted", func() *pmetric.MetricSlice { @@ -140,11 +140,11 @@ func Test_gauge_pdata(t *testing.T) { 0, }, } - runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script) + runScript(t, NewJobsMap(time.Minute).get("job", "0"), script) } func Test_cumulative_pdata(t *testing.T) { - script := []*metricsAdjusterTestPdata{ + script := []*metricsAdjusterTest{ { "Cumulative: round 1 - initial instance, start time is established", func() *pmetric.MetricSlice { @@ -316,7 +316,7 @@ func Test_cumulative_pdata(t *testing.T) { 0, }, } - runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script) + runScript(t, NewJobsMap(time.Minute).get("job", "0"), script) } func populateSummary(sdp *pmetric.SummaryDataPoint, timestamp pcommon.Timestamp, count uint64, sum float64, quantilePercents, quantileValues []float64) { @@ -332,7 +332,7 @@ func populateSummary(sdp *pmetric.SummaryDataPoint, timestamp pcommon.Timestamp, } func Test_summary_no_count_pdata(t *testing.T) { - script := []*metricsAdjusterTestPdata{ + script := []*metricsAdjusterTest{ { "Summary No Count: round 1 - initial instance, start time is established", func() *pmetric.MetricSlice { @@ -440,11 +440,11 @@ func Test_summary_no_count_pdata(t *testing.T) { }, } - runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script) + runScript(t, NewJobsMap(time.Minute).get("job", "0"), script) } func Test_summary_flag_norecordedvalue(t *testing.T) { - script := []*metricsAdjusterTestPdata{ + script := []*metricsAdjusterTest{ { "Summary No Count: round 1 - initial instance, start time is established", func() *pmetric.MetricSlice { @@ -507,11 +507,11 @@ func Test_summary_flag_norecordedvalue(t *testing.T) { }, } - runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script) + runScript(t, NewJobsMap(time.Minute).get("job", "0"), script) } func Test_summary_pdata(t *testing.T) { - script := []*metricsAdjusterTestPdata{ + script := []*metricsAdjusterTest{ { "Summary: round 1 - initial instance, start time is established", metricSlice( @@ -554,19 +554,9 @@ func Test_summary_pdata(t *testing.T) { }, } - runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script) + runScript(t, NewJobsMap(time.Minute).get("job", "0"), script) } -var ( - distPoint = distPointPdata - histogramMetric = cumulativeDistMetricPdata - doublePoint = doublePointPdata - gaugeMetric = gaugeMetricPdata - summaryPoint = summaryPointPdata - summaryMetric = summaryMetricPdata - sumMetric = sumMetricPdata -) - func metricSlice(metrics ...*pmetric.Metric) *pmetric.MetricSlice { ms := pmetric.NewMetricSlice() for _, metric := range metrics { @@ -594,38 +584,38 @@ var ( ) func Test_cumulativeDistribution_pdata(t *testing.T) { - script := []*metricsAdjusterTestPdata{ + script := []*metricsAdjusterTest{ { "CumulativeDist: round 1 - initial instance, start time is established", - metricSlice(histogramMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{4, 2, 3, 7}))), - metricSlice(histogramMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{4, 2, 3, 7}))), + metricSlice(cumulativeDistMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{4, 2, 3, 7}))), + metricSlice(cumulativeDistMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{4, 2, 3, 7}))), 1, }, { "CumulativeDist: round 2 - instance adjusted based on round 1", - metricSlice(histogramMetric(cd1, k1v1k2v2, pdt2Ms, distPoint(pdt2Ms, bounds0, []uint64{6, 3, 4, 8}))), - metricSlice(histogramMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt2Ms, bounds0, []uint64{6, 3, 4, 8}))), + metricSlice(cumulativeDistMetric(cd1, k1v1k2v2, pdt2Ms, distPoint(pdt2Ms, bounds0, []uint64{6, 3, 4, 8}))), + metricSlice(cumulativeDistMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt2Ms, bounds0, []uint64{6, 3, 4, 8}))), 0, }, { "CumulativeDist: round 3 - instance reset (value less than previous value), start time is reset", - metricSlice(histogramMetric(cd1, k1v1k2v2, pdt3Ms, distPoint(pdt3Ms, bounds0, []uint64{5, 3, 2, 7}))), - metricSlice(histogramMetric(cd1, k1v1k2v2, pdt3Ms, distPoint(pdt3Ms, bounds0, []uint64{5, 3, 2, 7}))), + metricSlice(cumulativeDistMetric(cd1, k1v1k2v2, pdt3Ms, distPoint(pdt3Ms, bounds0, []uint64{5, 3, 2, 7}))), + metricSlice(cumulativeDistMetric(cd1, k1v1k2v2, pdt3Ms, distPoint(pdt3Ms, bounds0, []uint64{5, 3, 2, 7}))), 1, }, { "CumulativeDist: round 4 - instance adjusted based on round 3", - metricSlice(histogramMetric(cd1, k1v1k2v2, pdt4Ms, distPoint(pdt4Ms, bounds0, []uint64{7, 4, 2, 12}))), - metricSlice(histogramMetric(cd1, k1v1k2v2, pdt3Ms, distPoint(pdt4Ms, bounds0, []uint64{7, 4, 2, 12}))), + metricSlice(cumulativeDistMetric(cd1, k1v1k2v2, pdt4Ms, distPoint(pdt4Ms, bounds0, []uint64{7, 4, 2, 12}))), + metricSlice(cumulativeDistMetric(cd1, k1v1k2v2, pdt3Ms, distPoint(pdt4Ms, bounds0, []uint64{7, 4, 2, 12}))), 0, }, } - runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script) + runScript(t, NewJobsMap(time.Minute).get("job", "0"), script) } func Test_histogram_flag_norecordedvalue(t *testing.T) { - script := []*metricsAdjusterTestPdata{ + script := []*metricsAdjusterTest{ { "Histogram: round 1 - initial instance, start time is established", - metricSlice(histogramMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{7, 4, 2, 12}))), - metricSlice(histogramMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{7, 4, 2, 12}))), + metricSlice(cumulativeDistMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{7, 4, 2, 12}))), + metricSlice(cumulativeDistMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{7, 4, 2, 12}))), 1, }, { @@ -640,7 +630,7 @@ func Test_histogram_flag_norecordedvalue(t *testing.T) { dp := destPointL.AppendEmpty() dp.SetTimestamp(pdt2Ms) dp.SetFlags(1) - return metricSlice(histogramMetric(cd1, k1v1k2v2, pdt2Ms, &dp)) + return metricSlice(cumulativeDistMetric(cd1, k1v1k2v2, pdt2Ms, &dp)) }(), func() *pmetric.MetricSlice { metric := pmetric.NewMetric() @@ -652,13 +642,13 @@ func Test_histogram_flag_norecordedvalue(t *testing.T) { dp := destPointL.AppendEmpty() dp.SetTimestamp(pdt2Ms) dp.SetFlags(1) - return metricSlice(histogramMetric(cd1, k1v1k2v2, pdt1Ms, &dp)) + return metricSlice(cumulativeDistMetric(cd1, k1v1k2v2, pdt1Ms, &dp)) }(), 0, }, } - runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script) + runScript(t, NewJobsMap(time.Minute).get("job", "0"), script) } func Test_histogram_flag_norecordedvalue_first_observation(t *testing.T) { @@ -672,7 +662,7 @@ func Test_histogram_flag_norecordedvalue_first_observation(t *testing.T) { dp := destPointL.AppendEmpty() dp.SetTimestamp(pdt1Ms) dp.SetFlags(1) - return metricSlice(histogramMetric(cd1, k1v1k2v2, pdt1Ms, &dp)) + return metricSlice(cumulativeDistMetric(cd1, k1v1k2v2, pdt1Ms, &dp)) }() m2 := func() *pmetric.MetricSlice { metric := pmetric.NewMetric() @@ -684,9 +674,9 @@ func Test_histogram_flag_norecordedvalue_first_observation(t *testing.T) { dp := destPointL.AppendEmpty() dp.SetTimestamp(pdt2Ms) dp.SetFlags(1) - return metricSlice(histogramMetric(cd1, k1v1k2v2, pdt2Ms, &dp)) + return metricSlice(cumulativeDistMetric(cd1, k1v1k2v2, pdt2Ms, &dp)) }() - script := []*metricsAdjusterTestPdata{ + script := []*metricsAdjusterTest{ { "Histogram: round 1 - initial instance, start time is unknown", m1, @@ -701,7 +691,7 @@ func Test_histogram_flag_norecordedvalue_first_observation(t *testing.T) { }, } - runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script) + runScript(t, NewJobsMap(time.Minute).get("job", "0"), script) } func Test_summary_flag_norecordedvalue_first_observation(t *testing.T) { @@ -727,7 +717,7 @@ func Test_summary_flag_norecordedvalue_first_observation(t *testing.T) { dp.SetFlags(1) return metricSlice(summaryMetric(cd1, k1v1k2v2, pdt2Ms, &dp)) }() - script := []*metricsAdjusterTestPdata{ + script := []*metricsAdjusterTest{ { "Summary: round 1 - initial instance, start time is unknown", m1, @@ -742,7 +732,7 @@ func Test_summary_flag_norecordedvalue_first_observation(t *testing.T) { }, } - runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script) + runScript(t, NewJobsMap(time.Minute).get("job", "0"), script) } func Test_gauge_flag_norecordedvalue_first_observation(t *testing.T) { @@ -768,7 +758,7 @@ func Test_gauge_flag_norecordedvalue_first_observation(t *testing.T) { dp.SetFlags(1) return metricSlice(gaugeMetric(cd1, k1v1k2v2, pdt2Ms, &dp)) }() - script := []*metricsAdjusterTestPdata{ + script := []*metricsAdjusterTest{ { "Gauge: round 1 - initial instance, start time is unknown", m1, @@ -783,7 +773,7 @@ func Test_gauge_flag_norecordedvalue_first_observation(t *testing.T) { }, } - runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script) + runScript(t, NewJobsMap(time.Minute).get("job", "0"), script) } func Test_sum_flag_norecordedvalue_first_observation(t *testing.T) { @@ -811,7 +801,7 @@ func Test_sum_flag_norecordedvalue_first_observation(t *testing.T) { dp.SetFlags(1) return metricSlice(sumMetric(cd1, k1v1k2v2, pdt2Ms, &dp)) }() - script := []*metricsAdjusterTestPdata{ + script := []*metricsAdjusterTest{ { "Sum: round 1 - initial instance, start time is unknown", m1, @@ -826,24 +816,24 @@ func Test_sum_flag_norecordedvalue_first_observation(t *testing.T) { }, } - runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script) + runScript(t, NewJobsMap(time.Minute).get("job", "0"), script) } func Test_multiMetrics_pdata(t *testing.T) { g1 := "gauge1" - script := []*metricsAdjusterTestPdata{ + script := []*metricsAdjusterTest{ { "MultiMetrics: round 1 - combined round 1 of individual metrics", metricSlice( gaugeMetric(g1, k1v1k2v2, pdt1Ms, doublePoint(pdt1Ms, 44)), sumMetric(c1, k1v1k2v2, pdt1Ms, doublePoint(pdt1Ms, 44)), - histogramMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{4, 2, 3, 7})), + cumulativeDistMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{4, 2, 3, 7})), summaryMetric(s1, k1v1k2v2, pdt1Ms, summaryPoint(pdt1Ms, 10, 40, percent0, []float64{1, 5, 8})), ), metricSlice( gaugeMetric(g1, k1v1k2v2, pdt1Ms, doublePoint(pdt1Ms, 44)), sumMetric(c1, k1v1k2v2, pdt1Ms, doublePoint(pdt1Ms, 44)), - histogramMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{4, 2, 3, 7})), + cumulativeDistMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{4, 2, 3, 7})), summaryMetric(s1, k1v1k2v2, pdt1Ms, summaryPoint(pdt1Ms, 10, 40, percent0, []float64{1, 5, 8})), ), 3, @@ -852,13 +842,13 @@ func Test_multiMetrics_pdata(t *testing.T) { metricSlice( gaugeMetric(g1, k1v1k2v2, pdt2Ms, doublePoint(pdt2Ms, 66)), sumMetric(c1, k1v1k2v2, pdt2Ms, doublePoint(pdt2Ms, 66)), - histogramMetric(cd1, k1v1k2v2, pdt2Ms, distPoint(pdt2Ms, bounds0, []uint64{6, 3, 4, 8})), + cumulativeDistMetric(cd1, k1v1k2v2, pdt2Ms, distPoint(pdt2Ms, bounds0, []uint64{6, 3, 4, 8})), summaryMetric(s1, k1v1k2v2, pdt2Ms, summaryPoint(pdt2Ms, 15, 70, percent0, []float64{7, 44, 9})), ), metricSlice( gaugeMetric(g1, k1v1k2v2, pdt2Ms, doublePoint(pdt2Ms, 66)), sumMetric(c1, k1v1k2v2, pdt1Ms, doublePoint(pdt2Ms, 66)), - histogramMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt2Ms, bounds0, []uint64{6, 3, 4, 8})), + cumulativeDistMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt2Ms, bounds0, []uint64{6, 3, 4, 8})), summaryMetric(s1, k1v1k2v2, pdt1Ms, summaryPoint(pdt2Ms, 15, 70, percent0, []float64{7, 44, 9})), ), 0, @@ -867,13 +857,13 @@ func Test_multiMetrics_pdata(t *testing.T) { metricSlice( gaugeMetric(g1, k1v1k2v2, pdt3Ms, doublePoint(pdt3Ms, 55)), sumMetric(c1, k1v1k2v2, pdt3Ms, doublePoint(pdt3Ms, 55)), - histogramMetric(cd1, k1v1k2v2, pdt3Ms, distPoint(pdt3Ms, bounds0, []uint64{5, 3, 2, 7})), + cumulativeDistMetric(cd1, k1v1k2v2, pdt3Ms, distPoint(pdt3Ms, bounds0, []uint64{5, 3, 2, 7})), summaryMetric(s1, k1v1k2v2, pdt3Ms, summaryPoint(pdt3Ms, 12, 66, percent0, []float64{3, 22, 5})), ), metricSlice( gaugeMetric(g1, k1v1k2v2, pdt3Ms, doublePoint(pdt3Ms, 55)), sumMetric(c1, k1v1k2v2, pdt3Ms, doublePoint(pdt3Ms, 55)), - histogramMetric(cd1, k1v1k2v2, pdt3Ms, distPoint(pdt3Ms, bounds0, []uint64{5, 3, 2, 7})), + cumulativeDistMetric(cd1, k1v1k2v2, pdt3Ms, distPoint(pdt3Ms, bounds0, []uint64{5, 3, 2, 7})), summaryMetric(s1, k1v1k2v2, pdt3Ms, summaryPoint(pdt3Ms, 12, 66, percent0, []float64{3, 22, 5})), ), 3, @@ -881,22 +871,22 @@ func Test_multiMetrics_pdata(t *testing.T) { "MultiMetrics: round 4 - combined round 4 of individual metrics", metricSlice( sumMetric(c1, k1v1k2v2, pdt4Ms, doublePoint(pdt4Ms, 72)), - histogramMetric(cd1, k1v1k2v2, pdt4Ms, distPoint(pdt4Ms, bounds0, []uint64{7, 4, 2, 12})), + cumulativeDistMetric(cd1, k1v1k2v2, pdt4Ms, distPoint(pdt4Ms, bounds0, []uint64{7, 4, 2, 12})), summaryMetric(s1, k1v1k2v2, pdt4Ms, summaryPoint(pdt4Ms, 14, 96, percent0, []float64{9, 47, 8})), ), metricSlice( sumMetric(c1, k1v1k2v2, pdt3Ms, doublePoint(pdt4Ms, 72)), - histogramMetric(cd1, k1v1k2v2, pdt3Ms, distPoint(pdt4Ms, bounds0, []uint64{7, 4, 2, 12})), + cumulativeDistMetric(cd1, k1v1k2v2, pdt3Ms, distPoint(pdt4Ms, bounds0, []uint64{7, 4, 2, 12})), summaryMetric(s1, k1v1k2v2, pdt3Ms, summaryPoint(pdt4Ms, 14, 96, percent0, []float64{9, 47, 8})), ), 0, }, } - runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script) + runScript(t, NewJobsMap(time.Minute).get("job", "0"), script) } func Test_multiTimeseries_pdata(t *testing.T) { - script := []*metricsAdjusterTestPdata{ + script := []*metricsAdjusterTest{ { "MultiTimeseries: round 1 - initial first instance, start time is established", metricSlice(sumMetric(c1, k1v1k2v2, pdt1Ms, doublePoint(pdt1Ms, 44))), @@ -952,7 +942,7 @@ func Test_multiTimeseries_pdata(t *testing.T) { 0, }, } - runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script) + runScript(t, NewJobsMap(time.Minute).get("job", "0"), script) } var ( @@ -962,7 +952,7 @@ var ( ) func Test_emptyLabels_pdata(t *testing.T) { - script := []*metricsAdjusterTestPdata{ + script := []*metricsAdjusterTest{ { "EmptyLabels: round 1 - initial instance, implicitly empty labels, start time is established", metricSlice(sumMetric(c1, emptyLabels, pdt1Ms, doublePoint(pdt1Ms, 44))), @@ -985,99 +975,99 @@ func Test_emptyLabels_pdata(t *testing.T) { 0, }, } - runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script) + runScript(t, NewJobsMap(time.Minute).get("job", "0"), script) } func Test_tsGC_pdata(t *testing.T) { - script1 := []*metricsAdjusterTestPdata{ + script1 := []*metricsAdjusterTest{ { "TsGC: round 1 - initial instances, start time is established", metricSlice( sumMetric(c1, k1v1k2v2, pdt1Ms, doublePoint(pdt1Ms, 44)), sumMetric(c1, k1v10k2v20, pdt1Ms, doublePoint(pdt1Ms, 20)), - histogramMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{4, 2, 3, 7})), - histogramMetric(cd1, k1v10k2v20, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{40, 20, 30, 70})), + cumulativeDistMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{4, 2, 3, 7})), + cumulativeDistMetric(cd1, k1v10k2v20, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{40, 20, 30, 70})), ), metricSlice( sumMetric(c1, k1v1k2v2, pdt1Ms, doublePoint(pdt1Ms, 44)), sumMetric(c1, k1v10k2v20, pdt1Ms, doublePoint(pdt1Ms, 20)), - histogramMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{4, 2, 3, 7})), - histogramMetric(cd1, k1v10k2v20, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{40, 20, 30, 70})), + cumulativeDistMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{4, 2, 3, 7})), + cumulativeDistMetric(cd1, k1v10k2v20, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{40, 20, 30, 70})), ), 4, }, } - script2 := []*metricsAdjusterTestPdata{ + script2 := []*metricsAdjusterTest{ { "TsGC: round 2 - metrics first timeseries adjusted based on round 2, second timeseries not updated", metricSlice( sumMetric(c1, k1v1k2v2, pdt2Ms, doublePoint(pdt2Ms, 88)), - histogramMetric(cd1, k1v1k2v2, pdt2Ms, distPoint(pdt2Ms, bounds0, []uint64{8, 7, 9, 14})), + cumulativeDistMetric(cd1, k1v1k2v2, pdt2Ms, distPoint(pdt2Ms, bounds0, []uint64{8, 7, 9, 14})), ), metricSlice( sumMetric(c1, k1v1k2v2, pdt1Ms, doublePoint(pdt2Ms, 88)), - histogramMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt2Ms, bounds0, []uint64{8, 7, 9, 14})), + cumulativeDistMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt2Ms, bounds0, []uint64{8, 7, 9, 14})), ), 0, }, } - script3 := []*metricsAdjusterTestPdata{ + script3 := []*metricsAdjusterTest{ { "TsGC: round 3 - metrics first timeseries adjusted based on round 2, second timeseries empty due to timeseries gc()", metricSlice( sumMetric(c1, k1v1k2v2, pdt3Ms, doublePoint(pdt3Ms, 99)), sumMetric(c1, k1v10k2v20, pdt3Ms, doublePoint(pdt3Ms, 80)), - histogramMetric(cd1, k1v1k2v2, pdt3Ms, distPoint(pdt3Ms, bounds0, []uint64{9, 8, 10, 15})), - histogramMetric(cd1, k1v10k2v20, pdt3Ms, distPoint(pdt3Ms, bounds0, []uint64{55, 66, 33, 77})), + cumulativeDistMetric(cd1, k1v1k2v2, pdt3Ms, distPoint(pdt3Ms, bounds0, []uint64{9, 8, 10, 15})), + cumulativeDistMetric(cd1, k1v10k2v20, pdt3Ms, distPoint(pdt3Ms, bounds0, []uint64{55, 66, 33, 77})), ), metricSlice( sumMetric(c1, k1v1k2v2, pdt1Ms, doublePoint(pdt3Ms, 99)), sumMetric(c1, k1v10k2v20, pdt3Ms, doublePoint(pdt3Ms, 80)), - histogramMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt3Ms, bounds0, []uint64{9, 8, 10, 15})), - histogramMetric(cd1, k1v10k2v20, pdt3Ms, distPoint(pdt3Ms, bounds0, []uint64{55, 66, 33, 77})), + cumulativeDistMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt3Ms, bounds0, []uint64{9, 8, 10, 15})), + cumulativeDistMetric(cd1, k1v10k2v20, pdt3Ms, distPoint(pdt3Ms, bounds0, []uint64{55, 66, 33, 77})), ), 2, }, } - jobsMap := NewJobsMapPdata(time.Minute) + jobsMap := NewJobsMap(time.Minute) // run round 1 - runScriptPdata(t, jobsMap.get("job", "0"), script1) + runScript(t, jobsMap.get("job", "0"), script1) // gc the tsmap, unmarking all entries jobsMap.get("job", "0").gc() // run round 2 - update metrics first timeseries only - runScriptPdata(t, jobsMap.get("job", "0"), script2) + runScript(t, jobsMap.get("job", "0"), script2) // gc the tsmap, collecting umarked entries jobsMap.get("job", "0").gc() // run round 3 - verify that metrics second timeseries have been gc'd - runScriptPdata(t, jobsMap.get("job", "0"), script3) + runScript(t, jobsMap.get("job", "0"), script3) } func Test_jobGC_pdata(t *testing.T) { - job1Script1 := []*metricsAdjusterTestPdata{ + job1Script1 := []*metricsAdjusterTest{ { "JobGC: job 1, round 1 - initial instances, adjusted should be empty", metricSlice( sumMetric(c1, k1v1k2v2, pdt1Ms, doublePoint(pdt1Ms, 44)), sumMetric(c1, k1v10k2v20, pdt1Ms, doublePoint(pdt1Ms, 20)), - histogramMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{4, 2, 3, 7})), - histogramMetric(cd1, k1v10k2v20, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{40, 20, 30, 70})), + cumulativeDistMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{4, 2, 3, 7})), + cumulativeDistMetric(cd1, k1v10k2v20, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{40, 20, 30, 70})), ), metricSlice( sumMetric(c1, k1v1k2v2, pdt1Ms, doublePoint(pdt1Ms, 44)), sumMetric(c1, k1v10k2v20, pdt1Ms, doublePoint(pdt1Ms, 20)), - histogramMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{4, 2, 3, 7})), - histogramMetric(cd1, k1v10k2v20, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{40, 20, 30, 70})), + cumulativeDistMetric(cd1, k1v1k2v2, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{4, 2, 3, 7})), + cumulativeDistMetric(cd1, k1v10k2v20, pdt1Ms, distPoint(pdt1Ms, bounds0, []uint64{40, 20, 30, 70})), ), 4, }, } emptyMetricSlice := func() *pmetric.MetricSlice { ms := pmetric.NewMetricSlice(); return &ms } - job2Script1 := []*metricsAdjusterTestPdata{ + job2Script1 := []*metricsAdjusterTest{ { "JobGC: job2, round 1 - no metrics adjusted, just trigger gc", emptyMetricSlice(), @@ -1086,55 +1076,55 @@ func Test_jobGC_pdata(t *testing.T) { }, } - job1Script2 := []*metricsAdjusterTestPdata{ + job1Script2 := []*metricsAdjusterTest{ { "JobGC: job 1, round 2 - metrics timeseries empty due to job-level gc", metricSlice( sumMetric(c1, k1v1k2v2, pdt4Ms, doublePoint(pdt4Ms, 99)), sumMetric(c1, k1v10k2v20, pdt4Ms, doublePoint(pdt4Ms, 80)), - histogramMetric(cd1, k1v1k2v2, pdt4Ms, distPoint(pdt4Ms, bounds0, []uint64{9, 8, 10, 15})), - histogramMetric(cd1, k1v10k2v20, pdt4Ms, distPoint(pdt4Ms, bounds0, []uint64{55, 66, 33, 77})), + cumulativeDistMetric(cd1, k1v1k2v2, pdt4Ms, distPoint(pdt4Ms, bounds0, []uint64{9, 8, 10, 15})), + cumulativeDistMetric(cd1, k1v10k2v20, pdt4Ms, distPoint(pdt4Ms, bounds0, []uint64{55, 66, 33, 77})), ), metricSlice( sumMetric(c1, k1v1k2v2, pdt4Ms, doublePoint(pdt4Ms, 99)), sumMetric(c1, k1v10k2v20, pdt4Ms, doublePoint(pdt4Ms, 80)), - histogramMetric(cd1, k1v1k2v2, pdt4Ms, distPoint(pdt4Ms, bounds0, []uint64{9, 8, 10, 15})), - histogramMetric(cd1, k1v10k2v20, pdt4Ms, distPoint(pdt4Ms, bounds0, []uint64{55, 66, 33, 77})), + cumulativeDistMetric(cd1, k1v1k2v2, pdt4Ms, distPoint(pdt4Ms, bounds0, []uint64{9, 8, 10, 15})), + cumulativeDistMetric(cd1, k1v10k2v20, pdt4Ms, distPoint(pdt4Ms, bounds0, []uint64{55, 66, 33, 77})), ), 4, }, } gcInterval := 10 * time.Millisecond - jobsMap := NewJobsMapPdata(gcInterval) + jobsMap := NewJobsMap(gcInterval) // run job 1, round 1 - all entries marked - runScriptPdata(t, jobsMap.get("job", "0"), job1Script1) + runScript(t, jobsMap.get("job", "0"), job1Script1) // sleep longer than gcInterval to enable job gc in the next run time.Sleep(2 * gcInterval) // run job 2, round1 - trigger job gc, unmarking all entries - runScriptPdata(t, jobsMap.get("job", "1"), job2Script1) + runScript(t, jobsMap.get("job", "1"), job2Script1) // sleep longer than gcInterval to enable job gc in the next run time.Sleep(2 * gcInterval) // re-run job 2, round1 - trigger job gc, removing unmarked entries - runScriptPdata(t, jobsMap.get("job", "1"), job2Script1) + runScript(t, jobsMap.get("job", "1"), job2Script1) // ensure that at least one jobsMap.gc() completed jobsMap.gc() // run job 1, round 2 - verify that all job 1 timeseries have been gc'd - runScriptPdata(t, jobsMap.get("job", "0"), job1Script2) + runScript(t, jobsMap.get("job", "0"), job1Script2) } -type metricsAdjusterTestPdata struct { +type metricsAdjusterTest struct { description string metrics *pmetric.MetricSlice adjusted *pmetric.MetricSlice resets int } -func runScriptPdata(t *testing.T, tsm *timeseriesMapPdata, script []*metricsAdjusterTestPdata) { +func runScript(t *testing.T, tsm *timeseriesMap, script []*metricsAdjusterTest) { l := zap.NewNop() t.Cleanup(func() { require.NoError(t, l.Sync()) }) // flushes buffer, if any - ma := NewMetricsAdjusterPdata(tsm, l) + ma := NewMetricsAdjuster(tsm, l) for _, test := range script { expectedResets := test.resets diff --git a/receiver/prometheusreceiver/internal/otlp_metricsbuilder.go b/receiver/prometheusreceiver/internal/otlp_metricsbuilder.go index 06d699d855f1..65f4b7aaf0a3 100644 --- a/receiver/prometheusreceiver/internal/otlp_metricsbuilder.go +++ b/receiver/prometheusreceiver/internal/otlp_metricsbuilder.go @@ -28,7 +28,7 @@ import ( "go.uber.org/zap" ) -func isUsefulLabelPdata(mType pmetric.MetricDataType, labelKey string) bool { +func isUsefulLabel(mType pmetric.MetricDataType, labelKey string) bool { switch labelKey { case model.MetricNameLabel, model.InstanceLabel, model.SchemeLabel, model.MetricsPathLabel, model.JobLabel: return false @@ -40,7 +40,7 @@ func isUsefulLabelPdata(mType pmetric.MetricDataType, labelKey string) bool { return true } -func getBoundaryPdata(metricType pmetric.MetricDataType, labels labels.Labels) (float64, error) { +func getBoundary(metricType pmetric.MetricDataType, labels labels.Labels) (float64, error) { labelName := "" switch metricType { case pmetric.MetricDataTypeHistogram: @@ -59,7 +59,7 @@ func getBoundaryPdata(metricType pmetric.MetricDataType, labels labels.Labels) ( return strconv.ParseFloat(v, 64) } -func convToPdataMetricType(metricType textparse.MetricType) pmetric.MetricDataType { +func convToMetricType(metricType textparse.MetricType) pmetric.MetricDataType { switch metricType { case textparse.MetricTypeCounter: // always use float64, as it's the internal data type used in prometheus @@ -81,9 +81,9 @@ func convToPdataMetricType(metricType textparse.MetricType) pmetric.MetricDataTy } } -type metricBuilderPdata struct { +type metricBuilder struct { metrics pmetric.MetricSlice - families map[string]*metricFamilyPdata + families map[string]*metricFamily hasData bool hasInternalMetric bool mc MetadataCache @@ -99,14 +99,14 @@ type metricBuilderPdata struct { // newMetricBuilder creates a MetricBuilder which is allowed to feed all the datapoints from a single prometheus // scraped page by calling its AddDataPoint function, and turn them into a pmetric.Metrics object. // by calling its Build function -func newMetricBuilderPdata(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger, intervalStartTimeMs int64) *metricBuilderPdata { +func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetricRegex string, logger *zap.Logger, intervalStartTimeMs int64) *metricBuilder { var regex *regexp.Regexp if startTimeMetricRegex != "" { regex, _ = regexp.Compile(startTimeMetricRegex) } - return &metricBuilderPdata{ + return &metricBuilder{ metrics: pmetric.NewMetricSlice(), - families: map[string]*metricFamilyPdata{}, + families: map[string]*metricFamily{}, mc: mc, logger: logger, numTimeseries: 0, @@ -117,7 +117,7 @@ func newMetricBuilderPdata(mc MetadataCache, useStartTimeMetric bool, startTimeM } } -func (b *metricBuilderPdata) matchStartTimeMetric(metricName string) bool { +func (b *metricBuilder) matchStartTimeMetric(metricName string) bool { if b.startTimeMetricRegex != nil { return b.startTimeMetricRegex.MatchString(metricName) } @@ -126,7 +126,7 @@ func (b *metricBuilderPdata) matchStartTimeMetric(metricName string) bool { } // AddDataPoint is for feeding prometheus data complexValue in its processing order -func (b *metricBuilderPdata) AddDataPoint(ls labels.Labels, t int64, v float64) error { +func (b *metricBuilder) AddDataPoint(ls labels.Labels, t int64, v float64) error { // Any datapoint with duplicate labels MUST be rejected per: // * https://github.com/open-telemetry/wg-prometheus/issues/44 // * https://github.com/open-telemetry/opentelemetry-collector/issues/3407 @@ -180,7 +180,7 @@ func (b *metricBuilderPdata) AddDataPoint(ls labels.Labels, t int64, v float64) if mf, ok := b.families[familyName]; ok && mf.includesMetric(metricName) { curMF = mf } else { - curMF = newMetricFamilyPdata(metricName, b.mc, b.logger) + curMF = newMetricFamily(metricName, b.mc, b.logger) b.families[curMF.name] = curMF } } @@ -190,7 +190,7 @@ func (b *metricBuilderPdata) AddDataPoint(ls labels.Labels, t int64, v float64) // Build an pmetric.MetricSlice based on all added data complexValue. // The only error returned by this function is errNoDataToBuild. -func (b *metricBuilderPdata) Build() (*pmetric.MetricSlice, int, int, error) { +func (b *metricBuilder) Build() (*pmetric.MetricSlice, int, int, error) { if !b.hasData { if b.hasInternalMetric { metricsL := pmetric.NewMetricSlice() @@ -200,7 +200,7 @@ func (b *metricBuilderPdata) Build() (*pmetric.MetricSlice, int, int, error) { } for _, mf := range b.families { - ts, dts := mf.ToMetricPdata(&b.metrics) + ts, dts := mf.ToMetric(&b.metrics) b.numTimeseries += ts b.droppedTimeseries += dts } diff --git a/receiver/prometheusreceiver/internal/otlp_metricsbuilder_test.go b/receiver/prometheusreceiver/internal/otlp_metricsbuilder_test.go index 16a943a1d909..7a9a66f7b194 100644 --- a/receiver/prometheusreceiver/internal/otlp_metricsbuilder_test.go +++ b/receiver/prometheusreceiver/internal/otlp_metricsbuilder_test.go @@ -27,14 +27,14 @@ import ( "go.uber.org/zap" ) -func runBuilderStartTimeTestsPdata(t *testing.T, tests []buildTestDataPdata, +func runBuilderStartTimeTests(t *testing.T, tests []buildTestData, startTimeMetricRegex string, expectedBuilderStartTime float64) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { mc := newMockMetadataCache(testMetadata) st := startTs for _, page := range tt.inputs { - b := newMetricBuilderPdata(mc, true, startTimeMetricRegex, zap.NewNop(), 0) + b := newMetricBuilder(mc, true, startTimeMetricRegex, zap.NewNop(), 0) b.startTime = defaultBuilderStartTime // set to a non-zero value for _, pt := range page.pts { // set ts for testing @@ -52,7 +52,7 @@ func runBuilderStartTimeTestsPdata(t *testing.T, tests []buildTestDataPdata, func Test_startTimeMetricMatch_pdata(t *testing.T) { matchBuilderStartTime := 123.456 - matchTests := []buildTestDataPdata{ + matchTests := []buildTestData{ { name: "prefix_match", inputs: []*testScrapedPage{ @@ -76,7 +76,7 @@ func Test_startTimeMetricMatch_pdata(t *testing.T) { }, }, } - nomatchTests := []buildTestDataPdata{ + nomatchTests := []buildTestData{ { name: "nomatch1", inputs: []*testScrapedPage{ @@ -101,11 +101,11 @@ func Test_startTimeMetricMatch_pdata(t *testing.T) { }, } - runBuilderStartTimeTestsPdata(t, matchTests, "^(.+_)*process_start_time_seconds$", matchBuilderStartTime) - runBuilderStartTimeTestsPdata(t, nomatchTests, "^(.+_)*process_start_time_seconds$", defaultBuilderStartTime) + runBuilderStartTimeTests(t, matchTests, "^(.+_)*process_start_time_seconds$", matchBuilderStartTime) + runBuilderStartTimeTests(t, nomatchTests, "^(.+_)*process_start_time_seconds$", defaultBuilderStartTime) } -func TestGetBoundaryPdata(t *testing.T) { +func TestGetBoundary(t *testing.T) { tests := []struct { name string mtype pmetric.MetricDataType @@ -166,7 +166,7 @@ func TestGetBoundaryPdata(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - value, err := getBoundaryPdata(tt.mtype, tt.labels) + value, err := getBoundary(tt.mtype, tt.labels) if tt.wantErr != "" { require.NotNil(t, err) require.Contains(t, err.Error(), tt.wantErr) @@ -179,7 +179,7 @@ func TestGetBoundaryPdata(t *testing.T) { } } -func TestConvToPdataMetricType(t *testing.T) { +func TestConvToMetricType(t *testing.T) { tests := []struct { name string mtype textparse.MetricType @@ -225,13 +225,13 @@ func TestConvToPdataMetricType(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - got := convToPdataMetricType(tt.mtype) + got := convToMetricType(tt.mtype) require.Equal(t, got.String(), tt.want.String()) }) } } -func TestIsUsefulLabelPdata(t *testing.T) { +func TestIsUsefulLabel(t *testing.T) { tests := []struct { name string mtypes []pmetric.MetricDataType @@ -313,7 +313,7 @@ func TestIsUsefulLabelPdata(t *testing.T) { t.Run(tt.name, func(t *testing.T) { for _, mtype := range tt.mtypes { for _, labelKey := range tt.labelKeys { - got := isUsefulLabelPdata(mtype, labelKey) + got := isUsefulLabel(mtype, labelKey) assert.Equal(t, got, tt.want) } } @@ -321,14 +321,14 @@ func TestIsUsefulLabelPdata(t *testing.T) { } } -type buildTestDataPdata struct { +type buildTestData struct { name string inputs []*testScrapedPage wants func() []*pmetric.MetricSlice } func Test_OTLPMetricBuilder_counters(t *testing.T) { - tests := []buildTestDataPdata{ + tests := []buildTestData{ { name: "single-item", inputs: []*testScrapedPage{ @@ -462,10 +462,10 @@ func Test_OTLPMetricBuilder_counters(t *testing.T) { }, } - runBuilderTestsPdata(t, tests) + runBuilderTests(t, tests) } -func runBuilderTestsPdata(t *testing.T, tests []buildTestDataPdata) { +func runBuilderTests(t *testing.T, tests []buildTestData) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { wants := tt.wants() @@ -473,7 +473,7 @@ func runBuilderTestsPdata(t *testing.T, tests []buildTestDataPdata) { mc := newMockMetadataCache(testMetadata) st := startTs for i, page := range tt.inputs { - b := newMetricBuilderPdata(mc, true, "", zap.NewNop(), startTs) + b := newMetricBuilder(mc, true, "", zap.NewNop(), startTs) b.startTime = defaultBuilderStartTime // set to a non-zero value b.intervalStartTimeMs = startTs for _, pt := range page.pts { @@ -483,14 +483,14 @@ func runBuilderTestsPdata(t *testing.T, tests []buildTestDataPdata) { } metrics, _, _, err := b.Build() assert.NoError(t, err) - assertEquivalentMetricsPdata(t, wants[i], metrics) + assertEquivalentMetrics(t, wants[i], metrics) st += interval } }) } } -func assertEquivalentMetricsPdata(t *testing.T, want, got *pmetric.MetricSlice) { +func assertEquivalentMetrics(t *testing.T, want, got *pmetric.MetricSlice) { if !assert.Equal(t, want.Len(), got.Len()) { return } @@ -513,7 +513,7 @@ var ( ) func Test_OTLPMetricBuilder_gauges(t *testing.T) { - tests := []buildTestDataPdata{ + tests := []buildTestData{ { name: "one-gauge", inputs: []*testScrapedPage{ @@ -636,11 +636,11 @@ func Test_OTLPMetricBuilder_gauges(t *testing.T) { }, } - runBuilderTestsPdata(t, tests) + runBuilderTests(t, tests) } func Test_OTLPMetricBuilder_untype(t *testing.T) { - tests := []buildTestDataPdata{ + tests := []buildTestData{ { name: "one-unknown", inputs: []*testScrapedPage{ @@ -729,11 +729,11 @@ func Test_OTLPMetricBuilder_untype(t *testing.T) { }, } - runBuilderTestsPdata(t, tests) + runBuilderTests(t, tests) } func Test_OTLPMetricBuilder_histogram(t *testing.T) { - tests := []buildTestDataPdata{ + tests := []buildTestData{ { name: "single item", inputs: []*testScrapedPage{ @@ -1031,11 +1031,11 @@ func Test_OTLPMetricBuilder_histogram(t *testing.T) { }, } - runBuilderTestsPdata(t, tests) + runBuilderTests(t, tests) } func Test_OTLPMetricBuilder_summary(t *testing.T) { - tests := []buildTestDataPdata{ + tests := []buildTestData{ { name: "no-sum-and-count", inputs: []*testScrapedPage{ @@ -1172,13 +1172,13 @@ func Test_OTLPMetricBuilder_summary(t *testing.T) { }, } - runBuilderTestsPdata(t, tests) + runBuilderTests(t, tests) } // Ensure that we reject duplicate label keys. See https://github.com/open-telemetry/wg-prometheus/issues/44. func TestOTLPMetricBuilderDuplicateLabelKeysAreRejected(t *testing.T) { mc := newMockMetadataCache(testMetadata) - mb := newMetricBuilderPdata(mc, true, "", zap.NewNop(), 0) + mb := newMetricBuilder(mc, true, "", zap.NewNop(), 0) dupLabels := labels.Labels{ {Name: "__name__", Value: "test"}, @@ -1198,7 +1198,7 @@ func TestOTLPMetricBuilderDuplicateLabelKeysAreRejected(t *testing.T) { func Test_OTLPMetricBuilder_baddata(t *testing.T) { t.Run("empty-metric-name", func(t *testing.T) { mc := newMockMetadataCache(testMetadata) - b := newMetricBuilderPdata(mc, true, "", zap.NewNop(), 0) + b := newMetricBuilder(mc, true, "", zap.NewNop(), 0) b.startTime = 1.0 // set to a non-zero value if err := b.AddDataPoint(labels.FromStrings("a", "b"), startTs, 123); err != errMetricNameNotFound { t.Error("expecting errMetricNameNotFound error, but get nil") @@ -1212,7 +1212,7 @@ func Test_OTLPMetricBuilder_baddata(t *testing.T) { t.Run("histogram-datapoint-no-bucket-label", func(t *testing.T) { mc := newMockMetadataCache(testMetadata) - b := newMetricBuilderPdata(mc, true, "", zap.NewNop(), 0) + b := newMetricBuilder(mc, true, "", zap.NewNop(), 0) b.startTime = 1.0 // set to a non-zero value if err := b.AddDataPoint(createLabels("hist_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel { t.Error("expecting errEmptyBoundaryLabel error, but get nil") @@ -1221,7 +1221,7 @@ func Test_OTLPMetricBuilder_baddata(t *testing.T) { t.Run("summary-datapoint-no-quantile-label", func(t *testing.T) { mc := newMockMetadataCache(testMetadata) - b := newMetricBuilderPdata(mc, true, "", zap.NewNop(), 0) + b := newMetricBuilder(mc, true, "", zap.NewNop(), 0) b.startTime = 1.0 // set to a non-zero value if err := b.AddDataPoint(createLabels("summary_test", "k", "v"), startTs, 123); err != errEmptyBoundaryLabel { t.Error("expecting errEmptyBoundaryLabel error, but get nil") diff --git a/receiver/prometheusreceiver/internal/otlp_transaction.go b/receiver/prometheusreceiver/internal/otlp_transaction.go index 833f20571540..9810f44125b9 100644 --- a/receiver/prometheusreceiver/internal/otlp_transaction.go +++ b/receiver/prometheusreceiver/internal/otlp_transaction.go @@ -33,7 +33,7 @@ import ( "go.uber.org/zap" ) -type transactionPdata struct { +type transaction struct { id int64 isNew bool ctx context.Context @@ -44,15 +44,15 @@ type transactionPdata struct { nodeResource *pcommon.Resource logger *zap.Logger receiverID config.ComponentID - metricBuilder *metricBuilderPdata + metricBuilder *metricBuilder job, instance string - jobsMap *JobsMapPdata + jobsMap *JobsMap obsrecv *obsreport.Receiver startTimeMs int64 } type txConfig struct { - jobsMap *JobsMapPdata + jobsMap *JobsMap useStartTimeMetric bool startTimeMetricRegex string receiverID config.ComponentID @@ -61,8 +61,8 @@ type txConfig struct { settings component.ReceiverCreateSettings } -func newTransactionPdata(ctx context.Context, txc *txConfig) *transactionPdata { - return &transactionPdata{ +func newTransaction(ctx context.Context, txc *txConfig) *transaction { + return &transaction{ id: atomic.AddInt64(&idSeq, 1), ctx: ctx, isNew: true, @@ -78,7 +78,7 @@ func newTransactionPdata(ctx context.Context, txc *txConfig) *transactionPdata { } // Append always returns 0 to disable label caching. -func (t *transactionPdata) Append(ref storage.SeriesRef, labels labels.Labels, atMs int64, value float64) (pointCount storage.SeriesRef, err error) { +func (t *transaction) Append(ref storage.SeriesRef, labels labels.Labels, atMs int64, value float64) (pointCount storage.SeriesRef, err error) { select { case <-t.ctx.Done(): return 0, errTransactionAborted @@ -99,11 +99,11 @@ func (t *transactionPdata) Append(ref storage.SeriesRef, labels labels.Labels, a return 0, t.metricBuilder.AddDataPoint(labels, atMs, value) } -func (t *transactionPdata) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { +func (t *transaction) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { return 0, nil } -func (t *transactionPdata) initTransaction(labels labels.Labels) error { +func (t *transaction) initTransaction(labels labels.Labels) error { metadataCache, err := getMetadataCache(t.ctx) if err != nil { return err @@ -117,13 +117,13 @@ func (t *transactionPdata) initTransaction(labels labels.Labels) error { t.job = job t.instance = instance } - t.nodeResource = CreateNodeAndResourcePdata(job, instance, metadataCache.SharedLabels()) - t.metricBuilder = newMetricBuilderPdata(metadataCache, t.useStartTimeMetric, t.startTimeMetricRegex, t.logger, t.startTimeMs) + t.nodeResource = CreateNodeAndResource(job, instance, metadataCache.SharedLabels()) + t.metricBuilder = newMetricBuilder(metadataCache, t.useStartTimeMetric, t.startTimeMetricRegex, t.logger, t.startTimeMs) t.isNew = false return nil } -func (t *transactionPdata) Commit() error { +func (t *transaction) Commit() error { if t.isNew { return nil } @@ -144,10 +144,10 @@ func (t *transactionPdata) Commit() error { return err } // Otherwise adjust the startTimestamp for all the metrics. - t.adjustStartTimestampPdata(metricsL) + t.adjustStartTimestamp(metricsL) } else { // TODO: Derive numPoints in this case. - _ = NewMetricsAdjusterPdata(t.jobsMap.get(t.job, t.instance), t.logger).AdjustMetricSlice(metricsL) + _ = NewMetricsAdjuster(t.jobsMap.get(t.job, t.instance), t.logger).AdjustMetricSlice(metricsL) } if metricsL.Len() > 0 { @@ -159,7 +159,7 @@ func (t *transactionPdata) Commit() error { return nil } -func (t *transactionPdata) Rollback() error { +func (t *transaction) Rollback() error { t.startTimeMs = -1 return nil } @@ -170,7 +170,7 @@ func pdataTimestampFromFloat64(ts float64) pcommon.Timestamp { return pcommon.NewTimestampFromTime(time.Unix(secs, nanos)) } -func (t transactionPdata) adjustStartTimestampPdata(metricsL *pmetric.MetricSlice) { +func (t transaction) adjustStartTimestamp(metricsL *pmetric.MetricSlice) { startTimeTs := pdataTimestampFromFloat64(t.metricBuilder.startTime) for i := 0; i < metricsL.Len(); i++ { metric := metricsL.At(i) @@ -205,7 +205,7 @@ func (t transactionPdata) adjustStartTimestampPdata(metricsL *pmetric.MetricSlic } } -func (t *transactionPdata) metricSliceToMetrics(metricsL *pmetric.MetricSlice) *pmetric.Metrics { +func (t *transaction) metricSliceToMetrics(metricsL *pmetric.MetricSlice) *pmetric.Metrics { metrics := pmetric.NewMetrics() rms := metrics.ResourceMetrics().AppendEmpty() ilm := rms.ScopeMetrics().AppendEmpty() diff --git a/receiver/prometheusreceiver/internal/otlp_transaction_test.go b/receiver/prometheusreceiver/internal/otlp_transaction_test.go index 6a7f8b755371..8d106d22c48f 100644 --- a/receiver/prometheusreceiver/internal/otlp_transaction_test.go +++ b/receiver/prometheusreceiver/internal/otlp_transaction_test.go @@ -60,7 +60,7 @@ func Test_transaction_pdata(t *testing.T) { t.Run("Commit Without Adding", func(t *testing.T) { nomc := consumertest.NewNop() - tr := newTransactionPdata(scrapeCtx, &txConfig{nil, true, "", rID, nomc, nil, componenttest.NewNopReceiverCreateSettings()}) + tr := newTransaction(scrapeCtx, &txConfig{nil, true, "", rID, nomc, nil, componenttest.NewNopReceiverCreateSettings()}) if got := tr.Commit(); got != nil { t.Errorf("expecting nil from Commit() but got err %v", got) } @@ -68,7 +68,7 @@ func Test_transaction_pdata(t *testing.T) { t.Run("Rollback does nothing", func(t *testing.T) { nomc := consumertest.NewNop() - tr := newTransactionPdata(scrapeCtx, &txConfig{nil, true, "", rID, nomc, nil, componenttest.NewNopReceiverCreateSettings()}) + tr := newTransaction(scrapeCtx, &txConfig{nil, true, "", rID, nomc, nil, componenttest.NewNopReceiverCreateSettings()}) if got := tr.Rollback(); got != nil { t.Errorf("expecting nil from Rollback() but got err %v", got) } @@ -77,7 +77,7 @@ func Test_transaction_pdata(t *testing.T) { badLabels := labels.Labels([]labels.Label{{Name: "foo", Value: "bar"}}) t.Run("Add One No Target", func(t *testing.T) { nomc := consumertest.NewNop() - tr := newTransactionPdata(scrapeCtx, &txConfig{nil, true, "", rID, nomc, nil, componenttest.NewNopReceiverCreateSettings()}) + tr := newTransaction(scrapeCtx, &txConfig{nil, true, "", rID, nomc, nil, componenttest.NewNopReceiverCreateSettings()}) if _, got := tr.Append(0, badLabels, time.Now().Unix()*1000, 1.0); got == nil { t.Errorf("expecting error from Add() but got nil") } @@ -89,7 +89,7 @@ func Test_transaction_pdata(t *testing.T) { {Name: "foo", Value: "bar"}}) t.Run("Add One Job not found", func(t *testing.T) { nomc := consumertest.NewNop() - tr := newTransactionPdata(scrapeCtx, &txConfig{nil, true, "", rID, nomc, nil, componenttest.NewNopReceiverCreateSettings()}) + tr := newTransaction(scrapeCtx, &txConfig{nil, true, "", rID, nomc, nil, componenttest.NewNopReceiverCreateSettings()}) if _, got := tr.Append(0, jobNotFoundLb, time.Now().Unix()*1000, 1.0); got == nil { t.Errorf("expecting error from Add() but got nil") } @@ -100,7 +100,7 @@ func Test_transaction_pdata(t *testing.T) { {Name: "__name__", Value: "foo"}}) t.Run("Add One Good", func(t *testing.T) { sink := new(consumertest.MetricsSink) - tr := newTransactionPdata(scrapeCtx, &txConfig{nil, true, "", rID, sink, nil, componenttest.NewNopReceiverCreateSettings()}) + tr := newTransaction(scrapeCtx, &txConfig{nil, true, "", rID, sink, nil, componenttest.NewNopReceiverCreateSettings()}) if _, got := tr.Append(0, goodLabels, time.Now().Unix()*1000, 1.0); got != nil { t.Errorf("expecting error == nil from Add() but got: %v\n", got) } @@ -109,7 +109,7 @@ func Test_transaction_pdata(t *testing.T) { t.Errorf("expecting nil from Commit() but got err %v", got) } l := []labels.Label{{Name: "__scheme__", Value: "http"}} - expectedNodeResource := CreateNodeAndResourcePdata("test", "localhost:8080", l) + expectedNodeResource := CreateNodeAndResource("test", "localhost:8080", l) mds := sink.AllMetrics() if len(mds) != 1 { t.Fatalf("wanted one batch, got %v\n", sink.AllMetrics()) @@ -122,7 +122,7 @@ func Test_transaction_pdata(t *testing.T) { t.Run("Error when start time is zero", func(t *testing.T) { sink := new(consumertest.MetricsSink) - tr := newTransactionPdata(scrapeCtx, &txConfig{nil, true, "", rID, sink, nil, componenttest.NewNopReceiverCreateSettings()}) + tr := newTransaction(scrapeCtx, &txConfig{nil, true, "", rID, sink, nil, componenttest.NewNopReceiverCreateSettings()}) if _, got := tr.Append(0, goodLabels, time.Now().Unix()*1000, 1.0); got != nil { t.Errorf("expecting error == nil from Add() but got: %v\n", got) } diff --git a/receiver/prometheusreceiver/internal/prom_to_otlp.go b/receiver/prometheusreceiver/internal/prom_to_otlp.go index dcdf46471267..d41e260c3473 100644 --- a/receiver/prometheusreceiver/internal/prom_to_otlp.go +++ b/receiver/prometheusreceiver/internal/prom_to_otlp.go @@ -42,8 +42,8 @@ func isDiscernibleHost(host string) bool { return true } -// CreateNodeAndResourcePdata creates the resource data added to OTLP payloads. -func CreateNodeAndResourcePdata(job, instance string, serviceDiscoveryLabels labels.Labels) *pcommon.Resource { +// CreateNodeAndResource creates the resource data added to OTLP payloads. +func CreateNodeAndResource(job, instance string, serviceDiscoveryLabels labels.Labels) *pcommon.Resource { host, port, err := net.SplitHostPort(instance) if err != nil { host = instance diff --git a/receiver/prometheusreceiver/internal/prom_to_otlp_test.go b/receiver/prometheusreceiver/internal/prom_to_otlp_test.go index 00c31363476a..6c265fd34322 100644 --- a/receiver/prometheusreceiver/internal/prom_to_otlp_test.go +++ b/receiver/prometheusreceiver/internal/prom_to_otlp_test.go @@ -282,7 +282,7 @@ func TestCreateNodeAndResourcePromToOTLP(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - got := CreateNodeAndResourcePdata(tt.job, tt.instance, tt.sdLabels) + got := CreateNodeAndResource(tt.job, tt.instance, tt.sdLabels) got.Attributes().Sort() tt.want.Attributes().Sort() require.Equal(t, got, tt.want) diff --git a/receiver/prometheusreceiver/metrics_receiver_helper_test.go b/receiver/prometheusreceiver/metrics_receiver_helper_test.go index 22eacd59b406..d87f157975b2 100644 --- a/receiver/prometheusreceiver/metrics_receiver_helper_test.go +++ b/receiver/prometheusreceiver/metrics_receiver_helper_test.go @@ -155,7 +155,7 @@ func setupMockPrometheus(tds ...*testData) (*mockPrometheus, *promcfg.Config, er // update attributes value (will use for validation) l := []labels.Label{{Name: "__scheme__", Value: "http"}} for _, t := range tds { - t.attributes = internal.CreateNodeAndResourcePdata(t.name, u.Host, l).Attributes() + t.attributes = internal.CreateNodeAndResource(t.name, u.Host, l).Attributes() } pCfg, err := promcfg.Load(string(cfg), false, gokitlog.NewNopLogger()) return mp, pCfg, err From 23d22f8c92128b7514456012ed4f747880121460 Mon Sep 17 00:00:00 2001 From: Alex Boten Date: Fri, 6 May 2022 08:12:35 -0700 Subject: [PATCH 2/2] Update receiver/prometheusreceiver/internal/otlp_metricfamily.go Co-authored-by: David Ashpole --- receiver/prometheusreceiver/internal/otlp_metricfamily.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/prometheusreceiver/internal/otlp_metricfamily.go b/receiver/prometheusreceiver/internal/otlp_metricfamily.go index f23b617570a4..c741928a47c8 100644 --- a/receiver/prometheusreceiver/internal/otlp_metricfamily.go +++ b/receiver/prometheusreceiver/internal/otlp_metricfamily.go @@ -127,7 +127,7 @@ func (mg *metricGroup) toDistributionPoint(orderedLabelKeys []string, dest *pmet mg.sortPoints() // for OCAgent Proto, the bounds won't include +inf - // TODO: (@odeke-em) should we also check OpenTelemetry for bucket bounds? + // TODO: (@odeke-em) should we also check OpenTelemetry Pdata for bucket bounds? bounds := make([]float64, len(mg.complexValue)-1) bucketCounts := make([]uint64, len(mg.complexValue))