Skip to content

Commit

Permalink
Fix timestamp handling in remote_write (#21166)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrsMark authored Sep 23, 2020
1 parent 77503cf commit d2a8922
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 99 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Disable Kafka metricsets based on Jolokia by default. They require a different configuration. {pull}20989[20989]
- Fix panic index out of range error when getting AWS account name. {pull}21101[21101] {issue}21095[21095]
- Handle missing counters in the application_pool metricset. {pull}21071[21071]
- Fix timestamp handling in remote_write. {pull}21166[21166]
- Fix remote_write flaky test. {pull}21173[21173]
- Visualization title fixes in aws, azure and googlecloud compute dashboards. {pull}21098[21098]

Expand Down
6 changes: 3 additions & 3 deletions metricbeat/module/prometheus/remote_write/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,14 @@ func (p *remoteWriteEventGenerator) GenerateEvents(metrics model.Samples) map[st
labels[string(k)] = v
}

// join metrics with same labels in a single event
labelsHash := labels.String()
// join metrics with same labels and same timestamp in a single event
labelsHash := labels.String() + metric.Timestamp.Time().String()
if _, ok := eventList[labelsHash]; !ok {
eventList[labelsHash] = mb.Event{
ModuleFields: common.MapStr{
"metrics": common.MapStr{},
},
Timestamp: metric.Timestamp.Time(),
}

// Add labels
Expand All @@ -74,7 +75,6 @@ func (p *remoteWriteEventGenerator) GenerateEvents(metrics model.Samples) map[st

// Not checking anything here because we create these maps some lines before
e := eventList[labelsHash]
e.Timestamp = metric.Timestamp.Time()
data := common.MapStr{
name: val,
}
Expand Down
80 changes: 80 additions & 0 deletions metricbeat/module/prometheus/remote_write/remote_write_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package remote_write

import (
"testing"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/common"
)

// TestGenerateEventsCounter tests counter simple cases
func TestGenerateEventsCounter(t *testing.T) {
g := remoteWriteEventGenerator{}

timestamp := model.Time(424242)
timestamp1 := model.Time(424243)
labels := common.MapStr{
"listener_name": model.LabelValue("http"),
}

// first fetch
metrics := model.Samples{
&model.Sample{
Metric: map[model.LabelName]model.LabelValue{
"__name__": "net_conntrack_listener_conn_closed_total",
"listener_name": "http",
},
Value: model.SampleValue(42),
Timestamp: timestamp,
},
&model.Sample{
Metric: map[model.LabelName]model.LabelValue{
"__name__": "net_conntrack_listener_conn_closed_total",
"listener_name": "http",
},
Value: model.SampleValue(43),
Timestamp: timestamp1,
},
}
events := g.GenerateEvents(metrics)

expected := common.MapStr{
"metrics": common.MapStr{
"net_conntrack_listener_conn_closed_total": float64(42),
},
"labels": labels,
}
expected1 := common.MapStr{
"metrics": common.MapStr{
"net_conntrack_listener_conn_closed_total": float64(43),
},
"labels": labels,
}

assert.Equal(t, len(events), 2)
e := events[labels.String()+timestamp.Time().String()]
assert.EqualValues(t, e.ModuleFields, expected)
assert.EqualValues(t, e.Timestamp, timestamp.Time())
e = events[labels.String()+timestamp1.Time().String()]
assert.EqualValues(t, e.ModuleFields, expected1)
assert.EqualValues(t, e.Timestamp, timestamp1.Time())
}
10 changes: 5 additions & 5 deletions x-pack/metricbeat/module/prometheus/remote_write/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,17 @@ func (g remoteWriteTypedGenerator) GenerateEvents(metrics model.Samples) map[str

promType := g.findMetricType(name, labels)

labelsHash := labels.String()
labelsHash := labels.String() + metric.Timestamp.Time().String()
labelsClone := labels.Clone()
labelsClone.Delete("le")
if promType == histogramType {
labelsHash = labelsClone.String()
labelsHash = labelsClone.String() + metric.Timestamp.Time().String()
}
// join metrics with same labels in a single event
if _, ok := eventList[labelsHash]; !ok {
eventList[labelsHash] = mb.Event{
ModuleFields: common.MapStr{},
Timestamp: metric.Timestamp.Time(),
}

// Add labels
Expand All @@ -145,7 +146,6 @@ func (g remoteWriteTypedGenerator) GenerateEvents(metrics model.Samples) map[str
}

e := eventList[labelsHash]
e.Timestamp = metric.Timestamp.Time()
switch promType {
case counterType:
data = common.MapStr{
Expand Down Expand Up @@ -220,10 +220,11 @@ func (g *remoteWriteTypedGenerator) rateCounterFloat64(name string, labels commo
// processPromHistograms receives a group of Histograms and converts each one to ES histogram
func (g *remoteWriteTypedGenerator) processPromHistograms(eventList map[string]mb.Event, histograms map[string]histogram) {
for _, histogram := range histograms {
labelsHash := histogram.labels.String()
labelsHash := histogram.labels.String() + histogram.timestamp.String()
if _, ok := eventList[labelsHash]; !ok {
eventList[labelsHash] = mb.Event{
ModuleFields: common.MapStr{},
Timestamp: histogram.timestamp,
}

// Add labels
Expand All @@ -233,7 +234,6 @@ func (g *remoteWriteTypedGenerator) processPromHistograms(eventList map[string]m
}

e := eventList[labelsHash]
e.Timestamp = histogram.timestamp

hist := dto.Histogram{
Bucket: histogram.buckets,
Expand Down
Loading

0 comments on commit d2a8922

Please sign in to comment.