Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Imds asg lifecycle support #1067

Merged
merged 10 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ When using the EC2 Console or EC2 API to terminate the instance, a state-change
| Spot Instance Termination Notifications (ITN) | ✅ | ✅ |
| Scheduled Events | ✅ | ✅ |
| Instance Rebalance Recommendation | ✅ | ✅ |
| ASG Termination Lifecycle Hooks | ✅ | ✅ |
| AZ Rebalance Recommendation | ❌ | ✅ |
| ASG Termination Lifecycle Hooks | ❌ | ✅ |
| Instance State Change Events | ❌ | ✅ |

### Kubernetes Compatibility
Expand Down
6 changes: 6 additions & 0 deletions cmd/node-termination-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main
import (
"context"
"fmt"
"github.com/aws/aws-node-termination-handler/pkg/monitor/asglifecycle"
"os"
"os/signal"
"strings"
Expand Down Expand Up @@ -53,6 +54,7 @@ import (
const (
scheduledMaintenance = "Scheduled Maintenance"
spotITN = "Spot ITN"
asgLifecycle = "ASG Lifecycle"
rebalanceRecommendation = "Rebalance Recommendation"
sqsEvents = "SQS Event"
timeFormat = "2006/01/02 15:04:05"
Expand Down Expand Up @@ -188,6 +190,10 @@ func main() {
imdsSpotMonitor := spotitn.NewSpotInterruptionMonitor(imds, interruptionChan, cancelChan, nthConfig.NodeName)
monitoringFns[spotITN] = imdsSpotMonitor
}
if nthConfig.EnableASGLifecycleDraining {
asgLifecycleMonitor := asglifecycle.NewASGLifecycleMonitor(imds, interruptionChan, cancelChan, nthConfig.NodeName)
monitoringFns[asgLifecycle] = asgLifecycleMonitor
}
if nthConfig.EnableScheduledEventDraining {
imdsScheduledEventMonitor := scheduledevent.NewScheduledEventMonitor(imds, interruptionChan, cancelChan, nthConfig.NodeName)
monitoringFns[scheduledMaintenance] = imdsScheduledEventMonitor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ spec:
{{- end }}
- name: ENABLE_SPOT_INTERRUPTION_DRAINING
value: {{ .Values.enableSpotInterruptionDraining | quote }}
- name: ENABLE_ASG_LIFECYCLE_DRAINING
value: {{ .Values.enableASGLifecycleDraining | quote }}
- name: ENABLE_SCHEDULED_EVENT_DRAINING
value: {{ .Values.enableScheduledEventDraining | quote }}
- name: ENABLE_REBALANCE_MONITORING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ spec:
{{- end }}
- name: ENABLE_SPOT_INTERRUPTION_DRAINING
value: {{ .Values.enableSpotInterruptionDraining | quote }}
- name: ENABLE_ASG_LIFECYCLE_DRAINING
value: {{ .Values.enableASGLifecycleDraining | quote }}
- name: ENABLE_SCHEDULED_EVENT_DRAINING
value: {{ .Values.enableScheduledEventDraining | quote }}
- name: ENABLE_REBALANCE_MONITORING
Expand Down
3 changes: 3 additions & 0 deletions config/helm/aws-node-termination-handler/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@ metadataTries: 3
# enableSpotInterruptionDraining If false, do not drain nodes when the spot interruption termination notice is received. Only used in IMDS mode.
enableSpotInterruptionDraining: true

# enableASGLifecycleDraining If false, do not drain nodes when ASG target lifecycle state Terminated is received. Only used in IMDS mode.
enableASGLifecycleDraining: true

# enableScheduledEventDraining If false, do not drain nodes before the maintenance window starts for an EC2 instance scheduled event. Only used in IMDS mode.
enableScheduledEventDraining: true

Expand Down
4 changes: 4 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ const (
enableScheduledEventDrainingDefault = true
enableSpotInterruptionDrainingConfigKey = "ENABLE_SPOT_INTERRUPTION_DRAINING"
enableSpotInterruptionDrainingDefault = true
enableASGLifecycleDrainingConfigKey = "ENABLE_ASG_LIFECYCLE_DRAINING"
enableASGLifecycleDrainingDefault = true
enableSQSTerminationDrainingConfigKey = "ENABLE_SQS_TERMINATION_DRAINING"
enableSQSTerminationDrainingDefault = false
enableRebalanceMonitoringConfigKey = "ENABLE_REBALANCE_MONITORING"
Expand Down Expand Up @@ -132,6 +134,7 @@ type Config struct {
WebhookProxy string
EnableScheduledEventDraining bool
EnableSpotInterruptionDraining bool
EnableASGLifecycleDraining bool
EnableSQSTerminationDraining bool
EnableRebalanceMonitoring bool
EnableRebalanceDraining bool
Expand Down Expand Up @@ -195,6 +198,7 @@ func ParseCliArgs() (config Config, err error) {
flag.StringVar(&config.WebhookTemplateFile, "webhook-template-file", getEnv(webhookTemplateFileConfigKey, ""), "If specified, replaces the default webhook message template with content from template file.")
flag.BoolVar(&config.EnableScheduledEventDraining, "enable-scheduled-event-draining", getBoolEnv(enableScheduledEventDrainingConfigKey, enableScheduledEventDrainingDefault), "If true, drain nodes before the maintenance window starts for an EC2 instance scheduled event")
flag.BoolVar(&config.EnableSpotInterruptionDraining, "enable-spot-interruption-draining", getBoolEnv(enableSpotInterruptionDrainingConfigKey, enableSpotInterruptionDrainingDefault), "If true, drain nodes when the spot interruption termination notice is received")
flag.BoolVar(&config.EnableASGLifecycleDraining, "enable-asg-lifecycle-draining", getBoolEnv(enableASGLifecycleDrainingConfigKey, enableASGLifecycleDrainingDefault), "If true, drain nodes when the ASG target lifecyle state is Terminated is received")
flag.BoolVar(&config.EnableSQSTerminationDraining, "enable-sqs-termination-draining", getBoolEnv(enableSQSTerminationDrainingConfigKey, enableSQSTerminationDrainingDefault), "If true, drain nodes when an SQS termination event is received")
flag.BoolVar(&config.EnableRebalanceMonitoring, "enable-rebalance-monitoring", getBoolEnv(enableRebalanceMonitoringConfigKey, enableRebalanceMonitoringDefault), "If true, cordon nodes when the rebalance recommendation notice is received. If you'd like to drain the node in addition to cordoning, then also set \"enableRebalanceDraining\".")
flag.BoolVar(&config.EnableRebalanceDraining, "enable-rebalance-draining", getBoolEnv(enableRebalanceDrainingConfigKey, enableRebalanceDrainingDefault), "If true, drain nodes when the rebalance recommendation notice is received")
Expand Down
7 changes: 7 additions & 0 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestParseCliArgsEnvSuccess(t *testing.T) {
t.Setenv("DRY_RUN", "true")
t.Setenv("ENABLE_SCHEDULED_EVENT_DRAINING", "true")
t.Setenv("ENABLE_SPOT_INTERRUPTION_DRAINING", "false")
t.Setenv("ENABLE_ASG_LIFECYCLE_DRAINING", "false")
t.Setenv("ENABLE_SQS_TERMINATION_DRAINING", "false")
t.Setenv("ENABLE_REBALANCE_MONITORING", "true")
t.Setenv("ENABLE_REBALANCE_DRAINING", "true")
Expand All @@ -62,6 +63,7 @@ func TestParseCliArgsEnvSuccess(t *testing.T) {
h.Equals(t, true, nthConfig.DryRun)
h.Equals(t, true, nthConfig.EnableScheduledEventDraining)
h.Equals(t, false, nthConfig.EnableSpotInterruptionDraining)
h.Equals(t, false, nthConfig.EnableASGLifecycleDraining)
h.Equals(t, false, nthConfig.EnableSQSTerminationDraining)
h.Equals(t, true, nthConfig.EnableRebalanceMonitoring)
h.Equals(t, true, nthConfig.EnableRebalanceDraining)
Expand Down Expand Up @@ -98,6 +100,7 @@ func TestParseCliArgsSuccess(t *testing.T) {
"--dry-run=true",
"--enable-scheduled-event-draining=true",
"--enable-spot-interruption-draining=false",
"--enable-asg-lifecycle-draining=false",
"--enable-sqs-termination-draining=false",
"--enable-rebalance-monitoring=true",
"--enable-rebalance-draining=true",
Expand All @@ -124,6 +127,7 @@ func TestParseCliArgsSuccess(t *testing.T) {
h.Equals(t, true, nthConfig.DryRun)
h.Equals(t, true, nthConfig.EnableScheduledEventDraining)
h.Equals(t, false, nthConfig.EnableSpotInterruptionDraining)
h.Equals(t, false, nthConfig.EnableASGLifecycleDraining)
h.Equals(t, false, nthConfig.EnableSQSTerminationDraining)
h.Equals(t, true, nthConfig.EnableRebalanceMonitoring)
h.Equals(t, true, nthConfig.EnableRebalanceDraining)
Expand Down Expand Up @@ -155,6 +159,7 @@ func TestParseCliArgsOverrides(t *testing.T) {
t.Setenv("DRY_RUN", "false")
t.Setenv("ENABLE_SCHEDULED_EVENT_DRAINING", "false")
t.Setenv("ENABLE_SPOT_INTERRUPTION_DRAINING", "true")
t.Setenv("ENABLE_ASG_LIFECYCLE_DRAINING", "true")
t.Setenv("ENABLE_SQS_TERMINATION_DRAINING", "false")
t.Setenv("ENABLE_REBALANCE_MONITORING", "true")
t.Setenv("ENABLE_REBALANCE_DRAINING", "true")
Expand All @@ -178,6 +183,7 @@ func TestParseCliArgsOverrides(t *testing.T) {
"--dry-run=true",
"--enable-scheduled-event-draining=true",
"--enable-spot-interruption-draining=false",
"--enable-asg-lifecycle-draining=false",
"--enable-sqs-termination-draining=true",
"--enable-rebalance-monitoring=false",
"--enable-rebalance-draining=false",
Expand Down Expand Up @@ -205,6 +211,7 @@ func TestParseCliArgsOverrides(t *testing.T) {
h.Equals(t, true, nthConfig.DryRun)
h.Equals(t, true, nthConfig.EnableScheduledEventDraining)
h.Equals(t, false, nthConfig.EnableSpotInterruptionDraining)
h.Equals(t, false, nthConfig.EnableASGLifecycleDraining)
h.Equals(t, true, nthConfig.EnableSQSTerminationDraining)
h.Equals(t, false, nthConfig.EnableRebalanceMonitoring)
h.Equals(t, false, nthConfig.EnableRebalanceDraining)
Expand Down
24 changes: 24 additions & 0 deletions pkg/ec2metadata/ec2metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
const (
// SpotInstanceActionPath is the context path to spot/instance-action within IMDS
SpotInstanceActionPath = "/latest/meta-data/spot/instance-action"
// ASGTargetLifecycleStatePath path to autoscaling target lifecycle state within IMDS
ASGTargetLifecycleStatePath = "/latest/meta-data/autoscaling/target-lifecycle-state"
// ScheduledEventPath is the context path to events/maintenance/scheduled within IMDS
ScheduledEventPath = "/latest/meta-data/events/maintenance/scheduled"
// RebalanceRecommendationPath is the context path to events/recommendations/rebalance within IMDS
Expand Down Expand Up @@ -193,6 +195,28 @@ func (e *Service) GetRebalanceRecommendationEvent() (rebalanceRec *RebalanceReco
return rebalanceRec, nil
}

// GetASGTargetLifecycleState retrieves ASG target lifecycle state from imds. State can be empty string
// if the lifecycle hook is not present on the ASG
func (e *Service) GetASGTargetLifecycleState() (state string, err error) {
resp, err := e.Request(ASGTargetLifecycleStatePath)
// 404s should not happen, but there can be a case if the instance is brand new and the field is not populated yet
if resp != nil && resp.StatusCode == 404 {
return "", nil
} else if resp != nil && (resp.StatusCode < 200 || resp.StatusCode >= 300) {
return "", fmt.Errorf("Metadata request received http status code: %d", resp.StatusCode)
}
if err != nil {
return "", fmt.Errorf("Unable to parse metadata response: %w", err)
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("Unable to parse http response. Status code: %d. %w", resp.StatusCode, err)
}
return string(body), nil
}

// GetMetadataInfo generic function for retrieving ec2 metadata
func (e *Service) GetMetadataInfo(path string) (info string, err error) {
metadataInfo := ""
Expand Down
85 changes: 85 additions & 0 deletions pkg/ec2metadata/ec2metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,91 @@ func TestGetRebalanceRecommendationEventRequestFailure(t *testing.T) {
h.Assert(t, err != nil, "error expected because no server should be running")
}

func TestGetASGTargetLifecycleStateSuccess(t *testing.T) {
requestPath := "/latest/meta-data/autoscaling/target-lifecycle-state"

server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Add("X-aws-ec2-metadata-token-ttl-seconds", "100")
if req.URL.String() == "/latest/api/token" {
rw.WriteHeader(200)
_, err := rw.Write([]byte(`token`))
h.Ok(t, err)
return
}
h.Equals(t, req.Header.Get("X-aws-ec2-metadata-token"), "token")
h.Equals(t, req.URL.String(), requestPath)
_, err := rw.Write([]byte("InService"))
h.Ok(t, err)
}))
defer server.Close()

expectedState := "InService"

// Use URL from our local test server
imds := ec2metadata.New(server.URL, 1)

state, err := imds.GetASGTargetLifecycleState()
h.Ok(t, err)
h.Equals(t, expectedState, state)
}

func TestGetASGTargetLifecycleState404Success(t *testing.T) {
requestPath := "/latest/meta-data/autoscaling/target-lifecycle-state"

server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Add("X-aws-ec2-metadata-token-ttl-seconds", "100")
if req.URL.String() == "/latest/api/token" {
rw.WriteHeader(200)
_, err := rw.Write([]byte(`token`))
h.Ok(t, err)
return
}
h.Equals(t, req.Header.Get("X-aws-ec2-metadata-token"), "token")
h.Equals(t, req.URL.String(), requestPath)
rw.WriteHeader(404)
}))
defer server.Close()

// Use URL from our local test server
imds := ec2metadata.New(server.URL, 1)

state, err := imds.GetASGTargetLifecycleState()
h.Ok(t, err)
h.Assert(t, state == "", "ASG target lifecycle state should be empty")
}

func TestGetASGTargetLifecycleState500Failure(t *testing.T) {
requestPath := "/latest/meta-data/autoscaling/target-lifecycle-state"

server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Add("X-aws-ec2-metadata-token-ttl-seconds", "100")
if req.URL.String() == "/latest/api/token" {
rw.WriteHeader(200)
_, err := rw.Write([]byte(`token`))
h.Ok(t, err)
return
}
h.Equals(t, req.Header.Get("X-aws-ec2-metadata-token"), "token")
h.Equals(t, req.URL.String(), requestPath)
rw.WriteHeader(500)
}))
defer server.Close()

// Use URL from our local test server
imds := ec2metadata.New(server.URL, 1)

_, err := imds.GetASGTargetLifecycleState()
h.Assert(t, err != nil, "error expected on non-200 or non-404 status code")
}

func TestGetASGTargetLifecycleStateRequestFailure(t *testing.T) {
// Use URL from our local test server
imds := ec2metadata.New("/some-path-that-will-error", 1)

_, err := imds.GetASGTargetLifecycleState()
h.Assert(t, err != nil, "error expected because no server should be running")
}

func TestGetMetadataServiceRequest404(t *testing.T) {
var requestPath string = "/latest/meta-data/instance-type"

Expand Down
103 changes: 103 additions & 0 deletions pkg/monitor/asglifecycle/asg-lifecycle-monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

package asglifecycle

import (
"crypto/sha256"
"fmt"
"time"

"github.com/aws/aws-node-termination-handler/pkg/ec2metadata"
"github.com/aws/aws-node-termination-handler/pkg/monitor"
"github.com/aws/aws-node-termination-handler/pkg/node"
)

// ASGLifecycleMonitorKind is a const to define this monitor kind
const ASGLifecycleMonitorKind = "ASG_LIFECYCLE_MONITOR"

// ASGLifecycleMonitor is a struct definition which facilitates monitoring of ASG target lifecycle state from IMDS
type ASGLifecycleMonitor struct {
IMDS *ec2metadata.Service
InterruptionChan chan<- monitor.InterruptionEvent
CancelChan chan<- monitor.InterruptionEvent
NodeName string
}

// NewASGLifecycleMonitor creates an instance of a ASG lifecycle IMDS monitor
func NewASGLifecycleMonitor(imds *ec2metadata.Service, interruptionChan chan<- monitor.InterruptionEvent, cancelChan chan<- monitor.InterruptionEvent, nodeName string) ASGLifecycleMonitor {
return ASGLifecycleMonitor{
IMDS: imds,
InterruptionChan: interruptionChan,
CancelChan: cancelChan,
NodeName: nodeName,
}
}

// Monitor continuously monitors metadata for ASG target lifecycle state and sends interruption events to the passed in channel
func (m ASGLifecycleMonitor) Monitor() error {
interruptionEvent, err := m.checkForASGTargetLifecycleStateNotice()
if err != nil {
return err
}
if interruptionEvent != nil && interruptionEvent.Kind == monitor.ASGLifecycleKind {
m.InterruptionChan <- *interruptionEvent
}
return nil
}

// Kind denotes the kind of monitor
func (m ASGLifecycleMonitor) Kind() string {
return ASGLifecycleMonitorKind
}

// checkForASGTargetLifecycleStateNotice Checks EC2 instance metadata for a asg lifecycle termination notice
func (m ASGLifecycleMonitor) checkForASGTargetLifecycleStateNotice() (*monitor.InterruptionEvent, error) {
state, err := m.IMDS.GetASGTargetLifecycleState()
if err != nil {
return nil, fmt.Errorf("There was a problem checking for ASG target lifecycle state: %w", err)
}
if state != "Terminated" {
// if the state is not "Terminated", we can skip. State can also be empty (no hook configured).
return nil, nil
}

nodeName := m.NodeName
// there is no time in the response, we just set time to the latest check
interruptionTime := time.Now()

// There's no EventID returned, so we'll create it using a hash to prevent duplicates.
hash := sha256.New()
if _, err = hash.Write([]byte(fmt.Sprintf("%s:%s", state, interruptionTime))); err != nil {
return nil, fmt.Errorf("There was a problem creating an event ID from the event: %w", err)
}

return &monitor.InterruptionEvent{
EventID: fmt.Sprintf("target-lifecycle-state-terminated-%x", hash.Sum(nil)),
Kind: monitor.ASGLifecycleKind,
Monitor: ASGLifecycleMonitorKind,
StartTime: interruptionTime,
NodeName: nodeName,
Description: "AST target lifecycle state received. Instance will be terminated\n",
PreDrainTask: setInterruptionTaint,
}, nil
}

func setInterruptionTaint(interruptionEvent monitor.InterruptionEvent, n node.Node) error {
err := n.TaintASGLifecycleTermination(interruptionEvent.NodeName, interruptionEvent.EventID)
if err != nil {
return fmt.Errorf("Unable to taint node with taint %s:%s: %w", node.ASGLifecycleTerminationTaint, interruptionEvent.EventID, err)
}

return nil
}
Loading
Loading