Skip to content

Commit

Permalink
PR#6911 Backport for 1.29: Fix/aws asg unsafe decommission kubernetes…
Browse files Browse the repository at this point in the history
  • Loading branch information
kmsarabu committed Jul 10, 2024
1 parent b36d4fb commit a5b23b3
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 38 deletions.
115 changes: 80 additions & 35 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,46 +308,78 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
}
}

placeHolderInstancesCount := m.GetPlaceHolderInstancesCount(instances)
// Check if there are any placeholder instances in the list.
if placeHolderInstancesCount > 0 {
// Log the check for placeholders in the ASG.
klog.V(4).Infof("Detected %d placeholder instance(s) in ASG %s",
placeHolderInstancesCount, commonAsg.Name)

asgNames := []string{commonAsg.Name}
asgDetail, err := m.awsService.getAutoscalingGroupsByNames(asgNames)

if err != nil {
klog.Errorf("Error retrieving ASG details %s: %v", commonAsg.Name, err)
return err
}

activeInstancesInAsg := len(asgDetail[0].Instances)
desiredCapacityInAsg := int(*asgDetail[0].DesiredCapacity)
klog.V(4).Infof("asg %s has placeholders instances with desired capacity = %d and active instances = %d. updating ASG to match active instances count",
commonAsg.Name, desiredCapacityInAsg, activeInstancesInAsg)

// If the difference between the active instances and the desired capacity is greater than 1,
// it means that the ASG is under-provisioned and the desired capacity is not being reached.
// In this case, we would reduce the size of ASG by the count of unprovisioned instances
// which is equal to the total count of active instances in ASG

err = m.setAsgSizeNoLock(commonAsg, activeInstancesInAsg)

if err != nil {
klog.Errorf("Error reducing ASG %s size to %d: %v", commonAsg.Name, activeInstancesInAsg, err)
return err
}
}

for _, instance := range instances {
// check if the instance is a placeholder - a requested instance that was never created by the node group
// if it is, just decrease the size of the node group, as there's no specific instance we can remove
if m.isPlaceholderInstance(instance) {
klog.V(4).Infof("instance %s is detected as a placeholder, decreasing ASG requested size instead "+
"of deleting instance", instance.Name)
m.decreaseAsgSizeByOneNoLock(commonAsg)
} else {
// check if the instance is already terminating - if it is, don't bother terminating again
// as doing so causes unnecessary API calls and can cause the curSize cached value to decrement
// unnecessarily.
lifecycle, err := m.findInstanceLifecycle(*instance)
if err != nil {
return err
}

if lifecycle != nil &&
*lifecycle == autoscaling.LifecycleStateTerminated ||
*lifecycle == autoscaling.LifecycleStateTerminating ||
*lifecycle == autoscaling.LifecycleStateTerminatingWait ||
*lifecycle == autoscaling.LifecycleStateTerminatingProceed {
klog.V(2).Infof("instance %s is already terminating in state %s, will skip instead", instance.Name, *lifecycle)
continue
}
if m.isPlaceholderInstance(instance) {
// skipping placeholder as placeholder instances don't exist
// and we have already reduced ASG size during placeholder check.
continue
}
// check if the instance is already terminating - if it is, don't bother terminating again
// as doing so causes unnecessary API calls and can cause the curSize cached value to decrement
// unnecessarily.
lifecycle, err := m.findInstanceLifecycle(*instance)
if err != nil {
return err
}

params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String(instance.Name),
ShouldDecrementDesiredCapacity: aws.Bool(true),
}
start := time.Now()
resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(params)
observeAWSRequest("TerminateInstanceInAutoScalingGroup", err, start)
if err != nil {
return err
}
klog.V(4).Infof(*resp.Activity.Description)
if lifecycle != nil &&
*lifecycle == autoscaling.LifecycleStateTerminated ||
*lifecycle == autoscaling.LifecycleStateTerminating ||
*lifecycle == autoscaling.LifecycleStateTerminatingWait ||
*lifecycle == autoscaling.LifecycleStateTerminatingProceed {
klog.V(2).Infof("instance %s is already terminating in state %s, will skip instead", instance.Name, *lifecycle)
continue
}

// Proactively decrement the size so autoscaler makes better decisions
commonAsg.curSize--
params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String(instance.Name),
ShouldDecrementDesiredCapacity: aws.Bool(true),
}
start := time.Now()
resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(params)
observeAWSRequest("TerminateInstanceInAutoScalingGroup", err, start)
if err != nil {
return err
}
klog.V(4).Infof(*resp.Activity.Description)

// Proactively decrement the size so autoscaler makes better decisions
commonAsg.curSize--

}
return nil
}
Expand Down Expand Up @@ -624,3 +656,16 @@ func (m *asgCache) buildInstanceRefFromAWS(instance *autoscaling.Instance) AwsIn
func (m *asgCache) Cleanup() {
close(m.interrupt)
}

// GetPlaceHolderInstancesCount returns count of placeholder instances in the cache
func (m *asgCache) GetPlaceHolderInstancesCount(instances []*AwsInstanceRef) int {

placeholderInstancesCount := 0
for _, instance := range instances {
if strings.HasPrefix(instance.Name, placeholderInstanceNamePrefix) {
placeholderInstancesCount++

}
}
return placeholderInstancesCount
}
115 changes: 112 additions & 3 deletions cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ limitations under the License.
package aws

