diff --git a/cluster-autoscaler/cloudprovider/alicloud/alicloud_manager.go b/cluster-autoscaler/cloudprovider/alicloud/alicloud_manager.go index 2b124cde5f4e..b33ed6962244 100644 --- a/cluster-autoscaler/cloudprovider/alicloud/alicloud_manager.go +++ b/cluster-autoscaler/cloudprovider/alicloud/alicloud_manager.go @@ -19,17 +19,17 @@ package alicloud import ( "errors" "fmt" - "gopkg.in/gcfg.v1" "io" + "math/rand" + "time" + + "gopkg.in/gcfg.v1" apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/alicloud/alibaba-cloud-sdk-go/services/ess" klog "k8s.io/klog/v2" - kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" - "math/rand" - "time" ) const ( @@ -237,13 +237,13 @@ func (m *AliCloudManager) buildNodeFromTemplate(sg *Asg, template *sgTemplate) ( func buildGenericLabels(template *sgTemplate, nodeName string) map[string]string { result := make(map[string]string) - result[kubeletapis.LabelArch] = cloudprovider.DefaultArch - result[kubeletapis.LabelOS] = cloudprovider.DefaultOS + result[apiv1.LabelArchStable] = cloudprovider.DefaultArch + result[apiv1.LabelOSStable] = cloudprovider.DefaultOS - result[apiv1.LabelInstanceType] = template.InstanceType.instanceTypeID + result[apiv1.LabelInstanceTypeStable] = template.InstanceType.instanceTypeID - result[apiv1.LabelZoneRegion] = template.Region - result[apiv1.LabelZoneFailureDomain] = template.Zone + result[apiv1.LabelZoneRegionStable] = template.Region + result[apiv1.LabelZoneFailureDomainStable] = template.Zone result[apiv1.LabelHostname] = nodeName // append custom node labels diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager.go b/cluster-autoscaler/cloudprovider/aws/aws_manager.go index 90ed68e6ef6e..5ce68fd8ce34 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager.go @@ -41,7 +41,6 
@@ import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" klog "k8s.io/klog/v2" - kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" provider_aws "k8s.io/legacy-cloud-providers/aws" ) @@ -368,15 +367,14 @@ func (m *AwsManager) buildNodeFromTemplate(asg *asg, template *asgTemplate) (*ap func buildGenericLabels(template *asgTemplate, nodeName string) map[string]string { result := make(map[string]string) - // TODO: extract it somehow - result[kubeletapis.LabelArch] = template.InstanceType.Architecture + result[apiv1.LabelArchStable] = template.InstanceType.Architecture - result[kubeletapis.LabelOS] = cloudprovider.DefaultOS + result[apiv1.LabelOSStable] = cloudprovider.DefaultOS - result[apiv1.LabelInstanceType] = template.InstanceType.InstanceType + result[apiv1.LabelInstanceTypeStable] = template.InstanceType.InstanceType - result[apiv1.LabelZoneRegion] = template.Region - result[apiv1.LabelZoneFailureDomain] = template.Zone + result[apiv1.LabelZoneRegionStable] = template.Region + result[apiv1.LabelZoneFailureDomainStable] = template.Zone result[apiv1.LabelHostname] = nodeName return result } diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go index 2e52217d4cdb..ef94dd6afb0c 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go @@ -38,7 +38,6 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" - kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" provider_aws "k8s.io/legacy-cloud-providers/aws" ) @@ -84,9 +83,8 @@ func TestBuildGenericLabels(t *testing.T) { assert.Equal(t, "us-east-1", labels[apiv1.LabelZoneRegion]) assert.Equal(t, "sillyname", labels[apiv1.LabelHostname]) assert.Equal(t, "c4.large", labels[apiv1.LabelInstanceType]) - assert.Equal(t, cloudprovider.DefaultArch, 
labels[kubeletapis.LabelArch]) assert.Equal(t, cloudprovider.DefaultArch, labels[apiv1.LabelArchStable]) - assert.Equal(t, cloudprovider.DefaultOS, labels[kubeletapis.LabelOS]) + assert.Equal(t, cloudprovider.DefaultOS, labels[apiv1.LabelOSStable]) } func TestExtractAllocatableResourcesFromAsg(t *testing.T) { diff --git a/cluster-autoscaler/cloudprovider/azure/azure_template.go b/cluster-autoscaler/cloudprovider/azure/azure_template.go index 8e45ce82f16f..4467fd247b9c 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_template.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_template.go @@ -18,6 +18,10 @@ package azure import ( "fmt" + "math/rand" + "regexp" + "strings" + "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-12-01/compute" apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -26,10 +30,6 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" cloudvolume "k8s.io/cloud-provider/volume" "k8s.io/klog/v2" - kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" - "math/rand" - "regexp" - "strings" ) func buildInstanceOS(template compute.VirtualMachineScaleSet) string { @@ -44,14 +44,11 @@ func buildInstanceOS(template compute.VirtualMachineScaleSet) string { func buildGenericLabels(template compute.VirtualMachineScaleSet, nodeName string) map[string]string { result := make(map[string]string) - result[kubeletapis.LabelArch] = cloudprovider.DefaultArch result[apiv1.LabelArchStable] = cloudprovider.DefaultArch - - result[kubeletapis.LabelOS] = buildInstanceOS(template) result[apiv1.LabelOSStable] = buildInstanceOS(template) - result[apiv1.LabelInstanceType] = *template.Sku.Name - result[apiv1.LabelZoneRegion] = strings.ToLower(*template.Location) + result[apiv1.LabelInstanceTypeStable] = *template.Sku.Name + result[apiv1.LabelZoneRegionStable] = strings.ToLower(*template.Location) if template.Zones != nil && len(*template.Zones) > 0 { failureDomains := make([]string, len(*template.Zones)) @@ -59,9 +56,9 @@ func 
buildGenericLabels(template compute.VirtualMachineScaleSet, nodeName string failureDomains[k] = strings.ToLower(*template.Location) + "-" + v } - result[apiv1.LabelZoneFailureDomain] = strings.Join(failureDomains[:], cloudvolume.LabelMultiZoneDelimiter) + result[apiv1.LabelZoneFailureDomainStable] = strings.Join(failureDomains[:], cloudvolume.LabelMultiZoneDelimiter) } else { - result[apiv1.LabelZoneFailureDomain] = "0" + result[apiv1.LabelZoneFailureDomainStable] = "0" } result[apiv1.LabelHostname] = nodeName diff --git a/cluster-autoscaler/cloudprovider/gce/templates.go b/cluster-autoscaler/cloudprovider/gce/templates.go index fc03881d428f..aff03f886c3f 100644 --- a/cluster-autoscaler/cloudprovider/gce/templates.go +++ b/cluster-autoscaler/cloudprovider/gce/templates.go @@ -22,15 +22,13 @@ import ( "regexp" "strings" + "github.com/ghodss/yaml" gce "google.golang.org/api/compute/v1" apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" - kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" - - "github.com/ghodss/yaml" klog "k8s.io/klog/v2" ) @@ -203,20 +201,15 @@ func BuildGenericLabels(ref GceRef, machineType string, nodeName string, os Oper } // TODO: extract it somehow - result[kubeletapis.LabelArch] = cloudprovider.DefaultArch result[apiv1.LabelArchStable] = cloudprovider.DefaultArch - result[kubeletapis.LabelOS] = string(os) result[apiv1.LabelOSStable] = string(os) - result[apiv1.LabelInstanceType] = machineType result[apiv1.LabelInstanceTypeStable] = machineType ix := strings.LastIndex(ref.Zone, "-") if ix == -1 { return nil, fmt.Errorf("unexpected zone: %s", ref.Zone) } - result[apiv1.LabelZoneRegion] = ref.Zone[:ix] result[apiv1.LabelZoneRegionStable] = ref.Zone[:ix] - result[apiv1.LabelZoneFailureDomain] = ref.Zone result[apiv1.LabelZoneFailureDomainStable] = ref.Zone 
result[gceCSITopologyKeyZone] = ref.Zone result[apiv1.LabelHostname] = nodeName diff --git a/cluster-autoscaler/cloudprovider/gce/templates_test.go b/cluster-autoscaler/cloudprovider/gce/templates_test.go index 012eb0d6ec2c..5525c656ecc2 100644 --- a/cluster-autoscaler/cloudprovider/gce/templates_test.go +++ b/cluster-autoscaler/cloudprovider/gce/templates_test.go @@ -25,13 +25,11 @@ import ( gpuUtils "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" "k8s.io/autoscaler/cluster-autoscaler/utils/units" + "github.com/stretchr/testify/assert" gce "google.golang.org/api/compute/v1" apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" quota "k8s.io/kubernetes/pkg/quota/v1" - - "github.com/stretchr/testify/assert" ) func TestBuildNodeFromTemplateSetsResources(t *testing.T) { @@ -166,16 +164,11 @@ func TestBuildGenericLabels(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { expectedLabels := map[string]string{ - apiv1.LabelZoneRegion: "us-central1", apiv1.LabelZoneRegionStable: "us-central1", - apiv1.LabelZoneFailureDomain: "us-central1-b", apiv1.LabelZoneFailureDomainStable: "us-central1-b", gceCSITopologyKeyZone: "us-central1-b", apiv1.LabelHostname: "sillyname", - apiv1.LabelInstanceType: "n1-standard-8", apiv1.LabelInstanceTypeStable: "n1-standard-8", - kubeletapis.LabelArch: cloudprovider.DefaultArch, - kubeletapis.LabelOS: tc.expectedOsLabel, apiv1.LabelArchStable: cloudprovider.DefaultArch, apiv1.LabelOSStable: tc.expectedOsLabel, } diff --git a/cluster-autoscaler/cloudprovider/huaweicloud/huaweicloud_service_manager.go b/cluster-autoscaler/cloudprovider/huaweicloud/huaweicloud_service_manager.go new file mode 100644 index 000000000000..23d1741e96a3 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/huaweicloud/huaweicloud_service_manager.go @@ -0,0 +1,586 @@ +/* +Copyright 2020 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package huaweicloud + +import ( + "fmt" + "math/rand" + "strconv" + "strings" + "time" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/huaweicloud/huaweicloud-sdk-go-v3/core/sdktime" + huaweicloudsdkas "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/huaweicloud/huaweicloud-sdk-go-v3/services/as/v1" + huaweicloudsdkasmodel "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/huaweicloud/huaweicloud-sdk-go-v3/services/as/v1/model" + huaweicloudsdkecs "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/huaweicloud/huaweicloud-sdk-go-v3/services/ecs/v2" + huaweicloudsdkecsmodel "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/huaweicloud/huaweicloud-sdk-go-v3/services/ecs/v2/model" + "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" + "k8s.io/klog/v2" +) + +// ElasticCloudServerService represents the elastic cloud server interfaces. +// It should contains all request against elastic cloud server service. +type ElasticCloudServerService interface { + // DeleteServers deletes a group of server by ID. + DeleteServers(serverIDs []string) error +} + +// AutoScalingService represents the auto scaling service interfaces. +// It should contains all request against auto scaling service. 
+type AutoScalingService interface { + // ListScalingGroups list all scaling groups. + ListScalingGroups() ([]AutoScalingGroup, error) + + // GetDesireInstanceNumber gets the desire instance number of specific auto scaling group. + GetDesireInstanceNumber(groupID string) (int, error) + + // GetInstances gets the instances in an auto scaling group. + GetInstances(groupID string) ([]cloudprovider.Instance, error) + + // IncreaseSizeInstance increases the instance number of specific auto scaling group. + // The delta should be non-negative. + // IncreaseSizeInstance wait until instance number is updated. + IncreaseSizeInstance(groupID string, delta int) error + + // GetAsgForInstance returns auto scaling group for the given instance. + GetAsgForInstance(instanceID string) (*AutoScalingGroup, error) + + // RegisterAsg registers auto scaling group to manager + RegisterAsg(asg *AutoScalingGroup) + + // DeleteScalingInstances is used to delete instances from auto scaling group by instanceIDs. + DeleteScalingInstances(groupID string, instanceIds []string) error + + // Get default auto scaling group template + getAsgTemplate(groupID string) (*asgTemplate, error) + + // buildNodeFromTemplate returns template from instance flavor + buildNodeFromTemplate(asgName string, template *asgTemplate) (*apiv1.Node, error) +} + +// CloudServiceManager represents the cloud service interfaces. +// It should contains all requests against cloud services. +type CloudServiceManager interface { + // ElasticCloudServerService represents the elastic cloud server interfaces. + ElasticCloudServerService + + // AutoScalingService represents the auto scaling service interfaces. 
+ AutoScalingService +} + +type cloudServiceManager struct { + cloudConfig *CloudConfig + getECSClientFunc func() *huaweicloudsdkecs.EcsClient + getASClientFunc func() *huaweicloudsdkas.AsClient + asgs *autoScalingGroupCache +} + +type asgTemplate struct { + name string + vcpu int64 + ram int64 + gpu int64 + region string + zone string + tags map[string]string +} + +func newCloudServiceManager(cloudConfig *CloudConfig) *cloudServiceManager { + csm := &cloudServiceManager{ + cloudConfig: cloudConfig, + getECSClientFunc: cloudConfig.getECSClient, + getASClientFunc: cloudConfig.getASClient, + asgs: newAutoScalingGroupCache(), + } + + csm.asgs.generateCache(csm) + + return csm +} + +func (csm *cloudServiceManager) GetAsgForInstance(instanceID string) (*AutoScalingGroup, error) { + return csm.asgs.FindForInstance(instanceID, csm) +} + +func (csm *cloudServiceManager) RegisterAsg(asg *AutoScalingGroup) { + csm.asgs.Register(asg) +} + +// DeleteServers deletes a group of server by ID. +func (csm *cloudServiceManager) DeleteServers(serverIDs []string) error { + ecsClient := csm.getECSClientFunc() + if ecsClient == nil { + return fmt.Errorf("failed to delete servers due to can not get ecs client") + } + + servers := make([]huaweicloudsdkecsmodel.ServerId, 0, len(serverIDs)) + for i := range serverIDs { + s := huaweicloudsdkecsmodel.ServerId{ + Id: serverIDs[i], + } + servers = append(servers, s) + } + + deletePublicIP := false + deleteVolume := false + opts := &huaweicloudsdkecsmodel.DeleteServersRequest{ + Body: &huaweicloudsdkecsmodel.DeleteServersRequestBody{ + DeletePublicip: &deletePublicIP, + DeleteVolume: &deleteVolume, + Servers: servers, + }, + } + deleteResponse, err := ecsClient.DeleteServers(opts) + if err != nil { + return fmt.Errorf("failed to delete servers. 
error: %v", err) + } + jobID := deleteResponse.JobId + + err = wait.Poll(5*time.Second, 300*time.Second, func() (bool, error) { + showJobOpts := &huaweicloudsdkecsmodel.ShowJobRequest{ + JobId: *jobID, + } + showJobResponse, err := ecsClient.ShowJob(showJobOpts) + if err != nil { + return false, err + } + + jobStatusEnum := huaweicloudsdkecsmodel.GetShowJobResponseStatusEnum() + if *showJobResponse.Status == jobStatusEnum.FAIL { + errCode := *showJobResponse.ErrorCode + failReason := *showJobResponse.FailReason + return false, fmt.Errorf("job failed. error code: %s, error msg: %s", errCode, failReason) + } else if *showJobResponse.Status == jobStatusEnum.SUCCESS { + return true, nil + } + + return false, nil + }) + + if err != nil { + klog.Warningf("failed to delete servers, error: %v", err) + return err + } + + return nil +} + +func (csm *cloudServiceManager) GetDesireInstanceNumber(groupID string) (int, error) { + Instances, err := csm.GetInstances(groupID) + if err != nil { + klog.Errorf("failed to list scaling group instances. group: %s, error: %v", groupID, err) + return 0, fmt.Errorf("failed to get instance list") + } + + // Get desire instance number by total instance number minus the one which is in deleting state. + desireInstanceNumber := len(Instances) + for _, instance := range Instances { + if instance.Status.State == cloudprovider.InstanceDeleting { + desireInstanceNumber-- + } + } + + return desireInstanceNumber, nil +} + +func (csm *cloudServiceManager) GetInstances(groupID string) ([]cloudprovider.Instance, error) { + asClient := csm.getASClientFunc() + if asClient == nil { + return nil, fmt.Errorf("failed to list scaling groups due to can not get as client") + } + + // SDK 'ListScalingInstances' only return no more than 20 instances. + // If there is a need in the future, need to retrieve by pages. 
+ opts := &huaweicloudsdkasmodel.ListScalingInstancesRequest{ + ScalingGroupId: groupID, + } + response, err := asClient.ListScalingInstances(opts) + if err != nil { + klog.Errorf("failed to list scaling group instances. group: %s, error: %v", groupID, err) + return nil, err + } + if response == nil || response.ScalingGroupInstances == nil { + klog.Infof("no instance in scaling group: %s", groupID) + return nil, nil + } + + instances := make([]cloudprovider.Instance, 0, len(*response.ScalingGroupInstances)) + for _, sgi := range *response.ScalingGroupInstances { + // When a new instance joining to the scaling group, the instance id maybe empty(nil). + if sgi.InstanceId == nil { + klog.Infof("ignore instance without instance id, maybe instance is joining.") + continue + } + instance := cloudprovider.Instance{ + Id: *sgi.InstanceId, + Status: csm.transformInstanceState(*sgi.LifeCycleState, *sgi.HealthStatus), + } + instances = append(instances, instance) + } + + return instances, nil +} + +func (csm *cloudServiceManager) DeleteScalingInstances(groupID string, instanceIds []string) error { + asClient := csm.getASClientFunc() + + instanceDelete := "yes" + opts := &huaweicloudsdkasmodel.UpdateScalingGroupInstanceRequest{ + ScalingGroupId: groupID, + Body: &huaweicloudsdkasmodel.UpdateScalingGroupInstanceRequestBody{ + InstancesId: instanceIds, + InstanceDelete: &instanceDelete, + Action: huaweicloudsdkasmodel.GetUpdateScalingGroupInstanceRequestBodyActionEnum().REMOVE, + }, + } + + _, err := asClient.UpdateScalingGroupInstance(opts) + + if err != nil { + klog.Errorf("failed to delete scaling instances. group: %s, error: %v", groupID, err) + return err + } + + return nil +} + +// IncreaseSizeInstance increases a scaling group's instance size. +// The workflow works as follows: +// 1. create scaling policy with scheduled type. +// 2. execute the scaling policy immediately(not waiting the policy's launch time). +// 3. 
wait for the instance number be increased and remove the scaling policy. +func (csm *cloudServiceManager) IncreaseSizeInstance(groupID string, delta int) error { + originalInstanceSize, err := csm.GetDesireInstanceNumber(groupID) + if err != nil { + return err + } + + // create a scaling policy + launchTime := sdktime.SdkTime(time.Now().Add(time.Hour)) + addOperation := huaweicloudsdkasmodel.GetScalingPolicyActionOperationEnum().ADD + instanceNum := int32(delta) + opts := &huaweicloudsdkasmodel.CreateScalingPolicyRequest{ + Body: &huaweicloudsdkasmodel.CreateScalingPolicyRequestBody{ + // It's not mandatory for AS service to set a unique policy name. + ScalingPolicyName: "huaweicloudautoscaler", + ScalingGroupId: groupID, + ScalingPolicyType: huaweicloudsdkasmodel.GetCreateScalingPolicyRequestBodyScalingPolicyTypeEnum().SCHEDULED, + ScheduledPolicy: &huaweicloudsdkasmodel.ScheduledPolicy{ + LaunchTime: &launchTime, + }, + ScalingPolicyAction: &huaweicloudsdkasmodel.ScalingPolicyAction{ + Operation: &addOperation, + InstanceNumber: &instanceNum, + }, + }, + } + + spID, err := csm.createScalingPolicy(opts) + if err != nil { + return err + } + + // make sure scaling policy will be cleaned up. 
+ deletePolicyOps := &huaweicloudsdkasmodel.DeleteScalingPolicyRequest{ + ScalingPolicyId: spID, + } + defer csm.deleteScalingPolicy(deletePolicyOps) + + // execute policy immediately + executeAction := huaweicloudsdkasmodel.GetExecuteScalingPolicyRequestBodyActionEnum() + executeOpts := &huaweicloudsdkasmodel.ExecuteScalingPolicyRequest{ + ScalingPolicyId: spID, + Body: &huaweicloudsdkasmodel.ExecuteScalingPolicyRequestBody{ + Action: &executeAction.EXECUTE, + }, + } + err = csm.executeScalingPolicy(executeOpts) + if err != nil { + return err + } + + // wait for instance number indeed be increased + return wait.Poll(5*time.Second, 300*time.Second, func() (done bool, err error) { + currentInstanceSize, err := csm.GetDesireInstanceNumber(groupID) + if err != nil { + return false, err + } + + if currentInstanceSize == originalInstanceSize+delta { + return true, nil + } + klog.V(1).Infof("waiting instance increase from %d to %d, now is: %d", originalInstanceSize, originalInstanceSize+delta, currentInstanceSize) + + return false, nil + }) +} + +func (csm *cloudServiceManager) ListScalingGroups() ([]AutoScalingGroup, error) { + asClient := csm.getASClientFunc() + if asClient == nil { + return nil, fmt.Errorf("failed to list scaling groups due to can not get as client") + } + + // requiredState := huaweicloudsdkasmodel.GetListScalingGroupsRequestScalingGroupStatusEnum().INSERVICE + opts := &huaweicloudsdkasmodel.ListScalingGroupsRequest{ + // ScalingGroupStatus: &requiredState, + } + response, err := asClient.ListScalingGroups(opts) + if err != nil { + klog.Errorf("failed to list scaling groups. 
error: %v", err) + return nil, err + } + + if response == nil || response.ScalingGroups == nil { + klog.Info("no scaling group yet.") + return nil, nil + } + + autoScalingGroups := make([]AutoScalingGroup, 0, len(*response.ScalingGroups)) + for _, sg := range *response.ScalingGroups { + autoScalingGroup := newAutoScalingGroup(csm, sg) + autoScalingGroups = append(autoScalingGroups, autoScalingGroup) + klog.Infof("found autoscaling group: %s", autoScalingGroup.groupName) + } + + return autoScalingGroups, nil +} + +func (csm *cloudServiceManager) transformInstanceState(lifeCycleState huaweicloudsdkasmodel.ScalingGroupInstanceLifeCycleState, + healthStatus huaweicloudsdkasmodel.ScalingGroupInstanceHealthStatus) *cloudprovider.InstanceStatus { + instanceStatus := &cloudprovider.InstanceStatus{} + + lifeCycleStateEnum := huaweicloudsdkasmodel.GetScalingGroupInstanceLifeCycleStateEnum() + switch lifeCycleState { + case lifeCycleStateEnum.INSERVICE: + instanceStatus.State = cloudprovider.InstanceRunning + case lifeCycleStateEnum.PENDING: + instanceStatus.State = cloudprovider.InstanceCreating + case lifeCycleStateEnum.PENDING_WAIT: + instanceStatus.State = cloudprovider.InstanceCreating + case lifeCycleStateEnum.REMOVING: + instanceStatus.State = cloudprovider.InstanceDeleting + case lifeCycleStateEnum.REMOVING_WAIT: + instanceStatus.State = cloudprovider.InstanceDeleting + default: + instanceStatus.ErrorInfo = &cloudprovider.InstanceErrorInfo{ + ErrorClass: cloudprovider.OtherErrorClass, + ErrorMessage: fmt.Sprintf("invalid instance lifecycle state: %v", lifeCycleState), + } + return instanceStatus + } + + healthStatusEnum := huaweicloudsdkasmodel.GetScalingGroupInstanceHealthStatusEnum() + switch healthStatus { + case healthStatusEnum.NORMAL: + case healthStatusEnum.INITAILIZING: + case healthStatusEnum.ERROR: + instanceStatus.ErrorInfo = &cloudprovider.InstanceErrorInfo{ + ErrorClass: cloudprovider.OtherErrorClass, + ErrorMessage: fmt.Sprintf("%v", healthStatus), + } + 
return instanceStatus + default: + instanceStatus.ErrorInfo = &cloudprovider.InstanceErrorInfo{ + ErrorClass: cloudprovider.OtherErrorClass, + ErrorMessage: fmt.Sprintf("invalid instance health state: %v", healthStatus), + } + return instanceStatus + } + + return instanceStatus +} + +func (csm *cloudServiceManager) createScalingPolicy(opts *huaweicloudsdkasmodel.CreateScalingPolicyRequest) (scalingPolicyID string, err error) { + asClient := csm.getASClientFunc() + if asClient == nil { + return "", fmt.Errorf("failed to get as client") + } + + response, err := asClient.CreateScalingPolicy(opts) + if err != nil { + klog.Warningf("create scaling policy failed. policy: %s, error: %v", opts.String(), err) + return "", err + } + + klog.V(1).Infof("create scaling policy succeed. policy id: %s", *(response.ScalingPolicyId)) + + return *(response.ScalingPolicyId), nil +} + +func (csm *cloudServiceManager) executeScalingPolicy(opts *huaweicloudsdkasmodel.ExecuteScalingPolicyRequest) error { + asClient := csm.getASClientFunc() + if asClient == nil { + return fmt.Errorf("failed to get as client") + } + + _, err := asClient.ExecuteScalingPolicy(opts) + if err != nil { + klog.Warningf("execute scaling policy failed. policy id: %s, error: %v", opts.ScalingPolicyId, err) + return err + } + + klog.V(1).Infof("execute scaling policy succeed. policy id: %s", opts.ScalingPolicyId) + return nil +} + +func (csm *cloudServiceManager) deleteScalingPolicy(opts *huaweicloudsdkasmodel.DeleteScalingPolicyRequest) error { + asClient := csm.getASClientFunc() + if asClient == nil { + return fmt.Errorf("failed to get as client") + } + + _, err := asClient.DeleteScalingPolicy(opts) + if err != nil { + klog.Warningf("failed to delete scaling policy. policy id: %s, error: %v", opts.ScalingPolicyId, err) + return err + } + + klog.V(1).Infof("delete scaling policy succeed. 
policy id: %s", opts.ScalingPolicyId) + return nil +} + +func (csm *cloudServiceManager) getScalingGroupByID(groupID string) (*huaweicloudsdkasmodel.ScalingGroups, error) { + asClient := csm.getASClientFunc() + opts := &huaweicloudsdkasmodel.ShowScalingGroupRequest{ + ScalingGroupId: groupID, + } + response, err := asClient.ShowScalingGroup(opts) + if err != nil { + klog.Errorf("failed to show scaling group info. group: %s, error: %v", groupID, err) + return nil, err + } + if response == nil || response.ScalingGroup == nil { + return nil, fmt.Errorf("no scaling group found: %s", groupID) + } + + return response.ScalingGroup, nil +} + +func (csm *cloudServiceManager) getScalingGroupConfigByID(groupID, configID string) (*huaweicloudsdkasmodel.ScalingConfiguration, error) { + asClient := csm.getASClientFunc() + opts := &huaweicloudsdkasmodel.ShowScalingConfigRequest{ + ScalingConfigurationId: configID, + } + response, err := asClient.ShowScalingConfig(opts) + if err != nil { + klog.Errorf("failed to show scaling group config. config id: %s, error: %v", configID, err) + return nil, err + } + if response == nil || response.ScalingConfiguration == nil { + return nil, fmt.Errorf("no scaling configuration found, groupID: %s, configID: %s", groupID, configID) + } + return response.ScalingConfiguration, nil +} + +func (csm *cloudServiceManager) listFlavors(az string) (*[]huaweicloudsdkecsmodel.Flavor, error) { + ecsClient := csm.getECSClientFunc() + opts := &huaweicloudsdkecsmodel.ListFlavorsRequest{ + AvailabilityZone: &az, + } + response, err := ecsClient.ListFlavors(opts) + if err != nil { + klog.Errorf("failed to list flavors. 
availability zone: %s", az) + return nil, err + } + + return response.Flavors, nil +} + +func (csm *cloudServiceManager) getAsgTemplate(groupID string) (*asgTemplate, error) { + sg, err := csm.getScalingGroupByID(groupID) + if err != nil { + klog.Errorf("failed to get ASG by id:%s,because of %s", groupID, err.Error()) + return nil, err + } + + configuration, err := csm.getScalingGroupConfigByID(groupID, *sg.ScalingConfigurationId) + + for _, az := range *sg.AvailableZones { + flavors, err := csm.listFlavors(az) + if err != nil { + klog.Errorf("failed to list flavors, available zone is: %s, error: %v", az, err) + return nil, err + } + + for _, flavor := range *flavors { + if !strings.EqualFold(flavor.Name, *configuration.InstanceConfig.FlavorRef) { + continue + } + + vcpus, _ := strconv.ParseInt(flavor.Vcpus, 10, 64) + return &asgTemplate{ + name: flavor.Name, + vcpu: vcpus, + ram: int64(flavor.Ram), + zone: az, + }, nil + } + } + return nil, nil +} + +func (csm *cloudServiceManager) buildNodeFromTemplate(asgName string, template *asgTemplate) (*apiv1.Node, error) { + node := apiv1.Node{} + nodeName := fmt.Sprintf("%s-asg-%d", asgName, rand.Int63()) + + node.ObjectMeta = metav1.ObjectMeta{ + Name: nodeName, + SelfLink: fmt.Sprintf("/api/v1/nodes/%s", nodeName), + Labels: map[string]string{}, + } + + node.Status = apiv1.NodeStatus{ + Capacity: apiv1.ResourceList{}, + } + + node.Status.Capacity[apiv1.ResourcePods] = *resource.NewQuantity(110, resource.DecimalSI) + node.Status.Capacity[apiv1.ResourceCPU] = *resource.NewQuantity(template.vcpu, resource.DecimalSI) + node.Status.Capacity[gpu.ResourceNvidiaGPU] = *resource.NewQuantity(template.gpu, resource.DecimalSI) + node.Status.Capacity[apiv1.ResourceMemory] = *resource.NewQuantity(template.ram*1024*1024, resource.DecimalSI) + + node.Status.Allocatable = node.Status.Capacity + + node.Labels = cloudprovider.JoinStringMaps(node.Labels, buildGenericLabels(template, nodeName)) + + node.Status.Conditions = 
cloudprovider.BuildReadyConditions() + return &node, nil +} + +func buildGenericLabels(template *asgTemplate, nodeName string) map[string]string { + result := make(map[string]string) + result[apiv1.LabelArchStable] = cloudprovider.DefaultArch + result[apiv1.LabelOSStable] = cloudprovider.DefaultOS + + result[apiv1.LabelInstanceTypeStable] = template.name + + result[apiv1.LabelTopologyRegion] = template.region + result[apiv1.LabelTopologyZone] = template.zone + result[apiv1.LabelHostname] = nodeName + + // append custom node labels + for key, value := range template.tags { + result[key] = value + } + + return result +} diff --git a/cluster-autoscaler/core/utils/utils.go b/cluster-autoscaler/core/utils/utils.go index 0f81bb41f6bb..e7c0ec9477ab 100644 --- a/cluster-autoscaler/core/utils/utils.go +++ b/cluster-autoscaler/core/utils/utils.go @@ -22,6 +22,8 @@ import ( "reflect" "time" + kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" + appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" @@ -172,6 +174,8 @@ func GetNodeInfoFromTemplate(nodeGroup cloudprovider.NodeGroup, daemonsets []*ap return nil, errors.ToAutoscalerError(errors.CloudProviderError, err) } + updateDeprecatedTemplateLabels(baseNodeInfo) + pods, err := daemonset.GetDaemonSetPodsForNode(baseNodeInfo, daemonsets, predicateChecker) if err != nil { return nil, errors.ToAutoscalerError(errors.InternalError, err) @@ -188,6 +192,31 @@ func GetNodeInfoFromTemplate(nodeGroup cloudprovider.NodeGroup, daemonsets []*ap return sanitizedNodeInfo, nil } +// updateDeprecatedTemplateLabels updates beta and deprecated labels from stable labels +func updateDeprecatedTemplateLabels(nodeInfo *schedulerframework.NodeInfo) { + node := nodeInfo.Node() + if v, ok := node.ObjectMeta.Labels[apiv1.LabelArchStable]; ok { + node.ObjectMeta.Labels[kubeletapis.LabelArch] = v + } + if v, ok := node.ObjectMeta.Labels[apiv1.LabelOSStable]; ok { + 
node.ObjectMeta.Labels[kubeletapis.LabelOS] = v + } + if v, ok := node.ObjectMeta.Labels[apiv1.LabelInstanceTypeStable]; ok { + node.ObjectMeta.Labels[apiv1.LabelInstanceType] = v + } + if v, ok := node.ObjectMeta.Labels[apiv1.LabelZoneRegionStable]; ok { + node.ObjectMeta.Labels[apiv1.LabelZoneRegion] = v + } + if v, ok := node.ObjectMeta.Labels[apiv1.LabelZoneFailureDomainStable]; ok { + node.ObjectMeta.Labels[apiv1.LabelZoneFailureDomain] = v + } +} + +// isVirtualNode determines if the node is created by virtual kubelet +func isVirtualNode(node *apiv1.Node) bool { + return node.ObjectMeta.Labels["type"] == "virtual-kubelet" +} + // FilterOutNodesFromNotAutoscaledGroups return subset of input nodes for which cloud provider does not // return autoscaled node group. func FilterOutNodesFromNotAutoscaledGroups(nodes []*apiv1.Node, cloudProvider cloudprovider.CloudProvider) ([]*apiv1.Node, errors.AutoscalerError) {