Skip to content

Commit

Permalink
[prometheusremotewriteexporter] reduce allocations in createAttributes (
Browse files Browse the repository at this point in the history
#57)

createAttributes was allocating a new label slice for every series, which generates mucho garbage (~30-40% of all allocations). Keep around a re-usable underlying array of labels to reduce allocations on the hot path.
  • Loading branch information
Eugene Ma authored and edma2 committed Sep 13, 2024
1 parent 0e2bea5 commit 6a5d2ca
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 69 deletions.
5 changes: 4 additions & 1 deletion exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type prwExporter struct {
exporterSettings prometheusremotewrite.Settings
telemetry prwTelemetry
batchTimeSeriesState batchTimeSeriesState
converter prometheusremotewrite.PrometheusConverter
}

func newPRWTelemetry(set exporter.Settings) (prwTelemetry, error) {
Expand Down Expand Up @@ -126,6 +127,7 @@ func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) {
},
telemetry: prwTelemetry,
batchTimeSeriesState: newBatchTimeSericesState(),
converter: *prometheusremotewrite.NewPrometheusConverter(),
}

prwe.wal = newWAL(cfg.WAL, prwe.export)
Expand Down Expand Up @@ -173,7 +175,8 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
return errors.New("shutdown has been called")
default:

tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
tsMap, err := prwe.converter.FromMetrics(md, prwe.exporterSettings)
defer prwe.converter.Reset()
if err != nil {
prwe.telemetry.recordTranslationFailure(ctx)
prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)))
Expand Down
67 changes: 31 additions & 36 deletions pkg/translator/prometheusremotewrite/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"slices"
"sort"
"strconv"
"strings"
"time"
"unicode/utf8"

Expand Down Expand Up @@ -96,46 +97,40 @@ var seps = []byte{'\xff'}
// createAttributes creates a slice of Prometheus Labels with OTLP attributes and pairs of string values.
// Unpaired string values are ignored. String pairs overwrite OTLP labels if collisions happen and
// if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized.
func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string,
func (c *PrometheusConverter) createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string,
ignoreAttrs []string, logOnOverwrite bool, extras ...string) []prompb.Label {
resourceAttrs := resource.Attributes()
serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName)
instance, haveInstanceID := resourceAttrs.Get(conventions.AttributeServiceInstanceID)

// Calculate the maximum possible number of labels we could return so we can preallocate l
maxLabelCount := attributes.Len() + len(externalLabels) + len(extras)/2

if haveServiceName {
maxLabelCount++
}

if haveInstanceID {
maxLabelCount++
}

// map ensures no duplicate label name
l := make(map[string]string, maxLabelCount)
l := c.labelsMap
clear(l)

// store duplicate labels separately in a throwaway map
// assuming this is the less common case
collisions := make(map[string][]string)

// Ensure attributes are sorted by key for consistent merging of keys which
// collide when sanitized.
labels := make([]prompb.Label, 0, maxLabelCount)
// XXX: Should we always drop service namespace/service name/service instance ID from the labels
// (as they get mapped to other Prometheus labels)?
attributes.Range(func(key string, value pcommon.Value) bool {
if !slices.Contains(ignoreAttrs, key) {
labels = append(labels, prompb.Label{Name: key, Value: value.AsString()})
var finalKey = prometheustranslator.NormalizeLabel(key)
if _, alreadyExists := l[finalKey]; alreadyExists {
collisions[finalKey] = append(collisions[finalKey], value.AsString())
} else {
l[finalKey] = value.AsString()
}
}
return true
})
sort.Stable(ByLabelName(labels))

for _, label := range labels {
var finalKey = prometheustranslator.NormalizeLabel(label.Name)
if existingValue, alreadyExists := l[finalKey]; alreadyExists {
l[finalKey] = existingValue + ";" + label.Value
} else {
l[finalKey] = label.Value
}
for key, values := range collisions {
values = append(values, l[key])
// Ensure attributes are sorted by key for consistent merging of keys which
// collide when sanitized.
sort.Strings(values)
l[key] = strings.Join(values, ";")
}

// Map service.name + service.namespace to job
Expand Down Expand Up @@ -175,12 +170,12 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
l[name] = extras[i+1]
}

