From e74e4f93b2446bed40f2d060ac45ba5c0d16d21a Mon Sep 17 00:00:00 2001 From: Kubernetes Prow Robot Date: Thu, 3 Mar 2022 13:01:17 -0800 Subject: [PATCH] Merge pull request #4489 from airbnb/drmorr--early-abort-if-aws-node-group-no-capacity Early abort if AWS node group has no capacity --- .../cloudprovider/aws/CA_with_AWS_IAM_OIDC.md | 1 + .../cloudprovider/aws/README.md | 36 +++++ .../cloudprovider/aws/auto_scaling_groups.go | 134 ++++++++++++------ .../aws/auto_scaling_groups_test.go | 120 ++++++++++++++++ .../cloudprovider/aws/aws_cloud_provider.go | 27 +++- .../aws/aws_cloud_provider_test.go | 10 +- .../cloudprovider/aws/aws_manager.go | 7 +- .../cloudprovider/aws/aws_manager_test.go | 18 ++- .../cloudprovider/aws/aws_wrapper.go | 1 + .../cloudprovider/aws/aws_wrapper_test.go | 5 + .../cloudprovider/aws/instance_type_cache.go | 2 +- .../aws/instance_type_cache_test.go | 7 +- 12 files changed, 312 insertions(+), 56 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/aws/CA_with_AWS_IAM_OIDC.md b/cluster-autoscaler/cloudprovider/aws/CA_with_AWS_IAM_OIDC.md index 217bd3d505eb..53db71aa6cc5 100644 --- a/cluster-autoscaler/cloudprovider/aws/CA_with_AWS_IAM_OIDC.md +++ b/cluster-autoscaler/cloudprovider/aws/CA_with_AWS_IAM_OIDC.md @@ -65,6 +65,7 @@ Note: The keys for the tags that you entered don't have values. Cluster Autoscal "autoscaling:DescribeAutoScalingGroups", "autoscaling:DescribeAutoScalingInstances", "autoscaling:DescribeLaunchConfigurations", + "autoscaling:DescribeScalingActivities", "autoscaling:DescribeTags", "autoscaling:SetDesiredCapacity", "autoscaling:TerminateInstanceInAutoScalingGroup" diff --git a/cluster-autoscaler/cloudprovider/aws/README.md b/cluster-autoscaler/cloudprovider/aws/README.md index 2076f0fb7ba2..1fbd5a2c1be8 100644 --- a/cluster-autoscaler/cloudprovider/aws/README.md +++ b/cluster-autoscaler/cloudprovider/aws/README.md @@ -48,6 +48,42 @@ by specifying Auto Scaling Group ARNs in the `Resource` list of the policy. More information can be found [here](https://docs.aws.amazon.com/autoscaling/latest/userguide/control-access-using-iam.html#policy-auto-scaling-resources). +*NOTE:* The below policies/arguments to the Cluster Autoscaler need to be modified as appropriate +for the names of your ASGs, as well as account ID and AWS region before being used. + +The following policy provides the minimum privileges necessary for Cluster Autoscaler to run. +When using this policy, you cannot use autodiscovery of ASGs. In addition, it restricts the +IAM permissions to the node groups the Cluster Autoscaler is configured to scale. + +This in turn means that you must pass the following arguments to the Cluster Autoscaler +binary, replacing min and max node counts and the ASG: + +```bash +--aws-use-static-instance-list=false +--nodes=1:100:exampleASG1 +--nodes=1:100:exampleASG2 +``` + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "autoscaling:DescribeAutoScalingGroups", + "autoscaling:DescribeAutoScalingInstances", + "autoscaling:DescribeLaunchConfigurations", + "autoscaling:DescribeScalingActivities", + "autoscaling:SetDesiredCapacity", + "autoscaling:TerminateInstanceInAutoScalingGroup" + ], + "Resource": ["arn:aws:autoscaling:${YOUR_CLUSTER_AWS_REGION}:${YOUR_AWS_ACCOUNT_ID}:autoScalingGroup:*:autoScalingGroupName/${YOUR_ASG_NAME}"] + } + ] +} +``` + ### Using OIDC Federated Authentication OIDC federated authentication allows your service to assume an IAM role and interact with AWS services without having to store credentials as environment variables. For an example of how to use AWS IAM OIDC with the Cluster Autoscaler please see [here](CA_with_AWS_IAM_OIDC.md). diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go index ebcf41000b1f..cf719e23532f 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go @@ -31,14 +31,16 @@ import ( ) const ( - scaleToZeroSupported = true - placeholderInstanceNamePrefix = "i-placeholder" + scaleToZeroSupported = true + placeholderInstanceNamePrefix = "i-placeholder" + placeholderUnfulfillableStatus = "placeholder-cannot-be-fulfilled" ) type asgCache struct { - registeredAsgs []*asg + registeredAsgs map[AwsRef]*asg asgToInstances map[AwsRef][]AwsInstanceRef instanceToAsg map[AwsInstanceRef]*asg + instanceStatus map[AwsInstanceRef]*string asgInstanceTypeCache *instanceTypeExpirationStore mutex sync.Mutex awsService *awsWrapper @@ -62,9 +64,10 @@ type mixedInstancesPolicy struct { type asg struct { AwsRef - minSize int - maxSize int - curSize int + minSize int + maxSize int + curSize int + lastUpdateTime time.Time AvailabilityZones []string LaunchConfigurationName string @@ -75,10 +78,11 @@ type asg struct { func newASGCache(awsService *awsWrapper, explicitSpecs []string, autoDiscoverySpecs []asgAutoDiscoveryConfig) (*asgCache, error) { registry := &asgCache{ - registeredAsgs: make([]*asg, 0), + registeredAsgs: make(map[AwsRef]*asg, 0), awsService: awsService, asgToInstances: make(map[AwsRef][]AwsInstanceRef), instanceToAsg: make(map[AwsInstanceRef]*asg), + instanceStatus: make(map[AwsInstanceRef]*string), asgInstanceTypeCache: newAsgInstanceTypeCache(awsService), interrupt: make(chan struct{}), asgAutoDiscoverySpecs: autoDiscoverySpecs, @@ -121,53 +125,44 @@ func (m *asgCache) parseExplicitAsgs(specs []string) error { // Register ASG. Returns the registered ASG. func (m *asgCache) register(asg *asg) *asg { - for i := range m.registeredAsgs { - if existing := m.registeredAsgs[i]; existing.AwsRef == asg.AwsRef { - if reflect.DeepEqual(existing, asg) { - return existing - } + if existing, asgExists := m.registeredAsgs[asg.AwsRef]; asgExists { + if reflect.DeepEqual(existing, asg) { + return existing + } - klog.V(4).Infof("Updating ASG %s", asg.AwsRef.Name) + klog.V(4).Infof("Updating ASG %s", asg.AwsRef.Name) - // Explicit registered groups should always use the manually provided min/max - // values and the not the ones returned by the API - if !m.explicitlyConfigured[asg.AwsRef] { - existing.minSize = asg.minSize - existing.maxSize = asg.maxSize - } + // Explicit registered groups should always use the manually provided min/max + // values and the not the ones returned by the API + if !m.explicitlyConfigured[asg.AwsRef] { + existing.minSize = asg.minSize + existing.maxSize = asg.maxSize + } - existing.curSize = asg.curSize + existing.curSize = asg.curSize - // Those information are mainly required to create templates when scaling - // from zero - existing.AvailabilityZones = asg.AvailabilityZones - existing.LaunchConfigurationName = asg.LaunchConfigurationName - existing.LaunchTemplate = asg.LaunchTemplate - existing.MixedInstancesPolicy = asg.MixedInstancesPolicy - existing.Tags = asg.Tags + // Those information are mainly required to create templates when scaling + // from zero + existing.AvailabilityZones = asg.AvailabilityZones + existing.LaunchConfigurationName = asg.LaunchConfigurationName + existing.LaunchTemplate = asg.LaunchTemplate + existing.MixedInstancesPolicy = asg.MixedInstancesPolicy + existing.Tags = asg.Tags - return existing - } + return existing } klog.V(1).Infof("Registering ASG %s", asg.AwsRef.Name) - m.registeredAsgs = append(m.registeredAsgs, asg) + m.registeredAsgs[asg.AwsRef] = asg return asg } // Unregister ASG. Returns the unregistered ASG. func (m *asgCache) unregister(a *asg) *asg { - updated := make([]*asg, 0, len(m.registeredAsgs)) - var changed *asg - for _, existing := range m.registeredAsgs { - if existing.AwsRef == a.AwsRef { - klog.V(1).Infof("Unregistered ASG %s", a.AwsRef.Name) - changed = a - continue - } - updated = append(updated, existing) + if _, asgExists := m.registeredAsgs[a.AwsRef]; asgExists { + klog.V(1).Infof("Unregistered ASG %s", a.AwsRef.Name) + delete(m.registeredAsgs, a.AwsRef) } - m.registeredAsgs = updated - return changed + return a } func (m *asgCache) buildAsgFromSpec(spec string) (*asg, error) { @@ -184,7 +179,7 @@ func (m *asgCache) buildAsgFromSpec(spec string) (*asg, error) { } // Get returns the currently registered ASGs -func (m *asgCache) Get() []*asg { +func (m *asgCache) Get() map[AwsRef]*asg { m.mutex.Lock() defer m.mutex.Unlock() @@ -226,6 +221,17 @@ func (m *asgCache) InstancesByAsg(ref AwsRef) ([]AwsInstanceRef, error) { return nil, fmt.Errorf("error while looking for instances of ASG: %s", ref) } +func (m *asgCache) InstanceStatus(ref AwsInstanceRef) (*string, error) { + m.mutex.Lock() + defer m.mutex.Unlock() + + if status, found := m.instanceStatus[ref]; found { + return status, nil + } + + return nil, fmt.Errorf("could not find instance %v", ref) +} + func (m *asgCache) SetAsgSize(asg *asg, size int) error { m.mutex.Lock() defer m.mutex.Unlock() @@ -248,6 +254,7 @@ func (m *asgCache) setAsgSizeNoLock(asg *asg, size int) error { } // Proactively set the ASG size so autoscaler makes better decisions + asg.lastUpdateTime = start asg.curSize = size return nil @@ -367,6 +374,7 @@ func (m *asgCache) regenerate() error { newInstanceToAsgCache := make(map[AwsInstanceRef]*asg) newAsgToInstancesCache := make(map[AwsRef][]AwsInstanceRef) + newInstanceStatusMap := make(map[AwsInstanceRef]*string) // Build list of known ASG names refreshNames, err := m.buildAsgNames() @@ -403,6 +411,7 @@ func (m *asgCache) regenerate() error { ref := m.buildInstanceRefFromAWS(instance) newInstanceToAsgCache[ref] = asg newAsgToInstancesCache[asg.AwsRef][i] = ref + newInstanceStatusMap[ref] = instance.HealthStatus } } @@ -431,6 +440,7 @@ func (m *asgCache) regenerate() error { m.asgToInstances = newAsgToInstancesCache m.instanceToAsg = newInstanceToAsgCache m.autoscalingOptions = newAutoscalingOptions + m.instanceStatus = newInstanceStatusMap return nil } @@ -444,17 +454,57 @@ func (m *asgCache) createPlaceholdersForDesiredNonStartedInstances(groups []*aut klog.V(4).Infof("Instance group %s has only %d instances created while requested count is %d. "+ "Creating placeholder instances.", *g.AutoScalingGroupName, realInstances, desired) + + healthStatus := "" + isAvailable, err := m.isNodeGroupAvailable(g) + if err != nil { + klog.V(4).Infof("Could not check instance availability, creating placeholder node anyways: %v", err) + } else if !isAvailable { + klog.Warningf("Instance group %s cannot provision any more nodes!", *g.AutoScalingGroupName) + healthStatus = placeholderUnfulfillableStatus + } + for i := realInstances; i < desired; i++ { id := fmt.Sprintf("%s-%s-%d", placeholderInstanceNamePrefix, *g.AutoScalingGroupName, i) g.Instances = append(g.Instances, &autoscaling.Instance{ InstanceId: &id, AvailabilityZone: g.AvailabilityZones[0], + HealthStatus: &healthStatus, }) } } return groups } +func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error) { + input := &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: group.AutoScalingGroupName, + } + + start := time.Now() + response, err := m.awsService.DescribeScalingActivities(input) + observeAWSRequest("DescribeScalingActivities", err, start) + if err != nil { + return true, err // If we can't describe the scaling activities we assume the node group is available + } + + for _, activity := range response.Activities { + asgRef := AwsRef{Name: *group.AutoScalingGroupName} + if a, ok := m.registeredAsgs[asgRef]; ok { + lut := a.lastUpdateTime + if activity.StartTime.Before(lut) { + break + } else if *activity.StatusCode == "Failed" { + klog.Warningf("ASG %s scaling failed with %s", asgRef.Name, *activity) + return false, nil + } + } else { + klog.V(4).Infof("asg %v is not registered yet, skipping DescribeScalingActivities check", asgRef.Name) + } + } + return true, nil +} + func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) { spec := dynamic.NodeGroupSpec{ Name: aws.StringValue(g.AutoScalingGroupName), diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go index e9f9ad1ab79e..c5078b03b5b7 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go @@ -17,8 +17,12 @@ limitations under the License. package aws import ( + "errors" "testing" + "time" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/autoscaling" "github.com/stretchr/testify/assert" ) @@ -46,3 +50,119 @@ func validateAsg(t *testing.T, asg *asg, name string, minSize int, maxSize int) assert.Equal(t, minSize, asg.minSize) assert.Equal(t, maxSize, asg.maxSize) } + +func TestCreatePlaceholders(t *testing.T) { + registeredAsgName := aws.String("test-asg") + registeredAsgRef := AwsRef{Name: *registeredAsgName} + + cases := []struct { + name string + desiredCapacity *int64 + activities []*autoscaling.Activity + groupLastUpdateTime time.Time + describeErr error + asgToCheck *string + }{ + { + name: "add placeholders successful", + desiredCapacity: aws.Int64(10), + }, + { + name: "no placeholders needed", + desiredCapacity: aws.Int64(0), + }, + { + name: "DescribeScalingActivities failed", + desiredCapacity: aws.Int64(1), + describeErr: errors.New("timeout"), + }, + { + name: "early abort if AWS scaling up fails", + desiredCapacity: aws.Int64(1), + activities: []*autoscaling.Activity{ + { + StatusCode: aws.String("Failed"), + StartTime: aws.Time(time.Unix(10, 0)), + }, + }, + groupLastUpdateTime: time.Unix(9, 0), + }, + { + name: "AWS scaling failed event before CA scale_up", + desiredCapacity: aws.Int64(1), + activities: []*autoscaling.Activity{ + { + StatusCode: aws.String("Failed"), + StartTime: aws.Time(time.Unix(9, 0)), + }, + }, + groupLastUpdateTime: time.Unix(10, 0), + }, + { + name: "asg not registered", + desiredCapacity: aws.Int64(10), + activities: []*autoscaling.Activity{ + { + StatusCode: aws.String("Failed"), + StartTime: aws.Time(time.Unix(10, 0)), + }, + }, + groupLastUpdateTime: time.Unix(9, 0), + asgToCheck: aws.String("unregisteredAsgName"), + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + shouldCallDescribeScalingActivities := true + if *tc.desiredCapacity == int64(0) { + shouldCallDescribeScalingActivities = false + } + + asgName := registeredAsgName + if tc.asgToCheck != nil { + asgName = tc.asgToCheck + } + + a := &autoScalingMock{} + if shouldCallDescribeScalingActivities { + a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: asgName, + }).Return( + &autoscaling.DescribeScalingActivitiesOutput{Activities: tc.activities}, + tc.describeErr, + ).Once() + } + + asgCache := &asgCache{ + awsService: &awsWrapper{ + autoScalingI: a, + ec2I: nil, + }, + registeredAsgs: map[AwsRef]*asg{ + registeredAsgRef: { + AwsRef: registeredAsgRef, + lastUpdateTime: tc.groupLastUpdateTime, + }, + }, + } + + groups := []*autoscaling.Group{ + { + AutoScalingGroupName: asgName, + AvailabilityZones: []*string{aws.String("westeros-1a")}, + DesiredCapacity: tc.desiredCapacity, + Instances: []*autoscaling.Instance{}, + }, + } + asgCache.createPlaceholdersForDesiredNonStartedInstances(groups) + assert.Equal(t, int64(len(groups[0].Instances)), *tc.desiredCapacity) + if tc.activities != nil && *tc.activities[0].StatusCode == "Failed" && tc.activities[0].StartTime.After(tc.groupLastUpdateTime) && asgName == registeredAsgName { + assert.Equal(t, *groups[0].Instances[0].HealthStatus, placeholderUnfulfillableStatus) + } else if len(groups[0].Instances) > 0 { + assert.Equal(t, *groups[0].Instances[0].HealthStatus, "") + } + a.AssertExpectations(t) + }) + } +} diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go index de067ff19bcc..e7c197b7aa4d 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go @@ -85,12 +85,12 @@ func (aws *awsCloudProvider) GetAvailableGPUTypes() map[string]struct{} { // NodeGroups returns all node groups configured for this cloud provider. func (aws *awsCloudProvider) NodeGroups() []cloudprovider.NodeGroup { asgs := aws.awsManager.getAsgs() - ngs := make([]cloudprovider.NodeGroup, len(asgs)) - for i, asg := range asgs { - ngs[i] = &AwsNodeGroup{ + ngs := make([]cloudprovider.NodeGroup, 0, len(asgs)) + for _, asg := range asgs { + ngs = append(ngs, &AwsNodeGroup{ asg: asg, awsManager: aws.awsManager, - } + }) } return ngs @@ -319,7 +319,24 @@ func (ng *AwsNodeGroup) Nodes() ([]cloudprovider.Instance, error) { instances := make([]cloudprovider.Instance, len(asgNodes)) for i, asgNode := range asgNodes { - instances[i] = cloudprovider.Instance{Id: asgNode.ProviderID} + var status *cloudprovider.InstanceStatus + instanceStatusString, err := ng.awsManager.GetInstanceStatus(asgNode) + if err != nil { + klog.V(4).Infof("Could not get instance status, continuing anyways: %v", err) + } else if instanceStatusString != nil && *instanceStatusString == placeholderUnfulfillableStatus { + status = &cloudprovider.InstanceStatus{ + State: cloudprovider.InstanceCreating, + ErrorInfo: &cloudprovider.InstanceErrorInfo{ + ErrorClass: cloudprovider.OutOfResourcesErrorClass, + ErrorCode: placeholderUnfulfillableStatus, + ErrorMessage: "AWS cannot provision any more instances for this node group", + }, + } + } + instances[i] = cloudprovider.Instance{ + Id: asgNode.ProviderID, + Status: status, + } } return instances, nil } diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go index ff7c82fb4010..76d201b6d476 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go @@ -31,7 +31,7 @@ import ( var testAwsManager = &AwsManager{ asgCache: &asgCache{ - registeredAsgs: make([]*asg, 0), + registeredAsgs: make(map[AwsRef]*asg, 0), asgToInstances: make(map[AwsRef][]AwsInstanceRef), instanceToAsg: make(map[AwsInstanceRef]*asg), interrupt: make(chan struct{}), @@ -45,7 +45,7 @@ func newTestAwsManagerWithMockServices(mockAutoScaling autoScalingI, mockEC2 ec2 return &AwsManager{ awsService: awsService, asgCache: &asgCache{ - registeredAsgs: make([]*asg, 0), + registeredAsgs: make(map[AwsRef]*asg, 0), asgToInstances: make(map[AwsRef][]AwsInstanceRef), instanceToAsg: make(map[AwsInstanceRef]*asg), asgInstanceTypeCache: newAsgInstanceTypeCache(&awsService), @@ -482,6 +482,12 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) { expectedInstancesCount = 1 }).Return(nil) + a.On("DescribeScalingActivities", + &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("test-asg"), + }, + ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil) + provider.Refresh() initialSize, err := asgs[0].TargetSize() diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager.go b/cluster-autoscaler/cloudprovider/aws/aws_manager.go index 128fc9c9e761..bc8acc9d8182 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager.go @@ -273,7 +273,7 @@ func (m *AwsManager) Cleanup() { m.asgCache.Cleanup() } -func (m *AwsManager) getAsgs() []*asg { +func (m *AwsManager) getAsgs() map[AwsRef]*asg { return m.asgCache.Get() } @@ -301,6 +301,11 @@ func (m *AwsManager) GetAsgNodes(ref AwsRef) ([]AwsInstanceRef, error) { return m.asgCache.InstancesByAsg(ref) } +// GetInstanceStatus returns the status of ASG nodes +func (m *AwsManager) GetInstanceStatus(ref AwsInstanceRef) (*string, error) { + return m.asgCache.InstanceStatus(ref) +} + func (m *AwsManager) getAsgTemplate(asg *asg) (*asgTemplate, error) { if len(asg.AvailabilityZones) < 1 { return nil, fmt.Errorf("unable to get first AvailabilityZone for ASG %q", asg.Name) diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go index 646ef763e4e4..b454da759027 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go @@ -332,6 +332,7 @@ func makeTaintSet(taints []apiv1.Taint) map[apiv1.Taint]bool { func TestFetchExplicitAsgs(t *testing.T) { min, max, groupname := 1, 10, "coolasg" + asgRef := AwsRef{Name: groupname} a := &autoScalingMock{} a.On("DescribeAutoScalingGroups", &autoscaling.DescribeAutoScalingGroupsInput{ @@ -364,6 +365,12 @@ func TestFetchExplicitAsgs(t *testing.T) { }}, false) }).Return(nil) + a.On("DescribeScalingActivities", + &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("coolasg"), + }, + ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil) + do := cloudprovider.NodeGroupDiscoveryOptions{ // Register the same node group twice with different max nodes. // The intention is to test that the asgs.Register method will update @@ -382,7 +389,7 @@ func TestFetchExplicitAsgs(t *testing.T) { asgs := m.asgCache.Get() assert.Equal(t, 1, len(asgs)) - validateAsg(t, asgs[0], groupname, min, max) + validateAsg(t, asgs[asgRef], groupname, min, max) } func TestGetASGTemplate(t *testing.T) { @@ -478,6 +485,7 @@ func TestGetASGTemplate(t *testing.T) { func TestFetchAutoAsgs(t *testing.T) { min, max := 1, 10 groupname, tags := "coolasg", []string{"tag", "anothertag"} + asgRef := AwsRef{Name: groupname} a := &autoScalingMock{} // Lookup groups associated with tags @@ -521,6 +529,12 @@ func TestFetchAutoAsgs(t *testing.T) { }}}, false) }).Return(nil).Twice() + a.On("DescribeScalingActivities", + &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("coolasg"), + }, + ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil) + do := cloudprovider.NodeGroupDiscoveryOptions{ NodeGroupAutoDiscoverySpecs: []string{fmt.Sprintf("asg:tag=%s", strings.Join(tags, ","))}, } @@ -535,7 +549,7 @@ func TestFetchAutoAsgs(t *testing.T) { asgs := m.asgCache.Get() assert.Equal(t, 1, len(asgs)) - validateAsg(t, asgs[0], groupname, min, max) + validateAsg(t, asgs[asgRef], groupname, min, max) // Simulate the previously discovered ASG disappearing a.On("DescribeTagsPages", mock.MatchedBy(tagsMatcher(expectedTagsInput)), diff --git a/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go b/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go index 3ef87f91fa74..5c23da622bbb 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go @@ -30,6 +30,7 @@ import ( type autoScalingI interface { DescribeAutoScalingGroupsPages(input *autoscaling.DescribeAutoScalingGroupsInput, fn func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) error DescribeLaunchConfigurations(*autoscaling.DescribeLaunchConfigurationsInput) (*autoscaling.DescribeLaunchConfigurationsOutput, error) + DescribeScalingActivities(*autoscaling.DescribeScalingActivitiesInput) (*autoscaling.DescribeScalingActivitiesOutput, error) DescribeTagsPages(input *autoscaling.DescribeTagsInput, fn func(*autoscaling.DescribeTagsOutput, bool) bool) error SetDesiredCapacity(input *autoscaling.SetDesiredCapacityInput) (*autoscaling.SetDesiredCapacityOutput, error) TerminateInstanceInAutoScalingGroup(input *autoscaling.TerminateInstanceInAutoScalingGroupInput) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error) diff --git a/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go b/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go index 86c12efab308..c16c290b104a 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go @@ -42,6 +42,11 @@ func (a *autoScalingMock) DescribeLaunchConfigurations(i *autoscaling.DescribeLa return args.Get(0).(*autoscaling.DescribeLaunchConfigurationsOutput), nil } +func (a *autoScalingMock) DescribeScalingActivities(i *autoscaling.DescribeScalingActivitiesInput) (*autoscaling.DescribeScalingActivitiesOutput, error) { + args := a.Called(i) + return args.Get(0).(*autoscaling.DescribeScalingActivitiesOutput), args.Error(1) +} + func (a *autoScalingMock) DescribeTagsPages(i *autoscaling.DescribeTagsInput, fn func(*autoscaling.DescribeTagsOutput, bool) bool) error { args := a.Called(i, fn) return args.Error(0) diff --git a/cluster-autoscaler/cloudprovider/aws/instance_type_cache.go b/cluster-autoscaler/cloudprovider/aws/instance_type_cache.go index 02d71823b285..f67b0cc3791f 100644 --- a/cluster-autoscaler/cloudprovider/aws/instance_type_cache.go +++ b/cluster-autoscaler/cloudprovider/aws/instance_type_cache.go @@ -84,7 +84,7 @@ func (c *jitterClock) Since(ts time.Time) time.Duration { return since } -func (es instanceTypeExpirationStore) populate(autoscalingGroups []*asg) error { +func (es instanceTypeExpirationStore) populate(autoscalingGroups map[AwsRef]*asg) error { asgsToQuery := []*asg{} if c, ok := es.jitterClock.(*jitterClock); ok { diff --git a/cluster-autoscaler/cloudprovider/aws/instance_type_cache_test.go b/cluster-autoscaler/cloudprovider/aws/instance_type_cache_test.go index 45fcc844a9f4..7b9f56b77c9b 100644 --- a/cluster-autoscaler/cloudprovider/aws/instance_type_cache_test.go +++ b/cluster-autoscaler/cloudprovider/aws/instance_type_cache_test.go @@ -79,9 +79,10 @@ func TestLTVersionChange(t *testing.T) { m := newAsgInstanceTypeCacheWithClock(&awsWrapper{a, e}, fakeClock, fakeStore) for i := 0; i < 2; i++ { - err := m.populate([]*asg{ - { - AwsRef: AwsRef{Name: asgName}, + asgRef := AwsRef{Name: asgName} + err := m.populate(map[AwsRef]*asg{ + asgRef: { + AwsRef: asgRef, LaunchTemplate: &launchTemplate{ name: ltName, version: aws.StringValue(ltVersions[i]),