Skip to content

Commit

Permalink
Merge pull request #3440 from DataDog/vmssvm-refresh-jitter
Browse files Browse the repository at this point in the history
Azure: optional jitter on initial VMSS VM cache refresh
  • Loading branch information
k8s-ci-robot authored Aug 20, 2020
2 parents 5c07277 + c168eed commit f26eba8
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 19 deletions.
8 changes: 8 additions & 0 deletions cluster-autoscaler/cloudprovider/azure/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ In addition, cluster-autoscaler exposes a `AZURE_VMSS_CACHE_TTL` environment var
| ----------- | ------- | -------------------- | ----------------- |
| VmssCacheTTL | 15 | AZURE_VMSS_CACHE_TTL | vmssCacheTTL |

The `AZURE_VMSS_VMS_CACHE_TTL` environment variable affects the `GetScaleSetVms` (VMSS VM List) calls rate. The default value is 300 seconds.
A configurable jitter (`AZURE_VMSS_VMS_CACHE_JITTER` environment variable, default 0) expresses the maximum number of seconds that will be subtracted from the initial VMSS VMs cache TTL after a new VMSS is discovered by the cluster-autoscaler. This can prevent a dogpile effect on clusters having many VMSS.

| Config Name | Default | Environment Variable | Cloud Config File |
| ----------- | ------- | -------------------- | ----------------- |
| vmssVmsCacheTTL | 300 | AZURE_VMSS_VMS_CACHE_TTL | vmssVmsCacheTTL |
| vmssVmsCacheJitter | 0 | AZURE_VMSS_VMS_CACHE_JITTER | vmssVmsCacheJitter |

