Skip to content

Commit

Permalink
Merge pull request #5047 from DataDog/azure-cache-sku
Browse files Browse the repository at this point in the history
Azure: effectively cache instance-types SKUs
  • Loading branch information
k8s-ci-robot authored Aug 11, 2022
2 parents 62c3b26 + 20c451b commit bb8e41f
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 36 deletions.
16 changes: 8 additions & 8 deletions cluster-autoscaler/cloudprovider/azure/azure_agent_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func TestGetVMsFromCache(t *testing.T) {
mockVMClient := mockvmclient.NewMockInterface(ctrl)
testAS.manager.azClient.virtualMachinesClient = mockVMClient
mockVMClient.EXPECT().List(gomock.Any(), testAS.manager.config.ResourceGroup).Return(expectedVMs, nil)
ac, err := newAzureCache(testAS.manager.azClient, refreshInterval, testAS.manager.config.ResourceGroup, vmTypeStandard)
ac, err := newAzureCache(testAS.manager.azClient, refreshInterval, testAS.manager.config.ResourceGroup, vmTypeStandard, false, "")
assert.NoError(t, err)
testAS.manager.azureCache = ac

Expand All @@ -203,7 +203,7 @@ func TestGetVMIndexes(t *testing.T) {
mockVMClient := mockvmclient.NewMockInterface(ctrl)
as.manager.azClient.virtualMachinesClient = mockVMClient
mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil)
ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard)
ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard, false, "")
assert.NoError(t, err)
as.manager.azureCache = ac

Expand Down Expand Up @@ -242,7 +242,7 @@ func TestGetCurSize(t *testing.T) {
mockVMClient := mockvmclient.NewMockInterface(ctrl)
as.manager.azClient.virtualMachinesClient = mockVMClient
mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil)
ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard)
ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard, false, "")
assert.NoError(t, err)
as.manager.azureCache = ac

Expand All @@ -266,7 +266,7 @@ func TestAgentPoolTargetSize(t *testing.T) {
as.manager.azClient.virtualMachinesClient = mockVMClient
expectedVMs := getExpectedVMs()
mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil)
ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard)
ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard, false, "")
assert.NoError(t, err)
as.manager.azureCache = ac

Expand All @@ -285,7 +285,7 @@ func TestAgentPoolIncreaseSize(t *testing.T) {
as.manager.azClient.virtualMachinesClient = mockVMClient
expectedVMs := getExpectedVMs()
mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil).MaxTimes(2)
ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard)
ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard, false, "")
assert.NoError(t, err)
as.manager.azureCache = ac

Expand Down Expand Up @@ -313,7 +313,7 @@ func TestDecreaseTargetSize(t *testing.T) {
as.manager.azClient.virtualMachinesClient = mockVMClient
expectedVMs := getExpectedVMs()
mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil).MaxTimes(3)
ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard)
ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard, false, "")
assert.NoError(t, err)
as.manager.azureCache = ac

Expand Down Expand Up @@ -431,7 +431,7 @@ func TestAgentPoolDeleteNodes(t *testing.T) {
mockSAClient := mockstorageaccountclient.NewMockInterface(ctrl)
as.manager.azClient.storageAccountsClient = mockSAClient
mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil)
ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard)
ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard, false, "")
assert.NoError(t, err)
as.manager.azureCache = ac

Expand Down Expand Up @@ -497,7 +497,7 @@ func TestAgentPoolNodes(t *testing.T) {
mockVMClient := mockvmclient.NewMockInterface(ctrl)
as.manager.azClient.virtualMachinesClient = mockVMClient
mockVMClient.EXPECT().List(gomock.Any(), as.manager.config.ResourceGroup).Return(expectedVMs, nil)
ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard)
ac, err := newAzureCache(as.manager.azClient, refreshInterval, as.manager.config.ResourceGroup, vmTypeStandard, false, "")
assert.NoError(t, err)
as.manager.azureCache = ac

Expand Down
45 changes: 44 additions & 1 deletion cluster-autoscaler/cloudprovider/azure/azure_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package azure

