Skip to content

Commit

Permalink
Fix/aws asg unsafe decommission 5829
Browse files Browse the repository at this point in the history
This change expands on pr kubernetes#6818

This merge resolves an issue in the Kubernetes Cluster Autoscaler where actual instances within AWS Auto Scaling Groups (ASGs) were incorrectly decommissioned instead of placeholders. The updates ensure that placeholders are exclusively targeted for scaling down under conditions where recent scaling activities have failed. This prevents the accidental termination of active nodes and enhances the reliability of the autoscaler in AWS environments.

Fixes kubernetes#5829
  • Loading branch information
ravisinha0506 committed Jun 10, 2024
1 parent 83db225 commit 63d1fed
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 0 deletions.
127 changes: 127 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,54 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
}
}

var isRecentScalingActivitySuccess = false
var err error

placeHolderInstancesCount := m.GetPlaceHolderInstancesCount(instances)
// Check if there are any placeholder instances in the list.
if placeHolderInstancesCount > 0 {
// Log the check for placeholders in the ASG.
klog.V(4).Infof("Detected %d placeholder instance(s), checking recent scaling activity for ASG %s",
placeHolderInstancesCount, commonAsg.Name)

// Retrieve the most recent scaling activity to determine its success state.
isRecentScalingActivitySuccess, err = m.getMostRecentScalingActivity(commonAsg)

// Handle errors from retrieving scaling activity.
if err != nil {
// Log the error if the scaling activity check fails and return the error.
klog.Errorf("Error retrieving scaling activity for ASG %s: %v", commonAsg.Name, err)
return err // Return error to prevent further processing with uncertain state information.
}

if !isRecentScalingActivitySuccess {
asgDetail, err := m.getDescribeAutoScalingGroupResults(commonAsg)

if err != nil {
klog.Errorf("Error retrieving ASG details %s: %v", commonAsg.Name, err)
return err
}

activeInstancesInAsg := len(asgDetail.Instances)
desiredCapacityInAsg := int(*asgDetail.DesiredCapacity)
klog.V(4).Infof("asg %s has placeholders instances with desired capacity = %d and active instances = %d ",
commonAsg.Name, desiredCapacityInAsg, activeInstancesInAsg)

// If the difference between the active instances and the desired capacity is greater than 1,
// it means that the ASG is under-provisioned and the desired capacity is not being reached.
// In this case, we would reduce the size of ASG by the count of unprovisioned instances
// which is equal to the total count of active instances in ASG

err = m.setAsgSizeNoLock(commonAsg, activeInstancesInAsg)

if err != nil {
klog.Errorf("Error reducing ASG %s size to %d: %v", commonAsg.Name, activeInstancesInAsg, err)
return err
}
return nil
}
}

for _, instance := range instances {
// check if the instance is a placeholder - a requested instance that was never created by the node group
// if it is, just decrease the size of the node group, as there's no specific instance we can remove
Expand Down Expand Up @@ -352,6 +400,33 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
return nil
}

func (m *asgCache) getDescribeAutoScalingGroupResults(commonAsg *asg) (*autoscaling.Group, error) {
asgs := make([]*autoscaling.Group, 0)
commonAsgNames := []string{commonAsg.Name}
input := &autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: aws.StringSlice(commonAsgNames),
MaxRecords: aws.Int64(100),
}

err := m.awsService.DescribeAutoScalingGroupsPages(input, func(output *autoscaling.DescribeAutoScalingGroupsOutput, _ bool) bool {
asgs = append(asgs, output.AutoScalingGroups...)
// We return true while we want to be called with the next page of
// results, if any.
return false
})

if err != nil {
klog.Errorf("Failed while performing DescribeAutoScalingGroupsPages: %v", err)
return nil, err
}

if len(asgs) == 0 {
return nil, fmt.Errorf("no ASGs found for %s", commonAsgNames)
}

return asgs[0], nil
}

// isPlaceholderInstance checks if the given instance is only a placeholder
func (m *asgCache) isPlaceholderInstance(instance *AwsInstanceRef) bool {
return strings.HasPrefix(instance.Name, placeholderInstanceNamePrefix)
Expand Down Expand Up @@ -624,3 +699,55 @@ func (m *asgCache) buildInstanceRefFromAWS(instance *autoscaling.Instance) AwsIn
func (m *asgCache) Cleanup() {
close(m.interrupt)
}

func (m *asgCache) getMostRecentScalingActivity(asg *asg) (bool, error) {
input := &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String(asg.Name),
MaxRecords: aws.Int64(1),
}

var response *autoscaling.DescribeScalingActivitiesOutput
var err error
attempts := 3

for i := 0; i < attempts; i++ {
response, err = m.awsService.DescribeScalingActivities(input)
if err == nil {
break
}
klog.V(2).Infof("Failed to describe scaling activities, attempt %d/%d: %v", i+1, attempts, err)
time.Sleep(time.Second * 2)
}

if err != nil {
klog.Errorf("All attempts failed for DescribeScalingActivities: %v", err)
return false, err
}

if len(response.Activities) == 0 {
klog.Info("No scaling activities found for ASG:", asg.Name)
return false, nil
}

lastActivity := response.Activities[0]
if *lastActivity.StatusCode == "Successful" {
klog.Infof("Most recent scaling activity for ASG %s was successful", asg.Name)
return true, nil
} else {
klog.Infof("Most recent scaling activity for ASG %s was not successful: %s", asg.Name, *lastActivity.StatusMessage)
return false, nil
}
}

