Skip to content

Commit

Permalink
Merge pull request kubernetes#4238 from DataDog/autoscaling-options-aws
Browse files Browse the repository at this point in the history
implement GetOptions for AWS
  • Loading branch information
k8s-ci-robot authored and Anton Kirillov committed Oct 27, 2022
1 parent 27c1a57 commit af8d362
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 1 deletion.
14 changes: 14 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,20 @@ be labeled or tainted when they join the cluster, such as:
* `k8s.io/cluster-autoscaler/node-template/label/foo`: `bar`
* `k8s.io/cluster-autoscaler/node-template/taint/dedicated`: `NoSchedule`

ASG labels can specify autoscaling options, overriding the global cluster-autoscaler
settings for the labeled ASGs. Those labels takes the same values format as the
cluster-autoscaler command line flags they override (a float or a duration, encoded
as string). Currently supported autoscaling options (and example values) are:

* `k8s.io/cluster-autoscaler/node-template/autoscaling-options/scaledownutilizationthreshold`: `0.5`
(overrides `--scale-down-utilization-threshold` value for that specific ASG)
* `k8s.io/cluster-autoscaler/node-template/autoscaling-options/scaledowngpuutilizationthreshold`: `0.5`
(overrides `--scale-down-gpu-utilization-threshold` value for that specific ASG)
* `k8s.io/cluster-autoscaler/node-template/autoscaling-options/scaledownunneededtime`: `10m0s`
(overrides `--scale-down-unneeded-time` value for that specific ASG)
* `k8s.io/cluster-autoscaler/node-template/autoscaling-options/scaledownunreadytime`: `20m0s`
(overrides `--scale-down-unready-time` value for that specific ASG)

**NOTE:** It is your responsibility to ensure such labels and/or taints are
applied via the node's kubelet configuration at startup.

Expand Down
20 changes: 20 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type asgCache struct {

asgAutoDiscoverySpecs []asgAutoDiscoveryConfig
explicitlyConfigured map[AwsRef]bool
autoscalingOptions map[AwsRef]map[string]string
}

type launchTemplate struct {
Expand Down Expand Up @@ -82,6 +83,7 @@ func newASGCache(awsService *awsWrapper, explicitSpecs []string, autoDiscoverySp
interrupt: make(chan struct{}),
asgAutoDiscoverySpecs: autoDiscoverySpecs,
explicitlyConfigured: make(map[AwsRef]bool),
autoscalingOptions: make(map[AwsRef]map[string]string),
}

if err := registry.parseExplicitAsgs(explicitSpecs); err != nil {
Expand Down Expand Up @@ -189,6 +191,13 @@ func (m *asgCache) Get() []*asg {
return m.registeredAsgs
}

// GetAutoscalingOptions return autoscaling options strings obtained from ASG tags.
func (m *asgCache) GetAutoscalingOptions(ref AwsRef) map[string]string {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.autoscalingOptions[ref]
}

// FindForInstance returns AsgConfig of the given Instance
func (m *asgCache) FindForInstance(instance AwsInstanceRef) *asg {
m.mutex.Lock()
Expand Down Expand Up @@ -409,8 +418,19 @@ func (m *asgCache) regenerate() error {
klog.Warningf("Failed to fully populate ASG->instanceType mapping: %v", err)
}

// Rebuild autoscaling options cache
newAutoscalingOptions := make(map[AwsRef]map[string]string)
for _, asg := range m.registeredAsgs {
options := extractAutoscalingOptionsFromTags(asg.Tags)
if !reflect.DeepEqual(m.autoscalingOptions[asg.AwsRef], options) {
klog.V(4).Infof("Extracted autoscaling options from %q ASG tags: %v", asg.Name, options)
}
newAutoscalingOptions[asg.AwsRef] = options
}

m.asgToInstances = newAsgToInstancesCache
m.instanceToAsg = newInstanceToAsgCache
m.autoscalingOptions = newAutoscalingOptions
return nil
}

Expand Down
5 changes: 4 additions & 1 deletion cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,10 @@ func (ng *AwsNodeGroup) Delete() error {
// GetOptions returns NodeGroupAutoscalingOptions that should be used for this particular
// NodeGroup. Returning a nil will result in using default options.
func (ng *AwsNodeGroup) GetOptions(defaults config.NodeGroupAutoscalingOptions) (*config.NodeGroupAutoscalingOptions, error) {
return nil, cloudprovider.ErrNotImplemented
if ng.asg == nil || ng.asg.Tags == nil || len(ng.asg.Tags) == 0 {
return &defaults, nil
}
return ng.awsManager.GetAsgOptions(*ng.asg, defaults), nil
}

// IncreaseSize increases Asg size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func newTestAwsManagerWithMockServices(mockAutoScaling autoScalingI, mockEC2 ec2
interrupt: make(chan struct{}),
asgAutoDiscoverySpecs: autoDiscoverySpecs,
awsService: &awsService,
autoscalingOptions: make(map[AwsRef]map[string]string),
},
}
}
Expand Down
68 changes: 68 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/aws_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"math/rand"
"os"
"regexp"
"strconv"
"strings"
"time"

