Skip to content

Commit

Permalink
move test to a separate file due to unrelated race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
edma2 committed Nov 20, 2024
1 parent 31186a8 commit f184393
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 109 deletions.
153 changes: 153 additions & 0 deletions exporter/prometheusremotewriteexporter/exporter_concurrency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
//go:build !race
// +build !race

// note: this test doesn't pass currently due to a race condition in batchTimeSeries
// WARNING: DATA RACE
// Write at 0x00c0001e9550 by goroutine 34:
// github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter.batchTimeSeries()
// helper.go:92 +0xf8b
// github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter.(*prwExporter).handleExport()
// exporter.go:240 +0xe4
// github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter.(*prwExporter).PushMetrics()
// exporter.go:217 +0x70f
// github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter.Test_PushMetricsConcurrent.func3()
// exporter_test.go:905 +0x78

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package prometheusremotewriteexporter

import (
"context"
"io"
"net/http"
"net/http/httptest"
"strconv"
"sync"
"testing"
"time"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
)

// Test everything works when there is more than one goroutine calling PushMetrics.
// Today we only use 1 worker per exporter, but the intention of this test is to future-proof in case it changes.
func Test_PushMetricsConcurrent(t *testing.T) {
n := 1000
ms := make([]pmetric.Metrics, n)
testIDKey := "test_id"
for i := 0; i < n; i++ {
m := testdata.GenerateMetricsOneMetric()
dps := m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints()
for j := 0; j < dps.Len(); j++ {
dp := dps.At(j)
dp.Attributes().PutInt(testIDKey, int64(i))
}
ms[i] = m
}
received := make(map[int]prompb.TimeSeries)
var mu sync.Mutex

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
t.Fatal(err)
}
assert.NotNil(t, body)
// Receives the http requests and unzip, unmarshalls, and extracts TimeSeries
assert.Equal(t, "0.1.0", r.Header.Get("X-Prometheus-Remote-Write-Version"))
assert.Equal(t, "snappy", r.Header.Get("Content-Encoding"))
var unzipped []byte

dest, err := snappy.Decode(unzipped, body)
assert.NoError(t, err)

wr := &prompb.WriteRequest{}
ok := proto.Unmarshal(dest, wr)
assert.NoError(t, ok)
assert.Len(t, wr.Timeseries, 2)
ts := wr.Timeseries[0]
foundLabel := false
for _, label := range ts.Labels {
if label.Name == testIDKey {
id, err := strconv.Atoi(label.Value)
assert.NoError(t, err)
mu.Lock()
_, ok := received[id]
assert.False(t, ok) // fail if we already saw it
received[id] = ts
mu.Unlock()
foundLabel = true
break
}
}
assert.True(t, foundLabel)
w.WriteHeader(http.StatusOK)
}))

defer server.Close()

// Adjusted retry settings for faster testing
retrySettings := configretry.BackOffConfig{
Enabled: true,
InitialInterval: 100 * time.Millisecond, // Shorter initial interval
MaxInterval: 1 * time.Second, // Shorter max interval
MaxElapsedTime: 2 * time.Second, // Shorter max elapsed time
}
clientConfig := confighttp.NewDefaultClientConfig()
clientConfig.Endpoint = server.URL
clientConfig.ReadBufferSize = 0
clientConfig.WriteBufferSize = 512 * 1024
cfg := &Config{
Namespace: "",
ClientConfig: clientConfig,
MaxBatchSizeBytes: 3000000,
RemoteWriteQueue: RemoteWriteQueue{NumConsumers: 1},
TargetInfo: &TargetInfo{
Enabled: true,
},
CreatedMetric: &CreatedMetric{
Enabled: false,
},
BackOffConfig: retrySettings,
}

assert.NotNil(t, cfg)
set := exportertest.NewNopSettings()
set.MetricsLevel = configtelemetry.LevelBasic

prwe, nErr := newPRWExporter(cfg, set)

