Skip to content

Commit

Permalink
Taint nodes on spot and scheduled events
Browse files Browse the repository at this point in the history
Fixes aws#160.

Signed-off-by: Ilya Shaisultanov <[email protected]>
  • Loading branch information
diversario committed May 15, 2020
1 parent b647783 commit 77ff34a
Show file tree
Hide file tree
Showing 6 changed files with 274 additions and 8 deletions.
1 change: 1 addition & 0 deletions cmd/node-termination-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func watchForCancellationEvents(cancelChan <-chan interruptionevent.Interruption
log.Log().Msgf("Uncordoning the node failed: %v", err)
}
node.RemoveNTHLabels()
node.CleanAllTaints()
} else {
log.Log().Msg("Another interruption event is active, not uncordoning the node")
}
Expand Down
14 changes: 10 additions & 4 deletions pkg/interruptionevent/scheduled-event.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,26 @@ func checkForScheduledEvents(imds *ec2metadata.Service) ([]InterruptionEvent, er
return events, nil
}

func uncordonAfterRebootPreDrain(interruptionEvent InterruptionEvent, node node.Node) error {
err := node.MarkWithEventID(interruptionEvent.EventID)
func uncordonAfterRebootPreDrain(interruptionEvent InterruptionEvent, n node.Node) error {
err := n.MarkWithEventID(interruptionEvent.EventID)
if err != nil {
return fmt.Errorf("Unable to mark node with event ID: %w", err)
}

err = n.TaintScheduledMaintenance(interruptionEvent.EventID)
if err != nil {
return fmt.Errorf("Unable to taint node with taint %s:%s: %w", node.ScheduledMaintenanceTaint, interruptionEvent.EventID, err)
}

// if the node is already marked as unschedulable, then don't do anything
unschedulable, err := node.IsUnschedulable()
unschedulable, err := n.IsUnschedulable()
if err == nil && unschedulable {
log.Log().Msg("Node is already marked unschedulable, not taking any action to add uncordon label.")
return nil
} else if err != nil {
return fmt.Errorf("Encountered an error while checking if the node is unschedulable. Not setting an uncordon label: %w", err)
}
err = node.MarkForUncordonAfterReboot()
err = n.MarkForUncordonAfterReboot()
if err != nil {
return fmt.Errorf("Unable to mark the node for uncordon: %w", err)
}
Expand Down
22 changes: 19 additions & 3 deletions pkg/interruptionevent/scheduled-event_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package interruptionevent

import (
"flag"
"fmt"
"os"
"testing"
"time"
Expand Down Expand Up @@ -66,15 +67,30 @@ func getNode(t *testing.T, drainHelper *drain.Helper) *node.Node {
}

// TestUncordonAfterRebootPreDrainSuccess verifies that the scheduled-event
// pre-drain hook taints the node with the scheduled-maintenance taint
// (key, event-ID value, NoSchedule effect) and completes without error.
func TestUncordonAfterRebootPreDrainSuccess(t *testing.T) {
	drainEvent := InterruptionEvent{
		EventID: "some-id",
	}
	nthConfig := config.Config{
		DryRun:   true,
		NodeName: nodeName,
	}

	client := fake.NewSimpleClientset()
	_, err := client.CoreV1().Nodes().Create(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}})
	h.Ok(t, err)

	tNode, err := node.NewWithValues(nthConfig, getDrainHelper(client))
	// Previously this error was silently overwritten by the next assignment;
	// fail fast if node construction itself errored.
	h.Ok(t, err)

	err = uncordonAfterRebootPreDrain(drainEvent, *tNode)
	// Check the call succeeded before inspecting taints, so a failure reports
	// the error instead of panicking on an empty taint slice below.
	h.Ok(t, err)

	n, err := client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
	h.Ok(t, err)
	h.Assert(t, len(n.Spec.Taints) > 0, "Expected at least one taint on the node")
	h.Assert(t, n.Spec.Taints[0].Key == node.ScheduledMaintenanceTaint, fmt.Sprintf("Missing expected taint key %s", node.ScheduledMaintenanceTaint))
	h.Assert(t, n.Spec.Taints[0].Value == drainEvent.EventID, fmt.Sprintf("Missing expected taint value %s", drainEvent.EventID))
	h.Assert(t, n.Spec.Taints[0].Effect == v1.TaintEffectNoSchedule, fmt.Sprintf("Missing expected taint effect %s", v1.TaintEffectNoSchedule))
}

func TestUncordonAfterRebootPreDrainMarkWithEventIDFailure(t *testing.T) {
resetFlagsForTest()

Expand Down
13 changes: 13 additions & 0 deletions pkg/interruptionevent/spot-itn-event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/aws/aws-node-termination-handler/pkg/ec2metadata"
"github.com/aws/aws-node-termination-handler/pkg/node"
"github.com/rs/zerolog/log"
)

Expand Down Expand Up @@ -59,10 +60,22 @@ func checkForSpotInterruptionNotice(imds *ec2metadata.Service) (*InterruptionEve
hash := sha256.New()
hash.Write([]byte(fmt.Sprintf("%v", instanceAction)))

var preDrainFunc preDrainTask = setInterruptionTaint

return &InterruptionEvent{
EventID: fmt.Sprintf("spot-itn-%x", hash.Sum(nil)),
Kind: SpotITNKind,
StartTime: interruptionTime,
Description: fmt.Sprintf("Spot ITN received. Instance will be interrupted at %s \n", instanceAction.Time),
PreDrainTask: preDrainFunc,
}, nil
}

// setInterruptionTaint is the pre-drain task for spot interruption events:
// it applies the spot-interruption NoSchedule taint to the node, using the
// interruption event ID as the taint value.
func setInterruptionTaint(interruptionEvent InterruptionEvent, n node.Node) error {
	err := n.TaintSpotItn(interruptionEvent.EventID)
	if err != nil {
		// Report the taint actually being applied (SpotInterruptionTaint),
		// not ScheduledMaintenanceTaint as the original message claimed.
		return fmt.Errorf("Unable to taint node with taint %s:%s: %w", node.SpotInterruptionTaint, interruptionEvent.EventID, err)
	}

	return nil
}
69 changes: 69 additions & 0 deletions pkg/interruptionevent/spot-itn-event_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2016-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package interruptionevent

import (
"fmt"
"os"
"testing"
"time"

"github.com/aws/aws-node-termination-handler/pkg/config"
"github.com/aws/aws-node-termination-handler/pkg/node"
h "github.com/aws/aws-node-termination-handler/pkg/test"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubectl/pkg/drain"
)

var spotNodeName = "NAME"

// getSpotDrainHelper returns a drain.Helper wired to the given fake clientset
// for use in the spot-ITN tests: force drain, no per-pod grace-period override
// (-1), DaemonSet pods ignored, local data deleted, with a 120-second timeout
// and output routed to the process's stdout/stderr.
func getSpotDrainHelper(client *fake.Clientset) *drain.Helper {
	return &drain.Helper{
		Client:              client,
		Force:               true,
		GracePeriodSeconds:  -1,
		IgnoreAllDaemonSets: true,
		DeleteLocalData:     true,
		Timeout:             time.Duration(120) * time.Second,
		Out:                 os.Stdout,
		ErrOut:              os.Stderr,
	}
}

// TestSetInterruptionTaint verifies that the spot-ITN pre-drain task taints
// the node with the spot-interruption taint (key, event-ID value, NoSchedule
// effect) and completes without error.
func TestSetInterruptionTaint(t *testing.T) {
	drainEvent := InterruptionEvent{
		EventID: "some-id",
	}
	nthConfig := config.Config{
		DryRun:   true,
		NodeName: spotNodeName,
	}

	client := fake.NewSimpleClientset()
	_, err := client.CoreV1().Nodes().Create(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: spotNodeName}})
	h.Ok(t, err)

	tNode, err := node.NewWithValues(nthConfig, getSpotDrainHelper(client))
	// Previously this error was silently overwritten by the next assignment;
	// fail fast if node construction itself errored.
	h.Ok(t, err)

	err = setInterruptionTaint(drainEvent, *tNode)
	// Check the call succeeded before inspecting taints, so a failure reports
	// the error instead of panicking on an empty taint slice below.
	h.Ok(t, err)

	n, err := client.CoreV1().Nodes().Get(spotNodeName, metav1.GetOptions{})
	h.Ok(t, err)
	h.Assert(t, len(n.Spec.Taints) > 0, "Expected at least one taint on the node")
	h.Assert(t, n.Spec.Taints[0].Key == node.SpotInterruptionTaint, fmt.Sprintf("Missing expected taint key %s", node.SpotInterruptionTaint))
	h.Assert(t, n.Spec.Taints[0].Value == drainEvent.EventID, fmt.Sprintf("Missing expected taint value %s", drainEvent.EventID))
	h.Assert(t, n.Spec.Taints[0].Effect == v1.TaintEffectNoSchedule, fmt.Sprintf("Missing expected taint effect %s", v1.TaintEffectNoSchedule))
}
163 changes: 162 additions & 1 deletion pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/aws/aws-node-termination-handler/pkg/config"
"github.com/rs/zerolog/log"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
Expand All @@ -43,6 +44,19 @@ const (
EventIDLabelKey = "aws-node-termination-handler/event-id"
)

