Skip to content

Commit

Permalink
Merge pull request #1757 from feiskyer/cluster-autoscaler-release-1.2
Browse files Browse the repository at this point in the history
Cluster Autoscaler 1.2 : cherry pick of #1738
  • Loading branch information
k8s-ci-robot authored Mar 6, 2019
2 parents 45b0afd + b2e254a commit 796710d
Show file tree
Hide file tree
Showing 15 changed files with 96 additions and 81 deletions.
7 changes: 6 additions & 1 deletion cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,12 @@ func (aws *awsCloudProvider) Refresh() error {
return aws.awsManager.Refresh()
}

// AwsRef contains a reference to some entity in AWS/GKE world.
// GetInstanceID gets the instance ID for the specified node.
func (aws *awsCloudProvider) GetInstanceID(node *apiv1.Node) string {
return node.Spec.ProviderID
}

// AwsRef contains a reference to some entity in AWS world.
type AwsRef struct {
Name string
}
Expand Down
11 changes: 6 additions & 5 deletions cluster-autoscaler/cloudprovider/azure/azure_agent_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"math/rand"
"sort"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -129,7 +130,7 @@ func (as *AgentPool) GetVMIndexes() ([]int, map[int]string, error) {
}

indexes = append(indexes, index)
indexToVM[index] = "azure://" + *instance.ID
indexToVM[index] = "azure://" + strings.ToLower(*instance.ID)
}

