Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ASG Launch Lifecycle Hook #940

Merged
merged 27 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ccbe23f
Added check of ASG lifecycle hook informing of an EC2 instance launch…
GavinBurris42 Aug 17, 2023
ed2a317
Completes ASG launch lifecycle hook if new node is ready in cluster
GavinBurris42 Aug 21, 2023
4f47cb1
Avoid processing of interuption event for launching
GavinBurris42 Aug 21, 2023
2415c4c
Fixed logic flow for how ASG hooks were checked. Added error logs and…
GavinBurris42 Aug 23, 2023
a28edd6
Created ASG launch lifecyle test script
GavinBurris42 Aug 25, 2023
12995c2
ASG Launch Lifecyle can be completed. ASG Terminate Lifecycle hook is…
GavinBurris42 Sep 8, 2023
000aa2c
Created bash script for testing ASG lifecycle hook completion
GavinBurris42 Sep 19, 2023
75ed706
E2E tests for ASG launch lifecycle hook completion is complete
GavinBurris42 Oct 5, 2023
aed12d1
Added bash script to run test files. Revised method names and placeme…
GavinBurris42 Nov 1, 2023
6838bcc
Refactored the unmarshalling of the SQS message
GavinBurris42 Nov 21, 2023
3ee0880
Refactor the creation and usage of the K8s client
GavinBurris42 Nov 21, 2023
470bc5f
Updated run-test with inclusive terminology
GavinBurris42 Dec 1, 2023
9612714
Fix boolean logic in bash scripts
GavinBurris42 Dec 1, 2023
7fbfe35
Refactored the processing of ASG Launch Lifecycle events as interrupt…
GavinBurris42 Dec 1, 2023
fee8077
Revise log messages and formatting
GavinBurris42 Dec 5, 2023
45869af
Removed ignore errors and getNodes method. Changed method names
GavinBurris42 Dec 5, 2023
10f0f34
Refactor error handling, and add helpful comments
GavinBurris42 Dec 5, 2023
1ab824b
Refactored interruption event handling into a seperate package with d…
GavinBurris42 Dec 7, 2023
9832d2b
Revised formatting, logging, and error message issues
GavinBurris42 Dec 8, 2023
307b6a8
Refactors for log and error handling for eventhandlers. Refacors for …
GavinBurris42 Dec 19, 2023
b4ea84a
Refactored error and logging messages and bash script tests for ASG l…
GavinBurris42 Dec 20, 2023
f52c7b5
Update ReadME for ASG launch lifecycle hook changes
GavinBurris42 Dec 21, 2023
ebf8e43
Fixed changes for README update
GavinBurris42 Dec 26, 2023
d16182d
Use ASG launch event in SQS testing
GavinBurris42 Jan 2, 2024
8a94281
Revise formatting for updated README
GavinBurris42 Jan 3, 2024
cd91851
Revised ASG Launch Lifecycle Assertion test to adhere to shellcheck. …
GavinBurris42 Jan 5, 2024
a8197a4
Update E22 EKS cluster test with ASG test script
GavinBurris42 Jan 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 37 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,12 @@ You'll need the following AWS infrastructure components:

1. Amazon Simple Queue Service (SQS) Queue
2. AutoScaling Group Termination Lifecycle Hook
3. Amazon EventBridge Rule
4. IAM Role for the aws-node-termination-handler Queue Processing Pods
3. Instance Tagging
4. Amazon EventBridge Rule
5. IAM Role for the aws-node-termination-handler Queue Processing Pods

Optional AWS infrastructure components:
1. AutoScaling Group Launch Lifecycle Hook

#### 1. Create an SQS Queue:

