-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[exporter/signalfx] Fix goroutine leaks #32781
Changes from all commits
c26893a
ed17319
dfbe5e9
03723e5
8e2cb07
dedcf3a
e0abb7b
3d6ef7a
3aa4d17
3360bb6
bf5ffce
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
# Use this changelog template to create an entry for release notes. | ||
|
||
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' | ||
change_type: bug_fix | ||
|
||
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) | ||
component: signalfxexporter | ||
|
||
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). | ||
note: Fix memory leak by re-organizing the exporter's functionality lifecycle | ||
|
||
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. | ||
issues: [32781] | ||
|
||
# (Optional) One or more lines of additional information to render under the primary note. | ||
# These lines will be padded with 2 spaces and then inserted directly into the document. | ||
# Use pipe (|) for multiline entries. | ||
subtext: | ||
|
||
# If your change doesn't affect end users or the exported elements of any package, | ||
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. | ||
# Optional: The change log or logs in which this entry should be included. | ||
# e.g. '[user]' or '[user, api]' | ||
# Include 'user' if the change is relevant to end users. | ||
# Include 'api' if there is a change to a library API. | ||
# Default: '[user]' | ||
change_logs: [] |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,7 +32,7 @@ import ( | |
// updates are currently not done by this port. | ||
type DimensionClient struct { | ||
sync.RWMutex | ||
ctx context.Context | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Contexts aren't supposed to be stored, per go's documentation, so we should store the cancel instead. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is unrelated right? Can you please move it to a separate PR? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a part of the refactor as a whole, removing the stored context in favor of the cancel allows the dimclient to be able to start and stop by its own calls. If we leave it as is we're relying on the exporter's top level context cancel (which technically works), but then means we can't call I'd prefer to keep it as is, unless you think this is too much change (or too confusing) in one PR. |
||
cancel context.CancelFunc | ||
Token configopaque.String | ||
APIURL *url.URL | ||
client *http.Client | ||
|
@@ -84,7 +84,7 @@ type DimensionClientOptions struct { | |
} | ||
|
||
// NewDimensionClient returns a new client | ||
func NewDimensionClient(ctx context.Context, options DimensionClientOptions) *DimensionClient { | ||
func NewDimensionClient(options DimensionClientOptions) *DimensionClient { | ||
client := &http.Client{ | ||
Timeout: options.Timeout, | ||
Transport: &http.Transport{ | ||
|
@@ -102,10 +102,9 @@ func NewDimensionClient(ctx context.Context, options DimensionClientOptions) *Di | |
TLSClientConfig: options.APITLSConfig, | ||
}, | ||
} | ||
sender := NewReqSender(ctx, client, 20, map[string]string{"client": "dimension"}) | ||
sender := NewReqSender(client, 20, map[string]string{"client": "dimension"}) | ||
|
||
return &DimensionClient{ | ||
ctx: ctx, | ||
Token: options.Token, | ||
APIURL: options.APIURL, | ||
sendDelay: options.SendDelay, | ||
|
@@ -123,7 +122,18 @@ func NewDimensionClient(ctx context.Context, options DimensionClientOptions) *Di | |
|
||
// Start the client's processing queue | ||
func (dc *DimensionClient) Start() { | ||
go dc.processQueue() | ||
var ctx context.Context | ||
// The dimension client is started during the exporter's startup functionality. | ||
// The collector spec states that for long-running operations, components should | ||
// use the background context, rather than the passed in context. | ||
ctx, dc.cancel = context.WithCancel(context.Background()) | ||
go dc.processQueue(ctx) | ||
} | ||
|
||
func (dc *DimensionClient) Shutdown() { | ||
if dc.cancel != nil { | ||
dc.cancel() | ||
} | ||
} | ||
|
||
// acceptDimension to be sent to the API. This will return fairly quickly and | ||
|
@@ -185,10 +195,10 @@ func mergeTags(tagSets ...map[string]bool) map[string]bool { | |
return out | ||
} | ||
|
||
func (dc *DimensionClient) processQueue() { | ||
func (dc *DimensionClient) processQueue(ctx context.Context) { | ||
for { | ||
select { | ||
case <-dc.ctx.Done(): | ||
case <-ctx.Done(): | ||
return | ||
case delayedDimUpdate := <-dc.delayedQueue: | ||
now := dc.now() | ||
|
@@ -201,7 +211,7 @@ func (dc *DimensionClient) processQueue() { | |
delete(dc.delayedSet, delayedDimUpdate.Key()) | ||
dc.Unlock() | ||
|
||
if err := dc.handleDimensionUpdate(delayedDimUpdate.DimensionUpdate); err != nil { | ||
if err := dc.handleDimensionUpdate(ctx, delayedDimUpdate.DimensionUpdate); err != nil { | ||
dc.logger.Error( | ||
"Could not send dimension update", | ||
zap.Error(err), | ||
|
@@ -213,13 +223,13 @@ func (dc *DimensionClient) processQueue() { | |
} | ||
|
||
// handleDimensionUpdate will set custom properties on a specific dimension value. | ||
func (dc *DimensionClient) handleDimensionUpdate(dimUpdate *DimensionUpdate) error { | ||
func (dc *DimensionClient) handleDimensionUpdate(ctx context.Context, dimUpdate *DimensionUpdate) error { | ||
var ( | ||
req *http.Request | ||
err error | ||
) | ||
|
||
req, err = dc.makePatchRequest(dimUpdate) | ||
req, err = dc.makePatchRequest(ctx, dimUpdate) | ||
|
||
if err != nil { | ||
return err | ||
|
@@ -276,7 +286,7 @@ func (dc *DimensionClient) handleDimensionUpdate(dimUpdate *DimensionUpdate) err | |
} | ||
}))) | ||
|
||
dc.requestSender.Send(req) | ||
dc.requestSender.Send(ctx, req) | ||
|
||
return nil | ||
} | ||
|
@@ -290,7 +300,7 @@ func (dc *DimensionClient) makeDimURL(key, value string) (*url.URL, error) { | |
return url, nil | ||
} | ||
|
||
func (dc *DimensionClient) makePatchRequest(dim *DimensionUpdate) (*http.Request, error) { | ||
func (dc *DimensionClient) makePatchRequest(ctx context.Context, dim *DimensionUpdate) (*http.Request, error) { | ||
var ( | ||
tagsToAdd []string | ||
tagsToRemove []string | ||
|
@@ -319,7 +329,7 @@ func (dc *DimensionClient) makePatchRequest(dim *DimensionUpdate) (*http.Request | |
} | ||
|
||
req, err := http.NewRequestWithContext( | ||
context.Background(), | ||
ctx, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This allows the dimension client to shut itself down when requested, instead of relying on an outside caller to handle its lifecycle. |
||
"PATCH", | ||
strings.TrimRight(url.String(), "/")+"/_/sfxagent", | ||
bytes.NewReader(json)) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package dimensions | ||
|
||
import ( | ||
"testing" | ||
|
||
"go.uber.org/goleak" | ||
) | ||
|
||
func TestMain(m *testing.M) { | ||
goleak.VerifyTestMain(m) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cancelFn was originally responsible for shutting down all sub-packages. I've refactored to enable each package to start and shutdown their own functionality, as requested by the caller. (More explanation in PR description).