Added finalizers addition and removal logic for nodes (#466)
* Added finalizers addition and removal logic for nodes

* addressed comments

* added comments and checks

* Fixed termination tests
njtran authored Jun 22, 2021
1 parent ba219bf commit e1d4251
Showing 19 changed files with 148 additions and 177 deletions.
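This commit replaces the old "lifecycle-phase" labeling scheme with a Kubernetes finalizer: the allocator stamps karpenter.sh/termination onto every node it creates, and a termination controller cordons, drains, and terminates the instance before removing the finalizer so the Node object can actually be deleted. The sketch below illustrates only the finalizer mechanics in isolation; it is not code from this commit, and the helper names are illustrative.

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// The finalizer value defined by this commit (KarpenterFinalizer).
const terminationFinalizer = "karpenter.sh/termination"

// addFinalizer appends the finalizer only if it is missing, so repeated
// reconciles stay idempotent. (Illustrative helper, not part of the commit.)
func addFinalizer(node *v1.Node, finalizer string) {
	for _, f := range node.Finalizers {
		if f == finalizer {
			return
		}
	}
	node.Finalizers = append(node.Finalizers, finalizer)
}

// removeFinalizer drops the finalizer; once the list is empty and a deletion
// timestamp is set, the API server can garbage collect the object.
func removeFinalizer(node *v1.Node, finalizer string) {
	kept := []string{}
	for _, f := range node.Finalizers {
		if f != finalizer {
			kept = append(kept, f)
		}
	}
	node.Finalizers = kept
}

func main() {
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "example-node"}}
	addFinalizer(node, terminationFinalizer)
	addFinalizer(node, terminationFinalizer) // no-op: already present

	// In a real cluster, deleting the node would only set DeletionTimestamp
	// here, because a finalizer is still present; simulate that state.
	now := metav1.NewTime(time.Now())
	node.DeletionTimestamp = &now
	fmt.Println("deletion blocked by finalizers:", len(node.Finalizers) > 0)

	// After cordon, drain, and instance termination succeed, the controller
	// removes the finalizer and the delete completes.
	removeFinalizer(node, terminationFinalizer)
	fmt.Println("finalizers remaining:", node.Finalizers)
}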
10 changes: 4 additions & 6 deletions pkg/apis/provisioning/v1alpha1/provisioner.go
@@ -100,9 +100,9 @@ var (
OperatingSystemLabelKey = "kubernetes.io/os"

// Reserved labels
ProvisionerNameLabelKey = SchemeGroupVersion.Group + "/name"
ProvisionerNamespaceLabelKey = SchemeGroupVersion.Group + "/namespace"
ProvisionerPhaseLabel = SchemeGroupVersion.Group + "/lifecycle-phase"
ProvisionerNameLabelKey = SchemeGroupVersion.Group + "/name"
ProvisionerNamespaceLabelKey = SchemeGroupVersion.Group + "/namespace"
ProvisionerUnderutilizedLabelKey = SchemeGroupVersion.Group + "/underutilized"

// Reserved annotations
ProvisionerTTLKey = SchemeGroupVersion.Group + "/ttl"
@@ -113,9 +113,7 @@ var (
)

const (
ProvisionerUnderutilizedPhase = "underutilized"
ProvisionerTerminablePhase = "terminable"
ProvisionerDrainingPhase = "draining"
KarpenterFinalizer = "karpenter.sh/termination"
)

// Provisioner is the Schema for the Provisioners API
2 changes: 1 addition & 1 deletion pkg/apis/provisioning/v1alpha1/provisioner_validation.go
@@ -30,7 +30,7 @@ var (
OperatingSystemLabelKey,
ProvisionerNameLabelKey,
ProvisionerNamespaceLabelKey,
ProvisionerPhaseLabel,
ProvisionerUnderutilizedLabelKey,
ProvisionerTTLKey,
ZoneLabelKey,
InstanceTypeLabelKey,
@@ -80,7 +80,7 @@ var _ = Describe("Validation", func() {
OperatingSystemLabelKey,
ProvisionerNameLabelKey,
ProvisionerNamespaceLabelKey,
ProvisionerPhaseLabel,
ProvisionerUnderutilizedLabelKey,
ZoneLabelKey,
InstanceTypeLabelKey,
} {
4 changes: 2 additions & 2 deletions pkg/cloudprovider/aws/cloudprovider.go
@@ -154,8 +154,8 @@ func (c *CloudProvider) GetInstanceTypes(ctx context.Context) ([]cloudprovider.I
return c.instanceTypeProvider.Get(ctx)
}

func (c *CloudProvider) Terminate(ctx context.Context, nodes []*v1.Node) error {
return c.instanceProvider.Terminate(ctx, nodes)
func (c *CloudProvider) Terminate(ctx context.Context, node *v1.Node) error {
return c.instanceProvider.Terminate(ctx, node)
}

// Validate cloud provider specific components of the cluster spec
35 changes: 14 additions & 21 deletions pkg/cloudprovider/aws/instance.go
@@ -26,6 +26,7 @@ import (

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
)

type InstanceProvider struct {
@@ -108,31 +109,23 @@ func (p *InstanceProvider) Create(ctx context.Context,
return createFleetOutput.Instances[0].InstanceIds[0], nil
}

func (p *InstanceProvider) Terminate(ctx context.Context, nodes []*v1.Node) error {
if len(nodes) == 0 {
return nil
}
ids := p.getInstanceIDs(nodes)

_, err := p.ec2api.TerminateInstancesWithContext(ctx, &ec2.TerminateInstancesInput{
InstanceIds: ids,
})
func (p *InstanceProvider) Terminate(ctx context.Context, node *v1.Node) error {
id, err := p.getInstanceID(node)
if err != nil {
return fmt.Errorf("terminating %d instances, %w", len(ids), err)
return fmt.Errorf("getting instance ID for node %s, %w", node.Name, err)
}
if _, err = p.ec2api.TerminateInstancesWithContext(ctx, &ec2.TerminateInstancesInput{
InstanceIds: []*string{id},
}); err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("terminating instance %s, %w", node.Name, err)
}

return nil
}

func (p *InstanceProvider) getInstanceIDs(nodes []*v1.Node) []*string {
ids := []*string{}
for _, node := range nodes {
id := strings.Split(node.Spec.ProviderID, "/")
if len(id) < 5 {
zap.S().Debugf("Continuing after failure to parse instance id, %s has invalid format", node.Name)
continue
}
ids = append(ids, aws.String(id[4]))
func (p *InstanceProvider) getInstanceID(node *v1.Node) (*string, error) {
id := strings.Split(node.Spec.ProviderID, "/")
if len(id) < 5 {
return nil, fmt.Errorf("parsing instance id %s", node.Spec.ProviderID)
}
return ids
return aws.String(id[4]), nil
}
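For context on getInstanceID above: on EC2, a node's spec.providerID has the form aws:///<availability-zone>/<instance-id>, so splitting on "/" yields five elements with the instance ID last. Below is a standalone sketch of that parse, separate from the provider code; the function name is illustrative.

package main

import (
	"fmt"
	"strings"
)

// parseInstanceID extracts the EC2 instance ID from a Kubernetes providerID
// such as "aws:///us-west-2a/i-0123456789abcdef0".
func parseInstanceID(providerID string) (string, error) {
	parts := strings.Split(providerID, "/")
	// "aws:", "", "", "us-west-2a", "i-0123456789abcdef0" -> 5 parts
	if len(parts) < 5 || parts[4] == "" {
		return "", fmt.Errorf("parsing instance id %q", providerID)
	}
	return parts[4], nil
}

func main() {
	id, err := parseInstanceID("aws:///us-west-2a/i-0123456789abcdef0")
	fmt.Println(id, err) // i-0123456789abcdef0 <nil>
}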
1 change: 1 addition & 0 deletions pkg/cloudprovider/aws/suite_test.go
@@ -31,6 +31,7 @@ import (
"github.com/awslabs/karpenter/pkg/cloudprovider/aws/fake"
"github.com/awslabs/karpenter/pkg/cloudprovider/registry"
"github.com/awslabs/karpenter/pkg/controllers/provisioning/v1alpha1/allocation"

"github.com/awslabs/karpenter/pkg/test"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
2 changes: 1 addition & 1 deletion pkg/cloudprovider/fake/cloudprovider.go
@@ -106,6 +106,6 @@ func (c *CloudProvider) Validate(context.Context, *v1alpha1.Constraints) *apis.F
return nil
}

func (c *CloudProvider) Terminate(context.Context, []*v1.Node) error {
func (c *CloudProvider) Terminate(ctx context.Context, node *v1.Node) error {
return nil
}
4 changes: 2 additions & 2 deletions pkg/cloudprovider/types.go
@@ -33,8 +33,8 @@ type CloudProvider interface {
GetInstanceTypes(context.Context) ([]InstanceType, error)
// Validate is a hook for additional constraint validation logic specific to the cloud provider
Validate(context.Context, *v1alpha1.Constraints) *apis.FieldError
// Terminate nodes in cloudprovider
Terminate(context.Context, []*v1.Node) error
// Terminate node in cloudprovider
Terminate(context.Context, *v1.Node) error
}

// Packing is a binpacking solution of equivalently schedulable pods to a set of
9 changes: 6 additions & 3 deletions pkg/controllers/provisioning/v1alpha1/allocation/bind.go
@@ -18,6 +18,7 @@ import (
"context"
"fmt"

"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -32,7 +33,9 @@ type Binder struct {
}

func (b *Binder) Bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error {
// 1. Mark NodeReady=Unknown
// 1. Add the Karpenter finalizer to the node to enable the termination workflow
node.Finalizers = append(node.Finalizers, v1alpha1.KarpenterFinalizer)
// 2. Mark NodeReady=Unknown
// Unfortunately, this detail is necessary to prevent kube-scheduler from
// scheduling pods to nodes before they're created. Node Lifecycle
// Controller will attach a Effect=NoSchedule taint in response to this
Expand All @@ -44,7 +47,7 @@ func (b *Binder) Bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
}}
// 2. Idempotently create a node. In rare cases, nodes can come online and
// 3. Idempotently create a node. In rare cases, nodes can come online and
// self register before the controller is able to register a node object
// with the API server. In the common case, we create the node object
// ourselves to enforce the binding decision and enable images to be pulled
@@ -55,7 +58,7 @@ func (b *Binder) Bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error
}
}

// 3. Bind pods
// 4. Bind pods
for _, pod := range pods {
if err := b.bind(ctx, node, pod); err != nil {
zap.S().Errorf("Continuing after failing to bind, %s", err.Error())
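The per-pod bind helper called in step 4 is not shown in this hunk. For orientation, binding a pod to a node normally goes through the core/v1 Binding subresource; the sketch below assumes that approach, and the function name and error wording are illustrative rather than taken from the commit.

package allocation

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
)

// bindPod binds an already-scheduled pod to the given node by creating a
// Binding subresource, the same mechanism kube-scheduler uses.
func bindPod(ctx context.Context, coreClient corev1client.CoreV1Interface, node *v1.Node, pod *v1.Pod) error {
	binding := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
		Target:     v1.ObjectReference{Kind: "Node", Name: node.Name},
	}
	if err := coreClient.Pods(pod.Namespace).Bind(ctx, binding, metav1.CreateOptions{}); err != nil {
		return fmt.Errorf("binding pod %s/%s to node %s, %w", pod.Namespace, pod.Name, node.Name, err)
	}
	return nil
}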
@@ -77,8 +77,8 @@ func (c *Controller) Reconcile(ctx context.Context, object client.Object) (recon
return reconcile.Result{}, fmt.Errorf("removing ttl from node, %w", err)
}

// 3. Mark any Node past TTL as expired
if err := c.utilization.markTerminable(ctx, provisioner); err != nil {
// 3. Delete any node past its TTL
if err := c.utilization.terminateExpired(ctx, provisioner); err != nil {
return reconcile.Result{}, fmt.Errorf("marking nodes terminable, %w", err)
}
return reconcile.Result{}, nil
10 changes: 5 additions & 5 deletions pkg/controllers/provisioning/v1alpha1/reallocation/suite_test.go
@@ -95,15 +95,15 @@ var _ = Describe("Reallocation", func() {

updatedNode := &v1.Node{}
Expect(env.Client.Get(ctx, client.ObjectKey{Name: node.Name}, updatedNode)).To(Succeed())
Expect(updatedNode.Labels).To(HaveKeyWithValue(v1alpha1.ProvisionerPhaseLabel, v1alpha1.ProvisionerUnderutilizedPhase))
Expect(updatedNode.Labels).To(HaveKey(v1alpha1.ProvisionerUnderutilizedLabelKey))
Expect(updatedNode.Annotations).To(HaveKey(v1alpha1.ProvisionerTTLKey))
})
It("should remove labels from utilized nodes", func() {
node := test.NodeWith(test.NodeOptions{
Labels: map[string]string{
v1alpha1.ProvisionerNameLabelKey: provisioner.Name,
v1alpha1.ProvisionerNamespaceLabelKey: provisioner.Namespace,
v1alpha1.ProvisionerPhaseLabel: v1alpha1.ProvisionerUnderutilizedPhase,
v1alpha1.ProvisionerNameLabelKey: provisioner.Name,
v1alpha1.ProvisionerNamespaceLabelKey: provisioner.Namespace,
v1alpha1.ProvisionerUnderutilizedLabelKey: "true",
},
Annotations: map[string]string{
v1alpha1.ProvisionerTTLKey: time.Now().Add(time.Duration(100) * time.Second).Format(time.RFC3339),
@@ -124,7 +124,7 @@ var _ = Describe("Reallocation", func() {

updatedNode := &v1.Node{}
Expect(env.Client.Get(ctx, client.ObjectKey{Name: node.Name}, updatedNode)).To(Succeed())
Expect(updatedNode.Labels).ToNot(HaveKey(v1alpha1.ProvisionerPhaseLabel))
Expect(updatedNode.Labels).ToNot(HaveKey(v1alpha1.ProvisionerUnderutilizedLabelKey))
Expect(updatedNode.Annotations).ToNot(HaveKey(v1alpha1.ProvisionerTTLKey))
})
})
27 changes: 10 additions & 17 deletions pkg/controllers/provisioning/v1alpha1/reallocation/utilization.go
@@ -44,7 +44,7 @@ func (u *Utilization) Reconcile(ctx context.Context, provisioner *v1alpha1.Provi
}

// 3. Mark any Node past TTL as expired
if err := u.markTerminable(ctx, provisioner); err != nil {
if err := u.terminateExpired(ctx, provisioner); err != nil {
return fmt.Errorf("marking nodes terminable, %w", err)
}
return nil
@@ -77,7 +77,7 @@ func (u *Utilization) markUnderutilized(ctx context.Context, provisioner *v1alph
persisted := node.DeepCopy()
node.Labels = functional.UnionStringMaps(
node.Labels,
map[string]string{v1alpha1.ProvisionerPhaseLabel: v1alpha1.ProvisionerUnderutilizedPhase},
map[string]string{v1alpha1.ProvisionerUnderutilizedLabelKey: "true"},
)
node.Annotations = functional.UnionStringMaps(
node.Annotations,
@@ -94,9 +94,7 @@ var (
// clearUnderutilized removes the TTL on underutilized nodes if there is sufficient resource usage
func (u *Utilization) clearUnderutilized(ctx context.Context, provisioner *v1alpha1.Provisioner) error {
// 1. Get underutilized nodes
nodes, err := u.getNodes(ctx, provisioner, map[string]string{
v1alpha1.ProvisionerPhaseLabel: v1alpha1.ProvisionerUnderutilizedPhase,
})
nodes, err := u.getNodes(ctx, provisioner, map[string]string{v1alpha1.ProvisionerUnderutilizedLabelKey: "true"})
if err != nil {
return fmt.Errorf("listing labeled underutilized nodes, %w", err)
}
@@ -110,7 +108,7 @@ func (u *Utilization) clearUnderutilized(ctx context.Context, provisioner *v1alp

if !utilsnode.IsUnderutilized(node, pods) {
persisted := node.DeepCopy()
delete(node.Labels, v1alpha1.ProvisionerPhaseLabel)
delete(node.Labels, v1alpha1.ProvisionerUnderutilizedLabelKey)
delete(node.Annotations, v1alpha1.ProvisionerTTLKey)
if err := u.kubeClient.Patch(ctx, node, client.MergeFrom(persisted)); err != nil {
zap.S().Debugf("Could not remove underutilized labels on node %s, %w", node.Name, err)
@@ -122,24 +120,19 @@ func (u *Utilization) clearUnderutilized(ctx context.Context, provisioner *v1alp
return nil
}

// markTerminable checks if a node is past its ttl and marks it
func (u *Utilization) markTerminable(ctx context.Context, provisioner *v1alpha1.Provisioner) error {
// terminateExpired checks if a node is past its ttl and marks it
func (u *Utilization) terminateExpired(ctx context.Context, provisioner *v1alpha1.Provisioner) error {
// 1. Get underutilized nodes
nodes, err := u.getNodes(ctx, provisioner, map[string]string{v1alpha1.ProvisionerPhaseLabel: "underutilized"})
nodes, err := u.getNodes(ctx, provisioner, map[string]string{v1alpha1.ProvisionerUnderutilizedLabelKey: "true"})
if err != nil {
return fmt.Errorf("listing underutilized nodes, %w", err)
}

// 2. Check if node is past TTL
// 2. Delete node if past TTL
for _, node := range nodes {
if utilsnode.IsPastTTL(node) {
persisted := node.DeepCopy()
node.Labels = functional.UnionStringMaps(
node.Labels,
map[string]string{v1alpha1.ProvisionerPhaseLabel: v1alpha1.ProvisionerTerminablePhase},
)
if err := u.kubeClient.Patch(ctx, node, client.MergeFrom(persisted)); err != nil {
return fmt.Errorf("patching node %s, %w", node.Name, err)
if err := u.kubeClient.Delete(ctx, node); err != nil {
return fmt.Errorf("sending delete for node %s, %w", node.Name, err)
}
}
}
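terminateExpired relies on utilsnode.IsPastTTL, which is not part of this diff. The test fixtures show the TTL annotation holds an RFC3339 timestamp under the provisioner's TTL key (karpenter.sh/ttl, assuming the API group resolves to karpenter.sh as the finalizer literal suggests). The sketch below is an assumption about what such a check looks like, not the project's implementation.

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ttlKey mirrors ProvisionerTTLKey; the literal value is an assumption.
const ttlKey = "karpenter.sh/ttl"

// isPastTTL reports whether the node's TTL annotation holds an RFC3339 time
// that has already elapsed. Missing or malformed annotations count as not
// expired, which is the conservative choice before deleting a node.
func isPastTTL(node *v1.Node) bool {
	value, ok := node.Annotations[ttlKey]
	if !ok {
		return false
	}
	ttl, err := time.Parse(time.RFC3339, value)
	if err != nil {
		return false
	}
	return time.Now().After(ttl)
}

func main() {
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{
		Name:        "example",
		Annotations: map[string]string{ttlKey: time.Now().Add(-100 * time.Second).Format(time.RFC3339)},
	}}
	fmt.Println(isPastTTL(node)) // true, so the reallocator would issue a delete
}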
31 changes: 22 additions & 9 deletions pkg/controllers/terminating/v1alpha1/controller.go
@@ -19,7 +19,9 @@ import (
"fmt"
"time"

provisioning "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1"
"github.com/awslabs/karpenter/pkg/cloudprovider"
"github.com/awslabs/karpenter/pkg/utils/functional"

v1 "k8s.io/api/core/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -63,18 +65,29 @@ func NewController(kubeClient client.Client, coreV1Client corev1.CoreV1Interface
}
}

// Reconcile executes a reallocation control loop for the resource
// Reconcile executes a termination control loop for the resource
func (c *Controller) Reconcile(ctx context.Context, object client.Object) (reconcile.Result, error) {
// 1. Cordon terminable nodes
if err := c.terminator.cordonNodes(ctx); err != nil {
return reconcile.Result{}, fmt.Errorf("cordoning terminable nodes, %w", err)
node := object.(*v1.Node)
// 1. Check if node is terminable
if node.DeletionTimestamp == nil || !functional.ContainsString(node.Finalizers, provisioning.KarpenterFinalizer) {
return reconcile.Result{}, nil
}

// 2. Drain and delete nodes
if err := c.terminator.terminateNodes(ctx); err != nil {
return reconcile.Result{}, fmt.Errorf("terminating nodes, %w", err)
// 2. Cordon node
if err := c.terminator.cordonNode(ctx, node); err != nil {
return reconcile.Result{}, fmt.Errorf("cordoning node %s, %w", node.Name, err)
}
// 3. Drain node
drained, err := c.terminator.drainNode(ctx, node)
if err != nil {
return reconcile.Result{}, fmt.Errorf("draining node %s, %w", node.Name, err)
}
// 4. If fully drained, terminate the node
if drained {
if err := c.terminator.terminateNode(ctx, node); err != nil {
return reconcile.Result{}, fmt.Errorf("terminating nodes, %w", err)
}
}
return reconcile.Result{}, nil
return reconcile.Result{Requeue: !drained}, nil
}

func (c *Controller) Watches(context.Context) (source.Source, handler.EventHandler, builder.WatchesOption) {
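The terminator's cordonNode, drainNode, and terminateNode helpers are not included in this hunk. The step that closes the loop with the new finalizer is terminateNode: once the cloud provider instance is gone, the karpenter.sh/termination finalizer is removed so the pending delete of the Node object can complete. A hedged sketch of that step, assuming a controller-runtime client; the type and method bodies are illustrative, not the commit's code.

package v1alpha1

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

const karpenterFinalizer = "karpenter.sh/termination"

// Terminator sketches the collaborator used by Reconcile above; only the
// finalizer-removal step is shown, cordon and drain are elided.
type Terminator struct {
	kubeClient    client.Client
	cloudProvider interface {
		Terminate(context.Context, *v1.Node) error
	}
}

// terminateNode asks the cloud provider to terminate the instance, then strips
// the Karpenter finalizer so the API server can finish deleting the Node.
func (t *Terminator) terminateNode(ctx context.Context, node *v1.Node) error {
	if err := t.cloudProvider.Terminate(ctx, node); err != nil {
		return fmt.Errorf("terminating cloudprovider instance for node %s, %w", node.Name, err)
	}
	persisted := node.DeepCopy()
	remaining := []string{}
	for _, finalizer := range node.Finalizers {
		if finalizer != karpenterFinalizer {
			remaining = append(remaining, finalizer)
		}
	}
	node.Finalizers = remaining
	if err := t.kubeClient.Patch(ctx, node, client.MergeFrom(persisted)); err != nil {
		return fmt.Errorf("removing finalizer from node %s, %w", node.Name, err)
	}
	return nil
}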
31 changes: 8 additions & 23 deletions pkg/controllers/terminating/v1alpha1/suite_test.go
@@ -15,16 +15,13 @@ limitations under the License.
package v1alpha1

import (
"strings"
"context"
"testing"
"time"

"github.com/Pallinder/go-randomdata"
"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1"
"github.com/awslabs/karpenter/pkg/cloudprovider/fake"
"github.com/awslabs/karpenter/pkg/cloudprovider/registry"
"github.com/awslabs/karpenter/pkg/test"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"

. "github.com/awslabs/karpenter/pkg/test/expectations"
@@ -58,38 +55,26 @@ var _ = AfterSuite(func() {
})

var _ = Describe("Reallocation", func() {
var provisioner *v1alpha1.Provisioner

var ctx context.Context
BeforeEach(func() {
// Create Provisioner to give some time for the node to reconcile
provisioner = &v1alpha1.Provisioner{
ObjectMeta: metav1.ObjectMeta{Name: strings.ToLower(randomdata.SillyName()),
Namespace: "default",
},
Spec: v1alpha1.ProvisionerSpec{
Cluster: &v1alpha1.ClusterSpec{Name: "test-cluster", Endpoint: "http://test-cluster", CABundle: "dGVzdC1jbHVzdGVyCg=="},
},
}
ctx = context.Background()
})

AfterEach(func() {
ExpectCleanedUp(env.Manager.GetClient())
})

Context("Reconciliation", func() {
It("should terminate nodes marked terminable", func() {
It("should terminate deleted nodes", func() {
node := test.NodeWith(test.NodeOptions{
Finalizers: []string{v1alpha1.KarpenterFinalizer},
Labels: map[string]string{
v1alpha1.ProvisionerNameLabelKey: provisioner.Name,
v1alpha1.ProvisionerNamespaceLabelKey: provisioner.Namespace,
v1alpha1.ProvisionerPhaseLabel: v1alpha1.ProvisionerTerminablePhase,
},
Annotations: map[string]string{
v1alpha1.ProvisionerTTLKey: time.Now().Add(time.Duration(-100) * time.Second).Format(time.RFC3339),
v1alpha1.ProvisionerNameLabelKey: "default",
v1alpha1.ProvisionerNamespaceLabelKey: "default",
},
})
ExpectCreatedWithStatus(env.Client, node)
ExpectCreated(env.Client, provisioner)
Expect(env.Client.Delete(ctx, node)).To(Succeed())
ExpectNotFound(env.Client, node)
})
})