Revise leader election logic for endpoints controller (#12021)

Our leader election logic can result in updates being missed under certain
conditions. A lease only expires once its duration has elapsed, even if its
current holder has been terminated. During this dead time, any changes in
the system will be observed by the other controllers, but will not be
written to the API Server.

For example, during a rollout, a newly started controller may be unable to
acquire the lease for up to 30 seconds (the lease duration). Within this
window, any changes to the system (e.g. modified workloads or services,
deleted EndpointSlices) are observed but not acted on by the new
controller. Once the controller is in this bad state, it can only recover
after 10 minutes (via service resyncs) or when a resource is next modified.

To address this, we change our leader election mechanism. Instead of
pushing leader election to the edge (i.e. checking the lease only when
performing writes), we only allow events to be observed while a controller
is leading (i.e. its callbacks are registered only for that period). When a
controller stops leading, all of its callbacks are de-registered.
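
For reference, a minimal, self-contained sketch of this pattern using
client-go's leader election (not the controller's actual wiring): the lease
name matches the constant in the diff below, but the fake clientset,
namespace, identity, and timing values are illustrative assumptions.

package main

import (
    "context"
    "log"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes/fake"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
    // A fake clientset stands in for a real cluster so the sketch runs anywhere.
    client := fake.NewSimpleClientset()

    lock := &resourcelock.LeaseLock{
        LeaseMeta:  metav1.ObjectMeta{Name: "linkerd-destination-endpoint-write", Namespace: "linkerd"},
        Client:     client.CoordinationV1(),
        LockConfig: resourcelock.ResourceLockConfig{Identity: "pod-a"},
    }

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
        Lock:            lock,
        ReleaseOnCancel: true,
        LeaseDuration:   30 * time.Second, // illustrative; mirrors leaseDuration below
        RenewDeadline:   10 * time.Second, // illustrative
        RetryPeriod:     2 * time.Second,  // illustrative
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                // The real controller registers its informer callbacks here
                // (addHandlers); cached objects are replayed through them.
                log.Println("lease acquired: register informer handlers")
                <-ctx.Done()
            },
            OnStoppedLeading: func() {
                // The real controller de-registers its callbacks here
                // (removeHandlers) so it stops acting on events.
                log.Println("lease lost: de-register informer handlers")
            },
        },
    })
}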

NOTE:

 * controllers will have a grace period during which they can renew their
   lease. Their callbacks are de-registered only if renewal fails, so a
   single controller will not be constantly registering and de-registering
   its callbacks.
 * we do not lose any state. Other informers continue to run (e.g.
   destination readers). When callbacks are registered, all of the cached
   objects are passed through them. In other words, we do not issue API
   requests on registration; we process the state of the cluster as
   observed from the cache (see the first sketch after this list).
 * we make another change that's slightly orthogonal. Before we shut down,
   we ensure the queue is drained. This should not race, since we first
   block until the queue is drained and only then signal to the leader
   elector loop that we are done. This gives us some confidence that all
   observed events have been processed before the lease is released (see
   the second sketch after this list).
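
First sketch (for the second note above): a small, self-contained example of
the client-go behaviour this relies on. Adding a handler to an already-synced
informer replays every cached object as a synthetic Add event, without
issuing new API requests. The fake clientset and the "svc-a" Service are
assumptions made for the example.

package main

import (
    "context"
    "fmt"
    "time"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes/fake"
    "k8s.io/client-go/tools/cache"
)

func main() {
    // Fake clientset pre-loaded with one Service.
    client := fake.NewSimpleClientset(&corev1.Service{
        ObjectMeta: metav1.ObjectMeta{Name: "svc-a", Namespace: "default"},
    })

    factory := informers.NewSharedInformerFactory(client, 10*time.Minute)
    svcInformer := factory.Core().V1().Services().Informer()

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Start the informer and wait for its cache to sync BEFORE any handler
    // is registered.
    factory.Start(ctx.Done())
    cache.WaitForCacheSync(ctx.Done(), svcInformer.HasSynced)

    // Registering a handler on the already-synced informer replays every
    // cached object as an Add event; no extra API requests are issued.
    done := make(chan struct{})
    svcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            svc := obj.(*corev1.Service)
            fmt.Printf("replayed from cache: %s/%s\n", svc.Namespace, svc.Name)
            close(done)
        },
    })
    <-done
}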
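
Second sketch (for the third note above): the workqueue shutdown ordering.
ShutDownWithDrain blocks until every item already on the queue has been
processed and marked Done, after which the elector context can safely be
cancelled. The item values and the sleep are illustrative only.

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

    // Worker loop: process items until the queue is shut down.
    go func() {
        for {
            item, shutdown := queue.Get()
            if shutdown {
                return
            }
            time.Sleep(100 * time.Millisecond) // simulate processing
            fmt.Printf("processed %v\n", item)
            queue.Done(item)
        }
    }()

    for i := 0; i < 5; i++ {
        queue.Add(i)
    }

    // Blocks until the five items above have been processed; only then
    // would the controller cancel the leader-elector context.
    queue.ShutDownWithDrain()
    fmt.Println("queue drained; safe to release the lease")
}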

Signed-off-by: Matei David <[email protected]>
mateiidavid authored Feb 1, 2024
1 parent 3902b33 commit d4f99b3
Showing 2 changed files with 184 additions and 44 deletions.
153 changes: 109 additions & 44 deletions controller/api/destination/external-workload/endpoints_controller.go
@@ -3,8 +3,8 @@ package externalworkload
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

ewv1alpha1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1alpha1"
@@ -20,13 +20,15 @@ import (
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/util/workqueue"
endpointslicerec "k8s.io/endpointslice"
epsliceutil "k8s.io/endpointslice/util"
)

const (
// Name of the lease resource the controller will use
leaseName = "linkerd-destination-endpoint-write"

// Duration of the lease
// Core controllers (kube-controller-manager) use a duration of 15 seconds
leaseDuration = 30 * time.Second

// Deadline for the leader to refresh its lease. Core controllers have a
@@ -57,10 +59,24 @@ type EndpointsController struct {
queue workqueue.RateLimitingInterface
reconciler *endpointsReconciler
stop chan struct{}
isLeader atomic.Bool

lec leaderelection.LeaderElectionConfig
sync.RWMutex
informerHandlers
}

// informerHandlers holds handles to callbacks that have been registered with
// the API Server client's informers.
//
// These callbacks will be registered when a controller is elected as leader,
// and de-registered when the lease is lost.
type informerHandlers struct {
ewHandle cache.ResourceEventHandlerRegistration
esHandle cache.ResourceEventHandlerRegistration
svcHandle cache.ResourceEventHandlerRegistration

// Mutex to guard handler registration since the elector loop may start
// executing callbacks as soon as a controller starts leading in a background task
sync.Mutex
}

// The EndpointsController code has been structured (and modified) based on the
@@ -94,38 +110,6 @@ func NewEndpointsController(k8sAPI *k8s.API, hostname, controllerNs string, stop
}),
}

_, err := k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ec.onServiceUpdate,
DeleteFunc: ec.onServiceUpdate,
UpdateFunc: func(_, newObj interface{}) {
ec.onServiceUpdate(newObj)
},
})

if err != nil {
return nil, err
}

_, err = k8sAPI.ES().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ec.onEndpointSliceAdd,
UpdateFunc: ec.onEndpointSliceUpdate,
DeleteFunc: ec.onEndpointSliceDelete,
})

if err != nil {
return nil, err
}

_, err = k8sAPI.ExtWorkload().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ec.onAddExternalWorkload,
DeleteFunc: ec.onDeleteExternalWorkload,
UpdateFunc: ec.onUpdateExternalWorkload,
})

if err != nil {
return nil, err
}

// Store configuration for leader elector client. The leader elector will
// accept three callbacks. When a lease is claimed, the elector will mark
// the manager as a 'leader'. When a lease is released, the elector will set
@@ -148,15 +132,23 @@
RenewDeadline: leaseRenewDeadline,
RetryPeriod: leaseRetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(context.Context) {
ec.Lock()
defer ec.Unlock()
ec.isLeader.Store(true)
OnStartedLeading: func(ctx context.Context) {
err := ec.addHandlers()
if err != nil {
// If the leader has failed to register callbacks then
// panic; we are in a bad state that's hard to recover from
// gracefully.
panic(fmt.Sprintf("failed to register event handlers: %v", err))
}
},
OnStoppedLeading: func() {
ec.Lock()
defer ec.Unlock()
ec.isLeader.Store(false)
err := ec.removeHandlers()
if err != nil {
// If the leader has failed to de-register callbacks then
// panic; otherwise, we risk racing with the newly elected
// leader
panic(fmt.Sprintf("failed to de-register event handlers: %v", err))
}
ec.log.Infof("%s released lease", hostname)
},
OnNewLeader: func(identity string) {
@@ -170,6 +162,78 @@ func NewEndpointsController(k8sAPI *k8s.API, hostname, controllerNs string, stop
return ec, nil
}

// addHandlers will register a set of callbacks with the different informers
// needed to synchronise endpoint state.
func (ec *EndpointsController) addHandlers() error {
var err error
ec.Lock()
defer ec.Unlock()

// Wipe out previously observed state. This ensures we will not have stale
// cache errors due to events that happened when callbacks were not firing.
ec.reconciler.endpointTracker = epsliceutil.NewEndpointSliceTracker()

ec.svcHandle, err = ec.k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ec.onServiceUpdate,
DeleteFunc: ec.onServiceUpdate,
UpdateFunc: func(_, newObj interface{}) {
ec.onServiceUpdate(newObj)
},
})

if err != nil {
return err
}

ec.esHandle, err = ec.k8sAPI.ES().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ec.onEndpointSliceAdd,
UpdateFunc: ec.onEndpointSliceUpdate,
DeleteFunc: ec.onEndpointSliceDelete,
})

if err != nil {
return err
}

ec.ewHandle, err = ec.k8sAPI.ExtWorkload().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ec.onAddExternalWorkload,
DeleteFunc: ec.onDeleteExternalWorkload,
UpdateFunc: ec.onUpdateExternalWorkload,
})

if err != nil {
return err
}

return nil
}

// removeHandlers will de-register the callbacks registered through addHandlers
func (ec *EndpointsController) removeHandlers() error {
var err error
ec.Lock()
defer ec.Unlock()
if ec.svcHandle != nil {
if err = ec.k8sAPI.Svc().Informer().RemoveEventHandler(ec.svcHandle); err != nil {
return err
}
}

if ec.ewHandle != nil {
if err = ec.k8sAPI.ExtWorkload().Informer().RemoveEventHandler(ec.ewHandle); err != nil {
return err
}
}

if ec.esHandle != nil {
if err = ec.k8sAPI.ES().Informer().RemoveEventHandler(ec.esHandle); err != nil {
return err
}
}

return nil
}

// Start will run the endpoint manager's processing loop and leader elector.
//
// The function will spawn three background tasks; one to run the leader elector
@@ -207,8 +271,8 @@ func (ec *EndpointsController) Start() {
go func() {
// Block until a shutdown signal arrives
<-ec.stop
// Do not drain the queue since we may not hold the lease.
ec.queue.ShutDown()
// Drain the queue before signalling the elector to terminate
ec.queue.ShutDownWithDrain()
// Propagate shutdown to elector
cancel()
ec.log.Infof("received shutdown signal")
@@ -325,6 +389,7 @@ func (ec *EndpointsController) syncService(update string) error {

epSlices = dropEndpointSlicesPendingDeletion(epSlices)
if ec.reconciler.endpointTracker.StaleSlices(svc, epSlices) {
ec.log.Warnf("detected EndpointSlice informer cache is out of date when processing %s", svc)
return errors.New("EndpointSlice informer cache is out of date")
}
err = ec.reconciler.reconcile(svc, ews, epSlices)
@@ -1238,6 +1238,81 @@ func TestWorkloadServicesToUpdate(t *testing.T) {
}
}

// Assert that de-registering handlers won't result in cache staleness issues
//
// The test simulates a scenario where a lease is acquired, an EndpointSlice
// is created, and the lease is then lost. Without wiping out state, this
// test would fail, since any changes made to the resources while the lease
// is not held are not observed; these changes would result in stale cache
// entries (since the state has diverged).
func TestLeaderElectionSyncsState(t *testing.T) {
client, actions, esController := newController(t)
ns := "test-ns"
service := createService(t, esController, ns, "test-svc")
ew1 := newExternalWorkload(1, ns, false, true)
esController.serviceStore.Add(service)
esController.externalWorkloadsStore.Add(ew1)

// Simulate a lease being acquired.
err := esController.addHandlers()
if err != nil {
t.Fatalf("unexpected error when registering client-go callbacks: %v", err)
}

err = esController.syncService(fmt.Sprintf("%s/%s", ns, service.Name))
if err != nil {
t.Fatalf("unexpected error when processing service %s/%s: %v", ns, service.Name, err)
}
expectActions(t, actions(), 1, "create", "endpointslices")

slices, err := client.Client.DiscoveryV1().EndpointSlices(ns).List(context.TODO(), metav1.ListOptions{})
if err != nil {
t.Errorf("expected no error fetching endpoint slices, got: %s", err)
}
if len(slices.Items) != 1 {
t.Errorf("expected 1 endpoint slices, got: %d", len(slices.Items))
}
sliceName := slices.Items[0].Name

// Simulate a lease being lost; we delete the previously created
// endpointslice out-of-band.
err = esController.removeHandlers()
if err != nil {
t.Fatalf("unexpected error when de-registering client-go callbacks: %v", err)
}
err = client.Client.DiscoveryV1().EndpointSlices(ns).Delete(context.TODO(), sliceName, metav1.DeleteOptions{})
if err != nil {
t.Fatalf("unexpected error when deleting endpointslice %s/%s: %v", ns, sliceName, err)
}
slices, err = client.Client.DiscoveryV1().EndpointSlices(ns).List(context.TODO(), metav1.ListOptions{})
if err != nil {
t.Errorf("expected no error fetching endpoint slices, got: %s", err)
}
if len(slices.Items) != 0 {
t.Errorf("expected 0 endpoint slices, got: %d", len(slices.Items))
}

// The lease is re-acquired. We should start with a clean slate to avoid
// cache staleness errors.
err = esController.addHandlers()
if err != nil {
t.Fatalf("unexpected error when registering client-go callbacks: %v", err)
}
err = esController.syncService(fmt.Sprintf("%s/%s", ns, service.Name))
if err != nil {
t.Fatalf("unexpected error when processing service %s/%s: %v", ns, service.Name, err)
}
expectActions(t, actions(), 1, "create", "endpointslices")
slices, err = client.Client.DiscoveryV1().EndpointSlices(ns).List(context.TODO(), metav1.ListOptions{})
if err != nil {
t.Errorf("expected no error fetching endpoint slices, got: %s", err)
}
if len(slices.Items) != 1 {
t.Errorf("expected 1 endpoint slices, got: %d", len(slices.Items))
}
if slices.Items[0].Name == sliceName {
t.Fatalf("expected newly created slice's name to be different than the initial slice, got: %s", sliceName)
}

}

// protoPtr takes a Protocol and returns a pointer to it.
func protoPtr(proto v1.Protocol) *v1.Protocol {
return &proto
Expand Down
