Skip to content

Commit

Permalink
Trigger termination logic on watch events (#446)
Browse files Browse the repository at this point in the history
* Trigger termination logic on watch events

* Changed watch patterns

* Removed Watches() in favor of For()

* Removed Interval() from Controller interface
  • Loading branch information
njtran authored Jun 24, 2021
1 parent 446a8ee commit 687795b
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 120 deletions.
17 changes: 8 additions & 9 deletions pkg/cloudprovider/aws/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,25 @@ import (
"strings"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
. "github.com/awslabs/karpenter/pkg/test/expectations"
"github.com/awslabs/karpenter/pkg/utils/resources"
"github.com/patrickmn/go-cache"
"knative.dev/pkg/ptr"

"github.com/Pallinder/go-randomdata"
"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1"
"github.com/awslabs/karpenter/pkg/cloudprovider/aws/fake"
"github.com/awslabs/karpenter/pkg/cloudprovider/registry"
"github.com/awslabs/karpenter/pkg/controllers/provisioning/v1alpha1/allocation"

"github.com/awslabs/karpenter/pkg/test"
. "github.com/awslabs/karpenter/pkg/test/expectations"
"github.com/awslabs/karpenter/pkg/utils/resources"

"github.com/Pallinder/go-randomdata"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/patrickmn/go-cache"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"knative.dev/pkg/ptr"
)

func TestAPIs(t *testing.T) {
Expand Down
43 changes: 19 additions & 24 deletions pkg/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,12 @@ import (
"fmt"

"github.com/awslabs/karpenter/pkg/utils/conditions"

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"knative.dev/pkg/apis"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// GenericController implements controllerruntime.Reconciler and runs a
Expand All @@ -53,28 +50,26 @@ func (c *GenericController) Reconcile(ctx context.Context, req reconcile.Request
if _, ok := resource.(apis.Defaultable); ok {
resource.(apis.Defaultable).SetDefaults(ctx)
}
// 4. Set to true to remove race condition where multiple controllers set the status of the same object
// TODO: remove status conditions on provisioners
if conditionsAccessor, ok := resource.(apis.ConditionsAccessor); ok {
apis.NewLivingConditionSet(conditions.Active).Manage(conditionsAccessor).MarkTrue(conditions.Active)
// 4. Reconcile
result, err := c.Controller.Reconcile(ctx, resource)
if err != nil {
zap.S().Errorf("Controller failed to reconcile kind %s, %s", resource.GetObjectKind().GroupVersionKind().Kind, err.Error())
}
// 5. Reconcile
if _, err := c.Controller.Reconcile(ctx, resource); err != nil {
zap.S().Errorf("Controller failed to reconcile kind %s, %s",
resource.GetObjectKind().GroupVersionKind().Kind, err.Error())
return reconcile.Result{Requeue: true}, nil
// 5. Set status based on results of reconcile
if conditionsAccessor, ok := resource.(apis.ConditionsAccessor); ok {
m := apis.NewLivingConditionSet(conditions.Active).Manage(conditionsAccessor)
if err != nil {
m.MarkFalse(conditions.Active, err.Error(), "")
} else {
m.MarkTrue(conditions.Active)
}
}
// 6. Update Status using a merge patch
if err := c.Status().Patch(ctx, resource, client.MergeFrom(persisted)); err != nil {
return reconcile.Result{}, fmt.Errorf("Failed to persist changes to %s, %w", req.NamespacedName, err)
// If the controller is reconciling nodes, don't patch
if _, ok := resource.(*v1.Node); !ok {
if err := c.Status().Patch(ctx, resource, client.MergeFrom(persisted)); err != nil {
return result, fmt.Errorf("Failed to persist changes to %s, %w", req.NamespacedName, err)
}
}
return reconcile.Result{RequeueAfter: c.Interval()}, nil
}

// Watches returns the necessary information to create a watch
// a. source: the resource that is being watched
// b. eventHandler: which controller objects to be reconciled
// c. predicates: which events can be filtered out before processed
func (c *GenericController) Watches(ctx context.Context) (source.Source, handler.EventHandler, builder.WatchesOption) {
return c.Controller.Watches(ctx)
return result, err
}
4 changes: 0 additions & 4 deletions pkg/controllers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func (m *GenericControllerManager) RegisterControllers(controllers ...Controller
controlledObject := c.For()
builder := controllerruntime.NewControllerManagedBy(m).
For(controlledObject).
Watches(c.Watches(context.Background())).
WithOptions(controller.Options{
RateLimiter: workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 10*time.Second),
Expand All @@ -77,9 +76,6 @@ func (m *GenericControllerManager) RegisterControllers(controllers ...Controller
if namedController, ok := c.(NamedController); ok {
builder.Named(namedController.Name())
}
for _, resource := range c.Owns() {
builder = builder.Owns(resource)
}
if err := builder.Complete(&GenericController{Controller: c, Client: m.GetClient()}); err != nil {
panic(fmt.Sprintf("Failed to register controller to manager for %s", controlledObject))
}
Expand Down
22 changes: 3 additions & 19 deletions pkg/controllers/provisioning/v1alpha1/allocation/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,9 @@ import (
"github.com/awslabs/karpenter/pkg/utils/apiobject"

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// Controller for the resource
Expand All @@ -49,11 +44,6 @@ func (c *Controller) For() client.Object {
return &v1alpha1.Provisioner{}
}

// Owns reports the resource types owned by this controller's resource.
// This controller declares none, so the returned slice is always nil.
func (c *Controller) Owns() []client.Object {
	var owned []client.Object
	return owned
}

// Interval returns the fixed five-second delay that this controller's
// Reconcile hands back as RequeueAfter.
func (c *Controller) Interval() time.Duration {
	const requeueDelay = 5 * time.Second
	return requeueDelay
}
Expand Down Expand Up @@ -82,7 +72,7 @@ func (c *Controller) Reconcile(ctx context.Context, object client.Object) (recon
return reconcile.Result{}, fmt.Errorf("filtering pods, %w", err)
}
if len(pods) == 0 {
return reconcile.Result{}, nil
return reconcile.Result{RequeueAfter: c.Interval()}, nil
}
zap.S().Infof("Found %d provisionable pods", len(pods))

Expand All @@ -108,18 +98,12 @@ func (c *Controller) Reconcile(ctx context.Context, object client.Object) (recon
return reconcile.Result{}, fmt.Errorf("creating capacity, %w", err)
}

// 4. Bind pods to nodes
// 5. Bind pods to nodes
for _, packedNode := range packedNodes {
zap.S().Infof("Binding pods %v to node %s", apiobject.PodNamespacedNames(packedNode.Pods), packedNode.Node.Name)
if err := c.binder.Bind(ctx, packedNode.Node, packedNode.Pods); err != nil {
zap.S().Errorf("Continuing after failing to bind, %s", err.Error())
}
}
return reconcile.Result{}, nil
}

func (c *Controller) Watches(context.Context) (source.Source, handler.EventHandler, builder.WatchesOption) {
return &source.Kind{Type: &v1.Pod{}},
&handler.EnqueueRequestForObject{},
builder.WithPredicates(predicate.NewPredicateFuncs(func(object client.Object) bool { return false }))
return reconcile.Result{RequeueAfter: c.Interval()}, nil
}
18 changes: 1 addition & 17 deletions pkg/controllers/provisioning/v1alpha1/reallocation/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,9 @@ import (
"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1"
"github.com/awslabs/karpenter/pkg/cloudprovider"

v1 "k8s.io/api/core/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// Controller for the resource
Expand All @@ -43,11 +38,6 @@ func (c *Controller) For() client.Object {
return &v1alpha1.Provisioner{}
}

// Owns lists the resource types owned by this controller's resource.
// Nothing is owned here, so a nil slice is returned.
func (c *Controller) Owns() []client.Object {
	var none []client.Object
	return none
}

// Interval returns the constant five-second wait that this controller's
// Reconcile supplies as RequeueAfter in its result.
func (c *Controller) Interval() time.Duration {
	const wait = 5 * time.Second
	return wait
}
Expand Down Expand Up @@ -81,11 +71,5 @@ func (c *Controller) Reconcile(ctx context.Context, object client.Object) (recon
if err := c.utilization.terminateExpired(ctx, provisioner); err != nil {
return reconcile.Result{}, fmt.Errorf("marking nodes terminable, %w", err)
}
return reconcile.Result{}, nil
}

func (c *Controller) Watches(context.Context) (source.Source, handler.EventHandler, builder.WatchesOption) {
return &source.Kind{Type: &v1.Pod{}},
&handler.EnqueueRequestForObject{},
builder.WithPredicates(predicate.NewPredicateFuncs(func(object client.Object) bool { return false }))
return reconcile.Result{RequeueAfter: c.Interval()}, nil
}
26 changes: 3 additions & 23 deletions pkg/controllers/terminating/v1alpha1/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,15 @@ package v1alpha1
import (
"context"
"fmt"
"time"

provisioning "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1"
"github.com/awslabs/karpenter/pkg/cloudprovider"
"github.com/awslabs/karpenter/pkg/utils/functional"

v1 "k8s.io/api/core/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// Controller for the resource
Expand All @@ -44,15 +39,6 @@ func (c *Controller) For() client.Object {
return &v1.Node{}
}

// Owns returns the resource types owned by this controller's resource.
// The result is always a nil slice.
func (c *Controller) Owns() []client.Object {
	var ownedTypes []client.Object
	return ownedTypes
}

// Interval returns a constant duration of five seconds.
func (c *Controller) Interval() time.Duration {
	const fiveSeconds = 5 * time.Second
	return fiveSeconds
}

// Name returns the fixed identifier "terminator" for this controller.
func (c *Controller) Name() string {
	const controllerName = "terminator"
	return controllerName
}
Expand All @@ -73,25 +59,19 @@ func (c *Controller) Reconcile(ctx context.Context, object client.Object) (recon
return reconcile.Result{}, nil
}
// 2. Cordon node
if err := c.terminator.cordonNode(ctx, node); err != nil {
if err := c.terminator.cordon(ctx, node); err != nil {
return reconcile.Result{}, fmt.Errorf("cordoning node %s, %w", node.Name, err)
}
// 3. Drain node
drained, err := c.terminator.drainNode(ctx, node)
drained, err := c.terminator.drain(ctx, node)
if err != nil {
return reconcile.Result{}, fmt.Errorf("draining node %s, %w", node.Name, err)
}
// 4. If fully drained, terminate the node
if drained {
if err := c.terminator.terminateNode(ctx, node); err != nil {
if err := c.terminator.terminate(ctx, node); err != nil {
return reconcile.Result{}, fmt.Errorf("terminating nodes, %w", err)
}
}
return reconcile.Result{Requeue: !drained}, nil
}

// Watches returns the three pieces used to register a watch: a source of
// Node objects, a handler that enqueues a request for the object itself, and
// a predicate option whose filter function always returns false.
func (c *Controller) Watches(context.Context) (source.Source, handler.EventHandler, builder.WatchesOption) {
	nodeSource := &source.Kind{Type: &v1.Node{}}
	enqueueSelf := &handler.EnqueueRequestForObject{}
	// The filter never accepts an object, whatever object it is given.
	rejectAll := predicate.NewPredicateFuncs(func(client.Object) bool { return false })
	return nodeSource, enqueueSelf, builder.WithPredicates(rejectAll)
}
3 changes: 2 additions & 1 deletion pkg/controllers/terminating/v1alpha1/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ var _ = AfterSuite(func() {
Expect(env.Stop()).To(Succeed(), "Failed to stop environment")
})

var _ = Describe("Reallocation", func() {
var _ = Describe("Termination", func() {
var ctx context.Context

BeforeEach(func() {
ctx = context.Background()
})
Expand Down
13 changes: 6 additions & 7 deletions pkg/controllers/terminating/v1alpha1/terminate.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ type Terminator struct {
coreV1Client corev1.CoreV1Interface
}

// cordonNode cordons a node
func (t *Terminator) cordonNode(ctx context.Context, node *v1.Node) error {
// cordon cordons a node
func (t *Terminator) cordon(ctx context.Context, node *v1.Node) error {
if node.Spec.Unschedulable {
return nil
}
Expand All @@ -52,8 +52,8 @@ func (t *Terminator) cordonNode(ctx context.Context, node *v1.Node) error {
return nil
}

// drainNode evicts pods from the node and returns true when fully drained
func (t *Terminator) drainNode(ctx context.Context, node *v1.Node) (bool, error) {
// drain evicts pods from the node and returns true when fully drained
func (t *Terminator) drain(ctx context.Context, node *v1.Node) (bool, error) {
// 1. Get pods on node
pods, err := t.getPods(ctx, node)
if err != nil {
Expand All @@ -76,8 +76,8 @@ func (t *Terminator) drainNode(ctx context.Context, node *v1.Node) (bool, error)
return empty, nil
}

// terminateNode terminates the node then removes the finalizer to delete the node
func (t *Terminator) terminateNode(ctx context.Context, node *v1.Node) error {
// terminate terminates the node then removes the finalizer to delete the node
func (t *Terminator) terminate(ctx context.Context, node *v1.Node) error {
// 1. Terminate instance associated with node
if err := t.cloudProvider.Terminate(ctx, node); err != nil {
return fmt.Errorf("terminating cloudprovider instance, %w", err)
Expand All @@ -89,7 +89,6 @@ func (t *Terminator) terminateNode(ctx context.Context, node *v1.Node) error {
if err := t.kubeClient.Patch(ctx, node, client.MergeFrom(persisted)); err != nil {
return fmt.Errorf("removing finalizer from node %s, %w", node.Name, err)
}
zap.S().Debugf("Deleted node %s", node.Name)
return nil
}

Expand Down
16 changes: 0 additions & 16 deletions pkg/controllers/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,10 @@ package controllers

import (
"context"
"time"

"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// Controller is an interface implemented by Karpenter custom resources.
Expand All @@ -32,21 +28,9 @@ type Controller interface {
// reconciliation. Any changes made to the resource's status are persisted
// after Reconcile returns, even if it returns an error.
Reconcile(context.Context, client.Object) (reconcile.Result, error)
// Interval returns an interval that the controller should wait before
// executing another reconciliation loop. If set to zero, will only execute
// on watch events or the global resync interval.
Interval() time.Duration
// For returns a default instantiation of the resource and is injected by
// data from the API Server at the start of the reconciliation loop.
For() client.Object
// Owns returns a slice of resources that are watched by this resource.
// Watch events are triggered if owner references are set on the resource.
Owns() []client.Object
// Watches returns the necessary information to create a watch
// a. Source: the resource that is being watched
// b. EventHandler: which controller objects to be reconciled
// c. WatchesOption: which events can be filtered out before processed
Watches(context.Context) (source.Source, handler.EventHandler, builder.WatchesOption)
}

// NamedController allows controllers to optionally implement a Name() function which will be used instead of the
Expand Down

0 comments on commit 687795b

Please sign in to comment.