From d936776c102f185cdf6aa7604320fcdb79f62588 Mon Sep 17 00:00:00 2001 From: Curtis Robert Date: Mon, 15 Apr 2024 07:38:37 -0700 Subject: [PATCH] [receiver/mongodbatlas] Fix memory leak (#32206) **Description:** The receiver was leaking goroutines by holding onto idle connections. The solution is to reference the underlying `Transport` object, and call `CloseIdleConnections` on it during shutdown. This change also enables `goleak` on the MongoDB Atlas receiver to help ensure no goroutines are being leaked. **Link to tracking Issue:** #30428 **Testing:** All existing tests are passing, as well as added `goleak` checks. --- .chloggen/goleak_mongodbatlas.yaml | 27 +++++++++++++++++++ receiver/mongodbatlasreceiver/events.go | 9 ++++++- receiver/mongodbatlasreceiver/events_test.go | 21 ++++++++++----- .../internal/mongodb_atlas_client.go | 6 ++++- receiver/mongodbatlasreceiver/package_test.go | 14 ++++++++++ .../mongodbatlasreceiver/receiver_test.go | 10 ++++--- 6 files changed, 75 insertions(+), 12 deletions(-) create mode 100644 .chloggen/goleak_mongodbatlas.yaml create mode 100644 receiver/mongodbatlasreceiver/package_test.go diff --git a/.chloggen/goleak_mongodbatlas.yaml b/.chloggen/goleak_mongodbatlas.yaml new file mode 100644 index 000000000000..cb54de5399ce --- /dev/null +++ b/.chloggen/goleak_mongodbatlas.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: mongodbatlasreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix memory leak by closing idle connections on shutdown + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [32206] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/mongodbatlasreceiver/events.go b/receiver/mongodbatlasreceiver/events.go index df01af961858..a07eda518514 100644 --- a/receiver/mongodbatlasreceiver/events.go +++ b/receiver/mongodbatlasreceiver/events.go @@ -6,6 +6,7 @@ package mongodbatlasreceiver // import "github.com/open-telemetry/opentelemetry- import ( "context" "encoding/json" + "errors" "fmt" "sync" "time" @@ -34,6 +35,7 @@ type eventsClient interface { GetProjectEvents(ctx context.Context, groupID string, opts *internal.GetEventsOptions) (ret []*mongodbatlas.Event, nextPage bool, err error) GetOrganization(ctx context.Context, orgID string) (*mongodbatlas.Organization, error) GetOrganizationEvents(ctx context.Context, orgID string, opts *internal.GetEventsOptions) (ret []*mongodbatlas.Event, nextPage bool, err error) + Shutdown() error } type eventsReceiver struct { @@ -97,7 +99,12 @@ func (er *eventsReceiver) Shutdown(ctx context.Context) error { er.logger.Debug("Shutting down events receiver") er.cancel() er.wg.Wait() - return er.checkpoint(ctx) + + var err []error + err = append(err, er.client.Shutdown()) + err = append(err, er.checkpoint(ctx)) + + return errors.Join(err...) } func (er *eventsReceiver) startPolling(ctx context.Context) error { diff --git a/receiver/mongodbatlasreceiver/events_test.go b/receiver/mongodbatlasreceiver/events_test.go index 1a8761fb07b8..786f413f2685 100644 --- a/receiver/mongodbatlasreceiver/events_test.go +++ b/receiver/mongodbatlasreceiver/events_test.go @@ -164,16 +164,14 @@ func TestProjectGetFailure(t *testing.T) { mClient := &mockEventsClient{} mClient.On("GetProject", mock.Anything, "fake-project").Return(nil, fmt.Errorf("unable to get project: %d", http.StatusUnauthorized)) mClient.On("GetOrganization", mock.Anything, "fake-org").Return(nil, fmt.Errorf("unable to get org: %d", http.StatusUnauthorized)) + mClient.setupMock(t) + r.client = mClient - err := r.Start(context.Background(), componenttest.NewNopHost(), storage.NewNopClient()) - require.NoError(t, err) - + require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost(), storage.NewNopClient())) require.Never(t, func() bool { return sink.LogRecordCount() > 0 }, 2*time.Second, 500*time.Millisecond) - - err = r.Shutdown(context.Background()) - require.NoError(t, err) + require.NoError(t, r.Shutdown(context.Background())) } type mockEventsClient struct { @@ -216,7 +214,12 @@ func (mec *mockEventsClient) loadTestEvents(t *testing.T, filename string) []*mo func (mec *mockEventsClient) GetProject(ctx context.Context, pID string) (*mongodbatlas.Project, error) { args := mec.Called(ctx, pID) - return args.Get(0).(*mongodbatlas.Project), args.Error(1) + receivedProject := args.Get(0) + if receivedProject == nil { + return nil, args.Error(1) + } + + return receivedProject.(*mongodbatlas.Project), args.Error(1) } func (mec *mockEventsClient) GetProjectEvents(ctx context.Context, pID string, opts *internal.GetEventsOptions) ([]*mongodbatlas.Event, bool, error) { @@ -233,3 +236,7 @@ func (mec *mockEventsClient) GetOrganizationEvents(ctx context.Context, oID stri args := mec.Called(ctx, oID, opts) return args.Get(0).([]*mongodbatlas.Event), args.Bool(1), args.Error(2) } + +func (mec *mockEventsClient) Shutdown() error { + return nil +} diff --git a/receiver/mongodbatlasreceiver/internal/mongodb_atlas_client.go b/receiver/mongodbatlasreceiver/internal/mongodb_atlas_client.go index c6e5a37d8af2..75357fd69095 100644 --- a/receiver/mongodbatlasreceiver/internal/mongodb_atlas_client.go +++ b/receiver/mongodbatlasreceiver/internal/mongodb_atlas_client.go @@ -123,6 +123,7 @@ func (rt *clientRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) type MongoDBAtlasClient struct { log *zap.Logger client *mongodbatlas.Client + transport *http.Transport roundTripper *clientRoundTripper } @@ -133,18 +134,21 @@ func NewMongoDBAtlasClient( backoffConfig configretry.BackOffConfig, log *zap.Logger, ) *MongoDBAtlasClient { - t := digest.NewTransport(publicKey, privateKey) + defaultTransporter := &http.Transport{} + t := digest.NewTransportWithHTTPTransport(publicKey, privateKey, defaultTransporter) roundTripper := newClientRoundTripper(t, log, backoffConfig) tc := &http.Client{Transport: roundTripper} client := mongodbatlas.NewClient(tc) return &MongoDBAtlasClient{ log, client, + defaultTransporter, roundTripper, } } func (s *MongoDBAtlasClient) Shutdown() error { + s.transport.CloseIdleConnections() return s.roundTripper.Shutdown() } diff --git a/receiver/mongodbatlasreceiver/package_test.go b/receiver/mongodbatlasreceiver/package_test.go new file mode 100644 index 000000000000..dade34bd31f6 --- /dev/null +++ b/receiver/mongodbatlasreceiver/package_test.go @@ -0,0 +1,14 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package mongodbatlasreceiver + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/receiver/mongodbatlasreceiver/receiver_test.go b/receiver/mongodbatlasreceiver/receiver_test.go index 9ade727feb20..1cf20aa461b0 100644 --- a/receiver/mongodbatlasreceiver/receiver_test.go +++ b/receiver/mongodbatlasreceiver/receiver_test.go @@ -18,14 +18,18 @@ func TestDefaultConfig(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig() require.Equal(t, cfg.(*Config).ControllerConfig.CollectionInterval, 3*time.Minute) - recv, err := createMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, consumertest.NewNop()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + recv, err := createMetricsReceiver(ctx, receivertest.NewNopCreateSettings(), cfg, consumertest.NewNop()) require.NoError(t, err) require.NotNil(t, recv, "receiver creation failed") - err = recv.Start(context.Background(), componenttest.NewNopHost()) + err = recv.Start(ctx, componenttest.NewNopHost()) require.NoError(t, err) - err = recv.Shutdown(context.Background()) + err = recv.Shutdown(ctx) require.NoError(t, err) }