Expand Down Expand Up @@ -290,7 +294,7 @@ aws autoscaling put-lifecycle-hook \
--lifecycle-transition=autoscaling:EC2_INSTANCE_TERMINATING \
--default-result=CONTINUE \
--heartbeat-timeout=300 \
--notification-target-arn <your test queue ARN here> \
--notification-target-arn <your queue ARN here> \
--role-arn <your SQS access role ARN here>
```

Expand Down Expand Up @@ -398,6 +402,36 @@ IAM Policy for aws-node-termination-handler Deployment:
}
```

#### 1. Handle ASG Instance Launch Lifecycle Notifications (optional):

NTH can monitor for new instances launched by an ASG and notify the ASG when the instance is available in the EKS cluster.

NTH will need to receive notifications of new instance launches within the ASG. We can add a lifecycle hook to the ASG that will send instance launch notifications via EventBridge:

```
aws autoscaling put-lifecycle-hook \
--lifecycle-hook-name=my-k8s-launch-hook \
--auto-scaling-group-name=my-k8s-asg \
--lifecycle-transition=autoscaling:EC2_INSTANCE_LAUNCHING \
--default-result="ABANDON" \
--heartbeat-timeout=300
```

Alternatively, ASG can send the instance launch notification directly to an SQS Queue:

```
aws autoscaling put-lifecycle-hook \
--lifecycle-hook-name=my-k8s-launch-hook \
--auto-scaling-group-name=my-k8s-asg \
--lifecycle-transition=autoscaling:EC2_INSTANCE_LAUNCHING \
--default-result="ABANDON" \
--heartbeat-timeout=300 \
--notification-target-arn <your queue ARN here> \
--role-arn <your SQS access role ARN here>
```

When NTH receives a launch notification, it will periodically check for a node backed by the EC2 instance to join the cluster and for the node to have a status of 'ready.' Once a node becomes ready, NTH will complete the lifecycle hook, prompting the ASG to proceed with terminating the previous instance. If the lifecycle hook is not completed before the timeout, the ASG will take the default action. If the default action is 'ABANDON', the new instance will be terminated, and the notification process will be repeated with another new instance.

### Installation