const (
	// SpotInterruptionTaint is a taint used to make spot instance unschedulable
	SpotInterruptionTaint = "aws-node-termination-handler/spot-itn"
	// ScheduledMaintenanceTaint is a taint used to make a node with a scheduled maintenance event unschedulable
	ScheduledMaintenanceTaint = "aws-node-termination-handler/scheduled-maintenance"
)

var (
maxRetryDeadline time.Duration = 5 * time.Second
conflictRetryInterval time.Duration = 750 * time.Millisecond
)


var uptimeFile = "/proc/uptime"

// Node represents a kubernetes node with functions to manipulate its state via the kubernetes api server
Expand Down Expand Up @@ -255,6 +269,42 @@ func (n Node) removeLabel(key string) error {
return nil
}

// TaintSpotItn fetches the node from the API server and applies the
// spot-interruption NoSchedule taint to it, with eventID as the taint value.
func (n Node) TaintSpotItn(eventID string) error {
	k8sNode, err := n.fetchKubernetesNode()
	if err != nil {
		return fmt.Errorf("Unable to fetch kubernetes node from API: %w", err)
	}

	return addTaint(k8sNode, n.drainHelper.Client, SpotInterruptionTaint, eventID, corev1.TaintEffectNoSchedule)
}

// TaintScheduledMaintenance fetches the node from the API server and applies
// the scheduled-maintenance NoSchedule taint to it, with eventID as the taint
// value.
func (n Node) TaintScheduledMaintenance(eventID string) error {
	k8sNode, err := n.fetchKubernetesNode()
	if err != nil {
		return fmt.Errorf("Unable to fetch kubernetes node from API: %w", err)
	}

	return addTaint(k8sNode, n.drainHelper.Client, ScheduledMaintenanceTaint, eventID, corev1.TaintEffectNoSchedule)
}

