CA: DRA integration MVP #7530

Open · wants to merge 10 commits into base: master
16 changes: 12 additions & 4 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -659,7 +659,11 @@ func (a *StaticAutoscaler) addUpcomingNodesToClusterSnapshot(upcomingCounts map[
nodeGroups := a.nodeGroupsById()
upcomingNodeGroups := make(map[string]int)
upcomingNodesFromUpcomingNodeGroups := 0
- for nodeGroupName, upcomingNodeInfos := range getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups) {
+ upcomingNodeInfosPerNg, err := getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups)
+ if err != nil {
+ return err
+ }
+ for nodeGroupName, upcomingNodeInfos := range upcomingNodeInfosPerNg {
nodeGroup := nodeGroups[nodeGroupName]
if nodeGroup == nil {
return fmt.Errorf("failed to find node group: %s", nodeGroupName)
@@ -1008,7 +1012,7 @@ func allPodsAreNew(pods []*apiv1.Pod, currentTime time.Time) bool {
return found && oldest.Add(unschedulablePodWithGpuTimeBuffer).After(currentTime)
}

- func getUpcomingNodeInfos(upcomingCounts map[string]int, nodeInfos map[string]*framework.NodeInfo) map[string][]*framework.NodeInfo {
+ func getUpcomingNodeInfos(upcomingCounts map[string]int, nodeInfos map[string]*framework.NodeInfo) (map[string][]*framework.NodeInfo, error) {
upcomingNodes := make(map[string][]*framework.NodeInfo)
for nodeGroup, numberOfNodes := range upcomingCounts {
nodeTemplate, found := nodeInfos[nodeGroup]
@@ -1027,11 +1031,15 @@ func getUpcomingNodeInfos(upcomingCounts map[string]int, nodeInfos map[string]*f
// Ensure new nodes have different names because nodeName
// will be used as a map key. Also deep copy pods (daemonsets &
// any pods added by cloud provider on template).
- nodes = append(nodes, simulator.NodeInfoSanitizedDeepCopy(nodeTemplate, fmt.Sprintf("upcoming-%d", i)))
+ freshNodeInfo, err := simulator.NodeInfoSanitizedDeepCopy(nodeTemplate, fmt.Sprintf("upcoming-%d", i))
+ if err != nil {
+ return nil, err
+ }
+ nodes = append(nodes, freshNodeInfo)
}
upcomingNodes[nodeGroup] = nodes
}
- return upcomingNodes
+ return upcomingNodes, nil
}

func calculateCoresMemoryTotal(nodes []*apiv1.Node, timestamp time.Time) (int64, int64) {
5 changes: 4 additions & 1 deletion cluster-autoscaler/estimator/binpacking_estimator.go
@@ -219,7 +219,10 @@ func (e *BinpackingNodeEstimator) addNewNodeToSnapshot(
estimationState *estimationState,
template *framework.NodeInfo,
) error {
- newNodeInfo := core_utils.NodeInfoSanitizedDeepCopy(template, fmt.Sprintf("e-%d", estimationState.newNodeNameIndex))
+ newNodeInfo, err := core_utils.NodeInfoSanitizedDeepCopy(template, fmt.Sprintf("e-%d", estimationState.newNodeNameIndex))
+ if err != nil {
+ return err
+ }
if err := e.clusterSnapshot.AddNodeInfo(newNodeInfo); err != nil {
return err
}
166 changes: 166 additions & 0 deletions cluster-autoscaler/simulator/dynamicresources/utils/sanitize.go
@@ -0,0 +1,166 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
"fmt"

v1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
)

// SanitizeNodeResourceSlices can be used to duplicate node-local ResourceSlices attached to a Node, when duplicating the Node. The new slices
// are pointed to newNodeName, and nameSuffix is appended to all pool names (pool names have to be unique within a driver, so we can't
// leave them as-is when duplicating). Returns a map of all pool names (without the suffix) that can be used with SanitizePodResourceClaims().
// Returns an error if any of the slices isn't node-local.
func SanitizeNodeResourceSlices(nodeLocalSlices []*resourceapi.ResourceSlice, newNodeName, nameSuffix string) (newSlices []*resourceapi.ResourceSlice, oldPoolNames map[string]bool, err error) {
oldPoolNames = map[string]bool{}
for _, slice := range nodeLocalSlices {
if slice.Spec.NodeName == "" {
return nil, nil, fmt.Errorf("can't sanitize slice %s because it isn't node-local", slice.Name)
}
sliceCopy := slice.DeepCopy()
sliceCopy.UID = uuid.NewUUID()
sliceCopy.Name = fmt.Sprintf("%s-%s", slice.Name, nameSuffix)
sliceCopy.Spec.Pool.Name = fmt.Sprintf("%s-%s", slice.Spec.Pool.Name, nameSuffix)
sliceCopy.Spec.NodeName = newNodeName

oldPoolNames[slice.Spec.Pool.Name] = true
newSlices = append(newSlices, sliceCopy)
}
return newSlices, oldPoolNames, nil
}
Contributor:
Function looks okay to me. Normally one cannot just append something to a name to create a new name because of length limits, but here this isn't an issue because it's all in-memory.
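
To make the contract above concrete, here is a minimal sketch of how SanitizeNodeResourceSlices could be exercised from a test in the same package; the node names, pool name and "abc123" suffix are illustrative values, not taken from this PR.

package utils

import (
	"testing"

	resourceapi "k8s.io/api/resource/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Sketch only: node, pool and suffix names are illustrative, not from the PR.
func TestSanitizeNodeResourceSlicesSketch(t *testing.T) {
	templateSlice := &resourceapi.ResourceSlice{
		ObjectMeta: metav1.ObjectMeta{Name: "template-node-slice"},
		Spec: resourceapi.ResourceSliceSpec{
			NodeName: "template-node",
			Pool:     resourceapi.ResourcePool{Name: "template-node-pool"},
		},
	}
	newSlices, oldPools, err := SanitizeNodeResourceSlices([]*resourceapi.ResourceSlice{templateSlice}, "upcoming-0", "abc123")
	if err != nil {
		t.Fatal(err)
	}
	// The copy is re-pointed at the new node, and its pool name gets the suffix appended.
	if newSlices[0].Spec.NodeName != "upcoming-0" || newSlices[0].Spec.Pool.Name != "template-node-pool-abc123" {
		t.Errorf("unexpected sanitized slice spec: %+v", newSlices[0].Spec)
	}
	// oldPools holds the original (un-suffixed) pool names, ready to pass to SanitizePodResourceClaims.
	if !oldPools["template-node-pool"] {
		t.Errorf("expected original pool name in oldPools, got %v", oldPools)
	}
}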


// SanitizePodResourceClaims can be used to duplicate ResourceClaims needed by a Pod, when duplicating the Pod.
// - ResourceClaims owned by oldOwner are duplicated and sanitized, to be owned by a duplicate pod - newOwner.
// - ResourceClaims not owned by oldOwner are returned unchanged in the result. They are shared claims not bound to the
// lifecycle of the duplicated pod, so they shouldn't be duplicated.
Contributor:
What about updating the ReservedFor so that the new pod is listed if (and only if) the old one was? Or is that something that will be covered elsewhere?

// - Works for unallocated claims (e.g. if the pod being duplicated isn't scheduled).
// - Works for claims allocated on a single node that is being duplicated (e.g. if the pod being duplicated is a scheduled DS pod).
// The name of the old node and its pools have to be provided in this case. Such allocated claims are pointed to newNodeName,
// and nameSuffix is appended to all pool names in allocation results, to match the pool names of the new, duplicated node.
// - Returns an error if any of the allocated claims is not node-local on oldNodeName. Such allocations can't be sanitized, the only
// option is to clear the allocation and run scheduler filters&reserve to get a new allocation when duplicating a pod.
func SanitizePodResourceClaims(newOwner, oldOwner *v1.Pod, claims []*resourceapi.ResourceClaim, nameSuffix, oldNodeName, newNodeName string, oldNodePoolNames map[string]bool) ([]*resourceapi.ResourceClaim, error) {
Contributor:
nit: newNodeName before oldNodeName for consistency with newOwner oldOwner

var result []*resourceapi.ResourceClaim
for _, claim := range claims {
if ownerName, ownerUid := ClaimOwningPod(claim); ownerName != oldOwner.Name || ownerUid != oldOwner.UID {
// Only claims owned by the pod are bound to its lifecycle. The lifecycle of other claims is independent, and they're most likely shared
// by multiple pods. They shouldn't be sanitized or duplicated - just add unchanged to the result.
result = append(result, claim)
continue
}

claimCopy := claim.DeepCopy()
claimCopy.UID = uuid.NewUUID()
claimCopy.Name = fmt.Sprintf("%s-%s", claim.Name, nameSuffix)
claimCopy.OwnerReferences = []metav1.OwnerReference{PodClaimOwnerReference(newOwner)}
Contributor:
What if the claim has multiple owners, one of which is the oldOwner? In that case we would override the other owners, although I am not sure what the behavior should be.


if claimCopy.Status.Allocation == nil {
// Unallocated claim - just clear the consumer reservations to be sure, and we're done.
claimCopy.Status.ReservedFor = []resourceapi.ResourceClaimConsumerReference{}
result = append(result, claimCopy)
continue
}

if singleNodeSelector := nodeSelectorSingleNode(claimCopy.Status.Allocation.NodeSelector); singleNodeSelector == "" || singleNodeSelector != oldNodeName {
// This claim most likely has an allocation available on more than a single Node. We can't sanitize it, and it shouldn't be duplicated as we'd
// have multiple claims allocating the same devices.
return nil, fmt.Errorf("claim %s/%s is allocated, but not node-local on %s - can't be sanitized", claim.Namespace, claim.Name, oldNodeName)
}

var sanitizedAllocations []resourceapi.DeviceRequestAllocationResult
for _, devAlloc := range claim.Status.Allocation.Devices.Results {
// It's possible to have both node-local and global allocations in a single resource claim. Make sure that all allocations were node-local on the old node.
if !oldNodePoolNames[devAlloc.Pool] {
return nil, fmt.Errorf("claim %s/%s has an allocation %s, from a pool that isn't node-local on %s - can't be sanitized", claim.Namespace, claim.Name, devAlloc.Request, oldNodeName)
}
devAlloc.Pool = fmt.Sprintf("%s-%s", devAlloc.Pool, nameSuffix)
sanitizedAllocations = append(sanitizedAllocations, devAlloc)
}

claimCopy.Status.Allocation.Devices.Results = sanitizedAllocations
claimCopy.Status.Allocation.NodeSelector = createNodeSelectorSingleNode(newNodeName)
claimCopy.Status.ReservedFor = []resourceapi.ResourceClaimConsumerReference{PodClaimConsumerReference(newOwner)}

result = append(result, claimCopy)
}

return result, nil
}
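
Continuing the hypothetical test file sketched above (and additionally assuming v1 "k8s.io/api/core/v1" is imported, as in sanitize.go), here is a sketch of SanitizePodResourceClaims duplicating an unallocated pod-owned claim while passing a shared claim through untouched; all names, UIDs and the suffix are made up.

// Sketch only: continues the hypothetical test file above; names, UIDs and suffix are made up.
func TestSanitizePodResourceClaimsSketch(t *testing.T) {
	oldPod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "ds-pod", Namespace: "default", UID: "old-uid"}}
	newPod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "ds-pod-abc123", Namespace: "default", UID: "new-uid"}}

	// An unallocated claim owned by oldPod: it should come back duplicated, renamed and owned by newPod.
	ownedClaim := &resourceapi.ResourceClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "gpu-claim",
			Namespace:       "default",
			OwnerReferences: []metav1.OwnerReference{PodClaimOwnerReference(oldPod)},
		},
	}
	// A claim not owned by oldPod: shared, so it should be passed through unchanged.
	sharedClaim := &resourceapi.ResourceClaim{ObjectMeta: metav1.ObjectMeta{Name: "shared-claim", Namespace: "default"}}

	sanitized, err := SanitizePodResourceClaims(newPod, oldPod,
		[]*resourceapi.ResourceClaim{ownedClaim, sharedClaim},
		"abc123", "template-node", "upcoming-0", map[string]bool{"template-node-pool": true})
	if err != nil {
		t.Fatal(err)
	}
	if sanitized[0].Name != "gpu-claim-abc123" || sanitized[1] != sharedClaim {
		t.Errorf("unexpected sanitized claims: %q, %q", sanitized[0].Name, sanitized[1].Name)
	}
}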

// SanitizeResourceClaimRefs returns a duplicate of the provided pod, with nameSuffix appended to all pod-owned ResourceClaim names
// referenced in the Pod object. Names of ResourceClaims not owned by the pod are not changed.
func SanitizeResourceClaimRefs(pod *v1.Pod, nameSuffix string) *v1.Pod {
podCopy := pod.DeepCopy()

var sanitizedClaimStatuses []v1.PodResourceClaimStatus
for _, claimStatus := range podCopy.Status.ResourceClaimStatuses {
if claimStatus.ResourceClaimName != nil {
newClaimName := fmt.Sprintf("%s-%s", *claimStatus.ResourceClaimName, nameSuffix)
claimStatus.ResourceClaimName = &newClaimName
}
sanitizedClaimStatuses = append(sanitizedClaimStatuses, claimStatus)
}
podCopy.Status.ResourceClaimStatuses = sanitizedClaimStatuses

return podCopy
}
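
Finally, a short sketch of SanitizeResourceClaimRefs in the same hypothetical test file, showing that claim names referenced from the pod's status pick up the same suffix that was appended to the duplicated claims (again, illustrative names only).

// Sketch only: the claim name referenced from the pod's status should get the same suffix.
func TestSanitizeResourceClaimRefsSketch(t *testing.T) {
	claimName := "gpu-claim"
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "ds-pod", Namespace: "default"},
		Status: v1.PodStatus{
			ResourceClaimStatuses: []v1.PodResourceClaimStatus{
				{Name: "gpu", ResourceClaimName: &claimName},
			},
		},
	}
	podCopy := SanitizeResourceClaimRefs(pod, "abc123")
	if got := *podCopy.Status.ResourceClaimStatuses[0].ResourceClaimName; got != "gpu-claim-abc123" {
		t.Errorf("expected suffixed claim name, got %q", got)
	}
}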

func nodeSelectorSingleNode(selector *v1.NodeSelector) string {
if selector == nil {
// Nil selector means all nodes, so not a single node.
return ""
}
if len(selector.NodeSelectorTerms) != 1 {
// Selector for a single node doesn't need multiple ORed terms.
return ""
}
term := selector.NodeSelectorTerms[0]
if len(term.MatchExpressions) > 0 {
// Selector for a single node doesn't need expression matching.
return ""
}
if len(term.MatchFields) != 1 {
// Selector for a single node should have just 1 matchFields entry for its nodeName.
return ""
}
matchField := term.MatchFields[0]
if matchField.Key != "metadata.name" || matchField.Operator != v1.NodeSelectorOpIn || len(matchField.Values) != 1 {
// Selector for a single node should have operator In with 1 value - the node name.
return ""
}
return matchField.Values[0]
}

func createNodeSelectorSingleNode(nodeName string) *v1.NodeSelector {
return &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchFields: []v1.NodeSelectorRequirement{
{
Key: "metadata.name",
Operator: v1.NodeSelectorOpIn,
Values: []string{nodeName},
},
},
},
},
}
}