Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

fix: goroutine leaks #221

Merged
merged 10 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@ import (
"github.com/mailgun/gubernator/v2/cluster"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
)

func TestStartMultipleInstances(t *testing.T) {
t.Cleanup(func() {
goleak.VerifyNone(t)
})
miparnisari marked this conversation as resolved.
Show resolved Hide resolved
err := cluster.Start(2)
require.NoError(t, err)
defer cluster.Stop()
t.Cleanup(cluster.Stop)

assert.Equal(t, 2, len(cluster.GetPeers()))
assert.Equal(t, 2, len(cluster.GetDaemons()))
Expand Down
2 changes: 1 addition & 1 deletion cmd/gubernator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"k8s.io/klog"
"k8s.io/klog/v2"
)

var log = logrus.WithField("category", "gubernator")
Expand Down
7 changes: 6 additions & 1 deletion daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package gubernator
import (
"context"
"crypto/tls"
"io"
"log"
"net"
"net/http"
Expand Down Expand Up @@ -49,6 +50,7 @@ type Daemon struct {
V1Server *V1Instance

log FieldLogger
logWriter *io.PipeWriter
pool PoolInterface
conf DaemonConfig
httpSrv *http.Server
Expand Down Expand Up @@ -282,7 +284,8 @@ func (s *Daemon) Start(ctx context.Context) error {
s.promRegister, promhttp.HandlerFor(s.promRegister, promhttp.HandlerOpts{}),
))
mux.Handle("/", gateway)
log := log.New(newLogWriter(s.log), "", 0)
s.logWriter = newLogWriter(s.log)
log := log.New(s.logWriter, "", 0)
s.httpSrv = &http.Server{Addr: s.conf.HTTPListenAddress, Handler: mux, ErrorLog: log}

s.HTTPListener, err = net.Listen("tcp", s.conf.HTTPListenAddress)
Expand Down Expand Up @@ -376,6 +379,8 @@ func (s *Daemon) Close() {
s.log.Infof("GRPC close for %s ...", s.GRPCListeners[i].Addr())
srv.GracefulStop()
}
s.logWriter.Close()
_ = s.V1Server.Close()
s.wg.Stop()
s.statsHandler.Close()
s.gwCancel()
Expand Down
7 changes: 5 additions & 2 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,11 @@ func TestMain(m *testing.M) {
fmt.Println(err)
os.Exit(1)
}
defer cluster.Stop()
os.Exit(m.Run())
code := m.Run()
cluster.Stop()

// os.Exit doesn't run deferred functions
os.Exit(code)
}

func TestOverTheLimit(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions global.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func (gm *globalManager) runAsyncHits() {
hits = make(map[string]*RateLimitReq)
}
case <-done:
interval.Stop()
return false
}
return true
Expand Down Expand Up @@ -203,6 +204,7 @@ func (gm *globalManager) runBroadcasts() {
gm.metricGlobalQueueLength.Set(0)
}
case <-done:
interval.Stop()
return false
}
return true
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
go.opentelemetry.io/otel v1.21.0
go.opentelemetry.io/otel/sdk v1.21.0
go.opentelemetry.io/otel/trace v1.21.0
go.uber.org/goleak v1.3.0
golang.org/x/net v0.18.0
golang.org/x/time v0.3.0
google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b
Expand All @@ -30,7 +31,7 @@ require (
k8s.io/api v0.23.3
k8s.io/apimachinery v0.23.3
k8s.io/client-go v0.23.3
k8s.io/klog v0.3.1
k8s.io/klog/v2 v2.120.1
)

require (
Expand All @@ -41,7 +42,7 @@ require (
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
Expand Down Expand Up @@ -91,7 +92,6 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.30.0 // indirect
k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect
k8s.io/utils v0.0.0-20211116205334-6203023598ed // indirect
sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect
Expand Down
10 changes: 5 additions & 5 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 7 additions & 9 deletions gubernator.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,17 +149,15 @@ func (s *V1Instance) Close() (err error) {
return nil
}

if s.conf.Loader == nil {
return nil
}

s.global.Close()

err = s.workerPool.Store(ctx)
if err != nil {
s.log.WithError(err).
Error("Error in workerPool.Store")
return errors.Wrap(err, "Error in workerPool.Store")
if s.conf.Loader != nil {
err = s.workerPool.Store(ctx)
if err != nil {
s.log.WithError(err).
Error("Error in workerPool.Store")
return errors.Wrap(err, "Error in workerPool.Store")
}
}

err = s.workerPool.Close()
Expand Down
Loading