From 7f025094e1b5fef8d678baa12b0687abf969ad54 Mon Sep 17 00:00:00 2001 From: Kubernetes Prow Robot Date: Wed, 27 Oct 2021 10:35:25 -0700 Subject: [PATCH] Merge pull request #4073 from airbnb/drmorr--airbnb--cache-launch-templates cache ASG InstanceTypes for AWS --- .../cloudprovider/aws/auto_scaling.go | 263 ----------------- .../cloudprovider/aws/auto_scaling_groups.go | 83 +++--- .../aws/auto_scaling_groups_test.go | 64 ----- .../cloudprovider/aws/auto_scaling_test.go | 84 ------ .../aws/aws_cloud_provider_test.go | 153 ++++------ .../cloudprovider/aws/aws_manager.go | 57 +--- .../cloudprovider/aws/aws_manager_test.go | 132 ++------- .../cloudprovider/aws/aws_wrapper.go | 264 +++++++++++++++++ .../cloudprovider/aws/aws_wrapper_test.go | 269 ++++++++++++++++++ cluster-autoscaler/cloudprovider/aws/ec2.go | 57 ---- .../cloudprovider/aws/instance_type_cache.go | 128 +++++++++ .../aws/instance_type_cache_test.go | 103 +++++++ 12 files changed, 892 insertions(+), 765 deletions(-) delete mode 100644 cluster-autoscaler/cloudprovider/aws/auto_scaling.go delete mode 100644 cluster-autoscaler/cloudprovider/aws/auto_scaling_test.go create mode 100644 cluster-autoscaler/cloudprovider/aws/aws_wrapper.go create mode 100644 cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go delete mode 100644 cluster-autoscaler/cloudprovider/aws/ec2.go create mode 100644 cluster-autoscaler/cloudprovider/aws/instance_type_cache.go create mode 100644 cluster-autoscaler/cloudprovider/aws/instance_type_cache_test.go diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling.go deleted file mode 100644 index 00d50df0b873..000000000000 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling.go +++ /dev/null @@ -1,263 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package aws - -import ( - "fmt" - "sync" - "time" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/autoscaling" - "k8s.io/apimachinery/pkg/util/clock" - "k8s.io/apimachinery/pkg/util/rand" - "k8s.io/client-go/tools/cache" - "k8s.io/klog" -) - -const ( - launchConfigurationCachedTTL = time.Minute * 20 - cacheMinTTL = 120 - cacheMaxTTL = 600 -) - -// autoScaling is the interface represents a specific aspect of the auto-scaling service provided by AWS SDK for use in CA -type autoScaling interface { - DescribeAutoScalingGroupsPages(input *autoscaling.DescribeAutoScalingGroupsInput, fn func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) error - DescribeLaunchConfigurations(*autoscaling.DescribeLaunchConfigurationsInput) (*autoscaling.DescribeLaunchConfigurationsOutput, error) - DescribeTagsPages(input *autoscaling.DescribeTagsInput, fn func(*autoscaling.DescribeTagsOutput, bool) bool) error - SetDesiredCapacity(input *autoscaling.SetDesiredCapacityInput) (*autoscaling.SetDesiredCapacityOutput, error) - TerminateInstanceInAutoScalingGroup(input *autoscaling.TerminateInstanceInAutoScalingGroupInput) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error) -} - -// autoScalingWrapper provides several utility methods over the auto-scaling service provided by AWS SDK -type autoScalingWrapper struct { - autoScaling - launchConfigurationInstanceTypeCache *expirationStore -} - -// expirationStore cache the launch configuration with their instance type. -// The store expires its keys based on a TTL. This TTL can have a jitter applied to it. -// This allows to get a better repartition of the AWS queries. -type expirationStore struct { - cache.Store - jitterClock *jitterClock -} - -type instanceTypeCachedObject struct { - name string - instanceType string -} - -type jitterClock struct { - clock.Clock - - jitter bool - sync.RWMutex -} - -func newLaunchConfigurationInstanceTypeCache() *expirationStore { - jc := &jitterClock{} - return &expirationStore{ - cache.NewExpirationStore(func(obj interface{}) (s string, e error) { - return obj.(instanceTypeCachedObject).name, nil - }, &cache.TTLPolicy{ - TTL: launchConfigurationCachedTTL, - Clock: jc, - }), - jc, - } -} - -func (c *jitterClock) Since(ts time.Time) time.Duration { - since := time.Since(ts) - c.RLock() - defer c.RUnlock() - if c.jitter { - return since + (time.Second * time.Duration(rand.IntnRange(cacheMinTTL, cacheMaxTTL))) - } - return since -} - -func (m autoScalingWrapper) getInstanceTypeByLCNames(launchConfigToQuery []*string) ([]*autoscaling.LaunchConfiguration, error) { - var launchConfigurations []*autoscaling.LaunchConfiguration - - for i := 0; i < len(launchConfigToQuery); i += 50 { - end := i + 50 - - if end > len(launchConfigToQuery) { - end = len(launchConfigToQuery) - } - params := &autoscaling.DescribeLaunchConfigurationsInput{ - LaunchConfigurationNames: launchConfigToQuery[i:end], - MaxRecords: aws.Int64(50), - } - r, err := m.DescribeLaunchConfigurations(params) - if err != nil { - return nil, err - } - launchConfigurations = append(launchConfigurations, r.LaunchConfigurations...) - for _, lc := range r.LaunchConfigurations { - _ = m.launchConfigurationInstanceTypeCache.Add(instanceTypeCachedObject{ - name: *lc.LaunchConfigurationName, - instanceType: *lc.InstanceType, - }) - } - } - return launchConfigurations, nil -} - -func (m autoScalingWrapper) getInstanceTypeByLCName(name string) (string, error) { - if obj, found, _ := m.launchConfigurationInstanceTypeCache.GetByKey(name); found { - return obj.(instanceTypeCachedObject).instanceType, nil - } - - launchConfigs, err := m.getInstanceTypeByLCNames([]*string{aws.String(name)}) - if err != nil { - klog.Errorf("Failed to query the launch configuration %s to get the instance type: %v", name, err) - return "", err - } - if len(launchConfigs) < 1 || launchConfigs[0].InstanceType == nil { - return "", fmt.Errorf("unable to get first LaunchConfiguration for %s", name) - } - return *launchConfigs[0].InstanceType, nil -} - -func (m *autoScalingWrapper) getAutoscalingGroupsByNames(names []string) ([]*autoscaling.Group, error) { - if len(names) == 0 { - return nil, nil - } - - asgs := make([]*autoscaling.Group, 0) - - // AWS only accepts up to 50 ASG names as input, describe them in batches - for i := 0; i < len(names); i += maxAsgNamesPerDescribe { - end := i + maxAsgNamesPerDescribe - - if end > len(names) { - end = len(names) - } - - input := &autoscaling.DescribeAutoScalingGroupsInput{ - AutoScalingGroupNames: aws.StringSlice(names[i:end]), - MaxRecords: aws.Int64(maxRecordsReturnedByAPI), - } - if err := m.DescribeAutoScalingGroupsPages(input, func(output *autoscaling.DescribeAutoScalingGroupsOutput, _ bool) bool { - asgs = append(asgs, output.AutoScalingGroups...) - // We return true while we want to be called with the next page of - // results, if any. - return true - }); err != nil { - return nil, err - } - } - - return asgs, nil -} - -func (m autoScalingWrapper) populateLaunchConfigurationInstanceTypeCache(autoscalingGroups []*autoscaling.Group) error { - var launchConfigToQuery []*string - - m.launchConfigurationInstanceTypeCache.jitterClock.Lock() - m.launchConfigurationInstanceTypeCache.jitterClock.jitter = true - m.launchConfigurationInstanceTypeCache.jitterClock.Unlock() - for _, asg := range autoscalingGroups { - if asg == nil { - continue - } - if asg.LaunchConfigurationName == nil { - continue - } - _, found, _ := m.launchConfigurationInstanceTypeCache.GetByKey(*asg.LaunchConfigurationName) - if found { - continue - } - launchConfigToQuery = append(launchConfigToQuery, asg.LaunchConfigurationName) - } - m.launchConfigurationInstanceTypeCache.jitterClock.Lock() - m.launchConfigurationInstanceTypeCache.jitterClock.jitter = false - m.launchConfigurationInstanceTypeCache.jitterClock.Unlock() - - // List expire old entries - _ = m.launchConfigurationInstanceTypeCache.List() - - if len(launchConfigToQuery) == 0 { - klog.V(4).Infof("%d launch configurations already in cache", len(autoscalingGroups)) - return nil - } - klog.V(4).Infof("%d launch configurations to query", len(launchConfigToQuery)) - - _, err := m.getInstanceTypeByLCNames(launchConfigToQuery) - if err != nil { - klog.Errorf("Failed to query %d launch configurations", len(launchConfigToQuery)) - return err - } - - klog.V(4).Infof("Successfully query %d launch configurations", len(launchConfigToQuery)) - return nil -} - -func (m *autoScalingWrapper) getAutoscalingGroupNamesByTags(kvs map[string]string) ([]string, error) { - // DescribeTags does an OR query when multiple filters on different tags are - // specified. In other words, DescribeTags returns [asg1, asg1] for keys - // [t1, t2] when there's only one asg tagged both t1 and t2. - filters := []*autoscaling.Filter{} - for key, value := range kvs { - filter := &autoscaling.Filter{ - Name: aws.String("key"), - Values: []*string{aws.String(key)}, - } - filters = append(filters, filter) - if value != "" { - filters = append(filters, &autoscaling.Filter{ - Name: aws.String("value"), - Values: []*string{aws.String(value)}, - }) - } - } - - tags := []*autoscaling.TagDescription{} - input := &autoscaling.DescribeTagsInput{ - Filters: filters, - MaxRecords: aws.Int64(maxRecordsReturnedByAPI), - } - if err := m.DescribeTagsPages(input, func(out *autoscaling.DescribeTagsOutput, _ bool) bool { - tags = append(tags, out.Tags...) - // We return true while we want to be called with the next page of - // results, if any. - return true - }); err != nil { - return nil, err - } - - // According to how DescribeTags API works, the result contains ASGs which - // not all but only subset of tags are associated. Explicitly select ASGs to - // which all the tags are associated so that we won't end up calling - // DescribeAutoScalingGroups API multiple times on an ASG. - asgNames := []string{} - asgNameOccurrences := make(map[string]int) - for _, t := range tags { - asgName := aws.StringValue(t.ResourceId) - occurrences := asgNameOccurrences[asgName] + 1 - if occurrences >= len(kvs) { - asgNames = append(asgNames, asgName) - } - asgNameOccurrences[asgName] = occurrences - } - - return asgNames, nil -} diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go index cec6c3598781..eee0bb43eb0d 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go @@ -35,12 +35,13 @@ const ( ) type asgCache struct { - registeredAsgs []*asg - asgToInstances map[AwsRef][]AwsInstanceRef - instanceToAsg map[AwsInstanceRef]*asg - mutex sync.Mutex - service autoScalingWrapper - interrupt chan struct{} + registeredAsgs []*asg + asgToInstances map[AwsRef][]AwsInstanceRef + instanceToAsg map[AwsInstanceRef]*asg + asgInstanceTypeCache *instanceTypeExpirationStore + mutex sync.Mutex + awsService *awsWrapper + interrupt chan struct{} asgAutoDiscoverySpecs []asgAutoDiscoveryConfig explicitlyConfigured map[AwsRef]bool @@ -70,12 +71,13 @@ type asg struct { Tags []*autoscaling.TagDescription } -func newASGCache(service autoScalingWrapper, explicitSpecs []string, autoDiscoverySpecs []asgAutoDiscoveryConfig) (*asgCache, error) { +func newASGCache(awsService *awsWrapper, explicitSpecs []string, autoDiscoverySpecs []asgAutoDiscoveryConfig) (*asgCache, error) { registry := &asgCache{ registeredAsgs: make([]*asg, 0), - service: service, + awsService: awsService, asgToInstances: make(map[AwsRef][]AwsInstanceRef), instanceToAsg: make(map[AwsInstanceRef]*asg), + asgInstanceTypeCache: newAsgInstanceTypeCache(awsService), interrupt: make(chan struct{}), asgAutoDiscoverySpecs: autoDiscoverySpecs, explicitlyConfigured: make(map[AwsRef]bool), @@ -88,6 +90,17 @@ func newASGCache(service autoScalingWrapper, explicitSpecs []string, autoDiscove return registry, nil } +// Use a function variable for ease of testing +var getInstanceTypeForAsg = func(m *asgCache, group *asg) (string, error) { + if obj, found, _ := m.asgInstanceTypeCache.GetByKey(group.AwsRef.Name); found { + return obj.(instanceTypeCachedObject).instanceType, nil + } else if result, err := m.awsService.getInstanceTypesForAsgs([]*asg{group}); err == nil { + return result[group.AwsRef.Name], nil + } + + return "", fmt.Errorf("Could not find instance type for %s", group.AwsRef.Name) +} + // Fetch explicitly configured ASGs. These ASGs should never be unregistered // during refreshes, even if they no longer exist in AWS. func (m *asgCache) parseExplicitAsgs(specs []string) error { @@ -217,7 +230,7 @@ func (m *asgCache) setAsgSizeNoLock(asg *asg, size int) error { HonorCooldown: aws.Bool(false), } klog.V(0).Infof("Setting asg %s size to %d", asg.Name, size) - _, err := m.service.SetDesiredCapacity(params) + _, err := m.awsService.SetDesiredCapacity(params) if err != nil { return err } @@ -270,7 +283,7 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error { InstanceId: aws.String(instance.Name), ShouldDecrementDesiredCapacity: aws.Bool(true), } - resp, err := m.service.TerminateInstanceInAutoScalingGroup(params) + resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(params) if err != nil { return err } @@ -294,7 +307,7 @@ func (m *asgCache) fetchAutoAsgNames() ([]string, error) { groupNames := make([]string, 0) for _, spec := range m.asgAutoDiscoverySpecs { - names, err := m.service.getAutoscalingGroupNamesByTags(spec.Tags) + names, err := m.awsService.getAutoscalingGroupNamesByTags(spec.Tags) if err != nil { return nil, fmt.Errorf("cannot autodiscover ASGs: %s", err) } @@ -341,7 +354,7 @@ func (m *asgCache) regenerate() error { newInstanceToAsgCache := make(map[AwsInstanceRef]*asg) newAsgToInstancesCache := make(map[AwsRef][]AwsInstanceRef) - // Build list of knowns ASG names + // Build list of known ASG names refreshNames, err := m.buildAsgNames() if err != nil { return err @@ -349,16 +362,11 @@ func (m *asgCache) regenerate() error { // Fetch details of all ASGs klog.V(4).Infof("Regenerating instance to ASG map for ASGs: %v", refreshNames) - groups, err := m.service.getAutoscalingGroupsByNames(refreshNames) + groups, err := m.awsService.getAutoscalingGroupsByNames(refreshNames) if err != nil { return err } - err = m.service.populateLaunchConfigurationInstanceTypeCache(groups) - if err != nil { - klog.Warningf("Failed to fully populate all launchConfigurations: %v", err) - } - // If currently any ASG has more Desired than running Instances, introduce placeholders // for the instances to come up. This is required to track Desired instances that // will never come up, like with Spot Request that can't be fulfilled @@ -391,6 +399,11 @@ func (m *asgCache) regenerate() error { } } + err = m.asgInstanceTypeCache.populate(m.registeredAsgs) + if err != nil { + klog.Warningf("Failed to fully populate ASG->instanceType mapping: %v", err) + } + m.asgToInstances = newAsgToInstancesCache m.instanceToAsg = newInstanceToAsgCache return nil @@ -441,7 +454,7 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) { } if g.LaunchTemplate != nil { - asg.LaunchTemplate = m.buildLaunchTemplateFromSpec(g.LaunchTemplate) + asg.LaunchTemplate = buildLaunchTemplateFromSpec(g.LaunchTemplate) } if g.MixedInstancesPolicy != nil { @@ -454,7 +467,7 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) { } asg.MixedInstancesPolicy = &mixedInstancesPolicy{ - launchTemplate: m.buildLaunchTemplateFromSpec(g.MixedInstancesPolicy.LaunchTemplate.LaunchTemplateSpecification), + launchTemplate: buildLaunchTemplateFromSpec(g.MixedInstancesPolicy.LaunchTemplate.LaunchTemplateSpecification), instanceTypesOverrides: getInstanceTypes(g.MixedInstancesPolicy.LaunchTemplate.Overrides), } } @@ -462,36 +475,6 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) { return asg, nil } -func (m *asgCache) buildLaunchTemplateFromSpec(ltSpec *autoscaling.LaunchTemplateSpecification) *launchTemplate { - // NOTE(jaypipes): The LaunchTemplateSpecification.Version is a pointer to - // string. When the pointer is nil, EC2 AutoScaling API considers the value - // to be "$Default", however aws.StringValue(ltSpec.Version) will return an - // empty string (which is not considered the same as "$Default" or a nil - // string pointer. So, in order to not pass an empty string as the version - // for the launch template when we communicate with the EC2 AutoScaling API - // using the information in the launchTemplate, we store the string - // "$Default" here when the ltSpec.Version is a nil pointer. - // - // See: - // - // https://github.com/kubernetes/autoscaler/issues/1728 - // https://github.com/aws/aws-sdk-go/blob/81fad3b797f4a9bd1b452a5733dd465eefef1060/service/autoscaling/api.go#L10666-L10671 - // - // A cleaner alternative might be to make launchTemplate.version a string - // pointer instead of a string, or even store the aws-sdk-go's - // LaunchTemplateSpecification structs directly. - var version string - if ltSpec.Version == nil { - version = "$Default" - } else { - version = aws.StringValue(ltSpec.Version) - } - return &launchTemplate{ - name: aws.StringValue(ltSpec.LaunchTemplateName), - version: version, - } -} - func (m *asgCache) buildInstanceRefFromAWS(instance *autoscaling.Instance) AwsInstanceRef { providerID := fmt.Sprintf("aws:///%s/%s", aws.StringValue(instance.AvailabilityZone), aws.StringValue(instance.InstanceId)) return AwsInstanceRef{ diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go index 9efb50f7cc5d..e9f9ad1ab79e 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go @@ -20,9 +20,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - - sdkaws "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/autoscaling" ) func TestBuildAsg(t *testing.T) { @@ -49,64 +46,3 @@ func validateAsg(t *testing.T, asg *asg, name string, minSize int, maxSize int) assert.Equal(t, minSize, asg.minSize) assert.Equal(t, maxSize, asg.maxSize) } - -func TestBuildLaunchTemplateFromSpec(t *testing.T) { - assert := assert.New(t) - - units := []struct { - name string - in *autoscaling.LaunchTemplateSpecification - exp *launchTemplate - }{ - { - name: "non-default, specified version", - in: &autoscaling.LaunchTemplateSpecification{ - LaunchTemplateName: sdkaws.String("foo"), - Version: sdkaws.String("1"), - }, - exp: &launchTemplate{ - name: "foo", - version: "1", - }, - }, - { - name: "non-default, specified $Latest", - in: &autoscaling.LaunchTemplateSpecification{ - LaunchTemplateName: sdkaws.String("foo"), - Version: sdkaws.String("$Latest"), - }, - exp: &launchTemplate{ - name: "foo", - version: "$Latest", - }, - }, - { - name: "specified $Default", - in: &autoscaling.LaunchTemplateSpecification{ - LaunchTemplateName: sdkaws.String("foo"), - Version: sdkaws.String("$Default"), - }, - exp: &launchTemplate{ - name: "foo", - version: "$Default", - }, - }, - { - name: "no version specified", - in: &autoscaling.LaunchTemplateSpecification{ - LaunchTemplateName: sdkaws.String("foo"), - Version: nil, - }, - exp: &launchTemplate{ - name: "foo", - version: "$Default", - }, - }, - } - - cache := &asgCache{} - for _, unit := range units { - got := cache.buildLaunchTemplateFromSpec(unit.in) - assert.Equal(unit.exp, got) - } -} diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_test.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_test.go deleted file mode 100644 index 90e5dba7e261..000000000000 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_test.go +++ /dev/null @@ -1,84 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package aws - -import ( - "fmt" - "testing" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/autoscaling" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" -) - -func TestMoreThen50Groups(t *testing.T) { - service := &AutoScalingMock{} - autoScalingWrapper := &autoScalingWrapper{ - autoScaling: service, - } - - // Generate 51 ASG names - names := make([]string, 51) - for i := 0; i < len(names); i++ { - names[i] = fmt.Sprintf("asg-%d", i) - } - - // First batch, first 50 elements - service.On("DescribeAutoScalingGroupsPages", - &autoscaling.DescribeAutoScalingGroupsInput{ - AutoScalingGroupNames: aws.StringSlice(names[:50]), - MaxRecords: aws.Int64(maxRecordsReturnedByAPI), - }, - mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"), - ).Run(func(args mock.Arguments) { - fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) - fn(testNamedDescribeAutoScalingGroupsOutput("asg-1", 1, "test-instance-id"), false) - }).Return(nil) - - // Second batch, element 51 - service.On("DescribeAutoScalingGroupsPages", - &autoscaling.DescribeAutoScalingGroupsInput{ - AutoScalingGroupNames: aws.StringSlice([]string{"asg-50"}), - MaxRecords: aws.Int64(maxRecordsReturnedByAPI), - }, - mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"), - ).Run(func(args mock.Arguments) { - fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) - fn(testNamedDescribeAutoScalingGroupsOutput("asg-2", 1, "test-instance-id"), false) - }).Return(nil) - - asgs, err := autoScalingWrapper.getAutoscalingGroupsByNames(names) - assert.Nil(t, err) - assert.Equal(t, len(asgs), 2) - assert.Equal(t, *asgs[0].AutoScalingGroupName, "asg-1") - assert.Equal(t, *asgs[1].AutoScalingGroupName, "asg-2") -} - -func TestLaunchConfigurationCache(t *testing.T) { - c := newLaunchConfigurationInstanceTypeCache() - err := c.Add(instanceTypeCachedObject{ - name: "123", - instanceType: "t2.medium", - }) - require.NoError(t, err) - obj, ok, err := c.GetByKey("123") - require.NoError(t, err) - require.True(t, ok) - require.Equal(t, "t2.medium", obj.(instanceTypeCachedObject).instanceType) -} diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go index 587833779335..30498b78296e 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go @@ -21,88 +21,48 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/autoscaling" - "github.com/aws/aws-sdk-go/service/ec2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" ) -type AutoScalingMock struct { - mock.Mock -} - -func (a *AutoScalingMock) DescribeAutoScalingGroupsPages(i *autoscaling.DescribeAutoScalingGroupsInput, fn func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) error { - args := a.Called(i, fn) - return args.Error(0) -} - -func (a *AutoScalingMock) DescribeLaunchConfigurations(i *autoscaling.DescribeLaunchConfigurationsInput) (*autoscaling.DescribeLaunchConfigurationsOutput, error) { - args := a.Called(i) - return args.Get(0).(*autoscaling.DescribeLaunchConfigurationsOutput), nil -} - -func (a *AutoScalingMock) DescribeTagsPages(i *autoscaling.DescribeTagsInput, fn func(*autoscaling.DescribeTagsOutput, bool) bool) error { - args := a.Called(i, fn) - return args.Error(0) -} - -func (a *AutoScalingMock) SetDesiredCapacity(input *autoscaling.SetDesiredCapacityInput) (*autoscaling.SetDesiredCapacityOutput, error) { - args := a.Called(input) - return args.Get(0).(*autoscaling.SetDesiredCapacityOutput), nil -} - -func (a *AutoScalingMock) TerminateInstanceInAutoScalingGroup(input *autoscaling.TerminateInstanceInAutoScalingGroupInput) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error) { - args := a.Called(input) - return args.Get(0).(*autoscaling.TerminateInstanceInAutoScalingGroupOutput), nil -} - -type EC2Mock struct { - mock.Mock -} - -func (e *EC2Mock) DescribeLaunchTemplateVersions(i *ec2.DescribeLaunchTemplateVersionsInput) (*ec2.DescribeLaunchTemplateVersionsOutput, error) { - args := e.Called(i) - return args.Get(0).(*ec2.DescribeLaunchTemplateVersionsOutput), nil -} - -var testService = autoScalingWrapper{&AutoScalingMock{}, newLaunchConfigurationInstanceTypeCache()} - var testAwsManager = &AwsManager{ asgCache: &asgCache{ registeredAsgs: make([]*asg, 0), asgToInstances: make(map[AwsRef][]AwsInstanceRef), instanceToAsg: make(map[AwsInstanceRef]*asg), interrupt: make(chan struct{}), - service: testService, + awsService: &testAwsService, }, - autoScalingService: testService, + awsService: testAwsService, } -func newTestAwsManagerWithService(service autoScaling, autoDiscoverySpecs []asgAutoDiscoveryConfig) *AwsManager { - wrapper := autoScalingWrapper{service, newLaunchConfigurationInstanceTypeCache()} +func newTestAwsManagerWithMockServices(mockAutoScaling autoScalingI, mockEC2 ec2I, autoDiscoverySpecs []asgAutoDiscoveryConfig) *AwsManager { + awsService := awsWrapper{mockAutoScaling, mockEC2} return &AwsManager{ - autoScalingService: wrapper, + awsService: awsService, asgCache: &asgCache{ registeredAsgs: make([]*asg, 0), asgToInstances: make(map[AwsRef][]AwsInstanceRef), instanceToAsg: make(map[AwsInstanceRef]*asg), + asgInstanceTypeCache: newAsgInstanceTypeCache(&awsService), explicitlyConfigured: make(map[AwsRef]bool), interrupt: make(chan struct{}), asgAutoDiscoverySpecs: autoDiscoverySpecs, - service: wrapper, + awsService: &awsService, }, } } -func newTestAwsManagerWithAsgs(t *testing.T, service autoScaling, specs []string) *AwsManager { - m := newTestAwsManagerWithService(service, nil) +func newTestAwsManagerWithAsgs(t *testing.T, mockAutoScaling autoScalingI, mockEC2 ec2I, specs []string) *AwsManager { + m := newTestAwsManagerWithMockServices(mockAutoScaling, mockEC2, nil) m.asgCache.parseExplicitAsgs(specs) return m } -func newTestAwsManagerWithAutoAsgs(t *testing.T, service autoScaling, specs []string, autoDiscoverySpecs []asgAutoDiscoveryConfig) *AwsManager { - m := newTestAwsManagerWithService(service, autoDiscoverySpecs) +func newTestAwsManagerWithAutoAsgs(t *testing.T, mockAutoScaling autoScalingI, mockEC2 ec2I, specs []string, autoDiscoverySpecs []asgAutoDiscoveryConfig) *AwsManager { + m := newTestAwsManagerWithMockServices(mockAutoScaling, mockEC2, autoDiscoverySpecs) m.asgCache.parseExplicitAsgs(specs) return m } @@ -154,7 +114,7 @@ func TestName(t *testing.T) { } func TestNodeGroups(t *testing.T) { - provider := testProvider(t, newTestAwsManagerWithAsgs(t, testService, []string{"1:5:test-asg"})) + provider := testProvider(t, newTestAwsManagerWithAsgs(t, testAwsService, nil, []string{"1:5:test-asg"})) nodeGroups := provider.NodeGroups() assert.Equal(t, len(nodeGroups), 1) @@ -164,14 +124,14 @@ func TestNodeGroups(t *testing.T) { } func TestAutoDiscoveredNodeGroups(t *testing.T) { - service := &AutoScalingMock{} - provider := testProvider(t, newTestAwsManagerWithAutoAsgs(t, service, []string{}, []asgAutoDiscoveryConfig{ + a := &autoScalingMock{} + provider := testProvider(t, newTestAwsManagerWithAutoAsgs(t, a, nil, []string{}, []asgAutoDiscoveryConfig{ { Tags: map[string]string{"test": ""}, }, })) - service.On("DescribeTagsPages", + a.On("DescribeTagsPages", &autoscaling.DescribeTagsInput{ Filters: []*autoscaling.Filter{ {Name: aws.String("key"), Values: aws.StringSlice([]string{"test"})}, @@ -187,7 +147,7 @@ func TestAutoDiscoveredNodeGroups(t *testing.T) { }}, false) }).Return(nil).Once() - service.On("DescribeAutoScalingGroupsPages", + a.On("DescribeAutoScalingGroupsPages", &autoscaling.DescribeAutoScalingGroupsInput{ AutoScalingGroupNames: aws.StringSlice([]string{"auto-asg"}), MaxRecords: aws.Int64(maxRecordsReturnedByAPI), @@ -213,10 +173,10 @@ func TestNodeGroupForNode(t *testing.T) { ProviderID: "aws:///us-east-1a/test-instance-id", }, } - service := &AutoScalingMock{} - provider := testProvider(t, newTestAwsManagerWithAsgs(t, service, []string{"1:5:test-asg"})) + a := &autoScalingMock{} + provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:5:test-asg"})) - service.On("DescribeAutoScalingGroupsPages", + a.On("DescribeAutoScalingGroupsPages", &autoscaling.DescribeAutoScalingGroupsInput{ AutoScalingGroupNames: aws.StringSlice([]string{"test-asg"}), MaxRecords: aws.Int64(maxRecordsReturnedByAPI), @@ -241,7 +201,7 @@ func TestNodeGroupForNode(t *testing.T) { assert.NoError(t, err) assert.Equal(t, []cloudprovider.Instance{{Id: "aws:///us-east-1a/test-instance-id"}}, nodes) - service.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1) + a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1) // test node in cluster that is not in a group managed by cluster autoscaler nodeNotInGroup := &apiv1.Node{ @@ -254,7 +214,7 @@ func TestNodeGroupForNode(t *testing.T) { assert.NoError(t, err) assert.Nil(t, group) - service.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1) + a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1) } func TestNodeGroupForNodeWithNoProviderId(t *testing.T) { @@ -263,8 +223,8 @@ func TestNodeGroupForNodeWithNoProviderId(t *testing.T) { ProviderID: "", }, } - service := &AutoScalingMock{} - provider := testProvider(t, newTestAwsManagerWithAsgs(t, service, []string{"1:5:test-asg"})) + a := &autoScalingMock{} + provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:5:test-asg"})) group, err := provider.NodeGroupForNode(node) assert.NoError(t, err) @@ -325,11 +285,11 @@ func TestAwsRefFromProviderId(t *testing.T) { } func TestTargetSize(t *testing.T) { - service := &AutoScalingMock{} - provider := testProvider(t, newTestAwsManagerWithAsgs(t, service, []string{"1:5:test-asg"})) + a := &autoScalingMock{} + provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:5:test-asg"})) asgs := provider.NodeGroups() - service.On("DescribeAutoScalingGroupsPages", + a.On("DescribeAutoScalingGroupsPages", &autoscaling.DescribeAutoScalingGroupsInput{ AutoScalingGroupNames: aws.StringSlice([]string{"test-asg"}), MaxRecords: aws.Int64(maxRecordsReturnedByAPI), @@ -346,21 +306,21 @@ func TestTargetSize(t *testing.T) { assert.Equal(t, targetSize, 2) assert.NoError(t, err) - service.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1) + a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1) } func TestIncreaseSize(t *testing.T) { - service := &AutoScalingMock{} - provider := testProvider(t, newTestAwsManagerWithAsgs(t, service, []string{"1:5:test-asg"})) + a := &autoScalingMock{} + provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:5:test-asg"})) asgs := provider.NodeGroups() - service.On("SetDesiredCapacity", &autoscaling.SetDesiredCapacityInput{ + a.On("SetDesiredCapacity", &autoscaling.SetDesiredCapacityInput{ AutoScalingGroupName: aws.String(asgs[0].Id()), DesiredCapacity: aws.Int64(3), HonorCooldown: aws.Bool(false), }).Return(&autoscaling.SetDesiredCapacityOutput{}) - service.On("DescribeAutoScalingGroupsPages", + a.On("DescribeAutoScalingGroupsPages", &autoscaling.DescribeAutoScalingGroupsInput{ AutoScalingGroupNames: aws.StringSlice([]string{"test-asg"}), MaxRecords: aws.Int64(maxRecordsReturnedByAPI), @@ -379,8 +339,8 @@ func TestIncreaseSize(t *testing.T) { err = asgs[0].IncreaseSize(1) assert.NoError(t, err) - service.AssertNumberOfCalls(t, "SetDesiredCapacity", 1) - service.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1) + a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1) + a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1) newSize, err := asgs[0].TargetSize() assert.NoError(t, err) @@ -388,11 +348,11 @@ func TestIncreaseSize(t *testing.T) { } func TestBelongs(t *testing.T) { - service := &AutoScalingMock{} - provider := testProvider(t, newTestAwsManagerWithAsgs(t, service, []string{"1:5:test-asg"})) + a := &autoScalingMock{} + provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:5:test-asg"})) asgs := provider.NodeGroups() - service.On("DescribeAutoScalingGroupsPages", + a.On("DescribeAutoScalingGroupsPages", &autoscaling.DescribeAutoScalingGroupsInput{ AutoScalingGroupNames: aws.StringSlice([]string{asgs[0].Id()}), MaxRecords: aws.Int64(maxRecordsReturnedByAPI), @@ -412,7 +372,7 @@ func TestBelongs(t *testing.T) { } _, err := asgs[0].(*AwsNodeGroup).Belongs(invalidNode) assert.Error(t, err) - service.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1) + a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1) validNode := &apiv1.Node{ Spec: apiv1.NodeSpec{ @@ -425,15 +385,15 @@ func TestBelongs(t *testing.T) { // As "test-instance-id" is already known to be managed by test-asg since // the first `Belongs` call, no additional DescribAutoScalingGroupsPages // call is made. - service.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1) + a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1) } func TestDeleteNodes(t *testing.T) { - service := &AutoScalingMock{} - provider := testProvider(t, newTestAwsManagerWithAsgs(t, service, []string{"1:5:test-asg"})) + a := &autoScalingMock{} + provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:5:test-asg"})) asgs := provider.NodeGroups() - service.On("TerminateInstanceInAutoScalingGroup", &autoscaling.TerminateInstanceInAutoScalingGroupInput{ + a.On("TerminateInstanceInAutoScalingGroup", &autoscaling.TerminateInstanceInAutoScalingGroupInput{ InstanceId: aws.String("test-instance-id"), ShouldDecrementDesiredCapacity: aws.Bool(true), }).Return(&autoscaling.TerminateInstanceInAutoScalingGroupOutput{ @@ -442,7 +402,7 @@ func TestDeleteNodes(t *testing.T) { // Look up the current number of instances... var expectedInstancesCount int64 = 2 - service.On("DescribeAutoScalingGroupsPages", + a.On("DescribeAutoScalingGroupsPages", &autoscaling.DescribeAutoScalingGroupsInput{ AutoScalingGroupNames: aws.StringSlice([]string{"test-asg"}), MaxRecords: aws.Int64(maxRecordsReturnedByAPI), @@ -468,8 +428,8 @@ func TestDeleteNodes(t *testing.T) { } err = asgs[0].DeleteNodes([]*apiv1.Node{node}) assert.NoError(t, err) - service.AssertNumberOfCalls(t, "TerminateInstanceInAutoScalingGroup", 1) - service.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2) + a.AssertNumberOfCalls(t, "TerminateInstanceInAutoScalingGroup", 1) + a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2) newSize, err := asgs[0].TargetSize() assert.NoError(t, err) @@ -477,11 +437,11 @@ func TestDeleteNodes(t *testing.T) { } func TestDeleteNodesWithPlaceholder(t *testing.T) { - service := &AutoScalingMock{} - provider := testProvider(t, newTestAwsManagerWithAsgs(t, service, []string{"1:5:test-asg"})) + a := &autoScalingMock{} + provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:5:test-asg"})) asgs := provider.NodeGroups() - service.On("SetDesiredCapacity", &autoscaling.SetDesiredCapacityInput{ + a.On("SetDesiredCapacity", &autoscaling.SetDesiredCapacityInput{ AutoScalingGroupName: aws.String(asgs[0].Id()), DesiredCapacity: aws.Int64(1), HonorCooldown: aws.Bool(false), @@ -489,7 +449,7 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) { // Look up the current number of instances... var expectedInstancesCount int64 = 2 - service.On("DescribeAutoScalingGroupsPages", + a.On("DescribeAutoScalingGroupsPages", &autoscaling.DescribeAutoScalingGroupsInput{ AutoScalingGroupNames: aws.StringSlice([]string{"test-asg"}), MaxRecords: aws.Int64(maxRecordsReturnedByAPI), @@ -515,8 +475,8 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) { } err = asgs[0].DeleteNodes([]*apiv1.Node{node}) assert.NoError(t, err) - service.AssertNumberOfCalls(t, "SetDesiredCapacity", 1) - service.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2) + a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1) + a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2) newSize, err := asgs[0].TargetSize() assert.NoError(t, err) @@ -524,12 +484,12 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) { } func TestDeleteNodesAfterMultipleRefreshes(t *testing.T) { - service := &AutoScalingMock{} - manager := newTestAwsManagerWithAsgs(t, service, []string{"1:5:test-asg"}) + a := &autoScalingMock{} + manager := newTestAwsManagerWithAsgs(t, a, nil, []string{"1:5:test-asg"}) provider := testProvider(t, manager) asgs := provider.NodeGroups() - service.On("TerminateInstanceInAutoScalingGroup", &autoscaling.TerminateInstanceInAutoScalingGroupInput{ + a.On("TerminateInstanceInAutoScalingGroup", &autoscaling.TerminateInstanceInAutoScalingGroupInput{ InstanceId: aws.String("test-instance-id"), ShouldDecrementDesiredCapacity: aws.Bool(true), }).Return(&autoscaling.TerminateInstanceInAutoScalingGroupOutput{ @@ -537,7 +497,7 @@ func TestDeleteNodesAfterMultipleRefreshes(t *testing.T) { }) // Look up the current number of instances... - service.On("DescribeAutoScalingGroupsPages", + a.On("DescribeAutoScalingGroupsPages", &autoscaling.DescribeAutoScalingGroupsInput{ AutoScalingGroupNames: aws.StringSlice([]string{"test-asg"}), MaxRecords: aws.Int64(maxRecordsReturnedByAPI), @@ -562,8 +522,9 @@ func TestDeleteNodesAfterMultipleRefreshes(t *testing.T) { } func TestGetResourceLimiter(t *testing.T) { - service := &AutoScalingMock{} - m := newTestAwsManagerWithService(service, nil) + mockAutoScaling := &autoScalingMock{} + mockEC2 := &ec2Mock{} + m := newTestAwsManagerWithMockServices(mockAutoScaling, mockEC2, nil) provider := testProvider(t, m) _, err := provider.GetResourceLimiter() diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager.go b/cluster-autoscaler/cloudprovider/aws/aws_manager.go index 1fb95e8aa279..99dbb3dc5130 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager.go @@ -48,7 +48,7 @@ import ( const ( operationWaitTimeout = 5 * time.Second operationPollInterval = 100 * time.Millisecond - maxRecordsReturnedByAPI = 100 + maxRecordsReturnedByAPI = 50 maxAsgNamesPerDescribe = 50 refreshInterval = 1 * time.Minute autoDiscovererTypeASG = "asg" @@ -57,11 +57,10 @@ const ( // AwsManager is handles aws communication and data caching. type AwsManager struct { - autoScalingService autoScalingWrapper - ec2Service ec2Wrapper - asgCache *asgCache - lastRefresh time.Time - instanceTypes map[string]*InstanceType + awsService awsWrapper + asgCache *asgCache + lastRefresh time.Time + instanceTypes map[string]*InstanceType } type asgTemplate struct { @@ -162,7 +161,7 @@ func getRegion(cfg ...*aws.Config) string { return region } -// createAwsManagerInternal allows for a customer autoScalingWrapper to be passed in by tests +// createAwsManagerInternal allows for custom objects to be passed in by tests // // #1449 If running tests outside of AWS without AWS_REGION among environment // variables, avoid a 5+ second EC2 Metadata lookup timeout in getRegion by @@ -173,8 +172,7 @@ func getRegion(cfg ...*aws.Config) string { func createAWSManagerInternal( configReader io.Reader, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions, - autoScalingService *autoScalingWrapper, - ec2Service *ec2Wrapper, + awsService *awsWrapper, instanceTypes map[string]*InstanceType, ) (*AwsManager, error) { @@ -189,7 +187,7 @@ func createAWSManagerInternal( return nil, err } - if autoScalingService == nil || ec2Service == nil { + if awsService == nil { awsSdkProvider := newAWSSDKProvider(cfg) sess, err := session.NewSession(aws.NewConfig().WithRegion(getRegion()). WithEndpointResolver(getResolver(awsSdkProvider.cfg))) @@ -197,14 +195,7 @@ func createAWSManagerInternal( return nil, err } - if autoScalingService == nil { - c := newLaunchConfigurationInstanceTypeCache() - autoScalingService = &autoScalingWrapper{autoscaling.New(sess), c} - } - - if ec2Service == nil { - ec2Service = &ec2Wrapper{ec2.New(sess)} - } + awsService = &awsWrapper{autoscaling.New(sess), ec2.New(sess)} } specs, err := parseASGAutoDiscoverySpecs(discoveryOpts) @@ -212,16 +203,15 @@ func createAWSManagerInternal( return nil, err } - cache, err := newASGCache(*autoScalingService, discoveryOpts.NodeGroupSpecs, specs) + cache, err := newASGCache(awsService, discoveryOpts.NodeGroupSpecs, specs) if err != nil { return nil, err } manager := &AwsManager{ - autoScalingService: *autoScalingService, - ec2Service: *ec2Service, - asgCache: cache, - instanceTypes: instanceTypes, + awsService: *awsService, + asgCache: cache, + instanceTypes: instanceTypes, } if err := manager.forceRefresh(); err != nil { @@ -248,7 +238,7 @@ func readAWSCloudConfig(config io.Reader) (*provider_aws.CloudConfig, error) { // CreateAwsManager constructs awsManager object. func CreateAwsManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions, instanceTypes map[string]*InstanceType) (*AwsManager, error) { - return createAWSManagerInternal(configReader, discoveryOpts, nil, nil, instanceTypes) + return createAWSManagerInternal(configReader, discoveryOpts, nil, instanceTypes) } // Refresh is called before every main loop and can be used to dynamically update cloud provider state. @@ -315,7 +305,7 @@ func (m *AwsManager) getAsgTemplate(asg *asg) (*asgTemplate, error) { klog.Warningf("Found multiple availability zones for ASG %q; using %s\n", asg.Name, az) } - instanceTypeName, err := m.buildInstanceType(asg) + instanceTypeName, err := getInstanceTypeForAsg(m.asgCache, asg) if err != nil { return nil, err } @@ -331,23 +321,6 @@ func (m *AwsManager) getAsgTemplate(asg *asg) (*asgTemplate, error) { return nil, fmt.Errorf("ASG %q uses the unknown EC2 instance type %q", asg.Name, instanceTypeName) } -func (m *AwsManager) buildInstanceType(asg *asg) (string, error) { - if asg.LaunchConfigurationName != "" { - return m.autoScalingService.getInstanceTypeByLCName(asg.LaunchConfigurationName) - } else if asg.LaunchTemplate != nil { - return m.ec2Service.getInstanceTypeByLT(asg.LaunchTemplate) - } else if asg.MixedInstancesPolicy != nil { - // always use first instance - if len(asg.MixedInstancesPolicy.instanceTypesOverrides) != 0 { - return asg.MixedInstancesPolicy.instanceTypesOverrides[0], nil - } - - return m.ec2Service.getInstanceTypeByLT(asg.MixedInstancesPolicy.launchTemplate) - } - - return "", errors.New("Unable to get instance type from launch config or launch template") -} - func (m *AwsManager) buildNodeFromTemplate(asg *asg, template *asgTemplate) (*apiv1.Node, error) { node := apiv1.Node{} nodeName := fmt.Sprintf("%s-asg-%d", asg.Name, rand.Int63()) diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go index 86e5d0ff2c37..0c6d03d11f03 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go @@ -260,8 +260,8 @@ func makeTaintSet(taints []apiv1.Taint) map[apiv1.Taint]bool { func TestFetchExplicitAsgs(t *testing.T) { min, max, groupname := 1, 10, "coolasg" - s := &AutoScalingMock{} - s.On("DescribeAutoScalingGroups", &autoscaling.DescribeAutoScalingGroupsInput{ + a := &autoScalingMock{} + a.On("DescribeAutoScalingGroups", &autoscaling.DescribeAutoScalingGroupsInput{ AutoScalingGroupNames: []*string{aws.String(groupname)}, MaxRecords: aws.Int64(1), }).Return(&autoscaling.DescribeAutoScalingGroupsOutput{ @@ -270,7 +270,7 @@ func TestFetchExplicitAsgs(t *testing.T) { }, }) - s.On("DescribeAutoScalingGroupsPages", + a.On("DescribeAutoScalingGroupsPages", &autoscaling.DescribeAutoScalingGroupsInput{ AutoScalingGroupNames: aws.StringSlice([]string{groupname}), MaxRecords: aws.Int64(maxRecordsReturnedByAPI), @@ -304,7 +304,7 @@ func TestFetchExplicitAsgs(t *testing.T) { defer resetAWSRegion(os.LookupEnv("AWS_REGION")) os.Setenv("AWS_REGION", "fanghorn") instanceTypes, _ := GetStaticEC2InstanceTypes() - m, err := createAWSManagerInternal(nil, do, &autoScalingWrapper{s, newLaunchConfigurationInstanceTypeCache()}, nil, instanceTypes) + m, err := createAWSManagerInternal(nil, do, &awsWrapper{a, nil}, instanceTypes) assert.NoError(t, err) asgs := m.asgCache.Get() @@ -312,104 +312,9 @@ func TestFetchExplicitAsgs(t *testing.T) { validateAsg(t, asgs[0], groupname, min, max) } -func TestBuildInstanceType(t *testing.T) { - ltName, ltVersion, instanceType := "launcher", "1", "t2.large" - - s := &EC2Mock{} - s.On("DescribeLaunchTemplateVersions", &ec2.DescribeLaunchTemplateVersionsInput{ - LaunchTemplateName: aws.String(ltName), - Versions: []*string{aws.String(ltVersion)}, - }).Return(&ec2.DescribeLaunchTemplateVersionsOutput{ - LaunchTemplateVersions: []*ec2.LaunchTemplateVersion{ - { - LaunchTemplateData: &ec2.ResponseLaunchTemplateData{ - InstanceType: aws.String(instanceType), - }, - }, - }, - }) - - // #1449 Without AWS_REGION getRegion() lookup runs till timeout during tests. - defer resetAWSRegion(os.LookupEnv("AWS_REGION")) - os.Setenv("AWS_REGION", "fanghorn") - instanceTypes, _ := GetStaticEC2InstanceTypes() - m, err := createAWSManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, nil, &ec2Wrapper{s}, instanceTypes) - assert.NoError(t, err) - - asg := asg{ - LaunchTemplate: &launchTemplate{name: ltName, version: ltVersion}, - } - - builtInstanceType, err := m.buildInstanceType(&asg) - - assert.NoError(t, err) - assert.Equal(t, instanceType, builtInstanceType) -} - -func TestBuildInstanceTypeMixedInstancePolicyOverride(t *testing.T) { - ltName, ltVersion, instanceType := "launcher", "1", "t2.large" - instanceTypeOverrides := []string{} - - s := &EC2Mock{} - s.On("DescribeLaunchTemplateVersions", &ec2.DescribeLaunchTemplateVersionsInput{ - LaunchTemplateName: aws.String(ltName), - Versions: []*string{aws.String(ltVersion)}, - }).Return(&ec2.DescribeLaunchTemplateVersionsOutput{ - LaunchTemplateVersions: []*ec2.LaunchTemplateVersion{ - { - LaunchTemplateData: &ec2.ResponseLaunchTemplateData{ - InstanceType: aws.String(instanceType), - }, - }, - }, - }) - - defer resetAWSRegion(os.LookupEnv("AWS_REGION")) - os.Setenv("AWS_REGION", "fanghorn") - instanceTypes, _ := GetStaticEC2InstanceTypes() - m, err := createAWSManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, nil, &ec2Wrapper{s}, instanceTypes) - assert.NoError(t, err) - - lt := &launchTemplate{name: ltName, version: ltVersion} - asg := asg{ - MixedInstancesPolicy: &mixedInstancesPolicy{ - launchTemplate: lt, - instanceTypesOverrides: instanceTypeOverrides, - }, - } - - builtInstanceType, err := m.buildInstanceType(&asg) - - assert.NoError(t, err) - assert.Equal(t, instanceType, builtInstanceType) -} - -func TestBuildInstanceTypeMixedInstancePolicyNoOverride(t *testing.T) { - ltName, ltVersion := "launcher", "1" - instanceTypeOverrides := []string{"m4.xlarge", "m5.xlarge"} - - defer resetAWSRegion(os.LookupEnv("AWS_REGION")) - os.Setenv("AWS_REGION", "fanghorn") - instanceTypes, _ := GetStaticEC2InstanceTypes() - m, err := createAWSManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, nil, &ec2Wrapper{}, instanceTypes) - assert.NoError(t, err) - - lt := &launchTemplate{name: ltName, version: ltVersion} - asg := asg{ - MixedInstancesPolicy: &mixedInstancesPolicy{ - launchTemplate: lt, - instanceTypesOverrides: instanceTypeOverrides, - }, - } - - builtInstanceType, err := m.buildInstanceType(&asg) - - assert.NoError(t, err) - assert.Equal(t, instanceTypeOverrides[0], builtInstanceType) -} - func TestGetASGTemplate(t *testing.T) { const ( + asgName = "sample" knownInstanceType = "t3.micro" region = "us-east-1" az = region + "a" @@ -417,6 +322,8 @@ func TestGetASGTemplate(t *testing.T) { ltVersion = "1" ) + asgRef := AwsRef{Name: asgName} + tags := []*autoscaling.TagDescription{ { Key: aws.String("k8s.io/cluster-autoscaler/node-template/taint/dedicated"), @@ -442,8 +349,8 @@ func TestGetASGTemplate(t *testing.T) { for _, test := range tests { t.Run(test.description, func(t *testing.T) { - s := &EC2Mock{} - s.On("DescribeLaunchTemplateVersions", &ec2.DescribeLaunchTemplateVersionsInput{ + e := &ec2Mock{} + e.On("DescribeLaunchTemplateVersions", &ec2.DescribeLaunchTemplateVersionsInput{ LaunchTemplateName: aws.String(ltName), Versions: []*string{aws.String(ltVersion)}, }).Return(&ec2.DescribeLaunchTemplateVersionsOutput{ @@ -460,11 +367,18 @@ func TestGetASGTemplate(t *testing.T) { defer resetAWSRegion(os.LookupEnv("AWS_REGION")) os.Setenv("AWS_REGION", "fanghorn") instanceTypes, _ := GetStaticEC2InstanceTypes() - m, err := createAWSManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, nil, &ec2Wrapper{s}, instanceTypes) + do := cloudprovider.NodeGroupDiscoveryOptions{} + + m, err := createAWSManagerInternal(nil, do, &awsWrapper{nil, e}, instanceTypes) + origGetInstanceTypeFunc := getInstanceTypeForAsg + defer func() { getInstanceTypeForAsg = origGetInstanceTypeFunc }() + getInstanceTypeForAsg = func(m *asgCache, asg *asg) (string, error) { + return test.instanceType, nil + } assert.NoError(t, err) asg := &asg{ - AwsRef: AwsRef{Name: "sample"}, + AwsRef: asgRef, AvailabilityZones: test.availabilityZones, LaunchTemplate: &launchTemplate{ name: ltName, @@ -492,7 +406,7 @@ func TestFetchAutoAsgs(t *testing.T) { min, max := 1, 10 groupname, tags := "coolasg", []string{"tag", "anothertag"} - s := &AutoScalingMock{} + a := &autoScalingMock{} // Lookup groups associated with tags expectedTagsInput := &autoscaling.DescribeTagsInput{ Filters: []*autoscaling.Filter{ @@ -502,7 +416,7 @@ func TestFetchAutoAsgs(t *testing.T) { MaxRecords: aws.Int64(maxRecordsReturnedByAPI), } // Use MatchedBy pattern to avoid list order issue https://github.com/kubernetes/autoscaler/issues/1346 - s.On("DescribeTagsPages", mock.MatchedBy(tagsMatcher(expectedTagsInput)), + a.On("DescribeTagsPages", mock.MatchedBy(tagsMatcher(expectedTagsInput)), mock.AnythingOfType("func(*autoscaling.DescribeTagsOutput, bool) bool"), ).Run(func(args mock.Arguments) { fn := args.Get(1).(func(*autoscaling.DescribeTagsOutput, bool) bool) @@ -515,7 +429,7 @@ func TestFetchAutoAsgs(t *testing.T) { // Describe the group to register it, then again to generate the instance // cache. - s.On("DescribeAutoScalingGroupsPages", + a.On("DescribeAutoScalingGroupsPages", &autoscaling.DescribeAutoScalingGroupsInput{ AutoScalingGroupNames: aws.StringSlice([]string{groupname}), MaxRecords: aws.Int64(maxRecordsReturnedByAPI), @@ -543,7 +457,7 @@ func TestFetchAutoAsgs(t *testing.T) { os.Setenv("AWS_REGION", "fanghorn") // fetchAutoASGs is called at manager creation time, via forceRefresh instanceTypes, _ := GetStaticEC2InstanceTypes() - m, err := createAWSManagerInternal(nil, do, &autoScalingWrapper{s, newLaunchConfigurationInstanceTypeCache()}, nil, instanceTypes) + m, err := createAWSManagerInternal(nil, do, &awsWrapper{a, nil}, instanceTypes) assert.NoError(t, err) asgs := m.asgCache.Get() @@ -551,7 +465,7 @@ func TestFetchAutoAsgs(t *testing.T) { validateAsg(t, asgs[0], groupname, min, max) // Simulate the previously discovered ASG disappearing - s.On("DescribeTagsPages", mock.MatchedBy(tagsMatcher(expectedTagsInput)), + a.On("DescribeTagsPages", mock.MatchedBy(tagsMatcher(expectedTagsInput)), mock.AnythingOfType("func(*autoscaling.DescribeTagsOutput, bool) bool"), ).Run(func(args mock.Arguments) { fn := args.Get(1).(func(*autoscaling.DescribeTagsOutput, bool) bool) diff --git a/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go b/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go new file mode 100644 index 000000000000..10bd1401c6a0 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go @@ -0,0 +1,264 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aws + +import ( + "fmt" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/autoscaling" + "github.com/aws/aws-sdk-go/service/ec2" + "k8s.io/klog" +) + +// autoScalingI is the interface abstracting specific API calls of the auto-scaling service provided by AWS SDK for use in CA +type autoScalingI interface { + DescribeAutoScalingGroupsPages(input *autoscaling.DescribeAutoScalingGroupsInput, fn func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) error + DescribeLaunchConfigurations(*autoscaling.DescribeLaunchConfigurationsInput) (*autoscaling.DescribeLaunchConfigurationsOutput, error) + DescribeTagsPages(input *autoscaling.DescribeTagsInput, fn func(*autoscaling.DescribeTagsOutput, bool) bool) error + SetDesiredCapacity(input *autoscaling.SetDesiredCapacityInput) (*autoscaling.SetDesiredCapacityOutput, error) + TerminateInstanceInAutoScalingGroup(input *autoscaling.TerminateInstanceInAutoScalingGroupInput) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error) +} + +// ec2I is the interface abstracting specific API calls of the EC2 service provided by AWS SDK for use in CA +type ec2I interface { + DescribeLaunchTemplateVersions(input *ec2.DescribeLaunchTemplateVersionsInput) (*ec2.DescribeLaunchTemplateVersionsOutput, error) +} + +// awsWrapper provides several utility methods over the services provided by the AWS SDK +type awsWrapper struct { + autoScalingI + ec2I +} + +func (m *awsWrapper) getInstanceTypeByLaunchConfigNames(launchConfigToQuery []*string) (map[string]string, error) { + launchConfigurationsToInstanceType := map[string]string{} + + for i := 0; i < len(launchConfigToQuery); i += 50 { + end := i + 50 + + if end > len(launchConfigToQuery) { + end = len(launchConfigToQuery) + } + params := &autoscaling.DescribeLaunchConfigurationsInput{ + LaunchConfigurationNames: launchConfigToQuery[i:end], + MaxRecords: aws.Int64(50), + } + r, err := m.DescribeLaunchConfigurations(params) + if err != nil { + return nil, err + } + for _, lc := range r.LaunchConfigurations { + launchConfigurationsToInstanceType[*lc.LaunchConfigurationName] = *lc.InstanceType + } + } + return launchConfigurationsToInstanceType, nil +} + +func (m *awsWrapper) getAutoscalingGroupsByNames(names []string) ([]*autoscaling.Group, error) { + if len(names) == 0 { + return nil, nil + } + + asgs := make([]*autoscaling.Group, 0) + + // AWS only accepts up to 50 ASG names as input, describe them in batches + for i := 0; i < len(names); i += maxAsgNamesPerDescribe { + end := i + maxAsgNamesPerDescribe + + if end > len(names) { + end = len(names) + } + + input := &autoscaling.DescribeAutoScalingGroupsInput{ + AutoScalingGroupNames: aws.StringSlice(names[i:end]), + MaxRecords: aws.Int64(maxRecordsReturnedByAPI), + } + err := m.DescribeAutoScalingGroupsPages(input, func(output *autoscaling.DescribeAutoScalingGroupsOutput, _ bool) bool { + asgs = append(asgs, output.AutoScalingGroups...) + // We return true while we want to be called with the next page of + // results, if any. + return true + }) + if err != nil { + return nil, err + } + } + + return asgs, nil +} + +func (m *awsWrapper) getAutoscalingGroupNamesByTags(kvs map[string]string) ([]string, error) { + // DescribeTags does an OR query when multiple filters on different tags are + // specified. In other words, DescribeTags returns [asg1, asg1] for keys + // [t1, t2] when there's only one asg tagged both t1 and t2. + filters := []*autoscaling.Filter{} + for key, value := range kvs { + filter := &autoscaling.Filter{ + Name: aws.String("key"), + Values: []*string{aws.String(key)}, + } + filters = append(filters, filter) + if value != "" { + filters = append(filters, &autoscaling.Filter{ + Name: aws.String("value"), + Values: []*string{aws.String(value)}, + }) + } + } + + tags := []*autoscaling.TagDescription{} + input := &autoscaling.DescribeTagsInput{ + Filters: filters, + MaxRecords: aws.Int64(maxRecordsReturnedByAPI), + } + err := m.DescribeTagsPages(input, func(out *autoscaling.DescribeTagsOutput, _ bool) bool { + tags = append(tags, out.Tags...) + // We return true while we want to be called with the next page of + // results, if any. + return true + }) + + if err != nil { + return nil, err + } + + // According to how DescribeTags API works, the result contains ASGs which + // not all but only subset of tags are associated. Explicitly select ASGs to + // which all the tags are associated so that we won't end up calling + // DescribeAutoScalingGroups API multiple times on an ASG. + asgNames := []string{} + asgNameOccurrences := make(map[string]int) + for _, t := range tags { + asgName := aws.StringValue(t.ResourceId) + occurrences := asgNameOccurrences[asgName] + 1 + if occurrences >= len(kvs) { + asgNames = append(asgNames, asgName) + } + asgNameOccurrences[asgName] = occurrences + } + + return asgNames, nil +} + +func (m *awsWrapper) getInstanceTypeByLaunchTemplate(launchTemplate *launchTemplate) (string, error) { + params := &ec2.DescribeLaunchTemplateVersionsInput{ + LaunchTemplateName: aws.String(launchTemplate.name), + Versions: []*string{aws.String(launchTemplate.version)}, + } + + describeData, err := m.DescribeLaunchTemplateVersions(params) + if err != nil { + return "", err + } + + if len(describeData.LaunchTemplateVersions) == 0 { + return "", fmt.Errorf("unable to find template versions") + } + + lt := describeData.LaunchTemplateVersions[0] + instanceType := lt.LaunchTemplateData.InstanceType + + if instanceType == nil { + return "", fmt.Errorf("unable to find instance type within launch template") + } + + return aws.StringValue(instanceType), nil +} + +func (m *awsWrapper) getInstanceTypesForAsgs(asgs []*asg) (map[string]string, error) { + results := map[string]string{} + launchConfigsToQuery := map[string]string{} + launchTemplatesToQuery := map[string]*launchTemplate{} + + for _, asg := range asgs { + name := asg.AwsRef.Name + if asg.LaunchConfigurationName != "" { + launchConfigsToQuery[name] = asg.LaunchConfigurationName + } else if asg.LaunchTemplate != nil { + launchTemplatesToQuery[name] = asg.LaunchTemplate + } else if asg.MixedInstancesPolicy != nil { + if len(asg.MixedInstancesPolicy.instanceTypesOverrides) > 0 { + results[name] = asg.MixedInstancesPolicy.instanceTypesOverrides[0] + } else { + launchTemplatesToQuery[name] = asg.MixedInstancesPolicy.launchTemplate + } + } + } + + klog.V(4).Infof("%d launch configurations to query", len(launchConfigsToQuery)) + klog.V(4).Infof("%d launch templates to query", len(launchTemplatesToQuery)) + + // Query these all at once to minimize AWS API calls + launchConfigNames := make([]*string, 0, len(launchConfigsToQuery)) + for _, cfgName := range launchConfigsToQuery { + launchConfigNames = append(launchConfigNames, aws.String(cfgName)) + } + launchConfigs, err := m.getInstanceTypeByLaunchConfigNames(launchConfigNames) + if err != nil { + klog.Errorf("Failed to query %d launch configurations", len(launchConfigsToQuery)) + return nil, err + } + + for asgName, cfgName := range launchConfigsToQuery { + results[asgName] = launchConfigs[cfgName] + } + klog.V(4).Infof("Successfully queried %d launch configurations", len(launchConfigsToQuery)) + + // Have to query LaunchTemplates one-at-a-time, since there's no way to query pairs in bulk + for asgName, lt := range launchTemplatesToQuery { + instanceType, err := m.getInstanceTypeByLaunchTemplate(lt) + if err != nil { + klog.Errorf("Failed to query launch tempate %s: %v", lt.name, err) + continue + } + results[asgName] = instanceType + } + klog.V(4).Infof("Successfully queried %d launch templates", len(launchTemplatesToQuery)) + + return results, nil +} + +func buildLaunchTemplateFromSpec(ltSpec *autoscaling.LaunchTemplateSpecification) *launchTemplate { + // NOTE(jaypipes): The LaunchTemplateSpecification.Version is a pointer to + // string. When the pointer is nil, EC2 AutoScaling API considers the value + // to be "$Default", however aws.StringValue(ltSpec.Version) will return an + // empty string (which is not considered the same as "$Default" or a nil + // string pointer. So, in order to not pass an empty string as the version + // for the launch template when we communicate with the EC2 AutoScaling API + // using the information in the launchTemplate, we store the string + // "$Default" here when the ltSpec.Version is a nil pointer. + // + // See: + // + // https://github.com/kubernetes/autoscaler/issues/1728 + // https://github.com/aws/aws-sdk-go/blob/81fad3b797f4a9bd1b452a5733dd465eefef1060/service/autoscaling/api.go#L10666-L10671 + // + // A cleaner alternative might be to make launchTemplate.version a string + // pointer instead of a string, or even store the aws-sdk-go's + // LaunchTemplateSpecification structs directly. + var version string + if ltSpec.Version == nil { + version = "$Default" + } else { + version = aws.StringValue(ltSpec.Version) + } + return &launchTemplate{ + name: aws.StringValue(ltSpec.LaunchTemplateName), + version: version, + } +} diff --git a/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go b/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go new file mode 100644 index 000000000000..7ec188fa4f8a --- /dev/null +++ b/cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go @@ -0,0 +1,269 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aws + +import ( + "fmt" + "os" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/autoscaling" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +type autoScalingMock struct { + mock.Mock +} + +func (a *autoScalingMock) DescribeAutoScalingGroupsPages(i *autoscaling.DescribeAutoScalingGroupsInput, fn func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) error { + args := a.Called(i, fn) + return args.Error(0) +} + +func (a *autoScalingMock) DescribeLaunchConfigurations(i *autoscaling.DescribeLaunchConfigurationsInput) (*autoscaling.DescribeLaunchConfigurationsOutput, error) { + args := a.Called(i) + return args.Get(0).(*autoscaling.DescribeLaunchConfigurationsOutput), nil +} + +func (a *autoScalingMock) DescribeTagsPages(i *autoscaling.DescribeTagsInput, fn func(*autoscaling.DescribeTagsOutput, bool) bool) error { + args := a.Called(i, fn) + return args.Error(0) +} + +func (a *autoScalingMock) SetDesiredCapacity(input *autoscaling.SetDesiredCapacityInput) (*autoscaling.SetDesiredCapacityOutput, error) { + args := a.Called(input) + return args.Get(0).(*autoscaling.SetDesiredCapacityOutput), nil +} + +func (a *autoScalingMock) TerminateInstanceInAutoScalingGroup(input *autoscaling.TerminateInstanceInAutoScalingGroupInput) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error) { + args := a.Called(input) + return args.Get(0).(*autoscaling.TerminateInstanceInAutoScalingGroupOutput), nil +} + +type ec2Mock struct { + mock.Mock +} + +func (e *ec2Mock) DescribeLaunchTemplateVersions(i *ec2.DescribeLaunchTemplateVersionsInput) (*ec2.DescribeLaunchTemplateVersionsOutput, error) { + args := e.Called(i) + return args.Get(0).(*ec2.DescribeLaunchTemplateVersionsOutput), nil +} + +var testAwsService = awsWrapper{&autoScalingMock{}, &ec2Mock{}} + +func TestMoreThen50Groups(t *testing.T) { + a := &autoScalingMock{} + awsWrapper := &awsWrapper{ + autoScalingI: a, + ec2I: nil, + } + + // Generate 51 ASG names + names := make([]string, 51) + for i := 0; i < len(names); i++ { + names[i] = fmt.Sprintf("asg-%d", i) + } + + // First batch, first 50 elements + a.On("DescribeAutoScalingGroupsPages", + &autoscaling.DescribeAutoScalingGroupsInput{ + AutoScalingGroupNames: aws.StringSlice(names[:50]), + MaxRecords: aws.Int64(maxRecordsReturnedByAPI), + }, + mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"), + ).Run(func(args mock.Arguments) { + fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) + fn(testNamedDescribeAutoScalingGroupsOutput("asg-1", 1, "test-instance-id"), false) + }).Return(nil) + + // Second batch, element 51 + a.On("DescribeAutoScalingGroupsPages", + &autoscaling.DescribeAutoScalingGroupsInput{ + AutoScalingGroupNames: aws.StringSlice([]string{"asg-50"}), + MaxRecords: aws.Int64(maxRecordsReturnedByAPI), + }, + mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"), + ).Run(func(args mock.Arguments) { + fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) + fn(testNamedDescribeAutoScalingGroupsOutput("asg-2", 1, "test-instance-id"), false) + }).Return(nil) + + asgs, err := awsWrapper.getAutoscalingGroupsByNames(names) + assert.Nil(t, err) + assert.Equal(t, len(asgs), 2) + assert.Equal(t, *asgs[0].AutoScalingGroupName, "asg-1") + assert.Equal(t, *asgs[1].AutoScalingGroupName, "asg-2") +} + +func TestGetInstanceTypesForAsgs(t *testing.T) { + asgName, ltName, ltVersion, instanceType := "testasg", "launcher", "1", "t2.large" + ltSpec := launchTemplate{ + name: ltName, + version: ltVersion, + } + + a := &autoScalingMock{} + e := &ec2Mock{} + a.On("DescribeLaunchConfigurations", &autoscaling.DescribeLaunchConfigurationsInput{ + LaunchConfigurationNames: []*string{aws.String(ltName)}, + MaxRecords: aws.Int64(50), + }).Return(&autoscaling.DescribeLaunchConfigurationsOutput{ + LaunchConfigurations: []*autoscaling.LaunchConfiguration{ + { + LaunchConfigurationName: aws.String(ltName), + InstanceType: aws.String(instanceType), + }, + }, + }) + e.On("DescribeLaunchTemplateVersions", &ec2.DescribeLaunchTemplateVersionsInput{ + LaunchTemplateName: aws.String(ltName), + Versions: []*string{aws.String(ltVersion)}, + }).Return(&ec2.DescribeLaunchTemplateVersionsOutput{ + LaunchTemplateVersions: []*ec2.LaunchTemplateVersion{ + { + LaunchTemplateData: &ec2.ResponseLaunchTemplateData{ + InstanceType: aws.String(instanceType), + }, + }, + }, + }) + + // #1449 Without AWS_REGION getRegion() lookup runs till timeout during tests. + defer resetAWSRegion(os.LookupEnv("AWS_REGION")) + os.Setenv("AWS_REGION", "fanghorn") + + awsWrapper := &awsWrapper{ + autoScalingI: a, + ec2I: e, + } + + cases := []struct { + name string + launchConfigurationName string + launchTemplate *launchTemplate + mixedInstancesPolicy *mixedInstancesPolicy + }{ + { + "AsgWithLaunchConfiguration", + ltName, + nil, + nil, + }, + { + "AsgWithLaunchTemplate", + "", + <Spec, + nil, + }, + { + "AsgWithLaunchTemplateMixedInstancePolicyOverride", + "", + nil, + &mixedInstancesPolicy{ + instanceTypesOverrides: []string{instanceType}, + }, + }, + { + "AsgWithLaunchTemplateMixedInstancePolicyNoOverride", + "", + nil, + &mixedInstancesPolicy{ + launchTemplate: <Spec, + }, + }, + } + + for _, tc := range cases { + results, err := awsWrapper.getInstanceTypesForAsgs([]*asg{ + { + AwsRef: AwsRef{Name: asgName}, + LaunchConfigurationName: tc.launchConfigurationName, + LaunchTemplate: tc.launchTemplate, + MixedInstancesPolicy: tc.mixedInstancesPolicy, + }, + }) + assert.NoError(t, err) + + foundInstanceType, exists := results[asgName] + assert.NoErrorf(t, err, "%s had error %v", tc.name, err) + assert.Truef(t, exists, "%s did not find asg", tc.name) + assert.Equalf(t, foundInstanceType, instanceType, "%s had %s, expected %s", tc.name, foundInstanceType, instanceType) + } +} + +func TestBuildLaunchTemplateFromSpec(t *testing.T) { + assert := assert.New(t) + + units := []struct { + name string + in *autoscaling.LaunchTemplateSpecification + exp *launchTemplate + }{ + { + name: "non-default, specified version", + in: &autoscaling.LaunchTemplateSpecification{ + LaunchTemplateName: aws.String("foo"), + Version: aws.String("1"), + }, + exp: &launchTemplate{ + name: "foo", + version: "1", + }, + }, + { + name: "non-default, specified $Latest", + in: &autoscaling.LaunchTemplateSpecification{ + LaunchTemplateName: aws.String("foo"), + Version: aws.String("$Latest"), + }, + exp: &launchTemplate{ + name: "foo", + version: "$Latest", + }, + }, + { + name: "specified $Default", + in: &autoscaling.LaunchTemplateSpecification{ + LaunchTemplateName: aws.String("foo"), + Version: aws.String("$Default"), + }, + exp: &launchTemplate{ + name: "foo", + version: "$Default", + }, + }, + { + name: "no version specified", + in: &autoscaling.LaunchTemplateSpecification{ + LaunchTemplateName: aws.String("foo"), + Version: nil, + }, + exp: &launchTemplate{ + name: "foo", + version: "$Default", + }, + }, + } + + for _, unit := range units { + got := buildLaunchTemplateFromSpec(unit.in) + assert.Equal(unit.exp, got) + } +} diff --git a/cluster-autoscaler/cloudprovider/aws/ec2.go b/cluster-autoscaler/cloudprovider/aws/ec2.go deleted file mode 100644 index dee33760ee1a..000000000000 --- a/cluster-autoscaler/cloudprovider/aws/ec2.go +++ /dev/null @@ -1,57 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package aws - -import ( - "fmt" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/ec2" -) - -type ec2I interface { - DescribeLaunchTemplateVersions(input *ec2.DescribeLaunchTemplateVersionsInput) (*ec2.DescribeLaunchTemplateVersionsOutput, error) -} - -type ec2Wrapper struct { - ec2I -} - -func (m ec2Wrapper) getInstanceTypeByLT(launchTemplate *launchTemplate) (string, error) { - params := &ec2.DescribeLaunchTemplateVersionsInput{ - LaunchTemplateName: aws.String(launchTemplate.name), - Versions: []*string{aws.String(launchTemplate.version)}, - } - - describeData, err := m.DescribeLaunchTemplateVersions(params) - if err != nil { - return "", err - } - - if len(describeData.LaunchTemplateVersions) == 0 { - return "", fmt.Errorf("unable to find template versions") - } - - lt := describeData.LaunchTemplateVersions[0] - instanceType := lt.LaunchTemplateData.InstanceType - - if instanceType == nil { - return "", fmt.Errorf("unable to find instance type within launch template") - } - - return aws.StringValue(instanceType), nil -} diff --git a/cluster-autoscaler/cloudprovider/aws/instance_type_cache.go b/cluster-autoscaler/cloudprovider/aws/instance_type_cache.go new file mode 100644 index 000000000000..02d71823b285 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/aws/instance_type_cache.go @@ -0,0 +1,128 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aws + +import ( + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/client-go/tools/cache" +) + +const ( + asgInstanceTypeCacheTTL = time.Minute * 20 + cacheMinTTL = 120 + cacheMaxTTL = 600 +) + +// instanceTypeExpirationStore caches the canonical instance type for an ASG. +// The store expires its keys based on a TTL. This TTL can have a jitter applied to it. +// This allows to get a better repartition of the AWS queries. +type instanceTypeExpirationStore struct { + cache.Store + jitterClock clock.Clock + awsService *awsWrapper +} + +type instanceTypeCachedObject struct { + name string + instanceType string +} + +type jitterClock struct { + clock.Clock + + jitter bool + sync.RWMutex +} + +func newAsgInstanceTypeCache(awsService *awsWrapper) *instanceTypeExpirationStore { + jc := &jitterClock{} + return newAsgInstanceTypeCacheWithClock( + awsService, + jc, + cache.NewExpirationStore(func(obj interface{}) (s string, e error) { + return obj.(instanceTypeCachedObject).name, nil + }, &cache.TTLPolicy{ + TTL: asgInstanceTypeCacheTTL, + Clock: jc, + }), + ) +} + +func newAsgInstanceTypeCacheWithClock(awsService *awsWrapper, jc clock.Clock, store cache.Store) *instanceTypeExpirationStore { + return &instanceTypeExpirationStore{ + store, + jc, + awsService, + } +} + +func (c *jitterClock) Since(ts time.Time) time.Duration { + since := time.Since(ts) + c.RLock() + defer c.RUnlock() + if c.jitter { + return since + (time.Second * time.Duration(rand.IntnRange(cacheMinTTL, cacheMaxTTL))) + } + return since +} + +func (es instanceTypeExpirationStore) populate(autoscalingGroups []*asg) error { + asgsToQuery := []*asg{} + + if c, ok := es.jitterClock.(*jitterClock); ok { + c.Lock() + c.jitter = true + c.Unlock() + } + + for _, asg := range autoscalingGroups { + if asg == nil { + continue + } + _, found, _ := es.GetByKey(asg.AwsRef.Name) + if found { + continue + } + asgsToQuery = append(asgsToQuery, asg) + } + + if c, ok := es.jitterClock.(*jitterClock); ok { + c.Lock() + c.jitter = false + c.Unlock() + } + + // List expires old entries + _ = es.List() + + instanceTypesByAsg, err := es.awsService.getInstanceTypesForAsgs(asgsToQuery) + if err != nil { + return err + } + + for asgName, instanceType := range instanceTypesByAsg { + es.Add(instanceTypeCachedObject{ + name: asgName, + instanceType: instanceType, + }) + } + return nil +} diff --git a/cluster-autoscaler/cloudprovider/aws/instance_type_cache_test.go b/cluster-autoscaler/cloudprovider/aws/instance_type_cache_test.go new file mode 100644 index 000000000000..45fcc844a9f4 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/aws/instance_type_cache_test.go @@ -0,0 +1,103 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aws + +import ( + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/client-go/tools/cache" +) + +func TestInstanceTypeCache(t *testing.T) { + c := newAsgInstanceTypeCache(nil) + err := c.Add(instanceTypeCachedObject{ + name: "123", + instanceType: "t2.medium", + }) + require.NoError(t, err) + obj, ok, err := c.GetByKey("123") + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, "t2.medium", obj.(instanceTypeCachedObject).instanceType) +} + +func TestLTVersionChange(t *testing.T) { + asgName, ltName := "testasg", "launcher" + ltVersions := []*string{aws.String("1"), aws.String("2")} + instanceTypes := []*string{aws.String("t2.large"), aws.String("m4.xlarge")} + + a := &autoScalingMock{} + e := &ec2Mock{} + + for i := 0; i < 2; i++ { + e.On("DescribeLaunchTemplateVersions", &ec2.DescribeLaunchTemplateVersionsInput{ + LaunchTemplateName: aws.String(ltName), + Versions: []*string{ltVersions[i]}, + }).Return(&ec2.DescribeLaunchTemplateVersionsOutput{ + LaunchTemplateVersions: []*ec2.LaunchTemplateVersion{ + { + LaunchTemplateData: &ec2.ResponseLaunchTemplateData{ + InstanceType: instanceTypes[i], + }, + }, + }, + }) + } + + fakeClock := clock.NewFakeClock(time.Unix(0, 0)) + fakeStore := cache.NewFakeExpirationStore( + func(obj interface{}) (s string, e error) { + return obj.(instanceTypeCachedObject).name, nil + }, + nil, + &cache.TTLPolicy{ + TTL: asgInstanceTypeCacheTTL, + Clock: fakeClock, + }, + fakeClock, + ) + m := newAsgInstanceTypeCacheWithClock(&awsWrapper{a, e}, fakeClock, fakeStore) + + for i := 0; i < 2; i++ { + err := m.populate([]*asg{ + { + AwsRef: AwsRef{Name: asgName}, + LaunchTemplate: &launchTemplate{ + name: ltName, + version: aws.StringValue(ltVersions[i]), + }, + }, + }) + assert.NoError(t, err) + + result, found, err := m.GetByKey(asgName) + assert.NoError(t, err) + assert.Truef(t, found, "%s did not find asg (iteration %d)", asgName, i) + + foundInstanceType := result.(instanceTypeCachedObject).instanceType + assert.Equalf(t, foundInstanceType, *instanceTypes[i], "%s had %s, expected %s (iteration %d)", asgName, foundInstanceType, *instanceTypes[i], i) + + // Expire the first instance + fakeClock.SetTime(time.Now().Add(asgInstanceTypeCacheTTL + 10*time.Minute)) + } +}