diff --git a/exporter/carbonexporter/exporter_test.go b/exporter/carbonexporter/exporter_test.go index b97b5bf88229..e283b4daa0fa 100644 --- a/exporter/carbonexporter/exporter_test.go +++ b/exporter/carbonexporter/exporter_test.go @@ -23,7 +23,6 @@ import ( "net" "strconv" "sync" - "sync/atomic" "testing" "time" @@ -35,6 +34,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/atomic" "google.golang.org/protobuf/types/known/timestamppb" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" @@ -226,17 +226,17 @@ func Test_connPool_Concurrency(t *testing.T) { concurrentWriters := 3 writesPerRoutine := 3 - var doneFlag int64 - defer func(flag *int64) { - atomic.StoreInt64(flag, 1) - }(&doneFlag) + doneFlag := atomic.NewBool(false) + defer func() { + doneFlag.Store(true) + }() var recvWG sync.WaitGroup recvWG.Add(concurrentWriters * writesPerRoutine * md.MetricCount()) go func() { for { conn, err := ln.AcceptTCP() - if atomic.LoadInt64(&doneFlag) != 0 { + if doneFlag.Load() { // Close is expected to cause error. return } diff --git a/exporter/carbonexporter/go.mod b/exporter/carbonexporter/go.mod index cd3b675838c9..1bf41ff61c4e 100644 --- a/exporter/carbonexporter/go.mod +++ b/exporter/carbonexporter/go.mod @@ -10,6 +10,7 @@ require ( github.com/stretchr/testify v1.7.1 go.opentelemetry.io/collector v0.50.1-0.20220429151328-041f39835df7 go.opentelemetry.io/collector/pdata v0.50.1-0.20220429151328-041f39835df7 + go.uber.org/atomic v1.9.0 google.golang.org/protobuf v1.28.0 ) @@ -31,7 +32,6 @@ require ( go.opentelemetry.io/otel v1.7.0 // indirect go.opentelemetry.io/otel/metric v0.30.0 // indirect go.opentelemetry.io/otel/trace v1.7.0 // indirect - go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.21.0 // indirect golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index c7a183b218ab..6e5fabd65809 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -22,12 +22,12 @@ import ( "net/http" "os" "sync" - "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "go.uber.org/zap" "go.uber.org/zap/zaptest" ) @@ -181,16 +181,16 @@ func TestExporter_PushEvent(t *testing.T) { }), } - handlers := map[string]func(*int64) bulkHandler{ - "fail http request": func(attempts *int64) bulkHandler { + handlers := map[string]func(attempts *atomic.Int64) bulkHandler{ + "fail http request": func(attempts *atomic.Int64) bulkHandler { return func([]itemRequest) ([]itemResponse, error) { - atomic.AddInt64(attempts, 1) + attempts.Inc() return nil, &httpTestError{message: "oops"} } }, - "fail item": func(attempts *int64) bulkHandler { + "fail item": func(attempts *atomic.Int64) bulkHandler { return func(docs []itemRequest) ([]itemResponse, error) { - atomic.AddInt64(attempts, 1) + attempts.Inc() return itemsReportStatus(docs, http.StatusTooManyRequests) } }, @@ -202,15 +202,15 @@ func TestExporter_PushEvent(t *testing.T) { for name, configurer := range configurations { t.Run(name, func(t *testing.T) { t.Parallel() - var attempts int64 - server := newESTestServer(t, handler(&attempts)) + attempts := atomic.NewInt64(0) + server := newESTestServer(t, handler(attempts)) testConfig := configurer(server.URL) exporter := newTestExporter(t, server.URL, func(cfg *Config) { *cfg = *testConfig }) mustSend(t, exporter, `{"message": "test1"}`) time.Sleep(200 * time.Millisecond) - assert.Equal(t, int64(1), atomic.LoadInt64(&attempts)) + assert.Equal(t, int64(1), attempts.Load()) }) } }) @@ -218,9 +218,9 @@ func TestExporter_PushEvent(t *testing.T) { }) t.Run("do not retry invalid request", func(t *testing.T) { - var attempts int64 + attempts := atomic.NewInt64(0) server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - atomic.AddInt64(&attempts, 1) + attempts.Inc() return nil, &httpTestError{message: "oops", status: http.StatusBadRequest} }) @@ -228,7 +228,7 @@ func TestExporter_PushEvent(t *testing.T) { mustSend(t, exporter, `{"message": "test1"}`) time.Sleep(200 * time.Millisecond) - assert.Equal(t, int64(1), atomic.LoadInt64(&attempts)) + assert.Equal(t, int64(1), attempts.Load()) }) t.Run("retry single item", func(t *testing.T) { @@ -252,9 +252,9 @@ func TestExporter_PushEvent(t *testing.T) { }) t.Run("do not retry bad item", func(t *testing.T) { - var attempts int64 + attempts := atomic.NewInt64(0) server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - atomic.AddInt64(&attempts, 1) + attempts.Inc() return itemsReportStatus(docs, http.StatusBadRequest) }) @@ -262,7 +262,7 @@ func TestExporter_PushEvent(t *testing.T) { mustSend(t, exporter, `{"message": "test1"}`) time.Sleep(200 * time.Millisecond) - assert.Equal(t, int64(1), atomic.LoadInt64(&attempts)) + assert.Equal(t, int64(1), attempts.Load()) }) t.Run("only retry failed items", func(t *testing.T) { diff --git a/exporter/elasticsearchexporter/go.mod b/exporter/elasticsearchexporter/go.mod index eb841359cf05..dc45cb5647e0 100644 --- a/exporter/elasticsearchexporter/go.mod +++ b/exporter/elasticsearchexporter/go.mod @@ -10,6 +10,7 @@ require ( github.com/stretchr/testify v1.7.1 go.opentelemetry.io/collector v0.50.1-0.20220429151328-041f39835df7 go.opentelemetry.io/collector/pdata v0.50.1-0.20220429151328-041f39835df7 + go.uber.org/atomic v1.9.0 go.uber.org/multierr v1.8.0 go.uber.org/zap v1.21.0 @@ -33,7 +34,6 @@ require ( go.opentelemetry.io/otel v1.7.0 // indirect go.opentelemetry.io/otel/metric v0.30.0 // indirect go.opentelemetry.io/otel/trace v1.7.0 // indirect - go.uber.org/atomic v1.9.0 // indirect golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect golang.org/x/text v0.3.7 // indirect diff --git a/exporter/loadbalancingexporter/go.mod b/exporter/loadbalancingexporter/go.mod index 0b5971d872c5..4919a14a9e1a 100644 --- a/exporter/loadbalancingexporter/go.mod +++ b/exporter/loadbalancingexporter/go.mod @@ -8,6 +8,7 @@ require ( go.opencensus.io v0.23.0 go.opentelemetry.io/collector v0.50.1-0.20220429151328-041f39835df7 go.opentelemetry.io/collector/pdata v0.50.1-0.20220429151328-041f39835df7 + go.uber.org/atomic v1.9.0 go.uber.org/multierr v1.8.0 go.uber.org/zap v1.21.0 ) @@ -35,7 +36,6 @@ require ( go.opentelemetry.io/otel v1.7.0 // indirect go.opentelemetry.io/otel/metric v0.30.0 // indirect go.opentelemetry.io/otel/trace v1.7.0 // indirect - go.uber.org/atomic v1.9.0 // indirect golang.org/x/net v0.0.0-20220412020605-290c469a71a5 // indirect golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect diff --git a/exporter/loadbalancingexporter/log_exporter_test.go b/exporter/loadbalancingexporter/log_exporter_test.go index 4f9171a12fad..2e1d7ce540b7 100644 --- a/exporter/loadbalancingexporter/log_exporter_test.go +++ b/exporter/loadbalancingexporter/log_exporter_test.go @@ -20,7 +20,6 @@ import ( "errors" "fmt" "net" - "sync/atomic" "testing" "time" @@ -33,6 +32,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -388,17 +388,18 @@ func TestRollingUpdatesWhenConsumeLogs(t *testing.T) { lb.res = res p.loadBalancer = lb - var counter1, counter2 int64 + counter1 := atomic.NewInt64(0) + counter2 := atomic.NewInt64(0) defaultExporters := map[string]component.Exporter{ "127.0.0.1": newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error { - atomic.AddInt64(&counter1, 1) + counter1.Inc() // simulate an unreachable backend time.Sleep(10 * time.Second) return nil }, ), "127.0.0.2": newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error { - atomic.AddInt64(&counter2, 1) + counter2.Inc() return nil }, ), @@ -448,8 +449,8 @@ func TestRollingUpdatesWhenConsumeLogs(t *testing.T) { // verify require.Equal(t, []string{"127.0.0.2"}, lastResolved) - require.Greater(t, atomic.LoadInt64(&counter1), int64(0)) - require.Greater(t, atomic.LoadInt64(&counter2), int64(0)) + require.Greater(t, counter1.Load(), int64(0)) + require.Greater(t, counter2.Load(), int64(0)) } func randomLogs() plog.Logs { diff --git a/exporter/loadbalancingexporter/trace_exporter_test.go b/exporter/loadbalancingexporter/trace_exporter_test.go index 28fecf4815c0..324039e8602b 100644 --- a/exporter/loadbalancingexporter/trace_exporter_test.go +++ b/exporter/loadbalancingexporter/trace_exporter_test.go @@ -22,7 +22,6 @@ import ( "math/rand" "net" "path/filepath" - "sync/atomic" "testing" "time" @@ -37,6 +36,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/service/servicetest" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -416,17 +416,18 @@ func TestRollingUpdatesWhenConsumeTraces(t *testing.T) { lb.res = res p.loadBalancer = lb - var counter1, counter2 int64 + counter1 := atomic.NewInt64(0) + counter2 := atomic.NewInt64(0) defaultExporters := map[string]component.Exporter{ "127.0.0.1": newMockTracesExporter(func(ctx context.Context, td ptrace.Traces) error { - atomic.AddInt64(&counter1, 1) + counter1.Inc() // simulate an unreachable backend time.Sleep(10 * time.Second) return nil }, ), "127.0.0.2": newMockTracesExporter(func(ctx context.Context, td ptrace.Traces) error { - atomic.AddInt64(&counter2, 1) + counter2.Inc() return nil }, ), @@ -476,8 +477,8 @@ func TestRollingUpdatesWhenConsumeTraces(t *testing.T) { // verify require.Equal(t, []string{"127.0.0.2"}, lastResolved) - require.Greater(t, atomic.LoadInt64(&counter1), int64(0)) - require.Greater(t, atomic.LoadInt64(&counter2), int64(0)) + require.Greater(t, counter1.Load(), int64(0)) + require.Greater(t, counter2.Load(), int64(0)) } func randomTraces() ptrace.Traces { diff --git a/exporter/skywalkingexporter/go.mod b/exporter/skywalkingexporter/go.mod index 0c4f5ca8b9d7..03c709168498 100644 --- a/exporter/skywalkingexporter/go.mod +++ b/exporter/skywalkingexporter/go.mod @@ -9,6 +9,7 @@ require ( go.opentelemetry.io/collector v0.50.1-0.20220429151328-041f39835df7 go.opentelemetry.io/collector/pdata v0.50.1-0.20220429151328-041f39835df7 go.opentelemetry.io/collector/semconv v0.50.1-0.20220429151328-041f39835df7 + go.uber.org/atomic v1.9.0 google.golang.org/grpc v1.46.0 skywalking.apache.org/repo/goapi v0.0.0-20211122071111-ffc517fbfe21 ) @@ -34,7 +35,6 @@ require ( go.opentelemetry.io/otel v1.7.0 // indirect go.opentelemetry.io/otel/metric v0.30.0 // indirect go.opentelemetry.io/otel/trace v1.7.0 // indirect - go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.21.0 // indirect golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect diff --git a/exporter/skywalkingexporter/skywalking_benchmark_test.go b/exporter/skywalkingexporter/skywalking_benchmark_test.go index 35793125d017..af18d8e0e9ab 100644 --- a/exporter/skywalkingexporter/skywalking_benchmark_test.go +++ b/exporter/skywalkingexporter/skywalking_benchmark_test.go @@ -20,7 +20,6 @@ import ( "net" "strconv" "sync" - "sync/atomic" "testing" "time" @@ -31,6 +30,7 @@ import ( "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.uber.org/atomic" "google.golang.org/grpc" v3 "skywalking.apache.org/repo/goapi/collect/common/v3" logpb "skywalking.apache.org/repo/goapi/collect/logging/v3" @@ -39,7 +39,7 @@ import ( ) var ( - consumerNum int32 + consumerNum = atomic.NewInt32(0) sumNum = 10000 ) @@ -89,7 +89,7 @@ func TestSkywalking(t *testing.T) { func test(nGoroutine int, nStream int, t *testing.T) { exporter, server, m := doInit(nStream, t) - atomic.StoreInt32(&consumerNum, -int32(nStream)) + consumerNum.Store(-int32(nStream)) l := testdata.GenerateLogsOneLogRecordNoResource() l.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetIntVal(0) @@ -199,8 +199,8 @@ func (h *mockLogHandler2) Collect(stream logpb.LogReportService_CollectServer) e return stream.SendAndClose(&v3.Commands{}) } if err == nil { - atomic.AddInt32(&consumerNum, 1) - if atomic.LoadInt32(&consumerNum) >= int32(sumNum) { + consumerNum.Inc() + if consumerNum.Load() >= int32(sumNum) { end := time.Now().UnixMilli() h.stopChan <- end return nil diff --git a/exporter/sumologicexporter/go.mod b/exporter/sumologicexporter/go.mod index 4a348b808bd1..6222b1820436 100644 --- a/exporter/sumologicexporter/go.mod +++ b/exporter/sumologicexporter/go.mod @@ -6,6 +6,7 @@ require ( github.com/stretchr/testify v1.7.1 go.opentelemetry.io/collector v0.50.1-0.20220429151328-041f39835df7 go.opentelemetry.io/collector/pdata v0.50.1-0.20220429151328-041f39835df7 + go.uber.org/atomic v1.9.0 go.uber.org/multierr v1.8.0 ) @@ -34,7 +35,6 @@ require ( go.opentelemetry.io/otel v1.7.0 // indirect go.opentelemetry.io/otel/metric v0.30.0 // indirect go.opentelemetry.io/otel/trace v1.7.0 // indirect - go.uber.org/atomic v1.9.0 // indirect go.uber.org/zap v1.21.0 // indirect golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect diff --git a/exporter/sumologicexporter/sender_test.go b/exporter/sumologicexporter/sender_test.go index 316f137a2f80..c2189e722e39 100644 --- a/exporter/sumologicexporter/sender_test.go +++ b/exporter/sumologicexporter/sender_test.go @@ -22,7 +22,6 @@ import ( "net/http" "net/http/httptest" "strings" - "sync/atomic" "testing" "github.com/stretchr/testify/assert" @@ -31,6 +30,7 @@ import ( "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" + "go.uber.org/atomic" ) type senderTest struct { @@ -40,16 +40,16 @@ type senderTest struct { } func prepareSenderTest(t *testing.T, cb []func(w http.ResponseWriter, req *http.Request)) *senderTest { - var reqCounter int32 + reqCounter := atomic.NewInt32(0) // generate a test server so we can capture and inspect the request testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { if len(cb) == 0 { return } - if c := int(atomic.LoadInt32(&reqCounter)); assert.Greater(t, len(cb), c) { + if c := int(reqCounter.Load()); assert.Greater(t, len(cb), c) { cb[c](w, req) - atomic.AddInt32(&reqCounter, 1) + reqCounter.Inc() } })) diff --git a/processor/cumulativetodeltaprocessor/go.mod b/processor/cumulativetodeltaprocessor/go.mod index 83b301f245de..09b2ca11188d 100644 --- a/processor/cumulativetodeltaprocessor/go.mod +++ b/processor/cumulativetodeltaprocessor/go.mod @@ -7,6 +7,7 @@ require ( github.com/stretchr/testify v1.7.1 go.opentelemetry.io/collector v0.50.1-0.20220429151328-041f39835df7 go.opentelemetry.io/collector/pdata v0.50.1-0.20220429151328-041f39835df7 + go.uber.org/atomic v1.9.0 go.uber.org/zap v1.21.0 ) @@ -27,7 +28,6 @@ require ( go.opentelemetry.io/otel v1.7.0 // indirect go.opentelemetry.io/otel/metric v0.30.0 // indirect go.opentelemetry.io/otel/trace v1.7.0 // indirect - go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.8.0 // indirect golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect diff --git a/processor/cumulativetodeltaprocessor/internal/tracking/tracker_test.go b/processor/cumulativetodeltaprocessor/internal/tracking/tracker_test.go index b607a364c1eb..e3e55892a7dd 100644 --- a/processor/cumulativetodeltaprocessor/internal/tracking/tracker_test.go +++ b/processor/cumulativetodeltaprocessor/internal/tracking/tracker_test.go @@ -17,12 +17,12 @@ package tracking import ( "context" "reflect" - "sync/atomic" "testing" "time" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -216,7 +216,7 @@ func Test_metricTracker_removeStale(t *testing.T) { func Test_metricTracker_sweeper(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) sweepEvent := make(chan pcommon.Timestamp) - closed := int32(0) + closed := atomic.NewBool(false) onSweep := func(staleBefore pcommon.Timestamp) { sweepEvent <- staleBefore @@ -230,14 +230,14 @@ func Test_metricTracker_sweeper(t *testing.T) { start := time.Now() go func() { tr.sweeper(ctx, onSweep) - atomic.StoreInt32(&closed, 1) + closed.Store(true) close(sweepEvent) }() for i := 1; i <= 2; i++ { staleBefore := <-sweepEvent tickTime := time.Since(start) + tr.maxStaleness*time.Duration(i) - if atomic.LoadInt32(&closed) == 1 { + if closed.Load() { t.Fatalf("Sweeper returned prematurely.") } @@ -252,7 +252,7 @@ func Test_metricTracker_sweeper(t *testing.T) { } cancel() <-sweepEvent - if atomic.LoadInt32(&closed) == 0 { + if !closed.Load() { t.Errorf("Sweeper did not terminate.") } } diff --git a/processor/groupbytraceprocessor/event_test.go b/processor/groupbytraceprocessor/event_test.go index fe1d04fe4ead..b1cefa612121 100644 --- a/processor/groupbytraceprocessor/event_test.go +++ b/processor/groupbytraceprocessor/event_test.go @@ -18,7 +18,6 @@ import ( "errors" "strings" "sync" - "sync/atomic" "testing" "time" @@ -28,6 +27,7 @@ import ( "go.opencensus.io/stats/view" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -357,14 +357,15 @@ func TestEventShutdown(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) - traceReceivedFired, traceExpiredFired := int64(0), int64(0) + traceReceivedFired := atomic.NewInt64(0) + traceExpiredFired := atomic.NewInt64(0) em := newEventMachine(zap.NewNop(), 50, 1, 1_000) em.onTraceReceived = func(tracesWithID, *eventMachineWorker) error { - atomic.StoreInt64(&traceReceivedFired, 1) + traceReceivedFired.Store(1) return nil } em.onTraceExpired = func(pcommon.TraceID, *eventMachineWorker) error { - atomic.StoreInt64(&traceExpiredFired, 1) + traceExpiredFired.Store(1) return nil } em.onTraceRemoved = func(pcommon.TraceID) error { @@ -409,13 +410,13 @@ func TestEventShutdown(t *testing.T) { }) // verify - assert.Equal(t, int64(1), atomic.LoadInt64(&traceReceivedFired)) + assert.Equal(t, int64(1), traceReceivedFired.Load()) // If the code is wrong, there's a chance that the test will still pass // in case the event is processed after the assertion. // for this reason, we add a small delay here time.Sleep(10 * time.Millisecond) - assert.Equal(t, int64(0), atomic.LoadInt64(&traceExpiredFired)) + assert.Equal(t, int64(0), traceExpiredFired.Load()) // wait until the shutdown has returned shutdownWg.Wait() diff --git a/processor/groupbytraceprocessor/go.mod b/processor/groupbytraceprocessor/go.mod index 41902608336e..4225b79ebe69 100644 --- a/processor/groupbytraceprocessor/go.mod +++ b/processor/groupbytraceprocessor/go.mod @@ -8,6 +8,7 @@ require ( go.opencensus.io v0.23.0 go.opentelemetry.io/collector v0.50.1-0.20220429151328-041f39835df7 go.opentelemetry.io/collector/pdata v0.50.1-0.20220429151328-041f39835df7 + go.uber.org/atomic v1.9.0 go.uber.org/multierr v1.8.0 go.uber.org/zap v1.21.0 ) @@ -28,7 +29,6 @@ require ( go.opentelemetry.io/otel v1.7.0 // indirect go.opentelemetry.io/otel/metric v0.30.0 // indirect go.opentelemetry.io/otel/trace v1.7.0 // indirect - go.uber.org/atomic v1.9.0 // indirect golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect golang.org/x/text v0.3.7 // indirect diff --git a/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go b/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go index b31c054fd16f..d58a2bc14af4 100644 --- a/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go +++ b/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go @@ -18,12 +18,12 @@ import ( "encoding/binary" "runtime" "sync" - "sync/atomic" "testing" "time" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" + "go.uber.org/atomic" ) func TestBatcherNew(t *testing.T) { @@ -70,13 +70,13 @@ func BenchmarkConcurrentEnqueue(b *testing.B) { ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() - var ticked int32 - var received int32 + ticked := atomic.NewInt64(0) + received := atomic.NewInt64(0) go func() { for range ticker.C { batch, _ := batcher.CloseCurrentAndTakeFirstBatch() - atomic.AddInt32(&ticked, 1) - atomic.AddInt32(&received, int32(len(batch))) + ticked.Inc() + received.Add(int64(len(batch))) } }() diff --git a/receiver/prometheusreceiver/go.mod b/receiver/prometheusreceiver/go.mod index b0b39c830c9b..583dc98c5563 100644 --- a/receiver/prometheusreceiver/go.mod +++ b/receiver/prometheusreceiver/go.mod @@ -13,6 +13,7 @@ require ( go.opentelemetry.io/collector v0.50.1-0.20220429151328-041f39835df7 go.opentelemetry.io/collector/pdata v0.50.1-0.20220429151328-041f39835df7 go.opentelemetry.io/collector/semconv v0.50.1-0.20220429151328-041f39835df7 + go.uber.org/atomic v1.9.0 go.uber.org/zap v1.21.0 google.golang.org/protobuf v1.28.0 gopkg.in/yaml.v2 v2.4.0 @@ -135,7 +136,6 @@ require ( go.opentelemetry.io/otel/sdk v1.7.0 // indirect go.opentelemetry.io/otel/sdk/metric v0.30.0 // indirect go.opentelemetry.io/otel/trace v1.7.0 // indirect - go.uber.org/atomic v1.9.0 // indirect go.uber.org/goleak v1.1.12 // indirect go.uber.org/multierr v1.8.0 // indirect golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 // indirect diff --git a/receiver/prometheusreceiver/internal/staleness_end_to_end_test.go b/receiver/prometheusreceiver/internal/staleness_end_to_end_test.go index 5fed6b838bac..64189d3766fc 100644 --- a/receiver/prometheusreceiver/internal/staleness_end_to_end_test.go +++ b/receiver/prometheusreceiver/internal/staleness_end_to_end_test.go @@ -23,7 +23,6 @@ import ( "net/url" "os" "strings" - "sync/atomic" "testing" "time" @@ -38,6 +37,7 @@ import ( "go.opentelemetry.io/collector/config/mapprovider/filemapprovider" "go.opentelemetry.io/collector/processor/batchprocessor" "go.opentelemetry.io/collector/service" + "go.uber.org/atomic" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -57,10 +57,10 @@ func TestStalenessMarkersEndToEnd(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) // 1. Setup the server that sends series that intermittently appear and disappear. - var n uint64 + n := atomic.NewUint64(0) scrapeServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { // Increment the scrape count atomically per scrape. - i := atomic.AddUint64(&n, 1) + i := n.Add(1) select { case <-ctx.Done(): diff --git a/receiver/prometheusreceiver/metrics_receiver_helper_test.go b/receiver/prometheusreceiver/metrics_receiver_helper_test.go index d87f157975b2..4930bffbaafb 100644 --- a/receiver/prometheusreceiver/metrics_receiver_helper_test.go +++ b/receiver/prometheusreceiver/metrics_receiver_helper_test.go @@ -23,7 +23,6 @@ import ( "net/http/httptest" "net/url" "sync" - "sync/atomic" "testing" gokitlog "github.com/go-kit/log" @@ -38,6 +37,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/atomic" "gopkg.in/yaml.v2" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal" @@ -52,18 +52,17 @@ type mockPrometheusResponse struct { type mockPrometheus struct { mu sync.Mutex // mu protects the fields below. endpoints map[string][]mockPrometheusResponse - accessIndex map[string]*int32 + accessIndex map[string]*atomic.Int32 wg *sync.WaitGroup srv *httptest.Server } func newMockPrometheus(endpoints map[string][]mockPrometheusResponse) *mockPrometheus { - accessIndex := make(map[string]*int32) + accessIndex := make(map[string]*atomic.Int32) wg := &sync.WaitGroup{} wg.Add(len(endpoints)) for k := range endpoints { - v := int32(0) - accessIndex[k] = &v + accessIndex[k] = atomic.NewInt32(0) } mp := &mockPrometheus{ wg: wg, @@ -84,8 +83,8 @@ func (mp *mockPrometheus) ServeHTTP(rw http.ResponseWriter, req *http.Request) { rw.WriteHeader(404) return } - index := int(*iptr) - atomic.AddInt32(iptr, 1) + index := int(iptr.Load()) + iptr.Add(1) pages := mp.endpoints[req.URL.Path] if index >= len(pages) { if index == len(pages) {