Commit 13f58aa
Merge pull request #2167 from feiskyer/cluster-autoscaler-release-1.12
CA 1.12: cherry pick of #2151 and #2152
k8s-ci-robot authored Jul 8, 2019
2 parents 9e88eb1 + 91f2eac commit 13f58aa
Showing 5 changed files with 195 additions and 33 deletions.
148 changes: 117 additions & 31 deletions cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
@@ -19,11 +19,13 @@ package azure
import (
"fmt"
"math/rand"
"net/http"
"strings"
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-04-01/compute"
"github.com/Azure/go-autorest/autorest"
"github.com/golang/glog"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
@@ -35,6 +37,26 @@ import (
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
)

var (
vmssSizeRefreshPeriod = 15 * time.Second
vmssInstancesRefreshPeriod = 5 * time.Minute
)

func init() {
// In the go-autorest SDK (https://github.com/Azure/go-autorest/blob/master/autorest/sender.go#L242),
// if ARM returns http.StatusTooManyRequests, the sender doesn't increase the retry attempt count,
// hence the Azure clients will keep retrying forever until they get a status code other than 429.
// So we explicitly remove http.StatusTooManyRequests from autorest.StatusCodesForRetry.
// Refer to https://github.com/Azure/go-autorest/issues/398.
statusCodesForRetry := make([]int, 0)
for _, code := range autorest.StatusCodesForRetry {
if code != http.StatusTooManyRequests {
statusCodesForRetry = append(statusCodesForRetry, code)
}
}
autorest.StatusCodesForRetry = statusCodesForRetry
}

// ScaleSet implements NodeGroup interface.
type ScaleSet struct {
azureRef
@@ -43,9 +65,13 @@ type ScaleSet struct {
minSize int
maxSize int

mutex sync.Mutex
lastRefresh time.Time
curSize int64
sizeMutex sync.Mutex
curSize int64
lastSizeRefresh time.Time

instanceMutex sync.Mutex
instanceCache []string
lastInstanceRefresh time.Time
}

// NewScaleSet creates a new ScaleSet.
@@ -109,22 +135,33 @@ func (scaleSet *ScaleSet) getVMSSInfo() (compute.VirtualMachineScaleSet, error)
}

func (scaleSet *ScaleSet) getCurSize() (int64, error) {
scaleSet.mutex.Lock()
defer scaleSet.mutex.Unlock()
scaleSet.sizeMutex.Lock()
defer scaleSet.sizeMutex.Unlock()

if scaleSet.lastRefresh.Add(15 * time.Second).After(time.Now()) {
if scaleSet.lastSizeRefresh.Add(vmssSizeRefreshPeriod).After(time.Now()) {
return scaleSet.curSize, nil
}

glog.V(5).Infof("Get scale set size for %q", scaleSet.Name)
set, err := scaleSet.getVMSSInfo()
if err != nil {
if isAzureRequestsThrottled(err) {
// Log a warning and update the size refresh time so that it will retry after the next vmssSizeRefreshPeriod.
glog.Warningf("getVMSSInfo() is throttled with message %v, would return the cached vmss size", err)
scaleSet.lastSizeRefresh = time.Now()
return scaleSet.curSize, nil
}
return -1, err
}
glog.V(5).Infof("Getting scale set (%q) capacity: %d\n", scaleSet.Name, *set.Sku.Capacity)

if scaleSet.curSize != *set.Sku.Capacity {
// Invalidate the instance cache if the capacity has changed.
scaleSet.invalidateInstanceCache()
}

scaleSet.curSize = *set.Sku.Capacity
scaleSet.lastRefresh = time.Now()
scaleSet.lastSizeRefresh = time.Now()
return scaleSet.curSize, nil
}

@@ -133,34 +170,56 @@ func (scaleSet *ScaleSet) GetScaleSetSize() (int64, error) {
return scaleSet.getCurSize()
}

// SetScaleSetSize sets ScaleSet size.
func (scaleSet *ScaleSet) SetScaleSetSize(size int64) error {
scaleSet.mutex.Lock()
defer scaleSet.mutex.Unlock()
// updateVMSSCapacity invokes virtualMachineScaleSetsClient to update the capacity for VMSS.
func (scaleSet *ScaleSet) updateVMSSCapacity(size int64) {
var op compute.VirtualMachineScaleSet
var resp *http.Response
var isSuccess bool
var err error

defer func() {
if err != nil {
glog.Errorf("Failed to update the capacity for vmss %s with error %v, invalidate the cache so as to get the real size from API", scaleSet.Name, err)
// Invalidate the VMSS size cache in order to fetch the size from the API.
scaleSet.sizeMutex.Lock()
defer scaleSet.sizeMutex.Unlock()
scaleSet.lastSizeRefresh = time.Now().Add(-1 * vmssSizeRefreshPeriod)
}
}()

resourceGroup := scaleSet.manager.config.ResourceGroup
op, err := scaleSet.getVMSSInfo()
op, err = scaleSet.getVMSSInfo()
if err != nil {
return err
glog.Errorf("Failed to get information for VMSS (%q): %v", scaleSet.Name, err)
return
}

op.Sku.Capacity = &size
op.VirtualMachineScaleSetProperties.ProvisioningState = nil
updateCtx, updateCancel := getContextWithCancel()
defer updateCancel()

ctx, cancel := getContextWithCancel()
defer cancel()
glog.V(3).Infof("Waiting for virtualMachineScaleSetsClient.CreateOrUpdate(%s)", scaleSet.Name)
resp, err := scaleSet.manager.azClient.virtualMachineScaleSetsClient.CreateOrUpdate(updateCtx, resourceGroup, scaleSet.Name, op)
isSuccess, realError := isSuccessHTTPResponse(resp, err)
resp, err = scaleSet.manager.azClient.virtualMachineScaleSetsClient.CreateOrUpdate(ctx, resourceGroup, scaleSet.Name, op)
isSuccess, err = isSuccessHTTPResponse(resp, err)
if isSuccess {
glog.V(3).Infof("virtualMachineScaleSetsClient.CreateOrUpdate(%s) success", scaleSet.Name)
scaleSet.curSize = size
scaleSet.lastRefresh = time.Now()
return nil
scaleSet.invalidateInstanceCache()
return
}

glog.Errorf("virtualMachineScaleSetsClient.CreateOrUpdate for scale set %q failed: %v", scaleSet.Name, realError)
return realError
glog.Errorf("virtualMachineScaleSetsClient.CreateOrUpdate for scale set %q failed: %v", scaleSet.Name, err)
return
}

// SetScaleSetSize sets ScaleSet size.
func (scaleSet *ScaleSet) SetScaleSetSize(size int64) {
scaleSet.sizeMutex.Lock()
defer scaleSet.sizeMutex.Unlock()

// Proactively set the VMSS size so the autoscaler makes better decisions.
scaleSet.curSize = size
scaleSet.lastSizeRefresh = time.Now()
go scaleSet.updateVMSSCapacity(size)
}

// TargetSize returns the current TARGET size of the node group. It is possible that the
@@ -185,7 +244,8 @@ func (scaleSet *ScaleSet) IncreaseSize(delta int) error {
return fmt.Errorf("size increase too large - desired:%d max:%d", int(size)+delta, scaleSet.MaxSize())
}

return scaleSet.SetScaleSetSize(size + int64(delta))
scaleSet.SetScaleSetSize(size + int64(delta))
return nil
}

// GetScaleSetVms returns list of nodes for the given scale set.
@@ -240,7 +300,8 @@ func (scaleSet *ScaleSet) DecreaseTargetSize(delta int) error {
size, delta, len(nodes))
}

return scaleSet.SetScaleSetSize(size + int64(delta))
scaleSet.SetScaleSetSize(size + int64(delta))
return nil
}

// Belongs returns true if the given node belongs to the NodeGroup.
@@ -439,19 +500,44 @@ func (scaleSet *ScaleSet) TemplateNodeInfo() (*schedulercache.NodeInfo, error) {

// Nodes returns a list of all nodes that belong to this node group.
func (scaleSet *ScaleSet) Nodes() ([]string, error) {
scaleSet.mutex.Lock()
defer scaleSet.mutex.Unlock()
curSize, err := scaleSet.getCurSize()
if err != nil {
glog.Errorf("Failed to get current size for vmss %q: %v", scaleSet.Name, err)
return nil, err
}

scaleSet.instanceMutex.Lock()
defer scaleSet.instanceMutex.Unlock()

if int64(len(scaleSet.instanceCache)) == curSize &&
scaleSet.lastInstanceRefresh.Add(vmssInstancesRefreshPeriod).After(time.Now()) {
return scaleSet.instanceCache, nil
}

vms, err := scaleSet.GetScaleSetVms()
if err != nil {
if isAzureRequestsThrottled(err) {
// Log a warning and update the instance refresh time so that it will retry after the next vmssInstancesRefreshPeriod.
glog.Warningf("GetScaleSetVms() is throttled with message %v, would return the cached instances", err)
scaleSet.lastInstanceRefresh = time.Now()
return scaleSet.instanceCache, nil
}
return nil, err
}

result := make([]string, 0, len(vms))
instances := make([]string, len(vms))
for i := range vms {
name := "azure://" + strings.ToLower(vms[i])
result = append(result, name)
instances[i] = "azure://" + strings.ToLower(vms[i])
}

return result, nil
scaleSet.instanceCache = instances
scaleSet.lastInstanceRefresh = time.Now()
return instances, nil
}

func (scaleSet *ScaleSet) invalidateInstanceCache() {
scaleSet.instanceMutex.Lock()
// Set the instanceCache as outdated.
scaleSet.lastInstanceRefresh = time.Now().Add(-1 * vmssInstancesRefreshPeriod)
scaleSet.instanceMutex.Unlock()
}
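
A note on the pattern above (illustration, not part of this commit): SetScaleSetSize now records the new target size in the local cache immediately and pushes it to ARM from a background goroutine, invalidating the size cache if that call fails. A minimal, self-contained sketch of this proactive-cache-plus-async-update idea, using invented names (cachedScaler, updateCloud) and only the Go standard library:

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// cachedScaler caches the last known target size, in the spirit of ScaleSet's
// sizeMutex/curSize/lastSizeRefresh fields.
type cachedScaler struct {
	sizeMutex       sync.Mutex
	curSize         int64
	lastSizeRefresh time.Time
	refreshPeriod   time.Duration

	// updateCloud stands in for virtualMachineScaleSetsClient.CreateOrUpdate.
	updateCloud func(size int64) error
}

// SetSize records the new size proactively and pushes it to the cloud asynchronously.
func (s *cachedScaler) SetSize(size int64) {
	s.sizeMutex.Lock()
	s.curSize = size
	s.lastSizeRefresh = time.Now()
	s.sizeMutex.Unlock()

	go func() {
		if err := s.updateCloud(size); err != nil {
			// On failure, move the refresh timestamp back so the next read refetches the real size.
			s.sizeMutex.Lock()
			s.lastSizeRefresh = time.Now().Add(-s.refreshPeriod)
			s.sizeMutex.Unlock()
			fmt.Printf("cloud update failed: %v (size cache invalidated)\n", err)
		}
	}()
}

func main() {
	s := &cachedScaler{
		refreshPeriod: 15 * time.Second,
		updateCloud: func(size int64) error {
			return errors.New("simulated CreateOrUpdate failure")
		},
	}
	s.SetSize(5)                       // returns immediately with the cache already updated
	time.Sleep(100 * time.Millisecond) // give the background update time to fail
	fmt.Println("cached size:", s.curSize)
}

The asymmetry is deliberate: reads stay cheap because they hit the cache, while a failed write only costs one extra API refresh on the next read.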
13 changes: 12 additions & 1 deletion cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go
@@ -76,8 +76,19 @@ func TestIncreaseSize(t *testing.T) {
assert.True(t, registered)
assert.Equal(t, len(provider.NodeGroups()), 1)

err := provider.NodeGroups()[0].IncreaseSize(1)
// current target size is 2.
targetSize, err := provider.NodeGroups()[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, targetSize, 2)

// increase 3 nodes.
err = provider.NodeGroups()[0].IncreaseSize(3)
assert.NoError(t, err)

// new target size should be 5.
targetSize, err = provider.NodeGroups()[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 5, targetSize)
}

func TestBelongs(t *testing.T) {
18 changes: 18 additions & 0 deletions cluster-autoscaler/cloudprovider/azure/azure_util.go
@@ -628,3 +628,21 @@ func isSuccessHTTPResponse(resp *http.Response, err error) (isSuccess bool, real
// This shouldn't happen, it only ensures all exceptions are handled.
return false, fmt.Errorf("failed with unknown error")
}

// isAzureRequestsThrottled returns true when err indicates an http.StatusTooManyRequests (429) response from ARM.
func isAzureRequestsThrottled(err error) bool {
if err == nil {
return false
}

v, ok := err.(autorest.DetailedError)
if !ok {
return false
}

if v.StatusCode == http.StatusTooManyRequests {
return true
}

return false
}
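
The helper above is what lets getCurSize() and Nodes() keep serving cached data when ARM throttles the client. A condensed, standalone sketch of that read-through-cache-with-429-fallback pattern (throttleAwareCache and its fields are invented for illustration; only the error check mirrors isAzureRequestsThrottled):

package main

import (
	"fmt"
	"net/http"
	"time"

	"github.com/Azure/go-autorest/autorest"
)

// isThrottled mirrors isAzureRequestsThrottled above: a 429 wrapped in an
// autorest.DetailedError means ARM is throttling us.
func isThrottled(err error) bool {
	de, ok := err.(autorest.DetailedError)
	return ok && de.StatusCode == http.StatusTooManyRequests
}

// throttleAwareCache condenses the read pattern used by getCurSize() and Nodes():
// serve the cached value while it is fresh; on a throttled refresh, keep serving
// the cached value and push the refresh timestamp forward so ARM is not hit
// again until the period elapses.
type throttleAwareCache struct {
	value       int64
	lastRefresh time.Time
	period      time.Duration
	fetch       func() (int64, error) // stands in for getVMSSInfo / GetScaleSetVms
}

func (c *throttleAwareCache) Get() (int64, error) {
	if c.lastRefresh.Add(c.period).After(time.Now()) {
		return c.value, nil // cache still fresh
	}
	v, err := c.fetch()
	if err != nil {
		if isThrottled(err) {
			c.lastRefresh = time.Now() // back off: reuse the cached value for another period
			return c.value, nil
		}
		return 0, err
	}
	c.value, c.lastRefresh = v, time.Now()
	return v, nil
}

func main() {
	c := &throttleAwareCache{
		value:  3,
		period: 15 * time.Second,
		fetch: func() (int64, error) {
			return 0, autorest.DetailedError{StatusCode: http.StatusTooManyRequests}
		},
	}
	size, _ := c.Get()
	fmt.Println("size:", size) // prints 3, served from the cache because the refresh was throttled
}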
38 changes: 38 additions & 0 deletions cluster-autoscaler/cloudprovider/azure/azure_util_test.go
@@ -22,6 +22,7 @@ import (
"testing"

"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-04-01/compute"
"github.com/Azure/go-autorest/autorest"
"github.com/stretchr/testify/assert"
)

@@ -189,3 +190,40 @@ func TestIsSuccessResponse(t *testing.T) {
assert.Equal(t, test.expectedError, realError, "[%s] expected: %v, saw: %v", test.name, realError, test.expectedError)
}
}

func TestIsAzureRequestsThrottled(t *testing.T) {
tests := []struct {
desc string
err error
expected bool
}{
{
desc: "nil error should return false",
expected: false,
},
{
desc: "non autorest.DetailedError error should return false",
err: fmt.Errorf("unknown error"),
expected: false,
},
{
desc: "non http.StatusTooManyRequests error should return false",
err: autorest.DetailedError{
StatusCode: http.StatusBadRequest,
},
expected: false,
},
{
desc: "http.StatusTooManyRequests error should return true",
err: autorest.DetailedError{
StatusCode: http.StatusTooManyRequests,
},
expected: true,
},
}

for _, test := range tests {
real := isAzureRequestsThrottled(test.err)
assert.Equal(t, test.expected, real, test.desc)
}
}
11 changes: 10 additions & 1 deletion cluster-autoscaler/core/scale_up.go
@@ -25,6 +25,7 @@ import (
apiv1 "k8s.io/api/core/v1"
extensionsv1 "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
@@ -294,7 +295,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
nodeGroup)
}
for i := 0; i < numberOfNodes; i++ {
upcomingNodes = append(upcomingNodes, nodeTemplate)
upcomingNodes = append(upcomingNodes, buildNodeInfoForNodeTemplate(nodeTemplate, i))
}
}
glog.V(4).Infof("Upcoming %d nodes", len(upcomingNodes))
@@ -519,6 +520,14 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
return &status.ScaleUpStatus{ScaledUp: false, PodsRemainUnschedulable: getRemainingPods(podsRemainUnschedulable, skippedNodeGroups)}, nil
}

func buildNodeInfoForNodeTemplate(nodeTemplate *schedulercache.NodeInfo, index int) *schedulercache.NodeInfo {
nodeInfo := nodeTemplate.Clone()
node := nodeInfo.Node()
node.Name = fmt.Sprintf("%s-%d", node.Name, index)
node.UID = uuid.NewUUID()
return nodeInfo
}

func getRemainingPods(schedulingErrors map[*apiv1.Pod]map[string]status.Reasons, skipped map[string]status.Reasons) []status.NoScaleUpInfo {
remaining := []status.NoScaleUpInfo{}
for pod, errs := range schedulingErrors {
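
For context on the scale_up.go hunk above (illustration only): previously the same nodeTemplate was appended once per upcoming node, so every placeholder shared one Name and UID; buildNodeInfoForNodeTemplate now clones the template and gives each placeholder its own identity, presumably so simulated scheduling can tell the upcoming nodes apart. A minimal sketch of the same naming scheme using plain apiv1.Node values rather than the autoscaler's schedulercache.NodeInfo (cloneTemplateNode is an invented name; assumes k8s.io/api and k8s.io/apimachinery are on the module path):

package main

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/uuid"
)

// cloneTemplateNode mirrors the naming scheme of buildNodeInfoForNodeTemplate:
// each upcoming placeholder node gets a unique name and UID so it is not
// confused with the other clones of the same template.
func cloneTemplateNode(template *apiv1.Node, index int) *apiv1.Node {
	node := template.DeepCopy()
	node.Name = fmt.Sprintf("%s-%d", template.Name, index)
	node.UID = uuid.NewUUID()
	return node
}

func main() {
	template := &apiv1.Node{}
	template.Name = "vmss-agentpool-template"

	for i := 0; i < 3; i++ {
		n := cloneTemplateNode(template, i)
		fmt.Println(n.Name, n.UID) // vmss-agentpool-template-0, -1, -2, each with its own UID
	}
}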
