Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR#6911 Backport for 1.29: Fix/aws asg unsafe decommission #5829 #7025

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 80 additions & 35 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,46 +308,78 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
}
}

placeHolderInstancesCount := m.GetPlaceHolderInstancesCount(instances)
// Check if there are any placeholder instances in the list.
if placeHolderInstancesCount > 0 {
// Log the check for placeholders in the ASG.
klog.V(4).Infof("Detected %d placeholder instance(s) in ASG %s",
placeHolderInstancesCount, commonAsg.Name)

asgNames := []string{commonAsg.Name}
asgDetail, err := m.awsService.getAutoscalingGroupsByNames(asgNames)

if err != nil {
klog.Errorf("Error retrieving ASG details %s: %v", commonAsg.Name, err)
return err
}

activeInstancesInAsg := len(asgDetail[0].Instances)
desiredCapacityInAsg := int(*asgDetail[0].DesiredCapacity)
klog.V(4).Infof("asg %s has placeholders instances with desired capacity = %d and active instances = %d. updating ASG to match active instances count",
commonAsg.Name, desiredCapacityInAsg, activeInstancesInAsg)

// If the difference between the active instances and the desired capacity is greater than 1,
// it means that the ASG is under-provisioned and the desired capacity is not being reached.
// In this case, we would reduce the size of ASG by the count of unprovisioned instances
// which is equal to the total count of active instances in ASG

err = m.setAsgSizeNoLock(commonAsg, activeInstancesInAsg)

if err != nil {
klog.Errorf("Error reducing ASG %s size to %d: %v", commonAsg.Name, activeInstancesInAsg, err)
return err
}
}

for _, instance := range instances {
// check if the instance is a placeholder - a requested instance that was never created by the node group
// if it is, just decrease the size of the node group, as there's no specific instance we can remove
if m.isPlaceholderInstance(instance) {
klog.V(4).Infof("instance %s is detected as a placeholder, decreasing ASG requested size instead "+
"of deleting instance", instance.Name)
m.decreaseAsgSizeByOneNoLock(commonAsg)
} else {
// check if the instance is already terminating - if it is, don't bother terminating again
// as doing so causes unnecessary API calls and can cause the curSize cached value to decrement
// unnecessarily.
lifecycle, err := m.findInstanceLifecycle(*instance)
if err != nil {
return err
}

if lifecycle != nil &&
*lifecycle == autoscaling.LifecycleStateTerminated ||
*lifecycle == autoscaling.LifecycleStateTerminating ||
*lifecycle == autoscaling.LifecycleStateTerminatingWait ||
*lifecycle == autoscaling.LifecycleStateTerminatingProceed {
klog.V(2).Infof("instance %s is already terminating in state %s, will skip instead", instance.Name, *lifecycle)
continue
}
if m.isPlaceholderInstance(instance) {
// skipping placeholder as placeholder instances don't exist
// and we have already reduced ASG size during placeholder check.
continue
}
// check if the instance is already terminating - if it is, don't bother terminating again
// as doing so causes unnecessary API calls and can cause the curSize cached value to decrement
// unnecessarily.
lifecycle, err := m.findInstanceLifecycle(*instance)
if err != nil {
return err
}

params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String(instance.Name),
ShouldDecrementDesiredCapacity: aws.Bool(true),
}
start := time.Now()
resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(params)
observeAWSRequest("TerminateInstanceInAutoScalingGroup", err, start)
if err != nil {
return err
}
klog.V(4).Infof(*resp.Activity.Description)
if lifecycle != nil &&
*lifecycle == autoscaling.LifecycleStateTerminated ||
*lifecycle == autoscaling.LifecycleStateTerminating ||
*lifecycle == autoscaling.LifecycleStateTerminatingWait ||
*lifecycle == autoscaling.LifecycleStateTerminatingProceed {
klog.V(2).Infof("instance %s is already terminating in state %s, will skip instead", instance.Name, *lifecycle)
continue
}

// Proactively decrement the size so autoscaler makes better decisions
commonAsg.curSize--
params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String(instance.Name),
ShouldDecrementDesiredCapacity: aws.Bool(true),
}
start := time.Now()
resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(params)
observeAWSRequest("TerminateInstanceInAutoScalingGroup", err, start)
if err != nil {
return err
}
klog.V(4).Infof(*resp.Activity.Description)

// Proactively decrement the size so autoscaler makes better decisions
commonAsg.curSize--

}
return nil
}
Expand Down Expand Up @@ -624,3 +656,16 @@ func (m *asgCache) buildInstanceRefFromAWS(instance *autoscaling.Instance) AwsIn
func (m *asgCache) Cleanup() {
close(m.interrupt)
}

// GetPlaceHolderInstancesCount returns count of placeholder instances in the cache
func (m *asgCache) GetPlaceHolderInstancesCount(instances []*AwsInstanceRef) int {

placeholderInstancesCount := 0
for _, instance := range instances {
if strings.HasPrefix(instance.Name, placeholderInstanceNamePrefix) {
placeholderInstancesCount++

}
}
return placeholderInstancesCount
}
115 changes: 112 additions & 3 deletions cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ limitations under the License.
package aws

