Skip to content

Commit

Permalink
fix: Handle placeholder instance decommission safely in AWS ASGs
Browse files Browse the repository at this point in the history
This merge resolves the issue where the Kubernetes Cluster Autoscaler incorrectly decommissions actual instances instead of placeholders within AWS ASGs. The fix ensures that only placeholders are considered for scaling down when recent scaling activities fail, thereby preventing the accidental removal of active nodes. Enhanced unit tests and checks are included to ensure robustness.

Fixes #5829
  • Loading branch information
ruiscosta committed May 10, 2024
1 parent 3dda9c1 commit fb69323
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 18 deletions.
92 changes: 83 additions & 9 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,14 +308,37 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
}
}

for _, instance := range instances {
// check if the instance is a placeholder - a requested instance that was never created by the node group
// if it is, just decrease the size of the node group, as there's no specific instance we can remove
if m.isPlaceholderInstance(instance) {
klog.V(4).Infof("instance %s is detected as a placeholder, decreasing ASG requested size instead "+
"of deleting instance", instance.Name)
m.decreaseAsgSizeByOneNoLock(commonAsg)
} else {
// Initialize the success flag for recent scaling activity.
var recentScalingActivitySuccess = false
var err error

// Check if there are any placeholder instances in the list.
if m.HasPlaceholder(instances) {
// Log the check for placeholders in the ASG.
klog.V(4).Infof("Detected a placeholder instance, checking recent scaling activity for ASG %s", commonAsg.Name)

// Retrieve the most recent scaling activity to determine its success state.
recentScalingActivitySuccess, err = m.getMostRecentScalingActivity(commonAsg)

// Handle errors from retrieving scaling activity.
if err != nil {
// Log the error if the scaling activity check fails and return the error.
klog.Errorf("Error retrieving scaling activity for ASG %s: %v", commonAsg.Name, err)
return err // Return error to prevent further processing with uncertain state information.
}
}

for _, instance := range instances {
if m.isPlaceholderInstance(instance) {
if !recentScalingActivitySuccess {
// Log that scaling down due to unsuccessful recent activity
klog.V(4).Infof("Recent scaling activity unsuccessful; reducing ASG size for placeholder %s in ASG %s", instance.Name, commonAsg.Name)
m.decreaseAsgSizeByOneNoLock(commonAsg)
continue // Continue to the next iteration after handling placeholder
}
klog.V(4).Infof("Skipping actions for placeholder %s in ASG %s due to successful recent scaling", instance.Name, commonAsg.Name)
continue
} else {
// check if the instance is already terminating - if it is, don't bother terminating again
// as doing so causes unnecessary API calls and can cause the curSize cached value to decrement
// unnecessarily.
Expand Down Expand Up @@ -352,6 +375,45 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
return nil
}

func (m *asgCache) getMostRecentScalingActivity(asg *asg) (bool, error) {
input := &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String(asg.Name),
MaxRecords: aws.Int64(1),
}

var response *autoscaling.DescribeScalingActivitiesOutput
var err error
attempts := 3

for i := 0; i < attempts; i++ {
response, err = m.awsService.DescribeScalingActivities(input)
if err == nil {
break
}
klog.V(2).Infof("Failed to describe scaling activities, attempt %d/%d: %v", i+1, attempts, err)
time.Sleep(time.Second * 2)
}

if err != nil {
klog.Errorf("All attempts failed for DescribeScalingActivities: %v", err)
return false, err
}

if len(response.Activities) == 0 {
klog.Info("No scaling activities found for ASG:", asg.Name)
return false, nil
}

lastActivity := response.Activities[0]
if *lastActivity.StatusCode == "Successful" {
klog.Infof("Most recent scaling activity for ASG %s was successful", asg.Name)
return true, nil
} else {
klog.Infof("Most recent scaling activity for ASG %s was not successful: %s", asg.Name, *lastActivity.StatusMessage)
return false, fmt.Errorf("most recent scaling activity for ASG %s was not successful: %s", asg.Name, *lastActivity.StatusMessage)
}
}

// isPlaceholderInstance checks if the given instance is only a placeholder
func (m *asgCache) isPlaceholderInstance(instance *AwsInstanceRef) bool {
return strings.HasPrefix(instance.Name, placeholderInstanceNamePrefix)
Expand Down Expand Up @@ -620,7 +682,19 @@ func (m *asgCache) buildInstanceRefFromAWS(instance *autoscaling.Instance) AwsIn
}
}

func (m *asgCache) HasPlaceholder(instances []*AwsInstanceRef) bool {
// check if there's a placeholder instance and verify most recent scaling activity before terminating it
var containsPlaceholder bool = false
for _, instance := range instances {
if strings.HasPrefix(instance.Name, placeholderInstanceNamePrefix) {
containsPlaceholder = true
break
}
}
return containsPlaceholder
}

// Cleanup closes the channel to signal the go routine to stop that is handling the cache
func (m *asgCache) Cleanup() {
close(m.interrupt)
}
}
51 changes: 51 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,54 @@ func TestCreatePlaceholders(t *testing.T) {
})
}
}

