From 428fa91a870082ccc94babdea5a1ba0bd7e5c235 Mon Sep 17 00:00:00 2001
From: Pengfei Ni
Date: Sun, 30 Jun 2019 20:36:34 +0800
Subject: [PATCH 1/5] Allow scaling multiple vmss simultaneously

---
 .../cloudprovider/azure/azure_scale_set.go    | 70 +++++++++++++------
 1 file changed, 49 insertions(+), 21 deletions(-)

diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
index 1627db82d61d..9c6ab3309602 100644
--- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
+++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
@@ -19,6 +19,7 @@ package azure
 import (
 	"fmt"
 	"math/rand"
+	"net/http"
 	"strings"
 	"sync"
 	"time"
@@ -35,6 +36,10 @@ import (
 	schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
 )
 
+var (
+	vmssSizeRefreshPeriod = 15 * time.Second
+)
+
 // ScaleSet implements NodeGroup interface.
 type ScaleSet struct {
 	azureRef
@@ -112,7 +117,7 @@ func (scaleSet *ScaleSet) getCurSize() (int64, error) {
 	scaleSet.mutex.Lock()
 	defer scaleSet.mutex.Unlock()
 
-	if scaleSet.lastRefresh.Add(15 * time.Second).After(time.Now()) {
+	if scaleSet.lastRefresh.Add(vmssSizeRefreshPeriod).After(time.Now()) {
 		return scaleSet.curSize, nil
 	}
 
@@ -133,34 +138,55 @@ func (scaleSet *ScaleSet) GetScaleSetSize() (int64, error) {
 	return scaleSet.getCurSize()
 }
 
-// SetScaleSetSize sets ScaleSet size.
-func (scaleSet *ScaleSet) SetScaleSetSize(size int64) error {
-	scaleSet.mutex.Lock()
-	defer scaleSet.mutex.Unlock()
+// updateVMSSCapacity invokes virtualMachineScaleSetsClient to update the capacity for the VMSS.
+func (scaleSet *ScaleSet) updateVMSSCapacity(size int64) {
+	var op compute.VirtualMachineScaleSet
+	var resp *http.Response
+	var isSuccess bool
+	var err error
+
+	defer func() {
+		if err != nil {
+			klog.Errorf("Failed to update the capacity for vmss %s with error %v; invalidating the cache so the real size is fetched from the API", scaleSet.Name, err)
+			// Invalidate the VMSS size cache in order to fetch the size from the API.
+ scaleSet.mutex.Lock() + defer scaleSet.mutex.Unlock() + scaleSet.lastRefresh = time.Now().Add(-1 * vmssSizeRefreshPeriod) + } + }() resourceGroup := scaleSet.manager.config.ResourceGroup - op, err := scaleSet.getVMSSInfo() + op, err = scaleSet.getVMSSInfo() if err != nil { - return err + klog.Errorf("Failed to get information for VMSS (%q): %v", scaleSet.Name, err) + return } op.Sku.Capacity = &size op.VirtualMachineScaleSetProperties.ProvisioningState = nil - updateCtx, updateCancel := getContextWithCancel() - defer updateCancel() - - glog.V(3).Infof("Waiting for virtualMachineScaleSetsClient.CreateOrUpdate(%s)", scaleSet.Name) - resp, err := scaleSet.manager.azClient.virtualMachineScaleSetsClient.CreateOrUpdate(updateCtx, resourceGroup, scaleSet.Name, op) - isSuccess, realError := isSuccessHTTPResponse(resp, err) + ctx, cancel := getContextWithCancel() + defer cancel() + klog.V(3).Infof("Waiting for virtualMachineScaleSetsClient.CreateOrUpdate(%s)", scaleSet.Name) + resp, err = scaleSet.manager.azClient.virtualMachineScaleSetsClient.CreateOrUpdate(ctx, resourceGroup, scaleSet.Name, op) + isSuccess, err = isSuccessHTTPResponse(resp, err) if isSuccess { - glog.V(3).Infof("virtualMachineScaleSetsClient.CreateOrUpdate(%s) success", scaleSet.Name) - scaleSet.curSize = size - scaleSet.lastRefresh = time.Now() - return nil + klog.V(3).Infof("virtualMachineScaleSetsClient.CreateOrUpdate(%s) success", scaleSet.Name) + return } - glog.Errorf("virtualMachineScaleSetsClient.CreateOrUpdate for scale set %q failed: %v", scaleSet.Name, realError) - return realError + klog.Errorf("virtualMachineScaleSetsClient.CreateOrUpdate for scale set %q failed: %v", scaleSet.Name, err) + return +} + +// SetScaleSetSize sets ScaleSet size. +func (scaleSet *ScaleSet) SetScaleSetSize(size int64) { + scaleSet.mutex.Lock() + defer scaleSet.mutex.Unlock() + + // Proactively set the VMSS size so autoscaler makes better decisions. + scaleSet.curSize = size + scaleSet.lastRefresh = time.Now() + go scaleSet.updateVMSSCapacity(size) } // TargetSize returns the current TARGET size of the node group. It is possible that the @@ -185,7 +211,8 @@ func (scaleSet *ScaleSet) IncreaseSize(delta int) error { return fmt.Errorf("size increase too large - desired:%d max:%d", int(size)+delta, scaleSet.MaxSize()) } - return scaleSet.SetScaleSetSize(size + int64(delta)) + scaleSet.SetScaleSetSize(size + int64(delta)) + return nil } // GetScaleSetVms returns list of nodes for the given scale set. @@ -240,7 +267,8 @@ func (scaleSet *ScaleSet) DecreaseTargetSize(delta int) error { size, delta, len(nodes)) } - return scaleSet.SetScaleSetSize(size + int64(delta)) + scaleSet.SetScaleSetSize(size + int64(delta)) + return nil } // Belongs returns true if the given node belongs to the NodeGroup. 
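The net effect of this first patch is a fire-and-forget capacity update: SetScaleSetSize records the new size in the local cache immediately and returns, while a goroutine reconciles the size with the Azure API; on failure, the deferred handler back-dates the cache timestamp so the next getCurSize() falls through to the API. Below is a minimal, self-contained sketch of that pattern. It is an illustration only, not code from the patch; the cachedSize type and its setCapacity field are hypothetical stand-ins for ScaleSet and virtualMachineScaleSetsClient.CreateOrUpdate.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// cachedSize mirrors the patch: writes land in the cache immediately, while
// the slow cloud API call runs in a goroutine. On failure the cache is
// back-dated so the next read refetches the real size from the API.
type cachedSize struct {
	mu          sync.Mutex
	size        int64
	lastRefresh time.Time
	ttl         time.Duration
	setCapacity func(int64) error // hypothetical stand-in for CreateOrUpdate
}

// Set proactively updates the cached size, then reconciles asynchronously.
func (c *cachedSize) Set(size int64) {
	c.mu.Lock()
	c.size = size
	c.lastRefresh = time.Now() // trust the new value for one TTL
	c.mu.Unlock()

	go func() {
		if err := c.setCapacity(size); err != nil {
			c.mu.Lock()
			// Expire the cache so the next read falls through to the API.
			c.lastRefresh = time.Now().Add(-c.ttl)
			c.mu.Unlock()
		}
	}()
}

func main() {
	c := &cachedSize{
		ttl:         15 * time.Second,
		setCapacity: func(int64) error { return fmt.Errorf("simulated ARM failure") },
	}
	c.Set(5)
	time.Sleep(100 * time.Millisecond) // let the background update fail

	c.mu.Lock()
	defer c.mu.Unlock()
	fmt.Println(c.size, time.Since(c.lastRefresh) > c.ttl) // 5 true
}
```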
From 280560037e5578af9008f30ca943fcea2e4b68d3 Mon Sep 17 00:00:00 2001
From: Pengfei Ni
Date: Sun, 30 Jun 2019 23:46:21 +0800
Subject: [PATCH 2/5] Ensure upcoming nodes are different

---
 cluster-autoscaler/core/scale_up.go | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/cluster-autoscaler/core/scale_up.go b/cluster-autoscaler/core/scale_up.go
index 7c505f9de462..cff8483f4b92 100644
--- a/cluster-autoscaler/core/scale_up.go
+++ b/cluster-autoscaler/core/scale_up.go
@@ -25,6 +25,7 @@ import (
 	apiv1 "k8s.io/api/core/v1"
 	extensionsv1 "k8s.io/api/extensions/v1beta1"
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
@@ -294,7 +295,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
 				nodeGroup)
 		}
 		for i := 0; i < numberOfNodes; i++ {
-			upcomingNodes = append(upcomingNodes, nodeTemplate)
+			upcomingNodes = append(upcomingNodes, buildNodeInfoForNodeTemplate(nodeTemplate, i))
 		}
 	}
 	glog.V(4).Infof("Upcoming %d nodes", len(upcomingNodes))
@@ -519,6 +520,14 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
 	return &status.ScaleUpStatus{ScaledUp: false, PodsRemainUnschedulable: getRemainingPods(podsRemainUnschedulable, skippedNodeGroups)}, nil
 }
 
+func buildNodeInfoForNodeTemplate(nodeTemplate *schedulercache.NodeInfo, index int) *schedulercache.NodeInfo {
+	nodeInfo := nodeTemplate.Clone()
+	node := nodeInfo.Node()
+	node.Name = fmt.Sprintf("%s-%d", node.Name, index)
+	node.UID = uuid.NewUUID()
+	return nodeInfo
+}
+
 func getRemainingPods(schedulingErrors map[*apiv1.Pod]map[string]status.Reasons, skipped map[string]status.Reasons) []status.NoScaleUpInfo {
 	remaining := []status.NoScaleUpInfo{}
 	for pod, errs := range schedulingErrors {

From 45ff79ec96ec16907adfec33a4ab2fb38559c1ff Mon Sep 17 00:00:00 2001
From: Pengfei Ni
Date: Sat, 29 Jun 2019 22:34:22 +0800
Subject: [PATCH 3/5] Add cache for VMSS VMs

---
 .../cloudprovider/azure/azure_scale_set.go    | 84 +++++++++++++------
 .../azure/azure_scale_set_test.go             | 13 ++-
 2 files changed, 72 insertions(+), 25 deletions(-)

diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
index 9c6ab3309602..729944d067ad 100644
--- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
+++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
@@ -37,7 +37,8 @@ import (
 )
 
 var (
-	vmssSizeRefreshPeriod = 15 * time.Second
+	vmssSizeRefreshPeriod      = 15 * time.Second
+	vmssInstancesRefreshPeriod = 5 * time.Minute
 )
 
 // ScaleSet implements NodeGroup interface.
@@ -48,9 +49,13 @@ type ScaleSet struct {
 	minSize int
 	maxSize int
 
-	mutex       sync.Mutex
-	lastRefresh time.Time
-	curSize     int64
+	sizeMutex       sync.Mutex
+	curSize         int64
+	lastSizeRefresh time.Time
+
+	instanceMutex       sync.Mutex
+	instanceCache       []string
+	lastInstanceRefresh time.Time
 }
 
 // NewScaleSet creates a new ScaleSet.
@@ -114,22 +119,33 @@ func (scaleSet *ScaleSet) getVMSSInfo() (compute.VirtualMachineScaleSet, error)
 }
 
 func (scaleSet *ScaleSet) getCurSize() (int64, error) {
-	scaleSet.mutex.Lock()
-	defer scaleSet.mutex.Unlock()
+	scaleSet.sizeMutex.Lock()
+	defer scaleSet.sizeMutex.Unlock()
 
-	if scaleSet.lastRefresh.Add(vmssSizeRefreshPeriod).After(time.Now()) {
+	if scaleSet.lastSizeRefresh.Add(vmssSizeRefreshPeriod).After(time.Now()) {
 		return scaleSet.curSize, nil
 	}
 
 	glog.V(5).Infof("Get scale set size for %q", scaleSet.Name)
 	set, err := scaleSet.getVMSSInfo()
 	if err != nil {
+		if isAzureRequestsThrottled(err) {
+			// Log a warning and update the size refresh time so that it will retry after the next vmssSizeRefreshPeriod.
+			glog.Warningf("getVMSSInfo() is throttled with message %v; returning the cached vmss size", err)
+			scaleSet.lastSizeRefresh = time.Now()
+			return scaleSet.curSize, nil
+		}
 		return -1, err
 	}
 
 	glog.V(5).Infof("Getting scale set (%q) capacity: %d\n", scaleSet.Name, *set.Sku.Capacity)
+	if scaleSet.curSize != *set.Sku.Capacity {
+		// Invalidate the instance cache if the capacity has changed.
+		scaleSet.invalidateInstanceCache()
+	}
+
 	scaleSet.curSize = *set.Sku.Capacity
-	scaleSet.lastRefresh = time.Now()
+	scaleSet.lastSizeRefresh = time.Now()
 	return scaleSet.curSize, nil
 }
 
@@ -147,10 +163,10 @@ func (scaleSet *ScaleSet) updateVMSSCapacity(size int64) {
 	defer func() {
 		if err != nil {
-			klog.Errorf("Failed to update the capacity for vmss %s with error %v; invalidating the cache so the real size is fetched from the API", scaleSet.Name, err)
+			glog.Errorf("Failed to update the capacity for vmss %s with error %v; invalidating the cache so the real size is fetched from the API", scaleSet.Name, err)
 			// Invalidate the VMSS size cache in order to fetch the size from the API.
-			scaleSet.mutex.Lock()
-			defer scaleSet.mutex.Unlock()
+			scaleSet.sizeMutex.Lock()
+			defer scaleSet.sizeMutex.Unlock()
 			scaleSet.lastRefresh = time.Now().Add(-1 * vmssSizeRefreshPeriod)
 		}
 	}()
@@ -158,7 +174,7 @@ func (scaleSet *ScaleSet) updateVMSSCapacity(size int64) {
 	resourceGroup := scaleSet.manager.config.ResourceGroup
 	op, err = scaleSet.getVMSSInfo()
 	if err != nil {
-		klog.Errorf("Failed to get information for VMSS (%q): %v", scaleSet.Name, err)
+		glog.Errorf("Failed to get information for VMSS (%q): %v", scaleSet.Name, err)
 		return
 	}
 
@@ -166,26 +182,27 @@
 	op.Sku.Capacity = &size
 	op.VirtualMachineScaleSetProperties.ProvisioningState = nil
 	ctx, cancel := getContextWithCancel()
 	defer cancel()
-	klog.V(3).Infof("Waiting for virtualMachineScaleSetsClient.CreateOrUpdate(%s)", scaleSet.Name)
+	glog.V(3).Infof("Waiting for virtualMachineScaleSetsClient.CreateOrUpdate(%s)", scaleSet.Name)
 	resp, err = scaleSet.manager.azClient.virtualMachineScaleSetsClient.CreateOrUpdate(ctx, resourceGroup, scaleSet.Name, op)
 	isSuccess, err = isSuccessHTTPResponse(resp, err)
 	if isSuccess {
-		klog.V(3).Infof("virtualMachineScaleSetsClient.CreateOrUpdate(%s) success", scaleSet.Name)
+		glog.V(3).Infof("virtualMachineScaleSetsClient.CreateOrUpdate(%s) success", scaleSet.Name)
+		scaleSet.invalidateInstanceCache()
 		return
 	}
 
-	klog.Errorf("virtualMachineScaleSetsClient.CreateOrUpdate for scale set %q failed: %v", scaleSet.Name, err)
+	glog.Errorf("virtualMachineScaleSetsClient.CreateOrUpdate for scale set %q failed: %v", scaleSet.Name, err)
 	return
 }
 
 // SetScaleSetSize sets ScaleSet size.
 func (scaleSet *ScaleSet) SetScaleSetSize(size int64) {
-	scaleSet.mutex.Lock()
-	defer scaleSet.mutex.Unlock()
+	scaleSet.sizeMutex.Lock()
+	defer scaleSet.sizeMutex.Unlock()
 
 	// Proactively set the VMSS size so autoscaler makes better decisions.
 	scaleSet.curSize = size
-	scaleSet.lastRefresh = time.Now()
+	scaleSet.lastSizeRefresh = time.Now()
 
 	go scaleSet.updateVMSSCapacity(size)
 }
 
@@ -467,19 +484,38 @@ func (scaleSet *ScaleSet) TemplateNodeInfo() (*schedulercache.NodeInfo, error) {
 
 // Nodes returns a list of all nodes that belong to this node group.
 func (scaleSet *ScaleSet) Nodes() ([]string, error) {
-	scaleSet.mutex.Lock()
-	defer scaleSet.mutex.Unlock()
+	scaleSet.instanceMutex.Lock()
+	defer scaleSet.instanceMutex.Unlock()
+
+	if int64(len(scaleSet.instanceCache)) == scaleSet.curSize &&
+		scaleSet.lastInstanceRefresh.Add(vmssInstancesRefreshPeriod).After(time.Now()) {
+		return scaleSet.instanceCache, nil
+	}
 
 	vms, err := scaleSet.GetScaleSetVms()
 	if err != nil {
+		if isAzureRequestsThrottled(err) {
+			// Log a warning and update the instance refresh time so that it will retry after the next vmssInstancesRefreshPeriod.
+			glog.Warningf("GetScaleSetVms() is throttled with message %v; returning the cached instances", err)
+			scaleSet.lastInstanceRefresh = time.Now()
+			return scaleSet.instanceCache, nil
+		}
 		return nil, err
 	}
 
-	result := make([]string, 0, len(vms))
+	instances := make([]string, len(vms))
 	for i := range vms {
-		name := "azure://" + strings.ToLower(vms[i])
-		result = append(result, name)
+		instances[i] = "azure://" + strings.ToLower(vms[i])
 	}
-	return result, nil
+
+	scaleSet.instanceCache = instances
+	scaleSet.lastInstanceRefresh = time.Now()
+	return instances, nil
+}
+
+func (scaleSet *ScaleSet) invalidateInstanceCache() {
+	scaleSet.instanceMutex.Lock()
+	// Mark the instance cache as outdated.
+	scaleSet.lastInstanceRefresh = time.Now().Add(-1 * vmssInstancesRefreshPeriod)
+	scaleSet.instanceMutex.Unlock()
 }
diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go
index c17e80a1bcee..f3e87647dac4 100644
--- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go
+++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go
@@ -76,8 +76,19 @@ func TestIncreaseSize(t *testing.T) {
 	assert.True(t, registered)
 	assert.Equal(t, len(provider.NodeGroups()), 1)
 
-	err := provider.NodeGroups()[0].IncreaseSize(1)
+	// The current target size is 2.
+	targetSize, err := provider.NodeGroups()[0].TargetSize()
+	assert.NoError(t, err)
+	assert.Equal(t, 2, targetSize)
+
+	// Increase the size by 3 nodes.
+	err = provider.NodeGroups()[0].IncreaseSize(3)
+	assert.NoError(t, err)
+
+	// The new target size should be 5.
+	targetSize, err = provider.NodeGroups()[0].TargetSize()
 	assert.NoError(t, err)
+	assert.Equal(t, 5, targetSize)
 }
 
 func TestBelongs(t *testing.T) {

From 1fa8e0a6fd1e8bc21b2a8ac27601039fbd8eeb33 Mon Sep 17 00:00:00 2001
From: Pengfei Ni
Date: Sun, 30 Jun 2019 13:09:07 +0800
Subject: [PATCH 4/5] Fix Azure client requests getting stuck on http.StatusTooManyRequests

---
 .../cloudprovider/azure/azure_scale_set.go    | 18 ++++++++-
 .../cloudprovider/azure/azure_util.go         | 18 +++++++++
 .../cloudprovider/azure/azure_util_test.go    | 38 +++++++++++++++++++
 3 files changed, 73 insertions(+), 1 deletion(-)

diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
index 729944d067ad..05a736aefc10 100644
--- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
+++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
@@ -25,6 +25,7 @@ import (
 	"time"
 
 	"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-04-01/compute"
+	"github.com/Azure/go-autorest/autorest"
 	"github.com/golang/glog"
 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
@@ -41,6 +42,21 @@ var (
 	vmssInstancesRefreshPeriod = 5 * time.Minute
 )
 
+func init() {
+	// In the go-autorest SDK (https://github.com/Azure/go-autorest/blob/master/autorest/sender.go#L242),
+	// if ARM returns http.StatusTooManyRequests, the sender doesn't increase the retry attempt count,
+	// so the Azure clients keep retrying forever until they get a status code other than 429.
+	// We therefore explicitly remove http.StatusTooManyRequests from autorest.StatusCodesForRetry.
+	// See https://github.com/Azure/go-autorest/issues/398.
+	statusCodesForRetry := make([]int, 0)
+	for _, code := range autorest.StatusCodesForRetry {
+		if code != http.StatusTooManyRequests {
+			statusCodesForRetry = append(statusCodesForRetry, code)
+		}
+	}
+	autorest.StatusCodesForRetry = statusCodesForRetry
+}
+
 // ScaleSet implements NodeGroup interface.
 type ScaleSet struct {
 	azureRef
@@ -167,7 +183,7 @@ func (scaleSet *ScaleSet) updateVMSSCapacity(size int64) {
 			// Invalidate the VMSS size cache in order to fetch the size from the API.
 			scaleSet.sizeMutex.Lock()
 			defer scaleSet.sizeMutex.Unlock()
-			scaleSet.lastRefresh = time.Now().Add(-1 * vmssSizeRefreshPeriod)
+			scaleSet.lastSizeRefresh = time.Now().Add(-1 * vmssSizeRefreshPeriod)
 		}
 	}()
diff --git a/cluster-autoscaler/cloudprovider/azure/azure_util.go b/cluster-autoscaler/cloudprovider/azure/azure_util.go
index 771f9c483efc..5c1d07e045df 100644
--- a/cluster-autoscaler/cloudprovider/azure/azure_util.go
+++ b/cluster-autoscaler/cloudprovider/azure/azure_util.go
@@ -628,3 +628,21 @@ func isSuccessHTTPResponse(resp *http.Response, err error) (isSuccess bool, real
 	// This shouldn't happen, it only ensures all exceptions are handled.
 	return false, fmt.Errorf("failed with unknown error")
 }
+
+// isAzureRequestsThrottled returns true when err indicates http.StatusTooManyRequests (429).
+func isAzureRequestsThrottled(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	v, ok := err.(autorest.DetailedError)
+	if !ok {
+		return false
+	}
+
+	if v.StatusCode == http.StatusTooManyRequests {
+		return true
+	}
+
+	return false
+}
diff --git a/cluster-autoscaler/cloudprovider/azure/azure_util_test.go b/cluster-autoscaler/cloudprovider/azure/azure_util_test.go
index 5ae856ab54c9..b96d61a4ef71 100644
--- a/cluster-autoscaler/cloudprovider/azure/azure_util_test.go
+++ b/cluster-autoscaler/cloudprovider/azure/azure_util_test.go
@@ -22,6 +22,7 @@ import (
 	"testing"
 
 	"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-04-01/compute"
+	"github.com/Azure/go-autorest/autorest"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -189,3 +190,40 @@ func TestIsSuccessResponse(t *testing.T) {
 		assert.Equal(t, test.expectedError, realError, "[%s] expected: %v, saw: %v", test.name, realError, test.expectedError)
 	}
 }
+
+func TestIsAzureRequestsThrottled(t *testing.T) {
+	tests := []struct {
+		desc     string
+		err      error
+		expected bool
+	}{
+		{
+			desc:     "nil error should return false",
+			expected: false,
+		},
+		{
+			desc:     "non autorest.DetailedError error should return false",
+			err:      fmt.Errorf("unknown error"),
+			expected: false,
+		},
+		{
+			desc: "non http.StatusTooManyRequests error should return false",
+			err: autorest.DetailedError{
+				StatusCode: http.StatusBadRequest,
+			},
+			expected: false,
+		},
+		{
+			desc: "http.StatusTooManyRequests error should return true",
+			err: autorest.DetailedError{
+				StatusCode: http.StatusTooManyRequests,
+			},
+			expected: true,
+		},
+	}
+
+	for _, test := range tests {
+		real := isAzureRequestsThrottled(test.err)
+		assert.Equal(t, test.expected, real, test.desc)
+	}
+}

From 91f2eacf635e5a73156264db18efcf890a2685f0 Mon Sep 17 00:00:00 2001
From: Pengfei Ni
Date: Mon, 8 Jul 2019 16:45:42 +0800
Subject: [PATCH 5/5] Fix race conditions when checking the VMSS size

---
 cluster-autoscaler/cloudprovider/azure/azure_scale_set.go | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
index 05a736aefc10..b9e5c9bc8735 100644
--- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
+++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
@@ -500,10 +500,16 @@ func (scaleSet *ScaleSet) TemplateNodeInfo() (*schedulercache.NodeInfo, error) {
 
 // Nodes returns a list of all nodes that belong to this node group.
 func (scaleSet *ScaleSet) Nodes() ([]string, error) {
+	curSize, err := scaleSet.getCurSize()
+	if err != nil {
+		glog.Errorf("Failed to get current size for vmss %q: %v", scaleSet.Name, err)
+		return nil, err
+	}
+
 	scaleSet.instanceMutex.Lock()
 	defer scaleSet.instanceMutex.Unlock()
 
-	if int64(len(scaleSet.instanceCache)) == scaleSet.curSize &&
+	if int64(len(scaleSet.instanceCache)) == curSize &&
 		scaleSet.lastInstanceRefresh.Add(vmssInstancesRefreshPeriod).After(time.Now()) {
 		return scaleSet.instanceCache, nil
 	}
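Taken together, patches 3, 4, and 5 turn both reads (VMSS size and instance list) into throttle-aware caches: a fresh cache is served directly; a 429 from ARM keeps the stale data and pushes the next refresh out by one full period; and the init() change stops autorest from retrying 429s, so a throttled call returns promptly instead of blocking. The sketch below illustrates the read path under these rules. It is hypothetical, not code from the patches; listVMs and errThrottled stand in for GetScaleSetVms and an autorest.DetailedError carrying http.StatusTooManyRequests.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errThrottled stands in for an autorest.DetailedError whose StatusCode is 429.
var errThrottled = errors.New("429: too many requests")

type instanceCache struct {
	instances   []string
	lastRefresh time.Time
	ttl         time.Duration
	listVMs     func() ([]string, error) // hypothetical stand-in for GetScaleSetVms
}

// Get serves cached instances while they are fresh. When the backing call is
// throttled, it keeps the stale data and pushes the next refresh out by one
// full TTL, mirroring the lastInstanceRefresh handling in patch 3.
func (c *instanceCache) Get() ([]string, error) {
	if c.lastRefresh.Add(c.ttl).After(time.Now()) {
		return c.instances, nil
	}
	vms, err := c.listVMs()
	if err != nil {
		if errors.Is(err, errThrottled) {
			c.lastRefresh = time.Now() // back off for one TTL, keep stale data
			return c.instances, nil
		}
		return nil, err // other errors still surface to the caller
	}
	c.instances = vms
	c.lastRefresh = time.Now()
	return c.instances, nil
}

func main() {
	c := &instanceCache{
		ttl:       5 * time.Minute,
		instances: []string{"azure://vm-0"},
		// lastRefresh is zero, so the first Get() tries the API and is throttled.
		listVMs: func() ([]string, error) { return nil, errThrottled },
	}
	got, err := c.Get()
	fmt.Println(got, err) // [azure://vm-0] <nil>; stale cache served on 429
}
```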