Skip to content

Commit

Permalink
Merge pull request kubernetes#4676 from mboersma/cluster-api-machinep…
Browse files Browse the repository at this point in the history
…ools

[clusterapi] Add support for MachinePools
  • Loading branch information
k8s-ci-robot authored Apr 12, 2023
2 parents 973426d + 17d2bd9 commit 1a37590
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 25 deletions.
15 changes: 9 additions & 6 deletions cluster-autoscaler/cloudprovider/clusterapi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,7 @@ cluster-autoscaler --cloud-provider=clusterapi \

To enable the automatic scaling of components in your cluster-api managed
cloud there are a few annotations you need to provide. These annotations
must be applied to either [MachineSet](https://cluster-api.sigs.k8s.io/developer/architecture/controllers/machine-set.html)
or [MachineDeployment](https://cluster-api.sigs.k8s.io/developer/architecture/controllers/machine-deployment.html)
must be applied to either [MachineSet](https://cluster-api.sigs.k8s.io/developer/architecture/controllers/machine-set.html), [MachineDeployment](https://cluster-api.sigs.k8s.io/developer/architecture/controllers/machine-deployment.html), or [MachinePool](https://cluster-api.sigs.k8s.io/developer/architecture/controllers/machine-pool.html)
resources depending on the type of cluster-api mechanism that you are using.

There are two annotations that control how a cluster resource should be scaled:
Expand All @@ -185,9 +184,13 @@ There are two annotations that control how a cluster resource should be scaled:
the maximum number of nodes for the associated resource group. The autoscaler
will not scale the group above this number.

The autoscaler will monitor any `MachineSet` or `MachineDeployment` containing
The autoscaler will monitor any `MachineSet`, `MachineDeployment`, or `MachinePool` containing
both of these annotations.

> Note: `MachinePool` support in cluster-autoscaler requires a provider implementation
> that supports the new "MachinePool Machines" feature. MachinePools in Cluster API are
> considered an [experimental feature](https://cluster-api.sigs.k8s.io/tasks/experimental-features/experimental-features.html#active-experimental-features) and are not enabled by default.

### Scale from zero support

The Cluster API community has defined an opt-in method for infrastructure
Expand Down Expand Up @@ -256,7 +259,7 @@ rules:
#### Pre-defined labels and taints on nodes scaled from zero

To provide labels or taint information for scale from zero, the optional
capacity annotations may be supplied as a comma separated list, as
capacity annotations may be supplied as a comma-separated list, as
demonstrated in the example below:

```yaml
Expand Down Expand Up @@ -347,12 +350,12 @@ spec:
class: "quick-start"
version: v1.24.0
controlPlane:
replicas: 1
replicas: 1
workers:
machineDeployments:
- class: default-worker
name: linux
## replicas field is not set.
## replicas field is not set.
## replicas: 1
```

Expand Down
151 changes: 138 additions & 13 deletions cluster-autoscaler/cloudprovider/clusterapi/clusterapi_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,28 +42,31 @@ import (
)

const (
machineProviderIDIndex = "machineProviderIDIndex"
nodeProviderIDIndex = "nodeProviderIDIndex"
defaultCAPIGroup = "cluster.x-k8s.io"
machineProviderIDIndex = "machineProviderIDIndex"
machinePoolProviderIDIndex = "machinePoolProviderIDIndex"
nodeProviderIDIndex = "nodeProviderIDIndex"
defaultCAPIGroup = "cluster.x-k8s.io"
// CAPIGroupEnvVar contains the environment variable name which allows overriding defaultCAPIGroup.
CAPIGroupEnvVar = "CAPI_GROUP"
// CAPIVersionEnvVar contains the environment variable name which allows overriding the Cluster API group version.
CAPIVersionEnvVar = "CAPI_VERSION"
resourceNameMachine = "machines"
resourceNameMachineSet = "machinesets"
resourceNameMachineDeployment = "machinedeployments"
resourceNameMachinePool = "machinepools"
failedMachinePrefix = "failed-machine-"
pendingMachinePrefix = "pending-machine-"
machineTemplateKind = "MachineTemplate"
machineDeploymentKind = "MachineDeployment"
machineSetKind = "MachineSet"
machinePoolKind = "MachinePool"
machineKind = "Machine"
autoDiscovererTypeClusterAPI = "clusterapi"
autoDiscovererClusterNameKey = "clusterName"
autoDiscovererNamespaceKey = "namespace"
)

// machineController watches for Nodes, Machines, MachineSets and
// machineController watches for Nodes, Machines, MachinePools, MachineSets, and
// MachineDeployments as they are added, updated and deleted on the
// cluster. Additionally, it adds indices to the node informers to
// satisfy lookup by node.Spec.ProviderID.
Expand All @@ -73,11 +76,14 @@ type machineController struct {
machineDeploymentInformer informers.GenericInformer
machineInformer informers.GenericInformer
machineSetInformer informers.GenericInformer
machinePoolInformer informers.GenericInformer
nodeInformer cache.SharedIndexInformer
managementClient dynamic.Interface
managementScaleClient scale.ScalesGetter
machineSetResource schema.GroupVersionResource
machineResource schema.GroupVersionResource
machinePoolResource schema.GroupVersionResource
machinePoolsAvailable bool
machineDeploymentResource schema.GroupVersionResource
machineDeploymentsAvailable bool
accessLock sync.Mutex
Expand All @@ -88,6 +94,29 @@ type machineController struct {
stopChannel <-chan struct{}
}

// indexMachinePoolByProviderID is an index function for MachinePool objects.
// It returns one index key per entry of the object's spec.providerIDList,
// with each entry passed through normalizedProviderString.
//
// No keys (and no error) are returned when obj is not an
// *unstructured.Unstructured, when spec.providerIDList is missing or cannot
// be read as a string slice, or when the list is empty.
func indexMachinePoolByProviderID(obj interface{}) ([]string, error) {
	pool, isUnstructured := obj.(*unstructured.Unstructured)
	if !isUnstructured {
		return nil, nil
	}

	// NOTE(review): a lookup error is treated the same as "field absent" and
	// is not propagated; presumably intentional for an index function, so a
	// malformed object is simply left out of the index — confirm.
	rawIDs, present, err := unstructured.NestedStringSlice(pool.UnstructuredContent(), "spec", "providerIDList")
	if err != nil || !present || len(rawIDs) == 0 {
		return nil, nil
	}

	keys := make([]string, 0, len(rawIDs))
	for _, rawID := range rawIDs {
		keys = append(keys, string(normalizedProviderString(rawID)))
	}

	return keys, nil
}

func indexMachineByProviderID(obj interface{}) ([]string, error) {
u, ok := obj.(*unstructured.Unstructured)
if !ok {
Expand Down Expand Up @@ -188,6 +217,9 @@ func (c *machineController) run() error {
if c.machineDeploymentsAvailable {
syncFuncs = append(syncFuncs, c.machineDeploymentInformer.Informer().HasSynced)
}
if c.machinePoolsAvailable {
syncFuncs = append(syncFuncs, c.machinePoolInformer.Informer().HasSynced)
}

klog.V(4).Infof("waiting for caches to sync")
if !cache.WaitForCacheSync(c.stopChannel, syncFuncs...) {
Expand All @@ -198,31 +230,39 @@ func (c *machineController) run() error {
}

func (c *machineController) findScalableResourceByProviderID(providerID normalizedProviderID) (*unstructured.Unstructured, error) {
// Check for a MachinePool first to simplify the logic afterward.
if c.machinePoolsAvailable {
machinePool, err := c.findMachinePoolByProviderID(providerID)
if err != nil {
return nil, err
}
if machinePool != nil {
return machinePool, nil
}
}

// Check for a MachineSet.
machine, err := c.findMachineByProviderID(providerID)
if err != nil {
return nil, err
}

if machine == nil {
return nil, nil
}

machineSet, err := c.findMachineOwner(machine)
if err != nil {
return nil, err
}

if machineSet == nil {
return nil, nil
}

// Check for a MachineDeployment.
if c.machineDeploymentsAvailable {
machineDeployment, err := c.findMachineSetOwner(machineSet)
if err != nil {
return nil, err
}

// If a matching machineDeployment was found return it
if machineDeployment != nil {
return machineDeployment, nil
}
Expand All @@ -231,6 +271,28 @@ func (c *machineController) findScalableResourceByProviderID(providerID normaliz
return machineSet, nil
}

// findMachinePoolByProviderID looks up the MachinePool indexed under
// providerID in the machine pool informer's machinePoolProviderIDIndex.
//
// It returns (nil, nil) when nothing is indexed under providerID, an error
// when the index lookup fails, when more than one object matches, or when
// the matching object is not an *unstructured.Unstructured. On success a
// DeepCopy() of the cached object is returned.
func (c *machineController) findMachinePoolByProviderID(providerID normalizedProviderID) (*unstructured.Unstructured, error) {
	indexer := c.machinePoolInformer.Informer().GetIndexer()
	matches, err := indexer.ByIndex(machinePoolProviderIDIndex, string(providerID))
	if err != nil {
		return nil, err
	}

	if len(matches) == 0 {
		return nil, nil
	}
	if len(matches) > 1 {
		return nil, fmt.Errorf("internal error; expected len==1, got %v", len(matches))
	}

	machinePool, ok := matches[0].(*unstructured.Unstructured)
	if !ok {
		return nil, fmt.Errorf("internal error; unexpected type %T", matches[0])
	}

	// Hand back a copy rather than the object held by the indexer.
	return machinePool.DeepCopy(), nil
}

// findMachineByProviderID finds machine matching providerID. A
// DeepCopy() of the object is returned on success.
func (c *machineController) findMachineByProviderID(providerID normalizedProviderID) (*unstructured.Unstructured, error) {
Expand Down Expand Up @@ -337,7 +399,7 @@ func getCAPIVersion() string {
}

// newMachineController constructs a controller that watches Nodes,
// Machines and MachineSet as they are added, updated and deleted on
// Machines, MachinePools, MachineDeployments, and MachineSets as they are added, updated, and deleted on
// the cluster.
func newMachineController(
managementClient dynamic.Interface,
Expand Down Expand Up @@ -395,6 +457,32 @@ func newMachineController(
return nil, fmt.Errorf("failed to add event handler for resource %q: %v", resourceNameMachineSet, err)
}

var gvrMachinePool schema.GroupVersionResource
var machinePoolInformer informers.GenericInformer

machinePoolsAvailable, err := groupVersionHasResource(managementDiscoveryClient,
fmt.Sprintf("%s/%s", CAPIGroup, CAPIVersion), resourceNameMachinePool)
if err != nil {
return nil, fmt.Errorf("failed to validate if resource %q is available for group %q: %v",
resourceNameMachinePool, fmt.Sprintf("%s/%s", CAPIGroup, CAPIVersion), err)
}

if machinePoolsAvailable {
gvrMachinePool = schema.GroupVersionResource{
Group: CAPIGroup,
Version: CAPIVersion,
Resource: resourceNameMachinePool,
}
machinePoolInformer = managementInformerFactory.ForResource(gvrMachinePool)
machinePoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{})

if err := machinePoolInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
machinePoolProviderIDIndex: indexMachinePoolByProviderID,
}); err != nil {
return nil, fmt.Errorf("cannot add machine pool indexer: %v", err)
}
}

gvrMachine := schema.GroupVersionResource{
Group: CAPIGroup,
Version: CAPIVersion,
Expand Down Expand Up @@ -429,10 +517,13 @@ func newMachineController(
machineDeploymentInformer: machineDeploymentInformer,
machineInformer: machineInformer,
machineSetInformer: machineSetInformer,
machinePoolInformer: machinePoolInformer,
nodeInformer: nodeInformer,
managementClient: managementClient,
managementScaleClient: managementScaleClient,
machineSetResource: gvrMachineSet,
machinePoolResource: gvrMachinePool,
machinePoolsAvailable: machinePoolsAvailable,
machineResource: gvrMachine,
machineDeploymentResource: gvrMachineDeployment,
machineDeploymentsAvailable: machineDeploymentAvailable,
Expand Down Expand Up @@ -475,12 +566,37 @@ func getAPIGroupPreferredVersion(client discovery.DiscoveryInterface, APIGroup s
}

// scalableResourceProviderIDs returns the provider IDs belonging to
// scalableResource. Resources whose kind is machinePoolKind are handled by
// findMachinePoolProviderIDs; every other kind is handled by
// findScalableResourceProviderIDs.
func (c *machineController) scalableResourceProviderIDs(scalableResource *unstructured.Unstructured) ([]string, error) {
	switch scalableResource.GetKind() {
	case machinePoolKind:
		return c.findMachinePoolProviderIDs(scalableResource)
	default:
		return c.findScalableResourceProviderIDs(scalableResource)
	}
}

// findMachinePoolProviderIDs returns the entries of scalableResource's
// spec.providerIDList exactly as stored on the object.
//
// A missing providerIDList is not an error: a warning is logged and a nil
// slice is returned. An error is returned only when the field exists but
// cannot be read as a string slice.
func (c *machineController) findMachinePoolProviderIDs(scalableResource *unstructured.Unstructured) ([]string, error) {
	name := scalableResource.GetName()

	providerIDs, found, err := unstructured.NestedStringSlice(scalableResource.UnstructuredContent(), "spec", "providerIDList")
	if err != nil {
		return nil, err
	}
	if !found {
		klog.Warningf("Machine Pool %q has no providerIDList", name)
		// Make the "absent" result explicit regardless of what the lookup returned.
		providerIDs = nil
	}

	klog.V(4).Infof("nodegroup %s has %d nodes: %v", name, len(providerIDs), providerIDs)
	return providerIDs, nil
}

func (c *machineController) findScalableResourceProviderIDs(scalableResource *unstructured.Unstructured) ([]string, error) {
var providerIDs []string

machines, err := c.listMachinesForScalableResource(scalableResource)
if err != nil {
return nil, fmt.Errorf("error listing machines: %v", err)
}

var providerIDs []string
for _, machine := range machines {
providerID, found, err := unstructured.NestedString(machine.UnstructuredContent(), "spec", "providerID")
if err != nil {
Expand Down Expand Up @@ -550,8 +666,7 @@ func (c *machineController) scalableResourceProviderIDs(scalableResource *unstru
}
}

klog.V(4).Infof("nodegroup %s has nodes %v", scalableResource.GetName(), providerIDs)

klog.V(4).Infof("nodegroup %s has %d nodes: %v", scalableResource.GetName(), len(providerIDs), providerIDs)
return providerIDs, nil
}

Expand Down Expand Up @@ -666,6 +781,16 @@ func (c *machineController) listScalableResources() ([]*unstructured.Unstructure

scalableResources = append(scalableResources, machineDeployments...)
}

if c.machinePoolsAvailable {
machinePools, err := c.listResources(c.machinePoolInformer.Lister())
if err != nil {
return nil, err
}

scalableResources = append(scalableResources, machinePools...)
}

return scalableResources, nil
}

Expand Down
Loading

0 comments on commit 1a37590

Please sign in to comment.