Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use getvmss api for spot instances in azure #6470

Merged
15 changes: 15 additions & 0 deletions cluster-autoscaler/cloudprovider/azure/azure_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ const (
rateLimitWriteQPSEnvVar = "RATE_LIMIT_WRITE_QPS"
rateLimitWriteBucketsEnvVar = "RATE_LIMIT_WRITE_BUCKETS"

// VmssSizeRefreshPeriodDefault in seconds
VmssSizeRefreshPeriodDefault = 30

// auth methods
authMethodPrincipal = "principal"
authMethodCLI = "cli"
Expand Down Expand Up @@ -128,6 +131,9 @@ type Config struct {
// Jitter in seconds subtracted from the VMSS cache TTL before the first refresh
VmssVmsCacheJitter int `json:"vmssVmsCacheJitter" yaml:"vmssVmsCacheJitter"`

// GetVmssSizeRefreshPeriod (seconds) defines how frequently to call GET VMSS API to fetch VMSS info per nodegroup instance
GetVmssSizeRefreshPeriod int `json:"getVmssSizeRefreshPeriod" yaml:"getVmssSizeRefreshPeriod"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this qualifies as a nit, but maybe rename all of these properties that start w/ "get" so that they don't confuse readers that they are getter funcs. I think "vmssGet...|VmssGet..." instead would work without regressing the semantics for human readability?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think not quite applicable for this case. GetVMSS is the actual function we're calling. switching the order around almost makes it sound (to me) like we're calling a function that gets the size rather than the VMSS.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like I missed this, but GetVmssSizeRefreshPeriod being an int introduces an inconsistency with the fork, where it is time.Duration. This time it is not too bad, but let us all be careful next time. I could see cases like this getting worse if there are more dependencies.


// number of latest deployments that will not be deleted
MaxDeploymentsCount int64 `json:"maxDeploymentsCount" yaml:"maxDeploymentsCount"`

Expand Down Expand Up @@ -256,6 +262,15 @@ func BuildAzureConfig(configReader io.Reader) (*Config, error) {
cfg.EnableDynamicInstanceList = dynamicInstanceListDefault
}

if getVmssSizeRefreshPeriod := os.Getenv("AZURE_GET_VMSS_SIZE_REFRESH_PERIOD"); getVmssSizeRefreshPeriod != "" {
cfg.GetVmssSizeRefreshPeriod, err = strconv.Atoi(getVmssSizeRefreshPeriod)
if err != nil {
return nil, fmt.Errorf("failed to parse AZURE_GET_VMSS_SIZE_REFRESH_PERIOD %q: %v", getVmssSizeRefreshPeriod, err)
}
} else {
cfg.GetVmssSizeRefreshPeriod = VmssSizeRefreshPeriodDefault
}
Comment on lines +265 to +272
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic uses either environment variable or default, never the value from cloud config file. If we don't support setting this from cloud config file - need to remove from docs. If we need to support it - should update the logic. (It looks like the same applies to the EnableDynamicInstanceList above?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. Looks like we're doing the same for others as well like - vmssVmsCacheTTL, vmssVmsCacheTTL, etc

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, those lack the "else" part, will preserve config file setting in the absence of env override

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume the cloud config file settings will already have been primed into the cfg object? If so this should work

...
} else if cfg.GetVmssSizeRefreshPeriod != 0 {
         cfg.GetVmssSizeRefreshPeriod = VmssSizeRefreshPeriodDefault
}

Is that what you're thinking @tallaxes?

Copy link
Contributor

@comtalyst comtalyst Jul 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my understanding, line 127 will result in it being taken from cloud config file already.

GetVmssSizeRefreshPeriod int `json:"getVmssSizeRefreshPeriod" yaml:"getVmssSizeRefreshPeriod"`

Line 155-162 should take care of the parsing:

if configReader != nil {
	body, err := ioutil.ReadAll(configReader)
	if err != nil {
		return nil, fmt.Errorf("failed to read config: %v", err)
	}
	err = json.Unmarshal(body, cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to unmarshal config body: %v", err)
	}
}

Then, on line 163 and below, the environment variables will be used only when cloud config file does not exist.

else {
	cfg.Cloud = os.Getenv("ARM_CLOUD")
	...
}

The problem I am seeing is when cloud config file exists, but does not contain getVmssSizeRefreshPeriod, and we set AZURE_GET_VMSS_SIZE_REFRESH_PERIOD, then this AZURE_GET_VMSS_SIZE_REFRESH_PERIOD will never be used and this configuration will goes to default.

My proposal: consider each variable individually rather than entire file at once when determining whether to use an environment variable. But not in this PR.

Copy link
Contributor

@rakechill rakechill Jul 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To your second point:

My proposal: consider each variable individually rather than entire file at once when determining whether to use an environment variable. But not in this PR.

Do you mean:

  1. read from config file fully and populate cfg
  2. then for each variable, check to see if it is already set in cfg
  3. if it's not, check to see if the corresponding env var is set + set in cfg if it is
  4. finally, if no cfg or env var is set, set it to default

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the assumption here is that people may set values with a mix of cloud config file and env vars?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not true, right?

The logic is:

if env var is set AND not empty string, then convert string to int and set cfg field
else: use default

It is true (right?, you may help me check again). All these environment variable parsing, from line 163 to 314, is encased in a giant else statement. It will reach this part only if configReader == nil, which, if we track it down, indicates that the user never provided the path to the config file.

if configReader != nil {
	body, err := ioutil.ReadAll(configReader)
	if err != nil {
		return nil, fmt.Errorf("failed to read config: %v", err)
	}
	err = json.Unmarshal(body, cfg)
	if err != nil {
		return nil, fmt.Errorf("failed to unmarshal config body: %v", err)
	}
}
Then, on line 163 and below, the environment variables will be used only when cloud config file does not exist.

else {
	cfg.Cloud = os.Getenv("ARM_CLOUD")
	...
	// What we are looking at
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean:

  1. read from config file fully and populate cfg
  2. then for each variable, check to see if it is already set in cfg
  3. if it's not, check to see if the corresponding env var is set + set in cfg if it is
  4. finally, if no cfg or env var is set, set it to default

That's one way to go with it---with config file > env > default.
Another way is env > config file > default.

I actually prefer the latter more, given env being more delicate(?) than the config file, as well as having a use case I see in AKS.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, I'm going to leave the logic as is.

However, want to explicitly call out that we are not handling a key case for most of these variables:

  • If config is not nil, but doesn't set a field
  • the env var for that field will never be picked up
  • the default for that field (if present) will never be picked up

@comtalyst is aiming to address this scenario in his PR to defork cloud-provider-azure

It's a rather large PR so we may determine that this logic is better to be merged separately. Either way, I think it's fine to keep this field using the same structure as the others and address this in a follow-up PR.


if enableVmssFlex := os.Getenv("AZURE_ENABLE_VMSS_FLEX"); enableVmssFlex != "" {
cfg.EnableVmssFlex, err = strconv.ParseBool(enableVmssFlex)
if err != nil {
Expand Down
39 changes: 25 additions & 14 deletions cluster-autoscaler/cloudprovider/azure/azure_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ func TestCreateAzureManagerWithNilConfig(t *testing.T) {
VmssCacheTTL: 100,
VmssVmsCacheTTL: 110,
VmssVmsCacheJitter: 90,
GetVmssSizeRefreshPeriod: 30,
MaxDeploymentsCount: 8,
CloudProviderBackoff: true,
CloudProviderBackoffRetries: 1,
Expand Down Expand Up @@ -480,6 +481,14 @@ func TestCreateAzureManagerWithNilConfig(t *testing.T) {
assert.Equal(t, expectedErr, err, "Return err does not match, expected: %v, actual: %v", expectedErr, err)
})

t.Run("invalid int for AZURE_GET_VMSS_SIZE_REFRESH_PERIOD", func(t *testing.T) {
t.Setenv("AZURE_GET_VMSS_SIZE_REFRESH_PERIOD", "invalidint")
manager, err := createAzureManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, mockAzClient)
expectedErr := fmt.Errorf("failed to parse AZURE_GET_VMSS_SIZE_REFRESH_PERIOD \"invalidint\": strconv.Atoi: parsing \"invalidint\": invalid syntax")
assert.Nil(t, manager)
assert.Equal(t, expectedErr, err, "Return err does not match, expected: %v, actual: %v", expectedErr, err)
})

t.Run("invalid int for AZURE_MAX_DEPLOYMENT_COUNT", func(t *testing.T) {
t.Setenv("AZURE_MAX_DEPLOYMENT_COUNT", "invalidint")
manager, err := createAzureManagerInternal(nil, cloudprovider.NodeGroupDiscoveryOptions{}, mockAzClient)
Expand Down Expand Up @@ -685,13 +694,14 @@ func TestGetFilteredAutoscalingGroupsVmss(t *testing.T) {
azureRef: azureRef{
Name: vmssName,
},
minSize: minVal,
maxSize: maxVal,
manager: manager,
enableForceDelete: manager.config.EnableForceDelete,
curSize: 3,
sizeRefreshPeriod: manager.azureCache.refreshInterval,
instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod,
minSize: minVal,
maxSize: maxVal,
manager: manager,
enableForceDelete: manager.config.EnableForceDelete,
curSize: 3,
sizeRefreshPeriod: manager.azureCache.refreshInterval,
instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod,
getVmssSizeRefreshPeriod: time.Duration(VmssSizeRefreshPeriodDefault) * time.Second,
}}
assert.True(t, assert.ObjectsAreEqualValues(expectedAsgs, asgs), "expected %#v, but found: %#v", expectedAsgs, asgs)
}
Expand Down Expand Up @@ -732,13 +742,14 @@ func TestGetFilteredAutoscalingGroupsVmssWithConfiguredSizes(t *testing.T) {
azureRef: azureRef{
Name: vmssName,
},
minSize: minVal,
maxSize: maxVal,
manager: manager,
enableForceDelete: manager.config.EnableForceDelete,
curSize: 3,
sizeRefreshPeriod: manager.azureCache.refreshInterval,
instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod,
minSize: minVal,
maxSize: maxVal,
manager: manager,
enableForceDelete: manager.config.EnableForceDelete,
curSize: 3,
sizeRefreshPeriod: manager.azureCache.refreshInterval,
instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod,
getVmssSizeRefreshPeriod: time.Duration(VmssSizeRefreshPeriodDefault) * time.Second,
}}
assert.True(t, assert.ObjectsAreEqualValues(expectedAsgs, asgs), "expected %#v, but found: %#v", expectedAsgs, asgs)
}
Expand Down
70 changes: 59 additions & 11 deletions cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,29 @@ type ScaleSet struct {
minSize int
maxSize int

enableForceDelete bool

sizeMutex sync.Mutex
curSize int64

enableForceDelete bool
enableDynamicInstanceList bool

lastSizeRefresh time.Time
// curSize tracks (and caches) the number of VMs in this ScaleSet.
// It is periodically updated from vmss.Sku.Capacity, with VMSS itself coming
// either from azure.Cache (which periodically does VMSS.List)
// or from direct VMSS.Get (used for Spot).
curSize int64
// lastSizeRefresh is the time curSize was last refreshed from vmss.Sku.Capacity.
// Together with sizeRefreshPeriod, it is used to determine if it is time to refresh curSize.
lastSizeRefresh time.Time
// sizeRefreshPeriod is how often curSize is refreshed from vmss.Sku.Capacity.
// (Set from azureCache.refreshInterval = VmssCacheTTL or [defaultMetadataCache]refreshInterval = 1min)
sizeRefreshPeriod time.Duration
// getVmssSizeRefreshPeriod is how often curSize should be refreshed in case VMSS.Get call is used (only spot instances).
// (Set from GetVmssSizeRefreshPeriod, if specified = get-vmss-size-refresh-period = 30s,
// or override from autoscalerProfile.GetVmssSizeRefreshPeriod)
getVmssSizeRefreshPeriod time.Duration

instancesRefreshPeriod time.Duration
instancesRefreshJitter int

sizeMutex sync.Mutex
instanceMutex sync.Mutex
instanceCache []cloudprovider.Instance
lastInstanceRefresh time.Time
Expand Down Expand Up @@ -98,6 +108,12 @@ func NewScaleSet(spec *dynamic.NodeGroupSpec, az *AzureManager, curSize int64) (
scaleSet.instancesRefreshPeriod = defaultVmssInstancesRefreshPeriod
}

if az.config.GetVmssSizeRefreshPeriod != 0 {
scaleSet.getVmssSizeRefreshPeriod = time.Duration(az.config.GetVmssSizeRefreshPeriod) * time.Second
} else {
scaleSet.getVmssSizeRefreshPeriod = time.Duration(VmssSizeRefreshPeriodDefault) * time.Second
}

return scaleSet, nil
}

Expand Down Expand Up @@ -157,17 +173,43 @@ func (scaleSet *ScaleSet) getCurSize() (int64, error) {
scaleSet.sizeMutex.Lock()
defer scaleSet.sizeMutex.Unlock()

if scaleSet.lastSizeRefresh.Add(scaleSet.sizeRefreshPeriod).After(time.Now()) {
klog.V(3).Infof("VMSS: %s, returning in-memory size: %d", scaleSet.Name, scaleSet.curSize)
return scaleSet.curSize, nil
}

set, err := scaleSet.getVMSSFromCache()
if err != nil {
klog.Errorf("failed to get information for VMSS: %s, error: %v", scaleSet.Name, err)
return -1, err
}

effectiveSizeRefreshPeriod := scaleSet.sizeRefreshPeriod

// If the scale set is Spot, we want to have a more fresh view of the Sku.Capacity field.
// This is because evictions can happen at any given point in time,
// even before VMs are materialized as nodes. We should be able to
// react to those and have the autoscaler readjust the goal again to force restoration.
// Taking into account only if orchestrationMode == Uniform because flex mode can have
// combination of spot and regular vms
if isSpot(&set) {
effectiveSizeRefreshPeriod = scaleSet.getVmssSizeRefreshPeriod
}

if scaleSet.lastSizeRefresh.Add(effectiveSizeRefreshPeriod).After(time.Now()) {
klog.V(3).Infof("VMSS: %s, returning in-memory size: %d", scaleSet.Name, scaleSet.curSize)
return scaleSet.curSize, nil
}

// If the scale set is on Spot, make a GET VMSS call to fetch more updated fresh info
if isSpot(&set) {
ctx, cancel := getContextWithCancel()
defer cancel()

var rerr *retry.Error
set, rerr = scaleSet.manager.azClient.virtualMachineScaleSetsClient.Get(ctx, scaleSet.manager.config.ResourceGroup,
scaleSet.Name)
if rerr != nil {
klog.Errorf("failed to get information for VMSS: %s, error: %v", scaleSet.Name, rerr)
return -1, err
}
}

vmssSizeMutex.Lock()
curSize := *set.Sku.Capacity
vmssSizeMutex.Unlock()
Expand All @@ -184,6 +226,12 @@ func (scaleSet *ScaleSet) getCurSize() (int64, error) {
return scaleSet.curSize, nil
}

func isSpot(vmss *compute.VirtualMachineScaleSet) bool {
return vmss != nil && vmss.VirtualMachineScaleSetProperties != nil &&
vmss.VirtualMachineScaleSetProperties.VirtualMachineProfile != nil &&
vmss.VirtualMachineScaleSetProperties.VirtualMachineProfile.Priority == compute.Spot
}

// GetScaleSetSize gets Scale Set size.
func (scaleSet *ScaleSet) GetScaleSetSize() (int64, error) {
return scaleSet.getCurSize()
Expand Down
63 changes: 33 additions & 30 deletions cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,44 +168,47 @@ func TestTargetSize(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

orchestrationModes := [2]compute.OrchestrationMode{compute.Uniform, compute.Flexible}

expectedScaleSets := newTestVMSSList(3, "test-asg", "eastus", compute.Uniform)
expectedVMSSVMs := newTestVMSSVMList(3)
expectedVMs := newTestVMList(3)
spotScaleSet := newTestVMSSList(5, "spot-vmss", "eastus", compute.Uniform)[0]
spotScaleSet.VirtualMachineProfile = &compute.VirtualMachineScaleSetVMProfile{
Priority: compute.Spot,
}
expectedScaleSets = append(expectedScaleSets, spotScaleSet)

for _, orchMode := range orchestrationModes {
provider := newTestProvider(t)
mockVMSSClient := mockvmssclient.NewMockInterface(ctrl)
mockVMSSClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return(expectedScaleSets, nil).AnyTimes()
provider.azureManager.azClient.virtualMachineScaleSetsClient = mockVMSSClient
mockVMClient := mockvmclient.NewMockInterface(ctrl)
mockVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return([]compute.VirtualMachine{}, nil).AnyTimes()
provider.azureManager.azClient.virtualMachinesClient = mockVMClient
provider := newTestProvider(t)
mockVMSSClient := mockvmssclient.NewMockInterface(ctrl)
mockVMSSClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return(expectedScaleSets, nil).AnyTimes()
provider.azureManager.azClient.virtualMachineScaleSetsClient = mockVMSSClient
mockVMClient := mockvmclient.NewMockInterface(ctrl)
mockVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return([]compute.VirtualMachine{}, nil).AnyTimes()
provider.azureManager.azClient.virtualMachinesClient = mockVMClient

if orchMode == compute.Uniform {
err := provider.azureManager.forceRefresh()
assert.NoError(t, err)

mockVMSSVMClient := mockvmssvmclient.NewMockInterface(ctrl)
mockVMSSVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup, "test-asg", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes()
provider.azureManager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient
// non-spot nodepool
registered := provider.azureManager.RegisterNodeGroup(
newTestScaleSet(provider.azureManager, "test-asg"))
assert.True(t, registered)
assert.Equal(t, len(provider.NodeGroups()), 1)

} else {
provider.azureManager.config.EnableVmssFlex = true
mockVMClient.EXPECT().ListVmssFlexVMsWithoutInstanceView(gomock.Any(), "test-asg").Return(expectedVMs, nil).AnyTimes()
}
targetSize, err := provider.NodeGroups()[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 3, targetSize)

err := provider.azureManager.forceRefresh()
assert.NoError(t, err)
// Register a spot nodepool
spotregistered := provider.azureManager.RegisterNodeGroup(
newTestScaleSet(provider.azureManager, "spot-vmss"))
assert.True(t, spotregistered)
assert.Equal(t, len(provider.NodeGroups()), 2)

registered := provider.azureManager.RegisterNodeGroup(
newTestScaleSet(provider.azureManager, "test-asg"))
assert.True(t, registered)
assert.Equal(t, len(provider.NodeGroups()), 1)
// mock getvmss call for spotnode pool returning different capacity
spotScaleSet.Sku.Capacity = to.Int64Ptr(1)
mockVMSSClient.EXPECT().Get(gomock.Any(), provider.azureManager.config.ResourceGroup, "spot-vmss").Return(spotScaleSet, nil).Times(1)

targetSize, err := provider.NodeGroups()[0].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 3, targetSize)
}
targetSize, err = provider.NodeGroups()[1].TargetSize()
assert.NoError(t, err)
assert.Equal(t, 1, targetSize)
}

func TestIncreaseSize(t *testing.T) {
Expand Down
Loading