From 1ff279a2edbbff8825d2f1ab97a8197b9a13affc Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Fri, 6 May 2022 12:40:29 -0700 Subject: [PATCH] signalfxexporter: remove usage of sync/atomic (#9779) Signed-off-by: Bogdan Drutu --- exporter/signalfxexporter/go.mod | 9 +++---- .../internal/dimensions/dimclient.go | 27 +++---------------- .../internal/dimensions/dimclient_test.go | 25 +++++------------ .../internal/dimensions/metadata.go | 2 -- .../internal/dimensions/requests.go | 25 +++++++---------- 5 files changed, 23 insertions(+), 65 deletions(-) diff --git a/exporter/signalfxexporter/go.mod b/exporter/signalfxexporter/go.mod index f1b87e22fb69..3595f1137826 100644 --- a/exporter/signalfxexporter/go.mod +++ b/exporter/signalfxexporter/go.mod @@ -6,6 +6,7 @@ require ( github.com/gobwas/glob v0.2.3 github.com/gogo/protobuf v1.3.2 github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.50.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.50.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.50.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.50.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.50.0 @@ -14,13 +15,10 @@ require ( github.com/signalfx/signalfx-agent/pkg/apm v0.0.0-20201202163743-65b4fa925fc8 github.com/stretchr/testify v1.7.1 go.opentelemetry.io/collector v0.50.1-0.20220429151328-041f39835df7 + go.opentelemetry.io/collector/semconv v0.50.1-0.20220429151328-041f39835df7 + go.uber.org/atomic v1.9.0 go.uber.org/multierr v1.8.0 go.uber.org/zap v1.21.0 -) - -require ( - github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.50.0 - go.opentelemetry.io/collector/semconv v0.50.1-0.20220429151328-041f39835df7 gopkg.in/yaml.v2 v2.4.0 ) @@ -61,7 +59,6 @@ require ( go.opentelemetry.io/otel v1.7.0 // indirect go.opentelemetry.io/otel/metric v0.30.0 // indirect go.opentelemetry.io/otel/trace v1.7.0 // indirect - go.uber.org/atomic v1.9.0 // indirect golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect golang.org/x/text v0.3.7 // indirect google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect diff --git a/exporter/signalfxexporter/internal/dimensions/dimclient.go b/exporter/signalfxexporter/internal/dimensions/dimclient.go index 703a92063d3c..70f31f7dcb0d 100644 --- a/exporter/signalfxexporter/internal/dimensions/dimclient.go +++ b/exporter/signalfxexporter/internal/dimensions/dimclient.go @@ -26,7 +26,6 @@ import ( "reflect" "strings" "sync" - "sync/atomic" "time" "go.uber.org/zap" @@ -61,19 +60,9 @@ type DimensionClient struct { // For easier unit testing now func() time.Time - // TODO: Look into collecting these metrics and other traces via obsreport - DimensionsCurrentlyDelayed int64 - TotalDimensionsDropped int64 - // The number of dimension updates that happened to the same dimension - // within sendDelay. - TotalFlappyUpdates int64 - TotalClientError4xxResponses int64 - TotalRetriedUpdates int64 - TotalInvalidDimensions int64 - TotalSuccessfulUpdates int64 - logUpdates bool - logger *zap.Logger - metricsConverter translation.MetricsConverter + logUpdates bool + logger *zap.Logger + metricsConverter translation.MetricsConverter } type queuedDimension struct { @@ -139,15 +128,11 @@ func (dc *DimensionClient) acceptDimension(dimUpdate *DimensionUpdate) error { if delayedDimUpdate := dc.delayedSet[dimUpdate.Key()]; delayedDimUpdate != nil { if !reflect.DeepEqual(delayedDimUpdate, dimUpdate) { - dc.TotalFlappyUpdates++ - // Merge the latest updates into existing one. delayedDimUpdate.Properties = mergeProperties(delayedDimUpdate.Properties, dimUpdate.Properties) delayedDimUpdate.Tags = mergeTags(delayedDimUpdate.Tags, dimUpdate.Tags) } } else { - atomic.AddInt64(&dc.DimensionsCurrentlyDelayed, int64(1)) - dc.delayedSet[dimUpdate.Key()] = dimUpdate select { case dc.delayedQueue <- &queuedDimension{ @@ -156,8 +141,6 @@ func (dc *DimensionClient) acceptDimension(dimUpdate *DimensionUpdate) error { }: break default: - dc.TotalDimensionsDropped++ - atomic.AddInt64(&dc.DimensionsCurrentlyDelayed, int64(-1)) return errors.New("dropped dimension update, propertiesMaxBuffered exceeded") } } @@ -204,8 +187,6 @@ func (dc *DimensionClient) processQueue() { time.Sleep(delayedDimUpdate.TimeToSend.Sub(now)) } - atomic.AddInt64(&dc.DimensionsCurrentlyDelayed, int64(-1)) - dc.Lock() delete(dc.delayedSet, delayedDimUpdate.Key()) dc.Unlock() @@ -237,7 +218,6 @@ func (dc *DimensionClient) handleDimensionUpdate(dimUpdate *DimensionUpdate) err req = req.WithContext( context.WithValue(req.Context(), RequestFailedCallbackKey, RequestFailedCallback(func(statusCode int, err error) { if statusCode >= 400 && statusCode < 500 && statusCode != 404 { - atomic.AddInt64(&dc.TotalClientError4xxResponses, int64(1)) dc.logger.Error( "Unable to update dimension, not retrying", zap.Error(err), @@ -261,7 +241,6 @@ func (dc *DimensionClient) handleDimensionUpdate(dimUpdate *DimensionUpdate) err zap.String("dimensionUpdate", dimUpdate.String()), zap.Int("statusCode", statusCode), ) - atomic.AddInt64(&dc.TotalRetriedUpdates, int64(1)) // The retry is meant to provide some measure of robustness against // temporary API failures. If the API is down for significant // periods of time, dimension updates will probably eventually back diff --git a/exporter/signalfxexporter/internal/dimensions/dimclient_test.go b/exporter/signalfxexporter/internal/dimensions/dimclient_test.go index 2ed60afe829a..5deb7d2453cf 100644 --- a/exporter/signalfxexporter/internal/dimensions/dimclient_test.go +++ b/exporter/signalfxexporter/internal/dimensions/dimclient_test.go @@ -23,11 +23,11 @@ import ( "net/url" "regexp" "strconv" - "sync/atomic" "testing" "time" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -61,11 +61,11 @@ loop: return dims } -func makeHandler(dimCh chan<- dim, forcedResp *atomic.Value) http.HandlerFunc { +func makeHandler(dimCh chan<- dim, forcedResp *atomic.Int32) http.HandlerFunc { forcedResp.Store(200) return func(rw http.ResponseWriter, r *http.Request) { - forcedRespInt := forcedResp.Load().(int) + forcedRespInt := int(forcedResp.Load()) if forcedRespInt != 200 { rw.WriteHeader(forcedRespInt) return @@ -98,11 +98,11 @@ func makeHandler(dimCh chan<- dim, forcedResp *atomic.Value) http.HandlerFunc { } } -func setup(t *testing.T) (*DimensionClient, chan dim, *atomic.Value, context.CancelFunc) { +func setup(t *testing.T) (*DimensionClient, chan dim, *atomic.Int32, context.CancelFunc) { dimCh := make(chan dim) - var forcedResp atomic.Value - server := httptest.NewServer(makeHandler(dimCh, &forcedResp)) + forcedResp := atomic.NewInt32(0) + server := httptest.NewServer(makeHandler(dimCh, forcedResp)) serverURL, err := url.Parse(server.URL) require.NoError(t, err, "failed to get server URL", err) @@ -122,7 +122,7 @@ func setup(t *testing.T) (*DimensionClient, chan dim, *atomic.Value, context.Can }) client.Start() - return client, dimCh, &forcedResp, cancel + return client, dimCh, forcedResp, cancel } func TestDimensionClient(t *testing.T) { @@ -354,16 +354,6 @@ func TestFlappyUpdates(t *testing.T) { Properties: map[string]*string{"index": newString("4")}, }, }, dims) - - // Give it enough time to run the counter updates which happen after the - // request is completed. - time.Sleep(1 * time.Second) - - require.Equal(t, int64(8), atomic.LoadInt64(&client.TotalFlappyUpdates)) - require.Equal(t, int64(0), atomic.LoadInt64(&client.DimensionsCurrentlyDelayed)) - require.Equal(t, int64(2), atomic.LoadInt64(&client.requestSender.TotalRequestsStarted)) - require.Equal(t, int64(2), atomic.LoadInt64(&client.requestSender.TotalRequestsCompleted)) - require.Equal(t, int64(0), atomic.LoadInt64(&client.requestSender.TotalRequestsFailed)) } func TestInvalidUpdatesNotSent(t *testing.T) { @@ -394,7 +384,6 @@ func TestInvalidUpdatesNotSent(t *testing.T) { dims := waitForDims(dimCh, 2, 3) require.Len(t, dims, 0) - require.Equal(t, int64(0), atomic.LoadInt64(&client.TotalInvalidDimensions)) } func newString(s string) *string { diff --git a/exporter/signalfxexporter/internal/dimensions/metadata.go b/exporter/signalfxexporter/internal/dimensions/metadata.go index 1593e6dc42dd..461652c7991e 100644 --- a/exporter/signalfxexporter/internal/dimensions/metadata.go +++ b/exporter/signalfxexporter/internal/dimensions/metadata.go @@ -17,7 +17,6 @@ package dimensions // import "github.com/open-telemetry/opentelemetry-collector- import ( "fmt" "strings" - "sync/atomic" "go.uber.org/multierr" @@ -111,7 +110,6 @@ func (dc *DimensionClient) PushMetadata(metadata []*metadata.MetadataUpdate) err dimensionUpdate := getDimensionUpdateFromMetadata(*m, dc.metricsConverter) if dimensionUpdate.Name == "" || dimensionUpdate.Value == "" { - atomic.AddInt64(&dc.TotalInvalidDimensions, int64(1)) return fmt.Errorf("dimensionUpdate %v is missing Name or value, cannot send", dimensionUpdate) } diff --git a/exporter/signalfxexporter/internal/dimensions/requests.go b/exporter/signalfxexporter/internal/dimensions/requests.go index 41166b5e5381..ffe59e1c2551 100644 --- a/exporter/signalfxexporter/internal/dimensions/requests.go +++ b/exporter/signalfxexporter/internal/dimensions/requests.go @@ -33,7 +33,8 @@ import ( "fmt" "io/ioutil" "net/http" - "sync/atomic" + + "go.uber.org/atomic" ) // ReqSender is a direct port of @@ -44,11 +45,7 @@ type ReqSender struct { workerCount uint ctx context.Context additionalDimensions map[string]string - - RunningWorkers int64 - TotalRequestsStarted int64 - TotalRequestsCompleted int64 - TotalRequestsFailed int64 + runningWorkers *atomic.Int64 } func NewReqSender(ctx context.Context, client *http.Client, @@ -57,9 +54,10 @@ func NewReqSender(ctx context.Context, client *http.Client, client: client, additionalDimensions: diagnosticDimensions, // Unbuffered so that it blocks clients - requests: make(chan *http.Request), - workerCount: workerCount, - ctx: ctx, + requests: make(chan *http.Request), + workerCount: workerCount, + ctx: ctx, + runningWorkers: atomic.NewInt64(0), } } @@ -72,7 +70,7 @@ func (rs *ReqSender) Send(req *http.Request) { case rs.requests <- req: return default: - if atomic.LoadInt64(&rs.RunningWorkers) < int64(rs.workerCount) { + if rs.runningWorkers.Load() < int64(rs.workerCount) { go rs.processRequests() } @@ -82,20 +80,17 @@ func (rs *ReqSender) Send(req *http.Request) { } func (rs *ReqSender) processRequests() { - atomic.AddInt64(&rs.RunningWorkers, int64(1)) - defer atomic.AddInt64(&rs.RunningWorkers, int64(-1)) + rs.runningWorkers.Add(1) + defer rs.runningWorkers.Add(-1) for { select { case <-rs.ctx.Done(): return case req := <-rs.requests: - atomic.AddInt64(&rs.TotalRequestsStarted, int64(1)) if err := rs.sendRequest(req); err != nil { - atomic.AddInt64(&rs.TotalRequestsFailed, int64(1)) continue } - atomic.AddInt64(&rs.TotalRequestsCompleted, int64(1)) } } }