When using K8s 1.18 or higher, it is also recommended to configure backoff and retries on the client as described [here](#rate-limit-and-back-off-retries)

### Standard deployment
Expand Down
20 changes: 20 additions & 0 deletions cluster-autoscaler/cloudprovider/azure/azure_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ type Config struct {
// VMSS metadata cache TTL in seconds, only applies for vmss type
VmssCacheTTL int64 `json:"vmssCacheTTL" yaml:"vmssCacheTTL"`

// VMSS instances cache TTL in seconds, only applies for vmss type
VmssVmsCacheTTL int64 `json:"vmssVmsCacheTTL" yaml:"vmssVmsCacheTTL"`

// Maximum jitter in seconds randomly subtracted from the VMSS instances cache TTL before the first refresh
VmssVmsCacheJitter int `json:"vmssVmsCacheJitter" yaml:"vmssVmsCacheJitter"`

// number of latest deployments that will not be deleted
MaxDeploymentsCount int64 `json:"maxDeploymentsCount" yaml:"maxDeploymentsCount"`

Expand Down Expand Up @@ -339,6 +345,20 @@ func CreateAzureManager(configReader io.Reader, discoveryOpts cloudprovider.Node
}
}

if vmssVmsCacheTTL := os.Getenv("AZURE_VMSS_VMS_CACHE_TTL"); vmssVmsCacheTTL != "" {
cfg.VmssVmsCacheTTL, err = strconv.ParseInt(vmssVmsCacheTTL, 10, 0)
if err != nil {
return nil, fmt.Errorf("failed to parse AZURE_VMSS_VMS_CACHE_TTL %q: %v", vmssVmsCacheTTL, err)
}
}

if vmssVmsCacheJitter := os.Getenv("AZURE_VMSS_VMS_CACHE_JITTER"); vmssVmsCacheJitter != "" {
cfg.VmssVmsCacheJitter, err = strconv.Atoi(vmssVmsCacheJitter)
if err != nil {
return nil, fmt.Errorf("failed to parse AZURE_VMSS_VMS_CACHE_JITTER %q: %v", vmssVmsCacheJitter, err)
}
}

if threshold := os.Getenv("AZURE_MAX_DEPLOYMENT_COUNT"); threshold != "" {
cfg.MaxDeploymentsCount, err = strconv.ParseInt(threshold, 10, 0)
if err != nil {
Expand Down
36 changes: 26 additions & 10 deletions cluster-autoscaler/cloudprovider/azure/azure_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ const validAzureCfg = `{
"routeTableName": "fakeName",
"primaryAvailabilitySetName": "fakeName",
"vmssCacheTTL": 60,
"vmssVmsCacheTTL": 240,
"vmssVmsCacheJitter": 120,
"maxDeploymentsCount": 8,
"cloudProviderRateLimit": false,
"routeRateLimit": {
Expand All @@ -75,6 +77,8 @@ const validAzureCfgForStandardVMType = `{
"routeTableName": "fakeName",
"primaryAvailabilitySetName": "fakeName",
"vmssCacheTTL": 60,
"vmssVmsCacheTTL": 240,
"vmssVmsCacheJitter": 120,
"maxDeploymentsCount": 8,
"cloudProviderRateLimit": false,
"routeRateLimit": {
Expand Down Expand Up @@ -121,6 +125,8 @@ const validAzureCfgForStandardVMTypeWithoutDeploymentParameters = `{
"routeTableName": "fakeName",
"primaryAvailabilitySetName": "fakeName",
"vmssCacheTTL": 60,
"vmssVmsCacheTTL": 240,
"vmssVmsCacheJitter": 120,
"maxDeploymentsCount": 8,
"cloudProviderRateLimit": false,
"routeRateLimit": {
Expand All @@ -145,6 +151,8 @@ func TestCreateAzureManagerValidConfig(t *testing.T) {
AADClientID: "fakeId",
AADClientSecret: "fakeId",
VmssCacheTTL: 60,
VmssVmsCacheTTL: 240,
VmssVmsCacheJitter: 120,
MaxDeploymentsCount: 8,
CloudProviderRateLimitConfig: CloudProviderRateLimitConfig{
RateLimitConfig: azclients.RateLimitConfig{
Expand Down Expand Up @@ -215,6 +223,8 @@ func TestCreateAzureManagerValidConfigForStandardVMType(t *testing.T) {
AADClientID: "fakeId",
AADClientSecret: "fakeId",
VmssCacheTTL: 60,
VmssVmsCacheTTL: 240,
VmssVmsCacheJitter: 120,
MaxDeploymentsCount: 8,
CloudProviderRateLimitConfig: CloudProviderRateLimitConfig{
RateLimitConfig: azclients.RateLimitConfig{
Expand Down Expand Up @@ -369,6 +379,8 @@ func TestCreateAzureManagerWithNilConfig(t *testing.T) {
UseManagedIdentityExtension: true,
UserAssignedIdentityID: "UserAssignedIdentityID",
VmssCacheTTL: 100,
VmssVmsCacheTTL: 110,
VmssVmsCacheJitter: 90,
MaxDeploymentsCount: 8,
CloudProviderBackoff: true,
CloudProviderBackoffRetries: 1,
Expand Down Expand Up @@ -444,6 +456,8 @@ func TestCreateAzureManagerWithNilConfig(t *testing.T) {
os.Setenv("ARM_USE_MANAGED_IDENTITY_EXTENSION", "true")
os.Setenv("ARM_USER_ASSIGNED_IDENTITY_ID", "UserAssignedIdentityID")
os.Setenv("AZURE_VMSS_CACHE_TTL", "100")
os.Setenv("AZURE_VMSS_VMS_CACHE_TTL", "110")
os.Setenv("AZURE_VMSS_VMS_CACHE_JITTER", "90")
os.Setenv("AZURE_MAX_DEPLOYMENT_COUNT", "8")
os.Setenv("ENABLE_BACKOFF", "true")
os.Setenv("BACKOFF_RETRIES", "1")
Expand Down Expand Up @@ -748,11 +762,12 @@ func TestListScalesets(t *testing.T) {
azureRef: azureRef{
Name: vmssName,
},
minSize: 5,
maxSize: 50,
manager: manager,
curSize: 3,
sizeRefreshPeriod: defaultVmssSizeRefreshPeriod,
minSize: 5,
maxSize: 50,
manager: manager,
curSize: 3,
sizeRefreshPeriod: defaultVmssSizeRefreshPeriod,
instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod,
}},
},
{
Expand Down Expand Up @@ -854,11 +869,12 @@ func TestGetFilteredAutoscalingGroupsVmss(t *testing.T) {
azureRef: azureRef{
Name: vmssName,
},
minSize: minVal,
maxSize: maxVal,
manager: manager,
curSize: 3,
sizeRefreshPeriod: defaultVmssSizeRefreshPeriod,
minSize: minVal,
maxSize: maxVal,
manager: manager,
curSize: 3,
sizeRefreshPeriod: defaultVmssSizeRefreshPeriod,
instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod,
}}
assert.True(t, assert.ObjectsAreEqualValues(expectedAsgs, asgs), "expected %#v, but found: %#v", expectedAsgs, asgs)
}
Expand Down
37 changes: 28 additions & 9 deletions cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ import (
)

var (
defaultVmssSizeRefreshPeriod = 15 * time.Second
vmssInstancesRefreshPeriod = 5 * time.Minute
vmssContextTimeout = 3 * time.Minute
vmssSizeMutex sync.Mutex
defaultVmssSizeRefreshPeriod = 15 * time.Second
defaultVmssInstancesRefreshPeriod = 5 * time.Minute
vmssContextTimeout = 3 * time.Minute
vmssSizeMutex sync.Mutex
)

var scaleSetStatusCache struct {
Expand Down Expand Up @@ -83,6 +83,9 @@ type ScaleSet struct {
lastSizeRefresh time.Time
sizeRefreshPeriod time.Duration

instancesRefreshPeriod time.Duration
instancesRefreshJitter int

instanceMutex sync.Mutex
instanceCache []cloudprovider.Instance
lastInstanceRefresh time.Time
Expand All @@ -98,6 +101,8 @@ func NewScaleSet(spec *dynamic.NodeGroupSpec, az *AzureManager, curSize int64) (
maxSize: spec.MaxSize,
manager: az,
curSize: curSize,

instancesRefreshJitter: az.config.VmssVmsCacheJitter,
}

if az.config.VmssCacheTTL != 0 {
Expand All @@ -106,6 +111,12 @@ func NewScaleSet(spec *dynamic.NodeGroupSpec, az *AzureManager, curSize int64) (
scaleSet.sizeRefreshPeriod = defaultVmssSizeRefreshPeriod
}

if az.config.VmssVmsCacheTTL != 0 {
scaleSet.instancesRefreshPeriod = time.Duration(az.config.VmssVmsCacheTTL) * time.Second
} else {
scaleSet.instancesRefreshPeriod = defaultVmssInstancesRefreshPeriod
}

return scaleSet, nil
}

Expand Down Expand Up @@ -682,25 +693,33 @@ func (scaleSet *ScaleSet) Nodes() ([]cloudprovider.Instance, error) {
defer scaleSet.instanceMutex.Unlock()

if int64(len(scaleSet.instanceCache)) == curSize &&
scaleSet.lastInstanceRefresh.Add(vmssInstancesRefreshPeriod).After(time.Now()) {
scaleSet.lastInstanceRefresh.Add(scaleSet.instancesRefreshPeriod).After(time.Now()) {
klog.V(4).Infof("Nodes: returns with curSize %d", curSize)
return scaleSet.instanceCache, nil
}

klog.V(4).Infof("Nodes: starts to get VMSS VMs")

lastRefresh := time.Now()
if scaleSet.lastInstanceRefresh.IsZero() && scaleSet.instancesRefreshJitter > 0 {
// new VMSS: spread future refreshes
splay := rand.New(rand.NewSource(time.Now().UnixNano())).Intn(scaleSet.instancesRefreshJitter + 1)
lastRefresh = time.Now().Add(-time.Second * time.Duration(splay))
}

vms, rerr := scaleSet.GetScaleSetVms()
if rerr != nil {
if isAzureRequestsThrottled(rerr) {
// Log a warning and update the instance refresh time so that it would retry after next vmssInstancesRefreshPeriod.
// Log a warning and update the instance refresh time so that it would retry after next scaleSet.instancesRefreshPeriod.
klog.Warningf("GetScaleSetVms() is throttled with message %v, would return the cached instances", rerr)
scaleSet.lastInstanceRefresh = time.Now()
scaleSet.lastInstanceRefresh = lastRefresh
return scaleSet.instanceCache, nil
}
return nil, rerr.Error()
}

scaleSet.instanceCache = buildInstanceCache(vms)
scaleSet.lastInstanceRefresh = time.Now()
scaleSet.lastInstanceRefresh = lastRefresh
klog.V(4).Infof("Nodes: returns")
return scaleSet.instanceCache, nil
}
Expand Down Expand Up @@ -765,7 +784,7 @@ func instanceStatusFromVM(vm compute.VirtualMachineScaleSetVM) *cloudprovider.In
func (scaleSet *ScaleSet) invalidateInstanceCache() {
scaleSet.instanceMutex.Lock()
// Set the instanceCache as outdated.
scaleSet.lastInstanceRefresh = time.Now().Add(-1 * vmssInstancesRefreshPeriod)
scaleSet.lastInstanceRefresh = time.Now().Add(-1 * scaleSet.instancesRefreshPeriod)
scaleSet.instanceMutex.Unlock()
}

Expand Down

0 comments on commit f26eba8

Please sign in to comment.