import (
"context"
"reflect"
"regexp"
"strings"
Expand All @@ -25,6 +26,7 @@ import (

"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2021-07-01/compute"
"github.com/Azure/go-autorest/autorest/to"
"github.com/Azure/skewer"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"

"k8s.io/klog/v2"
Expand Down Expand Up @@ -55,9 +57,10 @@ type azureCache struct {
instanceToNodeGroup map[azureRef]cloudprovider.NodeGroup
unownedInstances map[azureRef]bool
autoscalingOptions map[azureRef]map[string]string
skus map[string]*skewer.Cache
}

func newAzureCache(client *azClient, cacheTTL time.Duration, resourceGroup, vmType string) (*azureCache, error) {
func newAzureCache(client *azClient, cacheTTL time.Duration, resourceGroup, vmType string, enableDynamicInstanceList bool, defaultLocation string) (*azureCache, error) {
cache := &azureCache{
interrupt: make(chan struct{}),
azClient: client,
Expand All @@ -70,6 +73,11 @@ func newAzureCache(client *azClient, cacheTTL time.Duration, resourceGroup, vmTy
instanceToNodeGroup: make(map[azureRef]cloudprovider.NodeGroup),
unownedInstances: make(map[azureRef]bool),
autoscalingOptions: make(map[azureRef]map[string]string),
skus: make(map[string]*skewer.Cache),
}

if enableDynamicInstanceList {
cache.skus[defaultLocation] = &skewer.Cache{}
}

if err := cache.regenerate(); err != nil {
Expand Down Expand Up @@ -131,11 +139,21 @@ func (m *azureCache) regenerate() error {
newAutoscalingOptions[ref] = options
}

newSkuCache := make(map[string]*skewer.Cache)
for location := range m.skus {
cache, err := m.fetchSKUs(context.Background(), location)
if err != nil {
return err
}
newSkuCache[location] = cache
}

m.mutex.Lock()
defer m.mutex.Unlock()

m.instanceToNodeGroup = newInstanceToNodeGroupCache
m.autoscalingOptions = newAutoscalingOptions
m.skus = newSkuCache

// Reset unowned instances cache.
m.unownedInstances = make(map[azureRef]bool)
Expand Down Expand Up @@ -264,6 +282,31 @@ func (m *azureCache) Unregister(nodeGroup cloudprovider.NodeGroup) bool {
return changed
}

func (m *azureCache) fetchSKUs(ctx context.Context, location string) (*skewer.Cache, error) {
return skewer.NewCache(ctx,
skewer.WithLocation(location),
skewer.WithResourceClient(m.azClient.skuClient),
)
}

func (m *azureCache) GetSKU(ctx context.Context, skuName, location string) (skewer.SKU, error) {
m.mutex.Lock()
defer m.mutex.Unlock()

cache, ok := m.skus[location]
if !ok {
var err error
cache, err = m.fetchSKUs(ctx, location)
if err != nil {
klog.V(1).Infof("Failed to instantiate cache, err: %v", err)
return skewer.SKU{}, err
}
m.skus[location] = cache
}

return cache.Get(ctx, skuName, skewer.VirtualMachines, location)
}

func (m *azureCache) getRegisteredNodeGroups() []cloudprovider.NodeGroup {
m.mutex.Lock()
defer m.mutex.Unlock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func newTestAzureManager(t *testing.T) *AzureManager {
},
}

cache, error := newAzureCache(manager.azClient, refreshInterval, manager.config.ResourceGroup, vmTypeVMSS)
cache, error := newAzureCache(manager.azClient, refreshInterval, manager.config.ResourceGroup, vmTypeVMSS, false, "")
assert.NoError(t, error)

manager.azureCache = cache
Expand Down
19 changes: 6 additions & 13 deletions cluster-autoscaler/cloudprovider/azure/azure_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ package azure
import (
"context"
"fmt"
compute20190701 "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2021-07-01/compute"
"github.com/Azure/skewer"
"k8s.io/klog/v2"
"regexp"
"strings"
Expand Down Expand Up @@ -61,24 +59,19 @@ var GetVMSSTypeStatically = func(template compute.VirtualMachineScaleSet) (*Inst

// GetVMSSTypeDynamically fetched vmss instance information using sku api calls.
// It is declared as a variable for testing purpose.
var GetVMSSTypeDynamically = func(template compute.VirtualMachineScaleSet, skuClient compute20190701.ResourceSkusClient) (InstanceType, error) {
var GetVMSSTypeDynamically = func(template compute.VirtualMachineScaleSet, azCache *azureCache) (InstanceType, error) {
ctx := context.Background()
var sku skewer.SKU
var vmssType InstanceType

cache, err := skewer.NewCache(ctx, skewer.WithLocation(*template.Location), skewer.WithResourceClient(skuClient))
if err != nil {
klog.V(1).Infof("Failed to instantiate cache, err: %v", err)
return vmssType, err
}

sku, err = cache.Get(ctx, *template.Sku.Name, skewer.VirtualMachines, *template.Location)
sku, err := azCache.GetSKU(ctx, *template.Sku.Name, *template.Location)
if err != nil {
// We didn't find an exact match but this is a promo type, check for matching standard
klog.V(1).Infof("No exact match found for %s, checking standard types. Error %v", *template.Sku.Name, err)
promoRe := regexp.MustCompile(`(?i)_promo`)
skuName := promoRe.ReplaceAllString(*template.Sku.Name, "")
sku, err = cache.Get(context.Background(), skuName, skewer.VirtualMachines, *template.Location)
if skuName != *template.Sku.Name {
klog.V(1).Infof("No exact match found for %q, checking standard type %q. Error %v", *template.Sku.Name, skuName, err)
sku, err = azCache.GetSKU(ctx, skuName, *template.Location)
}
if err != nil {
return vmssType, fmt.Errorf("instance type %q not supported. Error %v", *template.Sku.Name, err)
}
Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/cloudprovider/azure/azure_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func createAzureManagerInternal(configReader io.Reader, discoveryOpts cloudprovi
if cfg.VmssCacheTTL != 0 {
cacheTTL = time.Duration(cfg.VmssCacheTTL) * time.Second
}
cache, err := newAzureCache(azClient, cacheTTL, cfg.ResourceGroup, cfg.VMType)
cache, err := newAzureCache(azClient, cacheTTL, cfg.ResourceGroup, cfg.VMType, cfg.EnableDynamicInstanceList, cfg.Location)
if err != nil {
return nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,7 @@ func (scaleSet *ScaleSet) TemplateNodeInfo() (*schedulerframework.NodeInfo, erro
return nil, err
}

node, err := buildNodeFromTemplate(scaleSet.Name, template, scaleSet.manager.azClient.skuClient,
scaleSet.enableDynamicInstanceList)
node, err := buildNodeFromTemplate(scaleSet.Name, template, scaleSet.manager)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package azure

import (
"fmt"
compute20190701 "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2021-07-01/compute"
"github.com/Azure/go-autorest/autorest/to"
"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -572,7 +571,7 @@ func TestTemplateNodeInfo(t *testing.T) {
assert.NotEmpty(t, nodeInfo.Pods)

t.Run("Checking dynamic workflow", func(t *testing.T) {
GetVMSSTypeDynamically = func(template compute.VirtualMachineScaleSet, skuClient compute20190701.ResourceSkusClient) (InstanceType, error) {
GetVMSSTypeDynamically = func(template compute.VirtualMachineScaleSet, azCache *azureCache) (InstanceType, error) {
vmssType := InstanceType{}
vmssType.VCPU = 1
vmssType.GPU = 2
Expand All @@ -586,7 +585,7 @@ func TestTemplateNodeInfo(t *testing.T) {
})

t.Run("Checking static workflow if dynamic fails", func(t *testing.T) {
GetVMSSTypeDynamically = func(template compute.VirtualMachineScaleSet, skuClient compute20190701.ResourceSkusClient) (InstanceType, error) {
GetVMSSTypeDynamically = func(template compute.VirtualMachineScaleSet, azCache *azureCache) (InstanceType, error) {
return InstanceType{}, fmt.Errorf("dynamic error exists")
}
GetVMSSTypeStatically = func(template compute.VirtualMachineScaleSet) (*InstanceType, error) {
Expand All @@ -603,7 +602,7 @@ func TestTemplateNodeInfo(t *testing.T) {
})

t.Run("Fails to find vmss instance information using static and dynamic workflow, instance not supported", func(t *testing.T) {
GetVMSSTypeDynamically = func(template compute.VirtualMachineScaleSet, skuClient compute20190701.ResourceSkusClient) (InstanceType, error) {
GetVMSSTypeDynamically = func(template compute.VirtualMachineScaleSet, azCache *azureCache) (InstanceType, error) {
return InstanceType{}, fmt.Errorf("dynamic error exists")
}
GetVMSSTypeStatically = func(template compute.VirtualMachineScaleSet) (*InstanceType, error) {
Expand Down
10 changes: 4 additions & 6 deletions cluster-autoscaler/cloudprovider/azure/azure_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package azure

import (
"fmt"
compute20190701 "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-07-01/compute"
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2021-07-01/compute"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -73,8 +72,7 @@ func buildGenericLabels(template compute.VirtualMachineScaleSet, nodeName string
return result
}

func buildNodeFromTemplate(scaleSetName string,
template compute.VirtualMachineScaleSet, skuClient compute20190701.ResourceSkusClient, enableDynamicInstanceList bool) (*apiv1.Node, error) {
func buildNodeFromTemplate(scaleSetName string, template compute.VirtualMachineScaleSet, manager *AzureManager) (*apiv1.Node, error) {
node := apiv1.Node{}
nodeName := fmt.Sprintf("%s-asg-%d", scaleSetName, rand.Int63())

Expand All @@ -92,10 +90,10 @@ func buildNodeFromTemplate(scaleSetName string,

// Fetching SKU information from SKU API if enableDynamicInstanceList is true.
var dynamicErr error
if enableDynamicInstanceList {
if manager.config.EnableDynamicInstanceList {
var vmssTypeDynamic InstanceType
klog.V(1).Infof("Fetching instance information for SKU: %s from SKU API", *template.Sku.Name)
vmssTypeDynamic, dynamicErr = GetVMSSTypeDynamically(template, skuClient)
vmssTypeDynamic, dynamicErr = GetVMSSTypeDynamically(template, manager.azureCache)
if dynamicErr == nil {
vcpu = vmssTypeDynamic.VCPU
gpuCount = vmssTypeDynamic.GPU
Expand All @@ -104,7 +102,7 @@ func buildNodeFromTemplate(scaleSetName string,
klog.Errorf("Dynamically fetching of instance information from SKU api failed with error: %v", dynamicErr)
}
}
if !enableDynamicInstanceList || dynamicErr != nil {
if !manager.config.EnableDynamicInstanceList || dynamicErr != nil {
klog.V(1).Infof("Falling back to static SKU list for SKU: %s", *template.Sku.Name)
// fall-back on static list of vmss if dynamic workflow fails.
vmssTypeStatic, staticErr := GetVMSSTypeStatically(template)
Expand Down

0 comments on commit bb8e41f

Please sign in to comment.