Skip to content

Commit

Permalink
this change expands on pr kubernetes#6818
Browse files Browse the repository at this point in the history
This merge resolves an issue in the Kubernetes Cluster Autoscaler where actual instances within AWS Auto Scaling Groups (ASGs) were incorrectly decommissioned instead of placeholders. The updates ensure that placeholders are exclusively targeted for scaling down under conditions where recent scaling activities have failed. This prevents the accidental termination of active nodes and enhances the reliability of the autoscaler in AWS environments.

Fixes kubernetes#5829
  • Loading branch information
ravisinha0506 committed Jun 7, 2024
1 parent bd8153b commit 1aa13d8
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 8 deletions.
77 changes: 72 additions & 5 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,13 +308,29 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
}
}

// Initialize the success flag for recent scaling activity.
var recentScalingActivitySuccess = false
var err error

for _, instance := range instances {
// check if the instance is a placeholder - a requested instance that was never created by the node group
// if it is, just decrease the size of the node group, as there's no specific instance we can remove
if m.isPlaceholderInstance(instance) {
klog.V(4).Infof("instance %s is detected as a placeholder, decreasing ASG requested size instead "+
"of deleting instance", instance.Name)
m.decreaseAsgSizeByOneNoLock(commonAsg)
recentScalingActivitySuccess, err = m.getMostRecentScalingActivity(commonAsg)

// Handle errors from retrieving scaling activity.
if err != nil {
// Log the error if the scaling activity check fails and return the error.
klog.Errorf("Error retrieving scaling activity for ASG %s: %v", commonAsg.Name, err)
return err // Return error to prevent further processing with uncertain state information.
}

if !recentScalingActivitySuccess {
// Log that scaling down due to unsuccessful recent activity
klog.V(4).Infof("Recent scaling activity unsuccessful; reducing ASG size for placeholder %s in ASG %s", instance.Name, commonAsg.Name)
m.decreaseAsgSizeByOneNoLock(commonAsg)
continue // Continue to the next iteration after handling placeholder
}
klog.V(4).Infof("Skipping actions for placeholder %s in ASG %s due to successful recent scaling", instance.Name, commonAsg.Name)
continue
} else {
// check if the instance is already terminating - if it is, don't bother terminating again
// as doing so causes unnecessary API calls and can cause the curSize cached value to decrement
Expand Down Expand Up @@ -352,6 +368,45 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
return nil
}

func (m *asgCache) getMostRecentScalingActivity(asg *asg) (bool, error) {
input := &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String(asg.Name),
MaxRecords: aws.Int64(1),
}

var response *autoscaling.DescribeScalingActivitiesOutput
var err error
attempts := 3

for i := 0; i < attempts; i++ {
response, err = m.awsService.DescribeScalingActivities(input)
if err == nil {
break
}
klog.V(2).Infof("Failed to describe scaling activities, attempt %d/%d: %v", i+1, attempts, err)
time.Sleep(time.Second * 2)
}

if err != nil {
klog.Errorf("All attempts failed for DescribeScalingActivities: %v", err)
return false, err
}

if len(response.Activities) == 0 {
klog.Info("No scaling activities found for ASG:", asg.Name)
return false, nil
}

lastActivity := response.Activities[0]
if *lastActivity.StatusCode == "Successful" {
klog.Infof("Most recent scaling activity for ASG %s was successful", asg.Name)
return true, nil
} else {
klog.Infof("Most recent scaling activity for ASG %s was not successful: %s", asg.Name, *lastActivity.StatusMessage)
return false, fmt.Errorf("most recent scaling activity for ASG %s was not successful: %s", asg.Name, *lastActivity.StatusMessage)
}
}

// isPlaceholderInstance checks if the given instance is only a placeholder
func (m *asgCache) isPlaceholderInstance(instance *AwsInstanceRef) bool {
return strings.HasPrefix(instance.Name, placeholderInstanceNamePrefix)
Expand Down Expand Up @@ -620,6 +675,18 @@ func (m *asgCache) buildInstanceRefFromAWS(instance *autoscaling.Instance) AwsIn
}
}

func (m *asgCache) HasPlaceholder(instances []*AwsInstanceRef) bool {
// check if there's a placeholder instance and verify most recent scaling activity before terminating it
var containsPlaceholder bool = false
for _, instance := range instances {
if strings.HasPrefix(instance.Name, placeholderInstanceNamePrefix) {
containsPlaceholder = true
break
}
}
return containsPlaceholder
}

// Cleanup closes the channel to signal the go routine to stop that is handling the cache
func (m *asgCache) Cleanup() {
close(m.interrupt)
Expand Down
50 changes: 50 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,53 @@ func TestCreatePlaceholders(t *testing.T) {
})
}
}

func TestGetMostRecentScalingActivity(t *testing.T) {
a := &autoScalingMock{}
asgCache := &asgCache{
awsService: &awsWrapper{
autoScalingI: a,
},
}

asg := &asg{AwsRef: AwsRef{Name: "test-asg"}}

// Test case: Successful scaling activity
a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
MaxRecords: aws.Int64(1),
}).Return(&autoscaling.DescribeScalingActivitiesOutput{
Activities: []*autoscaling.Activity{{StatusCode: aws.String("Successful")}},
}, nil).Once()

success, err := asgCache.getMostRecentScalingActivity(asg)
assert.NoError(t, err)
assert.True(t, success)

// Test case: Failed scaling activity
a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
MaxRecords: aws.Int64(1),
}).Return(&autoscaling.DescribeScalingActivitiesOutput{
Activities: []*autoscaling.Activity{{StatusCode: aws.String("Failed")}},
}, nil).Once()

success, err = asgCache.getMostRecentScalingActivity(asg)
assert.NoError(t, err)
assert.False(t, success)

// Test case: No activities found
a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
MaxRecords: aws.Int64(1),
}).Return(&autoscaling.DescribeScalingActivitiesOutput{
Activities: []*autoscaling.Activity{},
}, nil).Once()

success, err = asgCache.getMostRecentScalingActivity(asg)
assert.NoError(t, err)
assert.False(t, success)

// Verify that all expectations are met
a.AssertExpectations(t)
}
25 changes: 22 additions & 3 deletions cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package aws

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
apiv1 "k8s.io/api/core/v1"
Expand All @@ -27,6 +25,8 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling"
"k8s.io/autoscaler/cluster-autoscaler/config"
"testing"
"time"
)

var testAwsManager = &AwsManager{
Expand Down Expand Up @@ -589,12 +589,30 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) {
},
).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil)

a.On("DescribeScalingActivities",
&autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
MaxRecords: aws.Int64(1),
},
).Return(
&autoscaling.DescribeScalingActivitiesOutput{
Activities: []*autoscaling.Activity{
{
StatusCode: aws.String("Successful"),
StartTime: aws.Time(time.Now().Add(-10 * time.Minute)),
},
{
StatusCode: aws.String("Failed"),
StartTime: aws.Time(time.Now().Add(-30 * time.Minute)),
},
},
}, nil)

provider.Refresh()

initialSize, err := asgs[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 2, initialSize)

node := &apiv1.Node{
Spec: apiv1.NodeSpec{
ProviderID: "aws:///us-east-1a/i-placeholder-test-asg-1",
Expand All @@ -608,6 +626,7 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) {
newSize, err := asgs[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 1, newSize)

}

func TestDeleteNodesAfterMultipleRefreshes(t *testing.T) {
Expand Down

0 comments on commit 1aa13d8

Please sign in to comment.