
Remove data race by removing the state altogether
Signed-off-by: Arthur Silva Sens <[email protected]>
ArthurSens committed Nov 28, 2024
1 parent a432a97 commit c311965
Showing 3 changed files with 20 additions and 58 deletions.
4 changes: 1 addition & 3 deletions exporter/prometheusremotewriteexporter/exporter.go
@@ -84,7 +84,6 @@ type prwExporter struct {
     wal              *prweWAL
     exporterSettings prometheusremotewrite.Settings
     telemetry        prwTelemetry
-    batchTimeSeriesState batchTimeSeriesState
 }
@@ -140,7 +139,6 @@ func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) {
             SendMetadata: cfg.SendMetadata,
         },
         telemetry: prwTelemetry,
-        batchTimeSeriesState: newBatchTimeSericesState(),
     }
@@ -229,7 +227,7 @@ func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*pro
     }

     // Calls the helper function to convert and batch the TsMap to the desired format
-    requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, &prwe.batchTimeSeriesState)
+    requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m)
     if err != nil {
         return err
     }
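
Why removing the field fixes the race: every export passed a pointer to the one prwe.batchTimeSeriesState stored on the exporter, and batchTimeSeries wrote its "next buffer size" hints back through that pointer without any locking, so two exports running concurrently could read and write the same integers at the same time. Below is a minimal, self-contained sketch of that shape (simplified, hypothetical names; not the exporter's actual code) that the Go race detector flags when run with go run -race:

// Hypothetical, simplified reconstruction of the removed pattern; not the exporter's code.
package main

import "sync"

type batchState struct {
	nextTimeSeriesBufferSize int // hint: read to size the next buffer, overwritten after each batch
}

// batchLike mimics the shape of the old batchTimeSeries(..., state *batchTimeSeriesState):
// it reads the shared hint and writes an updated one back, with no synchronization.
func batchLike(seriesInBatch int, state *batchState) {
	_ = state.nextTimeSeriesBufferSize                 // unsynchronized read
	state.nextTimeSeriesBufferSize = 2 * seriesInBatch // unsynchronized write
}

func main() {
	shared := &batchState{nextTimeSeriesBufferSize: 1 << 20}

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ { // two concurrent "exports" sharing one state
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			batchLike(n+1, shared) // go run -race reports a data race on the shared field
		}(i)
	}
	wg.Wait()
}

With the field gone, as in this commit, each call works only on its own locals, so there is no shared memory left to guard.
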
52 changes: 16 additions & 36 deletions exporter/prometheusremotewriteexporter/helper.go
@@ -5,55 +5,37 @@ package prometheusremotewriteexporter // import "github.com/open-telemetry/opent

 import (
     "errors"
-    "math"
     "sort"

     "github.com/prometheus/prometheus/prompb"
 )

-type batchTimeSeriesState struct {
-    // Track batch sizes sent to avoid over allocating huge buffers.
-    // This helps in the case where large batches are sent to avoid allocating too much unused memory
-    nextTimeSeriesBufferSize     int
-    nextMetricMetadataBufferSize int
-    nextRequestBufferSize        int
-}
-
-func newBatchTimeSericesState() batchTimeSeriesState {
-    return batchTimeSeriesState{
-        nextTimeSeriesBufferSize:     math.MaxInt,
-        nextMetricMetadataBufferSize: math.MaxInt,
-        nextRequestBufferSize:        0,
-    }
-}
-
-// batchTimeSeries splits series into multiple batch write requests.
-func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, m []*prompb.MetricMetadata, state *batchTimeSeriesState) ([]*prompb.WriteRequest, error) {
+// batchTimeSeries splits series into multiple write requests if they exceed the maxBatchByteSize.
+func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, m []*prompb.MetricMetadata) ([]*prompb.WriteRequest, error) {
     if len(tsMap) == 0 {
         return nil, errors.New("invalid tsMap: cannot be empty map")
     }

-    // Allocate a buffer size of at least 10, or twice the last # of requests we sent
-    requests := make([]*prompb.WriteRequest, 0, max(10, state.nextRequestBufferSize))
+    // Allocate a buffer size of at least 10.
+    requests := make([]*prompb.WriteRequest, 0, 10)

-    // Allocate a time series buffer 2x the last time series batch size or the length of the input if smaller
-    tsArray := make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap)))
+    // Allocate a time series buffer with the length of the input.
+    tsArray := make([]prompb.TimeSeries, 0, len(tsMap))
     sizeOfCurrentBatch := 0

     i := 0
-    for _, v := range tsMap {
-        sizeOfSeries := v.Size()
+    for _, series := range tsMap {
+        sizeOfSeries := series.Size()

         if sizeOfCurrentBatch+sizeOfSeries >= maxBatchByteSize {
-            state.nextTimeSeriesBufferSize = max(10, 2*len(tsArray))
             wrapped := convertTimeseriesToRequest(tsArray)
             requests = append(requests, wrapped)

-            tsArray = make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap)-i))
+            tsArray = make([]prompb.TimeSeries, 0, len(tsMap)-i)
             sizeOfCurrentBatch = 0
         }

-        tsArray = append(tsArray, *v)
+        tsArray = append(tsArray, *series)
         sizeOfCurrentBatch += sizeOfSeries
         i++
     }
@@ -63,23 +45,22 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int,
         requests = append(requests, wrapped)
     }

-    // Allocate a metric metadata buffer 2x the last metric metadata batch size or the length of the input if smaller
-    mArray := make([]prompb.MetricMetadata, 0, min(state.nextMetricMetadataBufferSize, len(m)))
+    // Allocate a metric metadata with the length of the input.
+    mArray := make([]prompb.MetricMetadata, 0, len(m))
     sizeOfCurrentBatch = 0
     i = 0
-    for _, v := range m {
-        sizeOfM := v.Size()
+    for _, metadata := range m {
+        sizeOfM := metadata.Size()

         if sizeOfCurrentBatch+sizeOfM >= maxBatchByteSize {
-            state.nextMetricMetadataBufferSize = max(10, 2*len(mArray))
             wrapped := convertMetadataToRequest(mArray)
             requests = append(requests, wrapped)

-            mArray = make([]prompb.MetricMetadata, 0, min(state.nextMetricMetadataBufferSize, len(m)-i))
+            mArray = make([]prompb.MetricMetadata, 0, len(m)-i)
             sizeOfCurrentBatch = 0
         }

-        mArray = append(mArray, *v)
+        mArray = append(mArray, *metadata)
         sizeOfCurrentBatch += sizeOfM
         i++
     }
@@ -89,7 +70,6 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int,
         requests = append(requests, wrapped)
     }

-    state.nextRequestBufferSize = 2 * len(requests)
     return requests, nil
 }

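
After this change batchTimeSeries is plain greedy, size-based batching over per-call buffers: it accumulates series until the next one would push the batch to or past maxBatchByteSize, flushes, and starts a new batch sized to the remaining input, then repeats the same loop for metadata. A standalone sketch of the same flush-and-reset flow (a simplified stand-in with a toy item type; not the exporter's code):

// Simplified illustration of greedy size-based batching, mirroring the new helper.
package main

import "fmt"

type item struct{ bytes int }

func batchBySize(items []item, maxBatchBytes int) [][]item {
	batches := make([][]item, 0, 10)        // at least room for 10 batches, as in the new code
	current := make([]item, 0, len(items))  // first buffer sized to the whole input
	size := 0

	for i, it := range items {
		if size+it.bytes >= maxBatchBytes {
			batches = append(batches, current)          // flush the full batch
			current = make([]item, 0, len(items)-i)     // new buffer sized to the remaining input
			size = 0
		}
		current = append(current, it)
		size += it.bytes
	}
	if len(current) > 0 {
		batches = append(batches, current) // flush whatever is left
	}
	return batches
}

func main() {
	items := []item{{40}, {40}, {40}, {40}}
	fmt.Println(len(batchBySize(items, 100))) // prints 2: two batches of two 40-byte items each
}

The trade-off versus the deleted code is that the first buffer is now sized to the whole input instead of a learned hint, which the removed comment ("Track batch sizes sent to avoid over allocating huge buffers") was trying to avoid; the upside is that nothing is shared between calls.
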
22 changes: 3 additions & 19 deletions exporter/prometheusremotewriteexporter/helper_test.go
@@ -4,7 +4,6 @@
 package prometheusremotewriteexporter

 import (
-    "math"
     "testing"

     "github.com/prometheus/prometheus/prompb"
@@ -58,24 +57,14 @@ func Test_batchTimeSeries(t *testing.T) {
     // run tests
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
-            state := newBatchTimeSericesState()
-            requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize, nil, &state)
+            requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize, nil)
             if tt.returnErr {
                 assert.Error(t, err)
                 return
             }

             assert.NoError(t, err)
             assert.Len(t, requests, tt.numExpectedRequests)
-            if tt.numExpectedRequests <= 1 {
-                assert.Equal(t, math.MaxInt, state.nextTimeSeriesBufferSize)
-                assert.Equal(t, math.MaxInt, state.nextMetricMetadataBufferSize)
-                assert.Equal(t, 2*len(requests), state.nextRequestBufferSize)
-            } else {
-                assert.Equal(t, max(10, len(requests[len(requests)-2].Timeseries)*2), state.nextTimeSeriesBufferSize)
-                assert.Equal(t, math.MaxInt, state.nextMetricMetadataBufferSize)
-                assert.Equal(t, 2*len(requests), state.nextRequestBufferSize)
-            }
         })
     }
 }
@@ -96,14 +85,10 @@ func Test_batchTimeSeriesUpdatesStateForLargeBatches(t *testing.T) {

     tsMap1 := getTimeseriesMap(tsArray)

-    state := newBatchTimeSericesState()
-    requests, err := batchTimeSeries(tsMap1, 1000000, nil, &state)
+    requests, err := batchTimeSeries(tsMap1, 1000000, nil)

     assert.NoError(t, err)
     assert.Len(t, requests, 18)
-    assert.Equal(t, len(requests[len(requests)-2].Timeseries)*2, state.nextTimeSeriesBufferSize)
-    assert.Equal(t, math.MaxInt, state.nextMetricMetadataBufferSize)
-    assert.Equal(t, 36, state.nextRequestBufferSize)
 }

 // Benchmark_batchTimeSeries checks batchTimeSeries
@@ -129,10 +114,9 @@ func Benchmark_batchTimeSeries(b *testing.B) {
     b.ReportAllocs()
     b.ResetTimer()

-    state := newBatchTimeSericesState()
     // Run batchTimeSeries 100 times with a 1mb max request size
     for i := 0; i < b.N; i++ {
-        requests, err := batchTimeSeries(tsMap1, 1000000, nil, &state)
+        requests, err := batchTimeSeries(tsMap1, 1000000, nil)
         assert.NoError(b, err)
         assert.Len(b, requests, 18)
     }
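
With the state parameter gone, concurrent callers of batchTimeSeries share nothing mutable. A hedged sketch of a concurrency regression test that could be added to this package and run with go test -race (not part of this commit; the test name and inline fixture are made up, and "sync" would need to be added to the test file's imports):

// Sketch only: exercises batchTimeSeries from two goroutines so the race
// detector can confirm there is no shared state left between calls.
func Test_batchTimeSeriesConcurrentCallers(t *testing.T) {
	tsMap := map[string]*prompb.TimeSeries{
		"series_a": {Labels: []prompb.Label{{Name: "__name__", Value: "series_a"}}, Samples: []prompb.Sample{{Value: 1, Timestamp: 1}}},
		"series_b": {Labels: []prompb.Label{{Name: "__name__", Value: "series_b"}}, Samples: []prompb.Sample{{Value: 2, Timestamp: 2}}},
	}

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ { // two goroutines batching the same input, as concurrent exports would
		wg.Add(1)
		go func() {
			defer wg.Done()
			requests, err := batchTimeSeries(tsMap, 1000000, nil)
			assert.NoError(t, err)
			assert.NotEmpty(t, requests)
		}()
	}
	wg.Wait()
}
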
