Revise leader election logic for endpoints controller #12021

Merged · 4 commits · Feb 1, 2024
144 changes: 100 additions & 44 deletions controller/api/destination/external-workload/endpoints_controller.go
@@ -3,8 +3,8 @@ package externalworkload
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

ewv1alpha1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1alpha1"
@@ -27,6 +27,7 @@ const (
leaseName = "linkerd-destination-endpoint-write"

// Duration of the lease
// Core controllers (kube-controller-manager) use a duration of 15 seconds
leaseDuration = 30 * time.Second

// Deadline for the leader to refresh its lease. Core controllers have a
@@ -57,10 +58,88 @@ type EndpointsController struct {
queue workqueue.RateLimitingInterface
reconciler *endpointsReconciler
stop chan struct{}
isLeader atomic.Bool

lec leaderelection.LeaderElectionConfig
sync.RWMutex
informerHandlers
}

// informerHandlers holds handles to callbacks that have been registered with
// the API Server client's informers.
//
// These callbacks will be registered when a controller is elected as leader,
// and de-registered when the lease is lost.
type informerHandlers struct {
ewHandle cache.ResourceEventHandlerRegistration
esHandle cache.ResourceEventHandlerRegistration
svcHandle cache.ResourceEventHandlerRegistration

sync.Mutex
}

// addHandlers will register a set of callbacks with the different informers
// needed to synchronise endpoint state.
func (ec *EndpointsController) addHandlers() error {
Member
There is going to be a problem here. Imagine the following sequence of events:

  1. Controller acquires the lease.
  2. Creates endpoints for a service and external workload.
  3. Drops the lease and removes its callbacks.
  4. Meanwhile, endpoints membership is modified by another controller.
  5. Our controller acquires the lease.
  6. We get an ADD for the service, external workload, etc.
  7. We try to do a reconciliation, but the state of the endpoints tracker is stale because we never saw the DELETE.
  8. We try to requeue but our state never recovers and we drop the update.
  9. We end up in an inconsistent state.

The problem here is that you are keeping state in the EndpointSlice tracker that might change under your feet. If you look at the upstream implementation, you will notice that we start with a new EndpointSlice tracker each time we acquire the lease.

We need to do that here as well. Keeping this state around will get us into trouble.

Member Author

Great catch. The endpoint tracker does add a bit of complexity but the reasoning above is super compelling and likely to happen.

To fix, we wipe the tracker before callback registration happens. I've also added a test that exercises the scenario you described. I can get the test to fail without the aforementioned change.

var err error
ec.Lock()
defer ec.Unlock()

ec.svcHandle, err = ec.k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ec.onServiceUpdate,
DeleteFunc: ec.onServiceUpdate,
UpdateFunc: func(_, newObj interface{}) {
ec.onServiceUpdate(newObj)
},
})

if err != nil {
return err
}

ec.esHandle, err = ec.k8sAPI.ES().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ec.onEndpointSliceAdd,
UpdateFunc: ec.onEndpointSliceUpdate,
DeleteFunc: ec.onEndpointSliceDelete,
})

if err != nil {
return err
}

ec.ewHandle, err = ec.k8sAPI.ExtWorkload().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ec.onAddExternalWorkload,
DeleteFunc: ec.onDeleteExternalWorkload,
UpdateFunc: ec.onUpdateExternalWorkload,
})

if err != nil {
return err
}

return nil
}

// removeHandlers will de-register callbacks
func (ec *EndpointsController) removeHandlers() error {
var err error
if ec.svcHandle != nil {
if err = ec.k8sAPI.Svc().Informer().RemoveEventHandler(ec.svcHandle); err != nil {
return err
}
}

if ec.ewHandle != nil {
if err = ec.k8sAPI.ExtWorkload().Informer().RemoveEventHandler(ec.ewHandle); err != nil {
return err
}
}

if ec.esHandle != nil {
if err = ec.k8sAPI.ES().Informer().RemoveEventHandler(ec.esHandle); err != nil {
return err
}
}

return nil
}

// The EndpointsController code has been structured (and modified) based on the
@@ -94,38 +173,6 @@ func NewEndpointsController(k8sAPI *k8s.API, hostname, controllerNs string, stop
}),
}

_, err := k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ec.onServiceUpdate,
DeleteFunc: ec.onServiceUpdate,
UpdateFunc: func(_, newObj interface{}) {
ec.onServiceUpdate(newObj)
},
})

if err != nil {
return nil, err
}

_, err = k8sAPI.ES().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ec.onEndpointSliceAdd,
UpdateFunc: ec.onEndpointSliceUpdate,
DeleteFunc: ec.onEndpointSliceDelete,
})

if err != nil {
return nil, err
}

_, err = k8sAPI.ExtWorkload().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ec.onAddExternalWorkload,
DeleteFunc: ec.onDeleteExternalWorkload,
UpdateFunc: ec.onUpdateExternalWorkload,
})

if err != nil {
return nil, err
}

// Store configuration for leader elector client. The leader elector will
// accept three callbacks. When a lease is claimed, the elector will mark
// the manager as a 'leader'. When a lease is released, the elector will set
@@ -148,15 +195,23 @@
RenewDeadline: leaseRenewDeadline,
RetryPeriod: leaseRetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(context.Context) {
ec.Lock()
defer ec.Unlock()
ec.isLeader.Store(true)
OnStartedLeading: func(ctx context.Context) {
err := ec.addHandlers()
if err != nil {
// If the leader has failed to register callbacks then
// panic; we are in a bad state that's hard to recover from
// gracefully.
panic(fmt.Sprintf("failed to register event handlers: %v", err))
}
},
OnStoppedLeading: func() {
ec.Lock()
defer ec.Unlock()
ec.isLeader.Store(false)
err := ec.removeHandlers()
if err != nil {
// If the leader has failed to de-register callbacks then
// panic; otherwise, we risk racing with the newly elected
// leader
panic(fmt.Sprintf("failed to de-register event handlers: %v", err))
}
ec.log.Infof("%s released lease", hostname)
},
OnNewLeader: func(identity string) {
@@ -207,8 +262,8 @@ func (ec *EndpointsController) Start() {
go func() {
// Block until a shutdown signal arrives
<-ec.stop
// Do not drain the queue since we may not hold the lease.
ec.queue.ShutDown()
// Drain the queue before signalling the lease to terminate
ec.queue.ShutDownWithDrain()
Comment on lines +274 to +275

Member
Good!
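The drain-before-release ordering can be sketched in plain Go (a buffered channel and a WaitGroup stand in for client-go's workqueue and its ShutDownWithDrain; all names here are illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

// drainThenRelease processes every already-queued key before returning,
// mirroring workqueue.ShutDownWithDrain: the caller only cancels the leader
// elector (releasing the lease) once this function comes back.
func drainThenRelease(keys []string) []string {
	queue := make(chan string, len(keys))
	var inFlight sync.WaitGroup
	var mu sync.Mutex
	processed := make([]string, 0, len(keys))

	for _, key := range keys {
		inFlight.Add(1)
		queue <- key
	}
	close(queue) // stop signal received: accept no new work

	go func() {
		for key := range queue {
			mu.Lock()
			processed = append(processed, key) // leader still holds the lease here
			mu.Unlock()
			inFlight.Done()
		}
	}()

	inFlight.Wait() // block until the queue is empty, then release the lease
	return processed
}

func main() {
	fmt.Println(drainThenRelease([]string{"default/svc-a", "default/svc-b"}))
}
```

The point of the ordering is that every update accepted while this replica was leader is written out under its own lease, instead of racing with whichever replica wins the next election.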

// Propagate shutdown to elector
cancel()
ec.log.Infof("received shutdown signal")
@@ -325,6 +380,7 @@ func (ec *EndpointsController) syncService(update string) error {

epSlices = dropEndpointSlicesPendingDeletion(epSlices)
if ec.reconciler.endpointTracker.StaleSlices(svc, epSlices) {
ec.log.Warnf("detected EndpointSlice informer cache is out of date when processing %s", svc)
return errors.New("EndpointSlice informer cache is out of date")
}
err = ec.reconciler.reconcile(svc, ews, epSlices)