// CleanAllTaints removes every NTH-managed taint (spot interruption and
// scheduled maintenance) from the node. It stops at the first failure.
func (n Node) CleanAllTaints() error {
	k8sNode, err := n.fetchKubernetesNode()
	if err != nil {
		return fmt.Errorf("Unable to fetch kubernetes node from API: %w", err)
	}

	taints := []string{SpotInterruptionTaint, ScheduledMaintenanceTaint}

	for _, taint := range taints {
		_, err = cleanTaint(k8sNode, n.drainHelper.Client, taint)
		if err != nil {
			// Wrap the underlying error with %w (consistent with the rest of
			// this file) instead of discarding it.
			return fmt.Errorf("Unable to clean taint %s from node %s: %w", taint, n.nthConfig.NodeName, err)
		}
	}

	return nil
}

// IsLabeledWithAction will return true if the current node is labeled with NTH action labels
func (n Node) IsLabeledWithAction() (bool, error) {
k8sNode, err := n.fetchKubernetesNode()
Expand Down Expand Up @@ -300,6 +350,12 @@ func (n Node) UncordonIfRebooted() error {
if err != nil {
return err
}

err = n.CleanAllTaints()
if err != nil {
return err
}

log.Log().Msgf("Successfully completed action %s.", UncordonAfterRebootLabelVal)
default:
log.Log().Msg("There are no label actions to handle.")
Expand All @@ -309,7 +365,9 @@ func (n Node) UncordonIfRebooted() error {

// fetchKubernetesNode will send an http request to the k8s api server and return the corev1 model node
func (n Node) fetchKubernetesNode() (*corev1.Node, error) {
node := &corev1.Node{}
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{Name: n.nthConfig.NodeName},
}
if n.nthConfig.DryRun {
return node, nil
}
Expand Down Expand Up @@ -363,3 +421,106 @@ func jsonPatchEscape(value string) string {
value = strings.Replace(value, "~", "~0", -1)
return strings.Replace(value, "/", "~1", -1)
}

// addTaint adds a taint with the given key, value, and effect to the node,
// retrying on optimistic-concurrency (conflict) errors until maxRetryDeadline
// elapses. If a taint with the same key is already present on the freshest
// copy of the node, it returns nil without updating.
func addTaint(node *corev1.Node, client kubernetes.Interface, taintKey string, taintValue string, effect corev1.TaintEffect) error {
	retryDeadline := time.Now().Add(maxRetryDeadline)
	// Work on a copy so the caller's node object is never mutated directly.
	freshNode := node.DeepCopy()
	var err error
	refresh := false
	for {
		if refresh {
			// Get the newest version of the node.
			freshNode, err = client.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
			if err != nil || freshNode == nil {
				log.Log().Msgf("Error while adding %v taint on node %v: %v", taintKey, node.Name, err)
				return fmt.Errorf("failed to get node %v: %v", node.Name, err)
			}
		}

		if !addTaintToSpec(freshNode, taintKey, taintValue, effect) {
			if !refresh {
				// Make sure we have the latest version before skipping update.
				refresh = true
				continue
			}
			// Taint already present on the latest version: nothing to do.
			return nil
		}
		_, err = client.CoreV1().Nodes().Update(freshNode)
		if err != nil && errors.IsConflict(err) && time.Now().Before(retryDeadline) {
			// Another writer updated the node; re-fetch and retry until the
			// deadline passes.
			refresh = true
			time.Sleep(conflictRetryInterval)
			continue
		}

		if err != nil {
			log.Log().Msgf("Error while adding %v taint on node %v: %v", taintKey, node.Name, err)
			return err
		}
		log.Log().Msgf("Successfully added %v on node %v", taintKey, node.Name)
		return nil
	}
}

// addTaintToSpec appends a taint with the given key, value, and effect to the
// node's spec. It is a no-op when a taint with the same key already exists.
// Returns true when the spec was modified (an API update is needed), false
// otherwise.
func addTaintToSpec(node *corev1.Node, taintKey string, taintValue string, effect corev1.TaintEffect) bool {
	for _, existing := range node.Spec.Taints {
		if existing.Key != taintKey {
			continue
		}
		log.Log().Msgf("%v already present on node %v, taint: %v", taintKey, node.Name, existing)
		return false
	}
	newTaint := corev1.Taint{
		Key:    taintKey,
		Value:  taintValue,
		Effect: effect,
	}
	node.Spec.Taints = append(node.Spec.Taints, newTaint)
	return true
}

// cleanTaint removes every taint with the given key from the node, retrying
// on optimistic-concurrency (conflict) errors until maxRetryDeadline elapses.
// Returns true when the node was updated, false when no matching taint was
// present.
func cleanTaint(node *corev1.Node, client kubernetes.Interface, taintKey string) (bool, error) {
	retryDeadline := time.Now().Add(maxRetryDeadline)
	// Work on a copy so the caller's node object is never mutated directly.
	freshNode := node.DeepCopy()
	var err error
	refresh := false
	for {
		if refresh {
			// Get the newest version of the node.
			freshNode, err = client.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
			if err != nil || freshNode == nil {
				// This is the taint-removal path, so log "releasing" (the
				// original message said "adding" — copy-paste error).
				log.Log().Msgf("Error while releasing %v taint on node %v: %v", taintKey, node.Name, err)
				return false, fmt.Errorf("failed to get node %v: %v", node.Name, err)
			}
		}
		// Rebuild the taint list without any taint matching taintKey.
		newTaints := make([]corev1.Taint, 0)
		for _, taint := range freshNode.Spec.Taints {
			if taint.Key == taintKey {
				log.Log().Msgf("Releasing taint %+v on node %v", taint, node.Name)
			} else {
				newTaints = append(newTaints, taint)
			}
		}
		if len(newTaints) == len(freshNode.Spec.Taints) {
			if !refresh {
				// Make sure we have the latest version before skipping update.
				refresh = true
				continue
			}
			// No matching taint on the latest version: nothing to do.
			return false, nil
		}

		freshNode.Spec.Taints = newTaints
		_, err = client.CoreV1().Nodes().Update(freshNode)

		if err != nil && errors.IsConflict(err) && time.Now().Before(retryDeadline) {
			// Another writer updated the node; re-fetch and retry until the
			// deadline passes.
			refresh = true
			time.Sleep(conflictRetryInterval)
			continue
		}

		if err != nil {
			log.Log().Msgf("Error while releasing %v taint on node %v: %v", taintKey, node.Name, err)
			return false, err
		}
		log.Log().Msgf("Successfully released %v on node %v", taintKey, node.Name)
		return true, nil
	}
}

0 comments on commit 77ff34a

Please sign in to comment.