// GetPlaceHolderInstancesCount returns count of placeholder instances in the cache
func (m *asgCache) GetPlaceHolderInstancesCount(instances []*AwsInstanceRef) int {

placeholderInstancesCount := 0
for _, instance := range instances {
if strings.HasPrefix(instance.Name, placeholderInstanceNamePrefix) {
placeholderInstancesCount++

}
}
return placeholderInstancesCount
}
98 changes: 98 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package aws

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -568,6 +569,22 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) {
HonorCooldown: aws.Bool(false),
}).Return(&autoscaling.SetDesiredCapacityOutput{})

a.On("DescribeScalingActivities",
&autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
MaxRecords: aws.Int64(1),
},
).Return(
&autoscaling.DescribeScalingActivitiesOutput{
Activities: []*autoscaling.Activity{
{
StatusCode: aws.String("Successful"),
StatusMessage: aws.String("Successful"),
StartTime: aws.Time(time.Now().Add(-30 * time.Minute)),
},
},
}, nil)

// Look up the current number of instances...
var expectedInstancesCount int64 = 2
a.On("DescribeAutoScalingGroupsPages",
Expand Down Expand Up @@ -739,3 +756,84 @@ func TestHasInstance(t *testing.T) {
assert.NoError(t, err)
assert.False(t, present)
}

// write unit test for DeleteInstances function
func TestDeleteInstances_scalingActivityFailure(t *testing.T) {

a := &autoScalingMock{}
provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:5:test-asg"}))

asgs := provider.NodeGroups()
a.On("SetDesiredCapacity", &autoscaling.SetDesiredCapacityInput{
AutoScalingGroupName: aws.String(asgs[0].Id()),
DesiredCapacity: aws.Int64(1),
HonorCooldown: aws.Bool(false),
}).Return(&autoscaling.SetDesiredCapacityOutput{})
var expectedInstancesCount int64 = 5
a.On("DescribeAutoScalingGroupsPages",
&autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: aws.StringSlice([]string{"test-asg"}),
MaxRecords: aws.Int64(100),
},
mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"),
).Run(func(args mock.Arguments) {
fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool)
fn(testNamedDescribeAutoScalingGroupsOutput("test-asg", expectedInstancesCount, "i-0c257f8f05fd1c64b", "i-0c257f8f05fd1c64c", "i-0c257f8f05fd1c64d"), false)
// we expect the instance count to be 1 after the call to DeleteNodes
//expectedInstancesCount =
}).Return(nil)

a.On("DescribeScalingActivities",
&autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
MaxRecords: aws.Int64(1),
},
).Return(
&autoscaling.DescribeScalingActivitiesOutput{
Activities: []*autoscaling.Activity{
{
StatusCode: aws.String("Failed"),
StatusMessage: aws.String("Launching a new EC2 instance. Status Reason: We currently do not have sufficient p5.48xlarge capacity in zones with support for 'gp2' volumes. Our system will be working on provisioning additional capacity. Launching EC2 instance failed.\t"),
StartTime: aws.Time(time.Now().Add(-30 * time.Minute)),
},
},
}, nil)

a.On("DescribeScalingActivities",
&autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
},
).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil)

a.On("SetDesiredCapacity", &autoscaling.SetDesiredCapacityInput{
AutoScalingGroupName: aws.String(asgs[0].Id()),
DesiredCapacity: aws.Int64(3),
HonorCooldown: aws.Bool(false),
}).Return(&autoscaling.SetDesiredCapacityOutput{})

provider.Refresh()

initialSize, err := asgs[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 5, initialSize)

nodes := []*apiv1.Node{}
asgToInstances := provider.awsManager.asgCache.asgToInstances[AwsRef{Name: "test-asg"}]
for _, instance := range asgToInstances {
nodes = append(nodes, &apiv1.Node{
Spec: apiv1.NodeSpec{
ProviderID: instance.ProviderID,
},
})
}

err = asgs[0].DeleteNodes(nodes)
assert.NoError(t, err)
a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1)
a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2)

newSize, err := asgs[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 3, newSize)

}

0 comments on commit 63d1fed

Please sign in to comment.