Switch away from using errChan in the manager
Instead of using an error channel in the manager, we just use a mutex
and store one error.  The "waiting" is done by blocking on a channel
which gets closed to signal errors.

This achieves the same effect (only one error is ever returned) without
the risk of blocking goroutines: with the old code, goroutines could block
trying to return their results (especially since a few tried to return
nil results), and since runnables can be added after start, there's no
way to size the channel appropriately to avoid this (and no point, since
we only report the first error anyway).

We also now only report errors when they actually occur, never signaling
with a nil error value.
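
To make the failure mode concrete, here is a minimal, self-contained sketch (illustrative only, not code from this repository) of how a single unbuffered error channel strands goroutines once the first result has been consumed:

package main

import (
	"fmt"
	"runtime"
	"time"
)

func main() {
	errChan := make(chan error)

	// Stand-ins for runnables reporting their Start results.
	for i := 0; i < 5; i++ {
		i := i
		go func() {
			// Each send blocks until a receiver is ready; only one ever will be.
			errChan <- fmt.Errorf("runnable %d failed", i)
		}()
	}

	// The old manager loop effectively did one receive and returned.
	fmt.Println("first error:", <-errChan)

	time.Sleep(100 * time.Millisecond)
	// The remaining senders stay parked on the channel for the life of the process.
	fmt.Println("goroutines still alive:", runtime.NumGoroutine())
}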
DirectXMan12 committed Oct 29, 2019
1 parent 0fdf465 commit fe4ada0
Showing 3 changed files with 74 additions and 15 deletions.
82 changes: 70 additions & 12 deletions pkg/manager/internal.go
@@ -114,7 +114,11 @@ type controllerManager struct {
started bool
startedLeader bool
healthzStarted bool
errChan chan error

// NB(directxman12): we don't just use an error channel here to avoid the situation where the
// error channel is too small and we end up blocking some goroutines waiting to report their errors.
// errSignal lets us track when we should stop because an error occurred
errSignal *errSignaler

// internalStop is the stop channel *actually* used by everything involved
// with the manager as a stop channel, so that we can pass a stop channel
@@ -150,6 +154,51 @@ type controllerManager struct {
retryPeriod time.Duration
}

type errSignaler struct {
// errSignal indicates, when closed, that an error occurred. It should
// never be written to, only closed.
errSignal chan struct{}

// err is the received error
err error

mu sync.Mutex
}

func (r *errSignaler) SignalError(err error) {
r.mu.Lock()
defer r.mu.Unlock()

if err == nil {
// non-error, ignore
log.Error(nil, "SignalError called with a nil error, which should never happen; ignoring")
return
}

if r.err != nil {
// we already have an error, don't try again
return
}

// save the error and report it
r.err = err
close(r.errSignal)
}

func (r *errSignaler) Error() error {
r.mu.Lock()
defer r.mu.Unlock()

return r.err
}

func (r *errSignaler) GotError() chan struct{} {
r.mu.Lock()
defer r.mu.Unlock()

return r.errSignal
}

// Add sets dependencies on i, and adds it to the list of Runnables to start.
func (cm *controllerManager) Add(r Runnable) error {
cm.mu.Lock()
@@ -174,7 +223,9 @@ func (cm *controllerManager) Add(r Runnable) error {
if shouldStart {
// If already started, start the controller
go func() {
cm.errChan <- r.Start(cm.internalStop)
if err := r.Start(cm.internalStop); err != nil {
cm.errSignal.SignalError(err)
}
}()
}

@@ -304,15 +355,15 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
go func() {
log.Info("starting metrics server", "path", metricsPath)
if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed {
cm.errChan <- err
cm.errSignal.SignalError(err)
}
}()

// Shutdown the server when stop is closed
select {
case <-stop:
if err := server.Shutdown(context.Background()); err != nil {
cm.errChan <- err
cm.errSignal.SignalError(err)
}
}
}
@@ -334,7 +385,7 @@ func (cm *controllerManager) serveHealthProbes(stop <-chan struct{}) {
// Run server
go func() {
if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed {
cm.errChan <- err
cm.errSignal.SignalError(err)
}
}()
cm.healthzStarted = true
@@ -344,7 +395,7 @@ func (cm *controllerManager) serveHealthProbes(stop <-chan struct{}) {
select {
case <-stop:
if err := server.Shutdown(context.Background()); err != nil {
cm.errChan <- err
cm.errSignal.SignalError(err)
}
}
}
@@ -353,6 +404,9 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
// join the passed-in stop channel as an upstream feeding into cm.internalStopper
defer close(cm.internalStopper)

// initialize this here so that we reset the signal channel state on every start
cm.errSignal = &errSignaler{errSignal: make(chan struct{})}

// Metrics should be served whether the controller is leader or not.
// (If we don't serve metrics for non-leaders, prometheus will still scrape
// the pod but will get a connection refused)
@@ -380,9 +434,9 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
case <-stop:
// We are done
return nil
case err := <-cm.errChan:
case <-cm.errSignal.GotError():
// Error starting a controller
return err
return cm.errSignal.Error()
}
}

@@ -398,7 +452,9 @@ func (cm *controllerManager) startNonLeaderElectionRunnables() {
// Write any Start errors to a channel so we can return them
ctrl := c
go func() {
cm.errChan <- ctrl.Start(cm.internalStop)
if err := ctrl.Start(cm.internalStop); err != nil {
cm.errSignal.SignalError(err)
}
}()
}
}
@@ -415,7 +471,9 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
// Write any Start errors to a channel so we can return them
ctrl := c
go func() {
cm.errChan <- ctrl.Start(cm.internalStop)
if err := ctrl.Start(cm.internalStop); err != nil {
cm.errSignal.SignalError(err)
}
}()
}

@@ -433,7 +491,7 @@ func (cm *controllerManager) waitForCache() {
}
go func() {
if err := cm.startCache(cm.internalStop); err != nil {
cm.errChan <- err
cm.errSignal.SignalError(err)
}
}()

@@ -457,7 +515,7 @@ func (cm *controllerManager) startLeaderElection() (err error) {
// Most implementations of leader election log.Fatal() here.
// Since Start is wrapped in log.Fatal when called, we can just return
// an error here which will cause the program to exit.
cm.errChan <- fmt.Errorf("leader election lost")
cm.errSignal.SignalError(fmt.Errorf("leader election lost"))
},
},
})
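
Condensing the diff above into one place, the resulting pattern looks roughly like this (a sketch, not the literal manager code: run, runnables, and stop stand in for the manager's fields; errSignaler and Runnable are as defined above):

// Hedged sketch of the pattern introduced by this commit.
func run(runnables []Runnable, stop <-chan struct{}) error {
	signal := &errSignaler{errSignal: make(chan struct{})}

	for _, r := range runnables {
		r := r
		go func() {
			if err := r.Start(stop); err != nil {
				// Safe from any number of goroutines: the mutex ensures
				// only the first error is stored and the channel is
				// closed exactly once; later calls return immediately.
				signal.SignalError(err)
			}
		}()
	}

	select {
	case <-stop:
		return nil // clean shutdown requested
	case <-signal.GotError():
		// The channel close is only the wake-up; the error itself is
		// read back out of the signaler under its lock.
		return signal.Error()
	}
}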
1 change: 0 additions & 1 deletion pkg/manager/manager.go
@@ -291,7 +291,6 @@ func New(config *rest.Config, options Options) (Manager, error) {
return &controllerManager{
config: config,
scheme: options.Scheme,
errChan: make(chan error),
cache: cache,
fieldIndexes: cache,
client: writeObj,
6 changes: 4 additions & 2 deletions pkg/manager/manager_test.go
@@ -283,7 +283,7 @@ var _ = Describe("manger.Manager", func() {
mgr.startCache = func(stop <-chan struct{}) error {
return fmt.Errorf("expected error")
}
Expect(m.Start(stop).Error()).To(ContainSubstring("expected error"))
Expect(m.Start(stop)).To(MatchError(ContainSubstring("expected error")))

close(done)
})
@@ -314,7 +314,9 @@ var _ = Describe("manger.Manager", func() {

go func() {
defer GinkgoRecover()
Expect(m.Start(stop)).NotTo(HaveOccurred())
// NB(directxman12): this should definitely return an error. If it doesn't,
// it means someone was signaling "stop: error" with a nil "error".
Expect(m.Start(stop)).NotTo(Succeed())
close(done)
}()
<-c1
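
One note on the first test change above: calling .Error() on the result of Start panics with a nil pointer dereference if Start unexpectedly returns nil, so the assertion was moved into Gomega's MatchError, which requires a non-nil error before applying the substring matcher:

// Before: panics (rather than failing cleanly) when Start returns nil.
Expect(m.Start(stop).Error()).To(ContainSubstring("expected error"))

// After: a nil error is a clean assertion failure, not a panic.
Expect(m.Start(stop)).To(MatchError(ContainSubstring("expected error")))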
