diff --git a/.chloggen/goleak_signalfxexp.yaml b/.chloggen/goleak_signalfxexp.yaml new file mode 100644 index 000000000000..6cb03bd5b113 --- /dev/null +++ b/.chloggen/goleak_signalfxexp.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: signalfxexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix memory leak by re-organizing the exporter's functionality lifecycle + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [32781] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/signalfxexporter/exporter.go b/exporter/signalfxexporter/exporter.go index 6e05e7e0b74b..1a7ab7f60c13 100644 --- a/exporter/signalfxexporter/exporter.go +++ b/exporter/signalfxexporter/exporter.go @@ -60,7 +60,6 @@ type signalfxExporter struct { hostMetadataSyncer *hostmetadata.Syncer converter *translation.MetricsConverter dimClient *dimensions.DimensionClient - cancelFn func() } // newSignalFxExporter returns a new SignalFx exporter. @@ -100,6 +99,9 @@ func newSignalFxExporter( } func (se *signalfxExporter) start(ctx context.Context, host component.Host) (err error) { + if se.converter != nil { + se.converter.Start() + } ingestURL, err := se.config.getIngestURL() if err != nil { return err @@ -129,8 +131,6 @@ func (se *signalfxExporter) start(ctx context.Context, host component.Host) (err if err != nil { return fmt.Errorf("could not load API TLS config: %w", err) } - cancellable, cancelFn := context.WithCancel(ctx) - se.cancelFn = cancelFn apiURL, err := se.config.getAPIURL() if err != nil { @@ -138,7 +138,6 @@ func (se *signalfxExporter) start(ctx context.Context, host component.Host) (err } dimClient := dimensions.NewDimensionClient( - cancellable, dimensions.DimensionClientOptions{ Token: se.config.AccessToken, APIURL: apiURL, @@ -235,8 +234,12 @@ func (se *signalfxExporter) pushLogs(ctx context.Context, ld plog.Logs) error { } func (se *signalfxExporter) shutdown(_ context.Context) error { - if se.cancelFn != nil { - se.cancelFn() + if se.dimClient != nil { + se.dimClient.Shutdown() + } + + if se.converter != nil { + se.converter.Shutdown() } return nil } diff --git a/exporter/signalfxexporter/exporter_test.go b/exporter/signalfxexporter/exporter_test.go index 650935545a47..ef55bea36865 100644 --- a/exporter/signalfxexporter/exporter_test.go +++ b/exporter/signalfxexporter/exporter_test.go @@ -1169,7 +1169,6 @@ func TestConsumeMetadata(t *testing.T) { logger := zap.NewNop() dimClient := dimensions.NewDimensionClient( - context.Background(), dimensions.DimensionClientOptions{ Token: "foo", APIURL: serverURL, @@ -1203,6 +1202,10 @@ func TestConsumeMetadata(t *testing.T) { case <-c: // wait 500ms longer than send delay case <-time.After(tt.sendDelay + 500*time.Millisecond): + // If no updates are supposed to be sent, the server doesn't update dimensions, and + // doesn't call Done. This is correct behavior, so the test needs to account for it here, + // or a goroutine will be leaked. + defer wg.Done() require.True(t, tt.shouldNotSendUpdate, "timeout waiting for response") } @@ -1331,6 +1334,7 @@ func TestTLSExporterInit(t *testing.T) { sfx, err := newSignalFxExporter(tt.config, exportertest.NewNopSettings()) assert.NoError(t, err) err = sfx.start(context.Background(), componenttest.NewNopHost()) + defer func() { require.NoError(t, sfx.shutdown(context.Background())) }() if tt.wantErr { require.Error(t, err) if tt.wantErrMessage != "" { @@ -1402,6 +1406,7 @@ func TestTLSIngestConnection(t *testing.T) { assert.NoError(t, err) err = sfx.start(context.Background(), componenttest.NewNopHost()) assert.NoError(t, err) + defer func() { assert.NoError(t, sfx.shutdown(context.Background())) }() _, err = sfx.pushMetricsData(context.Background(), metricsPayload) if tt.wantErr { @@ -1526,10 +1531,7 @@ func TestTLSAPIConnection(t *testing.T) { require.NoError(t, err) serverURL, err := url.Parse(tt.config.APIURL) assert.NoError(t, err) - cancellable, cancelFn := context.WithCancel(context.Background()) - defer cancelFn() dimClient := dimensions.NewDimensionClient( - cancellable, dimensions.DimensionClientOptions{ Token: "", APIURL: serverURL, @@ -1541,6 +1543,7 @@ func TestTLSAPIConnection(t *testing.T) { APITLSConfig: apiTLSCfg, }) dimClient.Start() + defer func() { dimClient.Shutdown() }() se := &signalfxExporter{ dimClient: dimClient, diff --git a/exporter/signalfxexporter/generated_package_test.go b/exporter/signalfxexporter/generated_package_test.go index 914fdb307101..05bce43252c3 100644 --- a/exporter/signalfxexporter/generated_package_test.go +++ b/exporter/signalfxexporter/generated_package_test.go @@ -3,11 +3,11 @@ package signalfxexporter import ( - "os" "testing" + + "go.uber.org/goleak" ) func TestMain(m *testing.M) { - // skipping goleak test as per metadata.yml configuration - os.Exit(m.Run()) + goleak.VerifyTestMain(m) } diff --git a/exporter/signalfxexporter/internal/dimensions/dimclient.go b/exporter/signalfxexporter/internal/dimensions/dimclient.go index 98ca7c6b102c..62366b433331 100644 --- a/exporter/signalfxexporter/internal/dimensions/dimclient.go +++ b/exporter/signalfxexporter/internal/dimensions/dimclient.go @@ -32,7 +32,7 @@ import ( // updates are currently not done by this port. type DimensionClient struct { sync.RWMutex - ctx context.Context + cancel context.CancelFunc Token configopaque.String APIURL *url.URL client *http.Client @@ -84,7 +84,7 @@ type DimensionClientOptions struct { } // NewDimensionClient returns a new client -func NewDimensionClient(ctx context.Context, options DimensionClientOptions) *DimensionClient { +func NewDimensionClient(options DimensionClientOptions) *DimensionClient { client := &http.Client{ Timeout: options.Timeout, Transport: &http.Transport{ @@ -102,10 +102,9 @@ func NewDimensionClient(ctx context.Context, options DimensionClientOptions) *Di TLSClientConfig: options.APITLSConfig, }, } - sender := NewReqSender(ctx, client, 20, map[string]string{"client": "dimension"}) + sender := NewReqSender(client, 20, map[string]string{"client": "dimension"}) return &DimensionClient{ - ctx: ctx, Token: options.Token, APIURL: options.APIURL, sendDelay: options.SendDelay, @@ -123,7 +122,18 @@ func NewDimensionClient(ctx context.Context, options DimensionClientOptions) *Di // Start the client's processing queue func (dc *DimensionClient) Start() { - go dc.processQueue() + var ctx context.Context + // The dimension client is started during the exporter's startup functionality. + // The collector spec states that for long-running operations, components should + // use the background context, rather than the passed in context. + ctx, dc.cancel = context.WithCancel(context.Background()) + go dc.processQueue(ctx) +} + +func (dc *DimensionClient) Shutdown() { + if dc.cancel != nil { + dc.cancel() + } } // acceptDimension to be sent to the API. This will return fairly quickly and @@ -185,10 +195,10 @@ func mergeTags(tagSets ...map[string]bool) map[string]bool { return out } -func (dc *DimensionClient) processQueue() { +func (dc *DimensionClient) processQueue(ctx context.Context) { for { select { - case <-dc.ctx.Done(): + case <-ctx.Done(): return case delayedDimUpdate := <-dc.delayedQueue: now := dc.now() @@ -201,7 +211,7 @@ func (dc *DimensionClient) processQueue() { delete(dc.delayedSet, delayedDimUpdate.Key()) dc.Unlock() - if err := dc.handleDimensionUpdate(delayedDimUpdate.DimensionUpdate); err != nil { + if err := dc.handleDimensionUpdate(ctx, delayedDimUpdate.DimensionUpdate); err != nil { dc.logger.Error( "Could not send dimension update", zap.Error(err), @@ -213,13 +223,13 @@ func (dc *DimensionClient) processQueue() { } // handleDimensionUpdate will set custom properties on a specific dimension value. -func (dc *DimensionClient) handleDimensionUpdate(dimUpdate *DimensionUpdate) error { +func (dc *DimensionClient) handleDimensionUpdate(ctx context.Context, dimUpdate *DimensionUpdate) error { var ( req *http.Request err error ) - req, err = dc.makePatchRequest(dimUpdate) + req, err = dc.makePatchRequest(ctx, dimUpdate) if err != nil { return err @@ -276,7 +286,7 @@ func (dc *DimensionClient) handleDimensionUpdate(dimUpdate *DimensionUpdate) err } }))) - dc.requestSender.Send(req) + dc.requestSender.Send(ctx, req) return nil } @@ -290,7 +300,7 @@ func (dc *DimensionClient) makeDimURL(key, value string) (*url.URL, error) { return url, nil } -func (dc *DimensionClient) makePatchRequest(dim *DimensionUpdate) (*http.Request, error) { +func (dc *DimensionClient) makePatchRequest(ctx context.Context, dim *DimensionUpdate) (*http.Request, error) { var ( tagsToAdd []string tagsToRemove []string @@ -319,7 +329,7 @@ func (dc *DimensionClient) makePatchRequest(dim *DimensionUpdate) (*http.Request } req, err := http.NewRequestWithContext( - context.Background(), + ctx, "PATCH", strings.TrimRight(url.String(), "/")+"/_/sfxagent", bytes.NewReader(json)) diff --git a/exporter/signalfxexporter/internal/dimensions/dimclient_test.go b/exporter/signalfxexporter/internal/dimensions/dimclient_test.go index c996728d860a..17b618a1fe6f 100644 --- a/exporter/signalfxexporter/internal/dimensions/dimclient_test.go +++ b/exporter/signalfxexporter/internal/dimensions/dimclient_test.go @@ -102,13 +102,14 @@ func setup(t *testing.T) (*DimensionClient, chan dim, *atomic.Int32, context.Can server.Close() }() - client := NewDimensionClient(ctx, DimensionClientOptions{ - APIURL: serverURL, - LogUpdates: true, - Logger: zap.NewNop(), - SendDelay: time.Second, - MaxBuffered: 10, - }) + client := NewDimensionClient( + DimensionClientOptions{ + APIURL: serverURL, + LogUpdates: true, + Logger: zap.NewNop(), + SendDelay: time.Second, + MaxBuffered: 10, + }) client.Start() return client, dimCh, forcedResp, cancel @@ -117,6 +118,7 @@ func setup(t *testing.T) (*DimensionClient, chan dim, *atomic.Int32, context.Can func TestDimensionClient(t *testing.T) { client, dimCh, forcedResp, cancel := setup(t) defer cancel() + defer client.Shutdown() t.Run("send dimension update with properties and tags", func(t *testing.T) { require.NoError(t, client.acceptDimension(&DimensionUpdate{ @@ -310,6 +312,7 @@ func TestDimensionClient(t *testing.T) { func TestFlappyUpdates(t *testing.T) { client, dimCh, _, cancel := setup(t) defer cancel() + defer client.Shutdown() // Do some flappy updates for i := 0; i < 5; i++ { @@ -348,6 +351,7 @@ func TestFlappyUpdates(t *testing.T) { func TestInvalidUpdatesNotSent(t *testing.T) { client, dimCh, _, cancel := setup(t) defer cancel() + defer client.Shutdown() require.NoError(t, client.acceptDimension(&DimensionUpdate{ Name: "host", Value: "", diff --git a/exporter/signalfxexporter/internal/dimensions/package_test.go b/exporter/signalfxexporter/internal/dimensions/package_test.go new file mode 100644 index 000000000000..5b69b4ac3787 --- /dev/null +++ b/exporter/signalfxexporter/internal/dimensions/package_test.go @@ -0,0 +1,14 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package dimensions + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/exporter/signalfxexporter/internal/dimensions/requests.go b/exporter/signalfxexporter/internal/dimensions/requests.go index ed957ddd014e..f328474404d5 100644 --- a/exporter/signalfxexporter/internal/dimensions/requests.go +++ b/exporter/signalfxexporter/internal/dimensions/requests.go @@ -20,12 +20,11 @@ type ReqSender struct { client *http.Client requests chan *http.Request workerCount uint - ctx context.Context additionalDimensions map[string]string runningWorkers *atomic.Int64 } -func NewReqSender(ctx context.Context, client *http.Client, +func NewReqSender(client *http.Client, workerCount uint, diagnosticDimensions map[string]string) *ReqSender { return &ReqSender{ client: client, @@ -33,36 +32,41 @@ func NewReqSender(ctx context.Context, client *http.Client, // Unbuffered so that it blocks clients requests: make(chan *http.Request), workerCount: workerCount, - ctx: ctx, runningWorkers: &atomic.Int64{}, } } // Send sends the request. Not thread-safe. -func (rs *ReqSender) Send(req *http.Request) { +func (rs *ReqSender) Send(ctx context.Context, req *http.Request) { // Slight optimization to avoid spinning up unnecessary workers if there // aren't ever that many dim updates. Once workers start, they remain for the // duration of the agent. select { + case <-ctx.Done(): + return case rs.requests <- req: return default: if rs.runningWorkers.Load() < int64(rs.workerCount) { - go rs.processRequests() + go rs.processRequests(ctx) } - // Block until we can get through a request - rs.requests <- req + // Block until we can get through a request, unless context has been cancelled. + select { + case <-ctx.Done(): + return + case rs.requests <- req: + } } } -func (rs *ReqSender) processRequests() { +func (rs *ReqSender) processRequests(ctx context.Context) { rs.runningWorkers.Add(1) defer rs.runningWorkers.Add(-1) for { select { - case <-rs.ctx.Done(): + case <-ctx.Done(): return case req := <-rs.requests: if err := rs.sendRequest(req); err != nil { diff --git a/exporter/signalfxexporter/internal/translation/converter.go b/exporter/signalfxexporter/internal/translation/converter.go index 9a63c8815e12..ffc5e91e1d18 100644 --- a/exporter/signalfxexporter/internal/translation/converter.go +++ b/exporter/signalfxexporter/internal/translation/converter.go @@ -65,6 +65,12 @@ func NewMetricsConverter( }, nil } +func (c *MetricsConverter) Start() { + if c.metricTranslator != nil { + c.metricTranslator.Start() + } +} + // MetricsToSignalFxV2 converts the passed in MetricsData to SFx datapoints // and if processHistograms is set, histogram metrics are not converted to SFx format. // It returns those datapoints and the number of time series that had to be @@ -161,6 +167,12 @@ func (c *MetricsConverter) ConvertDimension(dim string) string { return filterKeyChars(res, c.datapointValidator.nonAlphanumericDimChars) } +func (c *MetricsConverter) Shutdown() { + if c.metricTranslator != nil { + c.metricTranslator.Shutdown() + } +} + // Values obtained from https://dev.splunk.com/observability/docs/datamodel/ingest#Criteria-for-metric-and-dimension-names-and-values const ( maxMetricNameLength = 256 diff --git a/exporter/signalfxexporter/internal/translation/delta_translator.go b/exporter/signalfxexporter/internal/translation/delta_translator.go index 26f19c06a419..f7d1912681b5 100644 --- a/exporter/signalfxexporter/internal/translation/delta_translator.go +++ b/exporter/signalfxexporter/internal/translation/delta_translator.go @@ -20,10 +20,15 @@ func newDeltaTranslator(ttl int64, done chan struct{}) *deltaTranslator { sweepIntervalSeconds = 1 } m := ttlmap.New(sweepIntervalSeconds, ttl, done) - m.Start() return &deltaTranslator{prevPts: m} } +func (t *deltaTranslator) start() { + if t.prevPts != nil { + t.prevPts.Start() + } +} + func (t *deltaTranslator) translate(pts []*sfxpb.DataPoint, tr Rule) []*sfxpb.DataPoint { for _, currPt := range pts { deltaMetricName, ok := tr.Mapping[currPt.Metric] @@ -64,6 +69,12 @@ func (t *deltaTranslator) deltaPt(deltaMetricName string, currPt *sfxpb.DataPoin return deltaPt } +func (t *deltaTranslator) shutdown() { + if t.prevPts != nil { + t.prevPts.Shutdown() + } +} + func doubleDeltaPt(currPt *sfxpb.DataPoint, prevPt *sfxpb.DataPoint, deltaMetricName string) *sfxpb.DataPoint { delta := *currPt.Value.DoubleValue - *prevPt.Value.DoubleValue if delta < 0 { diff --git a/exporter/signalfxexporter/internal/translation/package_test.go b/exporter/signalfxexporter/internal/translation/package_test.go new file mode 100644 index 000000000000..82f4ca268e8f --- /dev/null +++ b/exporter/signalfxexporter/internal/translation/package_test.go @@ -0,0 +1,14 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package translation + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/exporter/signalfxexporter/internal/translation/translator.go b/exporter/signalfxexporter/internal/translation/translator.go index b899de0dc86e..573691adce86 100644 --- a/exporter/signalfxexporter/internal/translation/translator.go +++ b/exporter/signalfxexporter/internal/translation/translator.go @@ -391,6 +391,12 @@ func getMetricNamesAsSlice(metricName string, metricNames map[string]bool) []str return out } +func (mp *MetricTranslator) Start() { + if mp.deltaTranslator != nil { + mp.deltaTranslator.start() + } +} + // TranslateDataPoints transforms datapoints to a format compatible with signalfx backend // sfxDataPoints represents one metric converted to signalfx protobuf datapoints func (mp *MetricTranslator) TranslateDataPoints(logger *zap.Logger, sfxDataPoints []*sfxpb.DataPoint) []*sfxpb.DataPoint { @@ -540,6 +546,12 @@ func (mp *MetricTranslator) TranslateDataPoints(logger *zap.Logger, sfxDataPoint return processedDataPoints } +func (mp *MetricTranslator) Shutdown() { + if mp.deltaTranslator != nil { + mp.deltaTranslator.shutdown() + } +} + func calcNewMetricInputPairs(processedDataPoints []*sfxpb.DataPoint, tr Rule) [][2]*sfxpb.DataPoint { var operand1Pts, operand2Pts []*sfxpb.DataPoint for _, dp := range processedDataPoints { diff --git a/exporter/signalfxexporter/internal/utils/package_test.go b/exporter/signalfxexporter/internal/utils/package_test.go new file mode 100644 index 000000000000..832862afb888 --- /dev/null +++ b/exporter/signalfxexporter/internal/utils/package_test.go @@ -0,0 +1,14 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package utils + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/exporter/signalfxexporter/metadata.yaml b/exporter/signalfxexporter/metadata.yaml index 1f7d2a35c1db..8b95a12ae0f6 100644 --- a/exporter/signalfxexporter/metadata.yaml +++ b/exporter/signalfxexporter/metadata.yaml @@ -17,5 +17,3 @@ tests: retry_on_failure: enabled: false expect_consumer_error: true - goleak: - skip: true diff --git a/receiver/signalfxreceiver/generated_package_test.go b/receiver/signalfxreceiver/generated_package_test.go index c0e3fc300353..224abed22636 100644 --- a/receiver/signalfxreceiver/generated_package_test.go +++ b/receiver/signalfxreceiver/generated_package_test.go @@ -3,11 +3,11 @@ package signalfxreceiver import ( - "os" "testing" + + "go.uber.org/goleak" ) func TestMain(m *testing.M) { - // skipping goleak test as per metadata.yml configuration - os.Exit(m.Run()) + goleak.VerifyTestMain(m) } diff --git a/receiver/signalfxreceiver/go.mod b/receiver/signalfxreceiver/go.mod index dffbe66ded3d..1ade241fbffd 100644 --- a/receiver/signalfxreceiver/go.mod +++ b/receiver/signalfxreceiver/go.mod @@ -21,6 +21,7 @@ require ( go.opentelemetry.io/collector/exporter v0.108.2-0.20240829190554-7da6b618a7ee go.opentelemetry.io/collector/pdata v1.14.2-0.20240829190554-7da6b618a7ee go.opentelemetry.io/collector/receiver v0.108.2-0.20240829190554-7da6b618a7ee + go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 ) diff --git a/receiver/signalfxreceiver/metadata.yaml b/receiver/signalfxreceiver/metadata.yaml index 373f510c7f8d..65549865060f 100644 --- a/receiver/signalfxreceiver/metadata.yaml +++ b/receiver/signalfxreceiver/metadata.yaml @@ -10,5 +10,3 @@ status: emeritus: tests: - goleak: - skip: true diff --git a/receiver/signalfxreceiver/receiver_test.go b/receiver/signalfxreceiver/receiver_test.go index 9ed9c45d6ae5..52c9b83b23f3 100644 --- a/receiver/signalfxreceiver/receiver_test.go +++ b/receiver/signalfxreceiver/receiver_test.go @@ -39,7 +39,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" ) -func Test_signalfxeceiver_New(t *testing.T) { +func Test_signalfxreceiver_New(t *testing.T) { defaultConfig := createDefaultConfig().(*Config) type args struct { config Config @@ -78,11 +78,14 @@ func Test_signalfxeceiver_New(t *testing.T) { } err = got.Start(context.Background(), componenttest.NewNopHost()) assert.Equal(t, tt.wantStartErr, err) + if err == nil { + assert.NoError(t, got.Shutdown(context.Background())) + } }) } } -func Test_signalfxeceiver_EndToEnd(t *testing.T) { +func Test_signalfxreceiver_EndToEnd(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) cfg := createDefaultConfig().(*Config) cfg.Endpoint = addr