import (
"testing"

"fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
apiv1 "k8s.io/api/core/v1"
Expand All @@ -27,6 +26,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling"
"k8s.io/autoscaler/cluster-autoscaler/config"
"testing"
)

var testAwsManager = &AwsManager{
Expand Down Expand Up @@ -603,7 +603,7 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) {
err = asgs[0].DeleteNodes([]*apiv1.Node{node})
assert.NoError(t, err)
a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1)
a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1)
a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2)

newSize, err := asgs[0].TargetSize()
assert.NoError(t, err)
Expand Down Expand Up @@ -648,6 +648,115 @@ func TestDeleteNodesAfterMultipleRefreshes(t *testing.T) {
assert.NoError(t, err)
}

func TestDeleteNodesWithPlaceholderAndStaleCache(t *testing.T) {
// This test validates the scenario where ASG cache is not in sync with Autoscaling configuration.
// we are taking an example where ASG size is 10, cache as 3 instances "i-0000", "i-0001" and "i-0002
// But ASG has 6 instances i-0000 to i-10005. When DeleteInstances is called with 2 instances ("i-0000", "i-0001" )
// and placeholders, CAS will terminate only these 2 instances after reducing ASG size by the count of placeholders

a := &autoScalingMock{}
provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:10:test-asg"}))
asgs := provider.NodeGroups()
commonAsg := &asg{
AwsRef: AwsRef{Name: asgs[0].Id()},
minSize: asgs[0].MinSize(),
maxSize: asgs[0].MaxSize(),
}

// desired capacity will be set as 6 as ASG has 4 placeholders
a.On("SetDesiredCapacity", &autoscaling.SetDesiredCapacityInput{
AutoScalingGroupName: aws.String(asgs[0].Id()),
DesiredCapacity: aws.Int64(6),
HonorCooldown: aws.Bool(false),
}).Return(&autoscaling.SetDesiredCapacityOutput{})

// Look up the current number of instances...
var expectedInstancesCount int64 = 10
a.On("DescribeAutoScalingGroupsPages",
&autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: aws.StringSlice([]string{"test-asg"}),
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
},
mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"),
).Run(func(args mock.Arguments) {
fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool)
fn(testNamedDescribeAutoScalingGroupsOutput("test-asg", expectedInstancesCount, "i-0000", "i-0001", "i-0002", "i-0003", "i-0004", "i-0005"), false)

expectedInstancesCount = 4
}).Return(nil)

a.On("DescribeScalingActivities",
&autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
},
).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil)

provider.Refresh()

initialSize, err := asgs[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 10, initialSize)

var awsInstanceRefs []AwsInstanceRef
instanceToAsg := make(map[AwsInstanceRef]*asg)

var nodes []*apiv1.Node
for i := 3; i <= 9; i++ {
providerId := fmt.Sprintf("aws:///us-east-1a/i-placeholder-test-asg-%d", i)
node := &apiv1.Node{
Spec: apiv1.NodeSpec{
ProviderID: providerId,
},
}
nodes = append(nodes, node)
awsInstanceRef := AwsInstanceRef{
ProviderID: providerId,
Name: fmt.Sprintf("i-placeholder-test-asg-%d", i),
}
awsInstanceRefs = append(awsInstanceRefs, awsInstanceRef)
instanceToAsg[awsInstanceRef] = commonAsg
}

for i := 0; i <= 2; i++ {
providerId := fmt.Sprintf("aws:///us-east-1a/i-000%d", i)
node := &apiv1.Node{
Spec: apiv1.NodeSpec{
ProviderID: providerId,
},
}
// only setting 2 instances to be terminated out of 3 active instances
if i < 2 {
nodes = append(nodes, node)
a.On("TerminateInstanceInAutoScalingGroup", &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String(fmt.Sprintf("i-000%d", i)),
ShouldDecrementDesiredCapacity: aws.Bool(true),
}).Return(&autoscaling.TerminateInstanceInAutoScalingGroupOutput{
Activity: &autoscaling.Activity{Description: aws.String("Deleted instance")},
})
}
awsInstanceRef := AwsInstanceRef{
ProviderID: providerId,
Name: fmt.Sprintf("i-000%d", i),
}
awsInstanceRefs = append(awsInstanceRefs, awsInstanceRef)
instanceToAsg[awsInstanceRef] = commonAsg
}

// modifying provider to bring disparity between ASG and cache
provider.awsManager.asgCache.asgToInstances[AwsRef{Name: "test-asg"}] = awsInstanceRefs
provider.awsManager.asgCache.instanceToAsg = instanceToAsg

// calling delete nodes 2 nodes and remaining placeholders
err = asgs[0].DeleteNodes(nodes)
assert.NoError(t, err)
a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1)
a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2)

// This ensures only 2 instances are terminated which are mocked in this unit test
a.AssertNumberOfCalls(t, "TerminateInstanceInAutoScalingGroup", 2)

}

func TestGetResourceLimiter(t *testing.T) {
mockAutoScaling := &autoScalingMock{}
mockEC2 := &ec2Mock{}
Expand Down

0 comments on commit a5b23b3

Please sign in to comment.