Ignore scheduler processing #6235

Merged
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -277,4 +277,6 @@ type AutoscalingOptions struct {
// dynamicNodeDeleteDelayAfterTaintEnabled is used to enable/disable dynamic adjustment of NodeDeleteDelayAfterTaint
// based on the latency between the CA and the api-server
DynamicNodeDeleteDelayAfterTaintEnabled bool
// BypassedSchedulers specifies the schedulers whose pod processing should be bypassed
BypassedSchedulers map[string]bool
}
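For context, BypassedSchedulers is typically populated from a command-line flag. Below is a minimal sketch of turning a comma-separated flag value into that map; the flag name and helper are hypothetical and may not match the actual wiring in this PR.

```go
package main

import "strings"

// parseBypassedSchedulers is a hypothetical helper: it converts a
// comma-separated flag value (e.g. "default-scheduler,my-scheduler")
// into the map[string]bool stored in AutoscalingOptions.BypassedSchedulers.
func parseBypassedSchedulers(flagValue string) map[string]bool {
	bypassed := make(map[string]bool)
	for _, name := range strings.Split(flagValue, ",") {
		if trimmed := strings.TrimSpace(name); trimmed != "" {
			bypassed[trimmed] = true
		}
	}
	return bypassed
}
```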
39 changes: 39 additions & 0 deletions cluster-autoscaler/core/podlistprocessor/clear_tpu_request.go
@@ -0,0 +1,39 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podlistprocessor

import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/utils/tpu"
)

type clearTpuRequests struct {
}

// NewClearTPURequestsPodListProcessor creates a PodListProcessor which clears TPU requests in pods
func NewClearTPURequestsPodListProcessor() *clearTpuRequests {
return &clearTpuRequests{}
}

// Process removes pods' tpu requests
func (p *clearTpuRequests) Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) {
return tpu.ClearTPURequests(pods), nil
}

func (p *clearTpuRequests) CleanUp() {
}
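Both new processors implement CA's PodListProcessor interface. A sketch of the interface shape they satisfy, inferred from the Process/CleanUp methods in this PR; the canonical definition lives under cluster-autoscaler/processors/pods and may differ slightly.

```go
package pods

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/context"
)

// PodListProcessor transforms the list of unschedulable pods before
// scale-up logic runs (sketch inferred from the methods above).
type PodListProcessor interface {
	Process(ctx *context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error)
	CleanUp()
}
```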
68 changes: 68 additions & 0 deletions cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go
@@ -0,0 +1,68 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podlistprocessor

import (
"fmt"

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
klog "k8s.io/klog/v2"
)

type filterOutExpendable struct {
}

// NewFilterOutExpendablePodListProcessor creates a PodListProcessor filtering out expendable pods
func NewFilterOutExpendablePodListProcessor() *filterOutExpendable {
return &filterOutExpendable{}
}

// Process filters out expendable pods and adds pods that are waiting for lower-priority pod preemption to the cluster snapshot
func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) {
nodes, err := context.AllNodeLister().List()
if err != nil {
return nil, fmt.Errorf("Failed to list all nodes while filtering expendable pods: %v", err)
}
expendablePodsPriorityCutoff := context.AutoscalingOptions.ExpendablePodsPriorityCutoff

unschedulablePods, waitingForLowerPriorityPreemption := core_utils.FilterOutExpendableAndSplit(pods, nodes, expendablePodsPriorityCutoff)
if err = p.addPreemptingPodsToSnapshot(waitingForLowerPriorityPreemption, context); err != nil {
klog.Warningf("Failed to add preempting pods to snapshot: %v", err)
return nil, err
}

return unschedulablePods, nil
}

// addPreemptingPodsToSnapshot modifies the snapshot to simulate scheduling of pods waiting for preemption.
// This is not strictly correct, as we are not simulating preemption itself, but it matches
// the CA logic from before the migration to the scheduler framework, so let's keep it for now.
func (p *filterOutExpendable) addPreemptingPodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error {
for _, p := range pods {
if err := ctx.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil {
klog.Errorf("Failed to update snapshot with pod %s/%s waiting for preemption: %v", p.Namespace, p.Name, err)
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}
}
return nil
}

func (p *filterOutExpendable) CleanUp() {
}
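The split performed by core_utils.FilterOutExpendableAndSplit can be summarized as: pods with priority below the cutoff are dropped, pods already nominated onto a node are returned separately as waiting for preemption, and everything else stays unschedulable. A simplified, standalone sketch of that classification (not the exact upstream implementation, which also validates the nominated node against the node list):

```go
package main

import apiv1 "k8s.io/api/core/v1"

// splitExpendable is a simplified restatement of the classification the
// processor relies on: expendable pods are dropped, nominated pods are
// returned as waiting for preemption, the rest remain unschedulable.
func splitExpendable(pods []*apiv1.Pod, cutoff int) (unschedulable, waitingForPreemption []*apiv1.Pod) {
	for _, p := range pods {
		switch {
		case p.Spec.Priority != nil && int(*p.Spec.Priority) < cutoff:
			// Expendable: below the priority cutoff, ignored entirely.
		case p.Status.NominatedNodeName != "":
			waitingForPreemption = append(waitingForPreemption, p)
		default:
			unschedulable = append(unschedulable, p)
		}
	}
	return unschedulable, waitingForPreemption
}
```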
179 changes: 179 additions & 0 deletions cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go
@@ -0,0 +1,179 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podlistprocessor

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/test"
)

func TestFilterOutExpendable(t *testing.T) {
testCases := []struct {
name string
pods []*apiv1.Pod
wantPods []*apiv1.Pod
wantPodsInSnapshot []*apiv1.Pod
priorityCutoff int
nodes []*apiv1.Node
}{
{
name: "no pods",
},
{
name: "single non-expendable pod",
pods: []*apiv1.Pod{
test.BuildTestPod("p", 1000, 1),
},
wantPods: []*apiv1.Pod{
test.BuildTestPod("p", 1000, 1),
},
},
{
name: "non-expendable pods with priority >= to cutoff priority",
pods: []*apiv1.Pod{
test.BuildTestPod("p1", 1000, 1, priority(2)),
test.BuildTestPod("p2", 1000, 1, priority(3)),
},
wantPods: []*apiv1.Pod{
test.BuildTestPod("p1", 1000, 1, priority(2)),
test.BuildTestPod("p2", 1000, 1, priority(3)),
},
priorityCutoff: 2,
},
{
name: "single expednable pod",
pods: []*apiv1.Pod{
test.BuildTestPod("p", 1000, 1, priority(2)),
},
priorityCutoff: 3,
},
{
name: "single waiting-for-low-priority-preemption pod",
pods: []*apiv1.Pod{
test.BuildTestPod("p", 1000, 1, nominatedNodeName("node-1")),
},
nodes: []*apiv1.Node{
test.BuildTestNode("node-1", 2400, 2400),
},
wantPodsInSnapshot: []*apiv1.Pod{
test.BuildTestPod("p", 1000, 1, nominatedNodeName("node-1")),
},
},
{
name: "mixed expendable, non-expendable & waiting-for-low-priority-preemption pods",
pods: []*apiv1.Pod{
test.BuildTestPod("p1", 1000, 1, priority(3)),
test.BuildTestPod("p2", 1000, 1, priority(4)),
test.BuildTestPod("p3", 1000, 1, priority(1)),
test.BuildTestPod("p4", 1000, 1),
test.BuildTestPod("p5", 1000, 1, nominatedNodeName("node-1")),
},
priorityCutoff: 2,
wantPods: []*apiv1.Pod{
test.BuildTestPod("p1", 1000, 1, priority(3)),
test.BuildTestPod("p2", 1000, 1, priority(4)),
test.BuildTestPod("p4", 1000, 1),
},
wantPodsInSnapshot: []*apiv1.Pod{
test.BuildTestPod("p5", 1000, 1, nominatedNodeName("node-1")),
},
nodes: []*apiv1.Node{
test.BuildTestNode("node-1", 2400, 2400),
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
processor := NewFilterOutExpendablePodListProcessor()
snapshot := clustersnapshot.NewBasicClusterSnapshot()
snapshot.AddNodes(tc.nodes)

pods, err := processor.Process(&context.AutoscalingContext{
ClusterSnapshot: snapshot,
AutoscalingOptions: config.AutoscalingOptions{
ExpendablePodsPriorityCutoff: tc.priorityCutoff,
},
AutoscalingKubeClients: context.AutoscalingKubeClients{
ListerRegistry: newMockListerRegistry(tc.nodes),
},
}, tc.pods)

assert.NoError(t, err)
assert.ElementsMatch(t, tc.wantPods, pods)

var podsInSnapshot []*apiv1.Pod
nodeInfoLister := snapshot.NodeInfos()
// Get pods in snapshot
for _, n := range tc.nodes {
nodeInfo, err := nodeInfoLister.Get(n.Name)
assert.NoError(t, err)
assert.NotEqual(t, nodeInfo.Pods, nil)
for _, podInfo := range nodeInfo.Pods {
podsInSnapshot = append(podsInSnapshot, podInfo.Pod)
}
}

assert.ElementsMatch(t, tc.wantPodsInSnapshot, podsInSnapshot)
})
}
}

func priority(priority int32) func(*apiv1.Pod) {
return func(pod *apiv1.Pod) {
pod.Spec.Priority = &priority
}
}

func nominatedNodeName(nodeName string) func(*apiv1.Pod) {
return func(pod *apiv1.Pod) {
pod.Status.NominatedNodeName = nodeName
}
}

type mockListerRegistry struct {
kube_util.ListerRegistry
nodes []*apiv1.Node
}

func newMockListerRegistry(nodes []*apiv1.Node) *mockListerRegistry {
return &mockListerRegistry{
nodes: nodes,
}
}

func (mlr mockListerRegistry) AllNodeLister() kube_util.NodeLister {
return &mockNodeLister{nodes: mlr.nodes}
}

type mockNodeLister struct {
nodes []*apiv1.Node
}

func (mnl *mockNodeLister) List() ([]*apiv1.Node, error) {
return mnl.nodes, nil
}

func (mnl *mockNodeLister) Get(name string) (*apiv1.Node, error) {
return nil, fmt.Errorf("Unsupported operation")
}
cluster-autoscaler/core/podlistprocessor/pod_list_processor.go
@@ -32,6 +32,8 @@ type defaultPodListProcessor struct {
func NewDefaultPodListProcessor(predicateChecker predicatechecker.PredicateChecker) *defaultPodListProcessor {
return &defaultPodListProcessor{
processors: []pods.PodListProcessor{
NewClearTPURequestsPodListProcessor(),
NewFilterOutExpendablePodListProcessor(),
NewCurrentlyDrainedNodesPodListProcessor(),
NewFilterOutSchedulablePodListProcessor(predicateChecker),
NewFilterOutDaemonSetPodListProcessor(),
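The two new processors are registered at the front of the default chain, so TPU-request clearing and expendable filtering now run before the drained-nodes and schedulability filters. A sketch of how such a chain is typically executed, passing each processor's output to the next (illustrative; the real defaultPodListProcessor.Process may differ in details):

```go
package podlistprocessor

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
)

// chain is an illustrative stand-in for defaultPodListProcessor: it runs
// each sub-processor in order and stops at the first error.
type chain struct {
	processors []pods.PodListProcessor
}

func (c *chain) Process(ctx *context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
	var err error
	for _, processor := range c.processors {
		unschedulablePods, err = processor.Process(ctx, unschedulablePods)
		if err != nil {
			return nil, err
		}
	}
	return unschedulablePods, nil
}
```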
37 changes: 16 additions & 21 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -57,7 +57,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
scheduler_utils "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
"k8s.io/autoscaler/cluster-autoscaler/utils/tpu"
"k8s.io/utils/integer"

klog "k8s.io/klog/v2"
@@ -310,6 +309,11 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
return caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
}
originalScheduledPods, unschedulablePods := kube_util.ScheduledPods(pods), kube_util.UnschedulablePods(pods)
schedulerUnprocessed := make([]*apiv1.Pod, 0, 0)
isSchedulerProcessingIgnored := len(a.BypassedSchedulers) > 0
if isSchedulerProcessingIgnored {
schedulerUnprocessed = kube_util.SchedulerUnprocessedPods(pods, a.BypassedSchedulers)
}

// Update cluster resource usage metrics
coresTotal, memoryTotal := calculateCoresMemoryTotal(allNodes, currentTime)
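kube_util.SchedulerUnprocessedPods selects pending pods whose scheduler is in the bypassed set and that no scheduler has acted on yet. A rough standalone sketch of that selection, assuming "unprocessed" means unbound, not yet scheduled, and carrying no PodScheduled condition; the exact criteria live in utils/kubernetes and may differ:

```go
package main

import apiv1 "k8s.io/api/core/v1"

// schedulerUnprocessedPods is an illustrative sketch: it keeps pods whose
// .spec.schedulerName is in the bypassed set, that are not bound to a node,
// and that carry no PodScheduled condition, i.e. their scheduler has not
// looked at them yet.
func schedulerUnprocessedPods(pods []*apiv1.Pod, bypassed map[string]bool) []*apiv1.Pod {
	var unprocessed []*apiv1.Pod
	for _, pod := range pods {
		if !bypassed[pod.Spec.SchedulerName] || pod.Spec.NodeName != "" {
			continue
		}
		seenScheduledCondition := false
		for _, cond := range pod.Status.Conditions {
			if cond.Type == apiv1.PodScheduled {
				seenScheduledCondition = true
				break
			}
		}
		if !seenScheduledCondition {
			unprocessed = append(unprocessed, pod)
		}
	}
	return unprocessed
}
```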
@@ -451,25 +455,12 @@

metrics.UpdateLastTime(metrics.Autoscaling, time.Now())

metrics.UpdateUnschedulablePodsCount(len(unschedulablePods))

unschedulablePods = tpu.ClearTPURequests(unschedulablePods)

// todo: move split and append below to separate PodListProcessor
// Some unschedulable pods can be waiting for lower priority pods preemption so they have nominated node to run.
// Such pods don't require scale up but should be considered during scale down.
unschedulablePods, unschedulableWaitingForLowerPriorityPreemption := core_utils.FilterOutExpendableAndSplit(unschedulablePods, allNodes, a.ExpendablePodsPriorityCutoff)

// modify the snapshot simulating scheduling of pods waiting for preemption.
// this is not strictly correct as we are not simulating preemption itself but it matches
// CA logic from before migration to scheduler framework. So let's keep it for now
for _, p := range unschedulableWaitingForLowerPriorityPreemption {
if err := a.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil {
klog.Errorf("Failed to update snapshot with pod %s waiting for preemption", err)
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}
// SchedulerUnprocessed might be zero here if it was disabled
metrics.UpdateUnschedulablePodsCount(len(unschedulablePods), len(schedulerUnprocessed))
if isSchedulerProcessingIgnored {
// Treat unknown pods as unschedulable; the pod list processor will remove schedulable pods
unschedulablePods = append(unschedulablePods, schedulerUnprocessed...)
}

// Upcoming nodes are recently created nodes that haven't registered in the cluster yet, or haven't become ready yet.
upcomingCounts, registeredUpcoming := a.clusterStateRegistry.GetUpcomingNodes()
// For each upcoming node we inject a placeholder node faked to appear ready into the cluster snapshot, so that we can pack unschedulable pods on
@@ -515,7 +506,11 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
a.AutoscalingContext.DebuggingSnapshotter.SetClusterNodes(l)
}

unschedulablePodsToHelp, _ := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods)
unschedulablePodsToHelp, err := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods)

if err != nil {
klog.Warningf("Failed to process unschedulable pods: %v", err)
}

// finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable)
unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime)
@@ -553,7 +548,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
} else if a.MaxNodesTotal > 0 && len(readyNodes) >= a.MaxNodesTotal {
scaleUpStatus.Result = status.ScaleUpNoOptionsAvailable
klog.V(1).Info("Max total nodes in cluster reached")
} else if allPodsAreNew(unschedulablePodsToHelp, currentTime) {
} else if !isSchedulerProcessingIgnored && allPodsAreNew(unschedulablePodsToHelp, currentTime) {
// The assumption here is that these pods have been created very recently and probably there
// is more pods to come. In theory we could check the newest pod time but then if pod were created
// slowly but at the pace of 1 every 2 seconds then no scale up would be triggered for long time.