require.NoError(t, nErr)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
require.NoError(t, prwe.Start(ctx, componenttest.NewNopHost()))
defer func() {
require.NoError(t, prwe.Shutdown(ctx))
}()

var wg sync.WaitGroup
wg.Add(n)
for _, m := range ms {
go func() {
err := prwe.PushMetrics(ctx, m)
assert.NoError(t, err)
wg.Done()
}()
}
wg.Wait()
assert.Len(t, received, n)
}
109 changes: 0 additions & 109 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,115 +802,6 @@ func Test_PushMetrics(t *testing.T) {
}
}

// Test everything works when there is more than one goroutine calling PushMetrics.
// Today we only use 1 worker per exporter, but the intention of this test is to future-proof in case it changes.
func Test_PushMetricsConcurrent(t *testing.T) {
n := 1000
ms := make([]pmetric.Metrics, n)
testIDKey := "test_id"
for i := 0; i < n; i++ {
m := testdata.GenerateMetricsOneMetric()
dps := m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints()
for j := 0; j < dps.Len(); j++ {
dp := dps.At(j)
dp.Attributes().PutInt(testIDKey, int64(i))
}
ms[i] = m
}
received := make(map[int]prompb.TimeSeries)
var mu sync.Mutex

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
t.Fatal(err)
}
assert.NotNil(t, body)
// Receives the http requests and unzip, unmarshalls, and extracts TimeSeries
assert.Equal(t, "0.1.0", r.Header.Get("X-Prometheus-Remote-Write-Version"))
assert.Equal(t, "snappy", r.Header.Get("Content-Encoding"))
var unzipped []byte

dest, err := snappy.Decode(unzipped, body)
assert.NoError(t, err)

wr := &prompb.WriteRequest{}
ok := proto.Unmarshal(dest, wr)
assert.NoError(t, ok)
assert.Len(t, wr.Timeseries, 2)
ts := wr.Timeseries[0]
foundLabel := false
for _, label := range ts.Labels {
if label.Name == testIDKey {
id, err := strconv.Atoi(label.Value)
assert.NoError(t, err)
mu.Lock()
_, ok := received[id]
assert.False(t, ok) // fail if we already saw it
received[id] = ts
mu.Unlock()
foundLabel = true
break
}
}
assert.True(t, foundLabel)
w.WriteHeader(http.StatusOK)
}))

defer server.Close()

// Adjusted retry settings for faster testing
retrySettings := configretry.BackOffConfig{
Enabled: true,
InitialInterval: 100 * time.Millisecond, // Shorter initial interval
MaxInterval: 1 * time.Second, // Shorter max interval
MaxElapsedTime: 2 * time.Second, // Shorter max elapsed time
}
clientConfig := confighttp.NewDefaultClientConfig()
clientConfig.Endpoint = server.URL
clientConfig.ReadBufferSize = 0
clientConfig.WriteBufferSize = 512 * 1024
cfg := &Config{
Namespace: "",
ClientConfig: clientConfig,
MaxBatchSizeBytes: 3000000,
RemoteWriteQueue: RemoteWriteQueue{NumConsumers: 1},
TargetInfo: &TargetInfo{
Enabled: true,
},
CreatedMetric: &CreatedMetric{
Enabled: false,
},
BackOffConfig: retrySettings,
}

assert.NotNil(t, cfg)
set := exportertest.NewNopSettings()
set.MetricsLevel = configtelemetry.LevelBasic

prwe, nErr := newPRWExporter(cfg, set)

require.NoError(t, nErr)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
require.NoError(t, prwe.Start(ctx, componenttest.NewNopHost()))
defer func() {
require.NoError(t, prwe.Shutdown(ctx))
}()

var wg sync.WaitGroup
wg.Add(n)
for _, m := range ms {
go func() {
err := prwe.PushMetrics(ctx, m)
assert.NoError(t, err)
wg.Done()
}()
}
wg.Wait()
assert.Len(t, received, n)
}

func Test_validateAndSanitizeExternalLabels(t *testing.T) {
tests := []struct {
name string
Expand Down

0 comments on commit f184393

Please sign in to comment.