Skip to content

Commit

Permalink
lifecycle integ test, docs, & small cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
haugenj committed Apr 9, 2021
1 parent 14b885e commit 7fa7338
Show file tree
Hide file tree
Showing 49 changed files with 2,097 additions and 985 deletions.
1 change: 1 addition & 0 deletions cloudmock/aws/mockautoscaling/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type MockAutoscaling struct {
mutex sync.Mutex
Groups map[string]*autoscaling.Group
LaunchConfigurations map[string]*autoscaling.LaunchConfiguration
LifecycleHooks map[string]*autoscaling.LifecycleHook
}

// Compile-time check that *MockAutoscaling satisfies autoscalingiface.AutoScalingAPI.
var _ autoscalingiface.AutoScalingAPI = &MockAutoscaling{}
36 changes: 35 additions & 1 deletion cloudmock/aws/mockautoscaling/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,5 +329,39 @@ func (m *MockAutoscaling) DeleteAutoScalingGroupRequest(*autoscaling.DeleteAutoS
}

// PutLifecycleHook records the lifecycle hook described by input in the mock's
// in-memory LifecycleHooks map and returns an empty output.
//
// Hooks are keyed by Auto Scaling group name, so a later call for the same
// group replaces the hook stored earlier. GlobalTimeout is populated from
// input.HeartbeatTimeout. input.AutoScalingGroupName must be non-nil because
// it is dereferenced to build the map key.
func (m *MockAutoscaling) PutLifecycleHook(input *autoscaling.PutLifecycleHookInput) (*autoscaling.PutLifecycleHookOutput, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	hook := &autoscaling.LifecycleHook{
		AutoScalingGroupName:  input.AutoScalingGroupName,
		DefaultResult:         input.DefaultResult,
		GlobalTimeout:         input.HeartbeatTimeout,
		HeartbeatTimeout:      input.HeartbeatTimeout,
		LifecycleHookName:     input.LifecycleHookName,
		LifecycleTransition:   input.LifecycleTransition,
		NotificationMetadata:  input.NotificationMetadata,
		NotificationTargetARN: input.NotificationTargetARN,
		RoleARN:               input.RoleARN,
	}

	// Allocate lazily so a zero-value MockAutoscaling can be used directly.
	if m.LifecycleHooks == nil {
		m.LifecycleHooks = make(map[string]*autoscaling.LifecycleHook)
	}
	m.LifecycleHooks[*hook.AutoScalingGroupName] = hook

	return &autoscaling.PutLifecycleHookOutput{}, nil
}

// DescribeLifecycleHooks returns the lifecycle hook stored for the Auto Scaling
// group named in input. When no hook is stored for that group the returned
// output has no hooks set.
func (m *MockAutoscaling) DescribeLifecycleHooks(input *autoscaling.DescribeLifecycleHooksInput) (*autoscaling.DescribeLifecycleHooksOutput, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	groupName := *input.AutoScalingGroupName
	output := &autoscaling.DescribeLifecycleHooksOutput{}

	// Reading from a nil map is safe and simply yields a nil hook.
	if stored := m.LifecycleHooks[groupName]; stored != nil {
		output.LifecycleHooks = []*autoscaling.LifecycleHook{stored}
	}
	return output, nil
}
83 changes: 70 additions & 13 deletions cloudmock/aws/mockeventbridge/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,99 @@ limitations under the License.
package mockeventbridge

import (
"sync"

"github.com/aws/aws-sdk-go/service/eventbridge"
"github.com/aws/aws-sdk-go/service/eventbridge/eventbridgeiface"
)