func TestGetMostRecentScalingActivity(t *testing.T) {
a := &autoScalingMock{}
asgCache := &asgCache{
awsService: &awsWrapper{
autoScalingI: a,
},
}

asg := &asg{AwsRef: AwsRef{Name: "test-asg"}}

// Test case: Successful scaling activity
a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
MaxRecords: aws.Int64(1),
}).Return(&autoscaling.DescribeScalingActivitiesOutput{
Activities: []*autoscaling.Activity{{StatusCode: aws.String("Successful")}},
}, nil).Once()

success, err := asgCache.getMostRecentScalingActivity(asg)
assert.NoError(t, err)
assert.True(t, success)

// Test case: Failed scaling activity
a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
MaxRecords: aws.Int64(1),
}).Return(&autoscaling.DescribeScalingActivitiesOutput{
Activities: []*autoscaling.Activity{{StatusCode: aws.String("Failed")}},
}, nil).Once()

success, err = asgCache.getMostRecentScalingActivity(asg)
assert.NoError(t, err)
assert.False(t, success)

// Test case: No activities found
a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
MaxRecords: aws.Int64(1),
}).Return(&autoscaling.DescribeScalingActivitiesOutput{
Activities: []*autoscaling.Activity{},
}, nil).Once()

success, err = asgCache.getMostRecentScalingActivity(asg)
assert.NoError(t, err)
assert.False(t, success)

// Verify that all expectations are met
a.AssertExpectations(t)
}

37 changes: 28 additions & 9 deletions cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package aws

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
apiv1 "k8s.io/api/core/v1"
Expand All @@ -27,6 +25,8 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling"
"k8s.io/autoscaler/cluster-autoscaler/config"
"testing"
"time"
)

var testAwsManager = &AwsManager{
Expand Down Expand Up @@ -589,25 +589,44 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) {
},
).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil)

a.On("DescribeScalingActivities",
&autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
MaxRecords: aws.Int64(1),
},
).Return(
&autoscaling.DescribeScalingActivitiesOutput{
Activities: []*autoscaling.Activity{
{
StatusCode: aws.String("Successful"),
StartTime: aws.Time(time.Now().Add(-10 * time.Minute)),
},
{
StatusCode: aws.String("Failed"),
StartTime: aws.Time(time.Now().Add(-30 * time.Minute)),
},
},
}, nil)

provider.Refresh()

initialSize, err := asgs[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 2, initialSize)

node := &apiv1.Node{
Spec: apiv1.NodeSpec{
ProviderID: "aws:///us-east-1a/i-placeholder-test-asg-1",
},
}
err = asgs[0].DeleteNodes([]*apiv1.Node{node})
assert.NoError(t, err)
a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1)
a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1)
a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1)
a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1)

newSize, err := asgs[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 1, newSize)

newSize, err := asgs[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 1, newSize)
}

func TestDeleteNodesAfterMultipleRefreshes(t *testing.T) {
Expand Down Expand Up @@ -738,4 +757,4 @@ func TestHasInstance(t *testing.T) {
present, err = provider.HasInstance(node4)
assert.NoError(t, err)
assert.False(t, present)
}
}

0 comments on commit fb69323

Please sign in to comment.