#### Pod Security Admission
Expand Down
144 changes: 33 additions & 111 deletions cmd/node-termination-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (

"github.com/aws/aws-node-termination-handler/pkg/config"
"github.com/aws/aws-node-termination-handler/pkg/ec2metadata"
"github.com/aws/aws-node-termination-handler/pkg/interruptionevent/asg/launch"
"github.com/aws/aws-node-termination-handler/pkg/interruptionevent/draincordon"
"github.com/aws/aws-node-termination-handler/pkg/interruptioneventstore"
"github.com/aws/aws-node-termination-handler/pkg/logging"
"github.com/aws/aws-node-termination-handler/pkg/monitor"
Expand All @@ -43,8 +45,9 @@ import (
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

const (
Expand All @@ -56,6 +59,10 @@ const (
duplicateErrThreshold = 3
)

type interruptionEventHandler interface {
HandleEvent(*monitor.InterruptionEvent) error
}

func main() {
// Zerolog uses json formatting by default, so change that to a human-readable format instead
log.Logger = log.Output(logging.RoutingLevelWriter{
Expand Down Expand Up @@ -97,7 +104,16 @@ func main() {
nthConfig.Print()
log.Fatal().Err(err).Msg("Webhook validation failed,")
}
node, err := node.New(nthConfig)

clusterConfig, err := rest.InClusterConfig()
if err != nil {
log.Fatal().Err(err).Msgf("retreiving cluster config")
}
clientset, err := kubernetes.NewForConfig(clusterConfig)
if err != nil {
log.Fatal().Err(err).Msgf("creating new clientset with config: %v", err)
}
node, err := node.New(nthConfig, clientset)
if err != nil {
nthConfig.Print()
log.Fatal().Err(err).Msg("Unable to instantiate a node for various kubernetes node functions,")
Expand Down Expand Up @@ -137,7 +153,7 @@ func main() {
log.Fatal().Msgf("Unable to find the AWS region to process queue events.")
}

recorder, err := observability.InitK8sEventRecorder(nthConfig.EmitKubernetesEvents, nthConfig.NodeName, nthConfig.EnableSQSTerminationDraining, nodeMetadata, nthConfig.KubernetesEventsExtraAnnotations)
recorder, err := observability.InitK8sEventRecorder(nthConfig.EmitKubernetesEvents, nthConfig.NodeName, nthConfig.EnableSQSTerminationDraining, nodeMetadata, nthConfig.KubernetesEventsExtraAnnotations, clientset)
if err != nil {
nthConfig.Print()
log.Fatal().Err(err).Msg("Unable to create Kubernetes event recorder,")
Expand Down Expand Up @@ -243,6 +259,9 @@ func main() {

var wg sync.WaitGroup

asgLaunchHandler := launch.New(interruptionEventStore, *node, nthConfig, metrics, recorder, clientset)
drainCordonHander := draincordon.New(interruptionEventStore, *node, nthConfig, nodeMetadata, metrics, recorder)

for range time.NewTicker(1 * time.Second).C {
select {
case <-signalChan:
Expand All @@ -257,7 +276,7 @@ func main() {
event.InProgress = true
wg.Add(1)
recorder.Emit(event.NodeName, observability.Normal, observability.GetReasonForKind(event.Kind, event.Monitor), event.Description)
go drainOrCordonIfNecessary(interruptionEventStore, event, *node, nthConfig, nodeMetadata, metrics, recorder, &wg)
go processInterruptionEvent(interruptionEventStore, event, []interruptionEventHandler{asgLaunchHandler, drainCordonHander}, &wg)
default:
log.Warn().Msg("all workers busy, waiting")
break EventLoop
Expand Down Expand Up @@ -329,122 +348,25 @@ func watchForCancellationEvents(cancelChan <-chan monitor.InterruptionEvent, int
}
}

func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Store, drainEvent *monitor.InterruptionEvent, node node.Node, nthConfig config.Config, nodeMetadata ec2metadata.NodeMetadata, metrics observability.Metrics, recorder observability.K8sEventRecorder, wg *sync.WaitGroup) {
func processInterruptionEvent(interruptionEventStore *interruptioneventstore.Store, event *monitor.InterruptionEvent, eventHandlers []interruptionEventHandler, wg *sync.WaitGroup) {
defer wg.Done()
nodeFound := true
nodeName := drainEvent.NodeName

if nthConfig.UseProviderId {
newNodeName, err := node.GetNodeNameFromProviderID(drainEvent.ProviderID)
if event == nil {
log.Error().Msg("processing nil interruption event")
<-interruptionEventStore.Workers
return
}

var err error
for _, eventHandler := range eventHandlers {
err = eventHandler.HandleEvent(event)
if err != nil {
log.Err(err).Msgf("Unable to get node name for node with ProviderID '%s' using original AWS event node name ", drainEvent.ProviderID)
} else {
nodeName = newNodeName
log.Error().Err(err).Interface("event", event).Msg("handling event")
}
}

nodeLabels, err := node.GetNodeLabels(nodeName)
if err != nil {
log.Err(err).Msgf("Unable to fetch node labels for node '%s' ", nodeName)
nodeFound = false
}
drainEvent.NodeLabels = nodeLabels
if drainEvent.PreDrainTask != nil {
runPreDrainTask(node, nodeName, drainEvent, metrics, recorder)
}

podNameList, err := node.FetchPodNameList(nodeName)
if err != nil {
log.Err(err).Msgf("Unable to fetch running pods for node '%s' ", nodeName)
}
drainEvent.Pods = podNameList
err = node.LogPods(podNameList, nodeName)
if err != nil {
log.Err(err).Msg("There was a problem while trying to log all pod names on the node")
}

if nthConfig.CordonOnly || (!nthConfig.EnableSQSTerminationDraining && drainEvent.IsRebalanceRecommendation() && !nthConfig.EnableRebalanceDraining) {
err = cordonNode(node, nodeName, drainEvent, metrics, recorder)
} else {
err = cordonAndDrainNode(node, nodeName, drainEvent, metrics, recorder, nthConfig.EnableSQSTerminationDraining)
}

if nthConfig.WebhookURL != "" {
webhook.Post(nodeMetadata, drainEvent, nthConfig)
}

if err != nil {
interruptionEventStore.CancelInterruptionEvent(drainEvent.EventID)
} else {
interruptionEventStore.MarkAllAsProcessed(nodeName)
}

if (err == nil || (!nodeFound && nthConfig.DeleteSqsMsgIfNodeNotFound)) && drainEvent.PostDrainTask != nil {
runPostDrainTask(node, nodeName, drainEvent, metrics, recorder)
}
<-interruptionEventStore.Workers
}

func runPreDrainTask(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) {
err := drainEvent.PreDrainTask(*drainEvent, node)
if err != nil {
log.Err(err).Msg("There was a problem executing the pre-drain task")
recorder.Emit(nodeName, observability.Warning, observability.PreDrainErrReason, observability.PreDrainErrMsgFmt, err.Error())
} else {
recorder.Emit(nodeName, observability.Normal, observability.PreDrainReason, observability.PreDrainMsg)
}
metrics.NodeActionsInc("pre-drain", nodeName, drainEvent.EventID, err)
}

func cordonNode(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) error {
err := node.Cordon(nodeName, drainEvent.Description)
if err != nil {
if errors.IsNotFound(err) {
log.Err(err).Msgf("node '%s' not found in the cluster", nodeName)
} else {
log.Err(err).Msg("There was a problem while trying to cordon the node")
recorder.Emit(nodeName, observability.Warning, observability.CordonErrReason, observability.CordonErrMsgFmt, err.Error())
}
return err
} else {
log.Info().Str("node_name", nodeName).Str("reason", drainEvent.Description).Msg("Node successfully cordoned")
metrics.NodeActionsInc("cordon", nodeName, drainEvent.EventID, err)
recorder.Emit(nodeName, observability.Normal, observability.CordonReason, observability.CordonMsg)
}
return nil
}

func cordonAndDrainNode(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder, sqsTerminationDraining bool) error {
err := node.CordonAndDrain(nodeName, drainEvent.Description, recorder.EventRecorder)
if err != nil {
if errors.IsNotFound(err) {
log.Err(err).Msgf("node '%s' not found in the cluster", nodeName)
} else {
log.Err(err).Msg("There was a problem while trying to cordon and drain the node")
metrics.NodeActionsInc("cordon-and-drain", nodeName, drainEvent.EventID, err)
recorder.Emit(nodeName, observability.Warning, observability.CordonAndDrainErrReason, observability.CordonAndDrainErrMsgFmt, err.Error())
}
return err
} else {
log.Info().Str("node_name", nodeName).Str("reason", drainEvent.Description).Msg("Node successfully cordoned and drained")
metrics.NodeActionsInc("cordon-and-drain", nodeName, drainEvent.EventID, err)
recorder.Emit(nodeName, observability.Normal, observability.CordonAndDrainReason, observability.CordonAndDrainMsg)
}
return nil
}

func runPostDrainTask(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder) {
err := drainEvent.PostDrainTask(*drainEvent, node)
if err != nil {
log.Err(err).Msg("There was a problem executing the post-drain task")
recorder.Emit(nodeName, observability.Warning, observability.PostDrainErrReason, observability.PostDrainErrMsgFmt, err.Error())
} else {
recorder.Emit(nodeName, observability.Normal, observability.PostDrainReason, observability.PostDrainMsg)
}
metrics.NodeActionsInc("post-drain", nodeName, drainEvent.EventID, err)
}

func getRegionFromQueueURL(queueURL string) string {
for _, partition := range endpoints.DefaultPartitions() {
for regionID := range partition.Regions() {
Expand Down
Loading
Loading