diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go index 4667e285b153..6c34b7c726dc 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go @@ -308,46 +308,78 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error { } } + placeHolderInstancesCount := m.GetPlaceHolderInstancesCount(instances) + // Check if there are any placeholder instances in the list. + if placeHolderInstancesCount > 0 { + // Log the check for placeholders in the ASG. + klog.V(4).Infof("Detected %d placeholder instance(s) in ASG %s", + placeHolderInstancesCount, commonAsg.Name) + + asgNames := []string{commonAsg.Name} + asgDetail, err := m.awsService.getAutoscalingGroupsByNames(asgNames) + + if err != nil { + klog.Errorf("Error retrieving ASG details %s: %v", commonAsg.Name, err) + return err + } + + activeInstancesInAsg := len(asgDetail[0].Instances) + desiredCapacityInAsg := int(*asgDetail[0].DesiredCapacity) + klog.V(4).Infof("asg %s has placeholders instances with desired capacity = %d and active instances = %d. updating ASG to match active instances count", + commonAsg.Name, desiredCapacityInAsg, activeInstancesInAsg) + + // If the difference between the active instances and the desired capacity is greater than 1, + // it means that the ASG is under-provisioned and the desired capacity is not being reached. + // In this case, we would reduce the size of ASG by the count of unprovisioned instances + // which is equal to the total count of active instances in ASG + + err = m.setAsgSizeNoLock(commonAsg, activeInstancesInAsg) + + if err != nil { + klog.Errorf("Error reducing ASG %s size to %d: %v", commonAsg.Name, activeInstancesInAsg, err) + return err + } + } + for _, instance := range instances { - // check if the instance is a placeholder - a requested instance that was never created by the node group - // if it is, just decrease the size of the node group, as there's no specific instance we can remove - if m.isPlaceholderInstance(instance) { - klog.V(4).Infof("instance %s is detected as a placeholder, decreasing ASG requested size instead "+ - "of deleting instance", instance.Name) - m.decreaseAsgSizeByOneNoLock(commonAsg) - } else { - // check if the instance is already terminating - if it is, don't bother terminating again - // as doing so causes unnecessary API calls and can cause the curSize cached value to decrement - // unnecessarily. - lifecycle, err := m.findInstanceLifecycle(*instance) - if err != nil { - return err - } - if lifecycle != nil && - *lifecycle == autoscaling.LifecycleStateTerminated || - *lifecycle == autoscaling.LifecycleStateTerminating || - *lifecycle == autoscaling.LifecycleStateTerminatingWait || - *lifecycle == autoscaling.LifecycleStateTerminatingProceed { - klog.V(2).Infof("instance %s is already terminating in state %s, will skip instead", instance.Name, *lifecycle) - continue - } + if m.isPlaceholderInstance(instance) { + // skipping placeholder as placeholder instances don't exist + // and we have already reduced ASG size during placeholder check. + continue + } + // check if the instance is already terminating - if it is, don't bother terminating again + // as doing so causes unnecessary API calls and can cause the curSize cached value to decrement + // unnecessarily. + lifecycle, err := m.findInstanceLifecycle(*instance) + if err != nil { + return err + } - params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{ - InstanceId: aws.String(instance.Name), - ShouldDecrementDesiredCapacity: aws.Bool(true), - } - start := time.Now() - resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(params) - observeAWSRequest("TerminateInstanceInAutoScalingGroup", err, start) - if err != nil { - return err - } - klog.V(4).Infof(*resp.Activity.Description) + if lifecycle != nil && + *lifecycle == autoscaling.LifecycleStateTerminated || + *lifecycle == autoscaling.LifecycleStateTerminating || + *lifecycle == autoscaling.LifecycleStateTerminatingWait || + *lifecycle == autoscaling.LifecycleStateTerminatingProceed { + klog.V(2).Infof("instance %s is already terminating in state %s, will skip instead", instance.Name, *lifecycle) + continue + } - // Proactively decrement the size so autoscaler makes better decisions - commonAsg.curSize-- + params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{ + InstanceId: aws.String(instance.Name), + ShouldDecrementDesiredCapacity: aws.Bool(true), + } + start := time.Now() + resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(params) + observeAWSRequest("TerminateInstanceInAutoScalingGroup", err, start) + if err != nil { + return err } + klog.V(4).Infof(*resp.Activity.Description) + + // Proactively decrement the size so autoscaler makes better decisions + commonAsg.curSize-- + } return nil } @@ -624,3 +656,16 @@ func (m *asgCache) buildInstanceRefFromAWS(instance *autoscaling.Instance) AwsIn func (m *asgCache) Cleanup() { close(m.interrupt) } + +// GetPlaceHolderInstancesCount returns count of placeholder instances in the cache +func (m *asgCache) GetPlaceHolderInstancesCount(instances []*AwsInstanceRef) int { + + placeholderInstancesCount := 0 + for _, instance := range instances { + if strings.HasPrefix(instance.Name, placeholderInstanceNamePrefix) { + placeholderInstancesCount++ + + } + } + return placeholderInstancesCount +} diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go index 0033d27c68ea..478a13112779 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go @@ -17,8 +17,7 @@ limitations under the License. package aws import ( - "testing" - + "fmt" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" apiv1 "k8s.io/api/core/v1" @@ -27,6 +26,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling" "k8s.io/autoscaler/cluster-autoscaler/config" + "testing" ) var testAwsManager = &AwsManager{ @@ -603,7 +603,7 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) { err = asgs[0].DeleteNodes([]*apiv1.Node{node}) assert.NoError(t, err) a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1) - a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1) + a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2) newSize, err := asgs[0].TargetSize() assert.NoError(t, err) @@ -648,6 +648,115 @@ func TestDeleteNodesAfterMultipleRefreshes(t *testing.T) { assert.NoError(t, err) } +func TestDeleteNodesWithPlaceholderAndStaleCache(t *testing.T) { + // This test validates the scenario where ASG cache is not in sync with Autoscaling configuration. + // we are taking an example where ASG size is 10, cache as 3 instances "i-0000", "i-0001" and "i-0002 + // But ASG has 6 instances i-0000 to i-10005. When DeleteInstances is called with 2 instances ("i-0000", "i-0001" ) + // and placeholders, CAS will terminate only these 2 instances after reducing ASG size by the count of placeholders + + a := &autoScalingMock{} + provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:10:test-asg"})) + asgs := provider.NodeGroups() + commonAsg := &asg{ + AwsRef: AwsRef{Name: asgs[0].Id()}, + minSize: asgs[0].MinSize(), + maxSize: asgs[0].MaxSize(), + } + + // desired capacity will be set as 6 as ASG has 4 placeholders + a.On("SetDesiredCapacity", &autoscaling.SetDesiredCapacityInput{ + AutoScalingGroupName: aws.String(asgs[0].Id()), + DesiredCapacity: aws.Int64(6), + HonorCooldown: aws.Bool(false), + }).Return(&autoscaling.SetDesiredCapacityOutput{}) + + // Look up the current number of instances... + var expectedInstancesCount int64 = 10 + a.On("DescribeAutoScalingGroupsPages", + &autoscaling.DescribeAutoScalingGroupsInput{ + AutoScalingGroupNames: aws.StringSlice([]string{"test-asg"}), + MaxRecords: aws.Int64(maxRecordsReturnedByAPI), + }, + mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"), + ).Run(func(args mock.Arguments) { + fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) + fn(testNamedDescribeAutoScalingGroupsOutput("test-asg", expectedInstancesCount, "i-0000", "i-0001", "i-0002", "i-0003", "i-0004", "i-0005"), false) + + expectedInstancesCount = 4 + }).Return(nil) + + a.On("DescribeScalingActivities", + &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("test-asg"), + }, + ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil) + + provider.Refresh() + + initialSize, err := asgs[0].TargetSize() + assert.NoError(t, err) + assert.Equal(t, 10, initialSize) + + var awsInstanceRefs []AwsInstanceRef + instanceToAsg := make(map[AwsInstanceRef]*asg) + + var nodes []*apiv1.Node + for i := 3; i <= 9; i++ { + providerId := fmt.Sprintf("aws:///us-east-1a/i-placeholder-test-asg-%d", i) + node := &apiv1.Node{ + Spec: apiv1.NodeSpec{ + ProviderID: providerId, + }, + } + nodes = append(nodes, node) + awsInstanceRef := AwsInstanceRef{ + ProviderID: providerId, + Name: fmt.Sprintf("i-placeholder-test-asg-%d", i), + } + awsInstanceRefs = append(awsInstanceRefs, awsInstanceRef) + instanceToAsg[awsInstanceRef] = commonAsg + } + + for i := 0; i <= 2; i++ { + providerId := fmt.Sprintf("aws:///us-east-1a/i-000%d", i) + node := &apiv1.Node{ + Spec: apiv1.NodeSpec{ + ProviderID: providerId, + }, + } + // only setting 2 instances to be terminated out of 3 active instances + if i < 2 { + nodes = append(nodes, node) + a.On("TerminateInstanceInAutoScalingGroup", &autoscaling.TerminateInstanceInAutoScalingGroupInput{ + InstanceId: aws.String(fmt.Sprintf("i-000%d", i)), + ShouldDecrementDesiredCapacity: aws.Bool(true), + }).Return(&autoscaling.TerminateInstanceInAutoScalingGroupOutput{ + Activity: &autoscaling.Activity{Description: aws.String("Deleted instance")}, + }) + } + awsInstanceRef := AwsInstanceRef{ + ProviderID: providerId, + Name: fmt.Sprintf("i-000%d", i), + } + awsInstanceRefs = append(awsInstanceRefs, awsInstanceRef) + instanceToAsg[awsInstanceRef] = commonAsg + } + + // modifying provider to bring disparity between ASG and cache + provider.awsManager.asgCache.asgToInstances[AwsRef{Name: "test-asg"}] = awsInstanceRefs + provider.awsManager.asgCache.instanceToAsg = instanceToAsg + + // calling delete nodes 2 nodes and remaining placeholders + err = asgs[0].DeleteNodes(nodes) + assert.NoError(t, err) + a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1) + a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2) + + // This ensures only 2 instances are terminated which are mocked in this unit test + a.AssertNumberOfCalls(t, "TerminateInstanceInAutoScalingGroup", 2) + +} + func TestGetResourceLimiter(t *testing.T) { mockAutoScaling := &autoScalingMock{} mockEC2 := &ec2Mock{}