Added finalizers addition and removal logic for nodes #466

Merged · 4 commits · Jun 22, 2021
10 changes: 4 additions & 6 deletions pkg/apis/provisioning/v1alpha1/provisioner.go
@@ -100,9 +100,9 @@ var (
OperatingSystemLabelKey = "kubernetes.io/os"

// Reserved labels
ProvisionerNameLabelKey = SchemeGroupVersion.Group + "/name"
ProvisionerNamespaceLabelKey = SchemeGroupVersion.Group + "/namespace"
ProvisionerPhaseLabel = SchemeGroupVersion.Group + "/lifecycle-phase"
ProvisionerNameLabelKey = SchemeGroupVersion.Group + "/name"
ProvisionerNamespaceLabelKey = SchemeGroupVersion.Group + "/namespace"
ProvisionerUnderutilizedLabelKey = SchemeGroupVersion.Group + "/underutilized"

// Reserved annotations
ProvisionerTTLKey = SchemeGroupVersion.Group + "/ttl"
@@ -113,9 +113,7 @@ var (
)

const (
ProvisionerUnderutilizedPhase = "underutilized"
ProvisionerTerminablePhase = "terminable"
ProvisionerDrainingPhase = "draining"
KarpenterFinalizer = "karpenter.sh/termination"
)

// Provisioner is the Schema for the Provisioners API
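Note: the lifecycle-phase constants (`ProvisionerUnderutilizedPhase`, `ProvisionerTerminablePhase`, `ProvisionerDrainingPhase`) are replaced by a single `KarpenterFinalizer` value plus an underutilized label. A minimal sketch of why a finalizer is enough, assuming only the constant introduced here (the helper below is hypothetical, not project code): while a node carries the finalizer, a `Delete` only sets its deletion timestamp, which gives the termination controller a chance to drain it first.

```go
// Minimal sketch of finalizer semantics; hasTerminationFinalizer is a
// hypothetical helper, not code from this PR.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

const KarpenterFinalizer = "karpenter.sh/termination"

// hasTerminationFinalizer reports whether the node still blocks deletion.
// While it returns true, a Delete call only sets node.DeletionTimestamp;
// the object is removed once the finalizer is stripped.
func hasTerminationFinalizer(node *v1.Node) bool {
	for _, f := range node.Finalizers {
		if f == KarpenterFinalizer {
			return true
		}
	}
	return false
}

func main() {
	node := &v1.Node{}
	node.Finalizers = append(node.Finalizers, KarpenterFinalizer)
	fmt.Println(hasTerminationFinalizer(node)) // true
}
```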
2 changes: 1 addition & 1 deletion pkg/apis/provisioning/v1alpha1/provisioner_validation.go
@@ -30,7 +30,7 @@ var (
OperatingSystemLabelKey,
ProvisionerNameLabelKey,
ProvisionerNamespaceLabelKey,
ProvisionerPhaseLabel,
ProvisionerUnderutilizedLabelKey,
ProvisionerTTLKey,
ZoneLabelKey,
InstanceTypeLabelKey,
@@ -80,7 +80,7 @@ var _ = Describe("Validation", func() {
OperatingSystemLabelKey,
ProvisionerNameLabelKey,
ProvisionerNamespaceLabelKey,
ProvisionerPhaseLabel,
ProvisionerUnderutilizedLabelKey,
ZoneLabelKey,
InstanceTypeLabelKey,
} {
4 changes: 2 additions & 2 deletions pkg/cloudprovider/aws/cloudprovider.go
@@ -154,8 +154,8 @@ func (c *CloudProvider) GetInstanceTypes(ctx context.Context) ([]cloudprovider.I
return c.instanceTypeProvider.Get(ctx)
}

func (c *CloudProvider) Terminate(ctx context.Context, nodes []*v1.Node) error {
return c.instanceProvider.Terminate(ctx, nodes)
func (c *CloudProvider) Terminate(ctx context.Context, node *v1.Node) error {
return c.instanceProvider.Terminate(ctx, node)
}

// Validate cloud provider specific components of the cluster spec
35 changes: 14 additions & 21 deletions pkg/cloudprovider/aws/instance.go
@@ -26,6 +26,7 @@ import (

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
)

type InstanceProvider struct {
@@ -108,31 +109,23 @@ func (p *InstanceProvider) Create(ctx context.Context,
return createFleetOutput.Instances[0].InstanceIds[0], nil
}

func (p *InstanceProvider) Terminate(ctx context.Context, nodes []*v1.Node) error {
if len(nodes) == 0 {
return nil
}
ids := p.getInstanceIDs(nodes)

_, err := p.ec2api.TerminateInstancesWithContext(ctx, &ec2.TerminateInstancesInput{
InstanceIds: ids,
})
func (p *InstanceProvider) Terminate(ctx context.Context, node *v1.Node) error {
id, err := p.getInstanceID(node)
if err != nil {
return fmt.Errorf("terminating %d instances, %w", len(ids), err)
return fmt.Errorf("getting instance ID for node %s, %w", node.Name, err)
}
if _, err = p.ec2api.TerminateInstancesWithContext(ctx, &ec2.TerminateInstancesInput{
InstanceIds: []*string{id},
}); err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("terminating instance %s, %w", node.Name, err)
}

return nil
}

func (p *InstanceProvider) getInstanceIDs(nodes []*v1.Node) []*string {
ids := []*string{}
for _, node := range nodes {
id := strings.Split(node.Spec.ProviderID, "/")
if len(id) < 5 {
zap.S().Debugf("Continuing after failure to parse instance id, %s has invalid format", node.Name)
continue
}
ids = append(ids, aws.String(id[4]))
func (p *InstanceProvider) getInstanceID(node *v1.Node) (*string, error) {
id := strings.Split(node.Spec.ProviderID, "/")
if len(id) < 5 {
return nil, fmt.Errorf("parsing instance id %s", node.Spec.ProviderID)
}
return ids
return aws.String(id[4]), nil
}
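Note: the provider now terminates one instance at a time, tolerates NotFound errors, and `getInstanceID` fails loudly instead of silently skipping a malformed provider ID. The split-on-`/` logic relies on the AWS provider-ID shape `aws:///<zone>/<instance-id>`; the sample value below is a hypothetical illustration, not taken from this PR.

```go
// Standalone illustration of the providerID parsing behind getInstanceID;
// the sample ID is hypothetical.
package main

import (
	"fmt"
	"strings"
)

func instanceID(providerID string) (string, error) {
	parts := strings.Split(providerID, "/")
	if len(parts) < 5 {
		return "", fmt.Errorf("parsing instance id %s", providerID)
	}
	// parts: ["aws:", "", "", "<zone>", "<instance-id>"]
	return parts[4], nil
}

func main() {
	id, err := instanceID("aws:///us-west-2a/i-0123456789abcdef0")
	fmt.Println(id, err) // i-0123456789abcdef0 <nil>
}
```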
1 change: 1 addition & 0 deletions pkg/cloudprovider/aws/suite_test.go
@@ -31,6 +31,7 @@ import (
"github.com/awslabs/karpenter/pkg/cloudprovider/aws/fake"
"github.com/awslabs/karpenter/pkg/cloudprovider/registry"
"github.com/awslabs/karpenter/pkg/controllers/provisioning/v1alpha1/allocation"

"github.com/awslabs/karpenter/pkg/test"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
2 changes: 1 addition & 1 deletion pkg/cloudprovider/fake/cloudprovider.go
@@ -106,6 +106,6 @@ func (c *CloudProvider) Validate(context.Context, *v1alpha1.Constraints) *apis.F
return nil
}

func (c *CloudProvider) Terminate(context.Context, []*v1.Node) error {
func (c *CloudProvider) Terminate(ctx context.Context, node *v1.Node) error {
return nil
}
4 changes: 2 additions & 2 deletions pkg/cloudprovider/types.go
@@ -33,8 +33,8 @@ type CloudProvider interface {
GetInstanceTypes(context.Context) ([]InstanceType, error)
// Validate is a hook for additional constraint validation logic specific to the cloud provider
Validate(context.Context, *v1alpha1.Constraints) *apis.FieldError
// Terminate nodes in cloudprovider
Terminate(context.Context, []*v1.Node) error
// Terminate node in cloudprovider
Terminate(context.Context, *v1.Node) error
}

// Packing is a binpacking solution of equivalently schedulable pods to a set of
9 changes: 6 additions & 3 deletions pkg/controllers/provisioning/v1alpha1/allocation/bind.go
@@ -18,6 +18,7 @@ import (
"context"
"fmt"

"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -32,7 +33,9 @@ type Binder struct {
}

func (b *Binder) Bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error {
// 1. Mark NodeReady=Unknown
// 1. Add the Karpenter finalizer to the node to enable the termination workflow
node.Finalizers = append(node.Finalizers, v1alpha1.KarpenterFinalizer)
// 2. Mark NodeReady=Unknown
// Unfortunately, this detail is necessary to prevent kube-scheduler from
// scheduling pods to nodes before they're created. Node Lifecycle
// Controller will attach a Effect=NoSchedule taint in response to this
@@ -44,7 +47,7 @@ func (b *Binder) Bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
}}
// 2. Idempotently create a node. In rare cases, nodes can come online and
// 3. Idempotently create a node. In rare cases, nodes can come online and
// self register before the controller is able to register a node object
// with the API server. In the common case, we create the node object
// ourselves to enforce the binding decision and enable images to be pulled
@@ -55,7 +58,7 @@ func (b *Binder) Bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error
}
}

// 3. Bind pods
// 4. Bind pods
for _, pod := range pods {
if err := b.bind(ctx, node, pod); err != nil {
zap.S().Errorf("Continuing after failing to bind, %s", err.Error())
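Note: attaching the finalizer in `Bind`, before the node object is created, means every Karpenter-provisioned node is covered by the termination workflow from the moment it exists. A compressed sketch of that ordering follows; the helper name and client wiring are assumptions, not the PR's exact code.

```go
// Sketch of the bind-time ordering: finalizer first, then the idempotent
// create. Helper name and wiring are illustrative only.
package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

const KarpenterFinalizer = "karpenter.sh/termination"

func bindSketch(ctx context.Context, kubeClient client.Client, node *v1.Node) error {
	// 1. Attach the finalizer so a later Delete is intercepted by the
	//    termination controller rather than removing the node outright.
	node.Finalizers = append(node.Finalizers, KarpenterFinalizer)
	// 2. Idempotently create the node; a node that self-registered first is fine.
	if err := kubeClient.Create(ctx, node); err != nil && !errors.IsAlreadyExists(err) {
		return fmt.Errorf("creating node %s, %w", node.Name, err)
	}
	return nil
}

func main() {} // compile-only sketch
```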
@@ -77,8 +77,8 @@ func (c *Controller) Reconcile(ctx context.Context, object client.Object) (recon
return reconcile.Result{}, fmt.Errorf("removing ttl from node, %w", err)
}

// 3. Mark any Node past TTL as expired
if err := c.utilization.markTerminable(ctx, provisioner); err != nil {
// 3. Delete any node past its TTL
if err := c.utilization.terminateExpired(ctx, provisioner); err != nil {
return reconcile.Result{}, fmt.Errorf("marking nodes terminable, %w", err)
}
return reconcile.Result{}, nil
@@ -95,15 +95,15 @@ var _ = Describe("Reallocation", func() {

updatedNode := &v1.Node{}
Expect(env.Client.Get(ctx, client.ObjectKey{Name: node.Name}, updatedNode)).To(Succeed())
Expect(updatedNode.Labels).To(HaveKeyWithValue(v1alpha1.ProvisionerPhaseLabel, v1alpha1.ProvisionerUnderutilizedPhase))
Expect(updatedNode.Labels).To(HaveKey(v1alpha1.ProvisionerUnderutilizedLabelKey))
Expect(updatedNode.Annotations).To(HaveKey(v1alpha1.ProvisionerTTLKey))
})
It("should remove labels from utilized nodes", func() {
node := test.NodeWith(test.NodeOptions{
Labels: map[string]string{
v1alpha1.ProvisionerNameLabelKey: provisioner.Name,
v1alpha1.ProvisionerNamespaceLabelKey: provisioner.Namespace,
v1alpha1.ProvisionerPhaseLabel: v1alpha1.ProvisionerUnderutilizedPhase,
v1alpha1.ProvisionerNameLabelKey: provisioner.Name,
v1alpha1.ProvisionerNamespaceLabelKey: provisioner.Namespace,
v1alpha1.ProvisionerUnderutilizedLabelKey: "true",
},
Annotations: map[string]string{
v1alpha1.ProvisionerTTLKey: time.Now().Add(time.Duration(100) * time.Second).Format(time.RFC3339),
@@ -124,7 +124,7 @@

updatedNode := &v1.Node{}
Expect(env.Client.Get(ctx, client.ObjectKey{Name: node.Name}, updatedNode)).To(Succeed())
Expect(updatedNode.Labels).ToNot(HaveKey(v1alpha1.ProvisionerPhaseLabel))
Expect(updatedNode.Labels).ToNot(HaveKey(v1alpha1.ProvisionerUnderutilizedLabelKey))
Expect(updatedNode.Annotations).ToNot(HaveKey(v1alpha1.ProvisionerTTLKey))
})
})
27 changes: 10 additions & 17 deletions pkg/controllers/provisioning/v1alpha1/reallocation/utilization.go
@@ -44,7 +44,7 @@ func (u *Utilization) Reconcile(ctx context.Context, provisioner *v1alpha1.Provi
}

// 3. Mark any Node past TTL as expired
if err := u.markTerminable(ctx, provisioner); err != nil {
if err := u.terminateExpired(ctx, provisioner); err != nil {
return fmt.Errorf("marking nodes terminable, %w", err)
}
return nil
@@ -77,7 +77,7 @@ func (u *Utilization) markUnderutilized(ctx context.Context, provisioner *v1alph
persisted := node.DeepCopy()
node.Labels = functional.UnionStringMaps(
node.Labels,
map[string]string{v1alpha1.ProvisionerPhaseLabel: v1alpha1.ProvisionerUnderutilizedPhase},
map[string]string{v1alpha1.ProvisionerUnderutilizedLabelKey: "true"},
)
node.Annotations = functional.UnionStringMaps(
node.Annotations,
@@ -94,9 +94,7 @@
// clearUnderutilized removes the TTL on underutilized nodes if there is sufficient resource usage
func (u *Utilization) clearUnderutilized(ctx context.Context, provisioner *v1alpha1.Provisioner) error {
// 1. Get underutilized nodes
nodes, err := u.getNodes(ctx, provisioner, map[string]string{
v1alpha1.ProvisionerPhaseLabel: v1alpha1.ProvisionerUnderutilizedPhase,
})
nodes, err := u.getNodes(ctx, provisioner, map[string]string{v1alpha1.ProvisionerUnderutilizedLabelKey: "true"})
if err != nil {
return fmt.Errorf("listing labeled underutilized nodes, %w", err)
}
@@ -110,7 +108,7 @@

if !utilsnode.IsUnderutilized(node, pods) {
persisted := node.DeepCopy()
delete(node.Labels, v1alpha1.ProvisionerPhaseLabel)
delete(node.Labels, v1alpha1.ProvisionerUnderutilizedLabelKey)
delete(node.Annotations, v1alpha1.ProvisionerTTLKey)
if err := u.kubeClient.Patch(ctx, node, client.MergeFrom(persisted)); err != nil {
zap.S().Debugf("Could not remove underutilized labels on node %s, %w", node.Name, err)
@@ -122,24 +120,19 @@
return nil
}

// markTerminable checks if a node is past its ttl and marks it
func (u *Utilization) markTerminable(ctx context.Context, provisioner *v1alpha1.Provisioner) error {
// terminateExpired checks if a node is past its ttl and marks it
func (u *Utilization) terminateExpired(ctx context.Context, provisioner *v1alpha1.Provisioner) error {
// 1. Get underutilized nodes
nodes, err := u.getNodes(ctx, provisioner, map[string]string{v1alpha1.ProvisionerPhaseLabel: "underutilized"})
nodes, err := u.getNodes(ctx, provisioner, map[string]string{v1alpha1.ProvisionerUnderutilizedLabelKey: "true"})
if err != nil {
return fmt.Errorf("listing underutilized nodes, %w", err)
}

// 2. Check if node is past TTL
// 2. Delete node if past TTL
for _, node := range nodes {
if utilsnode.IsPastTTL(node) {
persisted := node.DeepCopy()
node.Labels = functional.UnionStringMaps(
node.Labels,
map[string]string{v1alpha1.ProvisionerPhaseLabel: v1alpha1.ProvisionerTerminablePhase},
)
if err := u.kubeClient.Patch(ctx, node, client.MergeFrom(persisted)); err != nil {
return fmt.Errorf("patching node %s, %w", node.Name, err)
if err := u.kubeClient.Delete(ctx, node); err != nil {
return fmt.Errorf("sending delete for node %s, %w", node.Name, err)
}
}
}
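Note: with the phase labels gone, `terminateExpired` no longer patches nodes into a terminable phase; it simply issues a `Delete`, and the finalizer added at bind time turns that into a hand-off to the termination controller. A self-contained sketch of the TTL gate is below; the annotation key shape and the TTL helper are assumed stand-ins for `ProvisionerTTLKey` and `utilsnode.IsPastTTL`.

```go
// Sketch of the expired-node delete; annotation key and pastTTL helper are
// assumed stand-ins for ProvisionerTTLKey and utilsnode.IsPastTTL.
package main

import (
	"context"
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

const ttlAnnotationKey = "provisioning.karpenter.sh/ttl" // assumed key

func pastTTL(node *v1.Node) bool {
	raw, ok := node.Annotations[ttlAnnotationKey]
	if !ok {
		return false
	}
	ttl, err := time.Parse(time.RFC3339, raw)
	return err == nil && time.Now().After(ttl)
}

func terminateExpired(ctx context.Context, kubeClient client.Client, nodes []*v1.Node) error {
	for _, node := range nodes {
		if pastTTL(node) {
			// With the Karpenter finalizer present, Delete only sets the
			// deletion timestamp; the termination controller does the rest.
			if err := kubeClient.Delete(ctx, node); err != nil {
				return fmt.Errorf("sending delete for node %s, %w", node.Name, err)
			}
		}
	}
	return nil
}

func main() {} // compile-only sketch
```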
31 changes: 22 additions & 9 deletions pkg/controllers/terminating/v1alpha1/controller.go
@@ -19,7 +19,9 @@ import (
"fmt"
"time"

provisioning "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1"
"github.com/awslabs/karpenter/pkg/cloudprovider"
"github.com/awslabs/karpenter/pkg/utils/functional"

v1 "k8s.io/api/core/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -63,18 +65,29 @@ func NewController(kubeClient client.Client, coreV1Client corev1.CoreV1Interface
}
}

// Reconcile executes a reallocation control loop for the resource
// Reconcile executes a termination control loop for the resource
func (c *Controller) Reconcile(ctx context.Context, object client.Object) (reconcile.Result, error) {
// 1. Cordon terminable nodes
if err := c.terminator.cordonNodes(ctx); err != nil {
return reconcile.Result{}, fmt.Errorf("cordoning terminable nodes, %w", err)
node := object.(*v1.Node)
// 1. Check if node is terminable
if node.DeletionTimestamp == nil || !functional.ContainsString(node.Finalizers, provisioning.KarpenterFinalizer) {
return reconcile.Result{}, nil
}

// 2. Drain and delete nodes
if err := c.terminator.terminateNodes(ctx); err != nil {
return reconcile.Result{}, fmt.Errorf("terminating nodes, %w", err)
// 2. Cordon node
if err := c.terminator.cordonNode(ctx, node); err != nil {
return reconcile.Result{}, fmt.Errorf("cordoning node %s, %w", node.Name, err)
}
// 3. Drain node
drained, err := c.terminator.drainNode(ctx, node)
if err != nil {
return reconcile.Result{}, fmt.Errorf("draining node %s, %w", node.Name, err)
}
// 4. If fully drained, terminate the node
if drained {
if err := c.terminator.terminateNode(ctx, node); err != nil {
return reconcile.Result{}, fmt.Errorf("terminating nodes, %w", err)
}
}
return reconcile.Result{}, nil
return reconcile.Result{Requeue: !drained}, nil
}

func (c *Controller) Watches(context.Context) (source.Source, handler.EventHandler, builder.WatchesOption) {
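Note: the new `Reconcile` acts only on nodes that are being deleted and still carry the Karpenter finalizer, then cordons, drains, and (once drained) terminates. The finalizer-removal half of the PR title is not visible in this hunk; the sketch below shows an assumed shape for that step (cloud-provider call plus a patch that strips the finalizer), not the actual `terminateNode` body.

```go
// Assumed sketch of the finalizer-removal step performed after the cloud
// instance is terminated; terminateNode's real implementation is not shown
// in this diff.
package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

const KarpenterFinalizer = "karpenter.sh/termination"

type cloudTerminator interface {
	Terminate(ctx context.Context, node *v1.Node) error
}

func terminateNodeSketch(ctx context.Context, kubeClient client.Client, cloud cloudTerminator, node *v1.Node) error {
	// 1. Terminate the backing instance at the cloud provider.
	if err := cloud.Terminate(ctx, node); err != nil {
		return fmt.Errorf("terminating instance for node %s, %w", node.Name, err)
	}
	// 2. Strip the finalizer so the API server can finish deleting the node.
	persisted := node.DeepCopy()
	kept := make([]string, 0, len(node.Finalizers))
	for _, f := range node.Finalizers {
		if f != KarpenterFinalizer {
			kept = append(kept, f)
		}
	}
	node.Finalizers = kept
	return kubeClient.Patch(ctx, node, client.MergeFrom(persisted))
}

func main() {} // compile-only sketch
```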
31 changes: 8 additions & 23 deletions pkg/controllers/terminating/v1alpha1/suite_test.go
@@ -15,16 +15,13 @@ limitations under the License.
package v1alpha1

import (
"strings"
"context"
"testing"
"time"

"github.com/Pallinder/go-randomdata"
"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1"
"github.com/awslabs/karpenter/pkg/cloudprovider/fake"
"github.com/awslabs/karpenter/pkg/cloudprovider/registry"
"github.com/awslabs/karpenter/pkg/test"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"

. "github.com/awslabs/karpenter/pkg/test/expectations"
@@ -58,38 +55,26 @@ var _ = AfterSuite(func() {
})

var _ = Describe("Reallocation", func() {
var provisioner *v1alpha1.Provisioner

var ctx context.Context
BeforeEach(func() {
// Create Provisioner to give some time for the node to reconcile
provisioner = &v1alpha1.Provisioner{
ObjectMeta: metav1.ObjectMeta{Name: strings.ToLower(randomdata.SillyName()),
Namespace: "default",
},
Spec: v1alpha1.ProvisionerSpec{
Cluster: &v1alpha1.ClusterSpec{Name: "test-cluster", Endpoint: "http://test-cluster", CABundle: "dGVzdC1jbHVzdGVyCg=="},
},
}
ctx = context.Background()
})

AfterEach(func() {
ExpectCleanedUp(env.Manager.GetClient())
})

Context("Reconciliation", func() {
It("should terminate nodes marked terminable", func() {
It("should terminate deleted nodes", func() {
node := test.NodeWith(test.NodeOptions{
Finalizers: []string{v1alpha1.KarpenterFinalizer},
Labels: map[string]string{
v1alpha1.ProvisionerNameLabelKey: provisioner.Name,
v1alpha1.ProvisionerNamespaceLabelKey: provisioner.Namespace,
v1alpha1.ProvisionerPhaseLabel: v1alpha1.ProvisionerTerminablePhase,
},
Annotations: map[string]string{
v1alpha1.ProvisionerTTLKey: time.Now().Add(time.Duration(-100) * time.Second).Format(time.RFC3339),
v1alpha1.ProvisionerNameLabelKey: "default",
v1alpha1.ProvisionerNamespaceLabelKey: "default",
},
})
ExpectCreatedWithStatus(env.Client, node)
ExpectCreated(env.Client, provisioner)
Expect(env.Client.Delete(ctx, node)).To(Succeed())
ExpectNotFound(env.Client, node)
})
})