Skip to content

Commit

Permalink
Merge pull request #462 from negz/gcedisco
Browse files Browse the repository at this point in the history
Support autodetection of GCE managed instance groups by name prefix
  • Loading branch information
mwielgus authored Dec 18, 2017
2 parents af51acc + e96ff07 commit 88d97c2
Show file tree
Hide file tree
Showing 21 changed files with 1,559 additions and 926 deletions.
123 changes: 37 additions & 86 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package aws

import (
"errors"
"fmt"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -28,8 +27,9 @@ import (
// autoScaling is the subset of the AWS SDK auto-scaling service client that
// the cluster autoscaler (CA) relies on. Declaring it as an interface lets the
// real SDK client be swapped for another implementation of the same methods.
type autoScaling interface {
	// DescribeAutoScalingGroups returns a single page of ASG descriptions.
	DescribeAutoScalingGroups(input *autoscaling.DescribeAutoScalingGroupsInput) (*autoscaling.DescribeAutoScalingGroupsOutput, error)
	// DescribeAutoScalingGroupsPages calls fn once per page of ASG
	// descriptions; fn returns true to be called with the next page, if any.
	DescribeAutoScalingGroupsPages(input *autoscaling.DescribeAutoScalingGroupsInput, fn func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) error
	// DescribeLaunchConfigurations returns launch configuration descriptions.
	DescribeLaunchConfigurations(*autoscaling.DescribeLaunchConfigurationsInput) (*autoscaling.DescribeLaunchConfigurationsOutput, error)
	// DescribeTags returns a single page of tag descriptions.
	DescribeTags(input *autoscaling.DescribeTagsInput) (*autoscaling.DescribeTagsOutput, error)
	// DescribeTagsPages calls fn once per page of tag descriptions; fn returns
	// true to be called with the next page, if any.
	DescribeTagsPages(input *autoscaling.DescribeTagsInput, fn func(*autoscaling.DescribeTagsOutput, bool) bool) error
	// SetDesiredCapacity sets the desired size of an ASG.
	SetDesiredCapacity(input *autoscaling.SetDesiredCapacityInput) (*autoscaling.SetDesiredCapacityOutput, error)
	// TerminateInstanceInAutoScalingGroup terminates one instance of an ASG.
	TerminateInstanceInAutoScalingGroup(input *autoscaling.TerminateInstanceInAutoScalingGroupInput) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error)
}
Expand Down Expand Up @@ -73,55 +73,29 @@ func (m autoScalingWrapper) getAutoscalingGroupByName(name string) (*autoscaling
}

func (m *autoScalingWrapper) getAutoscalingGroupsByNames(names []string) ([]*autoscaling.Group, error) {
glog.V(6).Infof("Starting getAutoscalingGroupsByNames with names=%v", names)

if len(names) < 1 {
glog.V(4).Info("Failed to describe ASGs: Must specify at least one ASG name.")
return nil, fmt.Errorf("List of ASG names was empty. Must specify at least one ASG name")
}

nameRefs := []*string{}
for _, n := range names {
nameRefs = append(nameRefs, aws.String(n))
if len(names) == 0 {
return nil, nil
}
params := &autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: nameRefs,
input := &autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: aws.StringSlice(names),
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
}
description, err := m.DescribeAutoScalingGroups(params)
if err != nil {
glog.V(4).Infof("Failed to describe ASGs : %v", err)
asgs := make([]*autoscaling.Group, 0)
if err := m.DescribeAutoScalingGroupsPages(input, func(output *autoscaling.DescribeAutoScalingGroupsOutput, _ bool) bool {
asgs = append(asgs, output.AutoScalingGroups...)
// We return true while we want to be called with the next page of
// results, if any.
return true
}); err != nil {
return nil, err
}
if len(description.AutoScalingGroups) < 1 {
return nil, errors.New("No ASGs found")
}

asgs := description.AutoScalingGroups
for description.NextToken != nil {
description, err = m.DescribeAutoScalingGroups(&autoscaling.DescribeAutoScalingGroupsInput{
NextToken: description.NextToken,
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
})
if err != nil {
glog.V(4).Infof("Failed to describe ASGs : %v", err)
return nil, err
}
asgs = append(asgs, description.AutoScalingGroups...)
}

glog.V(6).Infof("Finishing getAutoscalingGroupsByNames asgs=%v", asgs)

return asgs, nil
}

func (m *autoScalingWrapper) getAutoscalingGroupsByTags(keys []string) ([]*autoscaling.Group, error) {
glog.V(6).Infof("Starting getAutoscalingGroupsByTag with keys=%v", keys)

numKeys := len(keys)

// DescribeTags does an OR query when multiple filters on different tags are specified.
// In other words, DescribeTags returns [asg1, asg1] for keys [t1, t2] when there's only one asg tagged both t1 and t2.
// DescribeTags does an OR query when multiple filters on different tags are
// specified. In other words, DescribeTags returns [asg1, asg1] for keys
// [t1, t2] when there's only one asg tagged both t1 and t2.
filters := []*autoscaling.Filter{}
for _, key := range keys {
filter := &autoscaling.Filter{
Expand All @@ -130,58 +104,35 @@ func (m *autoScalingWrapper) getAutoscalingGroupsByTags(keys []string) ([]*autos
}
filters = append(filters, filter)
}
description, err := m.DescribeTags(&autoscaling.DescribeTagsInput{

tags := []*autoscaling.TagDescription{}
input := &autoscaling.DescribeTagsInput{
Filters: filters,
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
})
if err != nil {
glog.V(4).Infof("Failed to describe ASG tags for keys %v : %v", keys, err)
return nil, err
}
if len(description.Tags) < 1 {
return nil, fmt.Errorf("Unable to find ASGs for tag keys %v", keys)
}
tags := []*autoscaling.TagDescription{}
tags = append(tags, description.Tags...)

for description.NextToken != nil {
description, err = m.DescribeTags(&autoscaling.DescribeTagsInput{
NextToken: description.NextToken,
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
})
if err != nil {
glog.V(4).Infof("Failed to describe ASG tags for key %v: %v", keys, err)
return nil, err
}
tags = append(tags, description.Tags...)
if err := m.DescribeTagsPages(input, func(out *autoscaling.DescribeTagsOutput, _ bool) bool {
tags = append(tags, out.Tags...)
// We return true while we want to be called with the next page of
// results, if any.
return true
}); err != nil {
return nil, err
}

// De-duplicate asg names
asgNameOccurrences := map[string]int{}
for _, t := range tags {
asgName := *(t.ResourceId)
if n, ok := asgNameOccurrences[asgName]; ok {
asgNameOccurrences[asgName] = n + 1
} else {
asgNameOccurrences[asgName] = 1
}
}
// Accordingly to how DescribeTags API works, the result contains ASGs which not all but only subset of tags are associated.
// Explicitly select ASGs to which all the tags are associated so that we won't end up calling DescribeAutoScalingGroups API
// multiple times on an ASG
// Because DescribeTags ORs its filters, the result also contains ASGs that
// carry only a subset of the requested tags. Explicitly select the ASGs that
// have all of the tags so that we won't end up calling the
// DescribeAutoScalingGroups API multiple times on an ASG.
asgNames := []string{}
for asgName, n := range asgNameOccurrences {
if n == numKeys {
asgNameOccurrences := make(map[string]int)
for _, t := range tags {
asgName := aws.StringValue(t.ResourceId)
occurrences := asgNameOccurrences[asgName] + 1
if occurrences >= len(keys) {
asgNames = append(asgNames, asgName)
}
asgNameOccurrences[asgName] = occurrences
}

asgs, err := m.getAutoscalingGroupsByNames(asgNames)
if err != nil {
return nil, err
}

glog.V(6).Infof("Finishing getAutoscalingGroupsByTag with asgs=%v", asgs)

return asgs, nil
return m.getAutoscalingGroupsByNames(asgNames)
}
153 changes: 114 additions & 39 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,80 +18,155 @@ package aws

import (
"fmt"
"reflect"
"sync"
"time"

"k8s.io/apimachinery/pkg/util/wait"

"github.com/aws/aws-sdk-go/aws"
"github.com/golang/glog"
)

type autoScalingGroups struct {
registeredAsgs []*asgInformation
instanceToAsg map[AwsRef]*Asg
cacheMutex sync.Mutex
instancesNotInManagedAsg map[AwsRef]struct{}
service autoScalingWrapper
const scaleToZeroSupported = false

// asgCache tracks the ASGs registered with the autoscaler and caches which
// instance belongs to which of them.
type asgCache struct {
	// registeredAsgs holds one entry per ASG added through Register.
	registeredAsgs []*asgInformation
	// instanceToAsg maps an instance to the configuration of the ASG that
	// contains it; it is rebuilt wholesale by regenerate.
	instanceToAsg map[AwsRef]*Asg
	// notInRegisteredAsg remembers instances that were looked up and found
	// not to belong to any registered ASG, so FindForInstance can return
	// early for them. It is reset by invalidateUnownedInstanceCache.
	notInRegisteredAsg map[AwsRef]bool
	// mutex is held by Register, Unregister, get, FindForInstance and the
	// periodic refresh while they touch the fields above.
	mutex sync.Mutex
	// service is used to describe ASGs when the cache is regenerated.
	service autoScalingWrapper
	// interrupt is closed by Cleanup to stop the background refresh goroutine.
	interrupt chan struct{}
}

func newAutoScalingGroups(service autoScalingWrapper) *autoScalingGroups {
registry := &autoScalingGroups{
registeredAsgs: make([]*asgInformation, 0),
service: service,
instanceToAsg: make(map[AwsRef]*Asg),
instancesNotInManagedAsg: make(map[AwsRef]struct{}),
func newASGCache(service autoScalingWrapper) (*asgCache, error) {
registry := &asgCache{
registeredAsgs: make([]*asgInformation, 0),
service: service,
instanceToAsg: make(map[AwsRef]*Asg),
notInRegisteredAsg: make(map[AwsRef]bool),
interrupt: make(chan struct{}),
}
return registry
go wait.Until(func() {
registry.mutex.Lock()
defer registry.mutex.Unlock()
if err := registry.regenerate(); err != nil {
glog.Errorf("Error while regenerating Asg cache: %v", err)
}
}, time.Hour, registry.interrupt)

return registry, nil
}

// Register registers asg in Aws Manager.
func (m *autoScalingGroups) Register(asg *Asg) {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
// Register adds asg to the cache, or replaces the stored configuration when an
// ASG with the same AwsRef is already registered.
//
// It returns true when the cache changed (a new ASG was added or an existing
// configuration was replaced) and false when an identical configuration was
// already present. Every change also resets the unowned-instance cache, since
// instances previously considered unowned may belong to the new configuration.
func (m *asgCache) Register(asg *Asg) bool {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	for _, registered := range m.registeredAsgs {
		current := registered.config
		if current.AwsRef != asg.AwsRef {
			continue
		}
		// Same ASG: only treat it as an update if something actually differs.
		if reflect.DeepEqual(current, asg) {
			return false
		}
		registered.config = asg
		glog.V(4).Infof("Updated ASG %s", asg.AwsRef.Name)
		m.invalidateUnownedInstanceCache()
		return true
	}

	glog.V(1).Infof("Registering ASG %s", asg.AwsRef.Name)
	m.registeredAsgs = append(m.registeredAsgs, &asgInformation{config: asg})
	m.invalidateUnownedInstanceCache()
	return true
}

// Unregister removes every registered entry whose AwsRef matches asg.
//
// It returns true if at least one entry was removed. When something was
// removed, the instance-to-ASG cache is also purged of instances that mapped
// to the removed ASG; otherwise FindForInstance would keep answering with the
// unregistered ASG until the next regeneration of the cache.
func (m *asgCache) Unregister(asg *Asg) bool {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	updated := make([]*asgInformation, 0, len(m.registeredAsgs))
	changed := false
	for _, existing := range m.registeredAsgs {
		if existing.config.AwsRef == asg.AwsRef {
			glog.V(1).Infof("Unregistered ASG %s", asg.AwsRef.Name)
			changed = true
			continue
		}
		updated = append(updated, existing)
	}
	m.registeredAsgs = updated

	if changed {
		// Deleting entries while ranging over a map is safe in Go.
		for instance, owner := range m.instanceToAsg {
			// regenerate may store a nil config for an unknown group name.
			if owner != nil && owner.AwsRef == asg.AwsRef {
				delete(m.instanceToAsg, instance)
			}
		}
	}
	return changed
}

// get returns the currently registered ASGs. The slice is read under the
// mutex, but it is the cache's own slice rather than a copy.
func (m *asgCache) get() []*asgInformation {
	m.mutex.Lock()
	registered := m.registeredAsgs
	m.mutex.Unlock()

	return registered
}

// FindForInstance returns AsgConfig of the given Instance
func (m *autoScalingGroups) FindForInstance(instance *AwsRef) (*Asg, error) {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
func (m *asgCache) FindForInstance(instance *AwsRef) (*Asg, error) {
// TODO(negz): Prevent this calling describe ASGs too often.
m.mutex.Lock()
defer m.mutex.Unlock()

if m.notInRegisteredAsg[*instance] {
// We already know we don't own this instance. Return early and avoid
// additional calls to describe ASGs.
return nil, nil
}

if config, found := m.instanceToAsg[*instance]; found {
return config, nil
}
if _, found := m.instancesNotInManagedAsg[*instance]; found {
// The instance is already known to not belong to any configured ASG
// Skip regenerateCache so that we won't unnecessarily call DescribeAutoScalingGroups
// See https://github.com/kubernetes/contrib/issues/2541
return nil, nil
}
if err := m.regenerateCache(); err != nil {
if err := m.regenerate(); err != nil {
return nil, fmt.Errorf("Error while looking for ASG for instance %+v, error: %v", *instance, err)
}
if config, found := m.instanceToAsg[*instance]; found {
return config, nil
}
// instance does not belong to any configured ASG
glog.V(6).Infof("Instance %+v is not in any ASG managed by CA. CA is now memorizing the fact not to unnecessarily call AWS API afterwards trying to find the unexistent managed ASG for the instance", *instance)
m.instancesNotInManagedAsg[*instance] = struct{}{}

m.notInRegisteredAsg[*instance] = true
return nil, nil
}

func (m *autoScalingGroups) regenerateCache() error {
// invalidateUnownedInstanceCache forgets every instance previously recorded as
// not belonging to a registered ASG. It does not take the mutex itself;
// Register calls it while already holding the lock.
func (m *asgCache) invalidateUnownedInstanceCache() {
	glog.V(4).Info("Invalidating unowned instance cache")
	m.notInRegisteredAsg = map[AwsRef]bool{}
}

func (m *asgCache) regenerate() error {
newCache := make(map[AwsRef]*Asg)

for _, asg := range m.registeredAsgs {
glog.V(4).Infof("Regenerating ASG information for %s", asg.config.Name)
names := make([]string, len(m.registeredAsgs))
configs := make(map[string]*Asg)
for i, asg := range m.registeredAsgs {
names[i] = asg.config.Name
configs[asg.config.Name] = asg.config
}

group, err := m.service.getAutoscalingGroupByName(asg.config.Name)
if err != nil {
return err
}
glog.V(4).Infof("Regenerating instance to ASG map for ASGs: %v", names)
groups, err := m.service.getAutoscalingGroupsByNames(names)
if err != nil {
return err
}
for _, group := range groups {
for _, instance := range group.Instances {
ref := AwsRef{Name: *instance.InstanceId}
newCache[ref] = asg.config
ref := AwsRef{Name: aws.StringValue(instance.InstanceId)}
newCache[ref] = configs[aws.StringValue(group.AutoScalingGroupName)]
}
}

m.instanceToAsg = newCache
return nil
}

// Cleanup closes the interrupt channel, signalling the background goroutine
// that periodically regenerates the cache to stop. Closing an already closed
// channel panics, so Cleanup must be called at most once per cache.
func (m *asgCache) Cleanup() {
	close(m.interrupt)
}
Loading

0 comments on commit 88d97c2

Please sign in to comment.