Expand All @@ -39,6 +40,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
klog "k8s.io/klog/v2"
kubeletapis "k8s.io/kubelet/pkg/apis"
Expand All @@ -53,6 +55,7 @@ const (
refreshInterval = 1 * time.Minute
autoDiscovererTypeASG = "asg"
asgAutoDiscovererKeyTag = "tag"
optionsTagsPrefix = "k8s.io/cluster-autoscaler/node-template/autoscaling-options/"
)

// AwsManager is handles aws communication and data caching.
Expand Down Expand Up @@ -274,6 +277,10 @@ func (m *AwsManager) getAsgs() []*asg {
return m.asgCache.Get()
}

func (m *AwsManager) getAutoscalingOptions(ref AwsRef) map[string]string {
return m.asgCache.GetAutoscalingOptions(ref)
}

// SetAsgSize sets ASG size.
func (m *AwsManager) SetAsgSize(asg *asg, size int) error {
return m.asgCache.SetAsgSize(asg, size)
Expand Down Expand Up @@ -322,6 +329,52 @@ func (m *AwsManager) getAsgTemplate(asg *asg) (*asgTemplate, error) {
return nil, fmt.Errorf("ASG %q uses the unknown EC2 instance type %q", asg.Name, instanceTypeName)
}

// GetAsgOptions parse options extracted from ASG tags and merges them with provided defaults
func (m *AwsManager) GetAsgOptions(asg asg, defaults config.NodeGroupAutoscalingOptions) *config.NodeGroupAutoscalingOptions {
options := m.getAutoscalingOptions(asg.AwsRef)
if options == nil || len(options) == 0 {
return &defaults
}

if stringOpt, found := options[config.DefaultScaleDownUtilizationThresholdKey]; found {
if opt, err := strconv.ParseFloat(stringOpt, 64); err != nil {
klog.Warning("failed to convert asg %s %s tag to float: %v",
asg.Name, config.DefaultScaleDownUtilizationThresholdKey, err)
} else {
defaults.ScaleDownUtilizationThreshold = opt
}
}

if stringOpt, found := options[config.DefaultScaleDownGpuUtilizationThresholdKey]; found {
if opt, err := strconv.ParseFloat(stringOpt, 64); err != nil {
klog.Warning("failed to convert asg %s %s tag to float: %v",
asg.Name, config.DefaultScaleDownGpuUtilizationThresholdKey, err)
} else {
defaults.ScaleDownGpuUtilizationThreshold = opt
}
}

if stringOpt, found := options[config.DefaultScaleDownUnneededTimeKey]; found {
if opt, err := time.ParseDuration(stringOpt); err != nil {
klog.Warning("failed to convert asg %s %s tag to duration: %v",
asg.Name, config.DefaultScaleDownUnneededTimeKey, err)
} else {
defaults.ScaleDownUnneededTime = opt
}
}

if stringOpt, found := options[config.DefaultScaleDownUnreadyTimeKey]; found {
if opt, err := time.ParseDuration(stringOpt); err != nil {
klog.Warning("failed to convert asg %s %s tag to duration: %v",
asg.Name, config.DefaultScaleDownUnreadyTimeKey, err)
} else {
defaults.ScaleDownUnreadyTime = opt
}
}

return &defaults
}

func (m *AwsManager) buildNodeFromTemplate(asg *asg, template *asgTemplate) (*apiv1.Node, error) {
node := apiv1.Node{}
nodeName := fmt.Sprintf("%s-asg-%d", asg.Name, rand.Int63())
Expand Down Expand Up @@ -393,6 +446,21 @@ func extractLabelsFromAsg(tags []*autoscaling.TagDescription) map[string]string
return result
}

func extractAutoscalingOptionsFromTags(tags []*autoscaling.TagDescription) map[string]string {
options := make(map[string]string)
for _, tag := range tags {
if !strings.HasPrefix(aws.StringValue(tag.Key), optionsTagsPrefix) {
continue
}
splits := strings.Split(aws.StringValue(tag.Key), optionsTagsPrefix)
if len(splits) != 2 || splits[1] == "" {
continue
}
options[splits[1]] = aws.StringValue(tag.Value)
}
return options
}

func extractAllocatableResourcesFromAsg(tags []*autoscaling.TagDescription) map[string]*resource.Quantity {
result := make(map[string]*resource.Quantity)

Expand Down
73 changes: 73 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/aws_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
Expand All @@ -38,6 +39,7 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config"
kubeletapis "k8s.io/kubelet/pkg/apis"
provider_aws "k8s.io/legacy-cloud-providers/aws"
)
Expand Down Expand Up @@ -112,6 +114,77 @@ func TestExtractAllocatableResourcesFromAsg(t *testing.T) {
assert.Equal(t, (&expectedEphemeralStorage).String(), labels["ephemeral-storage"].String())
}

func TestGetAsgOptions(t *testing.T) {
defaultOptions := config.NodeGroupAutoscalingOptions{
ScaleDownUtilizationThreshold: 0.1,
ScaleDownGpuUtilizationThreshold: 0.2,
ScaleDownUnneededTime: time.Second,
ScaleDownUnreadyTime: time.Minute,
}

tests := []struct {
description string
tags map[string]string
expected *config.NodeGroupAutoscalingOptions
}{
{
description: "use defaults on unspecified tags",
tags: make(map[string]string),
expected: &defaultOptions,
},
{
description: "keep defaults on invalid tags values",
tags: map[string]string{
"scaledownutilizationthreshold": "not-a-float",
"scaledownunneededtime": "not-a-duration",
"ScaleDownUnreadyTime": "",
},
expected: &defaultOptions,
},
{
description: "use provided tags and fill missing with defaults",
tags: map[string]string{
"scaledownutilizationthreshold": "0.42",
"scaledownunneededtime": "1h",
},
expected: &config.NodeGroupAutoscalingOptions{
ScaleDownUtilizationThreshold: 0.42,
ScaleDownGpuUtilizationThreshold: defaultOptions.ScaleDownGpuUtilizationThreshold,
ScaleDownUnneededTime: time.Hour,
ScaleDownUnreadyTime: defaultOptions.ScaleDownUnreadyTime,
},
},
{
description: "ignore unknown tags",
tags: map[string]string{
"scaledownutilizationthreshold": "0.6",
"scaledowngpuutilizationthreshold": "0.7",
"scaledownunneededtime": "1m",
"scaledownunreadytime": "1h",
"notyetspecified": "42",
},
expected: &config.NodeGroupAutoscalingOptions{
ScaleDownUtilizationThreshold: 0.6,
ScaleDownGpuUtilizationThreshold: 0.7,
ScaleDownUnneededTime: time.Minute,
ScaleDownUnreadyTime: time.Hour,
},
},
}

for _, tt := range tests {
t.Run(tt.description, func(t *testing.T) {
testAsg := asg{AwsRef: AwsRef{Name: "testAsg"}}
cache, _ := newASGCache(nil, []string{}, []asgAutoDiscoveryConfig{})
cache.autoscalingOptions[testAsg.AwsRef] = tt.tags
awsManager := &AwsManager{asgCache: cache}

actual := awsManager.GetAsgOptions(testAsg, defaultOptions)
assert.Equal(t, tt.expected, actual)
})
}
}

func TestBuildNodeFromTemplate(t *testing.T) {
awsManager := &AwsManager{}
asg := &asg{AwsRef: AwsRef{Name: "test-auto-scaling-group"}}
Expand Down

0 comments on commit af8d362

Please sign in to comment.