From 3ca0c84f1519e6f22b8cd1926ad742eac90a85bd Mon Sep 17 00:00:00 2001 From: Nic Cope Date: Thu, 16 Nov 2017 18:12:06 -0800 Subject: [PATCH] Replace the Polling Autoscaler Node group discovery is now handled by cloudprovider.Refresh() in all cases. Additionally, explicit node groups can now be used alongside autodiscovery. --- .../cloudprovider/aws/auto_scaling.go | 123 +++----- .../cloudprovider/aws/auto_scaling_groups.go | 153 +++++++--- .../cloudprovider/aws/aws_cloud_provider.go | 138 ++------- .../aws/aws_cloud_provider_test.go | 271 +++++++----------- .../cloudprovider/aws/aws_manager.go | 204 +++++++++++-- .../cloudprovider/aws/aws_manager_test.go | 156 +++++++++- .../builder/cloud_provider_builder.go | 25 +- .../cloudprovider/gce/gce_cloud_provider.go | 146 +--------- .../gce/gce_cloud_provider_test.go | 147 +--------- .../cloudprovider/gce/gce_manager.go | 204 ++++++++++--- .../cloudprovider/gce/gce_manager_test.go | 252 +++++++++++----- .../node_group_discovery_options.go | 159 +++++++++- .../node_group_discovery_options_test.go | 174 +++++++++-- cluster-autoscaler/cloudprovider/util_test.go | 3 + .../config/dynamic/node_group_spec.go | 10 +- cluster-autoscaler/main.go | 2 +- 16 files changed, 1274 insertions(+), 893 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling.go index 6f9e859d7a6d..fa88f2d3c2b9 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling.go @@ -17,7 +17,6 @@ limitations under the License. package aws import ( - "errors" "fmt" "github.com/aws/aws-sdk-go/aws" @@ -28,8 +27,9 @@ import ( // autoScaling is the interface represents a specific aspect of the auto-scaling service provided by AWS SDK for use in CA type autoScaling interface { DescribeAutoScalingGroups(input *autoscaling.DescribeAutoScalingGroupsInput) (*autoscaling.DescribeAutoScalingGroupsOutput, error) + DescribeAutoScalingGroupsPages(input *autoscaling.DescribeAutoScalingGroupsInput, fn func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) error DescribeLaunchConfigurations(*autoscaling.DescribeLaunchConfigurationsInput) (*autoscaling.DescribeLaunchConfigurationsOutput, error) - DescribeTags(input *autoscaling.DescribeTagsInput) (*autoscaling.DescribeTagsOutput, error) + DescribeTagsPages(input *autoscaling.DescribeTagsInput, fn func(*autoscaling.DescribeTagsOutput, bool) bool) error SetDesiredCapacity(input *autoscaling.SetDesiredCapacityInput) (*autoscaling.SetDesiredCapacityOutput, error) TerminateInstanceInAutoScalingGroup(input *autoscaling.TerminateInstanceInAutoScalingGroupInput) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error) } @@ -73,55 +73,29 @@ func (m autoScalingWrapper) getAutoscalingGroupByName(name string) (*autoscaling } func (m *autoScalingWrapper) getAutoscalingGroupsByNames(names []string) ([]*autoscaling.Group, error) { - glog.V(6).Infof("Starting getAutoscalingGroupsByNames with names=%v", names) - - if len(names) < 1 { - glog.V(4).Info("Failed to describe ASGs: Must specify at least one ASG name.") - return nil, fmt.Errorf("List of ASG names was empty. 
Must specify at least one ASG name") - } - - nameRefs := []*string{} - for _, n := range names { - nameRefs = append(nameRefs, aws.String(n)) + if len(names) == 0 { + return nil, nil } - params := &autoscaling.DescribeAutoScalingGroupsInput{ - AutoScalingGroupNames: nameRefs, + input := &autoscaling.DescribeAutoScalingGroupsInput{ + AutoScalingGroupNames: aws.StringSlice(names), MaxRecords: aws.Int64(maxRecordsReturnedByAPI), } - description, err := m.DescribeAutoScalingGroups(params) - if err != nil { - glog.V(4).Infof("Failed to describe ASGs : %v", err) + asgs := make([]*autoscaling.Group, 0) + if err := m.DescribeAutoScalingGroupsPages(input, func(output *autoscaling.DescribeAutoScalingGroupsOutput, _ bool) bool { + asgs = append(asgs, output.AutoScalingGroups...) + // We return true while we want to be called with the next page of + // results, if any. + return true + }); err != nil { return nil, err } - if len(description.AutoScalingGroups) < 1 { - return nil, errors.New("No ASGs found") - } - - asgs := description.AutoScalingGroups - for description.NextToken != nil { - description, err = m.DescribeAutoScalingGroups(&autoscaling.DescribeAutoScalingGroupsInput{ - NextToken: description.NextToken, - MaxRecords: aws.Int64(maxRecordsReturnedByAPI), - }) - if err != nil { - glog.V(4).Infof("Failed to describe ASGs : %v", err) - return nil, err - } - asgs = append(asgs, description.AutoScalingGroups...) - } - - glog.V(6).Infof("Finishing getAutoscalingGroupsByNames asgs=%v", asgs) - return asgs, nil } func (m *autoScalingWrapper) getAutoscalingGroupsByTags(keys []string) ([]*autoscaling.Group, error) { - glog.V(6).Infof("Starting getAutoscalingGroupsByTag with keys=%v", keys) - - numKeys := len(keys) - - // DescribeTags does an OR query when multiple filters on different tags are specified. - // In other words, DescribeTags returns [asg1, asg1] for keys [t1, t2] when there's only one asg tagged both t1 and t2. + // DescribeTags does an OR query when multiple filters on different tags are + // specified. In other words, DescribeTags returns [asg1, asg1] for keys + // [t1, t2] when there's only one asg tagged both t1 and t2. filters := []*autoscaling.Filter{} for _, key := range keys { filter := &autoscaling.Filter{ @@ -130,58 +104,35 @@ func (m *autoScalingWrapper) getAutoscalingGroupsByTags(keys []string) ([]*autos } filters = append(filters, filter) } - description, err := m.DescribeTags(&autoscaling.DescribeTagsInput{ + + tags := []*autoscaling.TagDescription{} + input := &autoscaling.DescribeTagsInput{ Filters: filters, MaxRecords: aws.Int64(maxRecordsReturnedByAPI), - }) - if err != nil { - glog.V(4).Infof("Failed to describe ASG tags for keys %v : %v", keys, err) - return nil, err - } - if len(description.Tags) < 1 { - return nil, fmt.Errorf("Unable to find ASGs for tag keys %v", keys) } - tags := []*autoscaling.TagDescription{} - tags = append(tags, description.Tags...) - - for description.NextToken != nil { - description, err = m.DescribeTags(&autoscaling.DescribeTagsInput{ - NextToken: description.NextToken, - MaxRecords: aws.Int64(maxRecordsReturnedByAPI), - }) - if err != nil { - glog.V(4).Infof("Failed to describe ASG tags for key %v: %v", keys, err) - return nil, err - } - tags = append(tags, description.Tags...) + if err := m.DescribeTagsPages(input, func(out *autoscaling.DescribeTagsOutput, _ bool) bool { + tags = append(tags, out.Tags...) + // We return true while we want to be called with the next page of + // results, if any. 
+ return true + }); err != nil { + return nil, err } - // De-duplicate asg names - asgNameOccurrences := map[string]int{} - for _, t := range tags { - asgName := *(t.ResourceId) - if n, ok := asgNameOccurrences[asgName]; ok { - asgNameOccurrences[asgName] = n + 1 - } else { - asgNameOccurrences[asgName] = 1 - } - } - // Accordingly to how DescribeTags API works, the result contains ASGs which not all but only subset of tags are associated. - // Explicitly select ASGs to which all the tags are associated so that we won't end up calling DescribeAutoScalingGroups API - // multiple times on an ASG + // According to how DescribeTags API works, the result contains ASGs which + // not all but only subset of tags are associated. Explicitly select ASGs to + // which all the tags are associated so that we won't end up calling + // DescribeAutoScalingGroups API multiple times on an ASG. asgNames := []string{} - for asgName, n := range asgNameOccurrences { - if n == numKeys { + asgNameOccurrences := make(map[string]int) + for _, t := range tags { + asgName := aws.StringValue(t.ResourceId) + occurrences := asgNameOccurrences[asgName] + 1 + if occurrences >= len(keys) { asgNames = append(asgNames, asgName) } + asgNameOccurrences[asgName] = occurrences } - asgs, err := m.getAutoscalingGroupsByNames(asgNames) - if err != nil { - return nil, err - } - - glog.V(6).Infof("Finishing getAutoscalingGroupsByTag with asgs=%v", asgs) - - return asgs, nil + return m.getAutoscalingGroupsByNames(asgNames) } diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go index dd411f27ff27..cf31c655357f 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go @@ -18,80 +18,155 @@ package aws import ( "fmt" + "reflect" "sync" + "time" + "k8s.io/apimachinery/pkg/util/wait" + + "github.com/aws/aws-sdk-go/aws" "github.com/golang/glog" ) -type autoScalingGroups struct { - registeredAsgs []*asgInformation - instanceToAsg map[AwsRef]*Asg - cacheMutex sync.Mutex - instancesNotInManagedAsg map[AwsRef]struct{} - service autoScalingWrapper +const scaleToZeroSupported = false + +type asgCache struct { + registeredAsgs []*asgInformation + instanceToAsg map[AwsRef]*Asg + notInRegisteredAsg map[AwsRef]bool + mutex sync.Mutex + service autoScalingWrapper + interrupt chan struct{} } -func newAutoScalingGroups(service autoScalingWrapper) *autoScalingGroups { - registry := &autoScalingGroups{ - registeredAsgs: make([]*asgInformation, 0), - service: service, - instanceToAsg: make(map[AwsRef]*Asg), - instancesNotInManagedAsg: make(map[AwsRef]struct{}), +func newASGCache(service autoScalingWrapper) (*asgCache, error) { + registry := &asgCache{ + registeredAsgs: make([]*asgInformation, 0), + service: service, + instanceToAsg: make(map[AwsRef]*Asg), + notInRegisteredAsg: make(map[AwsRef]bool), + interrupt: make(chan struct{}), } - return registry + go wait.Until(func() { + registry.mutex.Lock() + defer registry.mutex.Unlock() + if err := registry.regenerate(); err != nil { + glog.Errorf("Error while regenerating Asg cache: %v", err) + } + }, time.Hour, registry.interrupt) + + return registry, nil } -// Register registers asg in Aws Manager. -func (m *autoScalingGroups) Register(asg *Asg) { - m.cacheMutex.Lock() - defer m.cacheMutex.Unlock() +// Register ASG. Returns true if the ASG was registered. 
+func (m *asgCache) Register(asg *Asg) bool { + m.mutex.Lock() + defer m.mutex.Unlock() + for i := range m.registeredAsgs { + if existing := m.registeredAsgs[i].config; existing.AwsRef == asg.AwsRef { + if reflect.DeepEqual(existing, asg) { + return false + } + m.registeredAsgs[i].config = asg + glog.V(4).Infof("Updated ASG %s", asg.AwsRef.Name) + m.invalidateUnownedInstanceCache() + return true + } + } + + glog.V(1).Infof("Registering ASG %s", asg.AwsRef.Name) m.registeredAsgs = append(m.registeredAsgs, &asgInformation{ config: asg, }) + m.invalidateUnownedInstanceCache() + return true +} + +// Unregister ASG. Returns true if the ASG was unregistered. +func (m *asgCache) Unregister(asg *Asg) bool { + m.mutex.Lock() + defer m.mutex.Unlock() + + updated := make([]*asgInformation, 0, len(m.registeredAsgs)) + changed := false + for _, existing := range m.registeredAsgs { + if existing.config.AwsRef == asg.AwsRef { + glog.V(1).Infof("Unregistered ASG %s", asg.AwsRef.Name) + continue + } + updated = append(updated, existing) + changed = true + } + m.registeredAsgs = updated + return changed +} + +func (m *asgCache) get() []*asgInformation { + m.mutex.Lock() + defer m.mutex.Unlock() + + return m.registeredAsgs } // FindForInstance returns AsgConfig of the given Instance -func (m *autoScalingGroups) FindForInstance(instance *AwsRef) (*Asg, error) { - m.cacheMutex.Lock() - defer m.cacheMutex.Unlock() +func (m *asgCache) FindForInstance(instance *AwsRef) (*Asg, error) { + // TODO(negz): Prevent this calling describe ASGs too often. + m.mutex.Lock() + defer m.mutex.Unlock() + + if m.notInRegisteredAsg[*instance] { + // We already know we don't own this instance. Return early and avoid + // additional calls to describe ASGs. + return nil, nil + } + if config, found := m.instanceToAsg[*instance]; found { return config, nil } - if _, found := m.instancesNotInManagedAsg[*instance]; found { - // The instance is already known to not belong to any configured ASG - // Skip regenerateCache so that we won't unnecessarily call DescribeAutoScalingGroups - // See https://github.com/kubernetes/contrib/issues/2541 - return nil, nil - } - if err := m.regenerateCache(); err != nil { + if err := m.regenerate(); err != nil { return nil, fmt.Errorf("Error while looking for ASG for instance %+v, error: %v", *instance, err) } if config, found := m.instanceToAsg[*instance]; found { return config, nil } - // instance does not belong to any configured ASG - glog.V(6).Infof("Instance %+v is not in any ASG managed by CA. 
CA is now memorizing the fact not to unnecessarily call AWS API afterwards trying to find the unexistent managed ASG for the instance", *instance) - m.instancesNotInManagedAsg[*instance] = struct{}{} + + m.notInRegisteredAsg[*instance] = true return nil, nil } -func (m *autoScalingGroups) regenerateCache() error { +func (m *asgCache) invalidateUnownedInstanceCache() { + glog.V(4).Info("Invalidating unowned instance cache") + m.notInRegisteredAsg = make(map[AwsRef]bool) +} + +func (m *asgCache) regenerate() error { newCache := make(map[AwsRef]*Asg) - for _, asg := range m.registeredAsgs { - glog.V(4).Infof("Regenerating ASG information for %s", asg.config.Name) + names := make([]string, len(m.registeredAsgs)) + configs := make(map[string]*Asg) + for i, asg := range m.registeredAsgs { + names[i] = asg.config.Name + configs[asg.config.Name] = asg.config + } - group, err := m.service.getAutoscalingGroupByName(asg.config.Name) - if err != nil { - return err - } + glog.V(4).Infof("Regenerating instance to ASG map for ASGs: %v", names) + groups, err := m.service.getAutoscalingGroupsByNames(names) + if err != nil { + return err + } + for _, group := range groups { for _, instance := range group.Instances { - ref := AwsRef{Name: *instance.InstanceId} - newCache[ref] = asg.config + ref := AwsRef{Name: aws.StringValue(instance.InstanceId)} + newCache[ref] = configs[aws.StringValue(group.AutoScalingGroupName)] } } m.instanceToAsg = newCache return nil } + +// Cleanup closes the channel to signal the go routine to stop that is handling the cache +func (m *asgCache) Cleanup() { + close(m.interrupt) +} diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go index 88616ee6da37..583e09982bfc 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go @@ -24,7 +24,6 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" - "k8s.io/autoscaler/cluster-autoscaler/config/dynamic" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) @@ -37,90 +36,15 @@ const ( // awsCloudProvider implements CloudProvider interface. type awsCloudProvider struct { awsManager *AwsManager - asgs []*Asg resourceLimiter *cloudprovider.ResourceLimiter } // BuildAwsCloudProvider builds CloudProvider implementation for AWS. 
-func BuildAwsCloudProvider(awsManager *AwsManager, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions, resourceLimiter *cloudprovider.ResourceLimiter) (cloudprovider.CloudProvider, error) { - if err := discoveryOpts.Validate(); err != nil { - return nil, fmt.Errorf("Failed to build an aws cloud provider: %v", err) - } - if discoveryOpts.StaticDiscoverySpecified() { - return buildStaticallyDiscoveringProvider(awsManager, discoveryOpts.NodeGroupSpecs, resourceLimiter) - } - if discoveryOpts.AutoDiscoverySpecified() { - return buildAutoDiscoveringProvider(awsManager, discoveryOpts.NodeGroupAutoDiscoverySpecs, resourceLimiter) - } - return nil, fmt.Errorf("Failed to build an aws cloud provider: Either node group specs or node group auto discovery spec must be specified") -} - -func parseAutoDiscoverySpec(spec string) ([]string, error) { - tokens := strings.Split(spec, ":") - if len(tokens) != 2 { - return nil, fmt.Errorf("Invalid node group auto discovery spec specified via --node-group-auto-discovery: %s", spec) - } - discoverer := tokens[0] - if discoverer != "asg" { - return nil, fmt.Errorf("Unsupported discoverer specified: %s", discoverer) - } - param := tokens[1] - paramTokens := strings.Split(param, "=") - parameterKey := paramTokens[0] - if parameterKey != "tag" { - return nil, fmt.Errorf("Unsupported parameter key \"%s\" is specified for discoverer \"%s\". The only supported key is \"tag\"", parameterKey, discoverer) - } - tag := paramTokens[1] - if tag == "" { - return nil, fmt.Errorf("Invalid ASG tag for auto discovery specified: ASG tag must not be empty") - } - // Use the k8s cluster name tag to only discover asgs of the cluster denoted by clusterName - // See https://github.com/kubernetes/kubernetes/blob/9ef85a7/pkg/cloudprovider/providers/aws/tags.go#L30-L34 - // for more information about the tag - return strings.Split(tag, ","), nil -} -func buildAutoDiscoveringProvider(awsManager *AwsManager, specs []string, resourceLimiter *cloudprovider.ResourceLimiter) (*awsCloudProvider, error) { +func BuildAwsCloudProvider(awsManager *AwsManager, resourceLimiter *cloudprovider.ResourceLimiter) (cloudprovider.CloudProvider, error) { aws := &awsCloudProvider{ awsManager: awsManager, - asgs: make([]*Asg, 0), resourceLimiter: resourceLimiter, } - - seen := make(map[string]bool) - for _, spec := range specs { - tags, err := parseAutoDiscoverySpec(spec) - if err != nil { - return nil, err - } - asgs, err := awsManager.getAutoscalingGroupsByTags(tags) - if err != nil { - return nil, fmt.Errorf("Failed to get ASGs: %v", err) - } - for _, asg := range asgs { - // An ASG might match more than one provided spec, but we only ever - // want to add it once. - if seen[*asg.AutoScalingGroupARN] { - continue - } - seen[*asg.AutoScalingGroupARN] = true - aws.addAsg(buildAsg(aws.awsManager, int(*asg.MinSize), int(*asg.MaxSize), *asg.AutoScalingGroupName)) - } - } - - return aws, nil -} - -func buildStaticallyDiscoveringProvider(awsManager *AwsManager, specs []string, resourceLimiter *cloudprovider.ResourceLimiter) (*awsCloudProvider, error) { - aws := &awsCloudProvider{ - awsManager: awsManager, - asgs: make([]*Asg, 0), - resourceLimiter: resourceLimiter, - } - for _, spec := range specs { - if err := aws.addNodeGroup(spec); err != nil { - return nil, err - } - } return aws, nil } @@ -130,35 +54,28 @@ func (aws *awsCloudProvider) Cleanup() error { return nil } -// addNodeGroup adds node group defined in string spec. 
Format: -// minNodes:maxNodes:asgName -func (aws *awsCloudProvider) addNodeGroup(spec string) error { - asg, err := buildAsgFromSpec(spec, aws.awsManager) - if err != nil { - return err - } - aws.addAsg(asg) - return nil -} - -// addAsg adds and registers an asg to this cloud provider -func (aws *awsCloudProvider) addAsg(asg *Asg) { - aws.asgs = append(aws.asgs, asg) - aws.awsManager.RegisterAsg(asg) -} - // Name returns name of the cloud provider. func (aws *awsCloudProvider) Name() string { return ProviderName } +func (aws *awsCloudProvider) asgs() []*Asg { + infos := aws.awsManager.getAsgs() + asgs := make([]*Asg, len(infos)) + for i, info := range infos { + asgs[i] = info.config + } + return asgs +} + // NodeGroups returns all node groups configured for this cloud provider. func (aws *awsCloudProvider) NodeGroups() []cloudprovider.NodeGroup { - result := make([]cloudprovider.NodeGroup, 0, len(aws.asgs)) - for _, asg := range aws.asgs { - result = append(result, asg) + asgs := aws.awsManager.getAsgs() + ngs := make([]cloudprovider.NodeGroup, len(asgs)) + for i, asg := range asgs { + ngs[i] = asg.config } - return result + return ngs } // NodeGroupForNode returns the node group for the given node. @@ -195,7 +112,7 @@ func (aws *awsCloudProvider) GetResourceLimiter() (*cloudprovider.ResourceLimite // Refresh is called before every main loop and can be used to dynamically update cloud provider state. // In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh(). func (aws *awsCloudProvider) Refresh() error { - return nil + return aws.awsManager.Refresh() } // AwsRef contains a reference to some entity in AWS/GKE world. @@ -382,26 +299,3 @@ func (asg *Asg) TemplateNodeInfo() (*schedulercache.NodeInfo, error) { nodeInfo.SetNode(node) return nodeInfo, nil } - -func buildAsgFromSpec(value string, awsManager *AwsManager) (*Asg, error) { - spec, err := dynamic.SpecFromString(value, true) - - if err != nil { - return nil, fmt.Errorf("failed to parse node group spec: %v", err) - } - - asg := buildAsg(awsManager, spec.MinSize, spec.MaxSize, spec.Name) - - return asg, nil -} - -func buildAsg(awsManager *AwsManager, minSize int, maxSize int, name string) *Asg { - return &Asg{ - awsManager: awsManager, - minSize: minSize, - maxSize: maxSize, - AwsRef: AwsRef{ - Name: name, - }, - } -} diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go index a66c61c0e436..dc472770ede1 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go @@ -36,23 +36,19 @@ func (a *AutoScalingMock) DescribeAutoScalingGroups(i *autoscaling.DescribeAutoS return args.Get(0).(*autoscaling.DescribeAutoScalingGroupsOutput), nil } +func (a *AutoScalingMock) DescribeAutoScalingGroupsPages(i *autoscaling.DescribeAutoScalingGroupsInput, fn func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) error { + args := a.Called(i, fn) + return args.Error(0) +} + func (a *AutoScalingMock) DescribeLaunchConfigurations(i *autoscaling.DescribeLaunchConfigurationsInput) (*autoscaling.DescribeLaunchConfigurationsOutput, error) { args := a.Called(i) return args.Get(0).(*autoscaling.DescribeLaunchConfigurationsOutput), nil } -func (a *AutoScalingMock) DescribeTags(i *autoscaling.DescribeTagsInput) (*autoscaling.DescribeTagsOutput, error) { - return &autoscaling.DescribeTagsOutput{ - Tags: []*autoscaling.TagDescription{ 
- { - Key: aws.String("foo"), - Value: aws.String("bar"), - ResourceId: aws.String("asg-123456"), - ResourceType: aws.String("auto-scaling-group"), - PropagateAtLaunch: aws.Bool(false), - }, - }, - }, nil +func (a *AutoScalingMock) DescribeTagsPages(i *autoscaling.DescribeTagsInput, fn func(*autoscaling.DescribeTagsOutput, bool) bool) error { + args := a.Called(i, fn) + return args.Error(0) } func (a *AutoScalingMock) SetDesiredCapacity(input *autoscaling.SetDesiredCapacityInput) (*autoscaling.SetDesiredCapacityOutput, error) { @@ -68,30 +64,47 @@ func (a *AutoScalingMock) TerminateInstanceInAutoScalingGroup(input *autoscaling var testService = autoScalingWrapper{&AutoScalingMock{}} var testAwsManager = &AwsManager{ - asgs: &autoScalingGroups{ - registeredAsgs: make([]*asgInformation, 0), - instanceToAsg: make(map[AwsRef]*Asg), - instancesNotInManagedAsg: make(map[AwsRef]struct{}), + asgCache: &asgCache{ + registeredAsgs: make([]*asgInformation, 0), + instanceToAsg: make(map[AwsRef]*Asg), + interrupt: make(chan struct{}), + service: testService, }, - service: testService, - interrupt: make(chan struct{}), + explicitlyConfigured: make(map[AwsRef]bool), + service: testService, } func newTestAwsManagerWithService(service autoScaling) *AwsManager { wrapper := autoScalingWrapper{service} return &AwsManager{ service: wrapper, - asgs: &autoScalingGroups{ - registeredAsgs: make([]*asgInformation, 0), - instanceToAsg: make(map[AwsRef]*Asg), - instancesNotInManagedAsg: make(map[AwsRef]struct{}), - service: wrapper, + asgCache: &asgCache{ + registeredAsgs: make([]*asgInformation, 0), + instanceToAsg: make(map[AwsRef]*Asg), + interrupt: make(chan struct{}), + service: wrapper, }, - interrupt: make(chan struct{}), + explicitlyConfigured: make(map[AwsRef]bool), } } +func newTestAwsManagerWithAsgs(t *testing.T, service autoScaling, specs []string) *AwsManager { + m := newTestAwsManagerWithService(service) + for _, spec := range specs { + asg, err := m.buildAsgFromSpec(spec) + if err != nil { + t.Fatalf("bad ASG spec %v: %v", spec, err) + } + m.RegisterAsg(asg) + } + return m +} + func testDescribeAutoScalingGroupsOutput(desiredCap int64, instanceIds ...string) *autoscaling.DescribeAutoScalingGroupsOutput { + return testNamedDescribeAutoScalingGroupsOutput("UNUSED", desiredCap, instanceIds...) 
+} + +func testNamedDescribeAutoScalingGroupsOutput(groupName string, desiredCap int64, instanceIds ...string) *autoscaling.DescribeAutoScalingGroupsOutput { instances := []*autoscaling.Instance{} for _, id := range instanceIds { instances = append(instances, &autoscaling.Instance{ @@ -101,8 +114,9 @@ func testDescribeAutoScalingGroupsOutput(desiredCap int64, instanceIds ...string return &autoscaling.DescribeAutoScalingGroupsOutput{ AutoScalingGroups: []*autoscaling.Group{ { - DesiredCapacity: aws.Int64(desiredCap), - Instances: instances, + AutoScalingGroupName: aws.String(groupName), + DesiredCapacity: aws.Int64(desiredCap), + Instances: instances, }, }, } @@ -113,9 +127,9 @@ func testProvider(t *testing.T, m *AwsManager) *awsCloudProvider { map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000}, map[string]int64{cloudprovider.ResourceNameCores: 10, cloudprovider.ResourceNameMemory: 100000000}) - provider, err := buildStaticallyDiscoveringProvider(m, nil, resourceLimiter) + provider, err := BuildAwsCloudProvider(m, resourceLimiter) assert.NoError(t, err) - return provider + return provider.(*awsCloudProvider) } func TestBuildAwsCloudProvider(t *testing.T) { @@ -123,54 +137,17 @@ func TestBuildAwsCloudProvider(t *testing.T) { map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000}, map[string]int64{cloudprovider.ResourceNameCores: 10, cloudprovider.ResourceNameMemory: 100000000}) - m := testAwsManager - _, err := buildStaticallyDiscoveringProvider(m, []string{"bad spec"}, resourceLimiter) - assert.Error(t, err) - - _, err = buildStaticallyDiscoveringProvider(m, nil, resourceLimiter) - assert.NoError(t, err) -} - -func TestParseAutoDiscoverySpec(t *testing.T) { - want := []string{"coolTag", "anotherTag"} - got, err := parseAutoDiscoverySpec("asg:tag=coolTag,anotherTag") - assert.NoError(t, err) - assert.Equal(t, want, got) - - badSpecs := []string{ - "asg", - "tag=coolTag,anotherTag", - "mig:tag=coolTag,anotherTag", - "asg:notatag=coolTag,anotherTag", - } - - for _, spec := range badSpecs { - _, err = parseAutoDiscoverySpec(spec) - assert.Error(t, err) - } -} - -func TestAddNodeGroup(t *testing.T) { - provider := testProvider(t, testAwsManager) - err := provider.addNodeGroup("bad spec") - assert.Error(t, err) - assert.Equal(t, len(provider.asgs), 0) - - err = provider.addNodeGroup("1:5:test-asg") + _, err := BuildAwsCloudProvider(testAwsManager, resourceLimiter) assert.NoError(t, err) - assert.Equal(t, len(provider.asgs), 1) } func TestName(t *testing.T) { provider := testProvider(t, testAwsManager) - assert.Equal(t, provider.Name(), "aws") + assert.Equal(t, provider.Name(), ProviderName) } func TestNodeGroups(t *testing.T) { - provider := testProvider(t, testAwsManager) - assert.Equal(t, len(provider.NodeGroups()), 0) - err := provider.addNodeGroup("1:5:test-asg") - assert.NoError(t, err) + provider := testProvider(t, newTestAwsManagerWithAsgs(t, testService, []string{"1:5:test-asg"})) assert.Equal(t, len(provider.NodeGroups()), 1) } @@ -181,15 +158,19 @@ func TestNodeGroupForNode(t *testing.T) { }, } service := &AutoScalingMock{} - m := newTestAwsManagerWithService(service) - provider := testProvider(t, m) - err := provider.addNodeGroup("1:5:test-asg") - assert.NoError(t, err) + provider := testProvider(t, newTestAwsManagerWithAsgs(t, service, []string{"1:5:test-asg"})) + asgs := provider.asgs() - service.On("DescribeAutoScalingGroups", &autoscaling.DescribeAutoScalingGroupsInput{ - AutoScalingGroupNames: 
aws.StringSlice([]string{provider.asgs[0].Name}), - MaxRecords: aws.Int64(1), - }).Return(testDescribeAutoScalingGroupsOutput(1, "test-instance-id")) + service.On("DescribeAutoScalingGroupsPages", + &autoscaling.DescribeAutoScalingGroupsInput{ + AutoScalingGroupNames: aws.StringSlice([]string{asgs[0].Name}), + MaxRecords: aws.Int64(maxRecordsReturnedByAPI), + }, + mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"), + ).Run(func(args mock.Arguments) { + fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) + fn(testNamedDescribeAutoScalingGroupsOutput("test-asg", 1, "test-instance-id"), false) + }).Return(nil) group, err := provider.NodeGroupForNode(node) @@ -197,7 +178,7 @@ func TestNodeGroupForNode(t *testing.T) { assert.Equal(t, group.Id(), "test-asg") assert.Equal(t, group.MinSize(), 1) assert.Equal(t, group.MaxSize(), 5) - service.AssertNumberOfCalls(t, "DescribeAutoScalingGroups", 1) + service.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1) // test node in cluster that is not in a group managed by cluster autoscaler nodeNotInGroup := &apiv1.Node{ @@ -210,7 +191,7 @@ func TestNodeGroupForNode(t *testing.T) { assert.NoError(t, err) assert.Nil(t, group) - service.AssertNumberOfCalls(t, "DescribeAutoScalingGroups", 2) + service.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2) } func TestAwsRefFromProviderId(t *testing.T) { @@ -224,35 +205,17 @@ func TestAwsRefFromProviderId(t *testing.T) { assert.Equal(t, awsRef, &AwsRef{Name: "i-260942b3"}) } -func TestMaxSize(t *testing.T) { - provider := testProvider(t, testAwsManager) - err := provider.addNodeGroup("1:5:test-asg") - assert.NoError(t, err) - assert.Equal(t, len(provider.asgs), 1) - assert.Equal(t, provider.asgs[0].MaxSize(), 5) -} - -func TestMinSize(t *testing.T) { - provider := testProvider(t, testAwsManager) - err := provider.addNodeGroup("1:5:test-asg") - assert.NoError(t, err) - assert.Equal(t, len(provider.asgs), 1) - assert.Equal(t, provider.asgs[0].MinSize(), 1) -} - func TestTargetSize(t *testing.T) { service := &AutoScalingMock{} - m := newTestAwsManagerWithService(service) - provider := testProvider(t, m) - err := provider.addNodeGroup("1:5:test-asg") - assert.NoError(t, err) + provider := testProvider(t, newTestAwsManagerWithAsgs(t, service, []string{"1:5:test-asg"})) + asgs := provider.asgs() service.On("DescribeAutoScalingGroups", &autoscaling.DescribeAutoScalingGroupsInput{ - AutoScalingGroupNames: aws.StringSlice([]string{provider.asgs[0].Name}), + AutoScalingGroupNames: aws.StringSlice([]string{asgs[0].Name}), MaxRecords: aws.Int64(1), }).Return(testDescribeAutoScalingGroupsOutput(2, "test-instance-id", "second-test-instance-id")) - targetSize, err := provider.asgs[0].TargetSize() + targetSize, err := asgs[0].TargetSize() assert.Equal(t, targetSize, 2) assert.NoError(t, err) @@ -261,24 +224,21 @@ func TestTargetSize(t *testing.T) { func TestIncreaseSize(t *testing.T) { service := &AutoScalingMock{} - m := newTestAwsManagerWithService(service) - provider := testProvider(t, m) - err := provider.addNodeGroup("1:5:test-asg") - assert.NoError(t, err) - assert.Equal(t, len(provider.asgs), 1) + provider := testProvider(t, newTestAwsManagerWithAsgs(t, service, []string{"1:5:test-asg"})) + asgs := provider.asgs() service.On("SetDesiredCapacity", &autoscaling.SetDesiredCapacityInput{ - AutoScalingGroupName: aws.String(provider.asgs[0].Name), + AutoScalingGroupName: aws.String(asgs[0].Name), DesiredCapacity: aws.Int64(3), HonorCooldown: 
aws.Bool(false), }).Return(&autoscaling.SetDesiredCapacityOutput{}) service.On("DescribeAutoScalingGroups", &autoscaling.DescribeAutoScalingGroupsInput{ - AutoScalingGroupNames: aws.StringSlice([]string{provider.asgs[0].Name}), + AutoScalingGroupNames: aws.StringSlice([]string{asgs[0].Name}), MaxRecords: aws.Int64(1), }).Return(testDescribeAutoScalingGroupsOutput(2, "test-instance-id", "second-test-instance-id")) - err = provider.asgs[0].IncreaseSize(1) + err := asgs[0].IncreaseSize(1) assert.NoError(t, err) service.AssertNumberOfCalls(t, "SetDesiredCapacity", 1) service.AssertNumberOfCalls(t, "DescribeAutoScalingGroups", 1) @@ -286,41 +246,47 @@ func TestIncreaseSize(t *testing.T) { func TestBelongs(t *testing.T) { service := &AutoScalingMock{} - m := newTestAwsManagerWithService(service) - provider := testProvider(t, m) - err := provider.addNodeGroup("1:5:test-asg") - assert.NoError(t, err) + provider := testProvider(t, newTestAwsManagerWithAsgs(t, service, []string{"1:5:test-asg"})) + asgs := provider.asgs() - service.On("DescribeAutoScalingGroups", &autoscaling.DescribeAutoScalingGroupsInput{ - AutoScalingGroupNames: aws.StringSlice([]string{provider.asgs[0].Name}), - MaxRecords: aws.Int64(1), - }).Return(testDescribeAutoScalingGroupsOutput(1, "test-instance-id")) + service.On("DescribeAutoScalingGroupsPages", + &autoscaling.DescribeAutoScalingGroupsInput{ + AutoScalingGroupNames: aws.StringSlice([]string{asgs[0].Name}), + MaxRecords: aws.Int64(maxRecordsReturnedByAPI), + }, + mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"), + ).Run(func(args mock.Arguments) { + fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) + fn(testNamedDescribeAutoScalingGroupsOutput("test-asg", 1, "test-instance-id"), false) + }).Return(nil) invalidNode := &apiv1.Node{ Spec: apiv1.NodeSpec{ ProviderID: "aws:///us-east-1a/invalid-instance-id", }, } - _, err = provider.asgs[0].Belongs(invalidNode) + _, err := asgs[0].Belongs(invalidNode) assert.Error(t, err) - service.AssertNumberOfCalls(t, "DescribeAutoScalingGroups", 1) + service.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1) validNode := &apiv1.Node{ Spec: apiv1.NodeSpec{ ProviderID: "aws:///us-east-1a/test-instance-id", }, } - belongs, err := provider.asgs[0].Belongs(validNode) + belongs, err := asgs[0].Belongs(validNode) assert.Equal(t, belongs, true) assert.NoError(t, err) - // As "test-instance-id" is already known to be managed by test-asg since the first `Belongs` call, - // No additional DescribAutoScalingGroup call is made - service.AssertNumberOfCalls(t, "DescribeAutoScalingGroups", 1) + // As "test-instance-id" is already known to be managed by test-asg since + // the first `Belongs` call, No additional DescribAutoScalingGroupsPages + // call is made + service.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1) } func TestDeleteNodes(t *testing.T) { service := &AutoScalingMock{} - m := newTestAwsManagerWithService(service) + provider := testProvider(t, newTestAwsManagerWithAsgs(t, service, []string{"1:5:test-asg"})) + asgs := provider.asgs() service.On("TerminateInstanceInAutoScalingGroup", &autoscaling.TerminateInstanceInAutoScalingGroupInput{ InstanceId: aws.String("test-instance-id"), @@ -329,24 +295,34 @@ func TestDeleteNodes(t *testing.T) { Activity: &autoscaling.Activity{Description: aws.String("Deleted instance")}, }) - provider := testProvider(t, m) - err := provider.addNodeGroup("1:5:test-asg") - assert.NoError(t, err) - + // Look up the current number 
of instances... service.On("DescribeAutoScalingGroups", &autoscaling.DescribeAutoScalingGroupsInput{ - AutoScalingGroupNames: aws.StringSlice([]string{provider.asgs[0].Name}), + AutoScalingGroupNames: aws.StringSlice([]string{asgs[0].Name}), MaxRecords: aws.Int64(1), }).Return(testDescribeAutoScalingGroupsOutput(2, "test-instance-id", "second-test-instance-id")) + // Refresh the instance to ASG cache... + service.On("DescribeAutoScalingGroupsPages", + &autoscaling.DescribeAutoScalingGroupsInput{ + AutoScalingGroupNames: aws.StringSlice([]string{asgs[0].Name}), + MaxRecords: aws.Int64(maxRecordsReturnedByAPI), + }, + mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"), + ).Run(func(args mock.Arguments) { + fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) + fn(testNamedDescribeAutoScalingGroupsOutput("test-asg", 2, "test-instance-id", "second-test-instance-id"), false) + }).Return(nil) + node := &apiv1.Node{ Spec: apiv1.NodeSpec{ ProviderID: "aws:///us-east-1a/test-instance-id", }, } - err = provider.asgs[0].DeleteNodes([]*apiv1.Node{node}) + err := asgs[0].DeleteNodes([]*apiv1.Node{node}) assert.NoError(t, err) service.AssertNumberOfCalls(t, "TerminateInstanceInAutoScalingGroup", 1) - service.AssertNumberOfCalls(t, "DescribeAutoScalingGroups", 2) + service.AssertNumberOfCalls(t, "DescribeAutoScalingGroups", 1) + service.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1) } func TestGetResourceLimiter(t *testing.T) { @@ -358,41 +334,6 @@ func TestGetResourceLimiter(t *testing.T) { assert.NoError(t, err) } -func TestId(t *testing.T) { - provider := testProvider(t, testAwsManager) - err := provider.addNodeGroup("1:5:test-asg") - assert.NoError(t, err) - assert.Equal(t, len(provider.asgs), 1) - assert.Equal(t, provider.asgs[0].Id(), "test-asg") -} - -func TestDebug(t *testing.T) { - asg := Asg{ - awsManager: testAwsManager, - minSize: 5, - maxSize: 55, - } - asg.Name = "test-asg" - assert.Equal(t, asg.Debug(), "test-asg (5:55)") -} - -func TestBuildAsg(t *testing.T) { - _, err := buildAsgFromSpec("a", nil) - assert.Error(t, err) - _, err = buildAsgFromSpec("a:b:c", nil) - assert.Error(t, err) - _, err = buildAsgFromSpec("1:", nil) - assert.Error(t, err) - _, err = buildAsgFromSpec("1:2:", nil) - assert.Error(t, err) - - asg, err := buildAsgFromSpec("111:222:test-name", nil) - assert.NoError(t, err) - assert.Equal(t, 111, asg.MinSize()) - assert.Equal(t, 222, asg.MaxSize()) - assert.Equal(t, "test-name", asg.Name) -} - func TestCleanup(t *testing.T) { provider := testProvider(t, testAwsManager) err := provider.Cleanup() diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager.go b/cluster-autoscaler/cloudprovider/aws/aws_manager.go index 2f0c5d6f1157..2446e644cbbc 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager.go @@ -33,8 +33,8 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/config/dynamic" provider_aws "k8s.io/kubernetes/pkg/cloudprovider/providers/aws" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" ) @@ -43,6 +43,7 @@ const ( operationWaitTimeout = 5 * time.Second operationPollInterval = 100 * time.Millisecond maxRecordsReturnedByAPI = 100 + refreshInterval = 1 * time.Minute ) type asgInformation struct { @@ -52,9 +53,11 @@ type 
asgInformation struct { // AwsManager is handles aws communication and data caching. type AwsManager struct { - service autoScalingWrapper - asgs *autoScalingGroups - interrupt chan struct{} + service autoScalingWrapper + asgCache *asgCache + lastRefresh time.Time + asgAutoDiscoverySpecs []cloudprovider.ASGAutoDiscoveryConfig + explicitlyConfigured map[AwsRef]bool } type asgTemplate struct { @@ -65,7 +68,11 @@ type asgTemplate struct { } // createAwsManagerInternal allows for a customer autoScalingWrapper to be passed in by tests -func createAWSManagerInternal(configReader io.Reader, service *autoScalingWrapper) (*AwsManager, error) { +func createAWSManagerInternal( + configReader io.Reader, + discoveryOpts cloudprovider.NodeGroupDiscoveryOptions, + service *autoScalingWrapper, +) (*AwsManager, error) { if configReader != nil { var cfg provider_aws.CloudConfig if err := gcfg.ReadInto(&cfg, configReader); err != nil { @@ -80,41 +87,188 @@ func createAWSManagerInternal(configReader io.Reader, service *autoScalingWrappe } } + cache, err := newASGCache(*service) + if err != nil { + return nil, err + } + + specs, err := discoveryOpts.ParseASGAutoDiscoverySpecs() + if err != nil { + return nil, err + } + manager := &AwsManager{ - asgs: newAutoScalingGroups(*service), - service: *service, - interrupt: make(chan struct{}), + service: *service, + asgCache: cache, + asgAutoDiscoverySpecs: specs, + explicitlyConfigured: make(map[AwsRef]bool), } - go wait.Until(func() { - manager.asgs.cacheMutex.Lock() - defer manager.asgs.cacheMutex.Unlock() - if err := manager.asgs.regenerateCache(); err != nil { - glog.Errorf("Error while regenerating Asg cache: %v", err) - } - }, time.Hour, manager.interrupt) + if err := manager.fetchExplicitAsgs(discoveryOpts.NodeGroupSpecs); err != nil { + return nil, err + } + + if err := manager.forceRefresh(); err != nil { + return nil, err + } return manager, nil } // CreateAwsManager constructs awsManager object. -func CreateAwsManager(configReader io.Reader) (*AwsManager, error) { - return createAWSManagerInternal(configReader, nil) +func CreateAwsManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions) (*AwsManager, error) { + return createAWSManagerInternal(configReader, discoveryOpts, nil) +} + +// Fetch explicitly configured ASGs. These ASGs should never be unregistered +// during refreshes, even if they no longer exist in AWS. +func (m *AwsManager) fetchExplicitAsgs(specs []string) error { + changed := false + for _, spec := range specs { + asg, err := m.buildAsgFromSpec(spec) + if err != nil { + return fmt.Errorf("failed to parse node group spec: %v", err) + } + if m.RegisterAsg(asg) { + changed = true + } + m.explicitlyConfigured[asg.AwsRef] = true + } + + if changed { + if err := m.regenerateCache(); err != nil { + return err + } + } + return nil +} + +func (m *AwsManager) buildAsgFromSpec(spec string) (*Asg, error) { + s, err := dynamic.SpecFromString(spec, scaleToZeroSupported) + if err != nil { + return nil, fmt.Errorf("failed to parse node group spec: %v", err) + } + asg := &Asg{ + awsManager: m, + AwsRef: AwsRef{Name: s.Name}, + minSize: s.MinSize, + maxSize: s.MaxSize, + } + return asg, nil +} + +// Fetch automatically discovered ASGs. These ASGs should be unregistered if +// they no longer exist in AWS. 
+func (m *AwsManager) fetchAutoAsgs() error { + exists := make(map[AwsRef]bool) + changed := false + for _, spec := range m.asgAutoDiscoverySpecs { + groups, err := m.getAutoscalingGroupsByTags(spec.TagKeys) + if err != nil { + return fmt.Errorf("cannot autodiscover ASGs: %s", err) + } + for _, g := range groups { + asg, err := m.buildAsgFromAWS(g) + if err != nil { + return err + } + exists[asg.AwsRef] = true + if m.explicitlyConfigured[asg.AwsRef] { + // This ASG was explicitly configured, but would also be + // autodiscovered. We want the explicitly configured min and max + // nodes to take precedence. + glog.V(3).Infof("Ignoring explicitly configured ASG %s for autodiscovery.", asg.AwsRef.Name) + continue + } + if m.RegisterAsg(asg) { + glog.V(3).Infof("Autodiscovered ASG %s using tags %v", asg.AwsRef.Name, spec.TagKeys) + changed = true + } + } + } + + for _, asg := range m.getAsgs() { + if !exists[asg.config.AwsRef] && !m.explicitlyConfigured[asg.config.AwsRef] { + m.UnregisterAsg(asg.config) + changed = true + } + } + + if changed { + if err := m.regenerateCache(); err != nil { + return err + } + } + + return nil +} + +func (m *AwsManager) buildAsgFromAWS(g *autoscaling.Group) (*Asg, error) { + spec := dynamic.NodeGroupSpec{ + Name: aws.StringValue(g.AutoScalingGroupName), + MinSize: int(aws.Int64Value(g.MinSize)), + MaxSize: int(aws.Int64Value(g.MaxSize)), + SupportScaleToZero: scaleToZeroSupported, + } + if verr := spec.Validate(); verr != nil { + return nil, fmt.Errorf("failed to create node group spec: %v", verr) + } + asg := &Asg{ + awsManager: m, + AwsRef: AwsRef{Name: spec.Name}, + minSize: spec.MinSize, + maxSize: spec.MaxSize, + } + return asg, nil } -// RegisterAsg registers asg in Aws Manager. -func (m *AwsManager) RegisterAsg(asg *Asg) { - m.asgs.Register(asg) +// Refresh is called before every main loop and can be used to dynamically update cloud provider state. +// In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh(). +func (m *AwsManager) Refresh() error { + if m.lastRefresh.Add(refreshInterval).After(time.Now()) { + return nil + } + return m.forceRefresh() +} + +func (m *AwsManager) forceRefresh() error { + if err := m.fetchAutoAsgs(); err != nil { + glog.Errorf("Failed to fetch ASGs: %v", err) + return err + } + m.lastRefresh = time.Now() + glog.V(2).Infof("Refreshed ASG list, next refresh after %v", m.lastRefresh.Add(refreshInterval)) + return nil +} + +func (m *AwsManager) getAsgs() []*asgInformation { + return m.asgCache.get() +} + +// RegisterAsg registers an ASG. +func (m *AwsManager) RegisterAsg(asg *Asg) bool { + return m.asgCache.Register(asg) +} + +// UnregisterAsg unregisters an ASG. +func (m *AwsManager) UnregisterAsg(asg *Asg) bool { + return m.asgCache.Unregister(asg) } // GetAsgForInstance returns AsgConfig of the given Instance func (m *AwsManager) GetAsgForInstance(instance *AwsRef) (*Asg, error) { - return m.asgs.FindForInstance(instance) + return m.asgCache.FindForInstance(instance) +} + +func (m *AwsManager) regenerateCache() error { + m.asgCache.mutex.Lock() + defer m.asgCache.mutex.Unlock() + return m.asgCache.regenerate() } -// Cleanup closes the channel to signal the go routine to stop that is handling the cache +// Cleanup the ASG cache. 
func (m *AwsManager) Cleanup() { - close(m.interrupt) + m.asgCache.Cleanup() } func (m *AwsManager) getAutoscalingGroupsByTags(keys []string) ([]*autoscaling.Group, error) { @@ -160,12 +314,12 @@ func (m *AwsManager) DeleteInstances(instances []*AwsRef) error { if len(instances) == 0 { return nil } - commonAsg, err := m.asgs.FindForInstance(instances[0]) + commonAsg, err := m.asgCache.FindForInstance(instances[0]) if err != nil { return err } for _, instance := range instances { - asg, err := m.asgs.FindForInstance(instance) + asg, err := m.asgCache.FindForInstance(instance) if err != nil { return err } diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go index 8430ed23276e..aa11a62661b8 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go @@ -17,15 +17,18 @@ limitations under the License. package aws import ( + "fmt" + "strings" "testing" + "github.com/stretchr/testify/mock" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/autoscaling" "github.com/stretchr/testify/assert" apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" - "runtime" ) func TestBuildGenericLabels(t *testing.T) { @@ -94,11 +97,150 @@ func makeTaintSet(taints []apiv1.Taint) map[apiv1.Taint]bool { } return set } +func TestBuildAsg(t *testing.T) { + do := cloudprovider.NodeGroupDiscoveryOptions{} + m, err := createAWSManagerInternal(nil, do, &testService) + assert.NoError(t, err) + + asg, err := m.buildAsgFromSpec("1:5:test-asg") + assert.NoError(t, err) + assert.Equal(t, asg.MinSize(), 1) + assert.Equal(t, asg.MaxSize(), 5) + assert.Equal(t, asg.Id(), "test-asg") + assert.Equal(t, asg.Name, "test-asg") + assert.Equal(t, asg.Debug(), "test-asg (1:5)") + + _, err = m.buildAsgFromSpec("a") + assert.Error(t, err) + _, err = m.buildAsgFromSpec("a:b:c") + assert.Error(t, err) + _, err = m.buildAsgFromSpec("1:") + assert.Error(t, err) + _, err = m.buildAsgFromSpec("1:2:") + assert.Error(t, err) +} + +func validateAsg(t *testing.T, asg *Asg, name string, minSize int, maxSize int) { + assert.Equal(t, name, asg.Name) + assert.Equal(t, minSize, asg.minSize) + assert.Equal(t, maxSize, asg.maxSize) +} + +func TestFetchExplicitAsgs(t *testing.T) { + min, max, groupname := 1, 10, "coolasg" + + s := &AutoScalingMock{} + s.On("DescribeAutoScalingGroups", &autoscaling.DescribeAutoScalingGroupsInput{ + AutoScalingGroupNames: []*string{aws.String(groupname)}, + MaxRecords: aws.Int64(1), + }).Return(&autoscaling.DescribeAutoScalingGroupsOutput{ + AutoScalingGroups: []*autoscaling.Group{ + {AutoScalingGroupName: aws.String(groupname)}, + }, + }) + + s.On("DescribeAutoScalingGroupsPages", + &autoscaling.DescribeAutoScalingGroupsInput{ + AutoScalingGroupNames: aws.StringSlice([]string{groupname}), + MaxRecords: aws.Int64(maxRecordsReturnedByAPI), + }, + mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"), + ).Run(func(args mock.Arguments) { + fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) + fn(&autoscaling.DescribeAutoScalingGroupsOutput{ + AutoScalingGroups: []*autoscaling.Group{ + {AutoScalingGroupName: aws.String(groupname)}, + }}, false) + }).Return(nil) + + do := cloudprovider.NodeGroupDiscoveryOptions{ + // Register the same node group twice with different max nodes. 
+ // The intention is to test that the asgs.Register method will update + // the node group instead of registering it twice. + NodeGroupSpecs: []string{ + fmt.Sprintf("%d:%d:%s", min, max-1, groupname), + fmt.Sprintf("%d:%d:%s", min, max, groupname), + }, + } + // fetchExplicitASGs is called at manager creation time. + m, err := createAWSManagerInternal(nil, do, &autoScalingWrapper{s}) + assert.NoError(t, err) + + asgs := m.asgCache.get() + assert.Equal(t, 1, len(asgs)) + validateAsg(t, asgs[0].config, groupname, min, max) +} + +func TestFetchAutoAsgs(t *testing.T) { + min, max := 1, 10 + groupname, tags := "coolasg", []string{"tag", "anothertag"} + + s := &AutoScalingMock{} + // Lookup groups associated with tags + s.On("DescribeTagsPages", + &autoscaling.DescribeTagsInput{ + Filters: []*autoscaling.Filter{ + {Name: aws.String("key"), Values: aws.StringSlice([]string{tags[0]})}, + {Name: aws.String("key"), Values: aws.StringSlice([]string{tags[1]})}, + }, + MaxRecords: aws.Int64(maxRecordsReturnedByAPI), + }, + mock.AnythingOfType("func(*autoscaling.DescribeTagsOutput, bool) bool"), + ).Run(func(args mock.Arguments) { + fn := args.Get(1).(func(*autoscaling.DescribeTagsOutput, bool) bool) + fn(&autoscaling.DescribeTagsOutput{ + Tags: []*autoscaling.TagDescription{ + {ResourceId: aws.String(groupname)}, + {ResourceId: aws.String(groupname)}, + }}, false) + }).Return(nil).Once() + + // Describe the group to register it, then again to generate the instance + // cache. + s.On("DescribeAutoScalingGroupsPages", + &autoscaling.DescribeAutoScalingGroupsInput{ + AutoScalingGroupNames: aws.StringSlice([]string{groupname}), + MaxRecords: aws.Int64(maxRecordsReturnedByAPI), + }, + mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"), + ).Run(func(args mock.Arguments) { + fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) + fn(&autoscaling.DescribeAutoScalingGroupsOutput{ + AutoScalingGroups: []*autoscaling.Group{{ + AutoScalingGroupName: aws.String(groupname), + MinSize: aws.Int64(int64(min)), + MaxSize: aws.Int64(int64(max)), + }}}, false) + }).Return(nil).Twice() + + do := cloudprovider.NodeGroupDiscoveryOptions{ + NodeGroupAutoDiscoverySpecs: []string{fmt.Sprintf("asg:tag=%s", strings.Join(tags, ","))}, + } + + // fetchAutoASGs is called at manager creation time, via forceRefresh + m, err := createAWSManagerInternal(nil, do, &autoScalingWrapper{s}) + assert.NoError(t, err) + + asgs := m.asgCache.get() + assert.Equal(t, 1, len(asgs)) + validateAsg(t, asgs[0].config, groupname, min, max) + + // Simulate the previously discovered ASG disappearing + s.On("DescribeTagsPages", + &autoscaling.DescribeTagsInput{ + Filters: []*autoscaling.Filter{ + {Name: aws.String("key"), Values: aws.StringSlice([]string{tags[0]})}, + {Name: aws.String("key"), Values: aws.StringSlice([]string{tags[1]})}, + }, + MaxRecords: aws.Int64(maxRecordsReturnedByAPI), + }, + mock.AnythingOfType("func(*autoscaling.DescribeTagsOutput, bool) bool"), + ).Run(func(args mock.Arguments) { + fn := args.Get(1).(func(*autoscaling.DescribeTagsOutput, bool) bool) + fn(&autoscaling.DescribeTagsOutput{Tags: []*autoscaling.TagDescription{}}, false) + }).Return(nil).Once() -func testCreateAWSManager(t *testing.T) { - manager, awsError := createAWSManagerInternal(nil, &testService) - assert.Nil(t, awsError, "Expected nil from the error when creating AWS Manager") - currentNumberRoutines := runtime.NumGoroutine() - manager.Cleanup() - assert.True(t, currentNumberRoutines-1 == 
runtime.NumGoroutine(), "current number of go routines should be one less since we called close") + err = m.fetchAutoAsgs() + assert.NoError(t, err) + assert.Empty(t, m.asgCache.get()) } diff --git a/cluster-autoscaler/cloudprovider/builder/cloud_provider_builder.go b/cluster-autoscaler/cloudprovider/builder/cloud_provider_builder.go index 0113ea93f227..8cf8ca8ab3b5 100644 --- a/cluster-autoscaler/cloudprovider/builder/cloud_provider_builder.go +++ b/cluster-autoscaler/cloudprovider/builder/cloud_provider_builder.go @@ -72,6 +72,9 @@ func (b CloudProviderBuilder) Build(discoveryOpts cloudprovider.NodeGroupDiscove case gce.ProviderNameGCE: return b.buildGCE(discoveryOpts, resourceLimiter, gce.ModeGCE) case gce.ProviderNameGKE: + if discoveryOpts.DiscoverySpecified() { + glog.Fatalf("GKE gets nodegroup specification via API, command line specs are not allowed") + } if b.autoprovisioningEnabled { return b.buildGCE(discoveryOpts, resourceLimiter, gce.ModeGKENAP) } @@ -104,16 +107,16 @@ func (b CloudProviderBuilder) buildGCE(do cloudprovider.NodeGroupDiscoveryOption defer config.Close() } - m, err := gce.CreateGceManager(config, mode, b.clusterName) + manager, err := gce.CreateGceManager(config, mode, b.clusterName, do) if err != nil { glog.Fatalf("Failed to create GCE Manager: %v", err) } - p, err := gce.BuildGceCloudProvider(m, do, rl) + provider, err := gce.BuildGceCloudProvider(manager, rl) if err != nil { glog.Fatalf("Failed to create GCE cloud provider: %v", err) } - return p + return provider } func (b CloudProviderBuilder) buildAWS(do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider { @@ -127,16 +130,16 @@ func (b CloudProviderBuilder) buildAWS(do cloudprovider.NodeGroupDiscoveryOption defer config.Close() } - m, err := aws.CreateAwsManager(config) + manager, err := aws.CreateAwsManager(config, do) if err != nil { glog.Fatalf("Failed to create AWS Manager: %v", err) } - p, err := aws.BuildAwsCloudProvider(m, do, rl) + provider, err := aws.BuildAwsCloudProvider(manager, rl) if err != nil { glog.Fatalf("Failed to create AWS cloud provider: %v", err) } - return p + return provider } func (b CloudProviderBuilder) buildAzure(do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider { @@ -152,15 +155,15 @@ func (b CloudProviderBuilder) buildAzure(do cloudprovider.NodeGroupDiscoveryOpti } else { glog.Info("Creating Azure Manager with default configuration.") } - m, err := azure.CreateAzureManager(config) + manager, err := azure.CreateAzureManager(config) if err != nil { glog.Fatalf("Failed to create Azure Manager: %v", err) } - p, err := azure.BuildAzureCloudProvider(m, do.NodeGroupSpecs, rl) + provider, err := azure.BuildAzureCloudProvider(manager, do.NodeGroupSpecs, rl) if err != nil { glog.Fatalf("Failed to create Azure cloud provider: %v", err) } - return p + return provider } func (b CloudProviderBuilder) buildKubemark(do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider { @@ -196,9 +199,9 @@ func (b CloudProviderBuilder) buildKubemark(do cloudprovider.NodeGroupDiscoveryO } go kubemarkController.Run(stop) - p, err := kubemark.BuildKubemarkCloudProvider(kubemarkController, do.NodeGroupSpecs, rl) + provider, err := kubemark.BuildKubemarkCloudProvider(kubemarkController, do.NodeGroupSpecs, rl) if err != nil { glog.Fatalf("Failed to create Kubemark cloud provider: %v", err) } - return p + return provider } diff --git 
a/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go b/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go index 905d13cd9b76..2dde494f5d16 100644 --- a/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go @@ -18,15 +18,12 @@ package gce import ( "fmt" - "regexp" - "strconv" "strings" "time" apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" - "k8s.io/autoscaler/cluster-autoscaler/config/dynamic" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) @@ -40,19 +37,8 @@ const ( const ( maxAutoprovisionedSize = 1000 minAutoprovisionedSize = 0 - - autoDiscovererTypeMIG = "mig" - autoDiscovererKeyPrefix = "prefix" - autoDiscovererKeyMinNodes = "min" - autoDiscovererKeyMaxNodes = "max" ) -var validAutoDiscovererKeys = strings.Join([]string{ - autoDiscovererKeyPrefix, - autoDiscovererKeyMinNodes, - autoDiscovererKeyMaxNodes, -}, ", ") - // Big machines are temporarily commented out. // TODO(mwielgus): get this list programatically var autoprovisionedMachineTypes = []string{ @@ -85,104 +71,8 @@ type GceCloudProvider struct { } // BuildGceCloudProvider builds CloudProvider implementation for GCE. -func BuildGceCloudProvider(gceManager GceManager, do cloudprovider.NodeGroupDiscoveryOptions, resourceLimiter *cloudprovider.ResourceLimiter) (*GceCloudProvider, error) { - if err := do.Validate(); err != nil { - return nil, fmt.Errorf("Failed to build a GCE cloud provider: %v", err) - } - if gceManager.getMode() == ModeGKE && !do.NoDiscoverySpecified() { - return nil, fmt.Errorf("GKE gets nodegroup specification via API, command line specs are not allowed") - } - if do.AutoDiscoverySpecified() { - return buildAutoDiscoveringProvider(gceManager, do.NodeGroupAutoDiscoverySpecs, resourceLimiter) - } - return buildStaticallyDiscoveringProvider(gceManager, do.NodeGroupSpecs, resourceLimiter) -} - -func buildStaticallyDiscoveringProvider(gceManager GceManager, specs []string, resourceLimiter *cloudprovider.ResourceLimiter) (*GceCloudProvider, error) { - gce := &GceCloudProvider{ - gceManager: gceManager, - resourceLimiterFromFlags: resourceLimiter, - } - for _, spec := range specs { - if err := gce.addNodeGroup(spec); err != nil { - return nil, err - } - } - return gce, nil -} - -type autoDiscovererConfig struct { - migRe *regexp.Regexp - minNodes string - maxNodes string -} - -func parseAutoDiscoverySpec(spec string) (autoDiscovererConfig, error) { - cfg := autoDiscovererConfig{} - - tokens := strings.Split(spec, ":") - if len(tokens) != 2 { - return cfg, fmt.Errorf("spec \"%s\" should be discoverer:key=value,key=value", spec) - } - discoverer := tokens[0] - if discoverer != autoDiscovererTypeMIG { - return cfg, fmt.Errorf("unsupported discoverer specified: %s", discoverer) - } - - for _, arg := range strings.Split(tokens[1], ",") { - kv := strings.Split(arg, "=") - k, v := kv[0], kv[1] - - switch k { - case autoDiscovererKeyPrefix: - var err error - if cfg.migRe, err = regexp.Compile(fmt.Sprintf("^%s.+", v)); err != nil { - return cfg, fmt.Errorf("invalid instance group name prefix \"%s\" - \"^%s.+\" must be a valid RE2 regexp", v, v) - } - case autoDiscovererKeyMinNodes: - if _, err := strconv.Atoi(v); err != nil { - return cfg, fmt.Errorf("invalid minimum nodes: %s", v) - } - cfg.minNodes = v - case autoDiscovererKeyMaxNodes: - if _, err := strconv.Atoi(v); err != nil { - return cfg, 
fmt.Errorf("invalid maximum nodes: %s", v) - } - cfg.maxNodes = v - default: - return cfg, fmt.Errorf("unsupported key \"%s\" is specified for discoverer \"%s\". Supported keys are \"%s\"", k, discoverer, validAutoDiscovererKeys) - } - } - return cfg, nil -} - -func buildAutoDiscoveringProvider(gceManager GceManager, specs []string, resourceLimiter *cloudprovider.ResourceLimiter) (*GceCloudProvider, error) { - gce := &GceCloudProvider{gceManager: gceManager, resourceLimiterFromFlags: resourceLimiter} - - seen := make(map[string]bool) - for _, spec := range specs { - cfg, err := parseAutoDiscoverySpec(spec) - if err != nil { - return nil, fmt.Errorf("invalid node group auto discovery spec \"%s\": %v", spec, err) - } - links, err := gceManager.findMigsNamed(cfg.migRe) - if err != nil { - return nil, fmt.Errorf("cannot autodiscover managed instance groups: %s", err) - } - for _, link := range links { - // A MIG might match more than one provided spec, but we only ever - // want to add it once. - if seen[link] { - continue - } - seen[link] = true - spec := fmt.Sprintf("%s:%s:%s", cfg.minNodes, cfg.maxNodes, link) - if err := gce.addNodeGroup(spec); err != nil { - return nil, err - } - } - } - return gce, nil +func BuildGceCloudProvider(gceManager GceManager, resourceLimiter *cloudprovider.ResourceLimiter) (*GceCloudProvider, error) { + return &GceCloudProvider{gceManager: gceManager, resourceLimiterFromFlags: resourceLimiter}, nil } // Cleanup cleans up all resources before the cloud provider is removed @@ -191,17 +81,6 @@ func (gce *GceCloudProvider) Cleanup() error { return nil } -// addNodeGroup adds node group defined in string spec. Format: -// minNodes:maxNodes:migUrl -func (gce *GceCloudProvider) addNodeGroup(spec string) error { - mig, err := buildMig(spec, gce.gceManager) - if err != nil { - return err - } - gce.gceManager.RegisterMig(mig) - return nil -} - // Name returns name of the cloud provider. func (gce *GceCloudProvider) Name() string { // Technically we're both ProviderNameGCE and ProviderNameGKE... @@ -502,24 +381,3 @@ func (mig *Mig) TemplateNodeInfo() (*schedulercache.NodeInfo, error) { nodeInfo.SetNode(node) return nodeInfo, nil } - -func buildMig(value string, gceManager GceManager) (*Mig, error) { - spec, err := dynamic.SpecFromString(value, true) - - if err != nil { - return nil, fmt.Errorf("failed to parse node group spec: %v", err) - } - - mig := Mig{ - gceManager: gceManager, - minSize: spec.MinSize, - maxSize: spec.MaxSize, - exist: true, - autoprovisioned: false, - } - - if mig.Project, mig.Zone, mig.Name, err = ParseMigUrl(spec.Name); err != nil { - return nil, fmt.Errorf("failed to parse mig url: %s got error: %v", spec.Name, err) - } - return &mig, nil -} diff --git a/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider_test.go index d3f5d944f11f..17339f88910b 100644 --- a/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider_test.go @@ -17,7 +17,6 @@ limitations under the License. 
package gce import ( - "errors" "fmt" "net/http" "reflect" @@ -129,132 +128,16 @@ func (m *gceManagerMock) findMigsNamed(name *regexp.Regexp) ([]string, error) { return args.Get(0).([]string), args.Error(1) } -func TestBuildStaticGceCloudProvider(t *testing.T) { +func TestBuildGceCloudProvider(t *testing.T) { gceManagerMock := &gceManagerMock{} - ng1Name := "https://content.googleapis.com/compute/v1/projects/project1/zones/us-central1-b/instanceGroups/ng1" - ng2Name := "https://content.googleapis.com/compute/v1/projects/project1/zones/us-central1-b/instanceGroups/ng2" - resourceLimiter := cloudprovider.NewResourceLimiter( map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000}, map[string]int64{cloudprovider.ResourceNameCores: 10, cloudprovider.ResourceNameMemory: 100000000}) - // GCE mode with explicit node groups. - gceManagerMock.On("getMode").Return(ModeGCE).Once() - gceManagerMock.On("RegisterMig", - mock.MatchedBy(func(mig *Mig) bool { - return mig.Name == "ng1" || mig.Name == "ng2" - })).Return(true).Times(2) - - do := cloudprovider.NodeGroupDiscoveryOptions{ - NodeGroupSpecs: []string{"0:10:" + ng1Name, "0:5:https:" + ng2Name}, - } - provider, err := BuildGceCloudProvider(gceManagerMock, do, resourceLimiter) - assert.NoError(t, err) - assert.NotNil(t, provider) - mock.AssertExpectationsForObjects(t, gceManagerMock) - - // Error on GKE mode with specs. - gceManagerMock.On("getMode").Return(ModeGKE).Once() - _, err = BuildGceCloudProvider(gceManagerMock, do, resourceLimiter) - assert.Error(t, err) - assert.Equal(t, "GKE gets nodegroup specification via API, command line specs are not allowed", err.Error()) - mock.AssertExpectationsForObjects(t, gceManagerMock) - - // Ensure GKE mode works with no specs. - gceManagerMock.On("getMode").Return(ModeGKE).Once() - do = cloudprovider.NodeGroupDiscoveryOptions{} - provider, err = BuildGceCloudProvider(gceManagerMock, do, resourceLimiter) + provider, err := BuildGceCloudProvider(gceManagerMock, resourceLimiter) assert.NoError(t, err) assert.NotNil(t, provider) - mock.AssertExpectationsForObjects(t, gceManagerMock) - - // Error with both explicit and autodiscovery specs. - do = cloudprovider.NodeGroupDiscoveryOptions{ - NodeGroupSpecs: []string{"0:10:" + ng1Name, "0:5:https:" + ng2Name}, - NodeGroupAutoDiscoverySpecs: []string{"mig:prefix=pfx,min=0,max=10"}, - } - _, err = BuildGceCloudProvider(gceManagerMock, do, resourceLimiter) - assert.Error(t, err) - mock.AssertExpectationsForObjects(t, gceManagerMock) -} -func TestBuildAutodiscoveringGceCloudProvider(t *testing.T) { - gceManagerMock := &gceManagerMock{} - - ng1Name := "https://content.googleapis.com/compute/v1/projects/project1/zones/us-central1-b/instanceGroups/ng1" - ng2Name := "https://content.googleapis.com/compute/v1/projects/project1/zones/us-central1-b/instanceGroups/ng2" - - resourceLimiter := cloudprovider.NewResourceLimiter( - map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000}, - map[string]int64{cloudprovider.ResourceNameCores: 10, cloudprovider.ResourceNameMemory: 100000000}) - - // GCE mode with autodiscovery. 
- gceManagerMock.On("getMode").Return(ModeGCE).Once() - gceManagerMock.On("findMigsNamed").Return([]string{ng1Name, ng2Name}, nil).Twice() - gceManagerMock.On("RegisterMig", - mock.MatchedBy(func(mig *Mig) bool { - return mig.Name == "ng1" || mig.Name == "ng2" - })).Return(true).Times(2) - - do := cloudprovider.NodeGroupDiscoveryOptions{ - NodeGroupAutoDiscoverySpecs: []string{ - "mig:prefix=ng,min=0,max=10", - "mig:prefix=n,min=1,max=2", - }, - } - provider, err := BuildGceCloudProvider(gceManagerMock, do, resourceLimiter) - assert.NoError(t, err) - assert.NotNil(t, provider) - mock.AssertExpectationsForObjects(t, gceManagerMock) - - // Error finding instance groups - gceManagerMock.On("getMode").Return(ModeGCE).Once() - gceManagerMock.On("findMigsNamed").Return([]string{}, errors.New("nope")).Once() - _, err = BuildGceCloudProvider(gceManagerMock, do, resourceLimiter) - assert.Error(t, err) - assert.Equal(t, "cannot autodiscover managed instance groups: nope", err.Error()) - mock.AssertExpectationsForObjects(t, gceManagerMock) - - // Error on GKE mode with autodiscovery specs. - gceManagerMock.On("getMode").Return(ModeGKE).Once() - _, err = BuildGceCloudProvider(gceManagerMock, do, resourceLimiter) - assert.Error(t, err) - assert.Equal(t, "GKE gets nodegroup specification via API, command line specs are not allowed", err.Error()) - mock.AssertExpectationsForObjects(t, gceManagerMock) - - // Bad autodiscovery spec - do = cloudprovider.NodeGroupDiscoveryOptions{ - NodeGroupAutoDiscoverySpecs: []string{"mig"}, - } - gceManagerMock.On("getMode").Return(ModeGCE).Once() - _, err = BuildGceCloudProvider(gceManagerMock, do, resourceLimiter) - assert.Error(t, err) - mock.AssertExpectationsForObjects(t, gceManagerMock) -} - -func TestParseAutoDiscoverySpec(t *testing.T) { - want := autoDiscovererConfig{ - migRe: regexp.MustCompile("^pfx.+"), - minNodes: "0", - maxNodes: "10", - } - got, err := parseAutoDiscoverySpec("mig:prefix=pfx,min=0,max=10") - assert.NoError(t, err) - assert.Equal(t, want, got) - - badSpecs := []string{ - "prefix=pfx,min=0,max=10", - "asg:prefix=pfx,min=0,max=10", - "mig:prefix=pfx,min=0,max=10,unknown=hi", - "mig:prefix=pfx,min=a,max=10", - "mig:prefix=pfx,min=10,max=donkey", - "mig:prefix=(a,min=1,max=10", - } - - for _, spec := range badSpecs { - _, err = parseAutoDiscoverySpec(spec) - assert.Error(t, err) - } } func TestNodeGroups(t *testing.T) { @@ -635,29 +518,3 @@ func TestGceRefFromProviderId(t *testing.T) { assert.NoError(t, err) assert.Equal(t, GceRef{"project1", "us-central1-b", "name1"}, *ref) } - -func TestBuildMig(t *testing.T) { - _, err := buildMig("a", nil) - assert.Error(t, err) - _, err = buildMig("a:b:c", nil) - assert.Error(t, err) - _, err = buildMig("1:2:x", nil) - assert.Error(t, err) - _, err = buildMig("1:2:", nil) - assert.Error(t, err) - - mig, err := buildMig("111:222:https://content.googleapis.com/compute/v1/projects/test-project/zones/test-zone/instanceGroups/test-name", nil) - assert.NoError(t, err) - assert.Equal(t, 111, mig.MinSize()) - assert.Equal(t, 222, mig.MaxSize()) - assert.Equal(t, "test-zone", mig.Zone) - assert.Equal(t, "test-name", mig.Name) -} - -func TestBuildKubeProxy(t *testing.T) { - mig, _ := buildMig("1:20:https://content.googleapis.com/compute/v1/projects/test-project/zones/test-zone/instanceGroups/test-name", nil) - pod := cloudprovider.BuildKubeProxy(mig.Id()) - assert.Equal(t, 1, len(pod.Spec.Containers)) - cpu := pod.Spec.Containers[0].Resources.Requests[apiv1.ResourceCPU] - assert.Equal(t, int64(100), cpu.MilliValue()) 
-} diff --git a/cluster-autoscaler/cloudprovider/gce/gce_manager.go b/cluster-autoscaler/cloudprovider/gce/gce_manager.go index 783143542bdb..65598b3a0948 100644 --- a/cluster-autoscaler/cloudprovider/gce/gce_manager.go +++ b/cluster-autoscaler/cloudprovider/gce/gce_manager.go @@ -18,7 +18,6 @@ package gce import ( "context" - "errors" "flag" "fmt" "io" @@ -42,6 +41,7 @@ import ( gke_beta "google.golang.org/api/container/v1beta1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/config/dynamic" provider_gce "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" ) @@ -73,6 +73,7 @@ const ( nodeAutoprovisioningPrefix = "nap" napMaxNodes = 1000 napMinNodes = 0 + scaleToZeroSupported = true ) var ( @@ -134,19 +135,21 @@ type gceManagerImpl struct { cacheMutex sync.Mutex migsMutex sync.Mutex - location string - projectId string - clusterName string - mode GcpCloudProviderMode - templates *templateBuilder - interrupt chan struct{} - isRegional bool - resourceLimiter *cloudprovider.ResourceLimiter - lastRefresh time.Time + location string + projectId string + clusterName string + mode GcpCloudProviderMode + templates *templateBuilder + interrupt chan struct{} + isRegional bool + explicitlyConfigured map[GceRef]bool + migAutoDiscoverySpecs []cloudprovider.MIGAutoDiscoveryConfig + resourceLimiter *cloudprovider.ResourceLimiter + lastRefresh time.Time } // CreateGceManager constructs gceManager object. -func CreateGceManager(configReader io.Reader, mode GcpCloudProviderMode, clusterName string) (GceManager, error) { +func CreateGceManager(configReader io.Reader, mode GcpCloudProviderMode, clusterName string, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions) (GceManager, error) { // Create Google Compute Engine token. 
var err error tokenSource := google.ComputeTokenSource("") @@ -214,10 +217,20 @@ func CreateGceManager(configReader io.Reader, mode GcpCloudProviderMode, cluster projectId: projectId, service: gceService, }, - interrupt: make(chan struct{}), + interrupt: make(chan struct{}), + explicitlyConfigured: make(map[GceRef]bool), } - if mode == ModeGKE { + switch mode { + case ModeGCE: + var err error + if err = manager.fetchExplicitMigs(discoveryOpts.NodeGroupSpecs); err != nil { + return nil, fmt.Errorf("failed to fetch MIGs: %v", err) + } + if manager.migAutoDiscoverySpecs, err = discoveryOpts.ParseMIGAutoDiscoverySpecs(); err != nil { + return nil, err + } + case ModeGKE: gkeService, err := gke.New(client) if err != nil { return nil, err @@ -236,14 +249,7 @@ func CreateGceManager(configReader io.Reader, mode GcpCloudProviderMode, cluster } manager.gkeBetaService = gkeBetaService } - err = manager.fetchAllNodePools() - if err != nil { - glog.Errorf("Failed to fetch node pools: %v", err) - return nil, err - } - } - - if mode == ModeGKENAP { + case ModeGKENAP: gkeAlphaService, err := gke_alpha.New(client) if err != nil { return nil, err @@ -252,20 +258,12 @@ func CreateGceManager(configReader io.Reader, mode GcpCloudProviderMode, cluster gkeAlphaService.BasePath = *gkeAPIEndpoint } manager.gkeAlphaService = gkeAlphaService - err = manager.fetchAllNodePools() - if err != nil { - glog.Errorf("Failed to fetch node pools: %v", err) - return nil, err - } - err = manager.fetchResourceLimiter() - if err != nil { - glog.Errorf("Failed to fetch resource limits: %v", err) - return nil, err - } glog.V(1).Info("Using GKE-NAP mode") } - manager.lastRefresh = time.Now() + if err := manager.forceRefresh(); err != nil { + return nil, err + } go wait.Until(func() { manager.cacheMutex.Lock() @@ -284,6 +282,12 @@ func (m *gceManagerImpl) Cleanup() error { return nil } +func (m *gceManagerImpl) assertGCE() { + if m.mode != ModeGCE { + glog.Fatalf("This should run only in GCE mode") + } +} + func (m *gceManagerImpl) assertGKE() { if m.mode != ModeGKE { glog.Fatalf("This should run only in GKE mode") @@ -687,7 +691,7 @@ func (m *gceManagerImpl) DeleteInstances(instances []*GceRef) error { return err } if mig != commonMig { - return errors.New("Cannot delete instances which don't belong to the same MIG.") + return fmt.Errorf("Cannot delete instances which don't belong to the same MIG.") } } @@ -814,27 +818,145 @@ func (m *gceManagerImpl) getTemplates() *templateBuilder { } func (m *gceManagerImpl) Refresh() error { - if m.mode == ModeGCE { + if m.lastRefresh.Add(refreshInterval).After(time.Now()) { return nil } - if m.lastRefresh.Add(refreshInterval).Before(time.Now()) { - err := m.fetchAllNodePools() - if err != nil { + return m.forceRefresh() +} + +func (m *gceManagerImpl) forceRefresh() error { + switch m.mode { + case ModeGCE: + if err := m.fetchAutoMigs(); err != nil { + glog.Errorf("Failed to fetch MIGs: %v", err) + return err + } + case ModeGKENAP: + if err := m.fetchResourceLimiter(); err != nil { + glog.Errorf("Failed to fetch resource limits: %v", err) return err } + fallthrough + case ModeGKE: + if err := m.fetchAllNodePools(); err != nil { + glog.Errorf("Failed to fetch node pools: %v", err) + return err + } + } + m.lastRefresh = time.Now() + glog.V(2).Infof("Refreshed GCE resources, next refresh after %v", m.lastRefresh.Add(refreshInterval)) + return nil +} - err = m.fetchResourceLimiter() +// Fetch explicitly configured MIGs. 
These MIGs should never be unregistered +// during refreshes, even if they no longer exist in GCE. +func (m *gceManagerImpl) fetchExplicitMigs(specs []string) error { + m.assertGCE() + + changed := false + for _, spec := range specs { + mig, err := m.buildMigFromSpec(spec) if err != nil { return err } + if m.RegisterMig(mig) { + changed = true + } + m.explicitlyConfigured[mig.GceRef] = true + } - m.lastRefresh = time.Now() - glog.V(2).Infof("Refreshed NodePools list and resource limits, next refresh after %v", m.lastRefresh.Add(refreshInterval)) - return nil + if changed { + m.cacheMutex.Lock() + defer m.cacheMutex.Unlock() + + if err := m.regenerateCache(); err != nil { + return err + } } return nil } +func (m *gceManagerImpl) buildMigFromSpec(spec string) (*Mig, error) { + s, err := dynamic.SpecFromString(spec, scaleToZeroSupported) + if err != nil { + return nil, fmt.Errorf("failed to parse node group spec: %v", err) + } + mig := &Mig{gceManager: m, minSize: s.MinSize, maxSize: s.MaxSize, exist: true} + if mig.Project, mig.Zone, mig.Name, err = ParseMigUrl(s.Name); err != nil { + return nil, fmt.Errorf("failed to parse mig url: %s got error: %v", s.Name, err) + } + return mig, nil +} + +// Fetch automatically discovered MIGs. These MIGs should be unregistered if +// they no longer exist in GCE. +func (m *gceManagerImpl) fetchAutoMigs() error { + m.assertGCE() + + exists := make(map[GceRef]bool) + changed := false + for _, cfg := range m.migAutoDiscoverySpecs { + links, err := m.findMigsNamed(cfg.Re) + if err != nil { + return fmt.Errorf("cannot autodiscover managed instance groups: %s", err) + } + for _, link := range links { + mig, err := m.buildMigFromAutoCfg(link, cfg) + if err != nil { + return err + } + exists[mig.GceRef] = true + if m.explicitlyConfigured[mig.GceRef] { + // This MIG was explicitly configured, but would also be + // autodiscovered. We want the explicitly configured min and max + // nodes to take precedence. 
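// Illustrative sketch, not part of the patch: the reconciliation idea used by
// fetchExplicitMigs and fetchAutoMigs. Explicitly configured groups always
// win over autodiscovered ones, and autodiscovered groups that disappear are
// dropped. Types and names here are hypothetical stand-ins for the manager's
// GceRef bookkeeping.
package main

import "fmt"

type groupRef string

// reconcile returns the set of node groups that should remain registered:
// every explicitly configured group, plus every autodiscovered group not
// already covered by an explicit spec (explicit min/max take precedence).
func reconcile(registered, explicit map[groupRef]bool, discovered []groupRef) map[groupRef]bool {
	next := make(map[groupRef]bool)
	for ref := range explicit {
		// Explicit groups are never unregistered, even if discovery no
		// longer returns them.
		next[ref] = true
	}
	for _, ref := range discovered {
		if explicit[ref] {
			continue // already registered with explicit settings
		}
		next[ref] = true
	}
	for ref := range registered {
		if !next[ref] {
			fmt.Printf("unregistering stale autodiscovered group %s\n", ref)
		}
	}
	return next
}

func main() {
	registered := map[groupRef]bool{"mig-a": true, "mig-b": true}
	explicit := map[groupRef]bool{"mig-a": true}
	fmt.Println(reconcile(registered, explicit, []groupRef{"mig-c"}))
}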
+ glog.V(3).Infof("Ignoring explicitly configured MIG %s for autodiscovery.", mig.GceRef.Name) + continue + } + if m.RegisterMig(mig) { + glog.V(3).Infof("Autodiscovered MIG %s using regexp %s", mig.GceRef.Name, cfg.Re.String()) + changed = true + } + } + } + + for _, mig := range m.getMigs() { + if !exists[mig.config.GceRef] && !m.explicitlyConfigured[mig.config.GceRef] { + m.UnregisterMig(mig.config) + changed = true + } + } + + if changed { + m.cacheMutex.Lock() + defer m.cacheMutex.Unlock() + + if err := m.regenerateCache(); err != nil { + return err + } + } + + return nil +} + +func (m *gceManagerImpl) buildMigFromAutoCfg(link string, cfg cloudprovider.MIGAutoDiscoveryConfig) (*Mig, error) { + spec := dynamic.NodeGroupSpec{ + Name: link, + MinSize: cfg.MinSize, + MaxSize: cfg.MaxSize, + SupportScaleToZero: scaleToZeroSupported, + } + if verr := spec.Validate(); verr != nil { + return nil, fmt.Errorf("failed to create node group spec: %v", verr) + } + mig := &Mig{gceManager: m, minSize: spec.MinSize, maxSize: spec.MaxSize, exist: true} + var err error + if mig.Project, mig.Zone, mig.Name, err = ParseMigUrl(spec.Name); err != nil { + return nil, fmt.Errorf("failed to parse mig url: %s got error: %v", spec.Name, err) + } + return mig, nil +} + func (m *gceManagerImpl) fetchResourceLimiter() error { if m.mode == ModeGKENAP { cluster, err := m.gkeAlphaService.Projects.Zones.Clusters.Get(m.projectId, m.location, m.clusterName).Do() diff --git a/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go b/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go index 9bbe76c7746f..247a6f719a7c 100644 --- a/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go +++ b/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go @@ -22,6 +22,8 @@ import ( "regexp" "testing" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + . 
"k8s.io/autoscaler/cluster-autoscaler/utils/test" "github.com/stretchr/testify/assert" @@ -43,6 +45,9 @@ const ( autoprovisionedPoolMig = "gke-cluster-1-nodeautoprovisioning-323233232" autoprovisionedPool = "nodeautoprovisioning-323233232" clusterName = "cluster1" + + gceMigA = "gce-mig-a" + gceMigB = "gce-mig-b" ) const allNodePools1 = `{ @@ -218,8 +223,8 @@ const instanceGroupManager = `{ "creationTimestamp": "2017-09-15T04:47:24.687-07:00", "name": "%s", "zone": "https://www.googleapis.com/compute/v1/projects/project1/zones/%s", - "instanceTemplate": "https://www.googleapis.com/compute/v1/projects/project1/global/instanceTemplates/gke-cluster-1-default-pool", - "instanceGroup": "https://www.googleapis.com/compute/v1/projects/project1/zones/%s/instanceGroups/gke-cluster-1-default-pool", + "instanceTemplate": "https://www.googleapis.com/compute/v1/projects/project1/global/instanceTemplates/%s", + "instanceGroup": "https://www.googleapis.com/compute/v1/projects/project1/zones/%s/instanceGroups/%s", "baseInstanceName": "gke-cluster-1-default-pool-f23aac-grp", "fingerprint": "kfdsuH", "currentActions": { @@ -352,26 +357,26 @@ const machineType = `{ const managedInstancesResponse1 = `{ "managedInstances": [ { - "instance": "https://www.googleapis.com/compute/v1/projects/project1/zones/%s/instances/gke-cluster-1-default-pool-f7607aac-9j4g", + "instance": "https://www.googleapis.com/compute/v1/projects/project1/zones/%s/instances/%s-f7607aac-9j4g", "id": "1974815549671473983", "instanceStatus": "RUNNING", "currentAction": "NONE" }, { - "instance": "https://www.googleapis.com/compute/v1/projects/project1/zones/%s/instances/gke-cluster-1-default-pool-f7607aac-c63g", + "instance": "https://www.googleapis.com/compute/v1/projects/project1/zones/%s/instances/%s-f7607aac-c63g", "currentAction": "RUNNING", "id": "197481554967143333", "instanceStatus": "RUNNING", "currentAction": "NONE" }, { - "instance": "https://www.googleapis.com/compute/v1/projects/project1/zones/%s/instances/gke-cluster-1-default-pool-f7607aac-dck1", + "instance": "https://www.googleapis.com/compute/v1/projects/project1/zones/%s/instances/%s-f7607aac-dck1", "id": "4462422841867240255", "instanceStatus": "RUNNING", "currentAction": "NONE" }, { - "instance": "https://www.googleapis.com/compute/v1/projects/project1/zones/%s/instances/gke-cluster-1-default-pool-f7607aac-f1hm", + "instance": "https://www.googleapis.com/compute/v1/projects/project1/zones/%s/instances/%s-f7607aac-f1hm", "id": "6309299611401323327", "instanceStatus": "RUNNING", "currentAction": "NONE" @@ -382,7 +387,7 @@ const managedInstancesResponse1 = `{ const managedInstancesResponse2 = `{ "managedInstances": [ { - "instance": "https://www.googleapis.com/compute/v1/projects/project1/zones/us-central1-b/instances/gke-cluster-1-nodeautoprovisioning-323233232-gdf607aac-9j4g", + "instance": "https://www.googleapis.com/compute/v1/projects/project1/zones/%s/instances/%s-gdf607aac-9j4g", "id": "1974815323221473983", "instanceStatus": "RUNNING", "currentAction": "NONE" @@ -494,11 +499,11 @@ const getClusterResponse = `{ }` func getInstanceGroupManager(zone string) string { - return getInstanceGroupManagerNamed("gke-cluster-1-default-pool", zone) + return getInstanceGroupManagerNamed(defaultPoolMig, zone) } func getInstanceGroupManagerNamed(name, zone string) string { - return fmt.Sprintf(instanceGroupManager, name, zone, zone, zone, name) + return fmt.Sprintf(instanceGroupManager, name, zone, name, zone, name, zone, name) } func getMachineType(zone string) string { @@ 
-506,7 +511,19 @@ func getMachineType(zone string) string { } func getManagedInstancesResponse1(zone string) string { - return fmt.Sprintf(managedInstancesResponse1, zone, zone, zone, zone) + return getManagedInstancesResponse1Named(defaultPoolMig, zone) +} + +func getManagedInstancesResponse1Named(name, zone string) string { + return fmt.Sprintf(managedInstancesResponse1, zone, name, zone, name, zone, name, zone, name) +} + +func getManagedInstancesResponse2(zone string) string { + return getManagedInstancesResponse2Named(autoprovisionedPoolMig, zone) +} + +func getManagedInstancesResponse2Named(name, zone string) string { + return fmt.Sprintf(managedInstancesResponse2, zone, name) } func newTestGceManager(t *testing.T, testServerURL string, mode GcpCloudProviderMode, isRegional bool) *gceManagerImpl { @@ -526,6 +543,7 @@ func newTestGceManager(t *testing.T, testServerURL string, mode GcpCloudProvider projectId: projectId, service: gceService, }, + explicitlyConfigured: make(map[GceRef]bool), } if isRegional { @@ -599,7 +617,7 @@ func TestFetchAllNodePools(t *testing.T) { server.On("handle", "/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool").Return(instanceGroupManager).Once() server.On("handle", "/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool/listManagedInstances").Return(getManagedInstancesResponse1(zoneB)).Once() server.On("handle", "/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-nodeautoprovisioning-323233232").Return(getInstanceGroupManager(zoneB)).Once() - server.On("handle", "/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-nodeautoprovisioning-323233232/listManagedInstances").Return(managedInstancesResponse2).Once() + server.On("handle", "/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-nodeautoprovisioning-323233232/listManagedInstances").Return(getManagedInstancesResponse2(zoneB)).Once() err = g.fetchAllNodePools() assert.NoError(t, err) @@ -690,7 +708,7 @@ func TestDeleteNodePool(t *testing.T) { server.On("handle", "/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool").Return(instanceGroupManager).Once() server.On("handle", "/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool/listManagedInstances").Return(getManagedInstancesResponse1(zoneB)).Once() server.On("handle", "/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-nodeautoprovisioning-323233232").Return(getInstanceGroupManager(zoneB)).Once() - server.On("handle", "/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-nodeautoprovisioning-323233232/listManagedInstances").Return(managedInstancesResponse2).Once() + server.On("handle", "/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-nodeautoprovisioning-323233232/listManagedInstances").Return(getManagedInstancesResponse2(zoneB)).Once() mig := &Mig{ GceRef: GceRef{ @@ -749,7 +767,7 @@ func TestCreateNodePool(t *testing.T) { server.On("handle", "/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool").Return(getInstanceGroupManager(zoneB)).Once() server.On("handle", "/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool/listManagedInstances").Return(getManagedInstancesResponse1(zoneB)).Once() server.On("handle", "/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-nodeautoprovisioning-323233232").Return(getInstanceGroupManager(zoneB)).Once() - server.On("handle", 
"/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-nodeautoprovisioning-323233232/listManagedInstances").Return(managedInstancesResponse2).Once() + server.On("handle", "/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-nodeautoprovisioning-323233232/listManagedInstances").Return(getManagedInstancesResponse2(zoneB)).Once() mig := &Mig{ GceRef: GceRef{ @@ -905,7 +923,7 @@ func TestDeleteInstances(t *testing.T) { server.On("handle", "/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool").Return(getInstanceGroupManager(zoneB)).Once() server.On("handle", "/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool/listManagedInstances").Return(getManagedInstancesResponse1(zoneB)).Once() server.On("handle", "/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-nodeautoprovisioning-323233232").Return(getInstanceGroupManager(zoneB)).Once() - server.On("handle", "/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-nodeautoprovisioning-323233232/listManagedInstances").Return(managedInstancesResponse2).Once() + server.On("handle", "/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-nodeautoprovisioning-323233232/listManagedInstances").Return(getManagedInstancesResponse2(zoneB)).Once() server.On("handle", "/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool/deleteInstances").Return(deleteInstancesResponse).Once() server.On("handle", "/project1/zones/us-central1-b/operations/operation-1505802641136-55984ff86d980-a99e8c2b-0c8aaaaa").Return(deleteInstancesOperationResponse).Once() @@ -1092,32 +1110,13 @@ func TestFetchResourceLimiter(t *testing.T) { server := NewHttpServerMock() defer server.Close() - // GCE. - g := newTestGceManager(t, server.URL, ModeGCE, false) + g := newTestGceManager(t, server.URL, ModeGKENAP, false) + server.On("handle", "/v1alpha1/projects/project1/zones/us-central1-b/clusters/cluster1").Return(getClusterResponse).Once() err := g.fetchResourceLimiter() assert.NoError(t, err) resourceLimiter, err := g.GetResourceLimiter() assert.NoError(t, err) - assert.Nil(t, resourceLimiter) - - // GKE. - g = newTestGceManager(t, server.URL, ModeGKE, false) - - err = g.fetchResourceLimiter() - assert.NoError(t, err) - resourceLimiter, err = g.GetResourceLimiter() - assert.NoError(t, err) - assert.Nil(t, resourceLimiter) - - // GKENAP. 
- g = newTestGceManager(t, server.URL, ModeGKENAP, false) - server.On("handle", "/v1alpha1/projects/project1/zones/us-central1-b/clusters/cluster1").Return(getClusterResponse).Once() - - err = g.fetchResourceLimiter() - assert.NoError(t, err) - resourceLimiter, err = g.GetResourceLimiter() - assert.NoError(t, err) assert.NotNil(t, resourceLimiter) mock.AssertExpectationsForObjects(t, server) @@ -1149,27 +1148,21 @@ const instanceGroupList = `{ func listInstanceGroups(zone string) string { return fmt.Sprintf(instanceGroupList, zone, - getInstanceGroupNamed("gce-pool-a", zone), - getInstanceGroupNamed("gce-pool-b", zone), + getInstanceGroupNamed(gceMigA, zone), + getInstanceGroupNamed(gceMigB, zone), zone, ) } -func TestFindMigsNamedZonal(t *testing.T) { - server := NewHttpServerMock() - defer server.Close() - - server.On("handle", "/project1/zones/us-central1-b/instanceGroups").Return(listInstanceGroups("us-central1-b")).Once() - - regional := false - g := newTestGceManager(t, server.URL, ModeGCE, regional) - links, err := g.findMigsNamed(regexp.MustCompile("^UNUSED")) - assert.NoError(t, err) +const noInstanceGroupList = `{ + "kind": "compute#instanceGroupList", + "id": "projects/project1a/zones/%s/instanceGroups", + "items": [], + "selfLink": "https://www.googleapis.com/compute/v1/projects/project1/zones/%s/instanceGroups" +}` - assert.Equal(t, 2, len(links)) - assert.Equal(t, "https://www.googleapis.com/compute/v1/projects/project1/zones/us-central1-b/instanceGroups/gce-pool-a", links[0]) - assert.Equal(t, "https://www.googleapis.com/compute/v1/projects/project1/zones/us-central1-b/instanceGroups/gce-pool-b", links[1]) - mock.AssertExpectationsForObjects(t, server) +func listNoInstanceGroups(zone string) string { + return fmt.Sprintf(noInstanceGroupList, zone, zone) } const getRegion = `{ @@ -1180,40 +1173,155 @@ const getRegion = `{ "description": "us-central1", "status": "UP", "zones": [ - "https://www.googleapis.com/compute/v1/projects/project1/zones/us-central1-a", - "https://www.googleapis.com/compute/v1/projects/project1/zones/us-central1-b", - "https://www.googleapis.com/compute/v1/projects/project1/zones/us-central1-c", - "https://www.googleapis.com/compute/v1/projects/project1/zones/us-central1-f" + "https://www.googleapis.com/compute/v1/projects/project1/zones/us-central1-b" ], "quotas": [], "selfLink": "https://www.googleapis.com/compute/v1/projects/project1/regions/us-central1" }` -func TestFindMigsNamedRegional(t *testing.T) { +func TestFetchAutoMigsZonal(t *testing.T) { + server := NewHttpServerMock() + defer server.Close() + + server.On("handle", "/project1/zones/"+zoneB+"/instanceGroups").Return(listInstanceGroups(zoneB)).Once() + server.On("handle", "/project1/zones/"+zoneB+"/instanceGroupManagers/"+gceMigA).Return(getInstanceGroupManagerNamed(gceMigA, zoneB)).Once() + server.On("handle", "/project1/zones/"+zoneB+"/instanceGroupManagers/"+gceMigB).Return(getInstanceGroupManagerNamed(gceMigB, zoneB)).Once() + + server.On("handle", "/project1/global/instanceTemplates/"+gceMigA).Return(instanceTemplate).Once() + server.On("handle", "/project1/global/instanceTemplates/"+gceMigB).Return(instanceTemplate).Once() + server.On("handle", "/project1/zones/"+zoneB+"/machineTypes/n1-standard-1").Return(getMachineType(zoneB)).Once() + server.On("handle", "/project1/zones/"+zoneB+"/machineTypes/n1-standard-1").Return(getMachineType(zoneB)).Once() + server.On("handle", "/project1/zones/"+zoneB+"/instanceGroupManagers/"+gceMigA).Return(getInstanceGroupManagerNamed(gceMigA, zoneB)).Once() 
+ server.On("handle", "/project1/zones/"+zoneB+"/instanceGroupManagers/"+gceMigA+"/listManagedInstances").Return(getManagedInstancesResponse1Named(gceMigA, zoneB)).Once() + server.On("handle", "/project1/zones/"+zoneB+"/instanceGroupManagers/"+gceMigB).Return(getInstanceGroupManagerNamed(gceMigB, zoneB)).Once() + server.On("handle", "/project1/zones/"+zoneB+"/instanceGroupManagers/"+gceMigB+"/listManagedInstances").Return(getManagedInstancesResponse2Named(gceMigB, zoneB)).Once() + + regional := false + g := newTestGceManager(t, server.URL, ModeGCE, regional) + + min, max := 0, 100 + g.migAutoDiscoverySpecs = []cloudprovider.MIGAutoDiscoveryConfig{ + {Re: regexp.MustCompile("UNUSED"), MinSize: min, MaxSize: max}, + } + + assert.NoError(t, g.fetchAutoMigs()) + + migs := g.getMigs() + assert.Equal(t, 2, len(migs)) + validateMig(t, migs[0].config, zoneB, gceMigA, min, max) + validateMig(t, migs[1].config, zoneB, gceMigB, min, max) + mock.AssertExpectationsForObjects(t, server) +} +func TestFetchAutoMigsUnregistersMissingMigs(t *testing.T) { + server := NewHttpServerMock() + defer server.Close() + + // Register explicit instance group + server.On("handle", "/project1/zones/"+zoneB+"/instanceGroupManagers/"+gceMigA).Return(getInstanceGroupManagerNamed(gceMigA, zoneB)).Once() + server.On("handle", "/project1/global/instanceTemplates/"+gceMigA).Return(instanceTemplate).Once() + server.On("handle", "/project1/zones/"+zoneB+"/machineTypes/n1-standard-1").Return(getMachineType(zoneB)).Once() + + // Regenerate cache for explicit instance group + server.On("handle", "/project1/zones/"+zoneB+"/instanceGroupManagers/"+gceMigA).Return(getInstanceGroupManagerNamed(gceMigA, zoneB)).Twice() + server.On("handle", "/project1/zones/"+zoneB+"/instanceGroupManagers/"+gceMigA+"/listManagedInstances").Return(getManagedInstancesResponse1Named(gceMigA, zoneB)).Twice() + + // Register 'previously autodetected' instance group + server.On("handle", "/project1/zones/"+zoneB+"/instanceGroupManagers/"+gceMigB).Return(getInstanceGroupManagerNamed(gceMigB, zoneB)).Once() + server.On("handle", "/project1/global/instanceTemplates/"+gceMigB).Return(instanceTemplate).Once() + server.On("handle", "/project1/zones/"+zoneB+"/machineTypes/n1-standard-1").Return(getMachineType(zoneB)).Once() + + regional := false + g := newTestGceManager(t, server.URL, ModeGCE, regional) + + // This MIG should never be unregistered because it is explicitly configured. + minA, maxA := 0, 100 + specs := []string{fmt.Sprintf("%d:%d:https://content.googleapis.com/compute/v1/projects/project1/zones/%s/instanceGroups/%s", minA, maxA, zoneB, gceMigA)} + assert.NoError(t, g.fetchExplicitMigs(specs)) + + // This MIG was previously autodetected but is now gone. + // It should be unregistered. 
+ unregister := &Mig{ + gceManager: g, + GceRef: GceRef{Project: projectId, Zone: zoneB, Name: gceMigB}, + minSize: 1, + maxSize: 10, + exist: true, + } + assert.True(t, g.RegisterMig(unregister)) + + assert.NoError(t, g.fetchAutoMigs()) + + migs := g.getMigs() + assert.Equal(t, 1, len(migs)) + validateMig(t, migs[0].config, zoneB, gceMigA, minA, maxA) + mock.AssertExpectationsForObjects(t, server) +} + +func TestFetchAutoMigsRegional(t *testing.T) { server := NewHttpServerMock() defer server.Close() server.On("handle", "/project1/regions/us-central1").Return(getRegion).Once() - server.On("handle", "/project1/zones/us-central1-a/instanceGroups").Return(listInstanceGroups("us-central1-a")).Once() - server.On("handle", "/project1/zones/us-central1-b/instanceGroups").Return(listInstanceGroups("us-central1-b")).Once() - server.On("handle", "/project1/zones/us-central1-c/instanceGroups").Return(listInstanceGroups("us-central1-c")).Once() - server.On("handle", "/project1/zones/us-central1-f/instanceGroups").Return(listInstanceGroups("us-central1-f")).Once() + server.On("handle", "/project1/zones/"+zoneB+"/instanceGroups").Return(listInstanceGroups(zoneB)).Once() + server.On("handle", "/project1/zones/"+zoneB+"/instanceGroupManagers/"+gceMigA).Return(getInstanceGroupManagerNamed(gceMigA, zoneB)).Once() + server.On("handle", "/project1/zones/"+zoneB+"/instanceGroupManagers/"+gceMigB).Return(getInstanceGroupManagerNamed(gceMigB, zoneB)).Once() + + server.On("handle", "/project1/global/instanceTemplates/"+gceMigA).Return(instanceTemplate).Once() + server.On("handle", "/project1/global/instanceTemplates/"+gceMigB).Return(instanceTemplate).Once() + server.On("handle", "/project1/zones/"+zoneB+"/machineTypes/n1-standard-1").Return(getMachineType(zoneB)).Twice() + server.On("handle", "/project1/zones/"+zoneB+"/instanceGroupManagers/"+gceMigA).Return(getInstanceGroupManagerNamed(gceMigA, zoneB)).Once() + server.On("handle", "/project1/zones/"+zoneB+"/instanceGroupManagers/"+gceMigA+"/listManagedInstances").Return(getManagedInstancesResponse1Named(gceMigA, zoneB)).Once() + server.On("handle", "/project1/zones/"+zoneB+"/instanceGroupManagers/"+gceMigB).Return(getInstanceGroupManagerNamed(gceMigB, zoneB)).Once() + server.On("handle", "/project1/zones/"+zoneB+"/instanceGroupManagers/"+gceMigB+"/listManagedInstances").Return(getManagedInstancesResponse2Named(gceMigB, zoneB)).Once() regional := true g := newTestGceManager(t, server.URL, ModeGCE, regional) - got, err := g.findMigsNamed(regexp.MustCompile("^UNUSED")) - assert.NoError(t, err) - want := []string{ - "https://www.googleapis.com/compute/v1/projects/project1/zones/us-central1-a/instanceGroups/gce-pool-a", - "https://www.googleapis.com/compute/v1/projects/project1/zones/us-central1-a/instanceGroups/gce-pool-b", - "https://www.googleapis.com/compute/v1/projects/project1/zones/us-central1-b/instanceGroups/gce-pool-a", - "https://www.googleapis.com/compute/v1/projects/project1/zones/us-central1-b/instanceGroups/gce-pool-b", - "https://www.googleapis.com/compute/v1/projects/project1/zones/us-central1-c/instanceGroups/gce-pool-a", - "https://www.googleapis.com/compute/v1/projects/project1/zones/us-central1-c/instanceGroups/gce-pool-b", - "https://www.googleapis.com/compute/v1/projects/project1/zones/us-central1-f/instanceGroups/gce-pool-a", - "https://www.googleapis.com/compute/v1/projects/project1/zones/us-central1-f/instanceGroups/gce-pool-b", + min, max := 0, 100 + g.migAutoDiscoverySpecs = []cloudprovider.MIGAutoDiscoveryConfig{ + {Re: 
regexp.MustCompile("UNUSED"), MinSize: min, MaxSize: max}, } - assert.Equal(t, want, got) + + assert.NoError(t, g.fetchAutoMigs()) + + migs := g.getMigs() + assert.Equal(t, 2, len(migs)) + validateMig(t, migs[0].config, zoneB, gceMigA, min, max) + validateMig(t, migs[1].config, zoneB, gceMigB, min, max) + mock.AssertExpectationsForObjects(t, server) +} + +func TestFetchExplicitMigs(t *testing.T) { + server := NewHttpServerMock() + defer server.Close() + + server.On("handle", "/project1/zones/"+zoneB+"/instanceGroupManagers/"+gceMigA).Return(getInstanceGroupManagerNamed(gceMigA, zoneB)).Once() + server.On("handle", "/project1/zones/"+zoneB+"/instanceGroupManagers/"+gceMigB).Return(getInstanceGroupManagerNamed(gceMigB, zoneB)).Once() + + server.On("handle", "/project1/global/instanceTemplates/"+gceMigA).Return(instanceTemplate).Once() + server.On("handle", "/project1/global/instanceTemplates/"+gceMigB).Return(instanceTemplate).Once() + server.On("handle", "/project1/zones/"+zoneB+"/machineTypes/n1-standard-1").Return(getMachineType(zoneB)).Twice() + + server.On("handle", "/project1/zones/"+zoneB+"/instanceGroupManagers/"+gceMigA).Return(getInstanceGroupManagerNamed(gceMigA, zoneB)).Once() + server.On("handle", "/project1/zones/"+zoneB+"/instanceGroupManagers/"+gceMigA+"/listManagedInstances").Return(getManagedInstancesResponse1Named(gceMigA, zoneB)).Once() + server.On("handle", "/project1/zones/"+zoneB+"/instanceGroupManagers/"+gceMigB).Return(getInstanceGroupManagerNamed(gceMigB, zoneB)).Once() + server.On("handle", "/project1/zones/"+zoneB+"/instanceGroupManagers/"+gceMigB+"/listManagedInstances").Return(getManagedInstancesResponse2Named(gceMigB, zoneB)).Once() + + regional := false + g := newTestGceManager(t, server.URL, ModeGCE, regional) + + minA, maxA := 0, 100 + minB, maxB := 1, 10 + specs := []string{ + fmt.Sprintf("%d:%d:https://content.googleapis.com/compute/v1/projects/project1/zones/%s/instanceGroups/%s", minA, maxA, zoneB, gceMigA), + fmt.Sprintf("%d:%d:https://content.googleapis.com/compute/v1/projects/project1/zones/%s/instanceGroups/%s", minB, maxB, zoneB, gceMigB), + } + + assert.NoError(t, g.fetchExplicitMigs(specs)) + + migs := g.getMigs() + assert.Equal(t, 2, len(migs)) + validateMig(t, migs[0].config, zoneB, gceMigA, minA, maxA) + validateMig(t, migs[1].config, zoneB, gceMigB, minB, maxB) mock.AssertExpectationsForObjects(t, server) } diff --git a/cluster-autoscaler/cloudprovider/node_group_discovery_options.go b/cluster-autoscaler/cloudprovider/node_group_discovery_options.go index 56db3017d782..6420fe9df8c6 100644 --- a/cluster-autoscaler/cloudprovider/node_group_discovery_options.go +++ b/cluster-autoscaler/cloudprovider/node_group_discovery_options.go @@ -16,7 +16,30 @@ limitations under the License. 
package cloudprovider -import "fmt" +import ( + "errors" + "fmt" + "regexp" + "strconv" + "strings" +) + +const ( + autoDiscovererTypeMIG = "mig" + autoDiscovererTypeASG = "asg" + + migAutoDiscovererKeyPrefix = "namePrefix" + migAutoDiscovererKeyMinNodes = "min" + migAutoDiscovererKeyMaxNodes = "max" + + asgAutoDiscovererKeyTag = "tag" +) + +var validMIGAutoDiscovererKeys = strings.Join([]string{ + migAutoDiscovererKeyPrefix, + migAutoDiscovererKeyMinNodes, + migAutoDiscovererKeyMaxNodes, +}, ", ") // NodeGroupDiscoveryOptions contains various options to configure how a cloud provider discovers node groups type NodeGroupDiscoveryOptions struct { @@ -36,16 +59,132 @@ func (o NodeGroupDiscoveryOptions) AutoDiscoverySpecified() bool { return len(o.NodeGroupAutoDiscoverySpecs) > 0 } -// NoDiscoverySpecified returns true expected nly when there were no --nodes or -// --node-group-auto-discovery flags specified. This is expected in GKE. -func (o NodeGroupDiscoveryOptions) NoDiscoverySpecified() bool { - return !o.StaticDiscoverySpecified() && !o.AutoDiscoverySpecified() +// DiscoverySpecified returns true when at least one of the --nodes or +// --node-group-auto-discovery flags specified. +func (o NodeGroupDiscoveryOptions) DiscoverySpecified() bool { + return o.StaticDiscoverySpecified() || o.AutoDiscoverySpecified() +} + +// ParseMIGAutoDiscoverySpecs returns any provided NodeGroupAutoDiscoverySpecs +// parsed into configuration appropriate for MIG autodiscovery. +func (o NodeGroupDiscoveryOptions) ParseMIGAutoDiscoverySpecs() ([]MIGAutoDiscoveryConfig, error) { + cfgs := make([]MIGAutoDiscoveryConfig, len(o.NodeGroupAutoDiscoverySpecs)) + var err error + for i, spec := range o.NodeGroupAutoDiscoverySpecs { + cfgs[i], err = parseMIGAutoDiscoverySpec(spec) + if err != nil { + return nil, err + } + } + return cfgs, nil +} + +// ParseASGAutoDiscoverySpecs returns any provided NodeGroupAutoDiscoverySpecs +// parsed into configuration appropriate for ASG autodiscovery. +func (o NodeGroupDiscoveryOptions) ParseASGAutoDiscoverySpecs() ([]ASGAutoDiscoveryConfig, error) { + cfgs := make([]ASGAutoDiscoveryConfig, len(o.NodeGroupAutoDiscoverySpecs)) + var err error + for i, spec := range o.NodeGroupAutoDiscoverySpecs { + cfgs[i], err = parseASGAutoDiscoverySpec(spec) + if err != nil { + return nil, err + } + } + return cfgs, nil +} + +// A MIGAutoDiscoveryConfig specifies how to autodiscover GCE MIGs. +type MIGAutoDiscoveryConfig struct { + // Re is a regexp passed using the eq filter to the GCE list API. + Re *regexp.Regexp + // MinSize specifies the minimum size for all MIGs that match Re. + MinSize int + // MaxSize specifies the maximum size for all MIGs that match Re. 
+ MaxSize int } -// Validate returns and error when both --nodes and --node-group-auto-discovery are specified -func (o NodeGroupDiscoveryOptions) Validate() error { - if o.StaticDiscoverySpecified() && o.AutoDiscoverySpecified() { - return fmt.Errorf("Either node group specs(%v) or node group auto discovery spec(%v) can be specified but not both", o.NodeGroupSpecs, o.NodeGroupAutoDiscoverySpecs) +func parseMIGAutoDiscoverySpec(spec string) (MIGAutoDiscoveryConfig, error) { + cfg := MIGAutoDiscoveryConfig{} + + tokens := strings.Split(spec, ":") + if len(tokens) != 2 { + return cfg, fmt.Errorf("spec \"%s\" should be discoverer:key=value,key=value", spec) + } + discoverer := tokens[0] + if discoverer != autoDiscovererTypeMIG { + return cfg, fmt.Errorf("unsupported discoverer specified: %s", discoverer) + } + + for _, arg := range strings.Split(tokens[1], ",") { + kv := strings.Split(arg, "=") + if len(kv) != 2 { + return cfg, fmt.Errorf("invalid key=value pair %s", kv) + } + k, v := kv[0], kv[1] + + var err error + switch k { + case migAutoDiscovererKeyPrefix: + if cfg.Re, err = regexp.Compile(fmt.Sprintf("^%s.+", v)); err != nil { + return cfg, fmt.Errorf("invalid instance group name prefix \"%s\" - \"^%s.+\" must be a valid RE2 regexp", v, v) + } + case migAutoDiscovererKeyMinNodes: + if cfg.MinSize, err = strconv.Atoi(v); err != nil { + return cfg, fmt.Errorf("invalid minimum nodes: %s", v) + } + case migAutoDiscovererKeyMaxNodes: + if cfg.MaxSize, err = strconv.Atoi(v); err != nil { + return cfg, fmt.Errorf("invalid maximum nodes: %s", v) + } + default: + return cfg, fmt.Errorf("unsupported key \"%s\" is specified for discoverer \"%s\". Supported keys are \"%s\"", k, discoverer, validMIGAutoDiscovererKeys) + } + } + if cfg.Re == nil || cfg.Re.String() == "^.+" { + return cfg, errors.New("empty instance group name prefix supplied") + } + if cfg.MinSize > cfg.MaxSize { + return cfg, fmt.Errorf("minimum size %d is greater than maximum size %d", cfg.MinSize, cfg.MaxSize) + } + if cfg.MaxSize < 1 { + return cfg, fmt.Errorf("maximum size %d must be at least 1", cfg.MaxSize) + } + return cfg, nil +} + +// An ASGAutoDiscoveryConfig specifies how to autodiscover AWS ASGs. +type ASGAutoDiscoveryConfig struct { + // TagKeys to match on. + // Any ASG with all of the provided tag keys will be autoscaled. + TagKeys []string +} + +func parseASGAutoDiscoverySpec(spec string) (ASGAutoDiscoveryConfig, error) { + cfg := ASGAutoDiscoveryConfig{} + + tokens := strings.Split(spec, ":") + if len(tokens) != 2 { + return cfg, fmt.Errorf("Invalid node group auto discovery spec specified via --node-group-auto-discovery: %s", spec) + } + discoverer := tokens[0] + if discoverer != autoDiscovererTypeASG { + return cfg, fmt.Errorf("Unsupported discoverer specified: %s", discoverer) + } + param := tokens[1] + kv := strings.Split(param, "=") + if len(kv) != 2 { + return cfg, fmt.Errorf("invalid key=value pair %s", kv) + } + k, v := kv[0], kv[1] + if k != asgAutoDiscovererKeyTag { + return cfg, fmt.Errorf("Unsupported parameter key \"%s\" is specified for discoverer \"%s\". 
The only supported key is \"%s\"", k, discoverer, asgAutoDiscovererKeyTag) + } + if v == "" { + return cfg, errors.New("tag value not supplied") + } + cfg.TagKeys = strings.Split(v, ",") + if len(cfg.TagKeys) == 0 { + return cfg, fmt.Errorf("Invalid ASG tag for auto discovery specified: ASG tag must not be empty") } - return nil + return cfg, nil } diff --git a/cluster-autoscaler/cloudprovider/node_group_discovery_options_test.go b/cluster-autoscaler/cloudprovider/node_group_discovery_options_test.go index c8af369f93f4..6db0f270d429 100644 --- a/cluster-autoscaler/cloudprovider/node_group_discovery_options_test.go +++ b/cluster-autoscaler/cloudprovider/node_group_discovery_options_test.go @@ -17,37 +17,171 @@ limitations under the License. package cloudprovider import ( - "fmt" + "regexp" "testing" "github.com/stretchr/testify/assert" ) -func TestNodeGroupDiscoveryOptionsValidate(t *testing.T) { - o := NodeGroupDiscoveryOptions{ - NodeGroupAutoDiscoverySpecs: []string{"asg:tag=foobar"}, - NodeGroupSpecs: []string{"myasg:0:10"}, +func TestParseMIGAutoDiscoverySpecs(t *testing.T) { + cases := []struct { + name string + specs []string + want []MIGAutoDiscoveryConfig + wantErr bool + }{ + { + name: "GoodSpecs", + specs: []string{ + "mig:namePrefix=pfx,min=0,max=10", + "mig:namePrefix=anotherpfx,min=1,max=2", + }, + want: []MIGAutoDiscoveryConfig{ + {Re: regexp.MustCompile("^pfx.+"), MinSize: 0, MaxSize: 10}, + {Re: regexp.MustCompile("^anotherpfx.+"), MinSize: 1, MaxSize: 2}, + }, + }, + { + name: "MissingMIGType", + specs: []string{"namePrefix=pfx,min=0,max=10"}, + wantErr: true, + }, + { + name: "WrongType", + specs: []string{"asg:namePrefix=pfx,min=0,max=10"}, + wantErr: true, + }, + { + name: "UnknownKey", + specs: []string{"mig:namePrefix=pfx,min=0,max=10,unknown=hi"}, + wantErr: true, + }, + { + name: "NonIntegerMin", + specs: []string{"mig:namePrefix=pfx,min=a,max=10"}, + wantErr: true, + }, + { + name: "NonIntegerMax", + specs: []string{"mig:namePrefix=pfx,min=1,max=donkey"}, + wantErr: true, + }, + { + name: "PrefixDoesNotCompileToRegexp", + specs: []string{"mig:namePrefix=a),min=1,max=10"}, + wantErr: true, + }, + { + name: "KeyMissingValue", + specs: []string{"mig:namePrefix=prefix,min=,max=10"}, + wantErr: true, + }, + { + name: "ValueMissingKey", + specs: []string{"mig:namePrefix=prefix,=0,max=10"}, + wantErr: true, + }, + { + name: "KeyMissingSeparator", + specs: []string{"mig:namePrefix=prefix,min,max=10"}, + wantErr: true, + }, + { + name: "TooManySeparators", + specs: []string{"mig:namePrefix=prefix,min=0,max=10=20"}, + wantErr: true, + }, + { + name: "PrefixIsEmpty", + specs: []string{"mig:namePrefix=,min=0,max=10"}, + wantErr: true, + }, + { + name: "PrefixIsMissing", + specs: []string{"mig:min=0,max=10"}, + wantErr: true, + }, + { + name: "MaxBelowMin", + specs: []string{"mig:namePrefix=prefix,min=10,max=1"}, + wantErr: true, + }, + { + name: "MaxIsZero", + specs: []string{"mig:namePrefix=prefix,min=0,max=0"}, + wantErr: true, + }, } - err := o.Validate() - if err == nil { - t.Errorf("Expected validation error didn't occur with NodeGroupDiscoveryOptions: %+v", o) - t.FailNow() - } - if msg := fmt.Sprintf("%v", err); msg != `Either node group specs([myasg:0:10]) or node group auto discovery spec([asg:tag=foobar]) can be specified but not both` { - t.Errorf("Unexpected validation error message: %s", msg) + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + do := NodeGroupDiscoveryOptions{NodeGroupAutoDiscoverySpecs: tc.specs} + got, err := 
do.ParseMIGAutoDiscoverySpecs() + if tc.wantErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + assert.True(t, assert.ObjectsAreEqualValues(tc.want, got), "\ngot: %#v\nwant: %#v", got, tc.want) + }) } +} - o = NodeGroupDiscoveryOptions{ - NodeGroupAutoDiscoverySpecs: []string{ - "mig:prefix=iga,min=0,max=10", - "mig:prefix=igb,min=0,max=20", +func TestParseASGAutoDiscoverySpecs(t *testing.T) { + cases := []struct { + name string + specs []string + want []ASGAutoDiscoveryConfig + wantErr bool + }{ + { + name: "GoodSpecs", + specs: []string{ + "asg:tag=tag,anothertag", + "asg:tag=cooltag,anothertag", + }, + want: []ASGAutoDiscoveryConfig{ + {TagKeys: []string{"tag", "anothertag"}}, + {TagKeys: []string{"cooltag", "anothertag"}}, + }, + }, + { + name: "MissingASGType", + specs: []string{"tag=tag,anothertag"}, + wantErr: true, + }, + { + name: "WrongType", + specs: []string{"mig:tag=tag,anothertag"}, + wantErr: true, + }, + { + name: "KeyMissingValue", + specs: []string{"asg:tag="}, + wantErr: true, + }, + { + name: "ValueMissingKey", + specs: []string{"asg:=tag"}, + wantErr: true, + }, + { + name: "KeyMissingSeparator", + specs: []string{"asg:tag"}, + wantErr: true, }, } - assert.NoError(t, o.Validate()) - o = NodeGroupDiscoveryOptions{ - NodeGroupSpecs: []string{"myasg:0:10"}, + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + do := NodeGroupDiscoveryOptions{NodeGroupAutoDiscoverySpecs: tc.specs} + got, err := do.ParseASGAutoDiscoverySpecs() + if tc.wantErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + assert.True(t, assert.ObjectsAreEqualValues(tc.want, got), "\ngot: %#v\nwant: %#v", got, tc.want) + }) } - assert.NoError(t, o.Validate()) } diff --git a/cluster-autoscaler/cloudprovider/util_test.go b/cluster-autoscaler/cloudprovider/util_test.go index ced127ca2554..ba23d61807a3 100644 --- a/cluster-autoscaler/cloudprovider/util_test.go +++ b/cluster-autoscaler/cloudprovider/util_test.go @@ -38,6 +38,9 @@ func TestBuildKubeProxy(t *testing.T) { pod := BuildKubeProxy("kube-proxy") assert.NotNil(t, pod) + assert.Equal(t, 1, len(pod.Spec.Containers)) + cpu := pod.Spec.Containers[0].Resources.Requests[apiv1.ResourceCPU] + assert.Equal(t, int64(100), cpu.MilliValue()) } func TestJoinStringMaps(t *testing.T) { diff --git a/cluster-autoscaler/config/dynamic/node_group_spec.go b/cluster-autoscaler/config/dynamic/node_group_spec.go index cec991b2bd64..04535caefc00 100644 --- a/cluster-autoscaler/config/dynamic/node_group_spec.go +++ b/cluster-autoscaler/config/dynamic/node_group_spec.go @@ -30,18 +30,18 @@ type NodeGroupSpec struct { MinSize int `json:"minSize"` // Max size of the autoscaling target MaxSize int `json:"maxSize"` - - supportScaleToZero bool + // Specifies whether this node group can scale to zero nodes. 
+ SupportScaleToZero bool } // SpecFromString parses a node group spec represented in the form of `<minSize>:<maxSize>:<name>` and produces a node group spec object -func SpecFromString(value string, supportScaleToZero bool) (*NodeGroupSpec, error) { +func SpecFromString(value string, SupportScaleToZero bool) (*NodeGroupSpec, error) { tokens := strings.SplitN(value, ":", 3) if len(tokens) != 3 { return nil, fmt.Errorf("wrong nodes configuration: %s", value) } - spec := NodeGroupSpec{supportScaleToZero: supportScaleToZero} + spec := NodeGroupSpec{SupportScaleToZero: SupportScaleToZero} if size, err := strconv.Atoi(tokens[0]); err == nil { spec.MinSize = size @@ -66,7 +66,7 @@ func SpecFromString(value string, supportScaleToZero bool) (*NodeGroupSpec, erro // Validate produces an error if there's an invalid field in the node group spec func (s NodeGroupSpec) Validate() error { - if s.supportScaleToZero { + if s.SupportScaleToZero { if s.MinSize < 0 { return fmt.Errorf("min size must be >= 0") } diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 889eb4576465..5d49b5ac267a 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -286,7 +286,7 @@ func main() { flag.Var(&nodeGroupAutoDiscoveryFlag, "node-group-auto-discovery", "One or more definition(s) of node group auto-discovery. "+ "A definition is expressed `<name of discoverer>:[<key>[=<value>]]`. "+ "The `aws` and `gce` cloud providers are currently supported. AWS matches by ASG tags, e.g. `asg:tag=tagKey,anotherTagKey`. "+ - "GCE matches by IG prefix, and requires you to specify min and max nodes per IG, e.g. `mig:prefix=pfx,min=0,max=10` "+ + "GCE matches by IG name prefix, and requires you to specify min and max nodes per IG, e.g. `mig:namePrefix=pfx,min=0,max=10` "+ "Can be used multiple times.") kube_flag.InitFlags()
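// Illustrative sketch, not part of the patch: how the mig:namePrefix value in
// the --node-group-auto-discovery flag above is interpreted. The prefix "pfx"
// is compiled to the anchored RE2 pattern "^pfx.+", so only group names that
// start with the prefix and have at least one further character will match.
// (The asg:tag=... form for AWS matches on ASG tag keys instead.)
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// "pfx" becomes the anchored pattern "^pfx.+", exactly as the
	// mig:namePrefix discoverer builds it.
	re := regexp.MustCompile("^pfx.+")
	fmt.Println(re.MatchString("pfx-nodes-1"))     // true
	fmt.Println(re.MatchString("pfx"))             // false: needs at least one more character
	fmt.Println(re.MatchString("other-pfx-nodes")) // false: pattern is anchored at the start
}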