chore: move termination queue to singleton controller (aws#572)
njtran authored Oct 5, 2023
1 parent db60c54 commit 8e6687e
Showing 9 changed files with 206 additions and 64 deletions.
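
This commit turns the pod-eviction queue from a free-running goroutine (started inside NewEvictionQueue) into a controller that the operator's manager reconciles as a singleton. Judging from the methods added to the queue in pkg/controllers/termination/terminator/eviction.go, it now satisfies an internal controller contract roughly like the sketch below; the actual interface lives in pkg/operator/controller and is not part of this diff, so treat the definitions as an inference rather than the real source.

// Inferred sketch of the internal contract the eviction Queue now satisfies.
// The real definitions live in pkg/operator/controller and may differ.
package controller

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// Builder abstracts how a controller registers itself with the manager,
// e.g. NewSingletonManagedBy(m) for singleton controllers.
type Builder interface {
	Complete(reconcile.Reconciler) error
}

// Controller is implemented by everything returned from NewControllers,
// including (after this commit) the eviction Queue.
type Controller interface {
	reconcile.Reconciler
	Name() string
	Builder(ctx context.Context, m manager.Manager) Builder
}
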
9 changes: 3 additions & 6 deletions pkg/controllers/controllers.go
@@ -15,8 +15,6 @@ limitations under the License.
package controllers

import (
"context"

"k8s.io/client-go/kubernetes"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -44,7 +42,6 @@ import (
)

func NewControllers(
ctx context.Context,
clock clock.Clock,
kubeClient client.Client,
kubernetesInterface kubernetes.Interface,
@@ -54,10 +51,10 @@ func NewControllers(
) []controller.Controller {

p := provisioning.NewProvisioner(kubeClient, kubernetesInterface.CoreV1(), recorder, cloudProvider, cluster)
terminator := terminator.NewTerminator(clock, kubeClient, terminator.NewEvictionQueue(ctx, kubernetesInterface.CoreV1(), recorder))
evictionQueue := terminator.NewQueue(kubernetesInterface.CoreV1(), recorder)

return []controller.Controller{
p,
p, evictionQueue,
deprovisioning.NewController(clock, kubeClient, p, cloudProvider, recorder, cluster),
provisioning.NewController(kubeClient, p, recorder),
nodepoolhash.NewProvisionerController(kubeClient),
@@ -66,7 +63,7 @@
informer.NewPodController(kubeClient, cluster),
informer.NewProvisionerController(kubeClient, cluster),
informer.NewMachineController(kubeClient, cluster),
termination.NewController(kubeClient, cloudProvider, terminator, recorder),
termination.NewController(kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue), recorder),
metricspod.NewController(kubeClient),
metricsprovisioner.NewController(kubeClient),
metricsnode.NewController(cluster),
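Condensed, the new wiring in NewControllers looks like the sketch below: the queue is built once, placed in the returned controller slice so the manager runs it, and handed to the Terminator used by the termination controller. Most controllers and the cluster-state parameter are omitted here for brevity, so this is an illustration rather than the exact function body.

// Trimmed sketch of the wiring after this commit; unrelated controllers
// and parameters are omitted, so this is illustrative only.
package controllers

import (
	"k8s.io/client-go/kubernetes"
	"k8s.io/utils/clock"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/aws/karpenter-core/pkg/cloudprovider"
	"github.com/aws/karpenter-core/pkg/controllers/termination"
	"github.com/aws/karpenter-core/pkg/controllers/termination/terminator"
	"github.com/aws/karpenter-core/pkg/events"
	"github.com/aws/karpenter-core/pkg/operator/controller"
)

func NewControllers(
	clk clock.Clock,
	kubeClient client.Client,
	kubernetesInterface kubernetes.Interface,
	recorder events.Recorder,
	cloudProvider cloudprovider.CloudProvider,
) []controller.Controller {
	// One queue instance: registered as a controller so the manager drives it,
	// and shared with the Terminator used by the termination controller.
	evictionQueue := terminator.NewQueue(kubernetesInterface.CoreV1(), recorder)
	return []controller.Controller{
		evictionQueue,
		termination.NewController(kubeClient, cloudProvider,
			terminator.NewTerminator(clk, kubeClient, evictionQueue), recorder),
	}
}
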
3 changes: 1 addition & 2 deletions pkg/controllers/deprovisioning/controller.go
@@ -58,7 +58,6 @@ type Controller struct {

// pollingPeriod that we inspect cluster to look for opportunities to deprovision
const pollingPeriod = 10 * time.Second
const immediately = time.Millisecond

var errCandidateDeleting = fmt.Errorf("candidate is deleting")

@@ -132,7 +131,7 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc
return reconcile.Result{}, fmt.Errorf("deprovisioning via %q, %w", d, err)
}
if success {
return reconcile.Result{RequeueAfter: immediately}, nil
return reconcile.Result{RequeueAfter: controller.Immediately}, nil
}
}

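The deprovisioning controller drops its private immediately constant in favor of a shared controller.Immediately. The shared definition is not shown in this diff; the sketch below assumes it mirrors the old local value of time.Millisecond and simply illustrates the requeue-right-away pattern both controllers now use.

// Assumed shape of the shared constant; the actual definition in
// pkg/operator/controller is not part of this diff.
package controller

import (
	"time"

	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// Immediately requeues a reconciler with effectively no delay.
const Immediately = time.Millisecond

// requeueNow illustrates the requeue-after-success pattern used by the
// deprovisioning controller and the new eviction queue.
func requeueNow() (reconcile.Result, error) {
	return reconcile.Result{RequeueAfter: Immediately}, nil
}
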
39 changes: 23 additions & 16 deletions pkg/controllers/termination/suite_test.go
@@ -22,7 +22,6 @@ import (
"time"

"github.com/samber/lo"
"k8s.io/client-go/tools/record"
clock "k8s.io/utils/clock/testing"
"sigs.k8s.io/controller-runtime/pkg/client"

@@ -32,7 +31,6 @@ import (
"github.com/aws/karpenter-core/pkg/cloudprovider/fake"
"github.com/aws/karpenter-core/pkg/controllers/termination"
"github.com/aws/karpenter-core/pkg/controllers/termination/terminator"
"github.com/aws/karpenter-core/pkg/events"
"github.com/aws/karpenter-core/pkg/metrics"
"github.com/aws/karpenter-core/pkg/operator/controller"
"github.com/aws/karpenter-core/pkg/operator/scheme"
@@ -51,11 +49,12 @@

var ctx context.Context
var terminationController controller.Controller
var evictionQueue *terminator.EvictionQueue
var env *test.Environment
var defaultOwnerRefs = []metav1.OwnerReference{{Kind: "ReplicaSet", APIVersion: "appsv1", Name: "rs", UID: "1234567890"}}
var fakeClock *clock.FakeClock
var cloudProvider *fake.CloudProvider
var recorder *test.EventRecorder
var queue *terminator.Queue

func TestAPIs(t *testing.T) {
ctx = TestContextWithLogger(t)
@@ -68,8 +67,9 @@ var _ = BeforeSuite(func() {
env = test.NewEnvironment(scheme.Scheme, test.WithCRDs(apis.CRDs...), test.WithFieldIndexers(test.MachineFieldIndexer(ctx), test.NodeClaimFieldIndexer(ctx)))

cloudProvider = fake.NewCloudProvider()
evictionQueue = terminator.NewEvictionQueue(ctx, env.KubernetesInterface.CoreV1(), events.NewRecorder(&record.FakeRecorder{}))
terminationController = termination.NewController(env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, evictionQueue), events.NewRecorder(&record.FakeRecorder{}))
recorder = test.NewEventRecorder()
queue = terminator.NewQueue(env.KubernetesInterface.CoreV1(), recorder)
terminationController = termination.NewController(env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue), recorder)
})

var _ = AfterSuite(func() {
@@ -85,6 +85,7 @@ var _ = Describe("Termination", func() {
nodeClaim, node = test.NodeClaimAndNode(v1beta1.NodeClaim{ObjectMeta: metav1.ObjectMeta{Finalizers: []string{v1beta1.TerminationFinalizer}}})
machine = test.Machine(v1alpha5.Machine{ObjectMeta: metav1.ObjectMeta{Finalizers: []string{v1beta1.TerminationFinalizer}}, Status: v1alpha5.MachineStatus{ProviderID: node.Spec.ProviderID}})
cloudProvider.CreatedNodeClaims[node.Spec.ProviderID] = nodeclaimutil.New(machine)
queue.Reset()
})

AfterEach(func() {
@@ -182,6 +183,7 @@ var _ = Describe("Termination", func() {
Expect(env.Client.Delete(ctx, node)).To(Succeed())
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))
ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{})

// Expect node to exist and be draining
ExpectNodeDraining(env.Client, node.Name)
@@ -207,6 +209,7 @@
Expect(env.Client.Delete(ctx, node)).To(Succeed())
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))
ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{})

// Expect pod with no owner ref to be enqueued for eviction
ExpectEvicted(env.Client, pod)
@@ -264,13 +267,14 @@ var _ = Describe("Termination", func() {
Expect(env.Client.Delete(ctx, node)).To(Succeed())
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))
ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{})

// Expect node to exist and be draining
ExpectNodeDraining(env.Client, node.Name)

// Expect podNoEvict to fail eviction due to PDB, and be retried
Eventually(func() int {
return evictionQueue.NumRequeues(client.ObjectKeyFromObject(podNoEvict))
return queue.NumRequeues(client.ObjectKeyFromObject(podNoEvict))
}).Should(BeNumerically(">=", 1))

// Delete pod to simulate successful eviction
@@ -293,6 +297,7 @@
Expect(env.Client.Delete(ctx, node)).To(Succeed())
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))
ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{})

// Expect node to exist and be draining
ExpectNodeDraining(env.Client, node.Name)
@@ -304,8 +309,10 @@
// Expect the critical pods to be evicted and deleted
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))
ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{})
ExpectEvicted(env.Client, podNodeCritical)
ExpectDeleted(ctx, env.Client, podNodeCritical)
ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{})
ExpectEvicted(env.Client, podClusterCritical)
ExpectDeleted(ctx, env.Client, podClusterCritical)

@@ -335,9 +342,10 @@ var _ = Describe("Termination", func() {
Expect(env.Client.Delete(ctx, node)).To(Succeed())
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))
ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{})

// Expect mirror pod to not be queued for eviction
ExpectNotEnqueuedForEviction(evictionQueue, podNoEvict)
ExpectNotEnqueuedForEviction(queue, podNoEvict)

// Expect podEvict to be enqueued for eviction then be successful
ExpectEvicted(env.Client, podEvict)
@@ -359,16 +367,15 @@

})
It("should not delete nodes until all pods are deleted", func() {
pods := []*v1.Pod{
test.Pod(test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}),
test.Pod(test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}),
}
pods := test.Pods(2, test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}})
ExpectApplied(ctx, env.Client, node, pods[0], pods[1])

// Trigger Termination Controller
Expect(env.Client.Delete(ctx, node)).To(Succeed())
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))
ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{})
ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{})

// Expect the pods to be evicted
ExpectEvicted(env.Client, pods[0], pods[1])
@@ -393,16 +400,15 @@
ExpectNotFound(ctx, env.Client, node)
})
It("should delete nodes with no underlying instance even if not fully drained", func() {
pods := []*v1.Pod{
test.Pod(test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}),
test.Pod(test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}),
}
pods := test.Pods(2, test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}})
ExpectApplied(ctx, env.Client, node, pods[0], pods[1])

// Trigger Termination Controller
Expect(env.Client.Delete(ctx, node)).To(Succeed())
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))
ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{})
ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{})

// Expect the pods to be evicted
ExpectEvicted(env.Client, pods[0], pods[1])
@@ -431,6 +437,7 @@ var _ = Describe("Termination", func() {
// Before grace period, node should not delete
Expect(env.Client.Delete(ctx, node)).To(Succeed())
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))
ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{})
ExpectNodeExists(ctx, env.Client, node.Name)
ExpectEvicted(env.Client, pod)

@@ -466,7 +473,7 @@ var _ = Describe("Termination", func() {
})
})

func ExpectNotEnqueuedForEviction(e *terminator.EvictionQueue, pods ...*v1.Pod) {
func ExpectNotEnqueuedForEviction(e *terminator.Queue, pods ...*v1.Pod) {
for _, pod := range pods {
ExpectWithOffset(1, e.Contains(client.ObjectKeyFromObject(pod))).To(BeFalse())
}
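Since the queue no longer drains itself in a background goroutine, the tests drive it explicitly: every ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{}) pops and processes at most one pending eviction, and queue.Reset() in BeforeEach gives each spec a clean queue. A hypothetical helper for draining several evictions in one call (not part of this commit, and assumed to live in the same suite_test.go so it reuses that file's imports) might look like:

// drainQueue reconciles the singleton eviction queue n times, processing up
// to n pending evictions. Illustrative test helper only; not in the commit.
func drainQueue(ctx context.Context, queue *terminator.Queue, n int) {
	for i := 0; i < n; i++ {
		// The request key is ignored by the singleton reconciler, which works
		// off its internal workqueue, so an empty ObjectKey is sufficient.
		ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{})
	}
}
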
79 changes: 46 additions & 33 deletions pkg/controllers/termination/terminator/eviction.go
@@ -30,6 +30,10 @@ import (
"k8s.io/client-go/util/workqueue"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/aws/karpenter-core/pkg/operator/controller"

terminatorevents "github.com/aws/karpenter-core/pkg/controllers/termination/terminator/events"
"github.com/aws/karpenter-core/pkg/events"
@@ -56,61 +60,65 @@ func IsNodeDrainError(err error) bool {
return errors.As(err, &nodeDrainErr)
}

type EvictionQueue struct {
type Queue struct {
workqueue.RateLimitingInterface
set.Set

coreV1Client corev1.CoreV1Interface
recorder events.Recorder
}

func NewEvictionQueue(ctx context.Context, coreV1Client corev1.CoreV1Interface, recorder events.Recorder) *EvictionQueue {
queue := &EvictionQueue{
func NewQueue(coreV1Client corev1.CoreV1Interface, recorder events.Recorder) *Queue {
queue := &Queue{
RateLimitingInterface: workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(evictionQueueBaseDelay, evictionQueueMaxDelay)),
Set: set.NewSet(),
coreV1Client: coreV1Client,
recorder: recorder,
}
go queue.Start(logging.WithLogger(ctx, logging.FromContext(ctx).Named("eviction")))
return queue
}

// Add adds pods to the EvictionQueue
func (e *EvictionQueue) Add(pods ...*v1.Pod) {
func (q *Queue) Name() string {
return "eviction-queue"
}

func (q *Queue) Builder(_ context.Context, m manager.Manager) controller.Builder {
return controller.NewSingletonManagedBy(m)
}

// Add adds pods to the Queue
func (q *Queue) Add(pods ...*v1.Pod) {
for _, pod := range pods {
if nn := client.ObjectKeyFromObject(pod); !e.Set.Contains(nn) {
e.Set.Add(nn)
e.RateLimitingInterface.Add(nn)
if nn := client.ObjectKeyFromObject(pod); !q.Set.Contains(nn) {
q.Set.Add(nn)
q.RateLimitingInterface.Add(nn)
}
}
}

func (e *EvictionQueue) Start(ctx context.Context) {
for {
// Get pod from queue. This waits until queue is non-empty.
item, shutdown := e.RateLimitingInterface.Get()
if shutdown {
break
}
nn := item.(types.NamespacedName)
// Evict pod
if e.evict(ctx, nn) {
e.RateLimitingInterface.Forget(nn)
e.Set.Remove(nn)
e.RateLimitingInterface.Done(nn)
continue
}
e.RateLimitingInterface.Done(nn)
// Requeue pod if eviction failed
e.RateLimitingInterface.AddRateLimited(nn)
func (q *Queue) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
// Get pod from queue. This waits until queue is non-empty.
item, shutdown := q.RateLimitingInterface.Get()
if shutdown {
return reconcile.Result{}, fmt.Errorf("EvictionQueue is broken and has shutdown")
}
nn := item.(types.NamespacedName)
defer q.RateLimitingInterface.Done(nn)
// Evict pod
if q.Evict(ctx, nn) {
q.RateLimitingInterface.Forget(nn)
q.Set.Remove(nn)
return reconcile.Result{RequeueAfter: controller.Immediately}, nil
}
logging.FromContext(ctx).Errorf("EvictionQueue is broken and has shutdown")
// Requeue pod if eviction failed
q.RateLimitingInterface.AddRateLimited(nn)
return reconcile.Result{RequeueAfter: controller.Immediately}, nil
}

// evict returns true if successful eviction call, and false if not an eviction-related error
func (e *EvictionQueue) evict(ctx context.Context, nn types.NamespacedName) bool {
// Evict returns true if successful eviction call, and false if not an eviction-related error
func (q *Queue) Evict(ctx context.Context, nn types.NamespacedName) bool {
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("pod", nn))
if err := e.coreV1Client.Pods(nn.Namespace).EvictV1(ctx, &policyv1.Eviction{
if err := q.coreV1Client.Pods(nn.Namespace).EvictV1(ctx, &policyv1.Eviction{
ObjectMeta: metav1.ObjectMeta{Name: nn.Name, Namespace: nn.Namespace},
}); err != nil {
// status codes for the eviction API are defined here:
@@ -119,7 +127,7 @@ func (e *EvictionQueue) evict(ctx context.Context, nn types.NamespacedName) bool
return true
}
if apierrors.IsTooManyRequests(err) { // 429 - PDB violation
e.recorder.Publish(terminatorevents.NodeFailedToDrain(&v1.Node{ObjectMeta: metav1.ObjectMeta{
q.recorder.Publish(terminatorevents.NodeFailedToDrain(&v1.Node{ObjectMeta: metav1.ObjectMeta{
Name: nn.Name,
Namespace: nn.Namespace,
}}, fmt.Errorf("evicting pod %s/%s violates a PDB", nn.Namespace, nn.Name)))
@@ -128,6 +136,11 @@ func (e *EvictionQueue) evict(ctx context.Context, nn types.NamespacedName) bool
logging.FromContext(ctx).Errorf("evicting pod, %s", err)
return false
}
e.recorder.Publish(terminatorevents.EvictPod(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: nn.Name, Namespace: nn.Namespace}}))
q.recorder.Publish(terminatorevents.EvictPod(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: nn.Name, Namespace: nn.Namespace}}))
return true
}

func (q *Queue) Reset() {
q.RateLimitingInterface = workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(evictionQueueBaseDelay, evictionQueueMaxDelay))
q.Set = set.NewSet()
}
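
The core of the change is in eviction.go: NewEvictionQueue used to launch go queue.Start(ctx) and loop over the workqueue forever, while NewQueue takes no context and the loop body has become a Reconcile method that pops exactly one pod per call and asks to be requeued immediately, letting the manager-driven singleton provide the looping. Stripped of the Karpenter-specific types, the pattern is roughly the sketch below; the names and the 1 ms requeue value are illustrative, not taken from the shared controller package.

// Generic sketch of the pattern this commit adopts: a rate-limited workqueue
// drained one item per Reconcile call, with the controller requeueing itself
// immediately so the manager keeps it spinning.
package example

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/workqueue"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

type workQueueReconciler struct {
	queue   workqueue.RateLimitingInterface
	process func(ctx context.Context, nn types.NamespacedName) bool // true = done, false = retry
}

func (r *workQueueReconciler) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
	// Blocks until an item is available, mirroring the Queue in this diff.
	item, shutdown := r.queue.Get()
	if shutdown {
		return reconcile.Result{}, fmt.Errorf("queue has been shut down")
	}
	nn := item.(types.NamespacedName)
	defer r.queue.Done(nn)

	if r.process(ctx, nn) {
		r.queue.Forget(nn) // clear rate-limiter history on success
	} else {
		r.queue.AddRateLimited(nn) // retry with exponential backoff on failure
	}
	// Ask to be reconciled again right away; the manager supplies the loop
	// that the old `go queue.Start(ctx)` goroutine used to provide.
	return reconcile.Result{RequeueAfter: time.Millisecond}, nil
}
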
(Diffs for the remaining 5 changed files are not shown.)
