diff --git a/cluster-autoscaler/cloudprovider/azure/README.md b/cluster-autoscaler/cloudprovider/azure/README.md index 4ede19bb21eb..e46345b502f7 100644 --- a/cluster-autoscaler/cloudprovider/azure/README.md +++ b/cluster-autoscaler/cloudprovider/azure/README.md @@ -153,7 +153,7 @@ In addition, cluster-autoscaler exposes a `AZURE_VMSS_CACHE_TTL` environment var | Config Name | Default | Environment Variable | Cloud Config File | | ----------- | ------- | -------------------- | ----------------- | -| VmssCacheTTL | 15 | AZURE_VMSS_CACHE_TTL | vmssCacheTTL | +| VmssCacheTTL | 60 | AZURE_VMSS_CACHE_TTL | vmssCacheTTL | The `AZURE_VMSS_VMS_CACHE_TTL` environment variable affects the `GetScaleSetVms` (VMSS VM List) calls rate. The default value is 300 seconds. A configurable jitter (`AZURE_VMSS_VMS_CACHE_JITTER` environment variable, default 0) expresses the maximum number of second that will be subtracted from that initial VMSS cache TTL after a new VMSS is discovered by the cluster-autoscaler: this can prevent a dogpile effect on clusters having many VMSS. diff --git a/cluster-autoscaler/cloudprovider/azure/azure_agent_pool.go b/cluster-autoscaler/cloudprovider/azure/azure_agent_pool.go index 80e1a0e9556d..354748fd5e7c 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_agent_pool.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_agent_pool.go @@ -35,21 +35,13 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/config/dynamic" klog "k8s.io/klog/v2" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" - "k8s.io/legacy-cloud-providers/azure/retry" ) const ( - vmInstancesRefreshPeriod = 5 * time.Minute clusterAutoscalerDeploymentPrefix = `cluster-autoscaler-` defaultMaxDeploymentsCount = 10 ) -var virtualMachinesStatusCache struct { - lastRefresh map[string]time.Time - mutex sync.Mutex - virtualMachines map[string][]compute.VirtualMachine -} - // AgentPool implements NodeGroup interface for agent pools deployed by aks-engine. type AgentPool struct { azureRef @@ -132,54 +124,24 @@ func (as *AgentPool) MaxSize() int { return as.maxSize } -func (as *AgentPool) getVirtualMachinesFromCache() ([]compute.VirtualMachine, error) { - virtualMachinesStatusCache.mutex.Lock() - defer virtualMachinesStatusCache.mutex.Unlock() - klog.V(4).Infof("getVirtualMachinesFromCache: starts for %+v", as) - - if virtualMachinesStatusCache.virtualMachines == nil { - klog.V(4).Infof("getVirtualMachinesFromCache: initialize vm cache") - virtualMachinesStatusCache.virtualMachines = make(map[string][]compute.VirtualMachine) - } - if virtualMachinesStatusCache.lastRefresh == nil { - klog.V(4).Infof("getVirtualMachinesFromCache: initialize last refresh time cache") - virtualMachinesStatusCache.lastRefresh = make(map[string]time.Time) - } - - if virtualMachinesStatusCache.lastRefresh[as.Id()].Add(vmInstancesRefreshPeriod).After(time.Now()) { - klog.V(4).Infof("getVirtualMachinesFromCache: get vms from cache") - return virtualMachinesStatusCache.virtualMachines[as.Id()], nil - } - klog.V(4).Infof("getVirtualMachinesFromCache: get vms from API") - vms, rerr := as.GetVirtualMachines() - klog.V(4).Infof("getVirtualMachinesFromCache: got vms from API, len = %d", len(vms)) - - if rerr != nil { - if isAzureRequestsThrottled(rerr) { - klog.Warningf("getAllVirtualMachines: throttling with message %v, would return the cached vms", rerr) - return virtualMachinesStatusCache.virtualMachines[as.Id()], nil - } - - return []compute.VirtualMachine{}, rerr.Error() - } - - virtualMachinesStatusCache.virtualMachines[as.Id()] = vms - virtualMachinesStatusCache.lastRefresh[as.Id()] = time.Now() - - return vms, nil +// Id returns AgentPool id. +func (as *AgentPool) Id() string { + return as.Name } -func invalidateVMCache(agentpoolName string) { - virtualMachinesStatusCache.mutex.Lock() - virtualMachinesStatusCache.lastRefresh[agentpoolName] = time.Now().Add(-1 * vmInstancesRefreshPeriod) - virtualMachinesStatusCache.mutex.Unlock() +func (as *AgentPool) getVMsFromCache() ([]compute.VirtualMachine, error) { + allVMs := as.manager.azureCache.getVirtualMachines() + if _, exists := allVMs[as.Name]; !exists { + return []compute.VirtualMachine{}, fmt.Errorf("could not find VMs with poolName: %s", as.Name) + } + return allVMs[as.Name], nil } // GetVMIndexes gets indexes of all virtual machines belonging to the agent pool. func (as *AgentPool) GetVMIndexes() ([]int, map[int]string, error) { klog.V(6).Infof("GetVMIndexes: starts for as %v", as) - instances, err := as.getVirtualMachinesFromCache() + instances, err := as.getVMsFromCache() if err != nil { return nil, nil, err } @@ -222,8 +184,8 @@ func (as *AgentPool) getCurSize() (int64, error) { klog.V(5).Infof("Returning agent pool (%q) size: %d\n", as.Name, len(indexes)) if as.curSize != int64(len(indexes)) { - klog.V(6).Infof("getCurSize:as.curSize(%d) != real size (%d), invalidating vm cache", as.curSize, len(indexes)) - invalidateVMCache(as.Id()) + klog.V(6).Infof("getCurSize:as.curSize(%d) != real size (%d), invalidating cache", as.curSize, len(indexes)) + as.manager.invalidateCache() } as.curSize = int64(len(indexes)) @@ -316,8 +278,8 @@ func (as *AgentPool) IncreaseSize(delta int) error { klog.Warningf("IncreaseSize: failed to cleanup outdated deployments with err: %v.", err) } - klog.V(6).Infof("IncreaseSize: invalidating vm cache") - invalidateVMCache(as.Id()) + klog.V(6).Infof("IncreaseSize: invalidating cache") + as.manager.invalidateCache() indexes, _, err := as.GetVMIndexes() if err != nil { @@ -357,8 +319,8 @@ func (as *AgentPool) IncreaseSize(delta int) error { // Update cache after scale success. as.curSize = int64(expectedSize) as.lastRefresh = time.Now() - klog.V(6).Info("IncreaseSize: invalidating vm cache") - invalidateVMCache(as.Id()) + klog.V(6).Info("IncreaseSize: invalidating cache") + as.manager.invalidateCache() return nil } @@ -366,34 +328,6 @@ func (as *AgentPool) IncreaseSize(delta int) error { return realError } -// GetVirtualMachines returns list of nodes for the given agent pool. -func (as *AgentPool) GetVirtualMachines() ([]compute.VirtualMachine, *retry.Error) { - ctx, cancel := getContextWithCancel() - defer cancel() - - result, rerr := as.manager.azClient.virtualMachinesClient.List(ctx, as.manager.config.ResourceGroup) - if rerr != nil { - return nil, rerr - } - - instances := make([]compute.VirtualMachine, 0) - for _, instance := range result { - if instance.Tags == nil { - continue - } - - tags := instance.Tags - vmPoolName := tags["poolName"] - if vmPoolName == nil || !strings.EqualFold(*vmPoolName, as.Id()) { - continue - } - - instances = append(instances, instance) - } - - return instances, nil -} - // DecreaseTargetSize decreases the target size of the node group. This function // doesn't permit to delete any existing node and can be used only to reduce the // request for new nodes that have not been yet fulfilled. Delta should be negative. @@ -403,7 +337,7 @@ func (as *AgentPool) DecreaseTargetSize(delta int) error { as.mutex.Lock() defer as.mutex.Unlock() - nodes, err := as.getVirtualMachinesFromCache() + nodes, err := as.getVMsFromCache() if err != nil { return err } @@ -427,14 +361,14 @@ func (as *AgentPool) Belongs(node *apiv1.Node) (bool, error) { Name: node.Spec.ProviderID, } - targetAsg, err := as.manager.GetAsgForInstance(ref) + targetAsg, err := as.manager.GetNodeGroupForInstance(ref) if err != nil { return false, err } if targetAsg == nil { return false, fmt.Errorf("%s doesn't belong to a known agent pool", node.Name) } - if !strings.EqualFold(targetAsg.Id(), as.Id()) { + if !strings.EqualFold(targetAsg.Id(), as.Name) { return false, nil } return true, nil @@ -446,13 +380,13 @@ func (as *AgentPool) DeleteInstances(instances []*azureRef) error { return nil } - commonAsg, err := as.manager.GetAsgForInstance(instances[0]) + commonAsg, err := as.manager.GetNodeGroupForInstance(instances[0]) if err != nil { return err } for _, instance := range instances { - asg, err := as.manager.GetAsgForInstance(instance) + asg, err := as.manager.GetNodeGroupForInstance(instance) if err != nil { return err } @@ -476,8 +410,8 @@ func (as *AgentPool) DeleteInstances(instances []*azureRef) error { } } - klog.V(6).Infof("DeleteInstances: invalidating vm cache") - invalidateVMCache(as.Id()) + klog.V(6).Infof("DeleteInstances: invalidating cache") + as.manager.invalidateCache() return nil } @@ -501,7 +435,7 @@ func (as *AgentPool) DeleteNodes(nodes []*apiv1.Node) error { } if belongs != true { - return fmt.Errorf("%s belongs to a different asg than %s", node.Name, as.Id()) + return fmt.Errorf("%s belongs to a different asg than %s", node.Name, as.Name) } ref := &azureRef{ @@ -518,14 +452,9 @@ func (as *AgentPool) DeleteNodes(nodes []*apiv1.Node) error { return as.DeleteInstances(refs) } -// Id returns AgentPool id. -func (as *AgentPool) Id() string { - return as.Name -} - // Debug returns a debug string for the agent pool. func (as *AgentPool) Debug() string { - return fmt.Sprintf("%s (%d:%d)", as.Id(), as.MinSize(), as.MaxSize()) + return fmt.Sprintf("%s (%d:%d)", as.Name, as.MinSize(), as.MaxSize()) } // TemplateNodeInfo returns a node template for this agent pool. @@ -535,7 +464,7 @@ func (as *AgentPool) TemplateNodeInfo() (*schedulerframework.NodeInfo, error) { // Nodes returns a list of all nodes that belong to this node group. func (as *AgentPool) Nodes() ([]cloudprovider.Instance, error) { - instances, err := as.getVirtualMachinesFromCache() + instances, err := as.getVMsFromCache() if err != nil { return nil, err } diff --git a/cluster-autoscaler/cloudprovider/azure/azure_agent_pool_test.go b/cluster-autoscaler/cloudprovider/azure/azure_agent_pool_test.go index ab5d7786cd28..0c32de654093 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_agent_pool_test.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_agent_pool_test.go @@ -48,8 +48,6 @@ var ( ) func newTestAgentPool(manager *AzureManager, name string) *AgentPool { - virtualMachinesStatusCache.lastRefresh = make(map[string]time.Time) - return &AgentPool{ azureRef: azureRef{ Name: name, @@ -173,7 +171,7 @@ func TestDeleteOutdatedDeployments(t *testing.T) { } } -func TestGetVirtualMachinesFromCache(t *testing.T) { +func TestGetVMsFromCache(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -186,35 +184,16 @@ func TestGetVirtualMachinesFromCache(t *testing.T) { mockVMClient := mockvmclient.NewMockInterface(ctrl) testAS.manager.azClient.virtualMachinesClient = mockVMClient - - mockVMClient.EXPECT().List(gomock.Any(), testAS.manager.config.ResourceGroup).Return([]compute.VirtualMachine{}, &rerrTooManyReqs) - vms, err := testAS.getVirtualMachinesFromCache() - assert.NoError(t, err) - assert.Empty(t, vms) - - mockVMClient.EXPECT().List(gomock.Any(), testAS.manager.config.ResourceGroup).Return([]compute.VirtualMachine{}, &rerrInternalErr) - vms, err = testAS.getVirtualMachinesFromCache() - expectedErr := fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 500, RawError: ") - assert.Equal(t, expectedErr, err) - assert.Empty(t, vms) - mockVMClient.EXPECT().List(gomock.Any(), testAS.manager.config.ResourceGroup).Return(expectedVMs, nil) - vms, err = testAS.getVirtualMachinesFromCache() - assert.Equal(t, 1, len(vms)) + ac, err := newAzureCache(testAS.manager.azClient, refreshInterval, testAS.manager.config.ResourceGroup, vmTypeStandard) assert.NoError(t, err) + testAS.manager.azureCache = ac - vms, err = testAS.getVirtualMachinesFromCache() + vms, err := testAS.getVMsFromCache() assert.Equal(t, 1, len(vms)) assert.NoError(t, err) } -func TestInvalidateVMCache(t *testing.T) { - virtualMachinesStatusCache.lastRefresh = make(map[string]time.Time) - virtualMachinesStatusCache.lastRefresh["test"] = time.Now() - invalidateVMCache("test") - assert.True(t, virtualMachinesStatusCache.lastRefresh["test"].Add(vmInstancesRefreshPeriod).Before(time.Now())) -} - func TestGetVMIndexes(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -223,32 +202,29 @@ func TestGetVMIndexes(t *testing.T) { expectedVMs := getExpectedVMs() mockVMClient := mockvmclient.NewMockInterface(ctrl) as.manager.azClient.virtualMachinesClient = mockVMClient + mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil) + ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard) + assert.NoError(t, err) + as.manager.azureCache = ac - mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return([]compute.VirtualMachine{}, &rerrInternalErr) sortedIndexes, indexToVM, err := as.GetVMIndexes() - expectedErr := fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 500, RawError: ") - assert.Equal(t, expectedErr, err) - assert.Nil(t, sortedIndexes) - assert.Nil(t, indexToVM) - - mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil) - sortedIndexes, indexToVM, err = as.GetVMIndexes() assert.NoError(t, err) assert.Equal(t, 2, len(sortedIndexes)) assert.Equal(t, 2, len(indexToVM)) - invalidateVMCache("as") expectedVMs[0].ID = to.StringPtr("foo") mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil) + err = as.manager.forceRefresh() + assert.NoError(t, err) sortedIndexes, indexToVM, err = as.GetVMIndexes() - expectedErr = fmt.Errorf("\"azure://foo\" isn't in Azure resource ID format") + expectedErr := fmt.Errorf("\"azure://foo\" isn't in Azure resource ID format") assert.Equal(t, expectedErr, err) assert.Nil(t, sortedIndexes) assert.Nil(t, indexToVM) - invalidateVMCache("as") expectedVMs[0].Name = to.StringPtr("foo") mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil) + err = as.manager.forceRefresh() sortedIndexes, indexToVM, err = as.GetVMIndexes() expectedErr = fmt.Errorf("resource name was missing from identifier") assert.Equal(t, expectedErr, err) @@ -265,23 +241,17 @@ func TestGetCurSize(t *testing.T) { expectedVMs := getExpectedVMs() mockVMClient := mockvmclient.NewMockInterface(ctrl) as.manager.azClient.virtualMachinesClient = mockVMClient + mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil) + ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard) + assert.NoError(t, err) + as.manager.azureCache = ac as.lastRefresh = time.Now() curSize, err := as.getCurSize() assert.NoError(t, err) assert.Equal(t, int64(1), curSize) - invalidateVMCache("as") - as.lastRefresh = time.Now().Add(-1 * 15 * time.Second) - mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return([]compute.VirtualMachine{}, &rerrInternalErr) - curSize, err = as.getCurSize() - expectedErr := fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 500, RawError: ") - assert.Equal(t, expectedErr, err) - assert.Zero(t, curSize) - - invalidateVMCache("as") - as.lastRefresh = time.Now().Add(-1 * 15 * time.Second) - mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil) + as.lastRefresh = time.Now().Add(-1 * 3 * time.Minute) curSize, err = as.getCurSize() assert.NoError(t, err) assert.Equal(t, int64(2), curSize) @@ -295,18 +265,13 @@ func TestAgentPoolTargetSize(t *testing.T) { mockVMClient := mockvmclient.NewMockInterface(ctrl) as.manager.azClient.virtualMachinesClient = mockVMClient expectedVMs := getExpectedVMs() + mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil) + ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard) + assert.NoError(t, err) + as.manager.azureCache = ac - invalidateVMCache("as") - mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return([]compute.VirtualMachine{}, &rerrInternalErr) - size, err := as.getCurSize() - expectedErr := fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 500, RawError: ") - assert.Equal(t, expectedErr, err) - assert.Zero(t, size) - - invalidateVMCache("as") as.lastRefresh = time.Now().Add(-1 * 15 * time.Second) - mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil) - size, err = as.getCurSize() + size, err := as.getCurSize() assert.NoError(t, err) assert.Equal(t, int64(2), size) } @@ -319,12 +284,18 @@ func TestAgentPoolIncreaseSize(t *testing.T) { mockVMClient := mockvmclient.NewMockInterface(ctrl) as.manager.azClient.virtualMachinesClient = mockVMClient expectedVMs := getExpectedVMs() + mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil).MaxTimes(2) + ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard) + assert.NoError(t, err) + as.manager.azureCache = ac - err := as.IncreaseSize(-1) + err = as.IncreaseSize(-1) expectedErr := fmt.Errorf("size increase must be positive") assert.Equal(t, expectedErr, err) mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil).MaxTimes(2) + err = as.manager.Refresh() + assert.NoError(t, err) err = as.IncreaseSize(4) expectedErr = fmt.Errorf("size increase too large - desired:6 max:5") @@ -341,12 +312,18 @@ func TestDecreaseTargetSize(t *testing.T) { mockVMClient := mockvmclient.NewMockInterface(ctrl) as.manager.azClient.virtualMachinesClient = mockVMClient expectedVMs := getExpectedVMs() + mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil).MaxTimes(3) + ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard) + assert.NoError(t, err) + as.manager.azureCache = ac - mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil) - err := as.DecreaseTargetSize(-1) + err = as.DecreaseTargetSize(-1) assert.NoError(t, err) assert.Equal(t, int64(2), as.curSize) + mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil).MaxTimes(2) + err = as.manager.Refresh() + assert.NoError(t, err) err = as.DecreaseTargetSize(-1) expectedErr := fmt.Errorf("attempt to delete existing nodes targetSize:2 delta:-1 existingNodes: 2") assert.Equal(t, expectedErr, err) @@ -357,7 +334,7 @@ func TestAgentPoolBelongs(t *testing.T) { defer ctrl.Finish() as := newTestAgentPool(newTestAzureManager(t), "as") - as.manager.asgCache.instanceToAsg[azureRef{Name: testValidProviderID0}] = as + as.manager.azureCache.instanceToNodeGroup[azureRef{Name: testValidProviderID0}] = as flag, err := as.Belongs(&apiv1.Node{Spec: apiv1.NodeSpec{ProviderID: testValidProviderID0}}) assert.NoError(t, err) @@ -372,7 +349,7 @@ func TestAgentPoolBelongs(t *testing.T) { assert.False(t, flag) as1 := newTestAgentPool(newTestAzureManager(t), "as1") - as1.manager.asgCache.instanceToAsg[azureRef{Name: testValidProviderID0}] = as + as1.manager.azureCache.instanceToNodeGroup[azureRef{Name: testValidProviderID0}] = as flag, err = as1.Belongs(&apiv1.Node{Spec: apiv1.NodeSpec{ProviderID: testValidProviderID0}}) assert.NoError(t, err) assert.False(t, flag) @@ -384,9 +361,9 @@ func TestDeleteInstances(t *testing.T) { as := newTestAgentPool(newTestAzureManager(t), "as") as1 := newTestAgentPool(newTestAzureManager(t), "as1") - as.manager.asgCache.instanceToAsg[azureRef{Name: testValidProviderID0}] = as - as.manager.asgCache.instanceToAsg[azureRef{Name: testValidProviderID1}] = as1 - as.manager.asgCache.instanceToAsg[azureRef{Name: testInvalidProviderID}] = as + as.manager.azureCache.instanceToNodeGroup[azureRef{Name: testValidProviderID0}] = as + as.manager.azureCache.instanceToNodeGroup[azureRef{Name: testValidProviderID1}] = as1 + as.manager.azureCache.instanceToNodeGroup[azureRef{Name: testInvalidProviderID}] = as mockVMClient := mockvmclient.NewMockInterface(ctrl) as.manager.azClient.virtualMachinesClient = mockVMClient @@ -447,30 +424,28 @@ func TestAgentPoolDeleteNodes(t *testing.T) { defer ctrl.Finish() as := newTestAgentPool(newTestAzureManager(t), "as") - as.manager.asgCache.instanceToAsg[azureRef{Name: testValidProviderID0}] = as + as.manager.azureCache.instanceToNodeGroup[azureRef{Name: testValidProviderID0}] = as expectedVMs := getExpectedVMs() mockVMClient := mockvmclient.NewMockInterface(ctrl) as.manager.azClient.virtualMachinesClient = mockVMClient mockSAClient := mockstorageaccountclient.NewMockInterface(ctrl) as.manager.azClient.storageAccountsClient = mockSAClient + mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil) + ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard) + assert.NoError(t, err) + as.manager.azureCache = ac - mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return([]compute.VirtualMachine{}, &rerrInternalErr) - err := as.DeleteNodes([]*apiv1.Node{}) - expectedErr := fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 500, RawError: ") - assert.Equal(t, expectedErr, err) - - mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil).Times(2) err = as.DeleteNodes([]*apiv1.Node{ { Spec: apiv1.NodeSpec{ProviderID: testInvalidProviderID}, ObjectMeta: v1.ObjectMeta{Name: "node"}, }, }) - expectedErr = fmt.Errorf("node doesn't belong to a known agent pool") + expectedErr := fmt.Errorf("node doesn't belong to a known agent pool") assert.Equal(t, expectedErr, err) as1 := newTestAgentPool(newTestAzureManager(t), "as1") - as.manager.asgCache.instanceToAsg[azureRef{Name: testValidProviderID0}] = as1 + as.manager.azureCache.instanceToNodeGroup[azureRef{Name: testValidProviderID0}] = as1 err = as.DeleteNodes([]*apiv1.Node{ { Spec: apiv1.NodeSpec{ProviderID: testValidProviderID0}, @@ -480,7 +455,7 @@ func TestAgentPoolDeleteNodes(t *testing.T) { expectedErr = fmt.Errorf("node belongs to a different asg than as") assert.Equal(t, expectedErr, err) - as.manager.asgCache.instanceToAsg[azureRef{Name: testValidProviderID0}] = as + as.manager.azureCache.instanceToNodeGroup[azureRef{Name: testValidProviderID0}] = as mockVMClient.EXPECT().Get(gomock.Any(), as.manager.config.ResourceGroup, "as-vm-0", gomock.Any()).Return(getExpectedVMs()[0], nil) mockVMClient.EXPECT().Delete(gomock.Any(), as.manager.config.ResourceGroup, "as-vm-0").Return(nil) mockSAClient.EXPECT().ListKeys(gomock.Any(), as.manager.config.ResourceGroup, "foo").Return(storage.AccountListKeysResult{ @@ -521,15 +496,12 @@ func TestAgentPoolNodes(t *testing.T) { mockVMClient := mockvmclient.NewMockInterface(ctrl) as.manager.azClient.virtualMachinesClient = mockVMClient + mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil) + ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard) + assert.NoError(t, err) + as.manager.azureCache = ac - mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return([]compute.VirtualMachine{}, &rerrInternalErr) nodes, err := as.Nodes() - expectedErr := fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 500, RawError: ") - assert.Equal(t, expectedErr, err) - assert.Nil(t, nodes) - - mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil) - nodes, err = as.Nodes() assert.NoError(t, err) assert.Equal(t, 1, len(nodes)) @@ -540,9 +512,10 @@ func TestAgentPoolNodes(t *testing.T) { }, } mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil) - invalidateVMCache("as") + err = as.manager.forceRefresh() + assert.NoError(t, err) nodes, err = as.Nodes() - expectedErr = fmt.Errorf("\"azure://foo\" isn't in Azure resource ID format") + expectedErr := fmt.Errorf("\"azure://foo\" isn't in Azure resource ID format") assert.Equal(t, expectedErr, err) assert.Nil(t, nodes) } diff --git a/cluster-autoscaler/cloudprovider/azure/azure_autodiscovery.go b/cluster-autoscaler/cloudprovider/azure/azure_autodiscovery.go index 17cdd602c86c..51112ace97cc 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_autodiscovery.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_autodiscovery.go @@ -26,14 +26,14 @@ const ( autoDiscovererTypeLabel = "label" ) -// A labelAutoDiscoveryConfig specifies how to auto-discover Azure scale sets. +// A labelAutoDiscoveryConfig specifies how to auto-discover Azure node groups. type labelAutoDiscoveryConfig struct { // Key-values to match on. Selector map[string]string } // ParseLabelAutoDiscoverySpecs returns any provided NodeGroupAutoDiscoverySpecs -// parsed into configuration appropriate for ASG autodiscovery. +// parsed into configuration appropriate for node group autodiscovery. func ParseLabelAutoDiscoverySpecs(o cloudprovider.NodeGroupDiscoveryOptions) ([]labelAutoDiscoveryConfig, error) { cfgs := make([]labelAutoDiscoveryConfig, len(o.NodeGroupAutoDiscoverySpecs)) var err error diff --git a/cluster-autoscaler/cloudprovider/azure/azure_cache.go b/cluster-autoscaler/cloudprovider/azure/azure_cache.go index 5babcd1c913c..43a160372402 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_cache.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_cache.go @@ -20,97 +20,244 @@ import ( "regexp" "strings" "sync" + "time" + "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-12-01/compute" + "github.com/Azure/go-autorest/autorest/to" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" - klog "k8s.io/klog/v2" + + "k8s.io/klog/v2" ) var ( virtualMachineRE = regexp.MustCompile(`^azure://(?:.*)/providers/Microsoft.Compute/virtualMachines/(.+)$`) ) -type asgCache struct { - registeredAsgs []cloudprovider.NodeGroup - instanceToAsg map[azureRef]cloudprovider.NodeGroup - notInRegisteredAsg map[azureRef]bool - mutex sync.Mutex - interrupt chan struct{} +// azureCache is used for caching cluster resources state. +// +// It is needed to: +// - keep track of node groups (AKS, VM and VMSS types) in the cluster, +// - keep track of instances and which node group they belong to, +// - limit repetitive Azure API calls. +type azureCache struct { + mutex sync.Mutex + interrupt chan struct{} + azClient *azClient + refreshInterval time.Duration + + // Cache content. + resourceGroup string + vmType string + scaleSets map[string]compute.VirtualMachineScaleSet + virtualMachines map[string][]compute.VirtualMachine + registeredNodeGroups []cloudprovider.NodeGroup + instanceToNodeGroup map[azureRef]cloudprovider.NodeGroup + unownedInstances map[azureRef]bool } -func newAsgCache() (*asgCache, error) { - cache := &asgCache{ - registeredAsgs: make([]cloudprovider.NodeGroup, 0), - instanceToAsg: make(map[azureRef]cloudprovider.NodeGroup), - notInRegisteredAsg: make(map[azureRef]bool), - interrupt: make(chan struct{}), +func newAzureCache(client *azClient, cacheTTL time.Duration, resourceGroup, vmType string) (*azureCache, error) { + cache := &azureCache{ + interrupt: make(chan struct{}), + azClient: client, + refreshInterval: cacheTTL, + resourceGroup: resourceGroup, + vmType: vmType, + scaleSets: make(map[string]compute.VirtualMachineScaleSet), + virtualMachines: make(map[string][]compute.VirtualMachine), + registeredNodeGroups: make([]cloudprovider.NodeGroup, 0), + instanceToNodeGroup: make(map[azureRef]cloudprovider.NodeGroup), + unownedInstances: make(map[azureRef]bool), } - cache.mutex.Lock() - defer cache.mutex.Unlock() if err := cache.regenerate(); err != nil { - klog.Errorf("Error while regenerating Asg cache: %v", err) + klog.Errorf("Error while regenerating Azure cache: %v", err) } return cache, nil } +func (m *azureCache) getVirtualMachines() map[string][]compute.VirtualMachine { + m.mutex.Lock() + defer m.mutex.Unlock() + + return m.virtualMachines +} + +func (m *azureCache) getScaleSets() map[string]compute.VirtualMachineScaleSet { + m.mutex.Lock() + defer m.mutex.Unlock() + + return m.scaleSets +} + +// Cleanup closes the channel to signal the go routine to stop that is handling the cache +func (m *azureCache) Cleanup() { + close(m.interrupt) +} + +func (m *azureCache) regenerate() error { + err := m.fetchAzureResources() + if err != nil { + return err + } + + // Regenerate instance to node groups mapping. + newInstanceToNodeGroupCache := make(map[azureRef]cloudprovider.NodeGroup) + for _, ng := range m.registeredNodeGroups { + klog.V(4).Infof("regenerate: finding nodes for node group %s", ng.Id()) + instances, err := ng.Nodes() + if err != nil { + return err + } + klog.V(4).Infof("regenerate: found nodes for node group %s: %+v", ng.Id(), instances) + + for _, instance := range instances { + ref := azureRef{Name: instance.Id} + newInstanceToNodeGroupCache[ref] = ng + } + } + + m.mutex.Lock() + defer m.mutex.Unlock() + + m.instanceToNodeGroup = newInstanceToNodeGroupCache + + // Reset unowned instances cache. + m.unownedInstances = make(map[azureRef]bool) + + return nil +} + +func (m *azureCache) fetchAzureResources() error { + m.mutex.Lock() + defer m.mutex.Unlock() + + switch m.vmType { + case vmTypeVMSS: + // List all VMSS in the RG. + vmssResult, err := m.fetchScaleSets() + if err == nil { + m.scaleSets = vmssResult + } else { + return err + } + case vmTypeStandard, vmTypeAKS: + // List all VMs in the RG. + vmResult, err := m.fetchVirtualMachines() + if err == nil { + m.virtualMachines = vmResult + } else { + return err + } + } + + return nil +} + +// fetchVirtualMachines returns the updated list of virtual machines in the config resource group using the Azure API. +func (m *azureCache) fetchVirtualMachines() (map[string][]compute.VirtualMachine, error) { + ctx, cancel := getContextWithCancel() + defer cancel() + + result, err := m.azClient.virtualMachinesClient.List(ctx, m.resourceGroup) + if err != nil { + klog.Errorf("VirtualMachinesClient.List in resource group %q failed: %v", m.resourceGroup, err) + return nil, err.Error() + } + + instances := make(map[string][]compute.VirtualMachine) + for _, instance := range result { + if instance.Tags == nil { + continue + } + + tags := instance.Tags + vmPoolName := tags["poolName"] + if vmPoolName == nil { + continue + } + + instances[to.String(vmPoolName)] = append(instances[to.String(vmPoolName)], instance) + } + return instances, nil +} + +// fetchScaleSets returns the updated list of scale sets in the config resource group using the Azure API. +func (m *azureCache) fetchScaleSets() (map[string]compute.VirtualMachineScaleSet, error) { + ctx, cancel := getContextWithTimeout(vmssContextTimeout) + defer cancel() + + result, err := m.azClient.virtualMachineScaleSetsClient.List(ctx, m.resourceGroup) + if err != nil { + klog.Errorf("VirtualMachineScaleSetsClient.List in resource group %q failed: %v", m.resourceGroup, err) + return nil, err.Error() + } + + sets := make(map[string]compute.VirtualMachineScaleSet) + for _, vmss := range result { + sets[*vmss.Name] = vmss + } + return sets, nil +} + // Register registers a node group if it hasn't been registered. -func (m *asgCache) Register(newAsg cloudprovider.NodeGroup) bool { +func (m *azureCache) Register(nodeGroup cloudprovider.NodeGroup) bool { m.mutex.Lock() defer m.mutex.Unlock() - for i := range m.registeredAsgs { - if existing := m.registeredAsgs[i]; strings.EqualFold(existing.Id(), newAsg.Id()) { - if existing.MinSize() == newAsg.MinSize() && existing.MaxSize() == newAsg.MaxSize() { + for i := range m.registeredNodeGroups { + if existing := m.registeredNodeGroups[i]; strings.EqualFold(existing.Id(), nodeGroup.Id()) { + if existing.MinSize() == nodeGroup.MinSize() && existing.MaxSize() == nodeGroup.MaxSize() { + // Node group is already registered and min/max size haven't changed, no action required. return false } - m.registeredAsgs[i] = newAsg - klog.V(4).Infof("ASG %q updated", newAsg.Id()) + m.registeredNodeGroups[i] = nodeGroup + klog.V(4).Infof("Node group %q updated", nodeGroup.Id()) m.invalidateUnownedInstanceCache() return true } } - klog.V(4).Infof("Registering ASG %q", newAsg.Id()) - m.registeredAsgs = append(m.registeredAsgs, newAsg) + klog.V(4).Infof("Registering Node Group %q", nodeGroup.Id()) + m.registeredNodeGroups = append(m.registeredNodeGroups, nodeGroup) m.invalidateUnownedInstanceCache() return true } -func (m *asgCache) invalidateUnownedInstanceCache() { +func (m *azureCache) invalidateUnownedInstanceCache() { klog.V(4).Info("Invalidating unowned instance cache") - m.notInRegisteredAsg = make(map[azureRef]bool) + m.unownedInstances = make(map[azureRef]bool) } -// Unregister ASG. Returns true if the ASG was unregistered. -func (m *asgCache) Unregister(asg cloudprovider.NodeGroup) bool { +// Unregister node group. Returns true if the node group was unregistered. +func (m *azureCache) Unregister(nodeGroup cloudprovider.NodeGroup) bool { m.mutex.Lock() defer m.mutex.Unlock() - updated := make([]cloudprovider.NodeGroup, 0, len(m.registeredAsgs)) + updated := make([]cloudprovider.NodeGroup, 0, len(m.registeredNodeGroups)) changed := false - for _, existing := range m.registeredAsgs { - if strings.EqualFold(existing.Id(), asg.Id()) { - klog.V(1).Infof("Unregistered ASG %s", asg.Id()) + for _, existing := range m.registeredNodeGroups { + if strings.EqualFold(existing.Id(), nodeGroup.Id()) { + klog.V(1).Infof("Unregistered node group %s", nodeGroup.Id()) changed = true continue } updated = append(updated, existing) } - m.registeredAsgs = updated + m.registeredNodeGroups = updated return changed } -func (m *asgCache) get() []cloudprovider.NodeGroup { +func (m *azureCache) getRegisteredNodeGroups() []cloudprovider.NodeGroup { m.mutex.Lock() defer m.mutex.Unlock() - return m.registeredAsgs + return m.registeredNodeGroups } -// FindForInstance returns Asg of the given Instance -func (m *asgCache) FindForInstance(instance *azureRef, vmType string) (cloudprovider.NodeGroup, error) { +// FindForInstance returns node group of the given Instance +func (m *azureCache) FindForInstance(instance *azureRef, vmType string) (cloudprovider.NodeGroup, error) { m.mutex.Lock() defer m.mutex.Unlock() @@ -121,7 +268,7 @@ func (m *asgCache) FindForInstance(instance *azureRef, vmType string) (cloudprov return nil, err } inst := azureRef{Name: resourceID} - if m.notInRegisteredAsg[inst] { + if m.unownedInstances[inst] { // We already know we don't own this instance. Return early and avoid // additional calls. klog.V(4).Infof("FindForInstance: Couldn't find NodeGroup of instance %q", inst) @@ -132,7 +279,7 @@ func (m *asgCache) FindForInstance(instance *azureRef, vmType string) (cloudprov // Omit virtual machines not managed by vmss. if ok := virtualMachineRE.Match([]byte(inst.Name)); ok { klog.V(3).Infof("Instance %q is not managed by vmss, omit it in autoscaler", instance.Name) - m.notInRegisteredAsg[inst] = true + m.unownedInstances[inst] = true return nil, nil } } @@ -141,57 +288,27 @@ func (m *asgCache) FindForInstance(instance *azureRef, vmType string) (cloudprov // Omit virtual machines with providerID not in Azure resource ID format. if ok := virtualMachineRE.Match([]byte(inst.Name)); !ok { klog.V(3).Infof("Instance %q is not in Azure resource ID format, omit it in autoscaler", instance.Name) - m.notInRegisteredAsg[inst] = true + m.unownedInstances[inst] = true return nil, nil } } // Look up caches for the instance. - klog.V(6).Infof("FindForInstance: attempting to retrieve instance %v from cache", m.instanceToAsg) - if asg := m.getInstanceFromCache(inst.Name); asg != nil { - klog.V(4).Infof("FindForInstance: found asg %s in cache", asg.Id()) - return asg, nil + klog.V(6).Infof("FindForInstance: attempting to retrieve instance %v from cache", m.instanceToNodeGroup) + if nodeGroup := m.getInstanceFromCache(inst.Name); nodeGroup != nil { + klog.V(4).Infof("FindForInstance: found node group %q in cache", nodeGroup.Id()) + return nodeGroup, nil } - klog.V(4).Infof("FindForInstance: Couldn't find NodeGroup of instance %q", inst) + klog.V(4).Infof("FindForInstance: Couldn't find node group of instance %q", inst) return nil, nil } -// Cleanup closes the channel to signal the go routine to stop that is handling the cache -func (m *asgCache) Cleanup() { - close(m.interrupt) -} - -func (m *asgCache) regenerate() error { - newCache := make(map[azureRef]cloudprovider.NodeGroup) - - for _, nsg := range m.registeredAsgs { - klog.V(6).Infof("regenerate: finding nodes for nsg %+v", nsg) - instances, err := nsg.Nodes() - if err != nil { - return err - } - klog.V(6).Infof("regenerate: found nodes for nsg %v: %+v", nsg, instances) - - for _, instance := range instances { - ref := azureRef{Name: instance.Id} - newCache[ref] = nsg - } - } - - m.instanceToAsg = newCache - - // Invalidating unowned instance cache. - m.invalidateUnownedInstanceCache() - - return nil -} - -// Get node group from cache. nil would be return if not found. -// Should be call with lock protected. -func (m *asgCache) getInstanceFromCache(providerID string) cloudprovider.NodeGroup { - for instanceID, asg := range m.instanceToAsg { +// getInstanceFromCache gets the node group from cache. Returns nil if not found. +// Should be called with lock. +func (m *azureCache) getInstanceFromCache(providerID string) cloudprovider.NodeGroup { + for instanceID, nodeGroup := range m.instanceToNodeGroup { if strings.EqualFold(instanceID.GetKey(), providerID) { - return asg + return nodeGroup } } diff --git a/cluster-autoscaler/cloudprovider/azure/azure_cache_test.go b/cluster-autoscaler/cloudprovider/azure/azure_cache_test.go index efb3eaebe1f2..2b87ab938486 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_cache_test.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_cache_test.go @@ -28,9 +28,8 @@ func TestRegister(t *testing.T) { provider := newTestProvider(t) ss := newTestScaleSet(provider.azureManager, "ss") - ac, err := newAsgCache() - assert.NoError(t, err) - ac.registeredAsgs = []cloudprovider.NodeGroup{ss} + ac := provider.azureManager.azureCache + ac.registeredNodeGroups = []cloudprovider.NodeGroup{ss} isSuccess := ac.Register(ss) assert.False(t, isSuccess) @@ -46,29 +45,28 @@ func TestUnRegister(t *testing.T) { ss := newTestScaleSet(provider.azureManager, "ss") ss1 := newTestScaleSet(provider.azureManager, "ss1") - ac, err := newAsgCache() - assert.NoError(t, err) - ac.registeredAsgs = []cloudprovider.NodeGroup{ss, ss1} + ac := provider.azureManager.azureCache + ac.registeredNodeGroups = []cloudprovider.NodeGroup{ss, ss1} isSuccess := ac.Unregister(ss) assert.True(t, isSuccess) - assert.Equal(t, 1, len(ac.registeredAsgs)) + assert.Equal(t, 1, len(ac.registeredNodeGroups)) } func TestFindForInstance(t *testing.T) { - ac, err := newAsgCache() - assert.NoError(t, err) + provider := newTestProvider(t) + ac := provider.azureManager.azureCache inst := azureRef{Name: "/subscriptions/sub/resourceGroups/rg/providers/foo"} - ac.notInRegisteredAsg = make(map[azureRef]bool) - ac.notInRegisteredAsg[inst] = true + ac.unownedInstances = make(map[azureRef]bool) + ac.unownedInstances[inst] = true nodeGroup, err := ac.FindForInstance(&inst, vmTypeVMSS) assert.Nil(t, nodeGroup) assert.NoError(t, err) - ac.notInRegisteredAsg[inst] = false + ac.unownedInstances[inst] = false nodeGroup, err = ac.FindForInstance(&inst, vmTypeStandard) assert.Nil(t, nodeGroup) assert.NoError(t, err) - assert.True(t, ac.notInRegisteredAsg[inst]) + assert.True(t, ac.unownedInstances[inst]) } diff --git a/cluster-autoscaler/cloudprovider/azure/azure_cloud_provider.go b/cluster-autoscaler/cloudprovider/azure/azure_cloud_provider.go index cec81a1802de..18ca771b7300 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_cloud_provider.go @@ -80,7 +80,7 @@ func (azure *AzureCloudProvider) GetAvailableGPUTypes() map[string]struct{} { // NodeGroups returns all node groups configured for this cloud provider. func (azure *AzureCloudProvider) NodeGroups() []cloudprovider.NodeGroup { - asgs := azure.azureManager.getAsgs() + asgs := azure.azureManager.getNodeGroups() ngs := make([]cloudprovider.NodeGroup, len(asgs)) for i, asg := range asgs { @@ -102,7 +102,7 @@ func (azure *AzureCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudprovid } klog.V(6).Infof("NodeGroupForNode: ref.Name %s", ref.Name) - return azure.azureManager.GetAsgForInstance(ref) + return azure.azureManager.GetNodeGroupForInstance(ref) } // Pricing returns pricing model for this cloud provider or error if not available. diff --git a/cluster-autoscaler/cloudprovider/azure/azure_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/azure/azure_cloud_provider_test.go index bd8243df22dd..39cf87b17186 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_cloud_provider_test.go @@ -18,25 +18,31 @@ package azure import ( "fmt" - "io/ioutil" - "os" + "github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources" + "github.com/Azure/go-autorest/autorest/to" "testing" apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" - "k8s.io/autoscaler/cluster-autoscaler/config" - azclients "k8s.io/legacy-cloud-providers/azure/clients" "k8s.io/legacy-cloud-providers/azure/clients/vmssclient/mockvmssclient" "k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient/mockvmssvmclient" - "github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources" "github.com/Azure/go-autorest/autorest/azure" - "github.com/Azure/go-autorest/autorest/to" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" ) func newTestAzureManager(t *testing.T) *AzureManager { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + expectedScaleSets := newTestVMSSList(3, "test-vmss", "eastus") + expectedVMSSVMs := newTestVMSSVMList(3) + mockVMSSClient := mockvmssclient.NewMockInterface(ctrl) + mockVMSSClient.EXPECT().List(gomock.Any(), "rg").Return(expectedScaleSets, nil).AnyTimes() + mockVMSSVMClient := mockvmssvmclient.NewMockInterface(ctrl) + mockVMSSVMClient.EXPECT().List(gomock.Any(), "rg", "test-vmss", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() + manager := &AzureManager{ env: azure.PublicCloud, explicitlyConfigured: make(map[string]bool), @@ -46,8 +52,9 @@ func newTestAzureManager(t *testing.T) *AzureManager { MaxDeploymentsCount: 2, Deployment: "deployment", }, - azClient: &azClient{ + virtualMachineScaleSetsClient: mockVMSSClient, + virtualMachineScaleSetVMsClient: mockVMSSVMClient, deploymentsClient: &DeploymentsClientMock{ FakeStore: map[string]resources.DeploymentExtended{ "deployment": { @@ -68,10 +75,10 @@ func newTestAzureManager(t *testing.T) *AzureManager { }, } - cache, error := newAsgCache() + cache, error := newAzureCache(manager.azClient, refreshInterval, manager.config.ResourceGroup, vmTypeVMSS) assert.NoError(t, error) - manager.asgCache = cache + manager.azureCache = cache return manager } @@ -105,7 +112,7 @@ func TestNodeGroups(t *testing.T) { provider := newTestProvider(t) assert.Equal(t, len(provider.NodeGroups()), 0) - registered := provider.azureManager.RegisterAsg( + registered := provider.azureManager.RegisterNodeGroup( newTestScaleSet(provider.azureManager, "test-asg")) assert.True(t, registered) assert.Equal(t, len(provider.NodeGroups()), 1) @@ -126,7 +133,7 @@ func TestNodeGroupForNode(t *testing.T) { mockVMSSVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup, "test-asg", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() provider.azureManager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient - registered := provider.azureManager.RegisterAsg( + registered := provider.azureManager.RegisterNodeGroup( newTestScaleSet(provider.azureManager, "test-asg")) assert.True(t, registered) assert.Equal(t, len(provider.NodeGroups()), 1) @@ -137,7 +144,7 @@ func TestNodeGroupForNode(t *testing.T) { }, } // refresh cache - provider.azureManager.regenerateCache() + provider.azureManager.forceRefresh() group, err := provider.NodeGroupForNode(node) assert.NoError(t, err) assert.NotNil(t, group, "Group should not be nil") @@ -158,7 +165,7 @@ func TestNodeGroupForNode(t *testing.T) { func TestNodeGroupForNodeWithNoProviderId(t *testing.T) { provider := newTestProvider(t) - registered := provider.azureManager.RegisterAsg( + registered := provider.azureManager.RegisterNodeGroup( newTestScaleSet(provider.azureManager, "test-asg")) assert.True(t, registered) assert.Equal(t, len(provider.NodeGroups()), 1) @@ -173,122 +180,3 @@ func TestNodeGroupForNodeWithNoProviderId(t *testing.T) { assert.NoError(t, err) assert.Equal(t, group, nil) } - -func TestBuildAzure(t *testing.T) { - expectedConfig := &Config{ - Cloud: "AzurePublicCloud", - Location: "southeastasia", - TenantID: "tenantId", - SubscriptionID: "subId", - ResourceGroup: "rg", - VMType: "vmss", - AADClientID: "clientId", - AADClientSecret: "clientsecret", - MaxDeploymentsCount: 10, - CloudProviderRateLimitConfig: CloudProviderRateLimitConfig{ - RateLimitConfig: azclients.RateLimitConfig{ - CloudProviderRateLimit: false, - CloudProviderRateLimitBucket: 5, - CloudProviderRateLimitBucketWrite: 5, - CloudProviderRateLimitQPS: 1, - CloudProviderRateLimitQPSWrite: 1, - }, - InterfaceRateLimit: &azclients.RateLimitConfig{ - CloudProviderRateLimit: false, - CloudProviderRateLimitBucket: 5, - CloudProviderRateLimitBucketWrite: 5, - CloudProviderRateLimitQPS: 1, - CloudProviderRateLimitQPSWrite: 1, - }, - VirtualMachineRateLimit: &azclients.RateLimitConfig{ - CloudProviderRateLimit: false, - CloudProviderRateLimitBucket: 5, - CloudProviderRateLimitBucketWrite: 5, - CloudProviderRateLimitQPS: 1, - CloudProviderRateLimitQPSWrite: 1, - }, - StorageAccountRateLimit: &azclients.RateLimitConfig{ - CloudProviderRateLimit: false, - CloudProviderRateLimitBucket: 5, - CloudProviderRateLimitBucketWrite: 5, - CloudProviderRateLimitQPS: 1, - CloudProviderRateLimitQPSWrite: 1, - }, - DiskRateLimit: &azclients.RateLimitConfig{ - CloudProviderRateLimit: false, - CloudProviderRateLimitBucket: 5, - CloudProviderRateLimitBucketWrite: 5, - CloudProviderRateLimitQPS: 1, - CloudProviderRateLimitQPSWrite: 1, - }, - VirtualMachineScaleSetRateLimit: &azclients.RateLimitConfig{ - CloudProviderRateLimit: false, - CloudProviderRateLimitBucket: 5, - CloudProviderRateLimitBucketWrite: 5, - CloudProviderRateLimitQPS: 1, - CloudProviderRateLimitQPSWrite: 1, - }, - KubernetesServiceRateLimit: &azclients.RateLimitConfig{ - CloudProviderRateLimit: false, - CloudProviderRateLimitBucket: 5, - CloudProviderRateLimitBucketWrite: 5, - CloudProviderRateLimitQPS: 1, - CloudProviderRateLimitQPSWrite: 1, - }, - }, - } - - cloudConfig := "cfg.json" - azureCfg := `{ - "cloud": "AzurePublicCloud", - "tenantId": "tenantId", - "subscriptionId": "subId", - "aadClientId": "clientId", - "aadClientSecret": "clientsecret", - "resourceGroup": "rg", - "location": "southeastasia" - }` - err := ioutil.WriteFile(cloudConfig, []byte(azureCfg), 0644) - assert.Nil(t, err) - defer os.Remove(cloudConfig) - - os.Setenv("ARM_CLOUD", "AzurePublicCloud") - os.Setenv("LOCATION", "southeastasia") - os.Setenv("ARM_RESOURCE_GROUP", "rg") - os.Setenv("ARM_SUBSCRIPTION_ID", "subId") - os.Setenv("ARM_TENANT_ID", "tenantId") - os.Setenv("ARM_CLIENT_ID", "clientId") - os.Setenv("ARM_CLIENT_SECRET", "clientsecret") - - resourceLimiter := &cloudprovider.ResourceLimiter{} - discoveryOptions := cloudprovider.NodeGroupDiscoveryOptions{} - - testCases := []struct { - name string - cloudConfig string - }{ - { - name: "BuildAzure should create Azure Manager using cloud-config file", - cloudConfig: cloudConfig, - }, - { - name: "BuildAzure should create Azure Manager using environment variable", - }, - } - - for _, test := range testCases { - opts := config.AutoscalingOptions{ - CloudConfig: test.cloudConfig, - } - cloudProvider := BuildAzure(opts, discoveryOptions, resourceLimiter) - assert.Equal(t, expectedConfig, cloudProvider.(*AzureCloudProvider).azureManager.config, test.name) - } - - os.Unsetenv("ARM_CLOUD") - os.Unsetenv("LOCATION") - os.Unsetenv("ARM_RESOURCE_GROUP") - os.Unsetenv("ARM_SUBSCRIPTION_ID") - os.Unsetenv("ARM_TENANT_ID") - os.Unsetenv("ARM_CLIENT_ID") - os.Unsetenv("ARM_CLIENT_SECRET") -} diff --git a/cluster-autoscaler/cloudprovider/azure/azure_manager.go b/cluster-autoscaler/cloudprovider/azure/azure_manager.go index ac010f804db1..cb986309cdd3 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_manager.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_manager.go @@ -47,14 +47,14 @@ type AzureManager struct { azClient *azClient env azure.Environment - asgCache *asgCache - lastRefresh time.Time - asgAutoDiscoverySpecs []labelAutoDiscoveryConfig - explicitlyConfigured map[string]bool + azureCache *azureCache + lastRefresh time.Time + autoDiscoverySpecs []labelAutoDiscoveryConfig + explicitlyConfigured map[string]bool } -// CreateAzureManager creates Azure Manager object to work with Azure. -func CreateAzureManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions) (*AzureManager, error) { +// createAzureManagerInternal allows for a custom azClient to be passed in by tests. +func createAzureManagerInternal(configReader io.Reader, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions, azClient *azClient) (*AzureManager, error) { cfg, err := BuildAzureConfig(configReader) if err != nil { return nil, err @@ -71,9 +71,11 @@ func CreateAzureManager(configReader io.Reader, discoveryOpts cloudprovider.Node klog.Infof("Starting azure manager with subscription ID %q", cfg.SubscriptionID) - azClient, err := newAzClient(cfg, &env) - if err != nil { - return nil, err + if azClient == nil { + azClient, err = newAzClient(cfg, &env) + if err != nil { + return nil, err + } } // Create azure manager. @@ -84,19 +86,23 @@ func CreateAzureManager(configReader io.Reader, discoveryOpts cloudprovider.Node explicitlyConfigured: make(map[string]bool), } - cache, err := newAsgCache() + cacheTTL := refreshInterval + if cfg.VmssCacheTTL != 0 { + cacheTTL = time.Duration(cfg.VmssCacheTTL) * time.Second + } + cache, err := newAzureCache(azClient, cacheTTL, cfg.ResourceGroup, cfg.VMType) if err != nil { return nil, err } - manager.asgCache = cache + manager.azureCache = cache specs, err := ParseLabelAutoDiscoverySpecs(discoveryOpts) if err != nil { return nil, err } - manager.asgAutoDiscoverySpecs = specs + manager.autoDiscoverySpecs = specs - if err := manager.fetchExplicitAsgs(discoveryOpts.NodeGroupSpecs); err != nil { + if err := manager.fetchExplicitNodeGroups(discoveryOpts.NodeGroupSpecs); err != nil { return nil, err } @@ -107,28 +113,31 @@ func CreateAzureManager(configReader io.Reader, discoveryOpts cloudprovider.Node return manager, nil } -func (m *AzureManager) fetchExplicitAsgs(specs []string) error { +// CreateAzureManager creates Azure Manager object to work with Azure. +func CreateAzureManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions) (*AzureManager, error) { + return createAzureManagerInternal(configReader, discoveryOpts, nil) +} + +func (m *AzureManager) fetchExplicitNodeGroups(specs []string) error { changed := false for _, spec := range specs { - asg, err := m.buildAsgFromSpec(spec) + nodeGroup, err := m.buildNodeGroupFromSpec(spec) if err != nil { return fmt.Errorf("failed to parse node group spec: %v", err) } - if m.RegisterAsg(asg) { + if m.RegisterNodeGroup(nodeGroup) { changed = true } - m.explicitlyConfigured[asg.Id()] = true + m.explicitlyConfigured[nodeGroup.Id()] = true } if changed { - if err := m.regenerateCache(); err != nil { - return err - } + m.invalidateCache() } return nil } -func (m *AzureManager) buildAsgFromSpec(spec string) (cloudprovider.NodeGroup, error) { +func (m *AzureManager) buildNodeGroupFromSpec(spec string) (cloudprovider.NodeGroup, error) { scaleToZeroSupported := scaleToZeroSupportedStandard if strings.EqualFold(m.config.VMType, vmTypeVMSS) { scaleToZeroSupported = scaleToZeroSupportedVMSS @@ -153,125 +162,110 @@ func (m *AzureManager) buildAsgFromSpec(spec string) (cloudprovider.NodeGroup, e // Refresh is called before every main loop and can be used to dynamically update cloud provider state. // In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh(). func (m *AzureManager) Refresh() error { - if m.lastRefresh.Add(refreshInterval).After(time.Now()) { + if m.lastRefresh.Add(m.azureCache.refreshInterval).After(time.Now()) { return nil } return m.forceRefresh() } func (m *AzureManager) forceRefresh() error { - // TODO: Refactor some of this logic out of forceRefresh and - // consider merging the list call with the Nodes() call - if err := m.fetchAutoAsgs(); err != nil { - klog.Errorf("Failed to fetch ASGs: %v", err) - } - if err := m.regenerateCache(); err != nil { - klog.Errorf("Failed to regenerate ASG cache: %v", err) + if err := m.azureCache.regenerate(); err != nil { + klog.Errorf("Failed to regenerate Azure cache: %v", err) return err } m.lastRefresh = time.Now() - klog.V(2).Infof("Refreshed ASG list, next refresh after %v", m.lastRefresh.Add(refreshInterval)) + klog.V(2).Infof("Refreshed Azure VM and VMSS list, next refresh after %v", m.lastRefresh.Add(m.azureCache.refreshInterval)) return nil } -// Fetch automatically discovered ASGs. These ASGs should be unregistered if +func (m *AzureManager) invalidateCache() { + m.lastRefresh = time.Now().Add(-1 * m.azureCache.refreshInterval) + klog.V(2).Infof("Invalidated Azure cache") +} + +// Fetch automatically discovered NodeGroups. These NodeGroups should be unregistered if // they no longer exist in Azure. -func (m *AzureManager) fetchAutoAsgs() error { - groups, err := m.getFilteredAutoscalingGroups(m.asgAutoDiscoverySpecs) +func (m *AzureManager) fetchAutoNodeGroups() error { + groups, err := m.getFilteredNodeGroups(m.autoDiscoverySpecs) if err != nil { - return fmt.Errorf("cannot autodiscover ASGs: %s", err) + return fmt.Errorf("cannot autodiscover NodeGroups: %s", err) } changed := false exists := make(map[string]bool) - for _, asg := range groups { - asgID := asg.Id() - exists[asgID] = true - if m.explicitlyConfigured[asgID] { - // This ASG was explicitly configured, but would also be + for _, group := range groups { + id := group.Id() + exists[id] = true + if m.explicitlyConfigured[id] { + // This NodeGroup was explicitly configured, but would also be // autodiscovered. We want the explicitly configured min and max // nodes to take precedence. - klog.V(3).Infof("Ignoring explicitly configured ASG %s for autodiscovery.", asg.Id()) + klog.V(3).Infof("Ignoring explicitly configured NodeGroup %s for autodiscovery.", group.Id()) continue } - if m.RegisterAsg(asg) { - klog.V(3).Infof("Autodiscovered ASG %s using tags %v", asg.Id(), m.asgAutoDiscoverySpecs) + if m.RegisterNodeGroup(group) { + klog.V(3).Infof("Autodiscovered NodeGroup %s using tags %v", group.Id(), m.autoDiscoverySpecs) changed = true } } - for _, asg := range m.getAsgs() { - asgID := asg.Id() - if !exists[asgID] && !m.explicitlyConfigured[asgID] { - m.UnregisterAsg(asg) + for _, nodeGroup := range m.getNodeGroups() { + nodeGroupID := nodeGroup.Id() + if !exists[nodeGroupID] && !m.explicitlyConfigured[nodeGroupID] { + m.UnregisterNodeGroup(nodeGroup) changed = true } } if changed { - if err := m.regenerateCache(); err != nil { - return err - } + m.invalidateCache() } return nil } -func (m *AzureManager) getAsgs() []cloudprovider.NodeGroup { - return m.asgCache.get() -} - -// RegisterAsg registers an ASG. -func (m *AzureManager) RegisterAsg(asg cloudprovider.NodeGroup) bool { - return m.asgCache.Register(asg) +func (m *AzureManager) getNodeGroups() []cloudprovider.NodeGroup { + return m.azureCache.getRegisteredNodeGroups() } -// UnregisterAsg unregisters an ASG. -func (m *AzureManager) UnregisterAsg(asg cloudprovider.NodeGroup) bool { - return m.asgCache.Unregister(asg) +// RegisterNodeGroup registers an a NodeGroup. +func (m *AzureManager) RegisterNodeGroup(nodeGroup cloudprovider.NodeGroup) bool { + return m.azureCache.Register(nodeGroup) } -// GetAsgForInstance returns AsgConfig of the given Instance -func (m *AzureManager) GetAsgForInstance(instance *azureRef) (cloudprovider.NodeGroup, error) { - return m.asgCache.FindForInstance(instance, m.config.VMType) +// UnregisterNodeGroup unregisters a NodeGroup. +func (m *AzureManager) UnregisterNodeGroup(nodeGroup cloudprovider.NodeGroup) bool { + return m.azureCache.Unregister(nodeGroup) } -func (m *AzureManager) regenerateCache() error { - m.asgCache.mutex.Lock() - defer m.asgCache.mutex.Unlock() - return m.asgCache.regenerate() +// GetNodeGroupForInstance returns the NodeGroup of the given Instance +func (m *AzureManager) GetNodeGroupForInstance(instance *azureRef) (cloudprovider.NodeGroup, error) { + return m.azureCache.FindForInstance(instance, m.config.VMType) } -// Cleanup the ASG cache. +// Cleanup the cache. func (m *AzureManager) Cleanup() { - m.asgCache.Cleanup() + m.azureCache.Cleanup() } -func (m *AzureManager) getFilteredAutoscalingGroups(filter []labelAutoDiscoveryConfig) (asgs []cloudprovider.NodeGroup, err error) { +func (m *AzureManager) getFilteredNodeGroups(filter []labelAutoDiscoveryConfig) (nodeGroups []cloudprovider.NodeGroup, err error) { if len(filter) == 0 { return nil, nil } if m.config.VMType == vmTypeVMSS { - return m.listScaleSets(filter) + return m.getFilteredScaleSets(filter) } return nil, fmt.Errorf("vmType %q does not support autodiscovery", m.config.VMType) } -// listScaleSets gets a list of scale sets and instanceIDs. -func (m *AzureManager) listScaleSets(filter []labelAutoDiscoveryConfig) ([]cloudprovider.NodeGroup, error) { - ctx, cancel := getContextWithCancel() - defer cancel() - - result, rerr := m.azClient.virtualMachineScaleSetsClient.List(ctx, m.config.ResourceGroup) - if rerr != nil { - klog.Errorf("VirtualMachineScaleSetsClient.List for %v failed: %v", m.config.ResourceGroup, rerr) - return nil, rerr.Error() - } +// getFilteredScaleSets gets a list of scale sets and instanceIDs. +func (m *AzureManager) getFilteredScaleSets(filter []labelAutoDiscoveryConfig) ([]cloudprovider.NodeGroup, error) { + vmssList := m.azureCache.getScaleSets() - var asgs []cloudprovider.NodeGroup - for _, scaleSet := range result { + var nodeGroups []cloudprovider.NodeGroup + for _, scaleSet := range vmssList { if len(filter) > 0 { if scaleSet.Tags == nil || len(scaleSet.Tags) == 0 { continue @@ -292,30 +286,30 @@ func (m *AzureManager) listScaleSets(filter []labelAutoDiscoveryConfig) ([]cloud if minSize, err := strconv.Atoi(*val); err == nil { spec.MinSize = minSize } else { - klog.Warningf("ignoring nodegroup %q because of invalid minimum size specified for vmss: %s", *scaleSet.Name, err) + klog.Warningf("ignoring vmss %q because of invalid minimum size specified for vmss: %s", *scaleSet.Name, err) continue } } else { - klog.Warningf("ignoring nodegroup %q because of no minimum size specified for vmss", *scaleSet.Name) + klog.Warningf("ignoring vmss %q because of no minimum size specified for vmss", *scaleSet.Name) continue } if spec.MinSize < 0 { - klog.Warningf("ignoring nodegroup %q because of minimum size must be a non-negative number of nodes", *scaleSet.Name) + klog.Warningf("ignoring vmss %q because of minimum size must be a non-negative number of nodes", *scaleSet.Name) continue } if val, ok := scaleSet.Tags["max"]; ok { if maxSize, err := strconv.Atoi(*val); err == nil { spec.MaxSize = maxSize } else { - klog.Warningf("ignoring nodegroup %q because of invalid maximum size specified for vmss: %s", *scaleSet.Name, err) + klog.Warningf("ignoring vmss %q because of invalid maximum size specified for vmss: %s", *scaleSet.Name, err) continue } } else { - klog.Warningf("ignoring nodegroup %q because of no maximum size specified for vmss", *scaleSet.Name) + klog.Warningf("ignoring vmss %q because of no maximum size specified for vmss", *scaleSet.Name) continue } if spec.MaxSize < spec.MinSize { - klog.Warningf("ignoring nodegroup %q because of maximum size must be greater than minimum size: max=%d < min=%d", *scaleSet.Name, spec.MaxSize, spec.MinSize) + klog.Warningf("ignoring vmss %q because of maximum size must be greater than minimum size: max=%d < min=%d", *scaleSet.Name, spec.MaxSize, spec.MinSize) continue } @@ -324,13 +318,13 @@ func (m *AzureManager) listScaleSets(filter []labelAutoDiscoveryConfig) ([]cloud curSize = *scaleSet.Sku.Capacity } - asg, err := NewScaleSet(spec, m, curSize) + vmss, err := NewScaleSet(spec, m, curSize) if err != nil { - klog.Warningf("ignoring nodegroup %q %s", *scaleSet.Name, err) + klog.Warningf("ignoring vmss %q %s", *scaleSet.Name, err) continue } - asgs = append(asgs, asg) + nodeGroups = append(nodeGroups, vmss) } - return asgs, nil + return nodeGroups, nil } diff --git a/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go b/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go index 6859e6ee893c..2525521fba18 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go @@ -18,24 +18,23 @@ package azure import ( "fmt" + "k8s.io/legacy-cloud-providers/azure/clients/vmclient/mockvmclient" "os" "reflect" "strings" "testing" "time" - "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" - azclients "k8s.io/legacy-cloud-providers/azure/clients" - "k8s.io/legacy-cloud-providers/azure/clients/vmssclient/mockvmssclient" - "k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient/mockvmssvmclient" - "k8s.io/legacy-cloud-providers/azure/retry" - "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-12-01/compute" "github.com/Azure/azure-sdk-for-go/services/resources/mgmt/2017-05-10/resources" "github.com/Azure/go-autorest/autorest/date" "github.com/Azure/go-autorest/autorest/to" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + azclients "k8s.io/legacy-cloud-providers/azure/clients" + "k8s.io/legacy-cloud-providers/azure/clients/vmssclient/mockvmssclient" + "k8s.io/legacy-cloud-providers/azure/clients/vmssvmclient/mockvmssvmclient" ) const validAzureCfg = `{ @@ -139,7 +138,16 @@ const validAzureCfgForStandardVMTypeWithoutDeploymentParameters = `{ const invalidAzureCfg = `{{}"cloud": "AzurePublicCloud",}` func TestCreateAzureManagerValidConfig(t *testing.T) { - manager, err := CreateAzureManager(strings.NewReader(validAzureCfg), cloudprovider.NodeGroupDiscoveryOptions{}) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockVMClient := mockvmclient.NewMockInterface(ctrl) + mockVMSSClient := mockvmssclient.NewMockInterface(ctrl) + mockVMSSClient.EXPECT().List(gomock.Any(), "fakeId").Return([]compute.VirtualMachineScaleSet{}, nil).Times(2) + mockAzClient := &azClient{ + virtualMachinesClient: mockVMClient, + virtualMachineScaleSetsClient: mockVMSSClient, + } + manager, err := createAzureManagerInternal(strings.NewReader(validAzureCfg), cloudprovider.NodeGroupDiscoveryOptions{}, mockAzClient) expectedConfig := &Config{ Cloud: "AzurePublicCloud", @@ -212,7 +220,17 @@ func TestCreateAzureManagerValidConfig(t *testing.T) { } func TestCreateAzureManagerValidConfigForStandardVMType(t *testing.T) { - manager, err := CreateAzureManager(strings.NewReader(validAzureCfgForStandardVMType), cloudprovider.NodeGroupDiscoveryOptions{}) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockVMClient := mockvmclient.NewMockInterface(ctrl) + mockVMClient.EXPECT().List(gomock.Any(), "fakeId").Return([]compute.VirtualMachine{}, nil).Times(2) + mockVMSSClient := mockvmssclient.NewMockInterface(ctrl) + mockAzClient := &azClient{ + virtualMachinesClient: mockVMClient, + virtualMachineScaleSetsClient: mockVMSSClient, + } + manager, err := createAzureManagerInternal(strings.NewReader(validAzureCfgForStandardVMType), cloudprovider.NodeGroupDiscoveryOptions{}, mockAzClient) + expectedConfig := &Config{ Cloud: "AzurePublicCloud", Location: "southeastasia", @@ -307,13 +325,23 @@ func TestCreateAzureManagerValidConfigForStandardVMType(t *testing.T) { } func TestCreateAzureManagerValidConfigForStandardVMTypeWithoutDeploymentParameters(t *testing.T) { - manager, err := CreateAzureManager(strings.NewReader(validAzureCfgForStandardVMTypeWithoutDeploymentParameters), cloudprovider.NodeGroupDiscoveryOptions{}) + manager, err := createAzureManagerInternal(strings.NewReader(validAzureCfgForStandardVMTypeWithoutDeploymentParameters), cloudprovider.NodeGroupDiscoveryOptions{}, &azClient{}) expectedErr := "open /var/lib/azure/azuredeploy.parameters.json: no such file or directory" assert.Nil(t, manager) assert.Equal(t, expectedErr, err.Error(), "return error does not match, expected: %v, actual: %v", expectedErr, err.Error()) } func TestCreateAzureManagerWithNilConfig(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockVMClient := mockvmclient.NewMockInterface(ctrl) + mockVMSSClient := mockvmssclient.NewMockInterface(ctrl) + mockVMSSClient.EXPECT().List(gomock.Any(), "resourceGroup").Return([]compute.VirtualMachineScaleSet{}, nil).AnyTimes() + mockAzClient := &azClient{ + virtualMachinesClient: mockVMClient, + virtualMachineScaleSetsClient: mockVMSSClient, + } + expectedConfig := &Config{ Cloud: "AzurePublicCloud", Location: "southeastasia", @@ -418,13 +446,13 @@ func TestCreateAzureManagerWithNilConfig(t *testing.T) { os.Setenv("BACKOFF_JITTER", "1") os.Setenv("CLOUD_PROVIDER_RATE_LIMIT", "true") - manager, err := CreateAzureManager(nil, cloudprovider.NodeGroupDiscoveryOptions{}) + manager, err := createAzureManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, mockAzClient) assert.NoError(t, err) assert.Equal(t, true, reflect.DeepEqual(*expectedConfig, *manager.config), "unexpected azure manager configuration") // invalid bool for ARM_USE_MANAGED_IDENTITY_EXTENSION os.Setenv("ARM_USE_MANAGED_IDENTITY_EXTENSION", "invalidbool") - manager, err = CreateAzureManager(nil, cloudprovider.NodeGroupDiscoveryOptions{}) + manager, err = createAzureManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, mockAzClient) expectedErr0 := "strconv.ParseBool: parsing \"invalidbool\": invalid syntax" assert.Nil(t, manager) assert.Equal(t, expectedErr0, err.Error(), "Return err does not match, expected: %v, actual: %v", expectedErr0, err.Error()) @@ -433,7 +461,7 @@ func TestCreateAzureManagerWithNilConfig(t *testing.T) { // invalid int for AZURE_VMSS_CACHE_TTL os.Setenv("AZURE_VMSS_CACHE_TTL", "invalidint") - manager, err = CreateAzureManager(nil, cloudprovider.NodeGroupDiscoveryOptions{}) + manager, err = createAzureManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, mockAzClient) expectedErr := fmt.Errorf("failed to parse AZURE_VMSS_CACHE_TTL \"invalidint\": strconv.ParseInt: parsing \"invalidint\": invalid syntax") assert.Nil(t, manager) assert.Equal(t, expectedErr, err, "Return err does not match, expected: %v, actual: %v", expectedErr, err) @@ -442,7 +470,7 @@ func TestCreateAzureManagerWithNilConfig(t *testing.T) { // invalid int for AZURE_MAX_DEPLOYMENT_COUNT os.Setenv("AZURE_MAX_DEPLOYMENT_COUNT", "invalidint") - manager, err = CreateAzureManager(nil, cloudprovider.NodeGroupDiscoveryOptions{}) + manager, err = createAzureManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, mockAzClient) expectedErr = fmt.Errorf("failed to parse AZURE_MAX_DEPLOYMENT_COUNT \"invalidint\": strconv.ParseInt: parsing \"invalidint\": invalid syntax") assert.Nil(t, manager) assert.Equal(t, expectedErr, err, "Return err does not match, expected: %v, actual: %v", expectedErr, err) @@ -451,7 +479,7 @@ func TestCreateAzureManagerWithNilConfig(t *testing.T) { // zero AZURE_MAX_DEPLOYMENT_COUNT will use default value os.Setenv("AZURE_MAX_DEPLOYMENT_COUNT", "0") - manager, err = CreateAzureManager(nil, cloudprovider.NodeGroupDiscoveryOptions{}) + manager, err = createAzureManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, mockAzClient) assert.NoError(t, err) assert.Equal(t, int64(defaultMaxDeploymentsCount), (*manager.config).MaxDeploymentsCount, "MaxDeploymentsCount does not match.") // revert back to good AZURE_MAX_DEPLOYMENT_COUNT @@ -459,7 +487,7 @@ func TestCreateAzureManagerWithNilConfig(t *testing.T) { // invalid bool for ENABLE_BACKOFF os.Setenv("ENABLE_BACKOFF", "invalidbool") - manager, err = CreateAzureManager(nil, cloudprovider.NodeGroupDiscoveryOptions{}) + manager, err = createAzureManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, mockAzClient) expectedErr = fmt.Errorf("failed to parse ENABLE_BACKOFF \"invalidbool\": strconv.ParseBool: parsing \"invalidbool\": invalid syntax") assert.Nil(t, manager) assert.Equal(t, expectedErr, err, "Return err does not match, expected: %v, actual: %v", expectedErr, err) @@ -468,7 +496,7 @@ func TestCreateAzureManagerWithNilConfig(t *testing.T) { // invalid int for BACKOFF_RETRIES os.Setenv("BACKOFF_RETRIES", "invalidint") - manager, err = CreateAzureManager(nil, cloudprovider.NodeGroupDiscoveryOptions{}) + manager, err = createAzureManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, mockAzClient) expectedErr = fmt.Errorf("failed to parse BACKOFF_RETRIES '\\x00': strconv.ParseInt: parsing \"invalidint\": invalid syntax") assert.Nil(t, manager) assert.Equal(t, expectedErr, err, "Return err does not match, expected: %v, actual: %v", expectedErr, err) @@ -477,7 +505,7 @@ func TestCreateAzureManagerWithNilConfig(t *testing.T) { // empty BACKOFF_RETRIES will use default value os.Setenv("BACKOFF_RETRIES", "") - manager, err = CreateAzureManager(nil, cloudprovider.NodeGroupDiscoveryOptions{}) + manager, err = createAzureManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, mockAzClient) assert.NoError(t, err) assert.Equal(t, backoffRetriesDefault, (*manager.config).CloudProviderBackoffRetries, "CloudProviderBackoffRetries does not match.") // revert back to good BACKOFF_RETRIES @@ -485,7 +513,7 @@ func TestCreateAzureManagerWithNilConfig(t *testing.T) { // invalid float for BACKOFF_EXPONENT os.Setenv("BACKOFF_EXPONENT", "invalidfloat") - manager, err = CreateAzureManager(nil, cloudprovider.NodeGroupDiscoveryOptions{}) + manager, err = createAzureManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, mockAzClient) expectedErr = fmt.Errorf("failed to parse BACKOFF_EXPONENT \"invalidfloat\": strconv.ParseFloat: parsing \"invalidfloat\": invalid syntax") assert.Nil(t, manager) assert.Equal(t, expectedErr, err, "Return err does not match, expected: %v, actual: %v", expectedErr, err) @@ -494,7 +522,7 @@ func TestCreateAzureManagerWithNilConfig(t *testing.T) { // empty BACKOFF_EXPONENT will use default value os.Setenv("BACKOFF_EXPONENT", "") - manager, err = CreateAzureManager(nil, cloudprovider.NodeGroupDiscoveryOptions{}) + manager, err = createAzureManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, mockAzClient) assert.NoError(t, err) assert.Equal(t, backoffExponentDefault, (*manager.config).CloudProviderBackoffExponent, "CloudProviderBackoffExponent does not match.") // revert back to good BACKOFF_EXPONENT @@ -502,7 +530,7 @@ func TestCreateAzureManagerWithNilConfig(t *testing.T) { // invalid int for BACKOFF_DURATION os.Setenv("BACKOFF_DURATION", "invalidint") - manager, err = CreateAzureManager(nil, cloudprovider.NodeGroupDiscoveryOptions{}) + manager, err = createAzureManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, mockAzClient) expectedErr = fmt.Errorf("failed to parse BACKOFF_DURATION \"invalidint\": strconv.ParseInt: parsing \"invalidint\": invalid syntax") assert.Nil(t, manager) assert.Equal(t, expectedErr, err, "Return err does not match, expected: %v, actual: %v", expectedErr, err) @@ -511,7 +539,7 @@ func TestCreateAzureManagerWithNilConfig(t *testing.T) { // empty BACKOFF_DURATION will use default value os.Setenv("BACKOFF_DURATION", "") - manager, err = CreateAzureManager(nil, cloudprovider.NodeGroupDiscoveryOptions{}) + manager, err = createAzureManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, mockAzClient) assert.NoError(t, err) assert.Equal(t, backoffDurationDefault, (*manager.config).CloudProviderBackoffDuration, "CloudProviderBackoffDuration does not match.") // revert back to good BACKOFF_DURATION @@ -519,7 +547,7 @@ func TestCreateAzureManagerWithNilConfig(t *testing.T) { // invalid float for BACKOFF_JITTER os.Setenv("BACKOFF_JITTER", "invalidfloat") - manager, err = CreateAzureManager(nil, cloudprovider.NodeGroupDiscoveryOptions{}) + manager, err = createAzureManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, mockAzClient) expectedErr = fmt.Errorf("failed to parse BACKOFF_JITTER \"invalidfloat\": strconv.ParseFloat: parsing \"invalidfloat\": invalid syntax") assert.Nil(t, manager) assert.Equal(t, expectedErr, err, "Return err does not match, expected: %v, actual: %v", expectedErr, err) @@ -528,7 +556,7 @@ func TestCreateAzureManagerWithNilConfig(t *testing.T) { // empty BACKOFF_JITTER will use default value os.Setenv("BACKOFF_JITTER", "") - manager, err = CreateAzureManager(nil, cloudprovider.NodeGroupDiscoveryOptions{}) + manager, err = createAzureManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, mockAzClient) assert.NoError(t, err) assert.Equal(t, backoffJitterDefault, (*manager.config).CloudProviderBackoffJitter, "CloudProviderBackoffJitter does not match.") // revert back to good BACKOFF_JITTER @@ -536,7 +564,7 @@ func TestCreateAzureManagerWithNilConfig(t *testing.T) { // invalid bool for CLOUD_PROVIDER_RATE_LIMIT os.Setenv("CLOUD_PROVIDER_RATE_LIMIT", "invalidbool") - manager, err = CreateAzureManager(nil, cloudprovider.NodeGroupDiscoveryOptions{}) + manager, err = createAzureManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, mockAzClient) expectedErr = fmt.Errorf("failed to parse CLOUD_PROVIDER_RATE_LIMIT: \"invalidbool\", strconv.ParseBool: parsing \"invalidbool\": invalid syntax") assert.Nil(t, manager) assert.Equal(t, expectedErr, err, "Return err does not match, expected: %v, actual: %v", expectedErr, err) @@ -569,11 +597,11 @@ func TestCreateAzureManagerWithNilConfig(t *testing.T) { } func TestCreateAzureManagerInvalidConfig(t *testing.T) { - _, err := CreateAzureManager(strings.NewReader(invalidAzureCfg), cloudprovider.NodeGroupDiscoveryOptions{}) + _, err := createAzureManagerInternal(strings.NewReader(invalidAzureCfg), cloudprovider.NodeGroupDiscoveryOptions{}, &azClient{}) assert.Error(t, err, "failed to unmarshal config body") } -func TestFetchExplicitAsgs(t *testing.T) { +func TestFetchExplicitNodeGroups(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -594,9 +622,9 @@ func TestFetchExplicitAsgs(t *testing.T) { mockVMSSVMClient := mockvmssvmclient.NewMockInterface(ctrl) mockVMSSVMClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup, "test-asg", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() manager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient - manager.fetchExplicitAsgs(ngdo.NodeGroupSpecs) + manager.fetchExplicitNodeGroups(ngdo.NodeGroupSpecs) - asgs := manager.asgCache.get() + asgs := manager.azureCache.getRegisteredNodeGroups() assert.Equal(t, 1, len(asgs)) assert.Equal(t, name, asgs[0].Id()) assert.Equal(t, min, asgs[0].MinSize()) @@ -618,122 +646,17 @@ func TestFetchExplicitAsgs(t *testing.T) { }, } testAS.manager.config.VMType = vmTypeStandard - err := testAS.manager.fetchExplicitAsgs([]string{"1:5:testAS"}) + err := testAS.manager.fetchExplicitNodeGroups([]string{"1:5:testAS"}) expectedErr := fmt.Errorf("failed to parse node group spec: deployment not found") - assert.Equal(t, expectedErr, err, "testAS.manager.fetchExplicitAsgs return error does not match, expected: %v, actual: %v", expectedErr, err) - err = testAS.manager.fetchExplicitAsgs(nil) + assert.Equal(t, expectedErr, err, "testAS.manager.fetchExplicitNodeGroups return error does not match, expected: %v, actual: %v", expectedErr, err) + err = testAS.manager.fetchExplicitNodeGroups(nil) assert.NoError(t, err) // test invalidVMType manager.config.VMType = "invalidVMType" - err = manager.fetchExplicitAsgs(ngdo.NodeGroupSpecs) + err = manager.fetchExplicitNodeGroups(ngdo.NodeGroupSpecs) expectedErr = fmt.Errorf("failed to parse node group spec: vmtype invalidVMType not supported") - assert.Equal(t, expectedErr, err, "manager.fetchExplicitAsgs return error does not match, expected: %v, actual: %v", expectedErr, err) -} - -func TestListScalesets(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - manager := newTestAzureManager(t) - vmssTag := "fake-tag" - vmssTagValue := "fake-value" - vmssName := "test-vmss" - - ngdo := cloudprovider.NodeGroupDiscoveryOptions{ - NodeGroupAutoDiscoverySpecs: []string{fmt.Sprintf("label:%s=%s", vmssTag, vmssTagValue)}, - } - specs, err := ParseLabelAutoDiscoverySpecs(ngdo) - assert.NoError(t, err) - - testCases := []struct { - name string - specs map[string]string - isListVMSSFail bool - expected []cloudprovider.NodeGroup - expectedErrString string - }{ - { - name: "ValidMinMax", - specs: map[string]string{"min": "5", "max": "50"}, - expected: []cloudprovider.NodeGroup{&ScaleSet{ - azureRef: azureRef{ - Name: vmssName, - }, - minSize: 5, - maxSize: 50, - manager: manager, - curSize: 3, - sizeRefreshPeriod: defaultVmssSizeRefreshPeriod, - instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod, - }}, - }, - { - name: "InvalidMin", - specs: map[string]string{"min": "some-invalid-string"}, - }, - { - name: "NoMin", - specs: map[string]string{"max": "50"}, - }, - { - name: "InvalidMax", - specs: map[string]string{"min": "5", "max": "some-invalid-string"}, - }, - { - name: "NoMax", - specs: map[string]string{"min": "5"}, - }, - { - name: "MinLessThanZero", - specs: map[string]string{"min": "-4", "max": "20"}, - }, - { - name: "MinGreaterThanMax", - specs: map[string]string{"min": "50", "max": "5"}, - }, - { - name: "ListVMSSFail", - specs: map[string]string{"min": "5", "max": "50"}, - isListVMSSFail: true, - expectedErrString: "List VMSS failed", - }, - } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - tags := make(map[string]*string) - tags[vmssTag] = &vmssTagValue - if val, ok := tc.specs["min"]; ok { - tags["min"] = &val - } - if val, ok := tc.specs["max"]; ok { - tags["max"] = &val - } - - expectedScaleSets := []compute.VirtualMachineScaleSet{fakeVMSSWithTags(vmssName, tags)} - mockVMSSClient := mockvmssclient.NewMockInterface(ctrl) - if tc.isListVMSSFail { - mockVMSSClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup).Return(nil, &retry.Error{RawError: fmt.Errorf("List VMSS failed")}).AnyTimes() - } else { - mockVMSSClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup).Return(expectedScaleSets, nil).AnyTimes() - } - manager.azClient.virtualMachineScaleSetsClient = mockVMSSClient - - asgs, err := manager.listScaleSets(specs) - if tc.expected != nil { - assert.NoError(t, err) - assert.True(t, assert.ObjectsAreEqualValues(tc.expected, asgs), "expected %#v, but found: %#v", tc.expected, asgs) - return - } - assert.Len(t, asgs, 0) - if tc.expectedErrString == "" { - assert.NoError(t, err) - return - } - assert.Error(t, err) - assert.Contains(t, err.Error(), tc.expectedErrString) - }) - } + assert.Equal(t, expectedErr, err, "manager.fetchExplicitNodeGroups return error does not match, expected: %v, actual: %v", expectedErr, err) } func TestGetFilteredAutoscalingGroupsVmss(t *testing.T) { @@ -757,11 +680,13 @@ func TestGetFilteredAutoscalingGroupsVmss(t *testing.T) { mockVMSSClient := mockvmssclient.NewMockInterface(ctrl) mockVMSSClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup).Return(expectedScaleSets, nil).AnyTimes() manager.azClient.virtualMachineScaleSetsClient = mockVMSSClient + err := manager.forceRefresh() + assert.NoError(t, err) specs, err := ParseLabelAutoDiscoverySpecs(ngdo) assert.NoError(t, err) - asgs, err := manager.getFilteredAutoscalingGroups(specs) + asgs, err := manager.getFilteredNodeGroups(specs) assert.NoError(t, err) expectedAsgs := []cloudprovider.NodeGroup{&ScaleSet{ azureRef: azureRef{ @@ -771,7 +696,6 @@ func TestGetFilteredAutoscalingGroupsVmss(t *testing.T) { maxSize: maxVal, manager: manager, curSize: 3, - sizeRefreshPeriod: defaultVmssSizeRefreshPeriod, instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod, }} assert.True(t, assert.ObjectsAreEqualValues(expectedAsgs, asgs), "expected %#v, but found: %#v", expectedAsgs, asgs) @@ -796,13 +720,13 @@ func TestGetFilteredAutoscalingGroupsWithInvalidVMType(t *testing.T) { assert.NoError(t, err) expectedErr := fmt.Errorf("vmType \"aks\" does not support autodiscovery") - asgs, err2 := manager.getFilteredAutoscalingGroups(specs) + asgs, err2 := manager.getFilteredNodeGroups(specs) assert.Nil(t, asgs) assert.Equal(t, expectedErr, err2, "Not match, expected: %v, actual: %v", expectedErr, err2) manager.config.VMType = "invalidVMType" expectedErr = fmt.Errorf("vmType \"invalidVMType\" does not support autodiscovery") - asgs, err2 = manager.getFilteredAutoscalingGroups(specs) + asgs, err2 = manager.getFilteredNodeGroups(specs) assert.Nil(t, asgs) assert.Equal(t, expectedErr, err2, "Not match, expected: %v, actual: %v", expectedErr, err2) } @@ -833,17 +757,19 @@ func TestFetchAutoAsgsVmss(t *testing.T) { mockVMSSVMClient := mockvmssvmclient.NewMockInterface(ctrl) mockVMSSVMClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup, vmssName, gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() manager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient + err := manager.forceRefresh() + assert.NoError(t, err) specs, err := ParseLabelAutoDiscoverySpecs(ngdo) assert.NoError(t, err) - manager.asgAutoDiscoverySpecs = specs + manager.autoDiscoverySpecs = specs // assert cache is empty before fetching auto asgs - asgs := manager.asgCache.get() + asgs := manager.azureCache.getRegisteredNodeGroups() assert.Equal(t, 0, len(asgs)) - manager.fetchAutoAsgs() - asgs = manager.asgCache.get() + manager.fetchAutoNodeGroups() + asgs = manager.azureCache.getRegisteredNodeGroups() assert.Equal(t, 1, len(asgs)) assert.Equal(t, vmssName, asgs[0].Id()) assert.Equal(t, minVal, asgs[0].MinSize()) @@ -851,8 +777,8 @@ func TestFetchAutoAsgsVmss(t *testing.T) { // test explicitlyConfigured manager.explicitlyConfigured[vmssName] = true - manager.fetchAutoAsgs() - asgs = manager.asgCache.get() + manager.fetchAutoNodeGroups() + asgs = manager.azureCache.getRegisteredNodeGroups() assert.Equal(t, 1, len(asgs)) } diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go index f4d71f7e78eb..0313866cc127 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go @@ -19,7 +19,6 @@ package azure import ( "fmt" "math/rand" - "net/http" "strings" "sync" "time" @@ -32,39 +31,16 @@ import ( "k8s.io/legacy-cloud-providers/azure/retry" "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-12-01/compute" - "github.com/Azure/go-autorest/autorest" "github.com/Azure/go-autorest/autorest/azure" "github.com/Azure/go-autorest/autorest/to" ) var ( - defaultVmssSizeRefreshPeriod = 15 * time.Second defaultVmssInstancesRefreshPeriod = 5 * time.Minute vmssContextTimeout = 3 * time.Minute vmssSizeMutex sync.Mutex ) -var scaleSetStatusCache struct { - lastRefresh time.Time - mutex sync.Mutex - scaleSets map[string]compute.VirtualMachineScaleSet -} - -func init() { - // In go-autorest SDK https://github.com/Azure/go-autorest/blob/master/autorest/sender.go#L242, - // if ARM returns http.StatusTooManyRequests, the sender doesn't increase the retry attempt count, - // hence the Azure clients will keep retrying forever until it get a status code other than 429. - // So we explicitly removes http.StatusTooManyRequests from autorest.StatusCodesForRetry. - // Refer https://github.com/Azure/go-autorest/issues/398. - statusCodesForRetry := make([]int, 0) - for _, code := range autorest.StatusCodesForRetry { - if code != http.StatusTooManyRequests { - statusCodesForRetry = append(statusCodesForRetry, code) - } - } - autorest.StatusCodesForRetry = statusCodesForRetry -} - // ScaleSet implements NodeGroup interface. type ScaleSet struct { azureRef @@ -100,12 +76,6 @@ func NewScaleSet(spec *dynamic.NodeGroupSpec, az *AzureManager, curSize int64) ( instancesRefreshJitter: az.config.VmssVmsCacheJitter, } - if az.config.VmssCacheTTL != 0 { - scaleSet.sizeRefreshPeriod = time.Duration(az.config.VmssCacheTTL) * time.Second - } else { - scaleSet.sizeRefreshPeriod = defaultVmssSizeRefreshPeriod - } - if az.config.VmssVmsCacheTTL != 0 { scaleSet.instancesRefreshPeriod = time.Duration(az.config.VmssVmsCacheTTL) * time.Second } else { @@ -147,50 +117,14 @@ func (scaleSet *ScaleSet) MaxSize() int { return scaleSet.maxSize } -func (scaleSet *ScaleSet) getVMSSInfo() (compute.VirtualMachineScaleSet, *retry.Error) { - scaleSetStatusCache.mutex.Lock() - defer scaleSetStatusCache.mutex.Unlock() - - if scaleSetStatusCache.lastRefresh.Add(scaleSet.sizeRefreshPeriod).After(time.Now()) { - if status, exists := scaleSetStatusCache.scaleSets[scaleSet.Name]; exists { - return status, nil - } - } - - var allVMSS []compute.VirtualMachineScaleSet - var rerr *retry.Error - - allVMSS, rerr = scaleSet.getAllVMSSInfo() - if rerr != nil { - return compute.VirtualMachineScaleSet{}, rerr - } - - var newStatus = make(map[string]compute.VirtualMachineScaleSet) - for _, vmss := range allVMSS { - newStatus[*vmss.Name] = vmss - } +func (scaleSet *ScaleSet) getVMSSFromCache() (compute.VirtualMachineScaleSet, *retry.Error) { + allVMSS := scaleSet.manager.azureCache.getScaleSets() - scaleSetStatusCache.lastRefresh = time.Now() - scaleSetStatusCache.scaleSets = newStatus - - if _, exists := scaleSetStatusCache.scaleSets[scaleSet.Name]; !exists { + if _, exists := allVMSS[scaleSet.Name]; !exists { return compute.VirtualMachineScaleSet{}, &retry.Error{RawError: fmt.Errorf("could not find vmss: %s", scaleSet.Name)} } - return scaleSetStatusCache.scaleSets[scaleSet.Name], nil -} - -func (scaleSet *ScaleSet) getAllVMSSInfo() ([]compute.VirtualMachineScaleSet, *retry.Error) { - ctx, cancel := getContextWithTimeout(vmssContextTimeout) - defer cancel() - - resourceGroup := scaleSet.manager.config.ResourceGroup - setInfo, rerr := scaleSet.manager.azClient.virtualMachineScaleSetsClient.List(ctx, resourceGroup) - if rerr != nil { - return []compute.VirtualMachineScaleSet{}, rerr - } - - return setInfo, nil + return allVMSS[scaleSet.Name], nil } func (scaleSet *ScaleSet) getCurSize() (int64, error) { @@ -202,14 +136,8 @@ func (scaleSet *ScaleSet) getCurSize() (int64, error) { } klog.V(5).Infof("Get scale set size for %q", scaleSet.Name) - set, rerr := scaleSet.getVMSSInfo() + set, rerr := scaleSet.getVMSSFromCache() if rerr != nil { - if isAzureRequestsThrottled(rerr) { - // Log a warning and update the size refresh time so that it would retry after next sizeRefreshPeriod. - klog.Warningf("getVMSSInfo() is throttled with message %v, would return the cached vmss size", rerr) - scaleSet.lastSizeRefresh = time.Now() - return scaleSet.curSize, nil - } return -1, rerr.Error() } @@ -262,7 +190,7 @@ func (scaleSet *ScaleSet) updateVMSSCapacity(future *azure.Future) { if err != nil { klog.Errorf("Failed to update the capacity for vmss %s with error %v, invalidate the cache so as to get the real size from API", scaleSet.Name, err) // Invalidate the VMSS size cache in order to fetch the size from the API. - scaleSet.invalidateStatusCacheWithLock() + scaleSet.manager.invalidateCache() } }() @@ -287,7 +215,7 @@ func (scaleSet *ScaleSet) SetScaleSetSize(size int64) error { scaleSet.sizeMutex.Lock() defer scaleSet.sizeMutex.Unlock() - vmssInfo, rerr := scaleSet.getVMSSInfo() + vmssInfo, rerr := scaleSet.getVMSSFromCache() if rerr != nil { klog.Errorf("Failed to get information for VMSS (%q): %v", scaleSet.Name, rerr) return rerr.Error() @@ -373,7 +301,7 @@ func (scaleSet *ScaleSet) GetScaleSetVms() ([]compute.VirtualMachineScaleSetVM, // It is assumed that cloud provider will not delete the existing nodes if the size // when there is an option to just decrease the target. func (scaleSet *ScaleSet) DecreaseTargetSize(delta int) error { - // VMSS size would be changed automatically after the Node deletion, hence this operation is not required. + // VMSS size should be changed automatically after the Node deletion, hence this operation is not required. // To prevent some unreproducible bugs, an extra refresh of cache is needed. scaleSet.invalidateInstanceCache() _, err := scaleSet.GetScaleSetSize() @@ -391,7 +319,7 @@ func (scaleSet *ScaleSet) Belongs(node *apiv1.Node) (bool, error) { Name: node.Spec.ProviderID, } - targetAsg, err := scaleSet.manager.GetAsgForInstance(ref) + targetAsg, err := scaleSet.manager.GetNodeGroupForInstance(ref) if err != nil { return false, err } @@ -412,14 +340,14 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error { klog.V(3).Infof("Deleting vmss instances %v", instances) - commonAsg, err := scaleSet.manager.GetAsgForInstance(instances[0]) + commonAsg, err := scaleSet.manager.GetNodeGroupForInstance(instances[0]) if err != nil { return err } instancesToDelete := []*azureRef{} for _, instance := range instances { - asg, err := scaleSet.manager.GetAsgForInstance(instance) + asg, err := scaleSet.manager.GetNodeGroupForInstance(instance) if err != nil { return err } @@ -528,7 +456,7 @@ func (scaleSet *ScaleSet) Debug() string { // TemplateNodeInfo returns a node template for this scale set. func (scaleSet *ScaleSet) TemplateNodeInfo() (*schedulerframework.NodeInfo, error) { - template, rerr := scaleSet.getVMSSInfo() + template, rerr := scaleSet.getVMSSFromCache() if rerr != nil { return nil, rerr.Error() } @@ -595,7 +523,7 @@ func buildInstanceCache(vms []compute.VirtualMachineScaleSetVM) []cloudprovider. resourceID, err := convertResourceGroupNameToLower(*vm.ID) if err != nil { - // This shouldn't happen. Log a waring message for tracking. + // This shouldn't happen. Log a warning message for tracking. klog.Warningf("buildInstanceCache.convertResourceGroupNameToLower failed with error: %v", err) continue } @@ -656,13 +584,3 @@ func (scaleSet *ScaleSet) invalidateInstanceCache() { scaleSet.lastInstanceRefresh = time.Now().Add(-1 * scaleSet.instancesRefreshPeriod) scaleSet.instanceMutex.Unlock() } - -func (scaleSet *ScaleSet) invalidateStatusCacheWithLock() { - scaleSet.sizeMutex.Lock() - scaleSet.lastSizeRefresh = time.Now().Add(-1 * scaleSet.sizeRefreshPeriod) - scaleSet.sizeMutex.Unlock() - - scaleSetStatusCache.mutex.Lock() - scaleSetStatusCache.lastRefresh = time.Now().Add(-1 * scaleSet.sizeRefreshPeriod) - scaleSetStatusCache.mutex.Unlock() -} diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go index 1d2daf2ac0e1..c411a09bc1a0 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go @@ -38,10 +38,9 @@ func newTestScaleSet(manager *AzureManager, name string) *ScaleSet { azureRef: azureRef{ Name: name, }, - manager: manager, - minSize: 1, - maxSize: 5, - sizeRefreshPeriod: defaultVmssSizeRefreshPeriod, + manager: manager, + minSize: 1, + maxSize: 5, } } @@ -76,7 +75,7 @@ func newTestVMSSVMList(count int) []compute.VirtualMachineScaleSetVM { func TestMaxSize(t *testing.T) { provider := newTestProvider(t) - registered := provider.azureManager.RegisterAsg( + registered := provider.azureManager.RegisterNodeGroup( newTestScaleSet(provider.azureManager, "test-asg")) assert.True(t, registered) assert.Equal(t, len(provider.NodeGroups()), 1) @@ -85,7 +84,7 @@ func TestMaxSize(t *testing.T) { func TestMinSize(t *testing.T) { provider := newTestProvider(t) - registered := provider.azureManager.RegisterAsg( + registered := provider.azureManager.RegisterNodeGroup( newTestScaleSet(provider.azureManager, "test-asg")) assert.True(t, registered) assert.Equal(t, len(provider.NodeGroups()), 1) @@ -101,13 +100,15 @@ func TestTargetSize(t *testing.T) { provider := newTestProvider(t) mockVMSSClient := mockvmssclient.NewMockInterface(ctrl) - mockVMSSClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return(expectedScaleSets, nil) + mockVMSSClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return(expectedScaleSets, nil).AnyTimes() provider.azureManager.azClient.virtualMachineScaleSetsClient = mockVMSSClient mockVMSSVMClient := mockvmssvmclient.NewMockInterface(ctrl) mockVMSSVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup, "test-asg", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() provider.azureManager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient + err := provider.azureManager.forceRefresh() + assert.NoError(t, err) - registered := provider.azureManager.RegisterAsg( + registered := provider.azureManager.RegisterNodeGroup( newTestScaleSet(provider.azureManager, "test-asg")) assert.True(t, registered) assert.Equal(t, len(provider.NodeGroups()), 1) @@ -133,15 +134,18 @@ func TestIncreaseSize(t *testing.T) { mockVMSSVMClient := mockvmssvmclient.NewMockInterface(ctrl) mockVMSSVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup, "test-asg", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() provider.azureManager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient + err := provider.azureManager.forceRefresh() + assert.NoError(t, err) ss := newTestScaleSet(provider.azureManager, "test-asg") ss.lastSizeRefresh = time.Now() + ss.sizeRefreshPeriod = 1 * time.Minute ss.curSize = -1 - err := ss.IncreaseSize(100) + err = ss.IncreaseSize(100) expectedErr := fmt.Errorf("the scale set test-asg is under initialization, skipping IncreaseSize") assert.Equal(t, expectedErr, err) - registered := provider.azureManager.RegisterAsg( + registered := provider.azureManager.RegisterNodeGroup( newTestScaleSet(provider.azureManager, "test-asg")) assert.True(t, registered) assert.Equal(t, len(provider.NodeGroups()), 1) @@ -190,9 +194,9 @@ func TestIncreaseSizeOnVMSSUpdating(t *testing.T) { mockVMSSVMClient := mockvmssvmclient.NewMockInterface(ctrl) mockVMSSVMClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup, "vmss-updating", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() manager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient - registered := manager.RegisterAsg(newTestScaleSet(manager, vmssName)) + registered := manager.RegisterNodeGroup(newTestScaleSet(manager, vmssName)) assert.True(t, registered) - manager.regenerateCache() + manager.Refresh() provider, err := BuildAzureCloudProvider(manager, nil) assert.NoError(t, err) @@ -219,15 +223,13 @@ func TestBelongs(t *testing.T) { mockVMSSVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup, "test-asg", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() provider.azureManager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient - registered := provider.azureManager.RegisterAsg( + registered := provider.azureManager.RegisterNodeGroup( newTestScaleSet(provider.azureManager, "test-asg")) assert.True(t, registered) scaleSet, ok := provider.NodeGroups()[0].(*ScaleSet) assert.True(t, ok) - // TODO: this should call manager.Refresh() once the fetchAutoASG - // logic is refactored out - provider.azureManager.regenerateCache() + provider.azureManager.Refresh() invalidNode := &apiv1.Node{ Spec: apiv1.NodeSpec{ @@ -266,17 +268,15 @@ func TestDeleteNodes(t *testing.T) { expectedVMSSVMs := newTestVMSSVMList(3) mockVMSSClient := mockvmssclient.NewMockInterface(ctrl) - mockVMSSClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup).Return(expectedScaleSets, nil).AnyTimes() + mockVMSSClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup).Return(expectedScaleSets, nil).Times(2) mockVMSSClient.EXPECT().DeleteInstancesAsync(gomock.Any(), manager.config.ResourceGroup, gomock.Any(), gomock.Any()).Return(nil, nil) mockVMSSClient.EXPECT().WaitForAsyncOperationResult(gomock.Any(), gomock.Any()).Return(&http.Response{StatusCode: http.StatusOK}, nil).AnyTimes() manager.azClient.virtualMachineScaleSetsClient = mockVMSSClient mockVMSSVMClient := mockvmssvmclient.NewMockInterface(ctrl) mockVMSSVMClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup, "test-asg", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() manager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient - - // TODO: this should call manager.Refresh() once the fetchAutoASG - // logic is refactored out - manager.regenerateCache() + err := manager.forceRefresh() + assert.NoError(t, err) resourceLimiter := cloudprovider.NewResourceLimiter( map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000}, @@ -284,12 +284,11 @@ func TestDeleteNodes(t *testing.T) { provider, err := BuildAzureCloudProvider(manager, resourceLimiter) assert.NoError(t, err) - registered := manager.RegisterAsg( + registered := manager.RegisterNodeGroup( newTestScaleSet(manager, "test-asg")) assert.True(t, registered) - // TODO: this should call manager.Refresh() once the fetchAutoASG - // logic is refactored out - manager.regenerateCache() + err = manager.forceRefresh() + assert.NoError(t, err) scaleSet, ok := provider.NodeGroups()[0].(*ScaleSet) assert.True(t, ok) @@ -313,6 +312,21 @@ func TestDeleteNodes(t *testing.T) { } err = scaleSet.DeleteNodes(nodesToDelete) assert.NoError(t, err) + vmssCapacity = 1 + expectedScaleSets = []compute.VirtualMachineScaleSet{ + { + Name: &vmssName, + Sku: &compute.Sku{ + Capacity: &vmssCapacity, + }, + }, + } + mockVMSSClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup).Return(expectedScaleSets, nil).AnyTimes() + expectedVMSSVMs[0].ProvisioningState = to.StringPtr(string(compute.ProvisioningStateDeleting)) + expectedVMSSVMs[2].ProvisioningState = to.StringPtr(string(compute.ProvisioningStateDeleting)) + mockVMSSVMClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup, "test-asg", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() + err = manager.forceRefresh() + assert.NoError(t, err) // Ensure the the cached size has been proactively decremented by 2 targetSize, err = scaleSet.TargetSize() @@ -327,7 +341,6 @@ func TestDeleteNodes(t *testing.T) { instance2, found := scaleSet.getInstanceByProviderID("azure://" + fmt.Sprintf(fakeVirtualMachineScaleSetVMID, 2)) assert.True(t, found, true) assert.Equal(t, instance2.Status.State, cloudprovider.InstanceDeleting) - } func TestDeleteNoConflictRequest(t *testing.T) { @@ -371,9 +384,9 @@ func TestDeleteNoConflictRequest(t *testing.T) { provider, err := BuildAzureCloudProvider(manager, resourceLimiter) assert.NoError(t, err) - registered := manager.RegisterAsg(newTestScaleSet(manager, "test-asg")) + registered := manager.RegisterNodeGroup(newTestScaleSet(manager, "test-asg")) assert.True(t, registered) - manager.regenerateCache() + manager.Refresh() node := &apiv1.Node{ Spec: apiv1.NodeSpec{ @@ -389,7 +402,7 @@ func TestDeleteNoConflictRequest(t *testing.T) { func TestId(t *testing.T) { provider := newTestProvider(t) - registered := provider.azureManager.RegisterAsg( + registered := provider.azureManager.RegisterNodeGroup( newTestScaleSet(provider.azureManager, "test-asg")) assert.True(t, registered) assert.Equal(t, len(provider.NodeGroups()), 1) @@ -421,11 +434,9 @@ func TestScaleSetNodes(t *testing.T) { mockVMSSVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup, "test-asg", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() provider.azureManager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient - registered := provider.azureManager.RegisterAsg( + registered := provider.azureManager.RegisterNodeGroup( newTestScaleSet(provider.azureManager, "test-asg")) - // TODO: this should call manager.Refresh() once the fetchAutoASG - // logic is refactored out - provider.azureManager.regenerateCache() + provider.azureManager.Refresh() assert.True(t, registered) assert.Equal(t, len(provider.NodeGroups()), 1) @@ -462,8 +473,10 @@ func TestTemplateNodeInfo(t *testing.T) { mockVMSSClient := mockvmssclient.NewMockInterface(ctrl) mockVMSSClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return(expectedScaleSets, nil).AnyTimes() provider.azureManager.azClient.virtualMachineScaleSetsClient = mockVMSSClient + err := provider.azureManager.forceRefresh() + assert.NoError(t, err) - registered := provider.azureManager.RegisterAsg( + registered := provider.azureManager.RegisterNodeGroup( newTestScaleSet(provider.azureManager, "test-asg")) assert.True(t, registered) assert.Equal(t, len(provider.NodeGroups()), 1)