diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
index 1627db82d61d..b9e5c9bc8735 100644
--- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
+++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
@@ -19,11 +19,13 @@ package azure
 import (
 	"fmt"
 	"math/rand"
+	"net/http"
 	"strings"
 	"sync"
 	"time"
 
 	"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-04-01/compute"
+	"github.com/Azure/go-autorest/autorest"
 	"github.com/golang/glog"
 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
@@ -35,6 +37,26 @@ import (
 	schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
 )
 
+var (
+	vmssSizeRefreshPeriod      = 15 * time.Second
+	vmssInstancesRefreshPeriod = 5 * time.Minute
+)
+
+func init() {
+	// In the go-autorest SDK (https://github.com/Azure/go-autorest/blob/master/autorest/sender.go#L242),
+	// if ARM returns http.StatusTooManyRequests, the sender doesn't increase the retry attempt count,
+	// hence the Azure clients will keep retrying forever until they get a status code other than 429.
+	// So we explicitly remove http.StatusTooManyRequests from autorest.StatusCodesForRetry.
+	// Refer to https://github.com/Azure/go-autorest/issues/398.
+	statusCodesForRetry := make([]int, 0)
+	for _, code := range autorest.StatusCodesForRetry {
+		if code != http.StatusTooManyRequests {
+			statusCodesForRetry = append(statusCodesForRetry, code)
+		}
+	}
+	autorest.StatusCodesForRetry = statusCodesForRetry
+}
+
 // ScaleSet implements NodeGroup interface.
 type ScaleSet struct {
 	azureRef
@@ -43,9 +65,13 @@ type ScaleSet struct {
 	minSize int
 	maxSize int
 
-	mutex       sync.Mutex
-	lastRefresh time.Time
-	curSize     int64
+	sizeMutex       sync.Mutex
+	curSize         int64
+	lastSizeRefresh time.Time
+
+	instanceMutex       sync.Mutex
+	instanceCache       []string
+	lastInstanceRefresh time.Time
 }
 
 // NewScaleSet creates a new NewScaleSet.
@@ -109,22 +135,33 @@ func (scaleSet *ScaleSet) getVMSSInfo() (compute.VirtualMachineScaleSet, error)
 }
 
 func (scaleSet *ScaleSet) getCurSize() (int64, error) {
-	scaleSet.mutex.Lock()
-	defer scaleSet.mutex.Unlock()
+	scaleSet.sizeMutex.Lock()
+	defer scaleSet.sizeMutex.Unlock()
 
-	if scaleSet.lastRefresh.Add(15 * time.Second).After(time.Now()) {
+	if scaleSet.lastSizeRefresh.Add(vmssSizeRefreshPeriod).After(time.Now()) {
 		return scaleSet.curSize, nil
 	}
 
 	glog.V(5).Infof("Get scale set size for %q", scaleSet.Name)
 	set, err := scaleSet.getVMSSInfo()
 	if err != nil {
+		if isAzureRequestsThrottled(err) {
+			// Log a warning and update the size refresh time so that it will retry after the next vmssSizeRefreshPeriod.
+			glog.Warningf("getVMSSInfo() is throttled with message %v, would return the cached vmss size", err)
+			scaleSet.lastSizeRefresh = time.Now()
+			return scaleSet.curSize, nil
+		}
 		return -1, err
 	}
 
 	glog.V(5).Infof("Getting scale set (%q) capacity: %d\n", scaleSet.Name, *set.Sku.Capacity)
+	if scaleSet.curSize != *set.Sku.Capacity {
+		// Invalidate the instance cache if the capacity has changed.
+		scaleSet.invalidateInstanceCache()
+	}
+
 	scaleSet.curSize = *set.Sku.Capacity
-	scaleSet.lastRefresh = time.Now()
+	scaleSet.lastSizeRefresh = time.Now()
 	return scaleSet.curSize, nil
 }
 
@@ -133,34 +170,56 @@ func (scaleSet *ScaleSet) GetScaleSetSize() (int64, error) {
 	return scaleSet.getCurSize()
 }
 
-// SetScaleSetSize sets ScaleSet size.
-func (scaleSet *ScaleSet) SetScaleSetSize(size int64) error {
-	scaleSet.mutex.Lock()
-	defer scaleSet.mutex.Unlock()
+// updateVMSSCapacity invokes virtualMachineScaleSetsClient to update the capacity for the VMSS.
+func (scaleSet *ScaleSet) updateVMSSCapacity(size int64) {
+	var op compute.VirtualMachineScaleSet
+	var resp *http.Response
+	var isSuccess bool
+	var err error
+
+	defer func() {
+		if err != nil {
+			glog.Errorf("Failed to update the capacity for vmss %s with error %v, invalidate the cache so as to get the real size from API", scaleSet.Name, err)
+			// Invalidate the VMSS size cache in order to fetch the size from the API.
+			scaleSet.sizeMutex.Lock()
+			defer scaleSet.sizeMutex.Unlock()
+			scaleSet.lastSizeRefresh = time.Now().Add(-1 * vmssSizeRefreshPeriod)
+		}
+	}()
 
 	resourceGroup := scaleSet.manager.config.ResourceGroup
-	op, err := scaleSet.getVMSSInfo()
+	op, err = scaleSet.getVMSSInfo()
 	if err != nil {
-		return err
+		glog.Errorf("Failed to get information for VMSS (%q): %v", scaleSet.Name, err)
+		return
 	}
 
 	op.Sku.Capacity = &size
 	op.VirtualMachineScaleSetProperties.ProvisioningState = nil
-	updateCtx, updateCancel := getContextWithCancel()
-	defer updateCancel()
-
+	ctx, cancel := getContextWithCancel()
+	defer cancel()
 	glog.V(3).Infof("Waiting for virtualMachineScaleSetsClient.CreateOrUpdate(%s)", scaleSet.Name)
-	resp, err := scaleSet.manager.azClient.virtualMachineScaleSetsClient.CreateOrUpdate(updateCtx, resourceGroup, scaleSet.Name, op)
-	isSuccess, realError := isSuccessHTTPResponse(resp, err)
+	resp, err = scaleSet.manager.azClient.virtualMachineScaleSetsClient.CreateOrUpdate(ctx, resourceGroup, scaleSet.Name, op)
+	isSuccess, err = isSuccessHTTPResponse(resp, err)
 	if isSuccess {
 		glog.V(3).Infof("virtualMachineScaleSetsClient.CreateOrUpdate(%s) success", scaleSet.Name)
-		scaleSet.curSize = size
-		scaleSet.lastRefresh = time.Now()
-		return nil
+		scaleSet.invalidateInstanceCache()
+		return
 	}
 
-	glog.Errorf("virtualMachineScaleSetsClient.CreateOrUpdate for scale set %q failed: %v", scaleSet.Name, realError)
-	return realError
+	glog.Errorf("virtualMachineScaleSetsClient.CreateOrUpdate for scale set %q failed: %v", scaleSet.Name, err)
+	return
+}
+
+// SetScaleSetSize sets ScaleSet size.
+func (scaleSet *ScaleSet) SetScaleSetSize(size int64) {
+	scaleSet.sizeMutex.Lock()
+	defer scaleSet.sizeMutex.Unlock()
+
+	// Proactively set the VMSS size so that the autoscaler makes better decisions.
+	scaleSet.curSize = size
+	scaleSet.lastSizeRefresh = time.Now()
+	go scaleSet.updateVMSSCapacity(size)
 }
 
 // TargetSize returns the current TARGET size of the node group. It is possible that the
@@ -185,7 +244,8 @@ func (scaleSet *ScaleSet) IncreaseSize(delta int) error {
 		return fmt.Errorf("size increase too large - desired:%d max:%d", int(size)+delta, scaleSet.MaxSize())
 	}
 
-	return scaleSet.SetScaleSetSize(size + int64(delta))
+	scaleSet.SetScaleSetSize(size + int64(delta))
+	return nil
 }
 
 // GetScaleSetVms returns list of nodes for the given scale set.
@@ -240,7 +300,8 @@ func (scaleSet *ScaleSet) DecreaseTargetSize(delta int) error {
 			size, delta, len(nodes))
 	}
 
-	return scaleSet.SetScaleSetSize(size + int64(delta))
+	scaleSet.SetScaleSetSize(size + int64(delta))
+	return nil
 }
 
 // Belongs returns true if the given node belongs to the NodeGroup.
@@ -439,19 +500,44 @@ func (scaleSet *ScaleSet) TemplateNodeInfo() (*schedulercache.NodeInfo, error) {
 
 // Nodes returns a list of all nodes that belong to this node group.
 func (scaleSet *ScaleSet) Nodes() ([]string, error) {
-	scaleSet.mutex.Lock()
-	defer scaleSet.mutex.Unlock()
+	curSize, err := scaleSet.getCurSize()
+	if err != nil {
+		glog.Errorf("Failed to get current size for vmss %q: %v", scaleSet.Name, err)
+		return nil, err
+	}
+
+	scaleSet.instanceMutex.Lock()
+	defer scaleSet.instanceMutex.Unlock()
+
+	if int64(len(scaleSet.instanceCache)) == curSize &&
+		scaleSet.lastInstanceRefresh.Add(vmssInstancesRefreshPeriod).After(time.Now()) {
+		return scaleSet.instanceCache, nil
+	}
 
 	vms, err := scaleSet.GetScaleSetVms()
 	if err != nil {
+		if isAzureRequestsThrottled(err) {
+			// Log a warning and update the instance refresh time so that it will retry after the next vmssInstancesRefreshPeriod.
+			glog.Warningf("GetScaleSetVms() is throttled with message %v, would return the cached instances", err)
+			scaleSet.lastInstanceRefresh = time.Now()
+			return scaleSet.instanceCache, nil
+		}
 		return nil, err
 	}
 
-	result := make([]string, 0, len(vms))
+	instances := make([]string, len(vms))
 	for i := range vms {
-		name := "azure://" + strings.ToLower(vms[i])
-		result = append(result, name)
+		instances[i] = "azure://" + strings.ToLower(vms[i])
 	}
 
-	return result, nil
+	scaleSet.instanceCache = instances
+	scaleSet.lastInstanceRefresh = time.Now()
+	return instances, nil
+}
+
+func (scaleSet *ScaleSet) invalidateInstanceCache() {
+	scaleSet.instanceMutex.Lock()
+	// Mark the instanceCache as outdated.
+	scaleSet.lastInstanceRefresh = time.Now().Add(-1 * vmssInstancesRefreshPeriod)
+	scaleSet.instanceMutex.Unlock()
 }
diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go
index c17e80a1bcee..f3e87647dac4 100644
--- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go
+++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go
@@ -76,8 +76,19 @@ func TestIncreaseSize(t *testing.T) {
 	assert.True(t, registered)
 	assert.Equal(t, len(provider.NodeGroups()), 1)
 
-	err := provider.NodeGroups()[0].IncreaseSize(1)
+	// current target size is 2.
+	targetSize, err := provider.NodeGroups()[0].TargetSize()
+	assert.NoError(t, err)
+	assert.Equal(t, targetSize, 2)
+
+	// increase by 3 nodes.
+	err = provider.NodeGroups()[0].IncreaseSize(3)
+	assert.NoError(t, err)
+
+	// new target size should be 5.
+	targetSize, err = provider.NodeGroups()[0].TargetSize()
 	assert.NoError(t, err)
+	assert.Equal(t, 5, targetSize)
 }
 
 func TestBelongs(t *testing.T) {
diff --git a/cluster-autoscaler/cloudprovider/azure/azure_util.go b/cluster-autoscaler/cloudprovider/azure/azure_util.go
index 771f9c483efc..5c1d07e045df 100644
--- a/cluster-autoscaler/cloudprovider/azure/azure_util.go
+++ b/cluster-autoscaler/cloudprovider/azure/azure_util.go
@@ -628,3 +628,21 @@ func isSuccessHTTPResponse(resp *http.Response, err error) (isSuccess bool, real
 	// This shouldn't happen, it only ensures all exceptions are handled.
 	return false, fmt.Errorf("failed with unknown error")
 }
+
+// isAzureRequestsThrottled returns true when the err is an http.StatusTooManyRequests (429) error.
+func isAzureRequestsThrottled(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	v, ok := err.(autorest.DetailedError)
+	if !ok {
+		return false
+	}
+
+	if v.StatusCode == http.StatusTooManyRequests {
+		return true
+	}
+
+	return false
+}
diff --git a/cluster-autoscaler/cloudprovider/azure/azure_util_test.go b/cluster-autoscaler/cloudprovider/azure/azure_util_test.go
index 5ae856ab54c9..b96d61a4ef71 100644
--- a/cluster-autoscaler/cloudprovider/azure/azure_util_test.go
+++ b/cluster-autoscaler/cloudprovider/azure/azure_util_test.go
@@ -22,6 +22,7 @@ import (
 	"testing"
 
 	"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-04-01/compute"
+	"github.com/Azure/go-autorest/autorest"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -189,3 +190,40 @@ func TestIsSuccessResponse(t *testing.T) {
 		assert.Equal(t, test.expectedError, realError, "[%s] expected: %v, saw: %v", test.name, realError, test.expectedError)
 	}
 }
+
+func TestIsAzureRequestsThrottled(t *testing.T) {
+	tests := []struct {
+		desc     string
+		err      error
+		expected bool
+	}{
+		{
+			desc:     "nil error should return false",
+			expected: false,
+		},
+		{
+			desc:     "non autorest.DetailedError error should return false",
+			err:      fmt.Errorf("unknown error"),
+			expected: false,
+		},
+		{
+			desc: "non http.StatusTooManyRequests error should return false",
+			err: autorest.DetailedError{
+				StatusCode: http.StatusBadRequest,
+			},
+			expected: false,
+		},
+		{
+			desc: "http.StatusTooManyRequests error should return true",
+			err: autorest.DetailedError{
+				StatusCode: http.StatusTooManyRequests,
+			},
+			expected: true,
+		},
+	}
+
+	for _, test := range tests {
+		real := isAzureRequestsThrottled(test.err)
+		assert.Equal(t, test.expected, real, test.desc)
+	}
+}
diff --git a/cluster-autoscaler/core/scale_up.go b/cluster-autoscaler/core/scale_up.go
index 7c505f9de462..cff8483f4b92 100644
--- a/cluster-autoscaler/core/scale_up.go
+++ b/cluster-autoscaler/core/scale_up.go
@@ -25,6 +25,7 @@ import (
 	apiv1 "k8s.io/api/core/v1"
 	extensionsv1 "k8s.io/api/extensions/v1beta1"
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
@@ -294,7 +295,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
 				nodeGroup)
 		}
 		for i := 0; i < numberOfNodes; i++ {
-			upcomingNodes = append(upcomingNodes, nodeTemplate)
+			upcomingNodes = append(upcomingNodes, buildNodeInfoForNodeTemplate(nodeTemplate, i))
 		}
 	}
 	glog.V(4).Infof("Upcoming %d nodes", len(upcomingNodes))
@@ -519,6 +520,14 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
 	return &status.ScaleUpStatus{ScaledUp: false, PodsRemainUnschedulable: getRemainingPods(podsRemainUnschedulable, skippedNodeGroups)}, nil
 }
 
+func buildNodeInfoForNodeTemplate(nodeTemplate *schedulercache.NodeInfo, index int) *schedulercache.NodeInfo {
+	nodeInfo := nodeTemplate.Clone()
+	node := nodeInfo.Node()
+	node.Name = fmt.Sprintf("%s-%d", node.Name, index)
+	node.UID = uuid.NewUUID()
+	return nodeInfo
+}
+
 func getRemainingPods(schedulingErrors map[*apiv1.Pod]map[string]status.Reasons, skipped map[string]status.Reasons) []status.NoScaleUpInfo {
 	remaining := []status.NoScaleUpInfo{}
 	for pod, errs := range schedulingErrors {
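As a rough sketch of the behavior the init() hook and isAzureRequestsThrottled() above aim for (not part of the patch; the test name is illustrative and it assumes compilation into the same azure package so that init() has already run, using only the exported autorest.StatusCodesForRetry variable and autorest.DetailedError type):

func TestStatusCodesForRetryExcludes429(t *testing.T) {
	// init() should have filtered 429 out of the package-level retry list, so a
	// throttled response is surfaced to the caller instead of being retried
	// indefinitely inside go-autorest.
	for _, code := range autorest.StatusCodesForRetry {
		if code == http.StatusTooManyRequests {
			t.Fatalf("expected 429 to be removed from autorest.StatusCodesForRetry, got %v", autorest.StatusCodesForRetry)
		}
	}

	// The surfaced 429 arrives as an autorest.DetailedError, which is what the new
	// cache paths in getCurSize() and Nodes() detect in order to fall back to the
	// cached size/instances until the next refresh period.
	throttled := autorest.DetailedError{StatusCode: http.StatusTooManyRequests}
	if !isAzureRequestsThrottled(throttled) {
		t.Fatal("expected isAzureRequestsThrottled to return true for a 429 DetailedError")
	}
}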