Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/signalfx] Fix goroutine leaks #32781

Merged
merged 11 commits into from
Sep 4, 2024
27 changes: 27 additions & 0 deletions .chloggen/goleak_signalfxexp.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: signalfxexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix memory leak by re-organizing the exporter's functionality lifecycle

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32781]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
15 changes: 9 additions & 6 deletions exporter/signalfxexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ type signalfxExporter struct {
hostMetadataSyncer *hostmetadata.Syncer
converter *translation.MetricsConverter
dimClient *dimensions.DimensionClient
cancelFn func()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cancelFn was originally responsible for shutting down all sub-packages. I've refactored to enable each package to start and shutdown their own functionality, as requested by the caller. (More explanation in PR description).

}

// newSignalFxExporter returns a new SignalFx exporter.
Expand Down Expand Up @@ -100,6 +99,9 @@ func newSignalFxExporter(
}

func (se *signalfxExporter) start(ctx context.Context, host component.Host) (err error) {
if se.converter != nil {
se.converter.Start()
}
ingestURL, err := se.config.getIngestURL()
if err != nil {
return err
Expand Down Expand Up @@ -129,16 +131,13 @@ func (se *signalfxExporter) start(ctx context.Context, host component.Host) (err
if err != nil {
return fmt.Errorf("could not load API TLS config: %w", err)
}
cancellable, cancelFn := context.WithCancel(ctx)
se.cancelFn = cancelFn

apiURL, err := se.config.getAPIURL()
if err != nil {
return err
}

dimClient := dimensions.NewDimensionClient(
cancellable,
dimensions.DimensionClientOptions{
Token: se.config.AccessToken,
APIURL: apiURL,
Expand Down Expand Up @@ -235,8 +234,12 @@ func (se *signalfxExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
}

func (se *signalfxExporter) shutdown(_ context.Context) error {
if se.cancelFn != nil {
se.cancelFn()
if se.dimClient != nil {
se.dimClient.Shutdown()
}

if se.converter != nil {
se.converter.Shutdown()
}
return nil
}
Expand Down
11 changes: 7 additions & 4 deletions exporter/signalfxexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1169,7 +1169,6 @@ func TestConsumeMetadata(t *testing.T) {
logger := zap.NewNop()

dimClient := dimensions.NewDimensionClient(
context.Background(),
dimensions.DimensionClientOptions{
Token: "foo",
APIURL: serverURL,
Expand Down Expand Up @@ -1203,6 +1202,10 @@ func TestConsumeMetadata(t *testing.T) {
case <-c:
// wait 500ms longer than send delay
case <-time.After(tt.sendDelay + 500*time.Millisecond):
// If no updates are supposed to be sent, the server doesn't update dimensions, and
// doesn't call Done. This is correct behavior, so the test needs to account for it here,
// or a goroutine will be leaked.
defer wg.Done()
require.True(t, tt.shouldNotSendUpdate, "timeout waiting for response")
}

Expand Down Expand Up @@ -1331,6 +1334,7 @@ func TestTLSExporterInit(t *testing.T) {
sfx, err := newSignalFxExporter(tt.config, exportertest.NewNopSettings())
assert.NoError(t, err)
err = sfx.start(context.Background(), componenttest.NewNopHost())
defer func() { require.NoError(t, sfx.shutdown(context.Background())) }()
if tt.wantErr {
require.Error(t, err)
if tt.wantErrMessage != "" {
Expand Down Expand Up @@ -1402,6 +1406,7 @@ func TestTLSIngestConnection(t *testing.T) {
assert.NoError(t, err)
err = sfx.start(context.Background(), componenttest.NewNopHost())
assert.NoError(t, err)
defer func() { assert.NoError(t, sfx.shutdown(context.Background())) }()

_, err = sfx.pushMetricsData(context.Background(), metricsPayload)
if tt.wantErr {
Expand Down Expand Up @@ -1526,10 +1531,7 @@ func TestTLSAPIConnection(t *testing.T) {
require.NoError(t, err)
serverURL, err := url.Parse(tt.config.APIURL)
assert.NoError(t, err)
cancellable, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
dimClient := dimensions.NewDimensionClient(
cancellable,
dimensions.DimensionClientOptions{
Token: "",
APIURL: serverURL,
Expand All @@ -1541,6 +1543,7 @@ func TestTLSAPIConnection(t *testing.T) {
APITLSConfig: apiTLSCfg,
})
dimClient.Start()
defer func() { dimClient.Shutdown() }()

se := &signalfxExporter{
dimClient: dimClient,
Expand Down
6 changes: 3 additions & 3 deletions exporter/signalfxexporter/generated_package_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 23 additions & 13 deletions exporter/signalfxexporter/internal/dimensions/dimclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
// updates are currently not done by this port.
type DimensionClient struct {
sync.RWMutex
ctx context.Context
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Contexts aren't supposed to be stored, per go's documentation, so we should store the cancel instead.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unrelated right? Can you please move it to a separate PR?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a part of the refactor as a whole, removing the stored context in favor of the cancel allows the dimclient to be able to start and stop by its own calls. If we leave it as is we're relying on the exporter's top level context cancel (which technically works), but then means we can't call goleak in this package and we're relying on another package to manage its lifecycle without being obvious.

I'd prefer to keep it as is, unless you think this is too much change (or too confusing) in one PR.

cancel context.CancelFunc
Token configopaque.String
APIURL *url.URL
client *http.Client
Expand Down Expand Up @@ -84,7 +84,7 @@ type DimensionClientOptions struct {
}

// NewDimensionClient returns a new client
func NewDimensionClient(ctx context.Context, options DimensionClientOptions) *DimensionClient {
func NewDimensionClient(options DimensionClientOptions) *DimensionClient {
client := &http.Client{
Timeout: options.Timeout,
Transport: &http.Transport{
Expand All @@ -102,10 +102,9 @@ func NewDimensionClient(ctx context.Context, options DimensionClientOptions) *Di
TLSClientConfig: options.APITLSConfig,
},
}
sender := NewReqSender(ctx, client, 20, map[string]string{"client": "dimension"})
sender := NewReqSender(client, 20, map[string]string{"client": "dimension"})

return &DimensionClient{
ctx: ctx,
Token: options.Token,
APIURL: options.APIURL,
sendDelay: options.SendDelay,
Expand All @@ -123,7 +122,18 @@ func NewDimensionClient(ctx context.Context, options DimensionClientOptions) *Di

// Start the client's processing queue
func (dc *DimensionClient) Start() {
go dc.processQueue()
var ctx context.Context
// The dimension client is started during the exporter's startup functionality.
// The collector spec states that for long-running operations, components should
// use the background context, rather than the passed in context.
ctx, dc.cancel = context.WithCancel(context.Background())
go dc.processQueue(ctx)
}

func (dc *DimensionClient) Shutdown() {
if dc.cancel != nil {
dc.cancel()
}
}

// acceptDimension to be sent to the API. This will return fairly quickly and
Expand Down Expand Up @@ -185,10 +195,10 @@ func mergeTags(tagSets ...map[string]bool) map[string]bool {
return out
}

func (dc *DimensionClient) processQueue() {
func (dc *DimensionClient) processQueue(ctx context.Context) {
for {
select {
case <-dc.ctx.Done():
case <-ctx.Done():
return
case delayedDimUpdate := <-dc.delayedQueue:
now := dc.now()
Expand All @@ -201,7 +211,7 @@ func (dc *DimensionClient) processQueue() {
delete(dc.delayedSet, delayedDimUpdate.Key())
dc.Unlock()

if err := dc.handleDimensionUpdate(delayedDimUpdate.DimensionUpdate); err != nil {
if err := dc.handleDimensionUpdate(ctx, delayedDimUpdate.DimensionUpdate); err != nil {
dc.logger.Error(
"Could not send dimension update",
zap.Error(err),
Expand All @@ -213,13 +223,13 @@ func (dc *DimensionClient) processQueue() {
}

// handleDimensionUpdate will set custom properties on a specific dimension value.
func (dc *DimensionClient) handleDimensionUpdate(dimUpdate *DimensionUpdate) error {
func (dc *DimensionClient) handleDimensionUpdate(ctx context.Context, dimUpdate *DimensionUpdate) error {
var (
req *http.Request
err error
)

req, err = dc.makePatchRequest(dimUpdate)
req, err = dc.makePatchRequest(ctx, dimUpdate)

if err != nil {
return err
Expand Down Expand Up @@ -276,7 +286,7 @@ func (dc *DimensionClient) handleDimensionUpdate(dimUpdate *DimensionUpdate) err
}
})))

dc.requestSender.Send(req)
dc.requestSender.Send(ctx, req)

return nil
}
Expand All @@ -290,7 +300,7 @@ func (dc *DimensionClient) makeDimURL(key, value string) (*url.URL, error) {
return url, nil
}

func (dc *DimensionClient) makePatchRequest(dim *DimensionUpdate) (*http.Request, error) {
func (dc *DimensionClient) makePatchRequest(ctx context.Context, dim *DimensionUpdate) (*http.Request, error) {
var (
tagsToAdd []string
tagsToRemove []string
Expand Down Expand Up @@ -319,7 +329,7 @@ func (dc *DimensionClient) makePatchRequest(dim *DimensionUpdate) (*http.Request
}

req, err := http.NewRequestWithContext(
context.Background(),
ctx,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This allows the dimension client to shut itself down when requested, instead of relying on an outside caller to handle its lifecycle.

"PATCH",
strings.TrimRight(url.String(), "/")+"/_/sfxagent",
bytes.NewReader(json))
Expand Down
18 changes: 11 additions & 7 deletions exporter/signalfxexporter/internal/dimensions/dimclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,14 @@ func setup(t *testing.T) (*DimensionClient, chan dim, *atomic.Int32, context.Can
server.Close()
}()

client := NewDimensionClient(ctx, DimensionClientOptions{
APIURL: serverURL,
LogUpdates: true,
Logger: zap.NewNop(),
SendDelay: time.Second,
MaxBuffered: 10,
})
client := NewDimensionClient(
DimensionClientOptions{
APIURL: serverURL,
LogUpdates: true,
Logger: zap.NewNop(),
SendDelay: time.Second,
MaxBuffered: 10,
})
client.Start()

return client, dimCh, forcedResp, cancel
Expand All @@ -117,6 +118,7 @@ func setup(t *testing.T) (*DimensionClient, chan dim, *atomic.Int32, context.Can
func TestDimensionClient(t *testing.T) {
client, dimCh, forcedResp, cancel := setup(t)
defer cancel()
defer client.Shutdown()

t.Run("send dimension update with properties and tags", func(t *testing.T) {
require.NoError(t, client.acceptDimension(&DimensionUpdate{
Expand Down Expand Up @@ -310,6 +312,7 @@ func TestDimensionClient(t *testing.T) {
func TestFlappyUpdates(t *testing.T) {
client, dimCh, _, cancel := setup(t)
defer cancel()
defer client.Shutdown()

// Do some flappy updates
for i := 0; i < 5; i++ {
Expand Down Expand Up @@ -348,6 +351,7 @@ func TestFlappyUpdates(t *testing.T) {
func TestInvalidUpdatesNotSent(t *testing.T) {
client, dimCh, _, cancel := setup(t)
defer cancel()
defer client.Shutdown()
require.NoError(t, client.acceptDimension(&DimensionUpdate{
Name: "host",
Value: "",
Expand Down
14 changes: 14 additions & 0 deletions exporter/signalfxexporter/internal/dimensions/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package dimensions

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
22 changes: 13 additions & 9 deletions exporter/signalfxexporter/internal/dimensions/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,49 +20,53 @@ type ReqSender struct {
client *http.Client
requests chan *http.Request
workerCount uint
ctx context.Context
additionalDimensions map[string]string
runningWorkers *atomic.Int64
}

func NewReqSender(ctx context.Context, client *http.Client,
func NewReqSender(client *http.Client,
workerCount uint, diagnosticDimensions map[string]string) *ReqSender {
return &ReqSender{
client: client,
additionalDimensions: diagnosticDimensions,
// Unbuffered so that it blocks clients
requests: make(chan *http.Request),
workerCount: workerCount,
ctx: ctx,
runningWorkers: &atomic.Int64{},
}
}

// Send sends the request. Not thread-safe.
func (rs *ReqSender) Send(req *http.Request) {
func (rs *ReqSender) Send(ctx context.Context, req *http.Request) {
// Slight optimization to avoid spinning up unnecessary workers if there
// aren't ever that many dim updates. Once workers start, they remain for the
// duration of the agent.
select {
case <-ctx.Done():
return
case rs.requests <- req:
return
default:
if rs.runningWorkers.Load() < int64(rs.workerCount) {
go rs.processRequests()
go rs.processRequests(ctx)
}

// Block until we can get through a request
rs.requests <- req
// Block until we can get through a request, unless context has been cancelled.
select {
case <-ctx.Done():
return
case rs.requests <- req:
}
}
}

func (rs *ReqSender) processRequests() {
func (rs *ReqSender) processRequests(ctx context.Context) {
rs.runningWorkers.Add(1)
defer rs.runningWorkers.Add(-1)

for {
select {
case <-rs.ctx.Done():
case <-ctx.Done():
return
case req := <-rs.requests:
if err := rs.sendRequest(req); err != nil {
Expand Down
Loading
Loading