Revise leader election logic for endpoints controller
Our leader election logic can result in updates being missed under
certain conditions. Leases expire only after their duration elapses,
even if their current holder has been terminated. During this dead
time, any changes in the system will be observed by other controllers
but will not be written to the API Server.

For example, during a rollout, a newly started controller will not be
able to acquire the lease for up to 30 seconds (the lease duration).
Within this window, any changes to the system (e.g. modified workloads
or services, deleted EndpointSlices) will be observed but not acted on
by the new controller. Once the controller is in this bad state, it can
recover only after 10 minutes (via service resyncs) or when a resource
is next modified.

To address this, we change our leader election mechanism. Instead of
pushing leader election to the edge (i.e. when performing writes), we
allow events to be observed only while a controller is leading (i.e. by
registering callbacks). When a controller stops leading, all of its
callbacks are de-registered.
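
A minimal sketch of this pattern (not the controller's exact code: the
function name, the single-informer simplification, and the renew/retry
timings below are assumptions for illustration):

package sketch

import (
    "context"
    "sync"
    "time"

    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
)

// leaseGatedHandlers registers an informer callback only while the lease is
// held and removes it as soon as leadership is lost.
func leaseGatedHandlers(lock resourcelock.Interface, informer cache.SharedIndexInformer, onUpdate func(interface{})) leaderelection.LeaderElectionConfig {
    var (
        mu     sync.Mutex
        handle cache.ResourceEventHandlerRegistration
    )

    return leaderelection.LeaderElectionConfig{
        Lock:          lock,
        LeaseDuration: 30 * time.Second, // mirrors leaseDuration below
        RenewDeadline: 10 * time.Second, // assumed value for the sketch
        RetryPeriod:   2 * time.Second,  // assumed value for the sketch
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                // Registering a handler replays the informer's cache through
                // AddFunc, so no extra API requests are issued here.
                h, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
                    AddFunc:    onUpdate,
                    UpdateFunc: func(_, newObj interface{}) { onUpdate(newObj) },
                    DeleteFunc: onUpdate,
                })
                if err != nil {
                    panic(err) // hard to recover from gracefully
                }
                mu.Lock()
                handle = h
                mu.Unlock()
            },
            OnStoppedLeading: func() {
                // De-register so events are no longer observed once the
                // lease is lost.
                mu.Lock()
                defer mu.Unlock()
                if handle != nil {
                    _ = informer.RemoveEventHandler(handle)
                    handle = nil
                }
            },
        },
    }
}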

NOTE:

* controllers have a grace period during which they can renew their
  lease; their callbacks are de-registered only if renewal fails, so a
  single controller will not register and de-register its callbacks
  frequently.
* we do not lose any state. Other informers will continue to run (e.g.
  destination readers). When callbacks are registered, we pass all of
  the cached objects through them. In other words, we do not issue API
  requests on registration; we process the state of the cluster as
  observed from the cache.
* we make another, slightly orthogonal change: before shutting down, we
  drain the queue. This should not race, since we first block until the
  queue is drained and only then signal the leader elector loop that we
  are done (see the sketch after this list). This gives us some
  confidence that all events have been processed as soon as they were
  observed.
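
A short sketch of the drain-then-release ordering described in the last
note (function and parameter names are illustrative, not the
controller's API):

package sketch

import (
    "context"

    "k8s.io/client-go/util/workqueue"
)

// shutdownOrder drains the work queue before cancelling the elector's
// context, so the lease is only given up once every observed event has
// been processed.
func shutdownOrder(stop <-chan struct{}, queue workqueue.RateLimitingInterface, cancel context.CancelFunc) {
    <-stop
    // Blocks until all queued and in-flight items have been handled.
    queue.ShutDownWithDrain()
    // Only now signal the leader elector loop to exit and release the lease.
    cancel()
}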

Signed-off-by: Matei David <[email protected]>
mateiidavid committed Feb 1, 2024
1 parent c20a8c7 commit 879b1cf
Showing 1 changed file with 100 additions and 44 deletions: controller/api/destination/external-workload/endpoints_controller.go
@@ -3,8 +3,8 @@ package externalworkload
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

ewv1alpha1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1alpha1"
@@ -27,6 +27,7 @@ const (
    leaseName = "linkerd-destination-endpoint-write"

    // Duration of the lease
    // Core controllers (kube-controller-manager) use a duration of 15 seconds
    leaseDuration = 30 * time.Second

    // Deadline for the leader to refresh its lease. Core controllers have a
@@ -57,10 +58,88 @@ type EndpointsController struct {
    queue      workqueue.RateLimitingInterface
    reconciler *endpointsReconciler
    stop       chan struct{}
    isLeader   atomic.Bool

    lec leaderelection.LeaderElectionConfig
    sync.RWMutex
    informerHandlers
}

// informerHandlers holds handles to callbacks that have been registered with
// the API Server client's informers.
//
// These callbacks will be registered when a controller is elected as leader,
// and de-registered when the lease is lost.
type informerHandlers struct {
    ewHandle  cache.ResourceEventHandlerRegistration
    esHandle  cache.ResourceEventHandlerRegistration
    svcHandle cache.ResourceEventHandlerRegistration

    sync.Mutex
}

// addHandlers will register a set of callbacks with the different informers
// needed to synchronise endpoint state.
func (ec *EndpointsController) addHandlers() error {
    var err error
    ec.Lock()
    defer ec.Unlock()

    ec.svcHandle, err = ec.k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    ec.onServiceUpdate,
        DeleteFunc: ec.onServiceUpdate,
        UpdateFunc: func(_, newObj interface{}) {
            ec.onServiceUpdate(newObj)
        },
    })

    if err != nil {
        return err
    }

    ec.esHandle, err = ec.k8sAPI.ES().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    ec.onEndpointSliceAdd,
        UpdateFunc: ec.onEndpointSliceUpdate,
        DeleteFunc: ec.onEndpointSliceDelete,
    })

    if err != nil {
        return err
    }

    ec.ewHandle, err = ec.k8sAPI.ExtWorkload().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    ec.onAddExternalWorkload,
        DeleteFunc: ec.onDeleteExternalWorkload,
        UpdateFunc: ec.onUpdateExternalWorkload,
    })

    if err != nil {
        return err
    }

    return nil
}

// removeHandlers will de-register callbacks
func (ec *EndpointsController) removeHandlers() error {
    var err error
    if ec.svcHandle != nil {
        if err = ec.k8sAPI.Svc().Informer().RemoveEventHandler(ec.svcHandle); err != nil {
            return err
        }
    }

    if ec.ewHandle != nil {
        if err = ec.k8sAPI.ExtWorkload().Informer().RemoveEventHandler(ec.ewHandle); err != nil {
            return err
        }
    }

    if ec.esHandle != nil {
        if err = ec.k8sAPI.ES().Informer().RemoveEventHandler(ec.esHandle); err != nil {
            return err
        }
    }

    return nil
}

// The EndpointsController code has been structured (and modified) based on the
@@ -94,38 +173,6 @@ func NewEndpointsController(k8sAPI *k8s.API, hostname, controllerNs string, stop
        }),
    }

    _, err := k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    ec.onServiceUpdate,
        DeleteFunc: ec.onServiceUpdate,
        UpdateFunc: func(_, newObj interface{}) {
            ec.onServiceUpdate(newObj)
        },
    })

    if err != nil {
        return nil, err
    }

    _, err = k8sAPI.ES().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    ec.onEndpointSliceAdd,
        UpdateFunc: ec.onEndpointSliceUpdate,
        DeleteFunc: ec.onEndpointSliceDelete,
    })

    if err != nil {
        return nil, err
    }

    _, err = k8sAPI.ExtWorkload().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    ec.onAddExternalWorkload,
        DeleteFunc: ec.onDeleteExternalWorkload,
        UpdateFunc: ec.onUpdateExternalWorkload,
    })

    if err != nil {
        return nil, err
    }

    // Store configuration for leader elector client. The leader elector will
    // accept three callbacks. When a lease is claimed, the elector will mark
    // the manager as a 'leader'. When a lease is released, the elector will set
@@ -148,15 +195,23 @@
        RenewDeadline: leaseRenewDeadline,
        RetryPeriod:   leaseRetryPeriod,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(context.Context) {
                ec.Lock()
                defer ec.Unlock()
                ec.isLeader.Store(true)
            OnStartedLeading: func(ctx context.Context) {
                err := ec.addHandlers()
                if err != nil {
                    // If the leader has failed to register callbacks then
                    // panic; we are in a bad state that's hard to recover from
                    // gracefully.
                    panic(fmt.Sprintf("failed to register event handlers: %v", err))
                }
            },
            OnStoppedLeading: func() {
                ec.Lock()
                defer ec.Unlock()
                ec.isLeader.Store(false)
                err := ec.removeHandlers()
                if err != nil {
                    // If the leader has failed to de-register callbacks then
                    // panic; otherwise, we risk racing with the newly elected
                    // leader
                    panic(fmt.Sprintf("failed to de-register event handlers: %v", err))
                }
                ec.log.Infof("%s released lease", hostname)
            },
            OnNewLeader: func(identity string) {
@@ -207,8 +262,8 @@ func (ec *EndpointsController) Start() {
    go func() {
        // Block until a shutdown signal arrives
        <-ec.stop
        // Do not drain the queue since we may not hold the lease.
        ec.queue.ShutDown()
        // Drain the queue before signalling the lease to terminate
        ec.queue.ShutDownWithDrain()
        // Propagate shutdown to elector
        cancel()
        ec.log.Infof("received shutdown signal")
@@ -325,6 +380,7 @@ func (ec *EndpointsController) syncService(update string) error {

    epSlices = dropEndpointSlicesPendingDeletion(epSlices)
    if ec.reconciler.endpointTracker.StaleSlices(svc, epSlices) {
        ec.log.Warnf("detected EndpointSlice informer cache is out of date when processing %s", svc)
        return errors.New("EndpointSlice informer cache is out of date")
    }
    err = ec.reconciler.reconcile(svc, ews, epSlices)