import (
"testing"

"fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
apiv1 "k8s.io/api/core/v1"
Expand All @@ -27,6 +26,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling"
"k8s.io/autoscaler/cluster-autoscaler/config"
"testing"
)

var testAwsManager = &AwsManager{
Expand Down Expand Up @@ -603,7 +603,7 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) {
err = asgs[0].DeleteNodes([]*apiv1.Node{node})
assert.NoError(t, err)
a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1)
a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1)
a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2)

newSize, err := asgs[0].TargetSize()
assert.NoError(t, err)
Expand Down Expand Up @@ -648,6 +648,115 @@ func TestDeleteNodesAfterMultipleRefreshes(t *testing.T) {
assert.NoError(t, err)
}

func TestDeleteNodesWithPlaceholderAndStaleCache(t *testing.T) {
// This test validates the scenario where ASG cache is not in sync with Autoscaling configuration.
// we are taking an example where ASG size is 10, cache as 3 instances "i-0000", "i-0001" and "i-0002
// But ASG has 6 instances i-0000 to i-10005. When DeleteInstances is called with 2 instances ("i-0000", "i-0001" )
// and placeholders, CAS will terminate only these 2 instances after reducing ASG size by the count of placeholders

a := &autoScalingMock{}
provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:10:test-asg"}))
asgs := provider.NodeGroups()
commonAsg := &asg{
AwsRef: AwsRef{Name: asgs[0].Id()},
minSize: asgs[0].MinSize(),
maxSize: asgs[0].MaxSize(),
}

// desired capacity will be set as 6 as ASG has 4 placeholders
a.On("SetDesiredCapacity", &autoscaling.SetDesiredCapacityInput{
AutoScalingGroupName: aws.String(asgs[0].Id()),
DesiredCapacity: aws.Int64(6),
HonorCooldown: aws.Bool(false),
}).Return(&autoscaling.SetDesiredCapacityOutput{})

// Look up the current number of instances...
var expectedInstancesCount int64 = 10
a.On("DescribeAutoScalingGroupsPages",
&autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: aws.StringSlice([]string{"test-asg"}),
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
},
mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"),
).Run(func(args mock.Arguments) {
fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool)
fn(testNamedDescribeAutoScalingGroupsOutput("test-asg", expectedInstancesCount, "i-0000", "i-0001", "i-0002", "i-0003", "i-0004", "i-0005"), false)

expectedInstancesCount = 4
}).Return(nil)

a.On("DescribeScalingActivities",
&autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
},
).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil)

provider.Refresh()

initialSize, err := asgs[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 10, initialSize)

var awsInstanceRefs []AwsInstanceRef
instanceToAsg := make(map[AwsInstanceRef]*asg)

var nodes []*apiv1.Node
for i := 3; i <= 9; i++ {
providerId := fmt.Sprintf("aws:///us-east-1a/i-placeholder-test-asg-%d", i)
node := &apiv1.Node{
Spec: apiv1.NodeSpec{
ProviderID: providerId,
},
}
nodes = append(nodes, node)
awsInstanceRef := AwsInstanceRef{
ProviderID: providerId,
Name: fmt.Sprintf("i-placeholder-test-asg-%d", i),
}
awsInstanceRefs = append(awsInstanceRefs, awsInstanceRef)
instanceToAsg[awsInstanceRef] = commonAsg
}

for i := 0; i <= 2; i++ {
providerId := fmt.Sprintf("aws:///us-east-1a/i-000%d", i)
node := &apiv1.Node{
Spec: apiv1.NodeSpec{
ProviderID: providerId,
},
}
// only setting 2 instances to be terminated out of 3 active instances
if i < 2 {
nodes = append(nodes, node)
a.On("TerminateInstanceInAutoScalingGroup", &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String(fmt.Sprintf("i-000%d", i)),
ShouldDecrementDesiredCapacity: aws.Bool(true),
}).Return(&autoscaling.TerminateInstanceInAutoScalingGroupOutput{
Activity: &autoscaling.Activity{Description: aws.String("Deleted instance")},
})
}
awsInstanceRef := AwsInstanceRef{
ProviderID: providerId,
Name: fmt.Sprintf("i-000%d", i),
}
awsInstanceRefs = append(awsInstanceRefs, awsInstanceRef)
instanceToAsg[awsInstanceRef] = commonAsg
}

// modifying provider to bring disparity between ASG and cache
provider.awsManager.asgCache.asgToInstances[AwsRef{Name: "test-asg"}] = awsInstanceRefs
provider.awsManager.asgCache.instanceToAsg = instanceToAsg

// calling delete nodes 2 nodes and remaining placeholders
err = asgs[0].DeleteNodes(nodes)
assert.NoError(t, err)
a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1)
a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2)

// This ensures only 2 instances are terminated which are mocked in this unit test
a.AssertNumberOfCalls(t, "TerminateInstanceInAutoScalingGroup", 2)

}

func TestGetResourceLimiter(t *testing.T) {
mockAutoScaling := &autoScalingMock{}
mockEC2 := &ec2Mock{}
Expand Down
Loading