labels = labels[:0]
startIndex := len(c.labels)
for k, v := range l {
labels = append(labels, prompb.Label{Name: k, Value: v})
c.labels = append(c.labels, prompb.Label{Name: k, Value: v})
}

return labels
return c.labels[startIndex:]
}

// isValidAggregationTemporality checks whether an OTel metric has a valid
Expand All @@ -200,12 +195,12 @@ func isValidAggregationTemporality(metric pmetric.Metric) bool {
return false
}

func (c *prometheusConverter) addHistogramDataPoints(dataPoints pmetric.HistogramDataPointSlice,
func (c *PrometheusConverter) addHistogramDataPoints(dataPoints pmetric.HistogramDataPointSlice,
resource pcommon.Resource, settings Settings, baseName string) {
for x := 0; x < dataPoints.Len(); x++ {
pt := dataPoints.At(x)
timestamp := convertTimeStamp(pt.Timestamp())
baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false)
baseLabels := c.createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false)

// If the sum is unset, it indicates the _sum metric point should be
// omitted
Expand Down Expand Up @@ -383,12 +378,12 @@ func maxTimestamp(a, b pcommon.Timestamp) pcommon.Timestamp {
return b
}

func (c *prometheusConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource,
func (c *PrometheusConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource,
settings Settings, baseName string) {
for x := 0; x < dataPoints.Len(); x++ {
pt := dataPoints.At(x)
timestamp := convertTimeStamp(pt.Timestamp())
baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false)
baseLabels := c.createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false)

// treat sum as a sample in an individual TimeSeries
sum := &prompb.Sample{
Expand Down Expand Up @@ -456,7 +451,7 @@ func createLabels(name string, baseLabels []prompb.Label, extras ...string) []pr

// getOrCreateTimeSeries returns the time series corresponding to the label set if existent, and false.
// Otherwise it creates a new one and returns that, and true.
func (c *prometheusConverter) getOrCreateTimeSeries(lbls []prompb.Label) (*prompb.TimeSeries, bool) {
func (c *PrometheusConverter) getOrCreateTimeSeries(lbls []prompb.Label) (*prompb.TimeSeries, bool) {
h := timeSeriesSignature(lbls)
ts := c.unique[h]
if ts != nil {
Expand Down Expand Up @@ -492,7 +487,7 @@ func (c *prometheusConverter) getOrCreateTimeSeries(lbls []prompb.Label) (*promp
// addTimeSeriesIfNeeded adds a corresponding time series if it doesn't already exist.
// If the time series doesn't already exist, it gets added with startTimestamp for its value and timestamp for its timestamp,
// both converted to milliseconds.
func (c *prometheusConverter) addTimeSeriesIfNeeded(lbls []prompb.Label, startTimestamp pcommon.Timestamp, timestamp pcommon.Timestamp) {
func (c *PrometheusConverter) addTimeSeriesIfNeeded(lbls []prompb.Label, startTimestamp pcommon.Timestamp, timestamp pcommon.Timestamp) {
ts, created := c.getOrCreateTimeSeries(lbls)
if created {
ts.Samples = []prompb.Sample{
Expand All @@ -506,7 +501,7 @@ func (c *prometheusConverter) addTimeSeriesIfNeeded(lbls []prompb.Label, startTi
}

// addResourceTargetInfo converts the resource to the target info metric.
func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, converter *prometheusConverter) {
func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, converter *PrometheusConverter) {
if settings.DisableTargetInfo || timestamp == 0 {
return
}
Expand Down Expand Up @@ -534,7 +529,7 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timesta
name = settings.Namespace + "_" + name
}

labels := createAttributes(resource, attributes, settings.ExternalLabels, identifyingAttrs, false, model.MetricNameLabel, name)
labels := converter.createAttributes(resource, attributes, settings.ExternalLabels, identifyingAttrs, false, model.MetricNameLabel, name)
haveIdentifier := false
for _, l := range labels {
if l.Name == model.JobLabel || l.Name == model.InstanceLabel {
Expand Down
24 changes: 15 additions & 9 deletions pkg/translator/prometheusremotewrite/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestPrometheusConverter_addSample(t *testing.T) {
}

t.Run("empty_case", func(t *testing.T) {
converter := newPrometheusConverter()
converter := NewPrometheusConverter()
converter.addSample(nil, nil)
assert.Empty(t, converter.unique)
assert.Empty(t, converter.conflicts)
Expand Down Expand Up @@ -159,7 +159,7 @@ func TestPrometheusConverter_addSample(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
converter := newPrometheusConverter()
converter := NewPrometheusConverter()
converter.addSample(&tt.testCase[0].sample, tt.testCase[0].labels)
converter.addSample(&tt.testCase[1].sample, tt.testCase[1].labels)
assert.Exactly(t, tt.want, converter.unique)
Expand Down Expand Up @@ -359,8 +359,9 @@ func Test_createLabelSet(t *testing.T) {
}
// run tests
for _, tt := range tests {
c := NewPrometheusConverter()
t.Run(tt.name, func(t *testing.T) {
assert.ElementsMatch(t, tt.want, createAttributes(tt.resource, tt.orig, tt.externalLabels, nil, true, tt.extras...))
assert.ElementsMatch(t, tt.want, c.createAttributes(tt.resource, tt.orig, tt.externalLabels, nil, true, tt.extras...))
})
}
}
Expand All @@ -375,10 +376,15 @@ func BenchmarkCreateAttributes(b *testing.B) {
m.PutInt("test-int-key", 123)
m.PutBool("test-bool-key", true)

c := NewPrometheusConverter()
// preallocate slice to simulate a fully-grown buffer
c.labels = make([]prompb.Label, 0, b.N*m.Len())

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
createAttributes(r, m, ext, nil, true)
c.createAttributes(r, m, ext, nil, true)
}
}

Expand Down Expand Up @@ -439,7 +445,7 @@ func TestPrometheusConverter_addExemplars(t *testing.T) {
// run tests
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
converter := &prometheusConverter{
converter := &PrometheusConverter{
unique: tt.orig,
}
converter.addExemplars(tt.dataPoint, tt.bucketBounds)
Expand Down Expand Up @@ -620,7 +626,7 @@ func TestAddResourceTargetInfo(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
converter := newPrometheusConverter()
converter := NewPrometheusConverter()

addResourceTargetInfo(tc.resource, tc.settings, tc.timestamp, converter)

Expand Down Expand Up @@ -765,7 +771,7 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
metric := tt.metric()
converter := newPrometheusConverter()
converter := NewPrometheusConverter()

converter.addSummaryDataPoints(
metric.Summary().DataPoints(),
Expand Down Expand Up @@ -875,7 +881,7 @@ func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
metric := tt.metric()
converter := newPrometheusConverter()
converter := NewPrometheusConverter()

converter.addHistogramDataPoints(
metric.Histogram().DataPoints(),
Expand All @@ -893,7 +899,7 @@ func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) {
}

func TestPrometheusConverter_getOrCreateTimeSeries(t *testing.T) {
converter := newPrometheusConverter()
converter := NewPrometheusConverter()
lbls := []prompb.Label{
{
Name: "key1",
Expand Down
4 changes: 2 additions & 2 deletions pkg/translator/prometheusremotewrite/histograms.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (

const defaultZeroThreshold = 1e-128

func (c *prometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetric.ExponentialHistogramDataPointSlice,
func (c *PrometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetric.ExponentialHistogramDataPointSlice,
resource pcommon.Resource, settings Settings, baseName string) error {
for x := 0; x < dataPoints.Len(); x++ {
pt := dataPoints.At(x)
lbls := createAttributes(
lbls := c.createAttributes(
resource,
pt.Attributes(),
settings.ExternalLabels,
Expand Down
2 changes: 1 addition & 1 deletion pkg/translator/prometheusremotewrite/histograms_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ func TestPrometheusConverter_addExponentialHistogramDataPoints(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
metric := tt.metric()

converter := newPrometheusConverter()
converter := NewPrometheusConverter()
require.NoError(t, converter.addExponentialHistogramDataPoints(
metric.ExponentialHistogram().DataPoints(),
pcommon.NewResource(),
Expand Down
35 changes: 25 additions & 10 deletions pkg/translator/prometheusremotewrite/metrics_to_prw.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ type Settings struct {
}

// FromMetrics converts pmetric.Metrics to Prometheus remote write format.
func FromMetrics(md pmetric.Metrics, settings Settings) (map[string]*prompb.TimeSeries, error) {
c := newPrometheusConverter()
func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings) (map[string]*prompb.TimeSeries, error) {
errs := c.fromMetrics(md, settings)
tss := c.timeSeries()
out := make(map[string]*prompb.TimeSeries, len(tss))
Expand All @@ -39,21 +38,37 @@ func FromMetrics(md pmetric.Metrics, settings Settings) (map[string]*prompb.Time
return out, errs
}

// prometheusConverter converts from OTel write format to Prometheus write format.
type prometheusConverter struct {
// PrometheusConverter converts from OTel write format to Prometheus write format.
// Internally it keeps a buffer of labels to avoid expensive allocations, so it is
// best to keep it around for the lifetime of the Go process. Due to this shared
// state, PrometheusConverter is NOT thread-safe and is only intended to be used by
// a single go-routine at a time.
// Each FromMetrics call should be followed by a Reset when the metrics can be safely
// discarded.
type PrometheusConverter struct {
unique map[uint64]*prompb.TimeSeries
conflicts map[uint64][]*prompb.TimeSeries
labels []prompb.Label
labelsMap map[string]string
}

func newPrometheusConverter() *prometheusConverter {
return &prometheusConverter{
func NewPrometheusConverter() *PrometheusConverter {
return &PrometheusConverter{
unique: map[uint64]*prompb.TimeSeries{},
conflicts: map[uint64][]*prompb.TimeSeries{},
labelsMap: make(map[string]string),
}
}

func (c *PrometheusConverter) Reset() {
clear(c.labels)
c.labels = c.labels[:0]
clear(c.unique)
clear(c.conflicts)
}

// fromMetrics converts pmetric.Metrics to Prometheus remote write format.
func (c *prometheusConverter) fromMetrics(md pmetric.Metrics, settings Settings) (errs error) {
func (c *PrometheusConverter) fromMetrics(md pmetric.Metrics, settings Settings) (errs error) {
resourceMetricsSlice := md.ResourceMetrics()
for i := 0; i < resourceMetricsSlice.Len(); i++ {
resourceMetrics := resourceMetricsSlice.At(i)
Expand Down Expand Up @@ -132,7 +147,7 @@ func (c *prometheusConverter) fromMetrics(md pmetric.Metrics, settings Settings)
}

// timeSeries returns a slice of the prompb.TimeSeries that were converted from OTel format.
func (c *prometheusConverter) timeSeries() []prompb.TimeSeries {
func (c *PrometheusConverter) timeSeries() []prompb.TimeSeries {
conflicts := 0
for _, ts := range c.conflicts {
conflicts += len(ts)
Expand Down Expand Up @@ -164,7 +179,7 @@ func isSameMetric(ts *prompb.TimeSeries, lbls []prompb.Label) bool {

// addExemplars adds exemplars for the dataPoint. For each exemplar, if it can find a bucket bound corresponding to its value,
// the exemplar is added to the bucket bound's time series, provided that the time series' has samples.
func (c *prometheusConverter) addExemplars(dataPoint pmetric.HistogramDataPoint, bucketBounds []bucketBoundsData) {
func (c *PrometheusConverter) addExemplars(dataPoint pmetric.HistogramDataPoint, bucketBounds []bucketBoundsData) {
if len(bucketBounds) == 0 {
return
}
Expand All @@ -189,7 +204,7 @@ func (c *prometheusConverter) addExemplars(dataPoint pmetric.HistogramDataPoint,
// If there is no corresponding TimeSeries already, it's created.
// The corresponding TimeSeries is returned.
// If either lbls is nil/empty or sample is nil, nothing is done.
func (c *prometheusConverter) addSample(sample *prompb.Sample, lbls []prompb.Label) *prompb.TimeSeries {
func (c *PrometheusConverter) addSample(sample *prompb.Sample, lbls []prompb.Label) *prompb.TimeSeries {
if sample == nil || len(lbls) == 0 {
// This shouldn't happen
return nil
Expand Down
Loading

0 comments on commit 6a5d2ca

Please sign in to comment.