Skip to content

Commit

Permalink
Change all usages of sync/atomic to uber/atomic in test files (open-t…
Browse files Browse the repository at this point in the history
…elemetry#9811)

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored and djaglowski committed May 10, 2022
1 parent 6a55bf4 commit df16efa
Show file tree
Hide file tree
Showing 19 changed files with 78 additions and 76 deletions.
12 changes: 6 additions & 6 deletions exporter/carbonexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"net"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -35,6 +34,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/atomic"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
Expand Down Expand Up @@ -226,17 +226,17 @@ func Test_connPool_Concurrency(t *testing.T) {
concurrentWriters := 3
writesPerRoutine := 3

var doneFlag int64
defer func(flag *int64) {
atomic.StoreInt64(flag, 1)
}(&doneFlag)
doneFlag := atomic.NewBool(false)
defer func() {
doneFlag.Store(true)
}()

var recvWG sync.WaitGroup
recvWG.Add(concurrentWriters * writesPerRoutine * md.MetricCount())
go func() {
for {
conn, err := ln.AcceptTCP()
if atomic.LoadInt64(&doneFlag) != 0 {
if doneFlag.Load() {
// Close is expected to cause error.
return
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/carbonexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/stretchr/testify v1.7.1
go.opentelemetry.io/collector v0.50.1-0.20220429151328-041f39835df7
go.opentelemetry.io/collector/pdata v0.50.1-0.20220429151328-041f39835df7
go.uber.org/atomic v1.9.0
google.golang.org/protobuf v1.28.0

)
Expand All @@ -31,7 +32,6 @@ require (
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/metric v0.30.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
Expand Down
30 changes: 15 additions & 15 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ import (
"net/http"
"os"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
)
Expand Down Expand Up @@ -181,16 +181,16 @@ func TestExporter_PushEvent(t *testing.T) {
}),
}

handlers := map[string]func(*int64) bulkHandler{
"fail http request": func(attempts *int64) bulkHandler {
handlers := map[string]func(attempts *atomic.Int64) bulkHandler{
"fail http request": func(attempts *atomic.Int64) bulkHandler {
return func([]itemRequest) ([]itemResponse, error) {
atomic.AddInt64(attempts, 1)
attempts.Inc()
return nil, &httpTestError{message: "oops"}
}
},
"fail item": func(attempts *int64) bulkHandler {
"fail item": func(attempts *atomic.Int64) bulkHandler {
return func(docs []itemRequest) ([]itemResponse, error) {
atomic.AddInt64(attempts, 1)
attempts.Inc()
return itemsReportStatus(docs, http.StatusTooManyRequests)
}
},
Expand All @@ -202,33 +202,33 @@ func TestExporter_PushEvent(t *testing.T) {
for name, configurer := range configurations {
t.Run(name, func(t *testing.T) {
t.Parallel()
var attempts int64
server := newESTestServer(t, handler(&attempts))
attempts := atomic.NewInt64(0)
server := newESTestServer(t, handler(attempts))

testConfig := configurer(server.URL)
exporter := newTestExporter(t, server.URL, func(cfg *Config) { *cfg = *testConfig })
mustSend(t, exporter, `{"message": "test1"}`)

time.Sleep(200 * time.Millisecond)
assert.Equal(t, int64(1), atomic.LoadInt64(&attempts))
assert.Equal(t, int64(1), attempts.Load())
})
}
})
}
})

t.Run("do not retry invalid request", func(t *testing.T) {
var attempts int64
attempts := atomic.NewInt64(0)
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
atomic.AddInt64(&attempts, 1)
attempts.Inc()
return nil, &httpTestError{message: "oops", status: http.StatusBadRequest}
})

exporter := newTestExporter(t, server.URL)
mustSend(t, exporter, `{"message": "test1"}`)

time.Sleep(200 * time.Millisecond)
assert.Equal(t, int64(1), atomic.LoadInt64(&attempts))
assert.Equal(t, int64(1), attempts.Load())
})

t.Run("retry single item", func(t *testing.T) {
Expand All @@ -252,17 +252,17 @@ func TestExporter_PushEvent(t *testing.T) {
})

t.Run("do not retry bad item", func(t *testing.T) {
var attempts int64
attempts := atomic.NewInt64(0)
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
atomic.AddInt64(&attempts, 1)
attempts.Inc()
return itemsReportStatus(docs, http.StatusBadRequest)
})

exporter := newTestExporter(t, server.URL)
mustSend(t, exporter, `{"message": "test1"}`)

time.Sleep(200 * time.Millisecond)
assert.Equal(t, int64(1), atomic.LoadInt64(&attempts))
assert.Equal(t, int64(1), attempts.Load())
})

t.Run("only retry failed items", func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion exporter/elasticsearchexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/stretchr/testify v1.7.1
go.opentelemetry.io/collector v0.50.1-0.20220429151328-041f39835df7
go.opentelemetry.io/collector/pdata v0.50.1-0.20220429151328-041f39835df7
go.uber.org/atomic v1.9.0
go.uber.org/multierr v1.8.0
go.uber.org/zap v1.21.0

Expand All @@ -33,7 +34,6 @@ require (
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/metric v0.30.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect
golang.org/x/text v0.3.7 // indirect
Expand Down
2 changes: 1 addition & 1 deletion exporter/loadbalancingexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
go.opencensus.io v0.23.0
go.opentelemetry.io/collector v0.50.1-0.20220429151328-041f39835df7
go.opentelemetry.io/collector/pdata v0.50.1-0.20220429151328-041f39835df7
go.uber.org/atomic v1.9.0
go.uber.org/multierr v1.8.0
go.uber.org/zap v1.21.0
)
Expand Down Expand Up @@ -35,7 +36,6 @@ require (
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/metric v0.30.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/net v0.0.0-20220412020605-290c469a71a5 // indirect
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect
Expand Down
13 changes: 7 additions & 6 deletions exporter/loadbalancingexporter/log_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"net"
"sync/atomic"
"testing"
"time"

Expand All @@ -33,6 +32,7 @@ import (
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -388,17 +388,18 @@ func TestRollingUpdatesWhenConsumeLogs(t *testing.T) {
lb.res = res
p.loadBalancer = lb

var counter1, counter2 int64
counter1 := atomic.NewInt64(0)
counter2 := atomic.NewInt64(0)
defaultExporters := map[string]component.Exporter{
"127.0.0.1": newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error {
atomic.AddInt64(&counter1, 1)
counter1.Inc()
// simulate an unreachable backend
time.Sleep(10 * time.Second)
return nil
},
),
"127.0.0.2": newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error {
atomic.AddInt64(&counter2, 1)
counter2.Inc()
return nil
},
),
Expand Down Expand Up @@ -448,8 +449,8 @@ func TestRollingUpdatesWhenConsumeLogs(t *testing.T) {

// verify
require.Equal(t, []string{"127.0.0.2"}, lastResolved)
require.Greater(t, atomic.LoadInt64(&counter1), int64(0))
require.Greater(t, atomic.LoadInt64(&counter2), int64(0))
require.Greater(t, counter1.Load(), int64(0))
require.Greater(t, counter2.Load(), int64(0))
}

func randomLogs() plog.Logs {
Expand Down
13 changes: 7 additions & 6 deletions exporter/loadbalancingexporter/trace_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"math/rand"
"net"
"path/filepath"
"sync/atomic"
"testing"
"time"

Expand All @@ -37,6 +36,7 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/service/servicetest"
"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -416,17 +416,18 @@ func TestRollingUpdatesWhenConsumeTraces(t *testing.T) {
lb.res = res
p.loadBalancer = lb

var counter1, counter2 int64
counter1 := atomic.NewInt64(0)
counter2 := atomic.NewInt64(0)
defaultExporters := map[string]component.Exporter{
"127.0.0.1": newMockTracesExporter(func(ctx context.Context, td ptrace.Traces) error {
atomic.AddInt64(&counter1, 1)
counter1.Inc()
// simulate an unreachable backend
time.Sleep(10 * time.Second)
return nil
},
),
"127.0.0.2": newMockTracesExporter(func(ctx context.Context, td ptrace.Traces) error {
atomic.AddInt64(&counter2, 1)
counter2.Inc()
return nil
},
),
Expand Down Expand Up @@ -476,8 +477,8 @@ func TestRollingUpdatesWhenConsumeTraces(t *testing.T) {

// verify
require.Equal(t, []string{"127.0.0.2"}, lastResolved)
require.Greater(t, atomic.LoadInt64(&counter1), int64(0))
require.Greater(t, atomic.LoadInt64(&counter2), int64(0))
require.Greater(t, counter1.Load(), int64(0))
require.Greater(t, counter2.Load(), int64(0))
}

func randomTraces() ptrace.Traces {
Expand Down
2 changes: 1 addition & 1 deletion exporter/skywalkingexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
go.opentelemetry.io/collector v0.50.1-0.20220429151328-041f39835df7
go.opentelemetry.io/collector/pdata v0.50.1-0.20220429151328-041f39835df7
go.opentelemetry.io/collector/semconv v0.50.1-0.20220429151328-041f39835df7
go.uber.org/atomic v1.9.0
google.golang.org/grpc v1.46.0
skywalking.apache.org/repo/goapi v0.0.0-20211122071111-ffc517fbfe21
)
Expand All @@ -34,7 +35,6 @@ require (
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/metric v0.30.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
Expand Down
10 changes: 5 additions & 5 deletions exporter/skywalkingexporter/skywalking_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"net"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -31,6 +30,7 @@ import (
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/atomic"
"google.golang.org/grpc"
v3 "skywalking.apache.org/repo/goapi/collect/common/v3"
logpb "skywalking.apache.org/repo/goapi/collect/logging/v3"
Expand All @@ -39,7 +39,7 @@ import (
)

var (
consumerNum int32
consumerNum = atomic.NewInt32(0)
sumNum = 10000
)

Expand Down Expand Up @@ -89,7 +89,7 @@ func TestSkywalking(t *testing.T) {

func test(nGoroutine int, nStream int, t *testing.T) {
exporter, server, m := doInit(nStream, t)
atomic.StoreInt32(&consumerNum, -int32(nStream))
consumerNum.Store(-int32(nStream))
l := testdata.GenerateLogsOneLogRecordNoResource()
l.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetIntVal(0)

Expand Down Expand Up @@ -199,8 +199,8 @@ func (h *mockLogHandler2) Collect(stream logpb.LogReportService_CollectServer) e
return stream.SendAndClose(&v3.Commands{})
}
if err == nil {
atomic.AddInt32(&consumerNum, 1)
if atomic.LoadInt32(&consumerNum) >= int32(sumNum) {
consumerNum.Inc()
if consumerNum.Load() >= int32(sumNum) {
end := time.Now().UnixMilli()
h.stopChan <- end
return nil
Expand Down
2 changes: 1 addition & 1 deletion exporter/sumologicexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/stretchr/testify v1.7.1
go.opentelemetry.io/collector v0.50.1-0.20220429151328-041f39835df7
go.opentelemetry.io/collector/pdata v0.50.1-0.20220429151328-041f39835df7
go.uber.org/atomic v1.9.0
go.uber.org/multierr v1.8.0
)

Expand Down Expand Up @@ -34,7 +35,6 @@ require (
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/metric v0.30.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect
Expand Down
8 changes: 4 additions & 4 deletions exporter/sumologicexporter/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"net/http"
"net/http/httptest"
"strings"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -31,6 +30,7 @@ import (
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/atomic"
)

type senderTest struct {
Expand All @@ -40,16 +40,16 @@ type senderTest struct {
}

func prepareSenderTest(t *testing.T, cb []func(w http.ResponseWriter, req *http.Request)) *senderTest {
var reqCounter int32
reqCounter := atomic.NewInt32(0)
// generate a test server so we can capture and inspect the request
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if len(cb) == 0 {
return
}

if c := int(atomic.LoadInt32(&reqCounter)); assert.Greater(t, len(cb), c) {
if c := int(reqCounter.Load()); assert.Greater(t, len(cb), c) {
cb[c](w, req)
atomic.AddInt32(&reqCounter, 1)
reqCounter.Inc()
}
}))

Expand Down
Loading

0 comments on commit df16efa

Please sign in to comment.