diff --git a/cluster-autoscaler/README.md b/cluster-autoscaler/README.md index efc8106f6a95..fff2e418a9d9 100644 --- a/cluster-autoscaler/README.md +++ b/cluster-autoscaler/README.md @@ -33,6 +33,7 @@ You should also take a look at the notes and "gotchas" for your specific cloud p * [Vultr](./cloudprovider/vultr/README.md) * [TencentCloud](./cloudprovider/tencentcloud/README.md) * [Scaleway](./cloudprovider/scaleway/README.md) +* [Rancher](./cloudprovider/rancher/README.md) # Releases @@ -180,3 +181,4 @@ Supported cloud providers: * TencentCloud https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/cloudprovider/tencentcloud/README.md * BaiduCloud https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/cloudprovider/baiducloud/README.md * Huawei Cloud https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/cloudprovider/huaweicloud/README.md +* Rancher https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/cloudprovider/rancher/README.md diff --git a/cluster-autoscaler/cloudprovider/builder/builder_all.go b/cluster-autoscaler/cloudprovider/builder/builder_all.go index b953ecb117f7..8bcc338d03b1 100644 --- a/cluster-autoscaler/cloudprovider/builder/builder_all.go +++ b/cluster-autoscaler/cloudprovider/builder/builder_all.go @@ -1,5 +1,5 @@ -//go:build !gce && !aws && !azure && !kubemark && !alicloud && !magnum && !digitalocean && !clusterapi && !huaweicloud && !ionoscloud && !linode && !hetzner && !bizflycloud && !brightbox && !packet && !oci && !vultr && !tencentcloud && !scaleway && !externalgrpc && !civo -// +build !gce,!aws,!azure,!kubemark,!alicloud,!magnum,!digitalocean,!clusterapi,!huaweicloud,!ionoscloud,!linode,!hetzner,!bizflycloud,!brightbox,!packet,!oci,!vultr,!tencentcloud,!scaleway,!externalgrpc,!civo +//go:build !gce && !aws && !azure && !kubemark && !alicloud && !magnum && !digitalocean && !clusterapi && !huaweicloud && !ionoscloud && !linode && !hetzner && !bizflycloud && !brightbox && !packet && !oci && !vultr && !tencentcloud && !scaleway && !externalgrpc && !civo && !rancher +// +build !gce,!aws,!azure,!kubemark,!alicloud,!magnum,!digitalocean,!clusterapi,!huaweicloud,!ionoscloud,!linode,!hetzner,!bizflycloud,!brightbox,!packet,!oci,!vultr,!tencentcloud,!scaleway,!externalgrpc,!civo,!rancher /* Copyright 2018 The Kubernetes Authors. @@ -43,6 +43,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/oci" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/ovhcloud" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/packet" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/rancher" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/scaleway" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/tencentcloud" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/vultr" @@ -76,6 +77,7 @@ var AvailableCloudProviders = []string{ cloudprovider.TencentcloudProviderName, cloudprovider.CivoProviderName, cloudprovider.ScalewayProviderName, + cloudprovider.RancherProviderName, } // DefaultCloudProvider is GCE. @@ -133,6 +135,8 @@ func buildCloudProvider(opts config.AutoscalingOptions, do cloudprovider.NodeGro return civo.BuildCivo(opts, do, rl) case cloudprovider.ScalewayProviderName: return scaleway.BuildScaleway(opts, do, rl) + case cloudprovider.RancherProviderName: + return rancher.BuildRancher(opts, do, rl) } return nil } diff --git a/cluster-autoscaler/cloudprovider/builder/builder_rancher.go b/cluster-autoscaler/cloudprovider/builder/builder_rancher.go new file mode 100644 index 000000000000..ee8019e21dce --- /dev/null +++ b/cluster-autoscaler/cloudprovider/builder/builder_rancher.go @@ -0,0 +1,43 @@ +//go:build rancher +// +build rancher + +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package builder + +import ( + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/rancher" + "k8s.io/autoscaler/cluster-autoscaler/config" +) + +// AvailableCloudProviders supported by the cloud provider builder. +var AvailableCloudProviders = []string{ + cloudprovider.RancherProviderName, +} + +// DefaultCloudProvider for rancher-only build is rancher. +const DefaultCloudProvider = cloudprovider.RancherProviderName + +func buildCloudProvider(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider { + switch opts.CloudProviderName { + case cloudprovider.RancherProviderName: + return rancher.BuildRancher(opts, do, rl) + } + + return nil +} diff --git a/cluster-autoscaler/cloudprovider/cloud_provider.go b/cluster-autoscaler/cloudprovider/cloud_provider.go index 232702d48558..9652e47d7a98 100644 --- a/cluster-autoscaler/cloudprovider/cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/cloud_provider.go @@ -80,6 +80,8 @@ const ( ExternalGrpcProviderName = "externalgrpc" // CivoProviderName gets the provider name of civo CivoProviderName = "civo" + // RancherProviderName gets the provider name of rancher + RancherProviderName = "rancher" ) // CloudProvider contains configuration info and functions for interacting with diff --git a/cluster-autoscaler/cloudprovider/rancher/OWNERS b/cluster-autoscaler/cloudprovider/rancher/OWNERS new file mode 100644 index 000000000000..370bf629914d --- /dev/null +++ b/cluster-autoscaler/cloudprovider/rancher/OWNERS @@ -0,0 +1,10 @@ +approvers: +#- ctrox +#- gajicdev +#- pawelkuc +#- thirdeyenick +reviewers: +#- ctrox +#- gajicdev +#- pawelkuc +#- thirdeyenick diff --git a/cluster-autoscaler/cloudprovider/rancher/README.md b/cluster-autoscaler/cloudprovider/rancher/README.md new file mode 100644 index 000000000000..88f384b0b7a0 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/rancher/README.md @@ -0,0 +1,74 @@ +# Cluster Autoscaler for Rancher with RKE2 + +This cluster autoscaler for Rancher scales nodes in clusters which use RKE2 +provisioning (Rancher v2.6+). It uses a combination of the Rancher API and the +underlying cluster-api types of RKE2. + +## Configuration + +The `cluster-autoscaler` for Rancher needs a configuration file to work by +using `--cloud-config` parameter. An up-to-date example can be found in +[examples/config.yaml](./examples/config.yaml). + +### Permissions + +The Rancher server account provided in the `cloud-config` requires the +following permissions on the Rancher server: + +* Get/Update of the `clusters.provisioning.cattle.io` resource to autoscale +* List of `machines.cluster.x-k8s.io` in the namespace of the cluster resource + +## Running the Autoscaler + +The `cluster-autoscaler` can be run inside the RKE2 cluster, on the Rancher +server cluster or on a completely separate machine. To run it outside the RKE2 +cluster, make sure to provide a kubeconfig with `--kubeconfig`. + +To start the autoscaler with the Rancher provider, the cloud provider needs to +be specified: + +```bash +cluster-autoscaler --cloud-provider=rancher --cloud-config=config.yaml +``` + +## Enabling Autoscaling + +In order for the autoscaler to function, the RKE2 cluster needs to be +configured accordingly. The autoscaler works by adjusting the `quantity` of a +`machinePool` dynamically. For the autoscaler to know the min/max size of a +`machinePool` we need to set a few annotations using the +`machineDeploymentAnnotations` field. That field has been chosen because +updating it does not trigger a full rollout of a `machinePool`. + +```yaml +apiVersion: provisioning.cattle.io/v1 +kind: Cluster +spec: + rkeConfig: + machinePools: + - name: pool-1 + quantity: 1 + workerRole: true + machineDeploymentAnnotations: + cluster.provisioning.cattle.io/autoscaler-min-size: "1" + cluster.provisioning.cattle.io/autoscaler-max-size: "3" +``` + +Optionally in order to enable scaling a `machinePool` from and to 0 nodes, we +need to add a few more annotations to let the autoscaler know, which resources +a single node in a pool provides: + +```yaml +apiVersion: provisioning.cattle.io/v1 +kind: Cluster +spec: + rkeConfig: + machinePools: + - name: pool-1 + machineDeploymentAnnotations: + cluster.provisioning.cattle.io/autoscaler-min-size: "0" + cluster.provisioning.cattle.io/autoscaler-max-size: "3" + cluster.provisioning.cattle.io/autoscaler-resource-cpu: "1" + cluster.provisioning.cattle.io/autoscaler-resource-ephemeral-storage: 50Gi + cluster.provisioning.cattle.io/autoscaler-resource-memory: 4Gi +``` diff --git a/cluster-autoscaler/cloudprovider/rancher/examples/config.yaml b/cluster-autoscaler/cloudprovider/rancher/examples/config.yaml new file mode 100644 index 000000000000..81d4eb58091e --- /dev/null +++ b/cluster-autoscaler/cloudprovider/rancher/examples/config.yaml @@ -0,0 +1,9 @@ +# rancher server credentials +url: https://rancher.example.org +token: +# name and namespace of the clusters.provisioning.cattle.io resource on the +# rancher server +clusterName: my-cluster +clusterNamespace: fleet-default +# optional, will be auto-discovered if not specified +#clusterAPIVersion: v1alpha4 \ No newline at end of file diff --git a/cluster-autoscaler/cloudprovider/rancher/provisioning.cattle.io/v1/rke.go b/cluster-autoscaler/cloudprovider/rancher/provisioning.cattle.io/v1/rke.go new file mode 100644 index 000000000000..20dc47f5cfde --- /dev/null +++ b/cluster-autoscaler/cloudprovider/rancher/provisioning.cattle.io/v1/rke.go @@ -0,0 +1,88 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +// RKEMachinePool configures a RKE machine pool +type RKEMachinePool struct { + RKECommonNodeConfig `json:",inline"` + + Paused bool `json:"paused,omitempty"` + EtcdRole bool `json:"etcdRole,omitempty"` + ControlPlaneRole bool `json:"controlPlaneRole,omitempty"` + WorkerRole bool `json:"workerRole,omitempty"` + DrainBeforeDelete bool `json:"drainBeforeDelete,omitempty"` + DrainBeforeDeleteTimeout *metav1.Duration `json:"drainBeforeDeleteTimeout,omitempty"` + NodeConfig *corev1.ObjectReference `json:"machineConfigRef,omitempty" wrangler:"required"` + Name string `json:"name,omitempty" wrangler:"required"` + DisplayName string `json:"displayName,omitempty"` + Quantity *int32 `json:"quantity,omitempty"` + RollingUpdate *RKEMachinePoolRollingUpdate `json:"rollingUpdate,omitempty"` + MachineDeploymentLabels map[string]string `json:"machineDeploymentLabels,omitempty"` + MachineDeploymentAnnotations map[string]string `json:"machineDeploymentAnnotations,omitempty"` + NodeStartupTimeout *metav1.Duration `json:"nodeStartupTimeout,omitempty"` + UnhealthyNodeTimeout *metav1.Duration `json:"unhealthyNodeTimeout,omitempty"` + MaxUnhealthy *string `json:"maxUnhealthy,omitempty"` + UnhealthyRange *string `json:"unhealthyRange,omitempty"` + MachineOS string `json:"machineOS,omitempty"` +} + +// RKEMachinePoolRollingUpdate configures the rolling update of a machine pool +type RKEMachinePoolRollingUpdate struct { + // The maximum number of machines that can be unavailable during the update. + // Value can be an absolute number (ex: 5) or a percentage of desired + // machines (ex: 10%). + // Absolute number is calculated from percentage by rounding down. + // This can not be 0 if MaxSurge is 0. + // Defaults to 0. + // Example: when this is set to 30%, the old MachineSet can be scaled + // down to 70% of desired machines immediately when the rolling update + // starts. Once new machines are ready, old MachineSet can be scaled + // down further, followed by scaling up the new MachineSet, ensuring + // that the total number of machines available at all times + // during the update is at least 70% of desired machines. + // +optional + MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"` + + // The maximum number of machines that can be scheduled above the + // desired number of machines. + // Value can be an absolute number (ex: 5) or a percentage of + // desired machines (ex: 10%). + // This can not be 0 if MaxUnavailable is 0. + // Absolute number is calculated from percentage by rounding up. + // Defaults to 1. + // Example: when this is set to 30%, the new MachineSet can be scaled + // up immediately when the rolling update starts, such that the total + // number of old and new machines do not exceed 130% of desired + // machines. Once old machines have been killed, new MachineSet can + // be scaled up further, ensuring that total number of machines running + // at any time during the update is at most 130% of desired machines. + // +optional + MaxSurge *intstr.IntOrString `json:"maxSurge,omitempty"` +} + +// RKECommonNodeConfig contains common node configuration +type RKECommonNodeConfig struct { + Labels map[string]string `json:"labels,omitempty"` + Taints []corev1.Taint `json:"taints,omitempty"` + CloudCredentialSecretName string `json:"cloudCredentialSecretName,omitempty"` +} diff --git a/cluster-autoscaler/cloudprovider/rancher/rancher_clusterapi.go b/cluster-autoscaler/cloudprovider/rancher/rancher_clusterapi.go new file mode 100644 index 000000000000..953d4dbe84c1 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/rancher/rancher_clusterapi.go @@ -0,0 +1,57 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rancher + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/discovery" +) + +const ( + clusterAPIGroup = "cluster.x-k8s.io" + machineDeleteAnnotationKey = clusterAPIGroup + "/delete-machine" + machinePhaseProvisioning = "Provisioning" + machinePhasePending = "Pending" + machinePhaseDeleting = "Deleting" + machineDeploymentNameLabelKey = clusterAPIGroup + "/deployment-name" + machineResourceName = "machines" +) + +func getAPIGroupPreferredVersion(client discovery.DiscoveryInterface, apiGroup string) (string, error) { + groupList, err := client.ServerGroups() + if err != nil { + return "", fmt.Errorf("failed to get ServerGroups: %v", err) + } + + for _, group := range groupList.Groups { + if group.Name == apiGroup { + return group.PreferredVersion.Version, nil + } + } + + return "", fmt.Errorf("failed to find API group %q", apiGroup) +} + +func machineGVR(version string) schema.GroupVersionResource { + return schema.GroupVersionResource{ + Group: clusterAPIGroup, + Version: version, + Resource: machineResourceName, + } +} diff --git a/cluster-autoscaler/cloudprovider/rancher/rancher_config.go b/cluster-autoscaler/cloudprovider/rancher/rancher_config.go new file mode 100644 index 000000000000..8ac79db5227e --- /dev/null +++ b/cluster-autoscaler/cloudprovider/rancher/rancher_config.go @@ -0,0 +1,46 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rancher + +import ( + "fmt" + "os" + + "gopkg.in/yaml.v2" +) + +type cloudConfig struct { + URL string `yaml:"url"` + Token string `yaml:"token"` + ClusterName string `yaml:"clusterName"` + ClusterNamespace string `yaml:"clusterNamespace"` + ClusterAPIVersion string `yaml:"clusterAPIVersion"` +} + +func newConfig(file string) (*cloudConfig, error) { + b, err := os.ReadFile(file) + if err != nil { + return nil, fmt.Errorf("unable to read cloud config file: %w", err) + } + + config := &cloudConfig{} + if err := yaml.Unmarshal(b, config); err != nil { + return nil, fmt.Errorf("unable to unmarshal config file: %w", err) + } + + return config, nil +} diff --git a/cluster-autoscaler/cloudprovider/rancher/rancher_config_test.go b/cluster-autoscaler/cloudprovider/rancher/rancher_config_test.go new file mode 100644 index 000000000000..05e9f60882f3 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/rancher/rancher_config_test.go @@ -0,0 +1,42 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rancher + +import "testing" + +func TestNewConfig(t *testing.T) { + cfg, err := newConfig("./examples/config.yaml") + if err != nil { + t.Fatal(err) + } + + if len(cfg.URL) == 0 { + t.Fatal("expected url to be set") + } + + if len(cfg.Token) == 0 { + t.Fatal("expected token to be set") + } + + if len(cfg.ClusterName) == 0 { + t.Fatal("expected cluster name to be set") + } + + if len(cfg.ClusterNamespace) == 0 { + t.Fatal("expected cluster namespace to be set") + } +} diff --git a/cluster-autoscaler/cloudprovider/rancher/rancher_nodegroup.go b/cluster-autoscaler/cloudprovider/rancher/rancher_nodegroup.go new file mode 100644 index 000000000000..84b8f778e05b --- /dev/null +++ b/cluster-autoscaler/cloudprovider/rancher/rancher_nodegroup.go @@ -0,0 +1,491 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rancher + +import ( + "context" + "errors" + "fmt" + "math/rand" + "strconv" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + provisioningv1 "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/rancher/provisioning.cattle.io/v1" + "k8s.io/autoscaler/cluster-autoscaler/config" + klog "k8s.io/klog/v2" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/utils/pointer" +) + +// nodeGroup implements nodeGroup for rancher machine pools. +type nodeGroup struct { + provider *RancherCloudProvider + name string + labels map[string]string + taints []corev1.Taint + minSize int + maxSize int + resources corev1.ResourceList + replicas int +} + +type node struct { + instance cloudprovider.Instance + machine unstructured.Unstructured +} + +var ( + // errMissingMinSizeAnnotation is the error returned when a machine pool does + // not have the min size annotations attached. + errMissingMinSizeAnnotation = errors.New("missing min size annotation") + + // errMissingMaxSizeAnnotation is the error returned when a machine pool does + // not have the max size annotations attached. + errMissingMaxSizeAnnotation = errors.New("missing max size annotation") + + // errMissingResourceAnnotation is the error returned when a machine pool does + // not have all the resource annotations attached. + errMissingResourceAnnotation = errors.New("missing resource annotation") +) + +const podCapacity = 110 + +// Id returns node group id/name. +func (ng *nodeGroup) Id() string { + return ng.name +} + +// MinSize returns minimum size of the node group. +func (ng *nodeGroup) MinSize() int { + return ng.minSize +} + +// MaxSize returns maximum size of the node group. +func (ng *nodeGroup) MaxSize() int { + return ng.maxSize +} + +// Debug returns a debug string for the node group. +func (ng *nodeGroup) Debug() string { + return fmt.Sprintf("%s (%d:%d)", ng.Id(), ng.MinSize(), ng.MaxSize()) +} + +// Nodes returns a list of all nodes that belong to this node group. +func (ng *nodeGroup) Nodes() ([]cloudprovider.Instance, error) { + nodes, err := ng.nodes() + if err != nil { + return nil, err + } + + instances := make([]cloudprovider.Instance, 0, len(nodes)) + for _, node := range nodes { + instances = append(instances, node.instance) + } + + return instances, nil +} + +// DeleteNodes deletes the specified nodes from the node group. +func (ng *nodeGroup) DeleteNodes(toDelete []*corev1.Node) error { + if ng.replicas-len(toDelete) < ng.MinSize() { + return fmt.Errorf("node group size would be below minimum size - desired: %d, min: %d", + ng.replicas-len(toDelete), ng.MinSize()) + } + + for _, del := range toDelete { + node, err := ng.findNodeByProviderID(rke2ProviderIDPrefix + del.Name) + if err != nil { + return err + } + + klog.V(4).Infof("marking machine for deletion: %v", node.instance.Id) + + if err := node.markMachineForDeletion(ng); err != nil { + return fmt.Errorf("unable to mark machine %s for deletion: %w", del.Name, err) + } + + if err := ng.setSize(ng.replicas - 1); err != nil { + // rollback deletion mark + _ = node.unmarkMachineForDeletion(ng) + return fmt.Errorf("unable to set node group size: %w", err) + } + } + + return nil +} + +func (ng *nodeGroup) findNodeByProviderID(providerID string) (*node, error) { + nodes, err := ng.nodes() + if err != nil { + return nil, err + } + + for _, node := range nodes { + if node.instance.Id == providerID { + return &node, nil + } + } + + return nil, fmt.Errorf("node with providerID %s not found in node group %s", providerID, ng.name) +} + +// IncreaseSize increases NodeGroup size. +func (ng *nodeGroup) IncreaseSize(delta int) error { + if delta <= 0 { + return fmt.Errorf("size increase must be positive") + } + + newSize := ng.replicas + delta + if newSize > ng.MaxSize() { + return fmt.Errorf("size increase too large, desired: %d max: %d", newSize, ng.MaxSize()) + } + + return ng.setSize(newSize) +} + +// TargetSize returns the current TARGET size of the node group. It is possible that the +// number is different from the number of nodes registered in Kubernetes. +func (ng *nodeGroup) TargetSize() (int, error) { + return ng.replicas, nil +} + +// DecreaseTargetSize decreases the target size of the node group. This function +// doesn't permit to delete any existing node and can be used only to reduce the +// request for new nodes that have not been yet fulfilled. Delta should be negative. +func (ng *nodeGroup) DecreaseTargetSize(delta int) error { + if delta >= 0 { + return fmt.Errorf("size decrease must be negative") + } + + nodes, err := ng.Nodes() + if err != nil { + return fmt.Errorf("failed to get node group nodes: %w", err) + } + + if ng.replicas+delta < len(nodes) { + return fmt.Errorf("attempt to delete existing nodes targetSize: %d delta: %d existingNodes: %d", + ng.replicas, delta, len(nodes)) + } + + return ng.setSize(ng.replicas + delta) +} + +// TemplateNodeInfo returns a node template for this node group. +func (ng *nodeGroup) TemplateNodeInfo() (*schedulerframework.NodeInfo, error) { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s-%d", ng.provider.config.ClusterName, ng.Id(), rand.Int63()), + Labels: ng.labels, + }, + Spec: corev1.NodeSpec{ + Taints: ng.taints, + }, + Status: corev1.NodeStatus{ + Capacity: ng.resources, + Conditions: cloudprovider.BuildReadyConditions(), + }, + } + + node.Status.Capacity[corev1.ResourcePods] = *resource.NewQuantity(podCapacity, resource.DecimalSI) + + node.Status.Allocatable = node.Status.Capacity + + // Setup node info template + nodeInfo := schedulerframework.NewNodeInfo(cloudprovider.BuildKubeProxy(ng.Id())) + nodeInfo.SetNode(node) + + return nodeInfo, nil +} + +// Exist checks if the node group really exists on the cloud provider side. +func (ng *nodeGroup) Exist() bool { + return ng.Id() != "" +} + +// Create creates the node group on the cloud provider side. +func (ng *nodeGroup) Create() (cloudprovider.NodeGroup, error) { + return nil, cloudprovider.ErrNotImplemented +} + +// Delete deletes the node group on the cloud provider side. +func (ng *nodeGroup) Delete() error { + return cloudprovider.ErrNotImplemented +} + +// Autoprovisioned returns true if the node group is autoprovisioned. +func (ng *nodeGroup) Autoprovisioned() bool { + return false +} + +// GetOptions returns NodeGroupAutoscalingOptions that should be used for this particular +// NodeGroup. Returning a nil will result in using default options. +func (ng *nodeGroup) GetOptions(defaults config.NodeGroupAutoscalingOptions) (*config.NodeGroupAutoscalingOptions, error) { + return nil, cloudprovider.ErrNotImplemented +} + +func (ng *nodeGroup) setSize(size int) error { + machinePools, err := ng.provider.getMachinePools() + if err != nil { + return err + } + + found := false + for i := range machinePools { + if machinePools[i].Name == ng.name { + machinePools[i].Quantity = pointer.Int32Ptr(int32(size)) + found = true + break + } + } + + if !found { + return fmt.Errorf("unable to set size of group %s of cluster %s: group not found", + ng.name, ng.provider.config.ClusterName) + } + + if err := ng.provider.updateMachinePools(machinePools); err != nil { + return err + } + + ng.replicas = size + return nil +} + +// nodes returns all nodes of this node group that have a provider ID set by +// getting the underlying machines and extracting the providerID, which +// corresponds to the name of the k8s node object. +func (ng *nodeGroup) nodes() ([]node, error) { + machines, err := ng.machines() + if err != nil { + return nil, err + } + + nodes := make([]node, 0, len(machines)) + for _, machine := range machines { + phase, found, err := unstructured.NestedString(machine.UnstructuredContent(), "status", "phase") + if err != nil { + return nil, err + } + + if !found { + return nil, fmt.Errorf("machine %s/%s does not have status.phase field", machine.GetName(), machine.GetNamespace()) + } + + providerID, found, err := unstructured.NestedString(machine.UnstructuredContent(), "spec", "providerID") + if err != nil { + return nil, err + } + + if !found { + if phase == machinePhaseProvisioning { + // if the provider ID is missing during provisioning, we + // ignore this node to avoid errors in the autoscaler. + continue + } + + return nil, fmt.Errorf("could not find providerID in machine: %s/%s", machine.GetName(), machine.GetNamespace()) + } + + state := cloudprovider.InstanceRunning + + switch phase { + case machinePhasePending, machinePhaseProvisioning: + state = cloudprovider.InstanceCreating + case machinePhaseDeleting: + state = cloudprovider.InstanceDeleting + } + + nodes = append(nodes, node{ + machine: machine, + instance: cloudprovider.Instance{ + Id: providerID, + Status: &cloudprovider.InstanceStatus{ + State: state, + }, + }, + }) + } + + return nodes, nil +} + +// machines returns the unstructured objects of all cluster-api machines in a +// node group. The machines are found using the deployment name label. +func (ng *nodeGroup) machines() ([]unstructured.Unstructured, error) { + machinesList, err := ng.provider.client.Resource(machineGVR(ng.provider.config.ClusterAPIVersion)). + Namespace(ng.provider.config.ClusterNamespace).List( + context.TODO(), metav1.ListOptions{ + // we find all machines belonging to an rke2 machinePool by the + // deployment name, since it is just - + LabelSelector: fmt.Sprintf("%s=%s-%s", machineDeploymentNameLabelKey, ng.provider.config.ClusterName, ng.name), + }, + ) + + return machinesList.Items, err +} + +// markMachineForDeletion sets an annotation on the cluster-api machine +// object, inidicating that this node is a candidate to be removed on scale +// down of the controlling resource (machineSet/machineDeployment). +func (n *node) markMachineForDeletion(ng *nodeGroup) error { + u, err := ng.provider.client.Resource(machineGVR(ng.provider.config.ClusterAPIVersion)).Namespace(n.machine.GetNamespace()). + Get(context.TODO(), n.machine.GetName(), metav1.GetOptions{}) + if err != nil { + return err + } + + u = u.DeepCopy() + + annotations := u.GetAnnotations() + if annotations == nil { + annotations = map[string]string{} + } + + annotations[machineDeleteAnnotationKey] = time.Now().String() + u.SetAnnotations(annotations) + + _, err = ng.provider.client.Resource(machineGVR(ng.provider.config.ClusterAPIVersion)).Namespace(u.GetNamespace()). + Update(context.TODO(), u, metav1.UpdateOptions{}) + + return err +} + +// unmarkMachineForDeletion removes the machine delete annotation. +func (n *node) unmarkMachineForDeletion(ng *nodeGroup) error { + u, err := ng.provider.client.Resource(machineGVR(ng.provider.config.ClusterAPIVersion)).Namespace(n.machine.GetNamespace()). + Get(context.TODO(), n.machine.GetName(), metav1.GetOptions{}) + if err != nil { + return err + } + + u = u.DeepCopy() + + annotations := u.GetAnnotations() + if annotations == nil { + annotations = map[string]string{} + } + + delete(annotations, machineDeleteAnnotationKey) + u.SetAnnotations(annotations) + + _, err = ng.provider.client.Resource(machineGVR(ng.provider.config.ClusterAPIVersion)).Namespace(u.GetNamespace()). + Update(context.TODO(), u, metav1.UpdateOptions{}) + + return err +} + +func newNodeGroupFromMachinePool(provider *RancherCloudProvider, machinePool provisioningv1.RKEMachinePool) (*nodeGroup, error) { + if machinePool.Quantity == nil { + return nil, errors.New("machine pool quantity is not set") + } + + minSize, maxSize, err := parseScalingAnnotations(machinePool.MachineDeploymentAnnotations) + if err != nil { + return nil, fmt.Errorf("error parsing scaling annotations: %w", err) + } + + resources, err := parseResourceAnnotations(machinePool.MachineDeploymentAnnotations) + if err != nil { + if !errors.Is(err, errMissingResourceAnnotation) { + return nil, fmt.Errorf("error parsing resource annotations: %w", err) + } + // if the resource labels are missing, we simply initialize an empty + // list. The autoscaler can still work but won't scale up from 0 if a + // pod requests any resources. + resources = corev1.ResourceList{} + } + + return &nodeGroup{ + provider: provider, + name: machinePool.Name, + labels: machinePool.Labels, + taints: machinePool.Taints, + minSize: minSize, + maxSize: maxSize, + replicas: int(*machinePool.Quantity), + resources: resources, + }, nil +} + +func parseResourceAnnotations(annotations map[string]string) (corev1.ResourceList, error) { + cpu, ok := annotations[resourceCPUAnnotation] + if !ok { + return nil, errMissingResourceAnnotation + } + + cpuResources, err := resource.ParseQuantity(cpu) + if err != nil { + return nil, fmt.Errorf("unable to parse cpu resources: %s", cpu) + } + memory, ok := annotations[resourceMemoryAnnotation] + if !ok { + return nil, errMissingResourceAnnotation + } + + memoryResources, err := resource.ParseQuantity(memory) + if err != nil { + return nil, fmt.Errorf("unable to parse cpu resources: %s", cpu) + } + ephemeralStorage, ok := annotations[resourceEphemeralStorageAnnotation] + if !ok { + return nil, errMissingResourceAnnotation + } + + ephemeralStorageResources, err := resource.ParseQuantity(ephemeralStorage) + if err != nil { + return nil, fmt.Errorf("unable to parse cpu resources: %s", cpu) + } + + return corev1.ResourceList{ + corev1.ResourceCPU: cpuResources, + corev1.ResourceMemory: memoryResources, + corev1.ResourceEphemeralStorage: ephemeralStorageResources, + }, nil +} + +func parseScalingAnnotations(annotations map[string]string) (int, int, error) { + min, ok := annotations[minSizeAnnotation] + if !ok { + return 0, 0, errMissingMinSizeAnnotation + } + + minSize, err := strconv.Atoi(min) + if err != nil { + return 0, 0, fmt.Errorf("unable to parse min size: %s", min) + } + + max, ok := annotations[maxSizeAnnotation] + if !ok { + return 0, 0, errMissingMaxSizeAnnotation + } + + maxSize, err := strconv.Atoi(max) + if err != nil { + return 0, 0, fmt.Errorf("unable to parse min size: %s", min) + } + + if minSize < 0 || maxSize < 0 { + return 0, 0, fmt.Errorf("invalid min or max size supplied: %v/%v", minSize, maxSize) + } + + return minSize, maxSize, nil +} diff --git a/cluster-autoscaler/cloudprovider/rancher/rancher_nodegroup_test.go b/cluster-autoscaler/cloudprovider/rancher/rancher_nodegroup_test.go new file mode 100644 index 000000000000..9fc990a1c367 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/rancher/rancher_nodegroup_test.go @@ -0,0 +1,567 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rancher + +import ( + "fmt" + "reflect" + "strings" + "testing" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + provisioningv1 "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/rancher/provisioning.cattle.io/v1" + fakedynamic "k8s.io/client-go/dynamic/fake" + "k8s.io/utils/pointer" +) + +const ( + testCluster = "test-cluster" + testNamespace = "default" + nodeGroupDev = "dev" + nodeGroupProd = "ng-long-prod" +) + +func TestNodeGroupNodes(t *testing.T) { + tests := []struct { + name string + nodeGroup nodeGroup + expectedNodes int + expectedErrContains string + machines func() []runtime.Object + }{ + { + name: "normal", + nodeGroup: nodeGroup{name: nodeGroupDev}, + expectedNodes: 2, + machines: func() []runtime.Object { + return []runtime.Object{ + newMachine(nodeGroupDev, 0), + newMachine(nodeGroupDev, 1), + } + }, + }, + { + name: "mixed machines", + nodeGroup: nodeGroup{name: nodeGroupDev}, + expectedNodes: 3, + machines: func() []runtime.Object { + return []runtime.Object{ + newMachine(nodeGroupDev, 0), + newMachine(nodeGroupDev, 1), + newMachine(nodeGroupDev, 2), + newMachine(nodeGroupProd, 0), + newMachine(nodeGroupProd, 1), + } + }, + }, + { + name: "no matching machines", + nodeGroup: nodeGroup{name: nodeGroupDev}, + expectedNodes: 0, + machines: func() []runtime.Object { + return []runtime.Object{ + newMachine(nodeGroupProd, 0), + newMachine(nodeGroupProd, 1), + } + }, + }, + { + name: "machine without provider id", + nodeGroup: nodeGroup{name: nodeGroupDev}, + expectedNodes: 0, + expectedErrContains: "could not find providerID in machine", + machines: func() []runtime.Object { + machine := newMachine(nodeGroupDev, 0) + _ = unstructured.SetNestedMap(machine.Object, map[string]interface{}{}, "spec") + return []runtime.Object{machine} + }, + }, + { + name: "machine without provider id during provisioning", + nodeGroup: nodeGroup{name: nodeGroupDev}, + expectedNodes: 1, + machines: func() []runtime.Object { + machineProvisioning := newMachine(nodeGroupDev, 0) + _ = unstructured.SetNestedMap(machineProvisioning.Object, map[string]interface{}{}, "spec") + _ = unstructured.SetNestedField(machineProvisioning.Object, machinePhaseProvisioning, "status", "phase") + return []runtime.Object{ + machineProvisioning, + newMachine(nodeGroupDev, 1), + } + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + provider, err := setup(tc.machines()) + if err != nil { + t.Fatal(err) + } + + tc.nodeGroup.provider = provider + + nodes, err := tc.nodeGroup.Nodes() + if err != nil { + if tc.expectedErrContains == "" || !strings.Contains(err.Error(), tc.expectedErrContains) { + t.Fatalf("expected err to contain %q, got %q", tc.expectedErrContains, err) + } + } + if len(nodes) != tc.expectedNodes { + t.Fatalf("expected %v nodes, got %v", tc.expectedNodes, len(nodes)) + } + }) + } +} + +func TestNodeGroupDeleteNodes(t *testing.T) { + tests := []struct { + name string + nodeGroup nodeGroup + expectedTargetSize int + expectedErrContains string + machines []runtime.Object + toDelete []*corev1.Node + }{ + { + name: "delete node", + nodeGroup: nodeGroup{ + name: nodeGroupDev, + replicas: 1, + minSize: 0, + maxSize: 2, + }, + expectedTargetSize: 0, + machines: []runtime.Object{newMachine(nodeGroupDev, 0)}, + toDelete: []*corev1.Node{ + {ObjectMeta: v1.ObjectMeta{Name: nodeName(nodeGroupDev, 0)}}, + }, + }, + { + name: "delete multiple nodes", + nodeGroup: nodeGroup{ + name: nodeGroupDev, + replicas: 3, + minSize: 0, + maxSize: 3, + }, + expectedTargetSize: 1, + machines: []runtime.Object{newMachine(nodeGroupDev, 0), newMachine(nodeGroupDev, 1), newMachine(nodeGroupDev, 2)}, + toDelete: []*corev1.Node{ + {ObjectMeta: v1.ObjectMeta{Name: nodeName(nodeGroupDev, 0)}}, + {ObjectMeta: v1.ObjectMeta{Name: nodeName(nodeGroupDev, 2)}}, + }, + }, + { + name: "delete unknown node", + nodeGroup: nodeGroup{ + name: nodeGroupDev, + replicas: 1, + minSize: 0, + maxSize: 2, + }, + expectedTargetSize: 1, + expectedErrContains: fmt.Sprintf("node with providerID rke2://%s not found in node group %s", nodeName(nodeGroupDev, 42), nodeGroupDev), + machines: []runtime.Object{newMachine(nodeGroupDev, 0)}, + toDelete: []*corev1.Node{ + {ObjectMeta: v1.ObjectMeta{Name: nodeName(nodeGroupDev, 42)}}, + }, + }, + { + name: "delete more nodes than min size", + nodeGroup: nodeGroup{ + name: nodeGroupDev, + replicas: 2, + minSize: 1, + maxSize: 2, + }, + expectedTargetSize: 2, + expectedErrContains: "node group size would be below minimum size - desired: 0, min: 1", + machines: []runtime.Object{newMachine(nodeGroupDev, 0), newMachine(nodeGroupDev, 1)}, + toDelete: []*corev1.Node{ + {ObjectMeta: v1.ObjectMeta{Name: nodeName(nodeGroupDev, 0)}}, + {ObjectMeta: v1.ObjectMeta{Name: nodeName(nodeGroupDev, 1)}}, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + provider, err := setup(tc.machines) + if err != nil { + t.Fatal(err) + } + + tc.nodeGroup.provider = provider + + if err := provider.Refresh(); err != nil { + t.Fatal(err) + } + + // store delta before deleting nodes + delta := tc.nodeGroup.replicas - tc.expectedTargetSize + + if err := tc.nodeGroup.DeleteNodes(tc.toDelete); err != nil { + if tc.expectedErrContains == "" || !strings.Contains(err.Error(), tc.expectedErrContains) { + t.Fatalf("expected err to contain %q, got %q", tc.expectedErrContains, err) + } + } + + targetSize, err := tc.nodeGroup.TargetSize() + if err != nil { + t.Fatal(err) + } + + if tc.expectedTargetSize != targetSize { + t.Fatalf("expected target size %v, got %v", tc.expectedTargetSize, targetSize) + } + + machines, err := tc.nodeGroup.machines() + if err != nil { + t.Fatal(err) + } + + annotationCount := 0 + for _, machine := range machines { + if _, ok := machine.GetAnnotations()[machineDeleteAnnotationKey]; ok { + annotationCount++ + } + } + if annotationCount != delta { + t.Fatalf("expected %v machines to have the deleted annotation, got %v", delta, annotationCount) + } + }) + } +} + +func TestIncreaseTargetSize(t *testing.T) { + tests := []struct { + name string + delta int + nodeGroup nodeGroup + expectedErrContains string + }{ + { + name: "valid increase", + nodeGroup: nodeGroup{ + name: nodeGroupDev, + replicas: 0, + minSize: 0, + maxSize: 2, + }, + delta: 2, + }, + { + name: "too large", + nodeGroup: nodeGroup{ + name: nodeGroupDev, + replicas: 1, + minSize: 0, + maxSize: 2, + }, + delta: 2, + expectedErrContains: "size increase too large, desired: 3 max: 2", + }, + { + name: "negative", + nodeGroup: nodeGroup{ + name: nodeGroupDev, + replicas: 2, + minSize: 0, + maxSize: 2, + }, + delta: -2, + expectedErrContains: "size increase must be positive", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + provider, err := setup(nil) + if err != nil { + t.Fatal(err) + } + + tc.nodeGroup.provider = provider + if err := tc.nodeGroup.IncreaseSize(tc.delta); err != nil { + if tc.expectedErrContains == "" || !strings.Contains(err.Error(), tc.expectedErrContains) { + t.Fatalf("expected err to contain %q, got %q", tc.expectedErrContains, err) + } + } + }) + } +} + +func TestDecreaseTargetSize(t *testing.T) { + tests := []struct { + name string + delta int + nodeGroup nodeGroup + expectedErrContains string + }{ + { + name: "valid decrease", + nodeGroup: nodeGroup{ + name: nodeGroupDev, + replicas: 2, + minSize: 0, + maxSize: 2, + }, + delta: -2, + }, + { + name: "too large", + nodeGroup: nodeGroup{ + name: nodeGroupDev, + replicas: 1, + minSize: 0, + maxSize: 2, + }, + delta: -2, + expectedErrContains: "attempt to delete existing nodes targetSize: 1 delta: -2 existingNodes: 0", + }, + { + name: "positive", + nodeGroup: nodeGroup{ + name: nodeGroupDev, + replicas: 2, + minSize: 0, + maxSize: 2, + }, + delta: 2, + expectedErrContains: "size decrease must be negative", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + provider, err := setup(nil) + if err != nil { + t.Fatal(err) + } + + tc.nodeGroup.provider = provider + if err := tc.nodeGroup.DecreaseTargetSize(tc.delta); err != nil { + if tc.expectedErrContains == "" || !strings.Contains(err.Error(), tc.expectedErrContains) { + t.Fatalf("expected err to contain %q, got %q", tc.expectedErrContains, err) + } + } + }) + } +} + +func TestTemplateNodeInfo(t *testing.T) { + provider, err := setup(nil) + if err != nil { + t.Fatal(err) + } + + ng := nodeGroup{ + name: nodeGroupDev, + replicas: 2, + minSize: 0, + maxSize: 2, + provider: provider, + resources: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + corev1.ResourceEphemeralStorage: resource.MustParse("100Gi"), + }, + } + + nodeInfo, err := ng.TemplateNodeInfo() + if err != nil { + t.Fatal(err) + } + + if nodeInfo.Allocatable.MilliCPU != ng.resources.Cpu().MilliValue() { + t.Fatalf("expected nodeInfo to have %v MilliCPU, got %v", + ng.resources.Cpu().MilliValue(), nodeInfo.Allocatable.MilliCPU) + } + + if nodeInfo.Allocatable.Memory != ng.resources.Memory().Value() { + t.Fatalf("expected nodeInfo to have %v Memory, got %v", + ng.resources.Memory().Value(), nodeInfo.Allocatable.Memory) + } + + if nodeInfo.Allocatable.EphemeralStorage != ng.resources.StorageEphemeral().Value() { + t.Fatalf("expected nodeInfo to have %v ephemeral storage, got %v", + ng.resources.StorageEphemeral().Value(), nodeInfo.Allocatable.EphemeralStorage) + } +} + +func TestNewNodeGroupFromMachinePool(t *testing.T) { + provider, err := setup(nil) + if err != nil { + t.Fatal(err) + } + + tests := []struct { + name string + machinePool provisioningv1.RKEMachinePool + expectedErrContains string + expectedResources corev1.ResourceList + }{ + { + name: "valid", + machinePool: provisioningv1.RKEMachinePool{ + Name: nodeGroupDev, + Quantity: pointer.Int32(1), + MachineDeploymentAnnotations: map[string]string{ + minSizeAnnotation: "0", + maxSizeAnnotation: "3", + resourceCPUAnnotation: "2", + resourceMemoryAnnotation: "4Gi", + resourceEphemeralStorageAnnotation: "50Gi", + }, + }, + expectedResources: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("4Gi"), + corev1.ResourceEphemeralStorage: resource.MustParse("50Gi"), + }, + }, + { + name: "missing size annotations", + expectedErrContains: "missing min size annotation", + machinePool: provisioningv1.RKEMachinePool{ + Name: nodeGroupDev, + Quantity: pointer.Int32(1), + MachineDeploymentAnnotations: map[string]string{ + resourceCPUAnnotation: "2", + resourceMemoryAnnotation: "4Gi", + resourceEphemeralStorageAnnotation: "50Gi", + }, + }, + }, + { + name: "missing resource annotations", + machinePool: provisioningv1.RKEMachinePool{ + Name: nodeGroupDev, + Quantity: pointer.Int32(1), + MachineDeploymentAnnotations: map[string]string{ + minSizeAnnotation: "0", + maxSizeAnnotation: "3", + }, + }, + expectedResources: corev1.ResourceList{}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ng, err := newNodeGroupFromMachinePool(provider, tc.machinePool) + if err != nil { + if tc.expectedErrContains == "" || !strings.Contains(err.Error(), tc.expectedErrContains) { + t.Fatalf("expected err to contain %q, got %q", tc.expectedErrContains, err) + } + return + } + + if ng.replicas != int(*tc.machinePool.Quantity) { + t.Fatalf("expected nodegroup replicas %v, got %v", ng.replicas, tc.machinePool.Quantity) + } + + if !reflect.DeepEqual(tc.expectedResources, ng.resources) { + t.Fatalf("expected resources %v do not match node group resources %v", tc.expectedResources, ng.resources) + } + }) + } +} + +func setup(machines []runtime.Object) (*RancherCloudProvider, error) { + config := &cloudConfig{ + ClusterName: testCluster, + ClusterNamespace: testNamespace, + ClusterAPIVersion: "v1alpha4", + } + + machinePools := []provisioningv1.RKEMachinePool{ + { + Name: nodeGroupDev, + Quantity: pointer.Int32(1), + MachineDeploymentAnnotations: map[string]string{ + minSizeAnnotation: "0", + maxSizeAnnotation: "3", + resourceCPUAnnotation: "2", + resourceMemoryAnnotation: "4Gi", + resourceEphemeralStorageAnnotation: "50Gi", + }, + }, + { + Name: nodeGroupProd, + Quantity: pointer.Int32(3), + MachineDeploymentAnnotations: map[string]string{ + minSizeAnnotation: "0", + maxSizeAnnotation: "3", + resourceCPUAnnotation: "2", + resourceMemoryAnnotation: "4Gi", + resourceEphemeralStorageAnnotation: "50Gi", + }, + }, + } + + pools, err := machinePoolsToUnstructured(machinePools) + if err != nil { + return nil, err + } + + return &RancherCloudProvider{ + resourceLimiter: &cloudprovider.ResourceLimiter{}, + client: fakedynamic.NewSimpleDynamicClientWithCustomListKinds( + runtime.NewScheme(), + map[schema.GroupVersionResource]string{ + machineGVR(config.ClusterAPIVersion): "kindList", + }, + append(machines, newCluster(testCluster, testNamespace, pools))..., + ), + config: config, + }, nil +} + +func newMachine(nodeGroupName string, num int) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": "Machine", + "apiVersion": "cluster.x-k8s.io/v1alpha4", + "metadata": map[string]interface{}{ + "name": nodeName(nodeGroupName, num), + "namespace": testNamespace, + "labels": map[string]interface{}{ + machineDeploymentNameLabelKey: fmt.Sprintf("%s-%s", testCluster, nodeGroupName), + }, + }, + "spec": map[string]interface{}{ + "clusterName": testCluster, + "providerID": rke2ProviderIDPrefix + nodeName(nodeGroupName, num), + }, + "status": map[string]interface{}{ + "phase": "Running", + }, + }, + } +} + +func nodeName(nodeGroupName string, num int) string { + return fmt.Sprintf("%s-%s-123456-%v", testCluster, nodeGroupName, num) +} diff --git a/cluster-autoscaler/cloudprovider/rancher/rancher_provider.go b/cluster-autoscaler/cloudprovider/rancher/rancher_provider.go new file mode 100644 index 000000000000..6e1126b6e00f --- /dev/null +++ b/cluster-autoscaler/cloudprovider/rancher/rancher_provider.go @@ -0,0 +1,312 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rancher + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/config" + autoscalererrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors" + "k8s.io/client-go/discovery" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" + + provisioningv1 "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/rancher/provisioning.cattle.io/v1" + klog "k8s.io/klog/v2" +) + +const ( + // providerName is the cloud provider name for rancher + providerName = "rancher" + + // rke2ProviderID identifies nodes that are using RKE2 + rke2ProviderID = "rke2" + rke2ProviderIDPrefix = rke2ProviderID + "://" + + rancherProvisioningGroup = "provisioning.cattle.io" + rancherProvisioningVersion = "v1" + rancherLocalClusterPath = "/k8s/clusters/local" + + minSizeAnnotation = "cluster.provisioning.cattle.io/autoscaler-min-size" + maxSizeAnnotation = "cluster.provisioning.cattle.io/autoscaler-max-size" + resourceCPUAnnotation = "cluster.provisioning.cattle.io/autoscaler-resource-cpu" + resourceMemoryAnnotation = "cluster.provisioning.cattle.io/autoscaler-resource-memory" + resourceEphemeralStorageAnnotation = "cluster.provisioning.cattle.io/autoscaler-resource-ephemeral-storage" +) + +// RancherCloudProvider implements CloudProvider interface for rancher +type RancherCloudProvider struct { + resourceLimiter *cloudprovider.ResourceLimiter + client dynamic.Interface + nodeGroups []*nodeGroup + config *cloudConfig +} + +// BuildRancher builds rancher cloud provider. +func BuildRancher(opts config.AutoscalingOptions, _ cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider { + provider, err := newRancherCloudProvider(opts.CloudConfig, rl) + if err != nil { + klog.Fatalf("failed to create rancher cloud provider: %v", err) + } + return provider +} + +func newRancherCloudProvider(cloudConfig string, resourceLimiter *cloudprovider.ResourceLimiter) (*RancherCloudProvider, error) { + config, err := newConfig(cloudConfig) + if err != nil { + return nil, fmt.Errorf("unable to create cloud config: %w", err) + } + + restConfig := &rest.Config{ + Host: config.URL, + APIPath: rancherLocalClusterPath, + BearerToken: config.Token, + } + + client, err := dynamic.NewForConfig(restConfig) + if err != nil { + return nil, fmt.Errorf("unable to create dynamic client: %w", err) + } + + discovery, err := discovery.NewDiscoveryClientForConfig(restConfig) + if err != nil { + return nil, fmt.Errorf("unable to create discovery client: %w", err) + } + + if config.ClusterAPIVersion == "" { + // automatically discover cluster API version + clusterAPIVersion, err := getAPIGroupPreferredVersion(discovery, clusterAPIGroup) + if err != nil { + return nil, err + } + + config.ClusterAPIVersion = clusterAPIVersion + } + + return &RancherCloudProvider{ + resourceLimiter: resourceLimiter, + client: client, + config: config, + }, nil +} + +// Name returns name of the cloud provider. +func (provider *RancherCloudProvider) Name() string { + return providerName +} + +// GPULabel returns the label added to nodes with GPU resource. +func (provider *RancherCloudProvider) GPULabel() string { + return "" +} + +// GetAvailableGPUTypes return all available GPU types cloud provider supports +func (provider *RancherCloudProvider) GetAvailableGPUTypes() map[string]struct{} { + // TODO: implement GPU support + return nil +} + +// NodeGroups returns all node groups configured for this cloud provider. +func (provider *RancherCloudProvider) NodeGroups() []cloudprovider.NodeGroup { + nodeGroups := make([]cloudprovider.NodeGroup, len(provider.nodeGroups)) + for i, ng := range provider.nodeGroups { + nodeGroups[i] = ng + } + return nodeGroups +} + +// Pricing returns pricing model for this cloud provider or error if not available. +func (provider *RancherCloudProvider) Pricing() (cloudprovider.PricingModel, autoscalererrors.AutoscalerError) { + return nil, cloudprovider.ErrNotImplemented +} + +// NodeGroupForNode returns the node group for the given node. +func (provider *RancherCloudProvider) NodeGroupForNode(node *corev1.Node) (cloudprovider.NodeGroup, error) { + // skip nodes that are not managed by rke2. + if !strings.HasPrefix(node.Spec.ProviderID, rke2ProviderID) { + return nil, nil + } + + for _, group := range provider.nodeGroups { + // the node name is expected to have the following format: + // --- + // so we trim the cluster name and then cut off the last two parts to + // leave us with the node group name + parts := strings.Split(strings.TrimPrefix(node.Name, provider.config.ClusterName), "-") + if len(parts) < 4 { + return nil, fmt.Errorf("unable to get node group name out of node %s: unexpected node name format", node.Name) + } + groupName := strings.Join(parts[1:len(parts)-2], "-") + if group.name == groupName { + return group, nil + } + } + + // if node is not in one of our scalable nodeGroups, we return nil so it + // won't be processed further by the CA. + return nil, nil +} + +// GetAvailableMachineTypes get all machine types that can be requested from the cloud provider. +// Implementation optional. +func (provider *RancherCloudProvider) GetAvailableMachineTypes() ([]string, error) { + return []string{}, cloudprovider.ErrNotImplemented +} + +// NewNodeGroup builds a theoretical node group based on the node definition provided. +func (provider *RancherCloudProvider) NewNodeGroup(machineType string, labels map[string]string, systemLabels map[string]string, + taints []corev1.Taint, + extraResources map[string]resource.Quantity) (cloudprovider.NodeGroup, error) { + return nil, cloudprovider.ErrNotImplemented +} + +// GetResourceLimiter returns struct containing limits (max, min) for resources (cores, memory etc.). +func (provider *RancherCloudProvider) GetResourceLimiter() (*cloudprovider.ResourceLimiter, error) { + return provider.resourceLimiter, nil +} + +// Refresh is called before every main loop and can be used to dynamically update cloud provider state. +// In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh(). +func (provider *RancherCloudProvider) Refresh() error { + nodeGroups, err := provider.scalableNodeGroups() + if err != nil { + return fmt.Errorf("unable to get node groups from cluster: %w", err) + } + + provider.nodeGroups = nodeGroups + return nil +} + +// Cleanup cleans up all resources before the cloud provider is removed +func (provider *RancherCloudProvider) Cleanup() error { + return nil +} + +func (provider *RancherCloudProvider) scalableNodeGroups() ([]*nodeGroup, error) { + var result []*nodeGroup + + pools, err := provider.getMachinePools() + if err != nil { + return nil, err + } + + for _, pool := range pools { + nodeGroup, err := newNodeGroupFromMachinePool(provider, pool) + if err != nil { + if isNotScalable(err) { + klog.V(4).Infof("ignoring machine pool %s as it does not have min/max annotations", pool.Name) + continue + } + + return nil, fmt.Errorf("error getting node group from machine pool: %w", err) + } + + klog.V(4).Infof("scalable node group found: %s", nodeGroup.Debug()) + + result = append(result, nodeGroup) + } + + return result, err +} + +func clusterGVR() schema.GroupVersionResource { + return schema.GroupVersionResource{ + Group: rancherProvisioningGroup, + Version: rancherProvisioningVersion, + Resource: "clusters", + } +} + +func (provider *RancherCloudProvider) getMachinePools() ([]provisioningv1.RKEMachinePool, error) { + res, err := provider.client.Resource(clusterGVR()). + Namespace(provider.config.ClusterNamespace). + Get(context.TODO(), provider.config.ClusterName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("error getting cluster: %w", err) + } + + machinePools, ok, err := unstructured.NestedFieldNoCopy(res.Object, "spec", "rkeConfig", "machinePools") + if !ok { + return nil, fmt.Errorf("unable to find machinePools of cluster %s", provider.config.ClusterName) + } + if err != nil { + return nil, err + } + + data, err := json.Marshal(machinePools) + if err != nil { + return nil, err + } + + var pools []provisioningv1.RKEMachinePool + err = json.Unmarshal(data, &pools) + + return pools, err +} + +func (provider *RancherCloudProvider) updateMachinePools(machinePools []provisioningv1.RKEMachinePool) error { + cluster, err := provider.client.Resource(clusterGVR()). + Namespace(provider.config.ClusterNamespace). + Get(context.TODO(), provider.config.ClusterName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("error getting cluster: %w", err) + } + + pools, err := machinePoolsToUnstructured(machinePools) + if err != nil { + return err + } + + if err := unstructured.SetNestedSlice(cluster.Object, pools, "spec", "rkeConfig", "machinePools"); err != nil { + return err + } + + _, err = provider.client.Resource(clusterGVR()).Namespace(provider.config.ClusterNamespace). + Update(context.TODO(), &unstructured.Unstructured{Object: cluster.Object}, metav1.UpdateOptions{}) + return err +} + +// converts machinePools into a usable form for the unstructured client. +// unstructured.SetNestedSlice expects types produced by json.Unmarshal(), +// so we marshal and unmarshal again before passing it on. +func machinePoolsToUnstructured(machinePools []provisioningv1.RKEMachinePool) ([]interface{}, error) { + data, err := json.Marshal(machinePools) + if err != nil { + return nil, err + } + + var pools []interface{} + if err := json.Unmarshal(data, &pools); err != nil { + return nil, err + } + + return pools, nil +} + +func isNotScalable(err error) bool { + return errors.Is(err, errMissingMinSizeAnnotation) || errors.Is(err, errMissingMaxSizeAnnotation) +} diff --git a/cluster-autoscaler/cloudprovider/rancher/rancher_provider_test.go b/cluster-autoscaler/cloudprovider/rancher/rancher_provider_test.go new file mode 100644 index 000000000000..8b101f2e4d6d --- /dev/null +++ b/cluster-autoscaler/cloudprovider/rancher/rancher_provider_test.go @@ -0,0 +1,213 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rancher + +import ( + "strings" + "testing" + + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + provisioningv1 "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/rancher/provisioning.cattle.io/v1" + fakedynamic "k8s.io/client-go/dynamic/fake" + "k8s.io/utils/pointer" +) + +func TestNodeGroups(t *testing.T) { + tests := []struct { + name string + machinePools []provisioningv1.RKEMachinePool + expectedGroups int + expectedErrContains string + expectedResources corev1.ResourceList + clusterNameOverride string + }{ + { + name: "normal", + machinePools: []provisioningv1.RKEMachinePool{ + { + Name: nodeGroupDev, + Quantity: pointer.Int32(1), + MachineDeploymentAnnotations: map[string]string{ + minSizeAnnotation: "0", + maxSizeAnnotation: "3", + }, + }, + }, + expectedGroups: 1, + }, + { + name: "without size annotations", + machinePools: []provisioningv1.RKEMachinePool{ + { + Name: nodeGroupDev, + Quantity: pointer.Int32(1), + }, + }, + expectedGroups: 0, + }, + { + name: "missing quantity", + expectedGroups: 0, + expectedErrContains: "machine pool quantity is not set", + }, + { + name: "missing cluster", + expectedGroups: 0, + expectedErrContains: "clusters.provisioning.cattle.io \"some-other-cluster\" not found", + clusterNameOverride: "some-other-cluster", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + pools, err := machinePoolsToUnstructured(tc.machinePools) + if err != nil { + t.Fatal(err) + } + + config := &cloudConfig{ + ClusterName: "test-cluster", + ClusterNamespace: "default", + } + + cluster := newCluster(config.ClusterName, config.ClusterNamespace, pools) + + if tc.clusterNameOverride != "" { + config.ClusterName = tc.clusterNameOverride + } + + provider := RancherCloudProvider{ + resourceLimiter: &cloudprovider.ResourceLimiter{}, + client: fakedynamic.NewSimpleDynamicClientWithCustomListKinds( + runtime.NewScheme(), + map[schema.GroupVersionResource]string{ + clusterGVR(): "kindList", + }, + cluster, + ), + config: config, + } + + if err := provider.Refresh(); err != nil { + if tc.expectedErrContains == "" || !strings.Contains(err.Error(), tc.expectedErrContains) { + t.Fatalf("expected err to contain %q, got %q", tc.expectedErrContains, err) + } + } + + if len(provider.NodeGroups()) != tc.expectedGroups { + t.Fatalf("expected %q groups, got %q", tc.expectedGroups, len(provider.NodeGroups())) + } + }) + } +} + +func TestNodeGroupForNode(t *testing.T) { + provider, err := setup(nil) + if err != nil { + t.Fatal(err) + } + + if err := provider.Refresh(); err != nil { + t.Fatal(err) + } + + tests := []struct { + name string + node *corev1.Node + nodeGroupId string + expectedNodeGroupNil bool + }{ + { + name: "match dev", + node: &corev1.Node{ + ObjectMeta: v1.ObjectMeta{Name: nodeName(nodeGroupDev, 0)}, + Spec: corev1.NodeSpec{ + ProviderID: rke2ProviderIDPrefix + nodeName(nodeGroupDev, 0), + }, + }, + nodeGroupId: nodeGroupDev, + }, + { + name: "match prod", + node: &corev1.Node{ + ObjectMeta: v1.ObjectMeta{Name: nodeName(nodeGroupProd, 0)}, + Spec: corev1.NodeSpec{ + ProviderID: rke2ProviderIDPrefix + nodeName(nodeGroupProd, 0), + }, + }, + nodeGroupId: nodeGroupProd, + }, + { + name: "not rke2 node", + node: &corev1.Node{ + ObjectMeta: v1.ObjectMeta{Name: nodeName(nodeGroupDev, 0)}, + Spec: corev1.NodeSpec{ + ProviderID: "whatever://" + nodeName(nodeGroupDev, 0), + }, + }, + expectedNodeGroupNil: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ng, err := provider.NodeGroupForNode(tc.node) + if err != nil { + t.Fatal(err) + } + + if !tc.expectedNodeGroupNil { + if ng == nil { + t.Fatalf("expected node group from node %s", tc.node.Name) + } + + if tc.nodeGroupId != ng.Id() { + t.Fatalf("expected node group id %s, got %s", tc.nodeGroupId, ng.Id()) + } + } else { + if ng != nil { + t.Fatalf("expected node group to be nil, got %v", ng) + } + } + }) + } + +} + +func newCluster(name, namespace string, machinePools interface{}) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": "Cluster", + "apiVersion": rancherProvisioningGroup + "/" + rancherProvisioningVersion, + "metadata": map[string]interface{}{ + "name": name, + "namespace": namespace, + }, + "spec": map[string]interface{}{ + "rkeConfig": map[string]interface{}{ + "machinePools": machinePools, + }, + }, + "status": map[string]interface{}{}, + }, + } +}