Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rishabh-11 committed Dec 24, 2024
1 parent 56d80ac commit f3774f4
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 76 deletions.
138 changes: 68 additions & 70 deletions cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,8 @@ func (mcm *mcmCloudProvider) GetNodeGpuConfig(*apiv1.Node) *cloudprovider.GpuCon
return nil
}

// Ref contains a reference to the name of the machine-deployment.
type Ref struct {
Name string
Namespace string
}

// ReferenceFromProviderID extracts the Ref from providerId. It returns corresponding machine-name to providerid.
func ReferenceFromProviderID(m *McmManager, id string) (*Ref, error) {
func ReferenceFromProviderID(m *McmManager, id string) (*types.NamespacedName, error) {
machines, err := m.machineLister.Machines(m.namespace).List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("Could not list machines due to error: %s", err)
Expand All @@ -262,15 +256,15 @@ func ReferenceFromProviderID(m *McmManager, id string) (*Ref, error) {
klog.V(4).Infof("No machine found for node ID %q", id)
return nil, nil
}
return &Ref{
return &types.NamespacedName{
Name: Name,
Namespace: Namespace,
}, nil
}

// MachineDeployment implements NodeGroup interface.
type MachineDeployment struct {
Ref
types.NamespacedName

mcmManager *McmManager

Expand All @@ -280,90 +274,90 @@ type MachineDeployment struct {
}

// MaxSize returns maximum size of the node group.
func (machinedeployment *MachineDeployment) MaxSize() int {
return machinedeployment.maxSize
func (machineDeployment *MachineDeployment) MaxSize() int {
return machineDeployment.maxSize
}

// MinSize returns minimum size of the node group.
func (machinedeployment *MachineDeployment) MinSize() int {
return machinedeployment.minSize
func (machineDeployment *MachineDeployment) MinSize() int {
return machineDeployment.minSize
}

// TargetSize returns the current TARGET size of the node group. It is possible that the
// number is different from the number of nodes registered in Kubernetes.
func (machinedeployment *MachineDeployment) TargetSize() (int, error) {
size, err := machinedeployment.mcmManager.GetMachineDeploymentSize(machinedeployment)
func (machineDeployment *MachineDeployment) TargetSize() (int, error) {
size, err := machineDeployment.mcmManager.GetMachineDeploymentSize(machineDeployment)
return int(size), err
}

// Exist checks if the node group really exists on the cloud provider side. Allows to tell the
// theoretical node group from the real one.
// TODO: Implement this to check if machine-deployment really exists.
func (machinedeployment *MachineDeployment) Exist() bool {
func (machineDeployment *MachineDeployment) Exist() bool {
return true
}

// Create creates the node group on the cloud provider side.
func (machinedeployment *MachineDeployment) Create() (cloudprovider.NodeGroup, error) {
func (machineDeployment *MachineDeployment) Create() (cloudprovider.NodeGroup, error) {
return nil, cloudprovider.ErrAlreadyExist
}

// Autoprovisioned returns true if the node group is autoprovisioned.
func (machinedeployment *MachineDeployment) Autoprovisioned() bool {
func (machineDeployment *MachineDeployment) Autoprovisioned() bool {
return false
}

// Delete deletes the node group on the cloud provider side.
// This will be executed only for autoprovisioned node groups, once their size drops to 0.
func (machinedeployment *MachineDeployment) Delete() error {
func (machineDeployment *MachineDeployment) Delete() error {
return cloudprovider.ErrNotImplemented
}

// IncreaseSize of the Machinedeployment.
func (machinedeployment *MachineDeployment) IncreaseSize(delta int) error {
klog.V(0).Infof("Received request to increase size of machine deployment %s by %d", machinedeployment.Name, delta)
func (machineDeployment *MachineDeployment) IncreaseSize(delta int) error {
klog.V(0).Infof("Received request to increase size of machine deployment %s by %d", machineDeployment.Name, delta)
if delta <= 0 {
return fmt.Errorf("size increase must be positive")
}
machinedeployment.scalingMutex.Lock()
defer machinedeployment.scalingMutex.Unlock()
size, err := machinedeployment.mcmManager.GetMachineDeploymentSize(machinedeployment)
machineDeployment.scalingMutex.Lock()
defer machineDeployment.scalingMutex.Unlock()
size, err := machineDeployment.mcmManager.GetMachineDeploymentSize(machineDeployment)
if err != nil {
return err
}
targetSize := int(size) + delta
if targetSize > machinedeployment.MaxSize() {
return fmt.Errorf("size increase too large - desired:%d max:%d", targetSize, machinedeployment.MaxSize())
if targetSize > machineDeployment.MaxSize() {
return fmt.Errorf("size increase too large - desired:%d max:%d", targetSize, machineDeployment.MaxSize())
}
return machinedeployment.mcmManager.retry(func(ctx context.Context) (bool, error) {
return machinedeployment.mcmManager.SetMachineDeploymentSize(ctx, machinedeployment, int64(targetSize))
}, "MachineDeployment", "update", machinedeployment.Name)
return machineDeployment.mcmManager.retry(func(ctx context.Context) (bool, error) {
return machineDeployment.mcmManager.SetMachineDeploymentSize(ctx, machineDeployment, int64(targetSize))
}, "MachineDeployment", "update", machineDeployment.Name)
}

// DecreaseTargetSize decreases the target size of the node group. This function
// doesn't permit to delete any existing node and can be used only to reduce the
// request for new nodes that have not been yet fulfilled. Delta should be negative.
// It is assumed that cloud provider will not delete the existing nodes if the size
// when there is an option to just decrease the target.
func (machinedeployment *MachineDeployment) DecreaseTargetSize(delta int) error {
klog.V(0).Infof("Received request to decrease target size of machine deployment %s by %d", machinedeployment.Name, delta)
func (machineDeployment *MachineDeployment) DecreaseTargetSize(delta int) error {
klog.V(0).Infof("Received request to decrease target size of machine deployment %s by %d", machineDeployment.Name, delta)
if delta >= 0 {
return fmt.Errorf("size decrease size must be negative")
}
machinedeployment.scalingMutex.Lock()
defer machinedeployment.scalingMutex.Unlock()
size, err := machinedeployment.mcmManager.GetMachineDeploymentSize(machinedeployment)
machineDeployment.scalingMutex.Lock()
defer machineDeployment.scalingMutex.Unlock()
size, err := machineDeployment.mcmManager.GetMachineDeploymentSize(machineDeployment)
if err != nil {
return err
}
decreaseAmount := int(size) + delta
if decreaseAmount < machinedeployment.minSize {
klog.Warningf("Cannot go below min size= %d for machineDeployment %s, requested target size= %d . Setting target size to min size", machinedeployment.minSize, machinedeployment.Name, size+int64(delta))
decreaseAmount = machinedeployment.minSize
if decreaseAmount < machineDeployment.minSize {
klog.Warningf("Cannot go below min size= %d for machineDeployment %s, requested target size= %d . Setting target size to min size", machineDeployment.minSize, machineDeployment.Name, size+int64(delta))
decreaseAmount = machineDeployment.minSize
}
return machinedeployment.mcmManager.retry(func(ctx context.Context) (bool, error) {
return machinedeployment.mcmManager.SetMachineDeploymentSize(ctx, machinedeployment, int64(decreaseAmount))
}, "MachineDeployment", "update", machinedeployment.Name)
return machineDeployment.mcmManager.retry(func(ctx context.Context) (bool, error) {
return machineDeployment.mcmManager.SetMachineDeploymentSize(ctx, machineDeployment, int64(decreaseAmount))
}, "MachineDeployment", "update", machineDeployment.Name)
}

// Refresh resets the priority annotation for the machines that are not present in machines-marked-by-ca-for-deletion annotation on the machineDeployment
Expand All @@ -379,20 +373,20 @@ func (machineDeployment *MachineDeployment) Refresh() error {
klog.Infof("machine deployment %s is under rolling update, skipping", machineDeployment.Name)
return nil
}
markedMachines := sets.New(strings.Split(mcd.Annotations[machinesMarkedByCAForDeletion], ",")...)
markedMachines := getMachinesMarkedByCAForDeletion(mcd)
machines, err := machineDeployment.mcmManager.getMachinesForMachineDeployment(machineDeployment.Name)
if err != nil {
klog.Errorf("[Refresh] failed to get machines for machine deployment %s, hence skipping it. Err: %v", machineDeployment.Name, err.Error())
return err
}
var incorrectlyMarkedMachines []*Ref
var incorrectlyMarkedMachines []*types.NamespacedName
for _, machine := range machines {
// no need to reset priority for machines already in termination or failed phase
if machine.Status.CurrentStatus.Phase == v1alpha1.MachineTerminating || machine.Status.CurrentStatus.Phase == v1alpha1.MachineFailed {
continue
}
if annotValue, ok := machine.Annotations[machinePriorityAnnotation]; ok && annotValue == priorityValueForCandidateMachines && !markedMachines.Has(machine.Name) {
incorrectlyMarkedMachines = append(incorrectlyMarkedMachines, &Ref{Name: machine.Name, Namespace: machine.Namespace})
incorrectlyMarkedMachines = append(incorrectlyMarkedMachines, &types.NamespacedName{Name: machine.Name, Namespace: machine.Namespace})
}
}
var updatedMarkedMachines []string
Expand All @@ -416,51 +410,51 @@ func (machineDeployment *MachineDeployment) Refresh() error {

// Belongs returns true if the given node belongs to the NodeGroup.
// TODO: Implement this to iterate over machines under machinedeployment, and return true if node exists in list.
func (machinedeployment *MachineDeployment) Belongs(node *apiv1.Node) (bool, error) {
ref, err := ReferenceFromProviderID(machinedeployment.mcmManager, node.Spec.ProviderID)
func (machineDeployment *MachineDeployment) Belongs(node *apiv1.Node) (bool, error) {
ref, err := ReferenceFromProviderID(machineDeployment.mcmManager, node.Spec.ProviderID)
if err != nil {
return false, err
}
targetMd, err := machinedeployment.mcmManager.GetMachineDeploymentForMachine(ref)
targetMd, err := machineDeployment.mcmManager.GetMachineDeploymentForMachine(ref)
if err != nil {
return false, err
}
if targetMd == nil {
return false, fmt.Errorf("%s doesn't belong to a known MachinDeployment", node.Name)
}
if targetMd.Id() != machinedeployment.Id() {
if targetMd.Id() != machineDeployment.Id() {
return false, nil
}
return true, nil
}

// DeleteNodes deletes the nodes from the group. It is expected that this method will not be called
// for nodes which are not part of ANY machine deployment.
func (machinedeployment *MachineDeployment) DeleteNodes(nodes []*apiv1.Node) error {
func (machineDeployment *MachineDeployment) DeleteNodes(nodes []*apiv1.Node) error {
nodeNames := getNodeNames(nodes)
klog.V(0).Infof("Received request to delete nodes:- %v", nodeNames)
size, err := machinedeployment.mcmManager.GetMachineDeploymentSize(machinedeployment)
size, err := machineDeployment.mcmManager.GetMachineDeploymentSize(machineDeployment)
if err != nil {
return err
}
if int(size) <= machinedeployment.MinSize() {
if int(size) <= machineDeployment.MinSize() {
return fmt.Errorf("min size reached, nodes will not be deleted")
}
machines := make([]*Ref, 0, len(nodes))
machines := make([]*types.NamespacedName, 0, len(nodes))
for _, node := range nodes {
belongs, err := machinedeployment.Belongs(node)
belongs, err := machineDeployment.Belongs(node)
if err != nil {
return err
} else if !belongs {
return fmt.Errorf("%s belongs to a different machinedeployment than %s", node.Name, machinedeployment.Id())
return fmt.Errorf("%s belongs to a different machinedeployment than %s", node.Name, machineDeployment.Id())
}
ref, err := ReferenceFromProviderID(machinedeployment.mcmManager, node.Spec.ProviderID)
ref, err := ReferenceFromProviderID(machineDeployment.mcmManager, node.Spec.ProviderID)
if err != nil {
return fmt.Errorf("couldn't find the machine-name from provider-id %s", node.Spec.ProviderID)
}
machines = append(machines, ref)
}
return machinedeployment.mcmManager.DeleteMachines(machines)
return machineDeployment.mcmManager.DeleteMachines(machines)
}

func getNodeNames(nodes []*apiv1.Node) interface{} {
Expand All @@ -472,20 +466,20 @@ func getNodeNames(nodes []*apiv1.Node) interface{} {
}

// Id returns machinedeployment id.
func (machinedeployment *MachineDeployment) Id() string {
return machinedeployment.Name
func (machineDeployment *MachineDeployment) Id() string {
return machineDeployment.Name
}

// Debug returns a debug string for the Asg.
func (machinedeployment *MachineDeployment) Debug() string {
return fmt.Sprintf("%s (%d:%d)", machinedeployment.Id(), machinedeployment.MinSize(), machinedeployment.MaxSize())
func (machineDeployment *MachineDeployment) Debug() string {
return fmt.Sprintf("%s (%d:%d)", machineDeployment.Id(), machineDeployment.MinSize(), machineDeployment.MaxSize())
}

// Nodes returns a list of all nodes that belong to this node group.
func (machinedeployment *MachineDeployment) Nodes() ([]cloudprovider.Instance, error) {
instances, err := machinedeployment.mcmManager.GetInstancesForMachineDeployment(machinedeployment)
func (machineDeployment *MachineDeployment) Nodes() ([]cloudprovider.Instance, error) {
instances, err := machineDeployment.mcmManager.GetInstancesForMachineDeployment(machineDeployment)
if err != nil {
return nil, fmt.Errorf("failed to get the cloudprovider.Instance for machines backed by the machinedeployment %q, error: %v", machinedeployment.Name, err)
return nil, fmt.Errorf("failed to get the cloudprovider.Instance for machines backed by the machinedeployment %q, error: %v", machineDeployment.Name, err)
}
erroneousInstanceInfos := make([]string, 0, len(instances))
for _, instance := range instances {
Expand All @@ -502,9 +496,9 @@ func (machinedeployment *MachineDeployment) Nodes() ([]cloudprovider.Instance, e
// GetOptions returns NodeGroupAutoscalingOptions that should be used for this particular
// NodeGroup. Returning a nil will result in using default options.
// Implementation optional.
func (machinedeployment *MachineDeployment) GetOptions(defaults config.NodeGroupAutoscalingOptions) (*config.NodeGroupAutoscalingOptions, error) {
func (machineDeployment *MachineDeployment) GetOptions(defaults config.NodeGroupAutoscalingOptions) (*config.NodeGroupAutoscalingOptions, error) {
options := defaults
mcdAnnotations, err := machinedeployment.mcmManager.GetMachineDeploymentAnnotations(machinedeployment.Name)
mcdAnnotations, err := machineDeployment.mcmManager.GetMachineDeploymentAnnotations(machineDeployment.Name)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -538,25 +532,25 @@ func (machinedeployment *MachineDeployment) GetOptions(defaults config.NodeGroup
}

// TemplateNodeInfo returns a node template for this node group.
func (machinedeployment *MachineDeployment) TemplateNodeInfo() (*schedulerframework.NodeInfo, error) {
func (machineDeployment *MachineDeployment) TemplateNodeInfo() (*schedulerframework.NodeInfo, error) {

nodeTemplate, err := machinedeployment.mcmManager.GetMachineDeploymentNodeTemplate(machinedeployment)
nodeTemplate, err := machineDeployment.mcmManager.GetMachineDeploymentNodeTemplate(machineDeployment)
if err != nil {
return nil, err
}

node, err := machinedeployment.mcmManager.buildNodeFromTemplate(machinedeployment.Name, nodeTemplate)
node, err := machineDeployment.mcmManager.buildNodeFromTemplate(machineDeployment.Name, nodeTemplate)
if err != nil {
return nil, err
}

nodeInfo := schedulerframework.NewNodeInfo(cloudprovider.BuildKubeProxy(machinedeployment.Name))
nodeInfo := schedulerframework.NewNodeInfo(cloudprovider.BuildKubeProxy(machineDeployment.Name))
nodeInfo.SetNode(node)
return nodeInfo, nil
}

// AtomicIncreaseSize is not implemented.
func (machinedeployment *MachineDeployment) AtomicIncreaseSize(delta int) error {
func (machineDeployment *MachineDeployment) AtomicIncreaseSize(delta int) error {
return cloudprovider.ErrNotImplemented
}

Expand All @@ -573,13 +567,17 @@ func buildMachineDeploymentFromSpec(value string, mcmManager *McmManager) (*Mach
return machinedeployment, nil
}

func getMachinesMarkedByCAForDeletion(mcd *v1alpha1.MachineDeployment) sets.Set[string] {
return sets.New(strings.Split(mcd.Annotations[machinesMarkedByCAForDeletion], ",")...)
}

func buildMachineDeployment(mcmManager *McmManager, minSize int, maxSize int, namespace string, name string) *MachineDeployment {
return &MachineDeployment{
mcmManager: mcmManager,
minSize: minSize,
maxSize: maxSize,
scalingMutex: sync.Mutex{},
Ref: Ref{
NamespacedName: types.NamespacedName{
Name: name,
Namespace: namespace,
},
Expand Down
11 changes: 5 additions & 6 deletions cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"flag"
"fmt"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
v1appslister "k8s.io/client-go/listers/apps/v1"
"k8s.io/utils/pointer"
"maps"
Expand Down Expand Up @@ -370,7 +369,7 @@ func CreateMcmManager(discoveryOpts cloudprovider.NodeGroupDiscoveryOptions) (*M
}

// GetMachineDeploymentForMachine returns the MachineDeployment for the Machine object.
func (m *McmManager) GetMachineDeploymentForMachine(machine *Ref) (*MachineDeployment, error) {
func (m *McmManager) GetMachineDeploymentForMachine(machine *types.NamespacedName) (*MachineDeployment, error) {
if machine.Name == "" {
// Considering the possibility when Machine has been deleted but due to cached Node object it appears here.
return nil, fmt.Errorf("Node does not Exists")
Expand Down Expand Up @@ -455,7 +454,7 @@ func (m *McmManager) SetMachineDeploymentSize(ctx context.Context, machinedeploy
}

// DeleteMachines annotates the target machines and also reduces the desired replicas of the MachineDeployment.
func (m *McmManager) DeleteMachines(targetMachineRefs []*Ref) error {
func (m *McmManager) DeleteMachines(targetMachineRefs []*types.NamespacedName) error {
if len(targetMachineRefs) == 0 {
return nil
}
Expand All @@ -475,7 +474,7 @@ func (m *McmManager) DeleteMachines(targetMachineRefs []*Ref) error {
if !isRollingUpdateFinished(md) {
return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", commonMachineDeployment.Name)
}
markedMachines := sets.New(strings.Split(md.Annotations[machinesMarkedByCAForDeletion], ",")...)
markedMachines := getMachinesMarkedByCAForDeletion(md)
// update priorities of machines to be deleted except the ones already in termination to 1
machinesWithPrio1, err := m.prioritizeMachinesForDeletion(targetMachineRefs)
if err != nil {
Expand All @@ -493,7 +492,7 @@ func (m *McmManager) DeleteMachines(targetMachineRefs []*Ref) error {
}

// resetPriorityForMachines resets the priority of machines passed in the argument to defaultPriorityValue
func (m *McmManager) resetPriorityForMachines(mcRefs []*Ref) error {
func (m *McmManager) resetPriorityForMachines(mcRefs []*types.NamespacedName) error {
var collectiveError error
for _, mcRef := range mcRefs {
machine, err := m.machineLister.Machines(m.namespace).Get(mcRef.Name)
Expand Down Expand Up @@ -524,7 +523,7 @@ func (m *McmManager) resetPriorityForMachines(mcRefs []*Ref) error {
}

// prioritizeMachinesForDeletion prioritizes the targeted machines by updating their priority annotation to 1
func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*Ref) ([]string, error) {
func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*types.NamespacedName) ([]string, error) {
var expectedToTerminateMachineNodePairs = make(map[string]string)
var machinesMarkedWithPrio1 []string
for _, machineRef := range targetMachineRefs {
Expand Down

0 comments on commit f3774f4

Please sign in to comment.