Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support autodetection of GCE managed instance groups by name prefix #462

Merged
merged 4 commits into from
Dec 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 37 additions & 86 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package aws

import (
"errors"
"fmt"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -28,8 +27,9 @@ import (
// autoScaling is the subset of the AWS SDK auto-scaling client that the
// cluster autoscaler (CA) depends on. Keeping it as an interface lets the
// wrapper be backed by something other than the real SDK client.
type autoScaling interface {
	// Single-call describe operations.
	DescribeAutoScalingGroups(input *autoscaling.DescribeAutoScalingGroupsInput) (*autoscaling.DescribeAutoScalingGroupsOutput, error)
	// Paged variant: fn receives each page of results and returns true while
	// it wants to be called with the next page, if any.
	DescribeAutoScalingGroupsPages(input *autoscaling.DescribeAutoScalingGroupsInput, fn func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) error
	DescribeLaunchConfigurations(input *autoscaling.DescribeLaunchConfigurationsInput) (*autoscaling.DescribeLaunchConfigurationsOutput, error)
	DescribeTags(input *autoscaling.DescribeTagsInput) (*autoscaling.DescribeTagsOutput, error)
	// Paged variant: fn receives each page of results and returns true while
	// it wants to be called with the next page, if any.
	DescribeTagsPages(input *autoscaling.DescribeTagsInput, fn func(*autoscaling.DescribeTagsOutput, bool) bool) error

	// Mutating operations.
	SetDesiredCapacity(input *autoscaling.SetDesiredCapacityInput) (*autoscaling.SetDesiredCapacityOutput, error)
	TerminateInstanceInAutoScalingGroup(input *autoscaling.TerminateInstanceInAutoScalingGroupInput) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error)
}
Expand Down Expand Up @@ -73,55 +73,29 @@ func (m autoScalingWrapper) getAutoscalingGroupByName(name string) (*autoscaling
}

func (m *autoScalingWrapper) getAutoscalingGroupsByNames(names []string) ([]*autoscaling.Group, error) {
glog.V(6).Infof("Starting getAutoscalingGroupsByNames with names=%v", names)

if len(names) < 1 {
glog.V(4).Info("Failed to describe ASGs: Must specify at least one ASG name.")
return nil, fmt.Errorf("List of ASG names was empty. Must specify at least one ASG name")
}

nameRefs := []*string{}
for _, n := range names {
nameRefs = append(nameRefs, aws.String(n))
if len(names) == 0 {
return nil, nil
}
params := &autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: nameRefs,
input := &autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: aws.StringSlice(names),
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
}
description, err := m.DescribeAutoScalingGroups(params)
if err != nil {
glog.V(4).Infof("Failed to describe ASGs : %v", err)
asgs := make([]*autoscaling.Group, 0)
if err := m.DescribeAutoScalingGroupsPages(input, func(output *autoscaling.DescribeAutoScalingGroupsOutput, _ bool) bool {
asgs = append(asgs, output.AutoScalingGroups...)
// We return true while we want to be called with the next page of
// results, if any.
return true
}); err != nil {
return nil, err
}
if len(description.AutoScalingGroups) < 1 {
return nil, errors.New("No ASGs found")
}

asgs := description.AutoScalingGroups
for description.NextToken != nil {
description, err = m.DescribeAutoScalingGroups(&autoscaling.DescribeAutoScalingGroupsInput{
NextToken: description.NextToken,
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
})
if err != nil {
glog.V(4).Infof("Failed to describe ASGs : %v", err)
return nil, err
}
asgs = append(asgs, description.AutoScalingGroups...)
}

glog.V(6).Infof("Finishing getAutoscalingGroupsByNames asgs=%v", asgs)

return asgs, nil
}

func (m *autoScalingWrapper) getAutoscalingGroupsByTags(keys []string) ([]*autoscaling.Group, error) {
glog.V(6).Infof("Starting getAutoscalingGroupsByTag with keys=%v", keys)

numKeys := len(keys)

// DescribeTags does an OR query when multiple filters on different tags are specified.
// In other words, DescribeTags returns [asg1, asg1] for keys [t1, t2] when there's only one asg tagged both t1 and t2.
// DescribeTags does an OR query when multiple filters on different tags are
// specified. In other words, DescribeTags returns [asg1, asg1] for keys
// [t1, t2] when there's only one asg tagged both t1 and t2.
filters := []*autoscaling.Filter{}
for _, key := range keys {
filter := &autoscaling.Filter{
Expand All @@ -130,58 +104,35 @@ func (m *autoScalingWrapper) getAutoscalingGroupsByTags(keys []string) ([]*autos
}
filters = append(filters, filter)
}
description, err := m.DescribeTags(&autoscaling.DescribeTagsInput{

tags := []*autoscaling.TagDescription{}
input := &autoscaling.DescribeTagsInput{
Filters: filters,
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
})
if err != nil {
glog.V(4).Infof("Failed to describe ASG tags for keys %v : %v", keys, err)
return nil, err
}
if len(description.Tags) < 1 {
return nil, fmt.Errorf("Unable to find ASGs for tag keys %v", keys)
}
tags := []*autoscaling.TagDescription{}
tags = append(tags, description.Tags...)

for description.NextToken != nil {
description, err = m.DescribeTags(&autoscaling.DescribeTagsInput{
NextToken: description.NextToken,
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
})
if err != nil {
glog.V(4).Infof("Failed to describe ASG tags for key %v: %v", keys, err)
return nil, err
}
tags = append(tags, description.Tags...)
if err := m.DescribeTagsPages(input, func(out *autoscaling.DescribeTagsOutput, _ bool) bool {
tags = append(tags, out.Tags...)
// We return true while we want to be called with the next page of
// results, if any.
return true
}); err != nil {
return nil, err
}

// De-duplicate asg names
asgNameOccurrences := map[string]int{}
for _, t := range tags {
asgName := *(t.ResourceId)
if n, ok := asgNameOccurrences[asgName]; ok {
asgNameOccurrences[asgName] = n + 1
} else {
asgNameOccurrences[asgName] = 1
}
}
// Accordingly to how DescribeTags API works, the result contains ASGs which not all but only subset of tags are associated.
// Explicitly select ASGs to which all the tags are associated so that we won't end up calling DescribeAutoScalingGroups API
// multiple times on an ASG
// Because of how the DescribeTags API works, the result also includes ASGs
// that carry only a subset of the requested tags. Explicitly select the ASGs
// that carry all of the tags, so that we won't end up calling the
// DescribeAutoScalingGroups API multiple times on an ASG.
asgNames := []string{}
for asgName, n := range asgNameOccurrences {
if n == numKeys {
asgNameOccurrences := make(map[string]int)
for _, t := range tags {
asgName := aws.StringValue(t.ResourceId)
occurrences := asgNameOccurrences[asgName] + 1
if occurrences >= len(keys) {
asgNames = append(asgNames, asgName)
}
asgNameOccurrences[asgName] = occurrences
}

asgs, err := m.getAutoscalingGroupsByNames(asgNames)
if err != nil {
return nil, err
}

glog.V(6).Infof("Finishing getAutoscalingGroupsByTag with asgs=%v", asgs)

return asgs, nil
return m.getAutoscalingGroupsByNames(asgNames)
}
153 changes: 114 additions & 39 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,80 +18,155 @@ package aws

import (
"fmt"
"reflect"
"sync"
"time"

"k8s.io/apimachinery/pkg/util/wait"

"github.com/aws/aws-sdk-go/aws"
"github.com/golang/glog"
)

type autoScalingGroups struct {
registeredAsgs []*asgInformation
instanceToAsg map[AwsRef]*Asg
cacheMutex sync.Mutex
instancesNotInManagedAsg map[AwsRef]struct{}
service autoScalingWrapper
const scaleToZeroSupported = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was scale to zero disabled for AWS?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Capturing discussion we had elsewhere: this was totally accidental - sorry!


type asgCache struct {
registeredAsgs []*asgInformation
instanceToAsg map[AwsRef]*Asg
notInRegisteredAsg map[AwsRef]bool
mutex sync.Mutex
service autoScalingWrapper
interrupt chan struct{}
}

func newAutoScalingGroups(service autoScalingWrapper) *autoScalingGroups {
registry := &autoScalingGroups{
registeredAsgs: make([]*asgInformation, 0),
service: service,
instanceToAsg: make(map[AwsRef]*Asg),
instancesNotInManagedAsg: make(map[AwsRef]struct{}),
func newASGCache(service autoScalingWrapper) (*asgCache, error) {
registry := &asgCache{
registeredAsgs: make([]*asgInformation, 0),
service: service,
instanceToAsg: make(map[AwsRef]*Asg),
notInRegisteredAsg: make(map[AwsRef]bool),
interrupt: make(chan struct{}),
}
return registry
go wait.Until(func() {
registry.mutex.Lock()
defer registry.mutex.Unlock()
if err := registry.regenerate(); err != nil {
glog.Errorf("Error while regenerating Asg cache: %v", err)
}
}, time.Hour, registry.interrupt)

return registry, nil
}

// Register registers asg in Aws Manager.
func (m *autoScalingGroups) Register(asg *Asg) {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
// Register ASG. Returns true if the ASG was registered.
func (m *asgCache) Register(asg *Asg) bool {
m.mutex.Lock()
defer m.mutex.Unlock()

for i := range m.registeredAsgs {
if existing := m.registeredAsgs[i].config; existing.AwsRef == asg.AwsRef {
if reflect.DeepEqual(existing, asg) {
return false
}
m.registeredAsgs[i].config = asg
glog.V(4).Infof("Updated ASG %s", asg.AwsRef.Name)
m.invalidateUnownedInstanceCache()
return true
}
}

glog.V(1).Infof("Registering ASG %s", asg.AwsRef.Name)
m.registeredAsgs = append(m.registeredAsgs, &asgInformation{
config: asg,
})
m.invalidateUnownedInstanceCache()
return true
}

// Unregister drops every registered ASG whose AwsRef equals asg.AwsRef.
// It returns true if at least one entry was dropped, false otherwise.
func (m *asgCache) Unregister(asg *Asg) bool {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	// Collect the survivors into a fresh slice; the registered list is
	// replaced wholesale even when nothing matched.
	kept := make([]*asgInformation, 0, len(m.registeredAsgs))
	for _, candidate := range m.registeredAsgs {
		if candidate.config.AwsRef != asg.AwsRef {
			kept = append(kept, candidate)
			continue
		}
		glog.V(1).Infof("Unregistered ASG %s", asg.AwsRef.Name)
	}

	// Any shrinkage means at least one matching entry was filtered out.
	removed := len(kept) != len(m.registeredAsgs)
	m.registeredAsgs = kept
	return removed
}

// get returns the currently registered ASGs. The slice is read while holding
// the cache mutex and is returned as-is (not copied).
func (m *asgCache) get() []*asgInformation {
	m.mutex.Lock()
	registered := m.registeredAsgs
	m.mutex.Unlock()

	return registered
}

// FindForInstance returns AsgConfig of the given Instance
func (m *autoScalingGroups) FindForInstance(instance *AwsRef) (*Asg, error) {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
func (m *asgCache) FindForInstance(instance *AwsRef) (*Asg, error) {
// TODO(negz): Prevent this calling describe ASGs too often.
m.mutex.Lock()
defer m.mutex.Unlock()

if m.notInRegisteredAsg[*instance] {
// We already know we don't own this instance. Return early and avoid
// additional calls to describe ASGs.
return nil, nil
}

if config, found := m.instanceToAsg[*instance]; found {
return config, nil
}
if _, found := m.instancesNotInManagedAsg[*instance]; found {
// The instance is already known to not belong to any configured ASG
// Skip regenerateCache so that we won't unnecessarily call DescribeAutoScalingGroups
// See https://github.com/kubernetes/contrib/issues/2541
return nil, nil
}
if err := m.regenerateCache(); err != nil {
if err := m.regenerate(); err != nil {
return nil, fmt.Errorf("Error while looking for ASG for instance %+v, error: %v", *instance, err)
}
if config, found := m.instanceToAsg[*instance]; found {
return config, nil
}
// instance does not belong to any configured ASG
glog.V(6).Infof("Instance %+v is not in any ASG managed by CA. CA is now memorizing the fact not to unnecessarily call AWS API afterwards trying to find the unexistent managed ASG for the instance", *instance)
m.instancesNotInManagedAsg[*instance] = struct{}{}

m.notInRegisteredAsg[*instance] = true
return nil, nil
}

func (m *autoScalingGroups) regenerateCache() error {
// invalidateUnownedInstanceCache discards the set of instances remembered as
// not belonging to any registered ASG by swapping in a new empty map. It does
// no locking of its own; Register calls it while already holding m.mutex.
func (m *asgCache) invalidateUnownedInstanceCache() {
	glog.V(4).Info("Invalidating unowned instance cache")
	m.notInRegisteredAsg = map[AwsRef]bool{}
}

func (m *asgCache) regenerate() error {
newCache := make(map[AwsRef]*Asg)

for _, asg := range m.registeredAsgs {
glog.V(4).Infof("Regenerating ASG information for %s", asg.config.Name)
names := make([]string, len(m.registeredAsgs))
configs := make(map[string]*Asg)
for i, asg := range m.registeredAsgs {
names[i] = asg.config.Name
configs[asg.config.Name] = asg.config
}

group, err := m.service.getAutoscalingGroupByName(asg.config.Name)
if err != nil {
return err
}
glog.V(4).Infof("Regenerating instance to ASG map for ASGs: %v", names)
groups, err := m.service.getAutoscalingGroupsByNames(names)
if err != nil {
return err
}
for _, group := range groups {
for _, instance := range group.Instances {
ref := AwsRef{Name: *instance.InstanceId}
newCache[ref] = asg.config
ref := AwsRef{Name: aws.StringValue(instance.InstanceId)}
newCache[ref] = configs[aws.StringValue(group.AutoScalingGroupName)]
}
}

m.instanceToAsg = newCache
return nil
}

// Cleanup closes the interrupt channel. That channel is handed to wait.Until
// as the stop channel of the periodic cache-regeneration goroutine, so closing
// it signals that goroutine to stop. Cleanup must be called at most once:
// closing an already-closed channel panics.
func (m *asgCache) Cleanup() {
	stop := m.interrupt
	close(stop)
}
Loading