Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/aws asg unsafe decommission 5829 #6818

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 79 additions & 5 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,13 +308,36 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
}
}

// Initialize the success flag for recent scaling activity.
var recentScalingActivitySuccess = false
var err error

// Check if there are any placeholder instances in the list.
if m.HasPlaceholder(instances) {
// Log the check for placeholders in the ASG.
klog.V(4).Infof("Detected a placeholder instance, checking recent scaling activity for ASG %s", commonAsg.Name)

// Retrieve the most recent scaling activity to determine its success state.
recentScalingActivitySuccess, err = m.getMostRecentScalingActivity(commonAsg)

// Handle errors from retrieving scaling activity.
if err != nil {
// Log the error if the scaling activity check fails and return the error.
klog.Errorf("Error retrieving scaling activity for ASG %s: %v", commonAsg.Name, err)
return err // Return error to prevent further processing with uncertain state information.
}
}

for _, instance := range instances {
// check if the instance is a placeholder - a requested instance that was never created by the node group
// if it is, just decrease the size of the node group, as there's no specific instance we can remove
if m.isPlaceholderInstance(instance) {
klog.V(4).Infof("instance %s is detected as a placeholder, decreasing ASG requested size instead "+
"of deleting instance", instance.Name)
m.decreaseAsgSizeByOneNoLock(commonAsg)
if !recentScalingActivitySuccess {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @ruiscosta I don't believe this actually solves the problem. Consider the following scenario:

  1. We scale up the ASG by 10 instances. AWS creates 3 of them, and then fails on the remaining 7.
  2. DeleteInstances is called with those 10 instances; 7 of them are placeholders
  3. In lines 321 we check to see if the most recent scaling activity was successful or not, which returns false since 7 instances could not be created.
  4. Now for each instance in the loop, we decrease the ASG size by one, which reduces the ASG size by 7.
  5. In between the check in line 321 and (say) the 5th iteration of the loop, AWS launches a new instance, which joins the cluster. Our information about what instances are actually placeholders is now out of date, and we get the same problem that we had before.

We "could" check the recent scaling activity in every iteration of the loop, at the expense of making a lot more API calls, which I think is undesirable, and is still subject to a race between when you make the check and when you change the ASG size.

// Log that scaling down due to unsuccessful recent activity
klog.V(4).Infof("Recent scaling activity unsuccessful; reducing ASG size for placeholder %s in ASG %s", instance.Name, commonAsg.Name)
m.decreaseAsgSizeByOneNoLock(commonAsg)
continue // Continue to the next iteration after handling placeholder
}
klog.V(4).Infof("Skipping actions for placeholder %s in ASG %s due to successful recent scaling", instance.Name, commonAsg.Name)
continue
} else {
// check if the instance is already terminating - if it is, don't bother terminating again
// as doing so causes unnecessary API calls and can cause the curSize cached value to decrement
Expand Down Expand Up @@ -352,6 +375,45 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
return nil
}

func (m *asgCache) getMostRecentScalingActivity(asg *asg) (bool, error) {
input := &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String(asg.Name),
MaxRecords: aws.Int64(1),
}

var response *autoscaling.DescribeScalingActivitiesOutput
var err error
attempts := 3

for i := 0; i < attempts; i++ {
response, err = m.awsService.DescribeScalingActivities(input)
if err == nil {
break
}
klog.V(2).Infof("Failed to describe scaling activities, attempt %d/%d: %v", i+1, attempts, err)
time.Sleep(time.Second * 2)
}

if err != nil {
klog.Errorf("All attempts failed for DescribeScalingActivities: %v", err)
return false, err
}

if len(response.Activities) == 0 {
klog.Info("No scaling activities found for ASG:", asg.Name)
return false, nil
}

lastActivity := response.Activities[0]
if *lastActivity.StatusCode == "Successful" {
klog.Infof("Most recent scaling activity for ASG %s was successful", asg.Name)
return true, nil
} else {
klog.Infof("Most recent scaling activity for ASG %s was not successful: %s", asg.Name, *lastActivity.StatusMessage)
return false, fmt.Errorf("most recent scaling activity for ASG %s was not successful: %s", asg.Name, *lastActivity.StatusMessage)
}
}

// isPlaceholderInstance checks if the given instance is only a placeholder
func (m *asgCache) isPlaceholderInstance(instance *AwsInstanceRef) bool {
return strings.HasPrefix(instance.Name, placeholderInstanceNamePrefix)
Expand Down Expand Up @@ -620,6 +682,18 @@ func (m *asgCache) buildInstanceRefFromAWS(instance *autoscaling.Instance) AwsIn
}
}

func (m *asgCache) HasPlaceholder(instances []*AwsInstanceRef) bool {
// check if there's a placeholder instance and verify most recent scaling activity before terminating it
var containsPlaceholder bool = false
for _, instance := range instances {
if strings.HasPrefix(instance.Name, placeholderInstanceNamePrefix) {
containsPlaceholder = true
break
}
}
return containsPlaceholder
}

// Cleanup closes the channel to signal the go routine to stop that is handling the cache
func (m *asgCache) Cleanup() {
close(m.interrupt)
Expand Down
50 changes: 50 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,53 @@ func TestCreatePlaceholders(t *testing.T) {
})
}
}

func TestGetMostRecentScalingActivity(t *testing.T) {
a := &autoScalingMock{}
asgCache := &asgCache{
awsService: &awsWrapper{
autoScalingI: a,
},
}

asg := &asg{AwsRef: AwsRef{Name: "test-asg"}}

// Test case: Successful scaling activity
a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
MaxRecords: aws.Int64(1),
}).Return(&autoscaling.DescribeScalingActivitiesOutput{
Activities: []*autoscaling.Activity{{StatusCode: aws.String("Successful")}},
}, nil).Once()

success, err := asgCache.getMostRecentScalingActivity(asg)
assert.NoError(t, err)
assert.True(t, success)

// Test case: Failed scaling activity
a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
MaxRecords: aws.Int64(1),
}).Return(&autoscaling.DescribeScalingActivitiesOutput{
Activities: []*autoscaling.Activity{{StatusCode: aws.String("Failed")}},
}, nil).Once()

success, err = asgCache.getMostRecentScalingActivity(asg)
assert.NoError(t, err)
assert.False(t, success)

// Test case: No activities found
a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
MaxRecords: aws.Int64(1),
}).Return(&autoscaling.DescribeScalingActivitiesOutput{
Activities: []*autoscaling.Activity{},
}, nil).Once()

success, err = asgCache.getMostRecentScalingActivity(asg)
assert.NoError(t, err)
assert.False(t, success)

// Verify that all expectations are met
a.AssertExpectations(t)
}
25 changes: 22 additions & 3 deletions cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package aws

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
apiv1 "k8s.io/api/core/v1"
Expand All @@ -27,6 +25,8 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling"
"k8s.io/autoscaler/cluster-autoscaler/config"
"testing"
"time"
)

var testAwsManager = &AwsManager{
Expand Down Expand Up @@ -589,12 +589,30 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) {
},
).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil)

a.On("DescribeScalingActivities",
&autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: aws.String("test-asg"),
MaxRecords: aws.Int64(1),
},
).Return(
&autoscaling.DescribeScalingActivitiesOutput{
Activities: []*autoscaling.Activity{
{
StatusCode: aws.String("Successful"),
StartTime: aws.Time(time.Now().Add(-10 * time.Minute)),
},
{
StatusCode: aws.String("Failed"),
StartTime: aws.Time(time.Now().Add(-30 * time.Minute)),
},
},
}, nil)

provider.Refresh()

initialSize, err := asgs[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 2, initialSize)

node := &apiv1.Node{
Spec: apiv1.NodeSpec{
ProviderID: "aws:///us-east-1a/i-placeholder-test-asg-1",
Expand All @@ -608,6 +626,7 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) {
newSize, err := asgs[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 1, newSize)

}

func TestDeleteNodesAfterMultipleRefreshes(t *testing.T) {
Expand Down
Loading