diff --git a/.chloggen/goleak_awscontainerinsights_k8sapiserver.yaml b/.chloggen/goleak_awscontainerinsights_k8sapiserver.yaml new file mode 100644 index 000000000000..0c7997d0be87 --- /dev/null +++ b/.chloggen/goleak_awscontainerinsights_k8sapiserver.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awscontainerinsightreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix memory leak on shutdown in K8s API Server + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [32405] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go index bc9c593ca14e..ff3d4b7bb65d 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver.go @@ -45,6 +45,10 @@ type eventBroadcaster interface { // NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster // with the event source set to the given event source. NewRecorder(scheme *runtime.Scheme, source v1.EventSource) record.EventRecorderLogger + // Shutdown shuts down the broadcaster. Once the broadcaster is shut + // down, it will only try to record an event in a sink once before + // giving up on it with an error message. + Shutdown() } type K8sClient interface { @@ -89,19 +93,25 @@ func New(clusterNameProvider clusterNameProvider, logger *zap.Logger, options .. logger: logger, clusterNameProvider: clusterNameProvider, k8sClient: k8sclient.Get(logger), - broadcaster: record.NewBroadcaster(), } for _, opt := range options { opt(k) } + if k.broadcaster == nil { + // NewBroadcaster starts a goroutine in the background, so we only want to + // call if it a user hasn't defined their own broadcaster. This will help + // avoid leaking a goroutine. + k.broadcaster = record.NewBroadcaster() + } + if k.k8sClient == nil { - return nil, errors.New("failed to start k8sapiserver because k8sclient is nil") + return nil, errors.Join(k.Shutdown(), errors.New("failed to start k8sapiserver because k8sclient is nil")) } if err := k.init(); err != nil { - return nil, fmt.Errorf("fail to initialize k8sapiserver, err: %w", err) + return nil, errors.Join(k.Shutdown(), fmt.Errorf("fail to initialize k8sapiserver, err: %w", err)) } return k, nil @@ -242,6 +252,10 @@ func (k *K8sAPIServer) Shutdown() error { if k.cancel != nil { k.cancel() } + if k.broadcaster != nil { + k.broadcaster.Shutdown() + } + return nil } diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go index a064d48723df..194994498007 100644 --- a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/k8sapiserver_test.go @@ -104,6 +104,8 @@ func (m *mockEventBroadcaster) NewRecorder(_ *runtime.Scheme, _ v1.EventSource) return record.NewFakeRecorder(100) } +func (m *mockEventBroadcaster) Shutdown() {} + func getStringAttrVal(m pmetric.Metrics, key string) string { rm := m.ResourceMetrics().At(0) attributes := rm.Resource().Attributes() @@ -184,6 +186,8 @@ func TestK8sAPIServer_GetMetrics(t *testing.T) { assert.NotNil(t, k8sAPIServer) assert.NoError(t, err) + defer func() { assert.NoError(t, k8sAPIServer.Shutdown()) }() + mockClient.On("NamespaceToRunningPodNum").Return(map[string]int{"default": 2}) mockClient.On("ClusterFailedNodeCount").Return(1) mockClient.On("ClusterNodeCount").Return(1) diff --git a/receiver/awscontainerinsightreceiver/internal/k8sapiserver/package_test.go b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/package_test.go new file mode 100644 index 000000000000..da7f26d1f1a3 --- /dev/null +++ b/receiver/awscontainerinsightreceiver/internal/k8sapiserver/package_test.go @@ -0,0 +1,14 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package k8sapiserver + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +}