CA: DRA integration MVP #7530

Open · wants to merge 10 commits into base: master
21 changes: 10 additions & 11 deletions cluster-autoscaler/core/scaledown/eligibility/eligibility_test.go
@@ -157,17 +157,16 @@ func getTestCases(ignoreDaemonSetsUtilization bool, suffix string, now time.Time
wantUnremovable: []*simulator.UnremovableNode{},
scaleDownUnready: true,
},
// TODO(DRA): Uncomment once PredicateSnapshot DRA logic is implemented.
//{
// desc: "Node is filtered out because of DRA issues if DRA is enabled",
// nodes: []*apiv1.Node{regularNode},
// pods: []*apiv1.Pod{smallPod},
// draSnapshot: drasnapshot.NewSnapshot(nil, map[string][]*resourceapi.ResourceSlice{"regular": {regularNodeIncompleteResourceSlice}}, nil, nil),
// draEnabled: true,
// wantUnneeded: []string{},
// wantUnremovable: []*simulator.UnremovableNode{{Node: regularNode, Reason: simulator.UnexpectedError}},
// scaleDownUnready: true,
//},
{
desc: "Node is filtered out because of DRA issues if DRA is enabled",
nodes: []*apiv1.Node{regularNode},
pods: []*apiv1.Pod{smallPod},
draSnapshot: drasnapshot.NewSnapshot(nil, map[string][]*resourceapi.ResourceSlice{"regular": {regularNodeIncompleteResourceSlice}}, nil, nil),
draEnabled: true,
wantUnneeded: []string{},
wantUnremovable: []*simulator.UnremovableNode{{Node: regularNode, Reason: simulator.UnexpectedError}},
scaleDownUnready: true,
},
}

finalTestCases := []testCase{}
@@ -65,6 +65,8 @@ type ClusterSnapshot interface {
// name. Returns nil if predicates pass, or a non-nil error specifying why they didn't otherwise. The error Type() can be
// checked against SchedulingInternalError to distinguish failing predicates from unexpected errors. Doesn't mutate the snapshot.
CheckPredicates(pod *apiv1.Pod, nodeName string) SchedulingError

// TODO(DRA): Move unschedulable Pods inside ClusterSnapshot (since their DRA objects are already here), refactor PodListProcessor.
}
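A minimal caller-side sketch of the CheckPredicates contract described above. It is illustrative only (not taken from the PR), written from an importing package and assuming the usual apiv1 and clustersnapshot imports already used elsewhere in this diff:

// canScheduleSketch distinguishes "predicates failed" from an unexpected internal error.
func canScheduleSketch(snapshot clustersnapshot.ClusterSnapshot, pod *apiv1.Pod, nodeName string) (bool, clustersnapshot.SchedulingError) {
	schedErr := snapshot.CheckPredicates(pod, nodeName)
	if schedErr == nil {
		return true, nil // Predicates pass: the pod fits on this node.
	}
	if schedErr.Type() == clustersnapshot.SchedulingInternalError {
		return false, schedErr // Something unexpected went wrong while running the predicates.
	}
	return false, nil // Predicates genuinely failed; not an error condition.
}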

// ClusterSnapshotStore is the "low-level" part of ClusterSnapshot, responsible for storing the snapshot state and mutating it directly,
@@ -17,9 +17,14 @@ limitations under the License.
package predicate

import (
"fmt"

apiv1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
drautils "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/utils"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// PredicateSnapshot implements ClusterSnapshot on top of a ClusterSnapshotStore by using
@@ -50,6 +55,9 @@ func (s *PredicateSnapshot) GetNodeInfo(nodeName string) (*framework.NodeInfo, e
if err != nil {
return nil, err
}
if s.draEnabled {
return s.ClusterSnapshotStore.DraSnapshot().WrapSchedulerNodeInfo(schedNodeInfo)
}
return framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil), nil
}

@@ -61,26 +69,77 @@ func (s *PredicateSnapshot) ListNodeInfos() ([]*framework.NodeInfo, error) {
}
var result []*framework.NodeInfo
for _, schedNodeInfo := range schedNodeInfos {
result = append(result, framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil))
if s.draEnabled {
nodeInfo, err := s.ClusterSnapshotStore.DraSnapshot().WrapSchedulerNodeInfo(schedNodeInfo)
if err != nil {
return nil, err
}
result = append(result, nodeInfo)
} else {
result = append(result, framework.WrapSchedulerNodeInfo(schedNodeInfo, nil, nil))
}
}
return result, nil
}

// AddNodeInfo adds the provided internal NodeInfo to the snapshot.
func (s *PredicateSnapshot) AddNodeInfo(nodeInfo *framework.NodeInfo) error {
if s.draEnabled {
if len(nodeInfo.LocalResourceSlices) > 0 {
if err := s.ClusterSnapshotStore.DraSnapshot().AddNodeResourceSlices(nodeInfo.Node().Name, nodeInfo.LocalResourceSlices); err != nil {
return fmt.Errorf("couldn't add ResourceSlices to DRA snapshot: %v", err)
}
}

for _, podInfo := range nodeInfo.Pods() {
if len(podInfo.NeededResourceClaims) > 0 {
if err := s.modifyResourceClaimsForNewPod(podInfo); err != nil {
return err
}
if err := s.verifyScheduledPodResourceClaims(podInfo.Pod, nodeInfo.Node()); err != nil {
return err
}
}
}
}

return s.ClusterSnapshotStore.AddSchedulerNodeInfo(nodeInfo.ToScheduler())
}

// RemoveNodeInfo removes a NodeInfo matching the provided nodeName from the snapshot.
func (s *PredicateSnapshot) RemoveNodeInfo(nodeName string) error {
if s.draEnabled {
nodeInfo, err := s.GetNodeInfo(nodeName)
if err != nil {
return err
}

s.ClusterSnapshotStore.DraSnapshot().RemoveNodeResourceSlices(nodeName)

for _, pod := range nodeInfo.Pods() {
s.ClusterSnapshotStore.DraSnapshot().RemovePodOwnedClaims(pod.Pod)
}
}

return s.ClusterSnapshotStore.RemoveSchedulerNodeInfo(nodeName)
}

// SchedulePod adds pod to the snapshot and schedules it to given node.
func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
if _, _, schedErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName); schedErr != nil {
node, cycleState, schedErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName)
if schedErr != nil {
return schedErr
}

if s.draEnabled && len(pod.Spec.ResourceClaims) > 0 {
if err := s.modifyResourceClaimsForScheduledPod(pod, node, cycleState); err != nil {
return clustersnapshot.NewSchedulingInternalError(pod, err.Error())
}
if err := s.verifyScheduledPodResourceClaims(pod, node); err != nil {
return clustersnapshot.NewSchedulingInternalError(pod, err.Error())
}
}

if err := s.ClusterSnapshotStore.ForceAddPod(pod, nodeName); err != nil {
return clustersnapshot.NewSchedulingInternalError(pod, err.Error())
}
@@ -89,10 +148,20 @@ func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) cluster

// SchedulePodOnAnyNodeMatching adds pod to the snapshot and schedules it to any node matching the provided function.
func (s *PredicateSnapshot) SchedulePodOnAnyNodeMatching(pod *apiv1.Pod, anyNodeMatching func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) {
node, _, schedErr := s.pluginRunner.RunFiltersUntilPassingNode(pod, anyNodeMatching)
node, cycleState, schedErr := s.pluginRunner.RunFiltersUntilPassingNode(pod, anyNodeMatching)
if schedErr != nil {
return "", schedErr
}

if s.draEnabled && len(pod.Spec.ResourceClaims) > 0 {
if err := s.modifyResourceClaimsForScheduledPod(pod, node, cycleState); err != nil {
return "", clustersnapshot.NewSchedulingInternalError(pod, err.Error())
}
if err := s.verifyScheduledPodResourceClaims(pod, node); err != nil {
return "", clustersnapshot.NewSchedulingInternalError(pod, err.Error())
}
}

if err := s.ClusterSnapshotStore.ForceAddPod(pod, node.Name); err != nil {
return "", clustersnapshot.NewSchedulingInternalError(pod, err.Error())
}
@@ -101,6 +170,30 @@ func (s *PredicateSnapshot) SchedulePodOnAnyNodeMatching(pod *apiv1.Pod, anyNode

// UnschedulePod removes the given Pod from the given Node inside the snapshot.
func (s *PredicateSnapshot) UnschedulePod(namespace string, podName string, nodeName string) error {
if s.draEnabled {
Review comment (Contributor): As DraSnapshot is in ClusterSnapshotStore, shouldn't PredicateSnapshot delegate this logic to ClusterSnapshotStore? I don't think it applies to all methods, but here it stood out the most for me.

nodeInfo, err := s.GetNodeInfo(nodeName)
if err != nil {
return err
}

var foundPod *apiv1.Pod
for _, pod := range nodeInfo.Pods() {
if pod.Namespace == namespace && pod.Name == podName {
foundPod = pod.Pod
break
}
}
if foundPod == nil {
return fmt.Errorf("pod %s/%s not found on node %s", namespace, podName, nodeName)
}

if len(foundPod.Spec.ResourceClaims) > 0 {
if err := s.ClusterSnapshotStore.DraSnapshot().UnreservePodClaims(foundPod); err != nil {
return err
}
Review thread (Contributor), quoting #7530: "Modifying the state of DRA objects during scheduling simulations (6th commit). This is tied to the ResourceClaim utils, as a lot of them are used here. Is the logic correct? One question that comes to mind: if we remove the last pod reserving a shared claim, should it be deallocated?"

Yes, it should be treated as deallocated. Otherwise you cannot move such a pod from one node to another during a simulated scale down; it would be forced to stay on the original node, because that is where the claim is allocated. The rest of the logic looks okay to me. (A sketch of this deallocation behavior follows the UnschedulePod function below.)
}
}

return s.ClusterSnapshotStore.ForceRemovePod(namespace, podName, nodeName)
}
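The sketch below illustrates the deallocation semantics agreed on in the review thread above. It is not the snapshot's actual implementation; it operates directly on a ResourceClaim API object, assuming the apiv1 and resourceapi (k8s.io/api/resource/v1beta1) imports already present in this file:

// unreservePodClaimSketch drops the pod's reservation from the claim and, if that was
// the last consumer, treats the claim as deallocated so a simulated move to another
// node can re-allocate it there.
func unreservePodClaimSketch(pod *apiv1.Pod, claim *resourceapi.ResourceClaim) {
	kept := claim.Status.ReservedFor[:0]
	for _, ref := range claim.Status.ReservedFor {
		if ref.UID != pod.UID {
			kept = append(kept, ref)
		}
	}
	claim.Status.ReservedFor = kept
	if len(kept) == 0 {
		// Last reservation removed: clear the allocation so the claim isn't
		// pinned to the node where it was originally allocated.
		claim.Status.Allocation = nil
	}
}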

@@ -109,3 +202,55 @@ func (s *PredicateSnapshot) CheckPredicates(pod *apiv1.Pod, nodeName string) clu
_, _, err := s.pluginRunner.RunFiltersOnNode(pod, nodeName)
return err
}

// verifyScheduledPodResourceClaims verifies that all needed claims are tracked in the DRA snapshot, allocated, and available on the Node.
func (s *PredicateSnapshot) verifyScheduledPodResourceClaims(pod *apiv1.Pod, node *apiv1.Node) error {
claims, err := s.ClusterSnapshotStore.DraSnapshot().PodClaims(pod)
if err != nil {
return fmt.Errorf("couldn't obtain pod %s/%s claims: %v", pod.Namespace, pod.Name, err)
}
for _, claim := range claims {
if available, err := drautils.ClaimAvailableOnNode(claim, node); err != nil || !available {
return fmt.Errorf("pod %s/%s needs claim %s to schedule, but it isn't available on node %s (allocated: %v, available: %v, err: %v)", pod.Namespace, pod.Name, claim.Name, node.Name, drautils.ClaimAllocated(claim), available, err)
}
}
return nil
}

func (s *PredicateSnapshot) modifyResourceClaimsForScheduledPod(pod *apiv1.Pod, node *apiv1.Node, postFilterState *schedulerframework.CycleState) error {
// We need to run the scheduler Reserve phase to allocate the appropriate ResourceClaims in the DRA snapshot. The allocations are
// actually computed and cached in the Filter phase, and Reserve only grabs them from the cycle state. So this should be quick, but
// it needs the cycle state from after running the Filter phase.
if err := s.pluginRunner.RunReserveOnNode(pod, node.Name, postFilterState); err != nil {
return fmt.Errorf("error while trying to run Reserve node %s for pod %s/%s: %v", node.Name, pod.Namespace, pod.Name, err)
}

// The pod isn't added to the ReservedFor field of the claim during the Reserve phase (that happens later, in PreBind). We can just do it
// manually here. This shouldn't fail: it can only fail if ReservedFor is already at max length, and that is checked during the Filter phase.
if err := s.ClusterSnapshotStore.DraSnapshot().ReservePodClaims(pod); err != nil {
return fmt.Errorf("couldn't add pod %s/%s reservations to claims, this shouldn't happen: %v", pod.Namespace, pod.Name, err)
}
return nil
}

func (s *PredicateSnapshot) modifyResourceClaimsForNewPod(podInfo *framework.PodInfo) error {
// PodInfo.NeededResourceClaims contains both Pod-owned and shared ResourceClaims that the Pod needs to schedule. When a new Pod is added to
// ClusterSnapshot, we need to add the Pod-owned claims to the DRA snapshot. The shared ones should already be in the DRA snapshot,
// so we don't add them. The claims should already be allocated in the provided PodInfo.
var podOwnedClaims []*resourceapi.ResourceClaim
for _, claim := range podInfo.NeededResourceClaims {
if ownerName, _ := drautils.ClaimOwningPod(claim); ownerName != "" {
podOwnedClaims = append(podOwnedClaims, claim)
}
}
if err := s.ClusterSnapshotStore.DraSnapshot().AddClaims(podOwnedClaims); err != nil {
return fmt.Errorf("couldn't add ResourceSlices for pod %s/%s to DRA snapshot: %v", podInfo.Namespace, podInfo.Name, err)
}

// The Pod-owned claims should already be reserved for the Pod after sanitization, but we need to add the reservation for the new Pod
// to the shared claims. This can fail if doing so would exceed the max reservation limit for a claim.
if err := s.ClusterSnapshotStore.DraSnapshot().ReservePodClaims(podInfo.Pod); err != nil {
return fmt.Errorf("couldn't add pod %s/%s reservations to claims, this shouldn't happen: %v", podInfo.Namespace, podInfo.Name, err)
}
return nil
}
@@ -33,6 +33,8 @@ import (
"github.com/stretchr/testify/assert"
)

// TODO(DRA): Add DRA-specific test cases.
Review comment (Contributor): Oh please do.

var snapshots = map[string]func() (clustersnapshot.ClusterSnapshot, error){
"basic": func() (clustersnapshot.ClusterSnapshot, error) {
fwHandle, err := framework.NewTestFrameworkHandle()