sortedIndexes := sort.IntSlice(indexes)
Expand Down Expand Up @@ -234,7 +235,7 @@ func (as *AgentPool) GetVirtualMachines() (instances []compute.VirtualMachine, e

tags := *instance.Tags
vmPoolName := tags["poolName"]
if *vmPoolName != as.Id() {
if vmPoolName == nil || !strings.EqualFold(*vmPoolName, as.Id()) {
continue
}

Expand Down Expand Up @@ -284,7 +285,7 @@ func (as *AgentPool) Belongs(node *apiv1.Node) (bool, error) {
if targetAsg == nil {
return false, fmt.Errorf("%s doesn't belong to a known agent pool", node.Name)
}
if targetAsg.Id() != as.Id() {
if !strings.EqualFold(targetAsg.Id(), as.Id()) {
return false, nil
}
return true, nil
Expand All @@ -307,7 +308,7 @@ func (as *AgentPool) DeleteInstances(instances []*azureRef) error {
return err
}

if asg != commonAsg {
if !strings.EqualFold(asg.Id(), commonAsg.Id()) {
return fmt.Errorf("cannot delete instance (%s) which don't belong to the same node pool (%q)", instance.GetKey(), commonAsg)
}
}
Expand Down Expand Up @@ -390,7 +391,7 @@ func (as *AgentPool) Nodes() ([]string, error) {
}

// To keep consistent with providerID from kubernetes cloud provider, do not convert ID to lower case.
name := "azure://" + *instance.ID
name := "azure://" + strings.ToLower(*instance.ID)
nodes = append(nodes, name)
}

Expand Down
43 changes: 30 additions & 13 deletions cluster-autoscaler/cloudprovider/azure/azure_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"reflect"
"regexp"
"strings"
"sync"
"time"

Expand All @@ -28,7 +29,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)

var virtualMachineRE = regexp.MustCompile(`^azure://(?:.*)/providers/Microsoft.Compute/virtualMachines/(.+)$`)
var virtualMachineRE = regexp.MustCompile(`^azure://(?:.*)/providers/microsoft.compute/virtualmachines/(.+)$`)

type asgCache struct {
registeredAsgs []cloudprovider.NodeGroup
Expand Down Expand Up @@ -63,7 +64,7 @@ func (m *asgCache) Register(asg cloudprovider.NodeGroup) bool {
defer m.mutex.Unlock()

for i := range m.registeredAsgs {
if existing := m.registeredAsgs[i]; existing.Id() == asg.Id() {
if existing := m.registeredAsgs[i]; strings.EqualFold(existing.Id(), asg.Id()) {
if reflect.DeepEqual(existing, asg) {
return false
}
Expand Down Expand Up @@ -94,7 +95,7 @@ func (m *asgCache) Unregister(asg cloudprovider.NodeGroup) bool {
updated := make([]cloudprovider.NodeGroup, 0, len(m.registeredAsgs))
changed := false
for _, existing := range m.registeredAsgs {
if existing.Id() == asg.Id() {
if strings.EqualFold(existing.Id(), asg.Id()) {
glog.V(1).Infof("Unregistered ASG %s", asg.Id())
changed = true
continue
Expand All @@ -117,42 +118,46 @@ func (m *asgCache) FindForInstance(instance *azureRef, vmType string) (cloudprov
m.mutex.Lock()
defer m.mutex.Unlock()

if m.notInRegisteredAsg[*instance] {
inst := azureRef{Name: strings.ToLower(instance.Name)}
if m.notInRegisteredAsg[inst] {
// We already know we don't own this instance. Return early and avoid
// additional calls.
return nil, nil
}

if vmType == vmTypeVMSS {
// Omit virtual machines not managed by vmss.
if ok := virtualMachineRE.Match([]byte(instance.Name)); ok {
if ok := virtualMachineRE.Match([]byte(inst.Name)); ok {
glog.V(3).Infof("Instance %q is not managed by vmss, omit it in autoscaler", instance.Name)
m.notInRegisteredAsg[*instance] = true
m.notInRegisteredAsg[inst] = true
return nil, nil
}
}

if vmType == vmTypeStandard {
// Omit virtual machines with providerID not in Azure resource ID format.
if ok := virtualMachineRE.Match([]byte(instance.Name)); !ok {
if ok := virtualMachineRE.Match([]byte(inst.Name)); !ok {
glog.V(3).Infof("Instance %q is not in Azure resource ID format, omit it in autoscaler", instance.Name)
m.notInRegisteredAsg[*instance] = true
m.notInRegisteredAsg[inst] = true
return nil, nil
}
}

if asg, found := m.instanceToAsg[*instance]; found {
// Look up caches for the instance.
if asg := m.getInstanceFromCache(inst.Name); asg != nil {
return asg, nil
}

// Not found, regenerate the cache and try again.
if err := m.regenerate(); err != nil {
return nil, fmt.Errorf("Error while looking for ASG for instance %+v, error: %v", *instance, err)
return nil, fmt.Errorf("error while looking for ASG for instance %q, error: %v", instance.Name, err)
}
if config, found := m.instanceToAsg[*instance]; found {
return config, nil
if asg := m.getInstanceFromCache(inst.Name); asg != nil {
return asg, nil
}

m.notInRegisteredAsg[*instance] = true
// Add the instance to notInRegisteredAsg since it's unknown from Azure.
m.notInRegisteredAsg[inst] = true
return nil, nil
}

Expand All @@ -179,3 +184,15 @@ func (m *asgCache) regenerate() error {
m.instanceToAsg = newCache
return nil
}

// Get node group from cache. nil would be return if not found.
// Should be call with lock protected.
func (m *asgCache) getInstanceFromCache(providerID string) cloudprovider.NodeGroup {
for instanceID, asg := range m.instanceToAsg {
if strings.EqualFold(instanceID.GetKey(), providerID) {
return asg
}
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package azure

import (
"strings"

"github.com/golang/glog"

apiv1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -105,6 +107,11 @@ func (azure *AzureCloudProvider) Refresh() error {
return azure.azureManager.Refresh()
}

// GetInstanceID gets the instance ID for the specified node.
func (azure *AzureCloudProvider) GetInstanceID(node *apiv1.Node) string {
return strings.ToLower(node.Spec.ProviderID)
}

// azureRef contains a reference to some entity in Azure world.
type azureRef struct {
Name string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (agentPool *ContainerServiceAgentPool) GetAKSAgentPool(agentProfiles *[]con
for _, value := range *agentProfiles {
profileName := *value.Name
glog.V(5).Infof("AKS AgentPool profile name: %s", profileName)
if profileName == (agentPool.azureRef.Name) {
if strings.EqualFold(profileName, agentPool.azureRef.Name) {
return &value
}
}
Expand All @@ -92,7 +92,7 @@ func (agentPool *ContainerServiceAgentPool) GetACSAgentPool(agentProfiles *[]con
for _, value := range *agentProfiles {
profileName := *value.Name
glog.V(5).Infof("ACS AgentPool profile name: %s", profileName)
if profileName == (agentPool.azureRef.Name) {
if strings.EqualFold(profileName, agentPool.azureRef.Name) {
return &value
}
}
Expand All @@ -105,7 +105,7 @@ func (agentPool *ContainerServiceAgentPool) GetACSAgentPool(agentProfiles *[]con
profileName := *value.Name
poolName := agentPool.azureRef.Name + "pool0"
glog.V(5).Infof("Workaround match check - ACS AgentPool Profile: %s <=> Poolname: %s", profileName, poolName)
if profileName == poolName {
if strings.EqualFold(profileName, poolName) {
return &value
}
}
Expand Down Expand Up @@ -252,7 +252,7 @@ func (agentPool *ContainerServiceAgentPool) SetNodeCount(count int) (err error)
func (agentPool *ContainerServiceAgentPool) GetProviderID(name string) string {
//TODO: come with a generic way to make it work with provider id formats
// in different version of k8s.
return "azure://" + name
return "azure://" + strings.ToLower(name)
}

//GetName extracts the name of the node (a format which underlying cloud service understands)
Expand All @@ -267,7 +267,7 @@ func (agentPool *ContainerServiceAgentPool) GetName(providerID string) (string,
return "", err
}
for _, vm := range vms {
if strings.Compare(*vm.ID, providerID) == 0 {
if strings.EqualFold(*vm.ID, providerID) {
return *vm.Name, nil
}
}
Expand Down Expand Up @@ -385,7 +385,7 @@ func (agentPool *ContainerServiceAgentPool) IsContainerServiceNode(tags *map[str
poolName := (*tags)["poolName"]
if poolName != nil {
glog.V(5).Infof("Matching agentPool name: %s with tag name: %s", agentPool.azureRef.Name, *poolName)
if *poolName == agentPool.azureRef.Name {
if strings.EqualFold(*poolName, agentPool.azureRef.Name) {
return true
}
}
Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/cloudprovider/azure/azure_fakes.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

const (
fakeVirtualMachineScaleSetVMID = "/subscriptions/test-subscription-id/resourceGroups/test-asg/providers/Microsoft.Compute/virtualMachineScaleSets/agents/virtualMachines/0"
fakeVirtualMachineScaleSetVMID = "/subscriptions/test-subscription-id/resourcegroups/test-asg/providers/microsoft.compute/virtualmachinescalesets/agents/virtualmachines/0"
)

// VirtualMachineScaleSetsClientMock mocks for VirtualMachineScaleSetsClient.
Expand Down
53 changes: 10 additions & 43 deletions cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package azure
import (
"fmt"
"math/rand"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -46,8 +47,6 @@ type ScaleSet struct {
mutex sync.Mutex
lastRefresh time.Time
curSize int64
// virtualMachines holds a list of vmss instances (instanceID -> resourceID).
virtualMachines map[string]string
}

// NewScaleSet creates a new NewScaleSet.
Expand All @@ -56,11 +55,10 @@ func NewScaleSet(spec *dynamic.NodeGroupSpec, az *AzureManager) (*ScaleSet, erro
azureRef: azureRef{
Name: spec.Name,
},
minSize: spec.MinSize,
maxSize: spec.MaxSize,
manager: az,
curSize: -1,
virtualMachines: make(map[string]string),
minSize: spec.MinSize,
maxSize: spec.MaxSize,
manager: az,
curSize: -1,
}

return scaleSet, nil
Expand Down Expand Up @@ -190,55 +188,24 @@ func (scaleSet *ScaleSet) IncreaseSize(delta int) error {
// GetScaleSetVms returns list of nodes for the given scale set.
// Note that the list results is not used directly because their resource ID format
// is not consistent with Get results.
// TODO(feiskyer): use list results directly after the issue fixed in Azure VMSS API.
func (scaleSet *ScaleSet) GetScaleSetVms() ([]string, error) {
ctx, cancel := getContextWithCancel()
defer cancel()

resourceGroup := scaleSet.manager.config.ResourceGroup
result, err := scaleSet.manager.azClient.virtualMachineScaleSetVMsClient.List(ctx, resourceGroup, scaleSet.Name, "", "", "")
vmList, err := scaleSet.manager.azClient.virtualMachineScaleSetVMsClient.List(ctx, resourceGroup, scaleSet.Name, "", "", "")
if err != nil {
glog.Errorf("VirtualMachineScaleSetVMsClient.List failed for %s: %v", scaleSet.Name, err)
return nil, err
}

instanceIDs := make([]string, 0)
for _, vm := range result {
instanceIDs = append(instanceIDs, *vm.InstanceID)
}

allVMs := make([]string, 0)
for _, instanceID := range instanceIDs {
// Get from cache first.
if v, ok := scaleSet.virtualMachines[instanceID]; ok {
allVMs = append(allVMs, v)
continue
}

// Not in cache, get from Azure API.
getCtx, getCancel := getContextWithCancel()
defer getCancel()
vm, err := scaleSet.manager.azClient.virtualMachineScaleSetVMsClient.Get(getCtx, resourceGroup, scaleSet.Name, instanceID)
if err != nil {
exists, realErr := checkResourceExistsFromError(err)
if realErr != nil {
glog.Errorf("Failed to get VirtualMachineScaleSetVM by (%s,%s), error: %v", scaleSet.Name, instanceID, err)
return nil, realErr
}

if !exists {
glog.Warningf("Couldn't find VirtualMachineScaleSetVM by (%s,%s), assuming it has been removed", scaleSet.Name, instanceID)
continue
}
}

for _, vm := range vmList {
// The resource ID is empty string, which indicates the instance may be in deleting state.
if len(*vm.ID) == 0 {
continue
}

// Save into cache.
scaleSet.virtualMachines[instanceID] = *vm.ID
allVMs = append(allVMs, *vm.ID)
}

Expand Down Expand Up @@ -288,7 +255,7 @@ func (scaleSet *ScaleSet) Belongs(node *apiv1.Node) (bool, error) {
if targetAsg == nil {
return false, fmt.Errorf("%s doesn't belong to a known scale set", node.Name)
}
if targetAsg.Id() != scaleSet.Id() {
if !strings.EqualFold(targetAsg.Id(), scaleSet.Id()) {
return false, nil
}
return true, nil
Expand All @@ -314,7 +281,7 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error {
return err
}

if asg != commonAsg {
if !strings.EqualFold(asg.Id(), commonAsg.Id()) {
return fmt.Errorf("cannot delete instance (%s) which don't belong to the same Scale Set (%q)", instance.Name, commonAsg)
}

Expand Down Expand Up @@ -468,7 +435,7 @@ func (scaleSet *ScaleSet) Nodes() ([]string, error) {

result := make([]string, 0, len(vms))
for i := range vms {
name := "azure://" + vms[i]
name := "azure://" + strings.ToLower(vms[i])
result = append(result, name)
}

Expand Down
Loading

0 comments on commit 796710d

Please sign in to comment.