Skip to content

Commit

Permalink
implement GetOptions for Azure
Browse files Browse the repository at this point in the history
Support per-VMSS (scaledown) settings as permited by the
cloudprovider's interface `GetOptions()` method.
  • Loading branch information
bpineau committed Aug 24, 2021
1 parent d09b893 commit 4b0d5f7
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 1 deletion.
20 changes: 20 additions & 0 deletions cluster-autoscaler/cloudprovider/azure/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,26 @@ k8s.io_cluster-autoscaler_node-template_resources_cpu: 3800m
k8s.io_cluster-autoscaler_node-template_resources_memory: 11Gi
```

#### Autoscaling options

Some autoscaling options can be defined per VM Scale Set, with tags.
Those tags values have the format as the respective cluster-autoscaler flags they override: floats or durations encoded as strings.

Supported options tags (with example values) are:
```
# overrides --scale-down-utilization-threshold global value for that specific VM Scale Set
k8s.io_cluster-autoscaler_node-template_autoscaling-options_scaledownutilizationthreshold: "0.5"
# overrides --scale-down-gpu-utilization-threshold global value for that specific VM Scale Set
k8s.io_cluster-autoscaler_node-template_autoscaling-options_scaledowngpuutilizationthreshold: "0.5"
# overrides --scale-down-unneeded-time global value for that specific VM Scale Set
k8s.io_cluster-autoscaler_node-template_autoscaling-options_scaledownunneededtime: "10m0s"
# overrides --scale-down-unready-time global value for that specific VM Scale Set
k8s.io_cluster-autoscaler_node-template_autoscaling-options_scaledownunreadytime: "20m0s"
```

## Deployment manifests

Cluster autoscaler supports four Kubernetes cluster options on Azure:
Expand Down
22 changes: 22 additions & 0 deletions cluster-autoscaler/cloudprovider/azure/azure_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package azure

import (
"reflect"
"regexp"
"strings"
"sync"
Expand Down Expand Up @@ -53,6 +54,7 @@ type azureCache struct {
registeredNodeGroups []cloudprovider.NodeGroup
instanceToNodeGroup map[azureRef]cloudprovider.NodeGroup
unownedInstances map[azureRef]bool
autoscalingOptions map[azureRef]map[string]string
}

func newAzureCache(client *azClient, cacheTTL time.Duration, resourceGroup, vmType string) (*azureCache, error) {
Expand All @@ -67,6 +69,7 @@ func newAzureCache(client *azClient, cacheTTL time.Duration, resourceGroup, vmTy
registeredNodeGroups: make([]cloudprovider.NodeGroup, 0),
instanceToNodeGroup: make(map[azureRef]cloudprovider.NodeGroup),
unownedInstances: make(map[azureRef]bool),
autoscalingOptions: make(map[azureRef]map[string]string),
}

if err := cache.regenerate(); err != nil {
Expand Down Expand Up @@ -117,10 +120,22 @@ func (m *azureCache) regenerate() error {
}
}

// Regenerate VMSS to autoscaling options mapping.
newAutoscalingOptions := make(map[azureRef]map[string]string)
for _, vmss := range m.scaleSets {
ref := azureRef{Name: *vmss.Name}
options := extractAutoscalingOptionsFromScaleSetTags(vmss.Tags)
if !reflect.DeepEqual(m.getAutoscalingOptions(ref), options) {
klog.V(4).Infof("Extracted autoscaling options from %q ScaleSet tags: %v", *vmss.Name, options)
newAutoscalingOptions[ref] = options
}
}

m.mutex.Lock()
defer m.mutex.Unlock()

m.instanceToNodeGroup = newInstanceToNodeGroupCache
m.autoscalingOptions = newAutoscalingOptions

// Reset unowned instances cache.
m.unownedInstances = make(map[azureRef]bool)
Expand Down Expand Up @@ -256,6 +271,13 @@ func (m *azureCache) getRegisteredNodeGroups() []cloudprovider.NodeGroup {
return m.registeredNodeGroups
}

func (m *azureCache) getAutoscalingOptions(ref azureRef) map[string]string {
m.mutex.Lock()
defer m.mutex.Unlock()

return m.autoscalingOptions[ref]
}

// FindForInstance returns node group of the given Instance
func (m *azureCache) FindForInstance(instance *azureRef, vmType string) (cloudprovider.NodeGroup, error) {
m.mutex.Lock()
Expand Down
24 changes: 24 additions & 0 deletions cluster-autoscaler/cloudprovider/azure/azure_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/Azure/go-autorest/autorest/azure"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
klog "k8s.io/klog/v2"
)
Expand Down Expand Up @@ -246,6 +247,29 @@ func (m *AzureManager) GetNodeGroupForInstance(instance *azureRef) (cloudprovide
return m.azureCache.FindForInstance(instance, m.config.VMType)
}

// GetScaleSetOptions parse options extracted from VMSS tags and merges them with provided defaults
func (m *AzureManager) GetScaleSetOptions(scaleSetName string, defaults config.NodeGroupAutoscalingOptions) *config.NodeGroupAutoscalingOptions {
options := m.azureCache.getAutoscalingOptions(azureRef{Name: scaleSetName})
if options == nil || len(options) == 0 {
return &defaults
}

if opt, ok := getFloat64Option(options, scaleSetName, config.DefaultScaleDownUtilizationThresholdKey); ok {
defaults.ScaleDownUtilizationThreshold = opt
}
if opt, ok := getFloat64Option(options, scaleSetName, config.DefaultScaleDownGpuUtilizationThresholdKey); ok {
defaults.ScaleDownGpuUtilizationThreshold = opt
}
if opt, ok := getDurationOption(options, scaleSetName, config.DefaultScaleDownUnneededTimeKey); ok {
defaults.ScaleDownUnneededTime = opt
}
if opt, ok := getDurationOption(options, scaleSetName, config.DefaultScaleDownUnreadyTimeKey); ok {
defaults.ScaleDownUnreadyTime = opt
}

return &defaults
}

// Cleanup the cache.
func (m *AzureManager) Cleanup() {
m.azureCache.Cleanup()
Expand Down
45 changes: 45 additions & 0 deletions cluster-autoscaler/cloudprovider/azure/azure_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config"
azclients "k8s.io/legacy-cloud-providers/azure/clients"
"k8s.io/legacy-cloud-providers/azure/clients/vmssclient/mockvmssclient"
"k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient/mockvmssvmclient"
Expand Down Expand Up @@ -791,3 +792,47 @@ func TestManagerRefreshAndCleanup(t *testing.T) {
assert.NoError(t, err)
manager.Cleanup()
}

func TestGetScaleSetOptions(t *testing.T) {
manager := &AzureManager{
azureCache: &azureCache{
autoscalingOptions: make(map[azureRef]map[string]string),
},
}
defaultOptions := config.NodeGroupAutoscalingOptions{
ScaleDownUtilizationThreshold: 0.1,
ScaleDownGpuUtilizationThreshold: 0.2,
ScaleDownUnneededTime: time.Second,
ScaleDownUnreadyTime: time.Minute,
}

tags := map[string]string{
config.DefaultScaleDownUtilizationThresholdKey: "0.2",
config.DefaultScaleDownGpuUtilizationThresholdKey: "0.3",
config.DefaultScaleDownUnneededTimeKey: "30m",
config.DefaultScaleDownUnreadyTimeKey: "1h",
}
manager.azureCache.autoscalingOptions[azureRef{Name: "test1"}] = tags
opts := manager.GetScaleSetOptions("test1", defaultOptions)
assert.Equal(t, opts.ScaleDownUtilizationThreshold, 0.2)
assert.Equal(t, opts.ScaleDownGpuUtilizationThreshold, 0.3)
assert.Equal(t, opts.ScaleDownUnneededTime, 30*time.Minute)
assert.Equal(t, opts.ScaleDownUnreadyTime, time.Hour)

tags = map[string]string{
//config.DefaultScaleDownUtilizationThresholdKey: ... // not specified (-> default)
config.DefaultScaleDownGpuUtilizationThresholdKey: "not-a-float",
config.DefaultScaleDownUnneededTimeKey: "1m",
config.DefaultScaleDownUnreadyTimeKey: "not-a-duration",
}
manager.azureCache.autoscalingOptions[azureRef{Name: "test2"}] = tags
opts = manager.GetScaleSetOptions("test2", defaultOptions)
assert.Equal(t, opts.ScaleDownUtilizationThreshold, defaultOptions.ScaleDownUtilizationThreshold)
assert.Equal(t, opts.ScaleDownGpuUtilizationThreshold, defaultOptions.ScaleDownGpuUtilizationThreshold)
assert.Equal(t, opts.ScaleDownUnneededTime, time.Minute)
assert.Equal(t, opts.ScaleDownUnreadyTime, defaultOptions.ScaleDownUnreadyTime)

manager.azureCache.autoscalingOptions[azureRef{Name: "test3"}] = map[string]string{}
opts = manager.GetScaleSetOptions("test3", defaultOptions)
assert.Equal(t, *opts, defaultOptions)
}
6 changes: 5 additions & 1 deletion cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ func (scaleSet *ScaleSet) Autoprovisioned() bool {
// GetOptions returns NodeGroupAutoscalingOptions that should be used for this particular
// NodeGroup. Returning a nil will result in using default options.
func (scaleSet *ScaleSet) GetOptions(defaults config.NodeGroupAutoscalingOptions) (*config.NodeGroupAutoscalingOptions, error) {
return nil, cloudprovider.ErrNotImplemented
template, err := scaleSet.getVMSSFromCache()
if err != nil {
return nil, err.Error()
}
return scaleSet.manager.GetScaleSetOptions(*template.Name, defaults), nil
}

// MaxSize returns maximum size of the node group.
Expand Down
49 changes: 49 additions & 0 deletions cluster-autoscaler/cloudprovider/azure/azure_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
"k8s.io/klog/v2"
"math/rand"
"regexp"
"strconv"
"strings"
"time"
)

func buildInstanceOS(template compute.VirtualMachineScaleSet) string {
Expand Down Expand Up @@ -183,6 +185,53 @@ func extractTaintsFromScaleSet(tags map[string]*string) []apiv1.Taint {
return taints
}

func extractAutoscalingOptionsFromScaleSetTags(tags map[string]*string) map[string]string {
options := make(map[string]string)
for tagName, tagValue := range tags {
if !strings.HasPrefix(tagName, nodeOptionsTagName) {
continue
}
resourceName := strings.Split(tagName, nodeOptionsTagName)
if len(resourceName) < 2 || resourceName[1] == "" || tagValue == nil {
continue
}
options[resourceName[1]] = strings.ToLower(*tagValue)
}
return options
}

func getFloat64Option(options map[string]string, vmssName, name string) (float64, bool) {
raw, ok := options[strings.ToLower(name)]
if !ok {
return 0, false
}

option, err := strconv.ParseFloat(raw, 64)
if err != nil {
klog.Warningf("failed to convert VMSS %q tag %s_%s value %q to float: %v",
vmssName, nodeOptionsTagName, name, raw, err)
return 0, false
}

return option, true
}

func getDurationOption(options map[string]string, vmssName, name string) (time.Duration, bool) {
raw, ok := options[strings.ToLower(name)]
if !ok {
return 0, false
}

option, err := time.ParseDuration(raw)
if err != nil {
klog.Warningf("failed to convert VMSS %q tag %s_%s value %q to duration: %v",
vmssName, nodeOptionsTagName, name, raw, err)
return 0, false
}

return option, true
}

func extractAllocatableResourcesFromScaleSet(tags map[string]*string) map[string]*resource.Quantity {
resources := make(map[string]*resource.Quantity)

Expand Down
1 change: 1 addition & 0 deletions cluster-autoscaler/cloudprovider/azure/azure_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ const (
nodeLabelTagName = "k8s.io_cluster-autoscaler_node-template_label_"
nodeTaintTagName = "k8s.io_cluster-autoscaler_node-template_taint_"
nodeResourcesTagName = "k8s.io_cluster-autoscaler_node-template_resources_"
nodeOptionsTagName = "k8s.io_cluster-autoscaler_node-template_autoscaling-options_"
)

var (
Expand Down

0 comments on commit 4b0d5f7

Please sign in to comment.