Skip to content

Commit

Permalink
Replace the Polling Autoscaler
Browse files Browse the repository at this point in the history
Node group discovery is now handled by cloudprovider.Refresh() in all cases.
Additionally, explicit node groups can now be used alongside autodiscovery.
  • Loading branch information
Nic Cope committed Dec 11, 2017
1 parent 6a704a6 commit e96ff07
Show file tree
Hide file tree
Showing 16 changed files with 1,274 additions and 893 deletions.
123 changes: 37 additions & 86 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package aws

import (
"errors"
"fmt"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -28,8 +27,9 @@ import (
// autoScaling is the interface represents a specific aspect of the auto-scaling service provided by AWS SDK for use in CA
type autoScaling interface {
DescribeAutoScalingGroups(input *autoscaling.DescribeAutoScalingGroupsInput) (*autoscaling.DescribeAutoScalingGroupsOutput, error)
DescribeAutoScalingGroupsPages(input *autoscaling.DescribeAutoScalingGroupsInput, fn func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) error
DescribeLaunchConfigurations(*autoscaling.DescribeLaunchConfigurationsInput) (*autoscaling.DescribeLaunchConfigurationsOutput, error)
DescribeTags(input *autoscaling.DescribeTagsInput) (*autoscaling.DescribeTagsOutput, error)
DescribeTagsPages(input *autoscaling.DescribeTagsInput, fn func(*autoscaling.DescribeTagsOutput, bool) bool) error
SetDesiredCapacity(input *autoscaling.SetDesiredCapacityInput) (*autoscaling.SetDesiredCapacityOutput, error)
TerminateInstanceInAutoScalingGroup(input *autoscaling.TerminateInstanceInAutoScalingGroupInput) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error)
}
Expand Down Expand Up @@ -73,55 +73,29 @@ func (m autoScalingWrapper) getAutoscalingGroupByName(name string) (*autoscaling
}

func (m *autoScalingWrapper) getAutoscalingGroupsByNames(names []string) ([]*autoscaling.Group, error) {
glog.V(6).Infof("Starting getAutoscalingGroupsByNames with names=%v", names)

if len(names) < 1 {
glog.V(4).Info("Failed to describe ASGs: Must specify at least one ASG name.")
return nil, fmt.Errorf("List of ASG names was empty. Must specify at least one ASG name")
}

nameRefs := []*string{}
for _, n := range names {
nameRefs = append(nameRefs, aws.String(n))
if len(names) == 0 {
return nil, nil
}
params := &autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: nameRefs,
input := &autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: aws.StringSlice(names),
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
}
description, err := m.DescribeAutoScalingGroups(params)
if err != nil {
glog.V(4).Infof("Failed to describe ASGs : %v", err)
asgs := make([]*autoscaling.Group, 0)
if err := m.DescribeAutoScalingGroupsPages(input, func(output *autoscaling.DescribeAutoScalingGroupsOutput, _ bool) bool {
asgs = append(asgs, output.AutoScalingGroups...)
// We return true while we want to be called with the next page of
// results, if any.
return true
}); err != nil {
return nil, err
}
if len(description.AutoScalingGroups) < 1 {
return nil, errors.New("No ASGs found")
}

asgs := description.AutoScalingGroups
for description.NextToken != nil {
description, err = m.DescribeAutoScalingGroups(&autoscaling.DescribeAutoScalingGroupsInput{
NextToken: description.NextToken,
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
})
if err != nil {
glog.V(4).Infof("Failed to describe ASGs : %v", err)
return nil, err
}
asgs = append(asgs, description.AutoScalingGroups...)
}

glog.V(6).Infof("Finishing getAutoscalingGroupsByNames asgs=%v", asgs)

return asgs, nil
}

func (m *autoScalingWrapper) getAutoscalingGroupsByTags(keys []string) ([]*autoscaling.Group, error) {
glog.V(6).Infof("Starting getAutoscalingGroupsByTag with keys=%v", keys)

numKeys := len(keys)

// DescribeTags does an OR query when multiple filters on different tags are specified.
// In other words, DescribeTags returns [asg1, asg1] for keys [t1, t2] when there's only one asg tagged both t1 and t2.
// DescribeTags does an OR query when multiple filters on different tags are
// specified. In other words, DescribeTags returns [asg1, asg1] for keys
// [t1, t2] when there's only one asg tagged both t1 and t2.
filters := []*autoscaling.Filter{}
for _, key := range keys {
filter := &autoscaling.Filter{
Expand All @@ -130,58 +104,35 @@ func (m *autoScalingWrapper) getAutoscalingGroupsByTags(keys []string) ([]*autos
}
filters = append(filters, filter)
}
description, err := m.DescribeTags(&autoscaling.DescribeTagsInput{

tags := []*autoscaling.TagDescription{}
input := &autoscaling.DescribeTagsInput{
Filters: filters,
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
})
if err != nil {
glog.V(4).Infof("Failed to describe ASG tags for keys %v : %v", keys, err)
return nil, err
}
if len(description.Tags) < 1 {
return nil, fmt.Errorf("Unable to find ASGs for tag keys %v", keys)
}
tags := []*autoscaling.TagDescription{}
tags = append(tags, description.Tags...)

for description.NextToken != nil {
description, err = m.DescribeTags(&autoscaling.DescribeTagsInput{
NextToken: description.NextToken,
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
})
if err != nil {
glog.V(4).Infof("Failed to describe ASG tags for key %v: %v", keys, err)
return nil, err
}
tags = append(tags, description.Tags...)
if err := m.DescribeTagsPages(input, func(out *autoscaling.DescribeTagsOutput, _ bool) bool {
tags = append(tags, out.Tags...)
// We return true while we want to be called with the next page of
// results, if any.
return true
}); err != nil {
return nil, err
}

// De-duplicate asg names
asgNameOccurrences := map[string]int{}
for _, t := range tags {
asgName := *(t.ResourceId)
if n, ok := asgNameOccurrences[asgName]; ok {
asgNameOccurrences[asgName] = n + 1
} else {
asgNameOccurrences[asgName] = 1
}
}
// Accordingly to how DescribeTags API works, the result contains ASGs which not all but only subset of tags are associated.
// Explicitly select ASGs to which all the tags are associated so that we won't end up calling DescribeAutoScalingGroups API
// multiple times on an ASG
// According to how DescribeTags API works, the result contains ASGs which
// not all but only subset of tags are associated. Explicitly select ASGs to
// which all the tags are associated so that we won't end up calling
// DescribeAutoScalingGroups API multiple times on an ASG.
asgNames := []string{}
for asgName, n := range asgNameOccurrences {
if n == numKeys {
asgNameOccurrences := make(map[string]int)
for _, t := range tags {
asgName := aws.StringValue(t.ResourceId)
occurrences := asgNameOccurrences[asgName] + 1
if occurrences >= len(keys) {
asgNames = append(asgNames, asgName)
}
asgNameOccurrences[asgName] = occurrences
}

asgs, err := m.getAutoscalingGroupsByNames(asgNames)
if err != nil {
return nil, err
}

glog.V(6).Infof("Finishing getAutoscalingGroupsByTag with asgs=%v", asgs)

return asgs, nil
return m.getAutoscalingGroupsByNames(asgNames)
}
153 changes: 114 additions & 39 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,80 +18,155 @@ package aws

import (
"fmt"
"reflect"
"sync"
"time"

"k8s.io/apimachinery/pkg/util/wait"

"github.com/aws/aws-sdk-go/aws"
"github.com/golang/glog"
)

type autoScalingGroups struct {
registeredAsgs []*asgInformation
instanceToAsg map[AwsRef]*Asg
cacheMutex sync.Mutex
instancesNotInManagedAsg map[AwsRef]struct{}
service autoScalingWrapper
const scaleToZeroSupported = false

type asgCache struct {
registeredAsgs []*asgInformation
instanceToAsg map[AwsRef]*Asg
notInRegisteredAsg map[AwsRef]bool
mutex sync.Mutex
service autoScalingWrapper
interrupt chan struct{}
}

func newAutoScalingGroups(service autoScalingWrapper) *autoScalingGroups {
registry := &autoScalingGroups{
registeredAsgs: make([]*asgInformation, 0),
service: service,
instanceToAsg: make(map[AwsRef]*Asg),
instancesNotInManagedAsg: make(map[AwsRef]struct{}),
func newASGCache(service autoScalingWrapper) (*asgCache, error) {
registry := &asgCache{
registeredAsgs: make([]*asgInformation, 0),
service: service,
instanceToAsg: make(map[AwsRef]*Asg),
notInRegisteredAsg: make(map[AwsRef]bool),
interrupt: make(chan struct{}),
}
return registry
go wait.Until(func() {
registry.mutex.Lock()
defer registry.mutex.Unlock()
if err := registry.regenerate(); err != nil {
glog.Errorf("Error while regenerating Asg cache: %v", err)
}
}, time.Hour, registry.interrupt)

return registry, nil
}

// Register registers asg in Aws Manager.
func (m *autoScalingGroups) Register(asg *Asg) {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
// Register ASG. Returns true if the ASG was registered.
func (m *asgCache) Register(asg *Asg) bool {
m.mutex.Lock()
defer m.mutex.Unlock()

for i := range m.registeredAsgs {
if existing := m.registeredAsgs[i].config; existing.AwsRef == asg.AwsRef {
if reflect.DeepEqual(existing, asg) {
return false
}
m.registeredAsgs[i].config = asg
glog.V(4).Infof("Updated ASG %s", asg.AwsRef.Name)
m.invalidateUnownedInstanceCache()
return true
}
}

glog.V(1).Infof("Registering ASG %s", asg.AwsRef.Name)
m.registeredAsgs = append(m.registeredAsgs, &asgInformation{
config: asg,
})
m.invalidateUnownedInstanceCache()
return true
}

// Unregister ASG. Returns true if the ASG was unregistered.
func (m *asgCache) Unregister(asg *Asg) bool {
m.mutex.Lock()
defer m.mutex.Unlock()

updated := make([]*asgInformation, 0, len(m.registeredAsgs))
changed := false
for _, existing := range m.registeredAsgs {
if existing.config.AwsRef == asg.AwsRef {
glog.V(1).Infof("Unregistered ASG %s", asg.AwsRef.Name)
changed = true
continue
}
updated = append(updated, existing)
}
m.registeredAsgs = updated
return changed
}

func (m *asgCache) get() []*asgInformation {
m.mutex.Lock()
defer m.mutex.Unlock()

return m.registeredAsgs
}

// FindForInstance returns AsgConfig of the given Instance
func (m *autoScalingGroups) FindForInstance(instance *AwsRef) (*Asg, error) {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
func (m *asgCache) FindForInstance(instance *AwsRef) (*Asg, error) {
// TODO(negz): Prevent this calling describe ASGs too often.
m.mutex.Lock()
defer m.mutex.Unlock()

if m.notInRegisteredAsg[*instance] {
// We already know we don't own this instance. Return early and avoid
// additional calls to describe ASGs.
return nil, nil
}

if config, found := m.instanceToAsg[*instance]; found {
return config, nil
}
if _, found := m.instancesNotInManagedAsg[*instance]; found {
// The instance is already known to not belong to any configured ASG
// Skip regenerateCache so that we won't unnecessarily call DescribeAutoScalingGroups
// See https://github.com/kubernetes/contrib/issues/2541
return nil, nil
}
if err := m.regenerateCache(); err != nil {
if err := m.regenerate(); err != nil {
return nil, fmt.Errorf("Error while looking for ASG for instance %+v, error: %v", *instance, err)
}
if config, found := m.instanceToAsg[*instance]; found {
return config, nil
}
// instance does not belong to any configured ASG
glog.V(6).Infof("Instance %+v is not in any ASG managed by CA. CA is now memorizing the fact not to unnecessarily call AWS API afterwards trying to find the unexistent managed ASG for the instance", *instance)
m.instancesNotInManagedAsg[*instance] = struct{}{}

m.notInRegisteredAsg[*instance] = true
return nil, nil
}

func (m *autoScalingGroups) regenerateCache() error {
func (m *asgCache) invalidateUnownedInstanceCache() {
glog.V(4).Info("Invalidating unowned instance cache")
m.notInRegisteredAsg = make(map[AwsRef]bool)
}

func (m *asgCache) regenerate() error {
newCache := make(map[AwsRef]*Asg)

for _, asg := range m.registeredAsgs {
glog.V(4).Infof("Regenerating ASG information for %s", asg.config.Name)
names := make([]string, len(m.registeredAsgs))
configs := make(map[string]*Asg)
for i, asg := range m.registeredAsgs {
names[i] = asg.config.Name
configs[asg.config.Name] = asg.config
}

group, err := m.service.getAutoscalingGroupByName(asg.config.Name)
if err != nil {
return err
}
glog.V(4).Infof("Regenerating instance to ASG map for ASGs: %v", names)
groups, err := m.service.getAutoscalingGroupsByNames(names)
if err != nil {
return err
}
for _, group := range groups {
for _, instance := range group.Instances {
ref := AwsRef{Name: *instance.InstanceId}
newCache[ref] = asg.config
ref := AwsRef{Name: aws.StringValue(instance.InstanceId)}
newCache[ref] = configs[aws.StringValue(group.AutoScalingGroupName)]
}
}

m.instanceToAsg = newCache
return nil
}

// Cleanup closes the channel to signal the go routine to stop that is handling the cache
func (m *asgCache) Cleanup() {
close(m.interrupt)
}
Loading

0 comments on commit e96ff07

Please sign in to comment.