From 1ccd1fc1c5af34a828eb94e31a938ed86614a606 Mon Sep 17 00:00:00 2001 From: Curtis Robert Date: Mon, 11 Mar 2024 09:47:42 -0700 Subject: [PATCH] [exporter/loadbalancing] Fix memory leaks (#31050) **Description:** This fixes a few goroutine leaks in the loadbalancing exporter. 1. `metrics`, `traces`, and `logs` exporters were starting their respective load balancers, but were not shutting them down. This adds each respective shutdown call. 2. The `loadbalancer` was starting the resolver but never shutting it down. This adds a shutdown call to the resolver. 3. The static resolver was starting resolvers for each passed in exporter, but never shut them down. This adds a shutdown call for each resolver in the static resolver. Also added a couple missing `Shutdown` calls from tests. **Link to tracking Issue:** #30438 **Testing:** All existing tests are passing as well as added goleak checks. --- .chloggen/goleak_loadbalancingexp.yaml | 27 +++++++++++++++++++ exporter/loadbalancingexporter/go.mod | 1 + .../loadbalancingexporter/loadbalancer.go | 5 ++-- .../loadbalancer_test.go | 1 + .../loadbalancingexporter/log_exporter.go | 5 ++-- .../loadbalancingexporter/metrics_exporter.go | 5 ++-- .../loadbalancingexporter/package_test.go | 17 ++++++++++++ .../resolver_dns_test.go | 1 + .../loadbalancingexporter/resolver_static.go | 8 +++++- .../loadbalancingexporter/trace_exporter.go | 5 ++-- 10 files changed, 66 insertions(+), 9 deletions(-) create mode 100755 .chloggen/goleak_loadbalancingexp.yaml create mode 100644 exporter/loadbalancingexporter/package_test.go diff --git a/.chloggen/goleak_loadbalancingexp.yaml b/.chloggen/goleak_loadbalancingexp.yaml new file mode 100755 index 000000000000..ccb58dd1c189 --- /dev/null +++ b/.chloggen/goleak_loadbalancingexp.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: loadbalancingexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix memory leaks on shutdown + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31050] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/loadbalancingexporter/go.mod b/exporter/loadbalancingexporter/go.mod index 53fa84251387..5a4430e94d8a 100644 --- a/exporter/loadbalancingexporter/go.mod +++ b/exporter/loadbalancingexporter/go.mod @@ -16,6 +16,7 @@ require ( go.opentelemetry.io/collector/semconv v0.96.1-0.20240306115632-b2693620eff6 go.opentelemetry.io/otel/metric v1.24.0 go.opentelemetry.io/otel/trace v1.24.0 + go.uber.org/goleak v1.3.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 k8s.io/api v0.29.2 diff --git a/exporter/loadbalancingexporter/loadbalancer.go b/exporter/loadbalancingexporter/loadbalancer.go index cccbc5c39245..618635728098 100644 --- a/exporter/loadbalancingexporter/loadbalancer.go +++ b/exporter/loadbalancingexporter/loadbalancer.go @@ -168,9 +168,10 @@ func endpointFound(endpoint string, endpoints []string) bool { return false } -func (lb *loadBalancer) Shutdown(context.Context) error { +func (lb *loadBalancer) Shutdown(ctx context.Context) error { + err := lb.res.shutdown(ctx) lb.stopped = true - return nil + return err } // exporterAndEndpoint returns the exporter and the endpoint for the given identifier. diff --git a/exporter/loadbalancingexporter/loadbalancer_test.go b/exporter/loadbalancingexporter/loadbalancer_test.go index 26ad546a2cd8..c1b62f926f85 100644 --- a/exporter/loadbalancingexporter/loadbalancer_test.go +++ b/exporter/loadbalancingexporter/loadbalancer_test.go @@ -134,6 +134,7 @@ func TestWithDNSResolverNoEndpoints(t *testing.T) { err = p.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) + defer func() { assert.NoError(t, p.Shutdown(context.Background())) }() // test _, e, _ := p.exporterAndEndpoint([]byte{128, 128, 0, 0}) diff --git a/exporter/loadbalancingexporter/log_exporter.go b/exporter/loadbalancingexporter/log_exporter.go index c59c2c189e8a..9b79bdc439dc 100644 --- a/exporter/loadbalancingexporter/log_exporter.go +++ b/exporter/loadbalancingexporter/log_exporter.go @@ -57,13 +57,14 @@ func (e *logExporterImp) Start(ctx context.Context, host component.Host) error { return e.loadBalancer.Start(ctx, host) } -func (e *logExporterImp) Shutdown(context.Context) error { +func (e *logExporterImp) Shutdown(ctx context.Context) error { if !e.started { return nil } + err := e.loadBalancer.Shutdown(ctx) e.started = false e.shutdownWg.Wait() - return nil + return err } func (e *logExporterImp) ConsumeLogs(ctx context.Context, ld plog.Logs) error { diff --git a/exporter/loadbalancingexporter/metrics_exporter.go b/exporter/loadbalancingexporter/metrics_exporter.go index 9cd5ea10a676..9e2ca747414f 100644 --- a/exporter/loadbalancingexporter/metrics_exporter.go +++ b/exporter/loadbalancingexporter/metrics_exporter.go @@ -74,10 +74,11 @@ func (e *metricExporterImp) Start(ctx context.Context, host component.Host) erro return e.loadBalancer.Start(ctx, host) } -func (e *metricExporterImp) Shutdown(context.Context) error { +func (e *metricExporterImp) Shutdown(ctx context.Context) error { + err := e.loadBalancer.Shutdown(ctx) e.stopped = true e.shutdownWg.Wait() - return nil + return err } func (e *metricExporterImp) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { diff --git a/exporter/loadbalancingexporter/package_test.go b/exporter/loadbalancingexporter/package_test.go new file mode 100644 index 000000000000..4e898c447576 --- /dev/null +++ b/exporter/loadbalancingexporter/package_test.go @@ -0,0 +1,17 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package loadbalancingexporter + +import ( + "testing" + + "go.uber.org/goleak" +) + +// The IgnoreTopFunction call prevents catching the leak generated by opencensus +// defaultWorker.Start which at this time is part of the package's init call. +// See https://github.com/census-instrumentation/opencensus-go/issues/1191 for more information. +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) +} diff --git a/exporter/loadbalancingexporter/resolver_dns_test.go b/exporter/loadbalancingexporter/resolver_dns_test.go index 6f4ca264e403..ed55f3ce19fc 100644 --- a/exporter/loadbalancingexporter/resolver_dns_test.go +++ b/exporter/loadbalancingexporter/resolver_dns_test.go @@ -107,6 +107,7 @@ func TestCantResolve(t *testing.T) { // verify assert.NoError(t, err) + assert.NoError(t, res.shutdown(context.Background())) } func TestOnChange(t *testing.T) { diff --git a/exporter/loadbalancingexporter/resolver_static.go b/exporter/loadbalancingexporter/resolver_static.go index 8527669db834..86759f132bbe 100644 --- a/exporter/loadbalancingexporter/resolver_static.go +++ b/exporter/loadbalancingexporter/resolver_static.go @@ -49,7 +49,13 @@ func (r *staticResolver) start(ctx context.Context) error { return err } -func (r *staticResolver) shutdown(_ context.Context) error { +func (r *staticResolver) shutdown(context.Context) error { + r.endpoints = nil + + for _, callback := range r.onChangeCallbacks { + callback(r.endpoints) + } + return nil } diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index d7e1dd1e5029..a6f955c69975 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ b/exporter/loadbalancingexporter/trace_exporter.go @@ -72,10 +72,11 @@ func (e *traceExporterImp) Start(ctx context.Context, host component.Host) error return e.loadBalancer.Start(ctx, host) } -func (e *traceExporterImp) Shutdown(context.Context) error { +func (e *traceExporterImp) Shutdown(ctx context.Context) error { + err := e.loadBalancer.Shutdown(ctx) e.stopped = true e.shutdownWg.Wait() - return nil + return err } func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {