From ff7a96d3cf15fd658dedc10f94d5022f2c09a76f Mon Sep 17 00:00:00 2001 From: Kubernetes Prow Robot Date: Thu, 3 Mar 2022 13:01:17 -0800 Subject: [PATCH] Merge pull request #4489 from airbnb/drmorr--early-abort-if-aws-node-group-no-capacity Early abort if AWS node group has no capacity --- .../cloudprovider/aws/CA_with_AWS_IAM_OIDC.md | 1 + .../cloudprovider/aws/README.md | 29 ++-- .../cloudprovider/aws/auto_scaling_groups.go | 142 ++++++++++++------ .../aws/auto_scaling_groups_test.go | 120 +++++++++++++++ .../cloudprovider/aws/aws_cloud_provider.go | 27 +++- .../aws/aws_cloud_provider_test.go | 10 +- .../cloudprovider/aws/aws_manager.go | 7 +- .../cloudprovider/aws/aws_manager_test.go | 18 ++- .../cloudprovider/aws/aws_wrapper.go | 1 + .../cloudprovider/aws/aws_wrapper_test.go | 5 + .../cloudprovider/aws/instance_type_cache.go | 2 +- .../aws/instance_type_cache_test.go | 7 +- 12 files changed, 296 insertions(+), 73 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/aws/CA_with_AWS_IAM_OIDC.md b/cluster-autoscaler/cloudprovider/aws/CA_with_AWS_IAM_OIDC.md index 217bd3d505eb..53db71aa6cc5 100644 --- a/cluster-autoscaler/cloudprovider/aws/CA_with_AWS_IAM_OIDC.md +++ b/cluster-autoscaler/cloudprovider/aws/CA_with_AWS_IAM_OIDC.md @@ -65,6 +65,7 @@ Note: The keys for the tags that you entered don't have values. Cluster Autoscal "autoscaling:DescribeAutoScalingGroups", "autoscaling:DescribeAutoScalingInstances", "autoscaling:DescribeLaunchConfigurations", + "autoscaling:DescribeScalingActivities", "autoscaling:DescribeTags", "autoscaling:SetDesiredCapacity", "autoscaling:TerminateInstanceInAutoScalingGroup" diff --git a/cluster-autoscaler/cloudprovider/aws/README.md b/cluster-autoscaler/cloudprovider/aws/README.md index 94df148bdb2f..b2944cca238d 100644 --- a/cluster-autoscaler/cloudprovider/aws/README.md +++ b/cluster-autoscaler/cloudprovider/aws/README.md @@ -19,20 +19,21 @@ The following policy provides the minimum privileges necessary for Cluster Autos ```json { - "Version": "2012-10-17", - "Statement": [ - { - "Effect": "Allow", - "Action": [ - "autoscaling:DescribeAutoScalingGroups", - "autoscaling:DescribeAutoScalingInstances", - "autoscaling:DescribeLaunchConfigurations", - "autoscaling:SetDesiredCapacity", - "autoscaling:TerminateInstanceInAutoScalingGroup" - ], - "Resource": ["*"] - } - ] + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "autoscaling:DescribeAutoScalingGroups", + "autoscaling:DescribeAutoScalingInstances", + "autoscaling:DescribeLaunchConfigurations", + "autoscaling:DescribeScalingActivities", + "autoscaling:SetDesiredCapacity", + "autoscaling:TerminateInstanceInAutoScalingGroup" + ], + "Resource": ["arn:aws:autoscaling:${YOUR_CLUSTER_AWS_REGION}:${YOUR_AWS_ACCOUNT_ID}:autoScalingGroup:*:autoScalingGroupName/${YOUR_ASG_NAME}"] + } + ] } ``` diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go index c534d748caf3..c2875b9b3e71 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go @@ -31,14 +31,16 @@ import ( ) const ( - scaleToZeroSupported = true - placeholderInstanceNamePrefix = "i-placeholder" + scaleToZeroSupported = true + placeholderInstanceNamePrefix = "i-placeholder" + placeholderUnfulfillableStatus = "placeholder-cannot-be-fulfilled" ) type asgCache struct { - registeredAsgs []*asg + registeredAsgs map[AwsRef]*asg asgToInstances map[AwsRef][]AwsInstanceRef instanceToAsg map[AwsInstanceRef]*asg + instanceStatus map[AwsInstanceRef]*string asgInstanceTypeCache *instanceTypeExpirationStore mutex sync.Mutex awsService *awsWrapper @@ -61,9 +63,10 @@ type mixedInstancesPolicy struct { type asg struct { AwsRef - minSize int - maxSize int - curSize int + minSize int + maxSize int + curSize int + lastUpdateTime time.Time AvailabilityZones []string LaunchConfigurationName string @@ -74,10 +77,11 @@ type asg struct { func newASGCache(awsService *awsWrapper, explicitSpecs []string, autoDiscoverySpecs []asgAutoDiscoveryConfig) (*asgCache, error) { registry := &asgCache{ - registeredAsgs: make([]*asg, 0), + registeredAsgs: make(map[AwsRef]*asg, 0), awsService: awsService, asgToInstances: make(map[AwsRef][]AwsInstanceRef), instanceToAsg: make(map[AwsInstanceRef]*asg), + instanceStatus: make(map[AwsInstanceRef]*string), asgInstanceTypeCache: newAsgInstanceTypeCache(awsService), interrupt: make(chan struct{}), asgAutoDiscoverySpecs: autoDiscoverySpecs, @@ -119,53 +123,44 @@ func (m *asgCache) parseExplicitAsgs(specs []string) error { // Register ASG. Returns the registered ASG. func (m *asgCache) register(asg *asg) *asg { - for i := range m.registeredAsgs { - if existing := m.registeredAsgs[i]; existing.AwsRef == asg.AwsRef { - if reflect.DeepEqual(existing, asg) { - return existing - } + if existing, asgExists := m.registeredAsgs[asg.AwsRef]; asgExists { + if reflect.DeepEqual(existing, asg) { + return existing + } - klog.V(4).Infof("Updating ASG %s", asg.AwsRef.Name) + klog.V(4).Infof("Updating ASG %s", asg.AwsRef.Name) - // Explicit registered groups should always use the manually provided min/max - // values and the not the ones returned by the API - if !m.explicitlyConfigured[asg.AwsRef] { - existing.minSize = asg.minSize - existing.maxSize = asg.maxSize - } + // Explicit registered groups should always use the manually provided min/max + // values and the not the ones returned by the API + if !m.explicitlyConfigured[asg.AwsRef] { + existing.minSize = asg.minSize + existing.maxSize = asg.maxSize + } - existing.curSize = asg.curSize + existing.curSize = asg.curSize - // Those information are mainly required to create templates when scaling - // from zero - existing.AvailabilityZones = asg.AvailabilityZones - existing.LaunchConfigurationName = asg.LaunchConfigurationName - existing.LaunchTemplate = asg.LaunchTemplate - existing.MixedInstancesPolicy = asg.MixedInstancesPolicy - existing.Tags = asg.Tags + // Those information are mainly required to create templates when scaling + // from zero + existing.AvailabilityZones = asg.AvailabilityZones + existing.LaunchConfigurationName = asg.LaunchConfigurationName + existing.LaunchTemplate = asg.LaunchTemplate + existing.MixedInstancesPolicy = asg.MixedInstancesPolicy + existing.Tags = asg.Tags - return existing - } + return existing } klog.V(1).Infof("Registering ASG %s", asg.AwsRef.Name) - m.registeredAsgs = append(m.registeredAsgs, asg) + m.registeredAsgs[asg.AwsRef] = asg return asg } // Unregister ASG. Returns the unregistered ASG. func (m *asgCache) unregister(a *asg) *asg { - updated := make([]*asg, 0, len(m.registeredAsgs)) - var changed *asg - for _, existing := range m.registeredAsgs { - if existing.AwsRef == a.AwsRef { - klog.V(1).Infof("Unregistered ASG %s", a.AwsRef.Name) - changed = a - continue - } - updated = append(updated, existing) + if _, asgExists := m.registeredAsgs[a.AwsRef]; asgExists { + klog.V(1).Infof("Unregistered ASG %s", a.AwsRef.Name) + delete(m.registeredAsgs, a.AwsRef) } - m.registeredAsgs = updated - return changed + return a } func (m *asgCache) buildAsgFromSpec(spec string) (*asg, error) { @@ -182,7 +177,7 @@ func (m *asgCache) buildAsgFromSpec(spec string) (*asg, error) { } // Get returns the currently registered ASGs -func (m *asgCache) Get() []*asg { +func (m *asgCache) Get() map[AwsRef]*asg { m.mutex.Lock() defer m.mutex.Unlock() @@ -217,6 +212,17 @@ func (m *asgCache) InstancesByAsg(ref AwsRef) ([]AwsInstanceRef, error) { return nil, fmt.Errorf("error while looking for instances of ASG: %s", ref) } +func (m *asgCache) InstanceStatus(ref AwsInstanceRef) (*string, error) { + m.mutex.Lock() + defer m.mutex.Unlock() + + if status, found := m.instanceStatus[ref]; found { + return status, nil + } + + return nil, fmt.Errorf("could not find instance %v", ref) +} + func (m *asgCache) SetAsgSize(asg *asg, size int) error { m.mutex.Lock() defer m.mutex.Unlock() @@ -239,6 +245,7 @@ func (m *asgCache) setAsgSizeNoLock(asg *asg, size int) error { } // Proactively set the ASG size so autoscaler makes better decisions + asg.lastUpdateTime = start asg.curSize = size return nil @@ -358,6 +365,7 @@ func (m *asgCache) regenerate() error { newInstanceToAsgCache := make(map[AwsInstanceRef]*asg) newAsgToInstancesCache := make(map[AwsRef][]AwsInstanceRef) + newInstanceStatusMap := make(map[AwsInstanceRef]*string) // Build list of known ASG names refreshNames, err := m.buildAsgNames() @@ -394,6 +402,7 @@ func (m *asgCache) regenerate() error { ref := m.buildInstanceRefFromAWS(instance) newInstanceToAsgCache[ref] = asg newAsgToInstancesCache[asg.AwsRef][i] = ref + newInstanceStatusMap[ref] = instance.HealthStatus } } @@ -411,30 +420,73 @@ func (m *asgCache) regenerate() error { m.asgToInstances = newAsgToInstancesCache m.instanceToAsg = newInstanceToAsgCache + m.instanceStatus = newInstanceStatusMap return nil } func (m *asgCache) createPlaceholdersForDesiredNonStartedInstances(groups []*autoscaling.Group) []*autoscaling.Group { for _, g := range groups { desired := *g.DesiredCapacity - real := int64(len(g.Instances)) - if desired <= real { + realInstances := int64(len(g.Instances)) + if desired <= realInstances { continue } - for i := real; i < desired; i++ { + klog.V(4).Infof("Instance group %s has only %d instances created while requested count is %d. "+ + "Creating placeholder instances.", *g.AutoScalingGroupName, realInstances, desired) + + healthStatus := "" + isAvailable, err := m.isNodeGroupAvailable(g) + if err != nil { + klog.V(4).Infof("Could not check instance availability, creating placeholder node anyways: %v", err) + } else if !isAvailable { + klog.Warningf("Instance group %s cannot provision any more nodes!", *g.AutoScalingGroupName) + healthStatus = placeholderUnfulfillableStatus + } + + for i := realInstances; i < desired; i++ { id := fmt.Sprintf("%s-%s-%d", placeholderInstanceNamePrefix, *g.AutoScalingGroupName, i) klog.V(4).Infof("Instance group %s has only %d instances created while requested count is %d. "+ "Creating placeholder instance with ID %s.", *g.AutoScalingGroupName, real, desired, id) g.Instances = append(g.Instances, &autoscaling.Instance{ InstanceId: &id, AvailabilityZone: g.AvailabilityZones[0], + HealthStatus: &healthStatus, }) } } return groups } +func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error) { + input := &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: group.AutoScalingGroupName, + } + + start := time.Now() + response, err := m.awsService.DescribeScalingActivities(input) + observeAWSRequest("DescribeScalingActivities", err, start) + if err != nil { + return true, err // If we can't describe the scaling activities we assume the node group is available + } + + for _, activity := range response.Activities { + asgRef := AwsRef{Name: *group.AutoScalingGroupName} + if a, ok := m.registeredAsgs[asgRef]; ok { + lut := a.lastUpdateTime + if activity.StartTime.Before(lut) { + break + } else if *activity.StatusCode == "Failed" { + klog.Warningf("ASG %s scaling failed with %s", asgRef.Name, *activity) + return false, nil + } + } else { + klog.V(4).Infof("asg %v is not registered yet, skipping DescribeScalingActivities check", asgRef.Name) + } + } + return true, nil +} + func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) { spec := dynamic.NodeGroupSpec{ Name: aws.StringValue(g.AutoScalingGroupName), diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go index e9f9ad1ab79e..c5078b03b5b7 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go @@ -17,8 +17,12 @@ limitations under the License. package aws import ( + "errors" "testing" + "time" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/autoscaling" "github.com/stretchr/testify/assert" ) @@ -46,3 +50,119 @@ func validateAsg(t *testing.T, asg *asg, name string, minSize int, maxSize int) assert.Equal(t, minSize, asg.minSize) assert.Equal(t, maxSize, asg.maxSize) } + +func TestCreatePlaceholders(t *testing.T) { + registeredAsgName := aws.String("test-asg") + registeredAsgRef := AwsRef{Name: *registeredAsgName} + + cases := []struct { + name string + desiredCapacity *int64 + activities []*autoscaling.Activity + groupLastUpdateTime time.Time + describeErr error + asgToCheck *string + }{ + { + name: "add placeholders successful", + desiredCapacity: aws.Int64(10), + }, + { + name: "no placeholders needed", + desiredCapacity: aws.Int64(0), + }, + { + name: "DescribeScalingActivities failed", + desiredCapacity: aws.Int64(1), + describeErr: errors.New("timeout"), + }, + { + name: "early abort if AWS scaling up fails", + desiredCapacity: aws.Int64(1), + activities: []*autoscaling.Activity{ + { + StatusCode: aws.String("Failed"), + StartTime: aws.Time(time.Unix(10, 0)), + }, + }, + groupLastUpdateTime: time.Unix(9, 0), + }, + { + name: "AWS scaling failed event before CA scale_up", + desiredCapacity: aws.Int64(1), + activities: []*autoscaling.Activity{ + { + StatusCode: aws.String("Failed"), + StartTime: aws.Time(time.Unix(9, 0)), + }, + }, + groupLastUpdateTime: time.Unix(10, 0), + }, + { + name: "asg not registered", + desiredCapacity: aws.Int64(10), + activities: []*autoscaling.Activity{ + { + StatusCode: aws.String("Failed"), + StartTime: aws.Time(time.Unix(10, 0)), + }, + }, + groupLastUpdateTime: time.Unix(9, 0), + asgToCheck: aws.String("unregisteredAsgName"), + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + shouldCallDescribeScalingActivities := true + if *tc.desiredCapacity == int64(0) { + shouldCallDescribeScalingActivities = false + } + + asgName := registeredAsgName + if tc.asgToCheck != nil { + asgName = tc.asgToCheck + } + + a := &autoScalingMock{} + if shouldCallDescribeScalingActivities { + a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: asgName, + }).Return( + &autoscaling.DescribeScalingActivitiesOutput{Activities: tc.activities}, + tc.describeErr, + ).Once() + } + + asgCache := &asgCache{ + awsService: &awsWrapper{ + autoScalingI: a, + ec2I: nil, + }, + registeredAsgs: map[AwsRef]*asg{ + registeredAsgRef: { + AwsRef: registeredAsgRef, + lastUpdateTime: tc.groupLastUpdateTime, + }, + }, + } + + groups := []*autoscaling.Group{ + { + AutoScalingGroupName: asgName, + AvailabilityZones: []*string{aws.String("westeros-1a")}, + DesiredCapacity: tc.desiredCapacity, + Instances: []*autoscaling.Instance{}, + }, + } + asgCache.createPlaceholdersForDesiredNonStartedInstances(groups) + assert.Equal(t, int64(len(groups[0].Instances)), *tc.desiredCapacity) + if tc.activities != nil && *tc.activities[0].StatusCode == "Failed" && tc.activities[0].StartTime.After(tc.groupLastUpdateTime) && asgName == registeredAsgName { + assert.Equal(t, *groups[0].Instances[0].HealthStatus, placeholderUnfulfillableStatus) + } else if len(groups[0].Instances) > 0 { + assert.Equal(t, *groups[0].Instances[0].HealthStatus, "") + } + a.AssertExpectations(t) + }) + } +} diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go index ebc61d477401..3126c491d997 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go @@ -84,12 +84,12 @@ func (aws *awsCloudProvider) GetAvailableGPUTypes() map[string]struct{} { // NodeGroups returns all node groups configured for this cloud provider. func (aws *awsCloudProvider) NodeGroups() []cloudprovider.NodeGroup { asgs := aws.awsManager.getAsgs() - ngs := make([]cloudprovider.NodeGroup, len(asgs)) - for i, asg := range asgs { - ngs[i] = &AwsNodeGroup{ + ngs := make([]cloudprovider.NodeGroup, 0, len(asgs)) + for _, asg := range asgs { + ngs = append(ngs, &AwsNodeGroup{ asg: asg, awsManager: aws.awsManager, - } + }) } return ngs @@ -309,7 +309,24 @@ func (ng *AwsNodeGroup) Nodes() ([]cloudprovider.Instance, error) { instances := make([]cloudprovider.Instance, len(asgNodes)) for i, asgNode := range asgNodes { - instances[i] = cloudprovider.Instance{Id: asgNode.ProviderID} + var status *cloudprovider.InstanceStatus + instanceStatusString, err := ng.awsManager.GetInstanceStatus(asgNode) + if err != nil { + klog.V(4).Infof("Could not get instance status, continuing anyways: %v", err) + } else if instanceStatusString != nil && *instanceStatusString == placeholderUnfulfillableStatus { + status = &cloudprovider.InstanceStatus{ + State: cloudprovider.InstanceCreating, + ErrorInfo: &cloudprovider.InstanceErrorInfo{ + ErrorClass: cloudprovider.OutOfResourcesErrorClass, + ErrorCode: placeholderUnfulfillableStatus, + ErrorMessage: "AWS cannot provision any more instances for this node group", + }, + } + } + instances[i] = cloudprovider.Instance{ + Id: asgNode.ProviderID, + Status: status, + } } return instances, nil } diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go index d92be0060cf3..e6b6d4137608 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go @@ -29,7 +29,7 @@ import ( var testAwsManager = &AwsManager{ asgCache: &asgCache{ - registeredAsgs: make([]*asg, 0), + registeredAsgs: make(map[AwsRef]*asg, 0), asgToInstances: make(map[AwsRef][]AwsInstanceRef), instanceToAsg: make(map[AwsInstanceRef]*asg), interrupt: make(chan struct{}), @@ -43,7 +43,7 @@ func newTestAwsManagerWithMockServices(mockAutoScaling autoScalingI, mockEC2 ec2 return &AwsManager{ awsService: awsService, asgCache: &asgCache{ - registeredAsgs: make([]*asg, 0), + registeredAsgs: make(map[AwsRef]*asg, 0), asgToInstances: make(map[AwsRef][]AwsInstanceRef), instanceToAsg: make(map[AwsInstanceRef]*asg), asgInstanceTypeCache: newAsgInstanceTypeCache(&awsService), @@ -462,6 +462,12 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) { expectedInstancesCount = 1 }).Return(nil) + a.On("DescribeScalingActivities", + &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("test-asg"), + }, + ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil) + provider.Refresh() initialSize, err := asgs[0].TargetSize() diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager.go b/cluster-autoscaler/cloudprovider/aws/aws_manager.go index 6faee7bb5965..5352d5794f49 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager.go @@ -270,7 +270,7 @@ func (m *AwsManager) Cleanup() { m.asgCache.Cleanup() } -func (m *AwsManager) getAsgs() []*asg { +func (m *AwsManager) getAsgs() map[AwsRef]*asg { return m.asgCache.Get() } @@ -293,6 +293,11 @@ func (m *AwsManager) GetAsgNodes(ref AwsRef) ([]AwsInstanceRef, error) { return m.asgCache.InstancesByAsg(ref) } +// GetInstanceStatus returns the status of ASG nodes +func (m *AwsManager) GetInstanceStatus(ref AwsInstanceRef) (*string, error) { + return m.asgCache.InstanceStatus(ref) +} + func (m *AwsManager) getAsgTemplate(asg *asg) (*asgTemplate, error) { if len(asg.AvailabilityZones) < 1 { return nil, fmt.Errorf("unable to get first AvailabilityZone for ASG %q", asg.Name) diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go index 0c6d03d11f03..890c80075d6c 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go @@ -259,6 +259,7 @@ func makeTaintSet(taints []apiv1.Taint) map[apiv1.Taint]bool { func TestFetchExplicitAsgs(t *testing.T) { min, max, groupname := 1, 10, "coolasg" + asgRef := AwsRef{Name: groupname} a := &autoScalingMock{} a.On("DescribeAutoScalingGroups", &autoscaling.DescribeAutoScalingGroupsInput{ @@ -291,6 +292,12 @@ func TestFetchExplicitAsgs(t *testing.T) { }}, false) }).Return(nil) + a.On("DescribeScalingActivities", + &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("coolasg"), + }, + ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil) + do := cloudprovider.NodeGroupDiscoveryOptions{ // Register the same node group twice with different max nodes. // The intention is to test that the asgs.Register method will update @@ -309,7 +316,7 @@ func TestFetchExplicitAsgs(t *testing.T) { asgs := m.asgCache.Get() assert.Equal(t, 1, len(asgs)) - validateAsg(t, asgs[0], groupname, min, max) + validateAsg(t, asgs[asgRef], groupname, min, max) } func TestGetASGTemplate(t *testing.T) { @@ -405,6 +412,7 @@ func TestGetASGTemplate(t *testing.T) { func TestFetchAutoAsgs(t *testing.T) { min, max := 1, 10 groupname, tags := "coolasg", []string{"tag", "anothertag"} + asgRef := AwsRef{Name: groupname} a := &autoScalingMock{} // Lookup groups associated with tags @@ -448,6 +456,12 @@ func TestFetchAutoAsgs(t *testing.T) { }}}, false) }).Return(nil).Twice() + a.On("DescribeScalingActivities", + &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("coolasg"), + }, + ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil) + do := cloudprovider.NodeGroupDiscoveryOptions{ NodeGroupAutoDiscoverySpecs: []string{fmt.Sprintf("asg:tag=%s", strings.Join(tags, ","))}, } @@ -462,7 +476,7 @@ func TestFetchAutoAsgs(t *testing.T) { asgs := m.asgCache.Get() assert.Equal(t, 1, len(asgs)) - validateAsg(t, asgs[0], groupname, min, max) + validateAsg(t, asgs[asgRef], groupname, min, max) // Simulate the previously discovered ASG disappearing a.On("DescribeTagsPages", mock.MatchedBy(tagsMatcher(expectedTagsInput)), diff --git a/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go b/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go index 3ef87f91fa74..5c23da622bbb 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go @@ -30,6 +30,7 @@ import ( type autoScalingI interface { DescribeAutoScalingGroupsPages(input *autoscaling.DescribeAutoScalingGroupsInput, fn func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) error DescribeLaunchConfigurations(*autoscaling.DescribeLaunchConfigurationsInput) (*autoscaling.DescribeLaunchConfigurationsOutput, error) + DescribeScalingActivities(*autoscaling.DescribeScalingActivitiesInput) (*autoscaling.DescribeScalingActivitiesOutput, error) DescribeTagsPages(input *autoscaling.DescribeTagsInput, fn func(*autoscaling.DescribeTagsOutput, bool) bool) error SetDesiredCapacity(input *autoscaling.SetDesiredCapacityInput) (*autoscaling.SetDesiredCapacityOutput, error) TerminateInstanceInAutoScalingGroup(input *autoscaling.TerminateInstanceInAutoScalingGroupInput) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error) diff --git a/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go b/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go index 86c12efab308..c16c290b104a 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go @@ -42,6 +42,11 @@ func (a *autoScalingMock) DescribeLaunchConfigurations(i *autoscaling.DescribeLa return args.Get(0).(*autoscaling.DescribeLaunchConfigurationsOutput), nil } +func (a *autoScalingMock) DescribeScalingActivities(i *autoscaling.DescribeScalingActivitiesInput) (*autoscaling.DescribeScalingActivitiesOutput, error) { + args := a.Called(i) + return args.Get(0).(*autoscaling.DescribeScalingActivitiesOutput), args.Error(1) +} + func (a *autoScalingMock) DescribeTagsPages(i *autoscaling.DescribeTagsInput, fn func(*autoscaling.DescribeTagsOutput, bool) bool) error { args := a.Called(i, fn) return args.Error(0) diff --git a/cluster-autoscaler/cloudprovider/aws/instance_type_cache.go b/cluster-autoscaler/cloudprovider/aws/instance_type_cache.go index 02d71823b285..f67b0cc3791f 100644 --- a/cluster-autoscaler/cloudprovider/aws/instance_type_cache.go +++ b/cluster-autoscaler/cloudprovider/aws/instance_type_cache.go @@ -84,7 +84,7 @@ func (c *jitterClock) Since(ts time.Time) time.Duration { return since } -func (es instanceTypeExpirationStore) populate(autoscalingGroups []*asg) error { +func (es instanceTypeExpirationStore) populate(autoscalingGroups map[AwsRef]*asg) error { asgsToQuery := []*asg{} if c, ok := es.jitterClock.(*jitterClock); ok { diff --git a/cluster-autoscaler/cloudprovider/aws/instance_type_cache_test.go b/cluster-autoscaler/cloudprovider/aws/instance_type_cache_test.go index 45fcc844a9f4..7b9f56b77c9b 100644 --- a/cluster-autoscaler/cloudprovider/aws/instance_type_cache_test.go +++ b/cluster-autoscaler/cloudprovider/aws/instance_type_cache_test.go @@ -79,9 +79,10 @@ func TestLTVersionChange(t *testing.T) { m := newAsgInstanceTypeCacheWithClock(&awsWrapper{a, e}, fakeClock, fakeStore) for i := 0; i < 2; i++ { - err := m.populate([]*asg{ - { - AwsRef: AwsRef{Name: asgName}, + asgRef := AwsRef{Name: asgName} + err := m.populate(map[AwsRef]*asg{ + asgRef: { + AwsRef: asgRef, LaunchTemplate: &launchTemplate{ name: ltName, version: aws.StringValue(ltVersions[i]),