// MockEventBridge is an in-memory stand-in for the EventBridge API. It embeds
// eventbridgeiface.EventBridgeAPI and overrides a subset of its methods.
type MockEventBridge struct {
	eventbridgeiface.EventBridgeAPI
	// mutex is held by the mock's methods while they access the maps below.
	mutex sync.Mutex

	// Rules holds the rules stored by PutRule, keyed by rule name.
	Rules map[string]*eventbridge.Rule
	// TagsByArn holds the tags passed to PutRule, keyed by the rule's ARN.
	TagsByArn map[string][]*eventbridge.Tag
	// TargetsByRule holds the targets passed to PutTargets, keyed by the
	// Rule value of the request.
	TargetsByRule map[string][]*eventbridge.Target
}

// Compile-time check that *MockEventBridge satisfies eventbridgeiface.EventBridgeAPI.
var _ eventbridgeiface.EventBridgeAPI = &MockEventBridge{}

func (c *MockEventBridge) ListTargetsByRule(*eventbridge.ListTargetsByRuleInput) (*eventbridge.ListTargetsByRuleOutput, error) {
panic("Not implemented")
// PutRule stores a rule under input.Name and remembers input.Tags under the
// rule's ARN. The ARN is synthesized from a fixed prefix plus the rule name and
// is returned in the output. Only Arn and EventPattern are set on the stored rule.
func (m *MockEventBridge) PutRule(input *eventbridge.PutRuleInput) (*eventbridge.PutRuleOutput, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	ruleName := *input.Name
	ruleARN := "arn:aws:events:us-east-1:012345678901:rule/" + ruleName

	// Allocate both maps lazily so a zero-value mock can be used directly.
	if m.Rules == nil {
		m.Rules = make(map[string]*eventbridge.Rule)
	}
	if m.TagsByArn == nil {
		m.TagsByArn = make(map[string][]*eventbridge.Tag)
	}

	m.Rules[ruleName] = &eventbridge.Rule{
		Arn:          &ruleARN,
		EventPattern: input.EventPattern,
	}
	m.TagsByArn[ruleARN] = input.Tags

	return &eventbridge.PutRuleOutput{RuleArn: &ruleARN}, nil
}

func (c *MockEventBridge) RemoveTargets(*eventbridge.RemoveTargetsInput) (*eventbridge.RemoveTargetsOutput, error) {
panic("Not implemented")
// ListRules returns the rule whose name equals input.NamePrefix, if one is
// stored. The lookup is an exact map lookup on that value, not a prefix scan.
// When nothing matches the returned output has no rules set.
func (m *MockEventBridge) ListRules(input *eventbridge.ListRulesInput) (*eventbridge.ListRulesOutput, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	output := &eventbridge.ListRulesOutput{}
	if match := m.Rules[*input.NamePrefix]; match != nil {
		output.Rules = []*eventbridge.Rule{match}
	}
	return output, nil
}

func (c *MockEventBridge) DeleteRule(*eventbridge.DeleteRuleInput) (*eventbridge.DeleteRuleOutput, error) {
// DeleteRule is not implemented by the mock; calling it panics.
func (m *MockEventBridge) DeleteRule(*eventbridge.DeleteRuleInput) (*eventbridge.DeleteRuleOutput, error) {
panic("Not implemented")
}

func (c *MockEventBridge) ListRules(*eventbridge.ListRulesInput) (*eventbridge.ListRulesOutput, error) {
response := &eventbridge.ListRulesOutput{
Rules: c.Rules,
}
// ListTagsForResource returns the tags recorded under input.ResourceARN.
// An ARN with no recorded tags yields an output whose Tags is nil.
func (m *MockEventBridge) ListTagsForResource(input *eventbridge.ListTagsForResourceInput) (*eventbridge.ListTagsForResourceOutput, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	tags := m.TagsByArn[*input.ResourceARN]
	return &eventbridge.ListTagsForResourceOutput{Tags: tags}, nil
}

func (c *MockEventBridge) PutRule(*eventbridge.PutRuleInput) (*eventbridge.PutRuleOutput, error) {
panic("Not implemented")
// PutTargets records input.Targets under input.Rule, replacing whatever was
// stored for that rule before, and returns an empty output.
func (m *MockEventBridge) PutTargets(input *eventbridge.PutTargetsInput) (*eventbridge.PutTargetsOutput, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	// Allocate lazily so a zero-value mock can be used directly.
	if m.TargetsByRule == nil {
		m.TargetsByRule = make(map[string][]*eventbridge.Target)
	}

	ruleName := *input.Rule
	m.TargetsByRule[ruleName] = input.Targets

	output := &eventbridge.PutTargetsOutput{}
	return output, nil
}

// ListTargetsByRule returns the targets recorded under input.Rule.
// A rule with no recorded targets yields an output whose Targets is nil.
func (m *MockEventBridge) ListTargetsByRule(input *eventbridge.ListTargetsByRuleInput) (*eventbridge.ListTargetsByRuleOutput, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	targets := m.TargetsByRule[*input.Rule]
	return &eventbridge.ListTargetsByRuleOutput{Targets: targets}, nil
}

func (c *MockEventBridge) PutTargets(*eventbridge.PutTargetsInput) (*eventbridge.PutTargetsOutput, error) {
// RemoveTargets is not implemented by the mock; calling it panics.
func (m *MockEventBridge) RemoveTargets(*eventbridge.RemoveTargetsInput) (*eventbridge.RemoveTargetsOutput, error) {
panic("Not implemented")
}
77 changes: 68 additions & 9 deletions cloudmock/aws/mocksqs/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,93 @@ limitations under the License.
package mocksqs

import (
"sync"

"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
)

// MockSQS is an in-memory stand-in for the SQS API. It embeds sqsiface.SQSAPI
// and overrides a subset of its methods.
type MockSQS struct {
sqsiface.SQSAPI
// mutex is held by the mock's methods while they access Queues.
mutex sync.Mutex

// Queues holds the queues stored by CreateQueue, keyed by queue name.
Queues map[string]mockQueue
}

QueueUrls []*string
// mockQueue is the state the mock keeps for a single queue.
type mockQueue struct {
// url is the queue URL returned by CreateQueue.
url *string
// attributes holds the Attributes passed to CreateQueue.
attributes map[string]*string
// tags holds the Tags passed to CreateQueue.
tags map[string]*string
}

// Compile-time check that *MockSQS satisfies sqsiface.SQSAPI.
var _ sqsiface.SQSAPI = &MockSQS{}

func (c *MockSQS) DeleteQueue(*sqs.DeleteQueueInput) (*sqs.DeleteQueueOutput, error) {
panic("Not implemented")
// CreateQueue stores a queue under input.QueueName together with the
// attributes and tags from input. The queue URL is synthesized from a fixed
// prefix plus the queue name and is returned in the output. Creating a queue
// with an existing name replaces the stored entry.
func (m *MockSQS) CreateQueue(input *sqs.CreateQueueInput) (*sqs.CreateQueueOutput, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	queueName := *input.QueueName
	queueURL := "https://sqs.us-east-1.amazonaws.com/123456789123/" + queueName

	// Allocate lazily so a zero-value mock can be used directly.
	if m.Queues == nil {
		m.Queues = make(map[string]mockQueue)
	}
	m.Queues[queueName] = mockQueue{
		url:        &queueURL,
		attributes: input.Attributes,
		tags:       input.Tags,
	}

	return &sqs.CreateQueueOutput{QueueUrl: &queueURL}, nil
}

func (c *MockSQS) ListQueues(*sqs.ListQueuesInput) (*sqs.ListQueuesOutput, error) {
response := &sqs.ListQueuesOutput{
QueueUrls: c.QueueUrls,
// ListQueues returns the URL of the queue whose name equals
// input.QueueNamePrefix, if one is stored. The lookup is an exact map lookup
// on that value, not a prefix scan. When nothing matches the returned output
// has no URLs set.
func (m *MockSQS) ListQueues(input *sqs.ListQueuesInput) (*sqs.ListQueuesOutput, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	output := &sqs.ListQueuesOutput{}

	queue, found := m.Queues[*input.QueueNamePrefix]
	if !found {
		return output, nil
	}
	output.QueueUrls = []*string{queue.url}
	return output, nil
}

// GetQueueAttributes returns the attributes of the stored queue whose URL
// equals input.QueueUrl. When no stored queue has that URL the returned output
// has no attributes set.
func (m *MockSQS) GetQueueAttributes(input *sqs.GetQueueAttributesInput) (*sqs.GetQueueAttributesOutput, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	output := &sqs.GetQueueAttributesOutput{}

	// Queues are keyed by name, so finding one by URL requires a scan.
	for _, queue := range m.Queues {
		if *queue.url != *input.QueueUrl {
			continue
		}
		output.Attributes = queue.attributes
		break
	}
	return output, nil
}

func (c *MockSQS) ListQueueTags(*sqs.ListQueueTagsInput) (*sqs.ListQueueTagsOutput, error) {
panic("Not implemented")
// ListQueueTags returns the tags of the stored queue whose URL equals
// input.QueueUrl. When no stored queue has that URL the returned output has no
// tags set.
func (m *MockSQS) ListQueueTags(input *sqs.ListQueueTagsInput) (*sqs.ListQueueTagsOutput, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	output := &sqs.ListQueueTagsOutput{}

	// Queues are keyed by name, so finding one by URL requires a scan.
	for _, queue := range m.Queues {
		if *queue.url != *input.QueueUrl {
			continue
		}
		output.Tags = queue.tags
		break
	}
	return output, nil
}

func (c *MockSQS) CreateQueue(*sqs.CreateQueueInput) (*sqs.CreateQueueOutput, error) {
// DeleteQueue is not implemented by the mock; calling it panics.
func (m *MockSQS) DeleteQueue(*sqs.DeleteQueueInput) (*sqs.DeleteQueueOutput, error) {
panic("Not implemented")
}
1 change: 0 additions & 1 deletion cmd/kops/BUILD.bazel

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 4 additions & 5 deletions cmd/kops/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ import (
"testing"
"time"

"k8s.io/kops/pkg/model"

"k8s.io/kops/cmd/kops/util"
"k8s.io/kops/pkg/featureflag"
"k8s.io/kops/pkg/jsonutils"
Expand Down Expand Up @@ -396,9 +394,10 @@ func TestAPIServerNodes(t *testing.T) {
newIntegrationTest("minimal.example.com", "apiservernodes").runTestCloudformation(t)
}

// TestNTHQueueProcessor tests the output for resources required by NTH Queue Processor mode.
// Each fixture is run through runTestTerraformAWS (with withNTH applied) and through
// runTestCloudformation.
// NOTE(review): the "queueprocessor.example.com" and "nthsqsresources.example.com" pairs look
// like the before/after sides of a fixture rename shown together; confirm whether the first
// pair should still be here.
func TestNTHQueueProcessor(t *testing.T) {
newIntegrationTest("queueprocessor.example.com", "nodeterminationhandler_sqs_resources").withNTH().runTestTerraformAWS(t)
newIntegrationTest("queueprocessor.example.com", "nodeterminationhandler_sqs_resources").runTestCloudformation(t)
newIntegrationTest("nthsqsresources.example.com", "nth_sqs_resources").withNTH().runTestTerraformAWS(t)
newIntegrationTest("nthsqsresources.example.com", "nth_sqs_resources").runTestCloudformation(t)
}

func (i *integrationTest) runTest(t *testing.T, h *testutils.IntegrationTestHarness, expectedDataFilenames []string, tfFileName string, expectedTfFileName string, phase *cloudup.Phase) {
Expand Down Expand Up @@ -598,7 +597,7 @@ func (i *integrationTest) runTestTerraformAWS(t *testing.T) {
"aws_cloudwatch_event_rule_" + i.clusterName + "-ASGLifecycle_event_pattern",
"aws_cloudwatch_event_rule_" + i.clusterName + "-RebalanceRecommendation_event_pattern",
"aws_cloudwatch_event_rule_" + i.clusterName + "-SpotInterruption_event_pattern",
"aws_sqs_queue_" + model.QueueNamePrefix(i.clusterName) + "-nth_policy",
"aws_sqs_queue_" + strings.Replace(i.clusterName, ".", "-", -1) + "-nth_policy",
}...)
}
}
Expand Down
8 changes: 8 additions & 0 deletions cmd/kops/lifecycle_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ func TestLifecyclePrivateSharedIP(t *testing.T) {
})
}

// TestLifecycleNodeTerminationHandlerQueueProcessor runs the test on a cluster with requisite resources for NTH Queue Processor
func TestLifecycleNodeTerminationHandlerQueueProcessor(t *testing.T) {
runLifecycleTestAWS(&LifecycleTestOptions{
t: t,
SrcDir: "nth_sqs_resources",
})
}

func runLifecycleTest(h *testutils.IntegrationTestHarness, o *LifecycleTestOptions, cloud *awsup.MockAWSCloud) {
ctx := context.Background()

Expand Down
5 changes: 4 additions & 1 deletion docs/addons.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,15 @@ spec:

{{ kops_feature_table(kops_added_default='1.19') }}

Node Termination Handler ensures that the Kubernetes control plane responds appropriately to events that can cause your EC2 instance to become unavailable, such as EC2 maintenance events, EC2 Spot interruptions, ASG Scale-In, ASG AZ Rebalance, and EC2 Instance Termination via the API or Console. If not handled, your application code may not stop gracefully, take longer to recover full availability, or accidentally schedule work to nodes that are going down.
[Node Termination Handler](https://github.com/aws/aws-node-termination-handler) ensures that the Kubernetes control plane responds appropriately to events that can cause your EC2 instance to become unavailable, such as EC2 maintenance events, EC2 Spot interruptions, and EC2 instance rebalance recommendations. If not handled, your application code may not stop gracefully, take longer to recover full availability, or accidentally schedule work to nodes that are going down.

If `enableSqsTerminationDraining` is enabled, Node Termination Handler will operate in Queue Processor mode. In addition to the events mentioned above, Queue Processor mode allows Node Termination Handler to take care of ASG Scale-In, AZ-Rebalance, Unhealthy Instances, EC2 Instance Termination via the API or Console, and more. kOps will provision the necessary infrastructure: an SQS queue, EventBridge rules, and ASG Lifecycle hooks.

```yaml
spec:
nodeTerminationHandler:
enabled: true
enableSqsTerminationDraining: true
```

## Static addons
Expand Down
1 change: 0 additions & 1 deletion pkg/model/awsmodel/nodeterminationhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ func (b *NodeTerminationHandlerBuilder) configureASG(c *fi.ModelBuilderContext,
DefaultResult: aws.String("CONTINUE"),
HeartbeatTimeout: aws.Int64(DefaultMessageRetentionPeriod),
LifecycleTransition: aws.String("autoscaling:EC2_INSTANCE_TERMINATING"),
Tags: tags,
}

c.AddTask(lifecyleTask)
Expand Down
2 changes: 1 addition & 1 deletion pkg/model/names.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,5 +268,5 @@ func (b *KopsModelContext) InstanceName(ig *kops.InstanceGroup, suffix string) s

// QueueNamePrefix returns clusterName with every "." replaced by "-", giving a
// prefix that is safe to use in queue names.
func QueueNamePrefix(clusterName string) string {
	// periods aren't allowed in queue name
	return strings.ReplaceAll(clusterName, ".", "-")
}
1 change: 0 additions & 1 deletion pkg/resources/aws/BUILD.bazel

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 7fa7338

Please sign in to comment.