From a1ab7b9e209596217fa2adad29935125b6cb5cde Mon Sep 17 00:00:00 2001
From: Mahmoud Atwa
Date: Wed, 18 Oct 2023 09:19:24 +0000
Subject: [PATCH 1/6] Add new pod list processors for clearing TPU requests &
 filtering out expendable pods

Treat pods not yet processed by the scheduler as unschedulable
---
 .../config/autoscaling_options.go             |   8 +
 .../podlistprocessor/filter_out_expendable.go |  66 +++++++
 .../filter_out_expendable_test.go             | 179 ++++++++++++++++++
 .../podlistprocessor/pod_list_processor.go    |   1 +
 cluster-autoscaler/core/static_autoscaler.go  |  35 +---
 .../core/static_autoscaler_test.go            |  50 +++--
 cluster-autoscaler/main.go                    |   4 +
 cluster-autoscaler/metrics/metrics.go         |  10 +-
 .../utils/kubernetes/listers.go               |  17 ++
 9 files changed, 316 insertions(+), 54 deletions(-)
 create mode 100644 cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go
 create mode 100644 cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go

diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go
index 05e6cc2fb6e8..ce38224c557a 100644
--- a/cluster-autoscaler/config/autoscaling_options.go
+++ b/cluster-autoscaler/config/autoscaling_options.go
@@ -277,4 +277,12 @@ type AutoscalingOptions struct {
 	// dynamicNodeDeleteDelayAfterTaintEnabled is used to enable/disable dynamic adjustment of NodeDeleteDelayAfterTaint
 	// based on the latency between the CA and the api-server
 	DynamicNodeDeleteDelayAfterTaintEnabled bool
+	// UnschedulablePodTimeBuffer controls when scale-ups happen so that
+	// the oldest unschedulable pod is older than UnschedulablePodTimeBuffer
+	UnschedulablePodTimeBuffer time.Duration
+	// UnschedulablePodWithGpuTimeBuffer specifies how old should the oldest unschedulable pod with GPU be before starting scale up.
+	// The idea is that nodes with GPU are very expensive and we're ready to sacrifice
+	// a bit more latency to wait for more pods and make a more informed scale-up decision.
+	UnschedulablePodWithGpuTimeBuffer time.Duration
+	// unschedulablePodWithGpuTimeBuffer = 30 * time.Second
 }
diff --git a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go
new file mode 100644
index 000000000000..ba50691abe50
--- /dev/null
+++ b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go
@@ -0,0 +1,66 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package podlistprocessor + +import ( + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/context" + core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils" + caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors" + klog "k8s.io/klog/v2" +) + +type filterOutExpandable struct { +} + +// NewFilterOutExpandablePodListProcessor creates a PodListProcessor filtering out expendable pods +func NewFilterOutExpandablePodListProcessor() *filterOutExpandable { + return &filterOutExpandable{} +} + +// Process filters out pods which are expendable and adds pods which is waiting for lower priority pods preemption to the cluster snapshot +func (p *filterOutExpandable) Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) { + klog.V(4).Infof("Filtering out expandable pods") + nodes, err := context.AllNodeLister().List() + if err != nil { + return nil, err + } + expendablePodsPriorityCutoff := context.AutoscalingOptions.ExpendablePodsPriorityCutoff + + unschedulablePods, waitingForLowerPriorityPreemption := core_utils.FilterOutExpendableAndSplit(pods, nodes, expendablePodsPriorityCutoff) + if err = p.addPreemptiblePodsToSnapshot(waitingForLowerPriorityPreemption, context); err != nil { + return nil, err + } + + return unschedulablePods, nil +} + +// addPreemptiblePodsToSnapshot modifies the snapshot simulating scheduling of pods waiting for preemption. +// this is not strictly correct as we are not simulating preemption itself but it matches +// CA logic from before migration to scheduler framework. So let's keep it for now +func (p *filterOutExpandable) addPreemptiblePodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error { + for _, p := range pods { + if err := ctx.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil { + klog.Errorf("Failed to update snapshot with pod %s waiting for preemption", err) + return caerrors.ToAutoscalerError(caerrors.InternalError, err) + } + } + return nil +} + +func (p *filterOutExpandable) CleanUp() { +} diff --git a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go new file mode 100644 index 000000000000..35de9db2b20f --- /dev/null +++ b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go @@ -0,0 +1,179 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package podlistprocessor + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/config" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + "k8s.io/autoscaler/cluster-autoscaler/utils/test" +) + +func TestFilterOutExpendable(t *testing.T) { + testCases := []struct { + name string + pods []*apiv1.Pod + wantPods []*apiv1.Pod + wantPodsInSnapshot []*apiv1.Pod + priorityCutoff int + nodes []*apiv1.Node + }{ + { + name: "no pods", + }, + { + name: "single non-expendable pod", + pods: []*apiv1.Pod{ + test.BuildTestPod("p", 1000, 1), + }, + wantPods: []*apiv1.Pod{ + test.BuildTestPod("p", 1000, 1), + }, + }, + { + name: "non-expendable pods with priority >= to cutoff priority", + pods: []*apiv1.Pod{ + test.BuildTestPod("p1", 1000, 1, getPrioritySetter(2)), + test.BuildTestPod("p2", 1000, 1, getPrioritySetter(3)), + }, + wantPods: []*apiv1.Pod{ + test.BuildTestPod("p1", 1000, 1, getPrioritySetter(2)), + test.BuildTestPod("p2", 1000, 1, getPrioritySetter(3)), + }, + priorityCutoff: 2, + }, + { + name: "single expednable pod", + pods: []*apiv1.Pod{ + test.BuildTestPod("p", 1000, 1, getPrioritySetter(2)), + }, + priorityCutoff: 3, + }, + { + name: "single waiting-for-low-priority-preemption pod", + pods: []*apiv1.Pod{ + test.BuildTestPod("p", 1000, 1, getNominatedNodeNameSetter("node-1")), + }, + nodes: []*apiv1.Node{ + test.BuildTestNode("node-1", 2400, 2400), + }, + wantPodsInSnapshot: []*apiv1.Pod{ + test.BuildTestPod("p", 1000, 1, getNominatedNodeNameSetter("node-1")), + }, + }, + { + name: "mixed expendable, non-expendable & waiting-for-low-priority-preemption pods", + pods: []*apiv1.Pod{ + test.BuildTestPod("p1", 1000, 1, getPrioritySetter(3)), + test.BuildTestPod("p2", 1000, 1, getPrioritySetter(4)), + test.BuildTestPod("p3", 1000, 1, getPrioritySetter(1)), + test.BuildTestPod("p4", 1000, 1), + test.BuildTestPod("p5", 1000, 1, getNominatedNodeNameSetter("node-1")), + }, + priorityCutoff: 2, + wantPods: []*apiv1.Pod{ + test.BuildTestPod("p1", 1000, 1, getPrioritySetter(3)), + test.BuildTestPod("p2", 1000, 1, getPrioritySetter(4)), + test.BuildTestPod("p4", 1000, 1), + }, + wantPodsInSnapshot: []*apiv1.Pod{ + test.BuildTestPod("p5", 1000, 1, getNominatedNodeNameSetter("node-1")), + }, + nodes: []*apiv1.Node{ + test.BuildTestNode("node-1", 2400, 2400), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + processor := NewFilterOutExpandablePodListProcessor() + snapshot := clustersnapshot.NewBasicClusterSnapshot() + snapshot.AddNodes(tc.nodes) + + pods, err := processor.Process(&context.AutoscalingContext{ + ClusterSnapshot: snapshot, + AutoscalingOptions: config.AutoscalingOptions{ + ExpendablePodsPriorityCutoff: tc.priorityCutoff, + }, + AutoscalingKubeClients: context.AutoscalingKubeClients{ + ListerRegistry: newMockListerRegistry(tc.nodes), + }, + }, tc.pods) + + assert.NoError(t, err) + assert.ElementsMatch(t, tc.wantPods, pods) + + var podsInSnapshot []*apiv1.Pod + nodeInfoLister := snapshot.NodeInfos() + // Get pods in snapshot + for _, n := range tc.nodes { + nodeInfo, err := nodeInfoLister.Get(n.Name) + assert.NoError(t, err) + assert.NotEqual(t, nodeInfo.Pods, nil) + for _, podInfo := range nodeInfo.Pods { + podsInSnapshot = append(podsInSnapshot, podInfo.Pod) + } + } + + assert.ElementsMatch(t, tc.wantPodsInSnapshot, 
podsInSnapshot) + }) + } +} + +func getPrioritySetter(priority int32) func(*apiv1.Pod) { + return func(pod *apiv1.Pod) { + pod.Spec.Priority = &priority + } +} +func getNominatedNodeNameSetter(nodeName string) func(*apiv1.Pod) { + return func(pod *apiv1.Pod) { + pod.Status.NominatedNodeName = nodeName + } +} + +type mockListerRegistry struct { + kube_util.ListerRegistry + nodes []*apiv1.Node +} + +func newMockListerRegistry(nodes []*apiv1.Node) *mockListerRegistry { + return &mockListerRegistry{ + nodes: nodes, + } +} + +func (mlr mockListerRegistry) AllNodeLister() kube_util.NodeLister { + return &mockNodeLister{nodes: mlr.nodes} +} + +type mockNodeLister struct { + nodes []*apiv1.Node +} + +func (mnl *mockNodeLister) List() ([]*apiv1.Node, error) { + return mnl.nodes, nil +} +func (mnl *mockNodeLister) Get(name string) (*apiv1.Node, error) { + return nil, fmt.Errorf("Unsupported operation") +} diff --git a/cluster-autoscaler/core/podlistprocessor/pod_list_processor.go b/cluster-autoscaler/core/podlistprocessor/pod_list_processor.go index d357d00bde18..2aea528397cc 100644 --- a/cluster-autoscaler/core/podlistprocessor/pod_list_processor.go +++ b/cluster-autoscaler/core/podlistprocessor/pod_list_processor.go @@ -32,6 +32,7 @@ type defaultPodListProcessor struct { func NewDefaultPodListProcessor(predicateChecker predicatechecker.PredicateChecker) *defaultPodListProcessor { return &defaultPodListProcessor{ processors: []pods.PodListProcessor{ + NewFilterOutExpandablePodListProcessor(), NewCurrentlyDrainedNodesPodListProcessor(), NewFilterOutSchedulablePodListProcessor(predicateChecker), NewFilterOutDaemonSetPodListProcessor(), diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index ac3103973931..9a4a9dffaa77 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -57,20 +57,12 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/utils/backoff" caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors" scheduler_utils "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" - "k8s.io/autoscaler/cluster-autoscaler/utils/tpu" "k8s.io/utils/integer" klog "k8s.io/klog/v2" ) const ( - // How old the oldest unschedulable pod should be before starting scale up. - unschedulablePodTimeBuffer = 2 * time.Second - // How old the oldest unschedulable pod with GPU should be before starting scale up. - // The idea is that nodes with GPU are very expensive and we're ready to sacrifice - // a bit more latency to wait for more pods and make a more informed scale-up decision. - unschedulablePodWithGpuTimeBuffer = 30 * time.Second - // NodeUpcomingAnnotation is an annotation CA adds to nodes which are upcoming. 
NodeUpcomingAnnotation = "cluster-autoscaler.k8s.io/upcoming-node" @@ -309,7 +301,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr klog.Errorf("Failed to list pods: %v", err) return caerrors.ToAutoscalerError(caerrors.ApiCallError, err) } - originalScheduledPods, unschedulablePods := kube_util.ScheduledPods(pods), kube_util.UnschedulablePods(pods) + originalScheduledPods, unschedulablePods, unknownPods := kube_util.ScheduledPods(pods), kube_util.UnschedulablePods(pods), kube_util.UnknownPods(pods) // Update cluster resource usage metrics coresTotal, memoryTotal := calculateCoresMemoryTotal(allNodes, currentTime) @@ -451,24 +443,9 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr metrics.UpdateLastTime(metrics.Autoscaling, time.Now()) - metrics.UpdateUnschedulablePodsCount(len(unschedulablePods)) - - unschedulablePods = tpu.ClearTPURequests(unschedulablePods) - - // todo: move split and append below to separate PodListProcessor - // Some unschedulable pods can be waiting for lower priority pods preemption so they have nominated node to run. - // Such pods don't require scale up but should be considered during scale down. - unschedulablePods, unschedulableWaitingForLowerPriorityPreemption := core_utils.FilterOutExpendableAndSplit(unschedulablePods, allNodes, a.ExpendablePodsPriorityCutoff) - - // modify the snapshot simulating scheduling of pods waiting for preemption. - // this is not strictly correct as we are not simulating preemption itself but it matches - // CA logic from before migration to scheduler framework. So let's keep it for now - for _, p := range unschedulableWaitingForLowerPriorityPreemption { - if err := a.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil { - klog.Errorf("Failed to update snapshot with pod %s waiting for preemption", err) - return caerrors.ToAutoscalerError(caerrors.InternalError, err) - } - } + metrics.UpdateUnschedulablePodsCount(len(unschedulablePods), len(unknownPods)) + // Treat unknown pods as unschedulable, pod list processor will remove schedulable pods + unschedulablePods = append(unschedulablePods, unknownPods...) // Upcoming nodes are recently created nodes that haven't registered in the cluster yet, or haven't become ready yet. upcomingCounts, registeredUpcoming := a.clusterStateRegistry.GetUpcomingNodes() @@ -553,7 +530,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr } else if a.MaxNodesTotal > 0 && len(readyNodes) >= a.MaxNodesTotal { scaleUpStatus.Result = status.ScaleUpNoOptionsAvailable klog.V(1).Info("Max total nodes in cluster reached") - } else if allPodsAreNew(unschedulablePodsToHelp, currentTime) { + } else if allPodsAreNew(unschedulablePodsToHelp, currentTime, a.AutoscalingOptions.UnschedulablePodTimeBuffer, a.AutoscalingOptions.UnschedulablePodWithGpuTimeBuffer) { // The assumption here is that these pods have been created very recently and probably there // is more pods to come. In theory we could check the newest pod time but then if pod were created // slowly but at the pace of 1 every 2 seconds then no scale up would be triggered for long time. 
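For reference, a sketch of how the parameterized helper reads once both static_autoscaler.go hunks of this patch apply. The GPU branch and the core_utils.GetOldestCreateTimeWithGpu call are assumed from the pre-existing implementation rather than shown in this diff, and the sketch relies on the imports (apiv1, time, core_utils) already present in the file:

// Sketch only: the time buffers now arrive as arguments taken from
// AutoscalingOptions instead of package-level constants.
func allPodsAreNew(pods []*apiv1.Pod, currentTime time.Time, unschedulablePodTimeBuffer, unschedulablePodWithGpuTimeBuffer time.Duration) bool {
	// Scale up waits until the oldest unschedulable pod is older than the buffer.
	if core_utils.GetOldestCreateTime(pods).Add(unschedulablePodTimeBuffer).After(currentTime) {
		return true
	}
	// Pods requesting GPUs get a longer buffer, so the (expensive) GPU scale-up
	// decision is made with more pods in view.
	found, oldest := core_utils.GetOldestCreateTimeWithGpu(pods)
	return found && oldest.Add(unschedulablePodWithGpuTimeBuffer).After(currentTime)
}
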
@@ -986,7 +963,7 @@ func (a *StaticAutoscaler) reportTaintsCount(nodes []*apiv1.Node) { } } -func allPodsAreNew(pods []*apiv1.Pod, currentTime time.Time) bool { +func allPodsAreNew(pods []*apiv1.Pod, currentTime time.Time, unschedulablePodTimeBuffer, unschedulablePodWithGpuTimeBuffer time.Duration) bool { if core_utils.GetOldestCreateTime(pods).Add(unschedulablePodTimeBuffer).After(currentTime) { return true } diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index 75b0db264d50..f3ebc1d4412b 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -171,11 +171,14 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { n2 := BuildTestNode("n2", 1000, 1000) SetNodeReadyState(n2, true, time.Now()) n3 := BuildTestNode("n3", 1000, 1000) + SetNodeReadyState(n3, true, time.Now()) n4 := BuildTestNode("n4", 1000, 1000) + n5 := BuildTestNode("n5", 1000, 1000) p1 := BuildTestPod("p1", 600, 100) p1.Spec.NodeName = "n1" p2 := BuildTestPod("p2", 600, 100, MarkUnschedulable()) + p3 := BuildTestPod("p3", 600, 100) // Not yet processed by scheduler tn := BuildTestNode("tn", 1000, 1000) tni := schedulerframework.NewNodeInfo() @@ -248,7 +251,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { // MaxNodesTotal reached. readyNodeLister.SetNodes([]*apiv1.Node{n1}) allNodeLister.SetNodes([]*apiv1.Node{n1}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() @@ -259,10 +262,10 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { // Scale up. readyNodeLister.SetNodes([]*apiv1.Node{n1}) allNodeLister.SetNodes([]*apiv1.Node{n1}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() - onScaleUpMock.On("ScaleUp", "ng1", 1).Return(nil).Once() + onScaleUpMock.On("ScaleUp", "ng1", 2).Return(nil).Once() context.MaxNodesTotal = 10 err = autoscaler.RunOnce(time.Now().Add(time.Hour)) @@ -302,12 +305,12 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { // Mark unregistered nodes. readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() provider.AddNodeGroup("ng2", 0, 10, 1) - provider.AddNode("ng2", n3) + provider.AddNode("ng2", n4) err = autoscaler.RunOnce(time.Now().Add(4 * time.Hour)) assert.NoError(t, err) @@ -315,11 +318,11 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) // Remove unregistered nodes. 
- readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice() + readyNodeLister.SetNodes([]*apiv1.Node{n1, n2, n3}) + allNodeLister.SetNodes([]*apiv1.Node{n1, n2, n3}) + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() - onScaleDownMock.On("ScaleDown", "ng2", "n3").Return(nil).Once() + onScaleDownMock.On("ScaleDown", "ng2", "n4").Return(nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() err = autoscaler.RunOnce(time.Now().Add(5 * time.Hour)) @@ -329,15 +332,15 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) // Scale up to node gorup min size. - readyNodeLister.SetNodes([]*apiv1.Node{n4}) - allNodeLister.SetNodes([]*apiv1.Node{n4}) + readyNodeLister.SetNodes([]*apiv1.Node{n5}) + allNodeLister.SetNodes([]*apiv1.Node{n5}) allPodListerMock.On("List").Return([]*apiv1.Pod{}, nil).Twice() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil) podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil) onScaleUpMock.On("ScaleUp", "ng3", 2).Return(nil).Once() // 2 new nodes are supposed to be scaled up. provider.AddNodeGroup("ng3", 3, 10, 1) - provider.AddNode("ng3", n4) + provider.AddNode("ng3", n5) err = autoscaler.RunOnce(time.Now().Add(5 * time.Hour)) assert.NoError(t, err) @@ -366,6 +369,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { p1 := BuildTestPod("p1", 100, 100) p1.Spec.NodeName = "n1" p2 := BuildTestPod("p2", 600, 100, MarkUnschedulable()) + p3 := BuildTestPod("p3", 600, 100) // Not yet processed by scheduler tn1 := BuildTestNode("tn1", 100, 1000) SetNodeReadyState(tn1, true, time.Now()) @@ -457,11 +461,11 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { // Scale up. 
readyNodeLister.SetNodes([]*apiv1.Node{n1}) allNodeLister.SetNodes([]*apiv1.Node{n1}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() onNodeGroupCreateMock.On("Create", "autoprovisioned-TN2").Return(nil).Once() - onScaleUpMock.On("ScaleUp", "autoprovisioned-TN2", 1).Return(nil).Once() + onScaleUpMock.On("ScaleUp", "autoprovisioned-TN2", 2).Return(nil).Once() err = autoscaler.RunOnce(time.Now().Add(time.Hour)) assert.NoError(t, err) @@ -521,10 +525,13 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { SetNodeReadyState(n1, true, time.Now()) n2 := BuildTestNode("n2", 1000, 1000) SetNodeReadyState(n2, true, time.Now()) + n3 := BuildTestNode("n3", 1000, 1000) + SetNodeReadyState(n3, true, time.Now()) p1 := BuildTestPod("p1", 600, 100) p1.Spec.NodeName = "n1" p2 := BuildTestPod("p2", 600, 100, MarkUnschedulable()) + p3 := BuildTestPod("p3", 600, 100) provider := testprovider.NewTestCloudProvider( func(id string, delta int) error { @@ -534,7 +541,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { deleteFinished <- true return ret }) - provider.AddNodeGroup("ng1", 2, 10, 2) + provider.AddNodeGroup("ng1", 3, 10, 2) provider.AddNode("ng1", n1) // broken node, that will be just hanging out there during @@ -606,10 +613,10 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { // Scale up. readyNodeLister.SetNodes([]*apiv1.Node{n1}) allNodeLister.SetNodes([]*apiv1.Node{n1}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() - onScaleUpMock.On("ScaleUp", "ng1", 1).Return(nil).Once() + onScaleUpMock.On("ScaleUp", "ng1", 2).Return(nil).Once() err = autoscaler.RunOnce(later.Add(time.Hour)) assert.NoError(t, err) @@ -618,11 +625,12 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { // Remove broken node after going over min size provider.AddNode("ng1", n2) - ng1.SetTargetSize(3) + provider.AddNode("ng1", n3) + ng1.SetTargetSize(4) - readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice() + readyNodeLister.SetNodes([]*apiv1.Node{n1, n2, n3}) + allNodeLister.SetNodes([]*apiv1.Node{n1, n2, n3}) + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice() onScaleDownMock.On("ScaleDown", "ng1", "broken").Return(nil).Once() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 5276b97c5658..85c4fa6d870e 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -243,6 +243,8 @@ var ( maxAllocatableDifferenceRatio = flag.Float64("max-allocatable-difference-ratio", config.DefaultMaxAllocatableDifferenceRatio, "Maximum difference in allocatable resources between two similar node groups to be considered 
for balancing. Value is a ratio of the smaller node group's allocatable resource.") forceDaemonSets = flag.Bool("force-ds", false, "Blocks scale-up of node groups too small for all suitable Daemon Sets pods.") dynamicNodeDeleteDelayAfterTaintEnabled = flag.Bool("dynamic-node-delete-delay-after-taint-enabled", false, "Enables dynamic adjustment of NodeDeleteDelayAfterTaint based of the latency between CA and api-server") + unschedulablePodTimeBuffer = flag.Duration("unschedulable-pod-time-buffer", 2*time.Second, "How old the oldest unschedulable pod should be before starting scale up.") + unschedulablePodWithGpuTimeBuffer = flag.Duration("unschedulable-pod-with-gpu-time-buffer", 30*time.Second, "How old the oldest unschedulable pod with GPU should be before starting scale up.") ) func isFlagPassed(name string) bool { @@ -390,6 +392,8 @@ func createAutoscalingOptions() config.AutoscalingOptions { MaxFreeDifferenceRatio: *maxFreeDifferenceRatio, }, DynamicNodeDeleteDelayAfterTaintEnabled: *dynamicNodeDeleteDelayAfterTaintEnabled, + UnschedulablePodTimeBuffer: *unschedulablePodTimeBuffer, + UnschedulablePodWithGpuTimeBuffer: *unschedulablePodWithGpuTimeBuffer, } } diff --git a/cluster-autoscaler/metrics/metrics.go b/cluster-autoscaler/metrics/metrics.go index 44939c46e17d..e3332383ae17 100644 --- a/cluster-autoscaler/metrics/metrics.go +++ b/cluster-autoscaler/metrics/metrics.go @@ -135,12 +135,13 @@ var ( }, []string{"node_group_type"}, ) - unschedulablePodsCount = k8smetrics.NewGauge( + // Unschedulable pod count can be from scheduler-marked-unschedulable pods or not-yet-processed pods (unknown) + unschedulablePodsCount = k8smetrics.NewGaugeVec( &k8smetrics.GaugeOpts{ Namespace: caNamespace, Name: "unschedulable_pods_count", Help: "Number of unschedulable pods in the cluster.", - }, + }, []string{"count_type"}, ) maxNodesCount = k8smetrics.NewGauge( @@ -472,8 +473,9 @@ func UpdateNodeGroupsCount(autoscaled, autoprovisioned int) { } // UpdateUnschedulablePodsCount records number of currently unschedulable pods -func UpdateUnschedulablePodsCount(podsCount int) { - unschedulablePodsCount.Set(float64(podsCount)) +func UpdateUnschedulablePodsCount(uschedulablePodsCount, unknownPodsCount int) { + unschedulablePodsCount.WithLabelValues("unschedulable").Set(float64(uschedulablePodsCount)) + unschedulablePodsCount.WithLabelValues("unknown").Set(float64(unknownPodsCount)) } // UpdateMaxNodesCount records the current maximum number of nodes being set for all node groups diff --git a/cluster-autoscaler/utils/kubernetes/listers.go b/cluster-autoscaler/utils/kubernetes/listers.go index cbba21180682..b2b329f42f37 100644 --- a/cluster-autoscaler/utils/kubernetes/listers.go +++ b/cluster-autoscaler/utils/kubernetes/listers.go @@ -156,6 +156,23 @@ func ScheduledPods(allPods []*apiv1.Pod) []*apiv1.Pod { return scheduledPods } +// UnknownPods is a helper method that returns all pods which are not yet processed by the scheduler +func UnknownPods(allPods []*apiv1.Pod) []*apiv1.Pod { + var unknownPods []*apiv1.Pod + for _, pod := range allPods { + // Make sure it's not scheduled or deleted + if pod.Spec.NodeName != "" || pod.GetDeletionTimestamp() != nil { + continue + } + // Make sure it's not unschedulable + _, condition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled) + if condition == nil || (condition.Status == apiv1.ConditionFalse && condition.Reason == "") { + unknownPods = append(unknownPods, pod) + } + } + return unknownPods +} + // UnschedulablePods is a helper method that returns all 
unschedulable pods from given pod list. func UnschedulablePods(allPods []*apiv1.Pod) []*apiv1.Pod { var unschedulablePods []*apiv1.Pod From 86ab017967d233f1e85c2875d68d0db59942ca2a Mon Sep 17 00:00:00 2001 From: Mahmoud Atwa Date: Tue, 31 Oct 2023 11:04:38 +0000 Subject: [PATCH 2/6] Fix multiple comments and update flags --- .../config/autoscaling_options.go | 12 ++--- .../podlistprocessor/clear_tpu_request.go | 39 ++++++++++++++ .../podlistprocessor/filter_out_expendable.go | 21 ++++---- .../filter_out_expendable_test.go | 34 ++++++------ .../podlistprocessor/pod_list_processor.go | 3 +- cluster-autoscaler/core/static_autoscaler.go | 27 +++++++--- cluster-autoscaler/main.go | 6 +-- cluster-autoscaler/metrics/metrics.go | 13 +++-- .../utils/kubernetes/listers.go | 54 +++++++++++++------ 9 files changed, 142 insertions(+), 67 deletions(-) create mode 100644 cluster-autoscaler/core/podlistprocessor/clear_tpu_request.go diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index ce38224c557a..40441206a0f7 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -277,12 +277,8 @@ type AutoscalingOptions struct { // dynamicNodeDeleteDelayAfterTaintEnabled is used to enable/disable dynamic adjustment of NodeDeleteDelayAfterTaint // based on the latency between the CA and the api-server DynamicNodeDeleteDelayAfterTaintEnabled bool - // UnschedulablePodTimeBuffer controls when scale-ups happen so that - // the oldest unschedulable pod is older than UnschedulablePodTimeBuffer - UnschedulablePodTimeBuffer time.Duration - // UnschedulablePodWithGpuTimeBuffer specifies how old should the oldest unschedulable pod with GPU be before starting scale up. - // The idea is that nodes with GPU are very expensive and we're ready to sacrifice - // a bit more latency to wait for more pods and make a more informed scale-up decision. - UnschedulablePodWithGpuTimeBuffer time.Duration - // unschedulablePodWithGpuTimeBuffer = 30 * time.Second + //IgnoreSchedulerProcessing is used to signal whether CA will/won't wait + //for scheduler to mark pods as unschedulable and will process both marked & non-marked pods + //it will also signal whether we enable/disable waiting for pod time buffers before triggering a scale-up. + IgnoreSchedulerProcessing bool } diff --git a/cluster-autoscaler/core/podlistprocessor/clear_tpu_request.go b/cluster-autoscaler/core/podlistprocessor/clear_tpu_request.go new file mode 100644 index 000000000000..f26104347eaa --- /dev/null +++ b/cluster-autoscaler/core/podlistprocessor/clear_tpu_request.go @@ -0,0 +1,39 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package podlistprocessor + +import ( + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/utils/tpu" +) + +type clearTpuRequests struct { +} + +// NewClearTPURequestsPodListProcessor creates a PodListProcessor which clears TPU requests in pods +func NewClearTPURequestsPodListProcessor() *clearTpuRequests { + return &clearTpuRequests{} +} + +// Process removes pods' tpu requests +func (p *clearTpuRequests) Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) { + return tpu.ClearTPURequests(pods), nil +} + +func (p *clearTpuRequests) CleanUp() { +} diff --git a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go index ba50691abe50..14b17b445831 100644 --- a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go +++ b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go @@ -24,17 +24,16 @@ import ( klog "k8s.io/klog/v2" ) -type filterOutExpandable struct { +type filterOutExpendable struct { } -// NewFilterOutExpandablePodListProcessor creates a PodListProcessor filtering out expendable pods -func NewFilterOutExpandablePodListProcessor() *filterOutExpandable { - return &filterOutExpandable{} +// NewFilterOutExpendablePodListProcessor creates a PodListProcessor filtering out expendable pods +func NewFilterOutExpendablePodListProcessor() *filterOutExpendable { + return &filterOutExpendable{} } // Process filters out pods which are expendable and adds pods which is waiting for lower priority pods preemption to the cluster snapshot -func (p *filterOutExpandable) Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) { - klog.V(4).Infof("Filtering out expandable pods") +func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) { nodes, err := context.AllNodeLister().List() if err != nil { return nil, err @@ -42,25 +41,25 @@ func (p *filterOutExpandable) Process(context *context.AutoscalingContext, pods expendablePodsPriorityCutoff := context.AutoscalingOptions.ExpendablePodsPriorityCutoff unschedulablePods, waitingForLowerPriorityPreemption := core_utils.FilterOutExpendableAndSplit(pods, nodes, expendablePodsPriorityCutoff) - if err = p.addPreemptiblePodsToSnapshot(waitingForLowerPriorityPreemption, context); err != nil { + if err = p.addPreemptingPodsToSnapshot(waitingForLowerPriorityPreemption, context); err != nil { return nil, err } return unschedulablePods, nil } -// addPreemptiblePodsToSnapshot modifies the snapshot simulating scheduling of pods waiting for preemption. +// addPreemptingPodsToSnapshot modifies the snapshot simulating scheduling of pods waiting for preemption. // this is not strictly correct as we are not simulating preemption itself but it matches // CA logic from before migration to scheduler framework. 
So let's keep it for now -func (p *filterOutExpandable) addPreemptiblePodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error { +func (p *filterOutExpendable) addPreemptingPodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error { for _, p := range pods { if err := ctx.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil { - klog.Errorf("Failed to update snapshot with pod %s waiting for preemption", err) + klog.Errorf("Failed to update snapshot with pod %s/%s waiting for preemption: %v", p.Namespace, p.Name, err) return caerrors.ToAutoscalerError(caerrors.InternalError, err) } } return nil } -func (p *filterOutExpandable) CleanUp() { +func (p *filterOutExpendable) CleanUp() { } diff --git a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go index 35de9db2b20f..5a286b4276de 100644 --- a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go +++ b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go @@ -53,51 +53,51 @@ func TestFilterOutExpendable(t *testing.T) { { name: "non-expendable pods with priority >= to cutoff priority", pods: []*apiv1.Pod{ - test.BuildTestPod("p1", 1000, 1, getPrioritySetter(2)), - test.BuildTestPod("p2", 1000, 1, getPrioritySetter(3)), + test.BuildTestPod("p1", 1000, 1, priority(2)), + test.BuildTestPod("p2", 1000, 1, priority(3)), }, wantPods: []*apiv1.Pod{ - test.BuildTestPod("p1", 1000, 1, getPrioritySetter(2)), - test.BuildTestPod("p2", 1000, 1, getPrioritySetter(3)), + test.BuildTestPod("p1", 1000, 1, priority(2)), + test.BuildTestPod("p2", 1000, 1, priority(3)), }, priorityCutoff: 2, }, { name: "single expednable pod", pods: []*apiv1.Pod{ - test.BuildTestPod("p", 1000, 1, getPrioritySetter(2)), + test.BuildTestPod("p", 1000, 1, priority(2)), }, priorityCutoff: 3, }, { name: "single waiting-for-low-priority-preemption pod", pods: []*apiv1.Pod{ - test.BuildTestPod("p", 1000, 1, getNominatedNodeNameSetter("node-1")), + test.BuildTestPod("p", 1000, 1, nominatedNodeName("node-1")), }, nodes: []*apiv1.Node{ test.BuildTestNode("node-1", 2400, 2400), }, wantPodsInSnapshot: []*apiv1.Pod{ - test.BuildTestPod("p", 1000, 1, getNominatedNodeNameSetter("node-1")), + test.BuildTestPod("p", 1000, 1, nominatedNodeName("node-1")), }, }, { name: "mixed expendable, non-expendable & waiting-for-low-priority-preemption pods", pods: []*apiv1.Pod{ - test.BuildTestPod("p1", 1000, 1, getPrioritySetter(3)), - test.BuildTestPod("p2", 1000, 1, getPrioritySetter(4)), - test.BuildTestPod("p3", 1000, 1, getPrioritySetter(1)), + test.BuildTestPod("p1", 1000, 1, priority(3)), + test.BuildTestPod("p2", 1000, 1, priority(4)), + test.BuildTestPod("p3", 1000, 1, priority(1)), test.BuildTestPod("p4", 1000, 1), - test.BuildTestPod("p5", 1000, 1, getNominatedNodeNameSetter("node-1")), + test.BuildTestPod("p5", 1000, 1, nominatedNodeName("node-1")), }, priorityCutoff: 2, wantPods: []*apiv1.Pod{ - test.BuildTestPod("p1", 1000, 1, getPrioritySetter(3)), - test.BuildTestPod("p2", 1000, 1, getPrioritySetter(4)), + test.BuildTestPod("p1", 1000, 1, priority(3)), + test.BuildTestPod("p2", 1000, 1, priority(4)), test.BuildTestPod("p4", 1000, 1), }, wantPodsInSnapshot: []*apiv1.Pod{ - test.BuildTestPod("p5", 1000, 1, getNominatedNodeNameSetter("node-1")), + test.BuildTestPod("p5", 1000, 1, nominatedNodeName("node-1")), }, nodes: []*apiv1.Node{ test.BuildTestNode("node-1", 2400, 2400), @@ -107,7 +107,7 @@ func TestFilterOutExpendable(t 
*testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - processor := NewFilterOutExpandablePodListProcessor() + processor := NewFilterOutExpendablePodListProcessor() snapshot := clustersnapshot.NewBasicClusterSnapshot() snapshot.AddNodes(tc.nodes) @@ -141,12 +141,12 @@ func TestFilterOutExpendable(t *testing.T) { } } -func getPrioritySetter(priority int32) func(*apiv1.Pod) { +func priority(priority int32) func(*apiv1.Pod) { return func(pod *apiv1.Pod) { pod.Spec.Priority = &priority } } -func getNominatedNodeNameSetter(nodeName string) func(*apiv1.Pod) { +func nominatedNodeName(nodeName string) func(*apiv1.Pod) { return func(pod *apiv1.Pod) { pod.Status.NominatedNodeName = nodeName } diff --git a/cluster-autoscaler/core/podlistprocessor/pod_list_processor.go b/cluster-autoscaler/core/podlistprocessor/pod_list_processor.go index 2aea528397cc..ccbc5dfc8183 100644 --- a/cluster-autoscaler/core/podlistprocessor/pod_list_processor.go +++ b/cluster-autoscaler/core/podlistprocessor/pod_list_processor.go @@ -32,7 +32,8 @@ type defaultPodListProcessor struct { func NewDefaultPodListProcessor(predicateChecker predicatechecker.PredicateChecker) *defaultPodListProcessor { return &defaultPodListProcessor{ processors: []pods.PodListProcessor{ - NewFilterOutExpandablePodListProcessor(), + NewClearTPURequestsPodListProcessor(), + NewFilterOutExpendablePodListProcessor(), NewCurrentlyDrainedNodesPodListProcessor(), NewFilterOutSchedulablePodListProcessor(predicateChecker), NewFilterOutDaemonSetPodListProcessor(), diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 9a4a9dffaa77..d9a09d34ff54 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -63,6 +63,13 @@ import ( ) const ( + // How old the oldest unschedulable pod should be before starting scale up. + unschedulablePodTimeBuffer = 2 * time.Second + // How old the oldest unschedulable pod with GPU should be before starting scale up. + // The idea is that nodes with GPU are very expensive and we're ready to sacrifice + // a bit more latency to wait for more pods and make a more informed scale-up decision. + unschedulablePodWithGpuTimeBuffer = 30 * time.Second + // NodeUpcomingAnnotation is an annotation CA adds to nodes which are upcoming. 
NodeUpcomingAnnotation = "cluster-autoscaler.k8s.io/upcoming-node" @@ -301,7 +308,11 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr klog.Errorf("Failed to list pods: %v", err) return caerrors.ToAutoscalerError(caerrors.ApiCallError, err) } - originalScheduledPods, unschedulablePods, unknownPods := kube_util.ScheduledPods(pods), kube_util.UnschedulablePods(pods), kube_util.UnknownPods(pods) + originalScheduledPods, unschedulablePods := kube_util.ScheduledPods(pods), kube_util.UnschedulablePods(pods) + schedulerUnprocessed := make([]*apiv1.Pod, 0, 0) + if a.IgnoreSchedulerProcessing { + schedulerUnprocessed = kube_util.SchedulerUnprocessedPods(pods) + } // Update cluster resource usage metrics coresTotal, memoryTotal := calculateCoresMemoryTotal(allNodes, currentTime) @@ -443,10 +454,12 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr metrics.UpdateLastTime(metrics.Autoscaling, time.Now()) - metrics.UpdateUnschedulablePodsCount(len(unschedulablePods), len(unknownPods)) - // Treat unknown pods as unschedulable, pod list processor will remove schedulable pods - unschedulablePods = append(unschedulablePods, unknownPods...) - + // SchedulerUnprocessed might be zero here if it was disabled + metrics.UpdateUnschedulablePodsCount(len(unschedulablePods), len(schedulerUnprocessed)) + if a.IgnoreSchedulerProcessing { + // Treat unknown pods as unschedulable, pod list processor will remove schedulable pods + unschedulablePods = append(unschedulablePods, schedulerUnprocessed...) + } // Upcoming nodes are recently created nodes that haven't registered in the cluster yet, or haven't become ready yet. upcomingCounts, registeredUpcoming := a.clusterStateRegistry.GetUpcomingNodes() // For each upcoming node we inject a placeholder node faked to appear ready into the cluster snapshot, so that we can pack unschedulable pods on @@ -530,7 +543,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr } else if a.MaxNodesTotal > 0 && len(readyNodes) >= a.MaxNodesTotal { scaleUpStatus.Result = status.ScaleUpNoOptionsAvailable klog.V(1).Info("Max total nodes in cluster reached") - } else if allPodsAreNew(unschedulablePodsToHelp, currentTime, a.AutoscalingOptions.UnschedulablePodTimeBuffer, a.AutoscalingOptions.UnschedulablePodWithGpuTimeBuffer) { + } else if !a.IgnoreSchedulerProcessing && allPodsAreNew(unschedulablePodsToHelp, currentTime) { // The assumption here is that these pods have been created very recently and probably there // is more pods to come. In theory we could check the newest pod time but then if pod were created // slowly but at the pace of 1 every 2 seconds then no scale up would be triggered for long time. 
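For reference, a sketch of the distinction RunOnce relies on here: per the SchedulerUnprocessedPods and isUnschedulable helpers added to listers.go later in this patch, a pending pod with no assigned node and no PodScheduled condition is counted as scheduler-unprocessed, while a pod the scheduler has rejected is counted as unschedulable. The package name below is hypothetical and the snippet is illustrative only:

package example

import apiv1 "k8s.io/api/core/v1"

// The scheduler has not looked at this pod yet: no node assigned and no
// PodScheduled condition recorded, so it falls into the scheduler_unprocessed
// bucket (and is treated as unschedulable when IgnoreSchedulerProcessing is on).
var schedulerUnprocessed = &apiv1.Pod{}

// The scheduler tried and failed to place this pod, so it falls into the
// unschedulable bucket.
var unschedulable = &apiv1.Pod{
	Status: apiv1.PodStatus{
		Conditions: []apiv1.PodCondition{{
			Type:   apiv1.PodScheduled,
			Status: apiv1.ConditionFalse,
			Reason: apiv1.PodReasonUnschedulable,
		}},
	},
}
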
@@ -963,7 +976,7 @@ func (a *StaticAutoscaler) reportTaintsCount(nodes []*apiv1.Node) { } } -func allPodsAreNew(pods []*apiv1.Pod, currentTime time.Time, unschedulablePodTimeBuffer, unschedulablePodWithGpuTimeBuffer time.Duration) bool { +func allPodsAreNew(pods []*apiv1.Pod, currentTime time.Time) bool { if core_utils.GetOldestCreateTime(pods).Add(unschedulablePodTimeBuffer).After(currentTime) { return true } diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 85c4fa6d870e..8429cb06d9aa 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -243,8 +243,7 @@ var ( maxAllocatableDifferenceRatio = flag.Float64("max-allocatable-difference-ratio", config.DefaultMaxAllocatableDifferenceRatio, "Maximum difference in allocatable resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's allocatable resource.") forceDaemonSets = flag.Bool("force-ds", false, "Blocks scale-up of node groups too small for all suitable Daemon Sets pods.") dynamicNodeDeleteDelayAfterTaintEnabled = flag.Bool("dynamic-node-delete-delay-after-taint-enabled", false, "Enables dynamic adjustment of NodeDeleteDelayAfterTaint based of the latency between CA and api-server") - unschedulablePodTimeBuffer = flag.Duration("unschedulable-pod-time-buffer", 2*time.Second, "How old the oldest unschedulable pod should be before starting scale up.") - unschedulablePodWithGpuTimeBuffer = flag.Duration("unschedulable-pod-with-gpu-time-buffer", 30*time.Second, "How old the oldest unschedulable pod with GPU should be before starting scale up.") + ignoreSchedulerProcessing = flag.Bool("ignore-scheduler-processing", false, "If true, cluster autoscaler will not wait for scheduler to mark pods as unschedulable and will process both marked & non-marked pods (Schedulable pods will be filtered before scaling-up) it will also disable waiting for pod time buffers before triggering a scale-up.") ) func isFlagPassed(name string) bool { @@ -392,8 +391,7 @@ func createAutoscalingOptions() config.AutoscalingOptions { MaxFreeDifferenceRatio: *maxFreeDifferenceRatio, }, DynamicNodeDeleteDelayAfterTaintEnabled: *dynamicNodeDeleteDelayAfterTaintEnabled, - UnschedulablePodTimeBuffer: *unschedulablePodTimeBuffer, - UnschedulablePodWithGpuTimeBuffer: *unschedulablePodWithGpuTimeBuffer, + IgnoreSchedulerProcessing: *ignoreSchedulerProcessing, } } diff --git a/cluster-autoscaler/metrics/metrics.go b/cluster-autoscaler/metrics/metrics.go index e3332383ae17..dfa1d4f7daa7 100644 --- a/cluster-autoscaler/metrics/metrics.go +++ b/cluster-autoscaler/metrics/metrics.go @@ -141,7 +141,7 @@ var ( Namespace: caNamespace, Name: "unschedulable_pods_count", Help: "Number of unschedulable pods in the cluster.", - }, []string{"count_type"}, + }, []string{"type"}, ) maxNodesCount = k8smetrics.NewGauge( @@ -473,9 +473,14 @@ func UpdateNodeGroupsCount(autoscaled, autoprovisioned int) { } // UpdateUnschedulablePodsCount records number of currently unschedulable pods -func UpdateUnschedulablePodsCount(uschedulablePodsCount, unknownPodsCount int) { - unschedulablePodsCount.WithLabelValues("unschedulable").Set(float64(uschedulablePodsCount)) - unschedulablePodsCount.WithLabelValues("unknown").Set(float64(unknownPodsCount)) +func UpdateUnschedulablePodsCount(uschedulablePodsCount, schedulerUnprocessedCount int) { + UpdateUnschedulablePodsCountWithLabel(uschedulablePodsCount, "unschedulable") + UpdateUnschedulablePodsCountWithLabel(schedulerUnprocessedCount, "scheduler_unprocessed") 
+} + +// UpdateUnschedulablePodsCountWithLabel records number of currently unschedulable pods wil label "type" value "label" +func UpdateUnschedulablePodsCountWithLabel(uschedulablePodsCount int, label string) { + unschedulablePodsCount.WithLabelValues(label).Set(float64(uschedulablePodsCount)) } // UpdateMaxNodesCount records the current maximum number of nodes being set for all node groups diff --git a/cluster-autoscaler/utils/kubernetes/listers.go b/cluster-autoscaler/utils/kubernetes/listers.go index b2b329f42f37..4a96344cada6 100644 --- a/cluster-autoscaler/utils/kubernetes/listers.go +++ b/cluster-autoscaler/utils/kubernetes/listers.go @@ -144,11 +144,37 @@ type PodLister interface { List() ([]*apiv1.Pod, error) } +func isScheduled(pod *apiv1.Pod) bool { + if pod == nil { + return false + } + return pod.Spec.NodeName != "" +} +func isDeleted(pod *apiv1.Pod) bool { + if pod == nil { + return false + } + return pod.GetDeletionTimestamp() != nil +} +func isUnschedulable(pod *apiv1.Pod) bool { + if pod == nil { + return false + } + if isScheduled(pod) || isDeleted(pod) { + return false + } + _, condition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled) + if condition == nil || condition.Status != apiv1.ConditionFalse || condition.Reason != apiv1.PodReasonUnschedulable { + return false + } + return true +} + // ScheduledPods is a helper method that returns all scheduled pods from given pod list. func ScheduledPods(allPods []*apiv1.Pod) []*apiv1.Pod { var scheduledPods []*apiv1.Pod for _, pod := range allPods { - if pod.Spec.NodeName != "" { + if isScheduled(pod) { scheduledPods = append(scheduledPods, pod) continue } @@ -156,35 +182,33 @@ func ScheduledPods(allPods []*apiv1.Pod) []*apiv1.Pod { return scheduledPods } -// UnknownPods is a helper method that returns all pods which are not yet processed by the scheduler -func UnknownPods(allPods []*apiv1.Pod) []*apiv1.Pod { - var unknownPods []*apiv1.Pod +// SchedulerUnprocessedPods is a helper method that returns all pods which are not yet processed by the scheduler +func SchedulerUnprocessedPods(allPods []*apiv1.Pod) []*apiv1.Pod { + var unprocessedPods []*apiv1.Pod for _, pod := range allPods { // Make sure it's not scheduled or deleted - if pod.Spec.NodeName != "" || pod.GetDeletionTimestamp() != nil { + if isScheduled(pod) || isDeleted(pod) || isUnschedulable(pod) { continue } - // Make sure it's not unschedulable + // Make sure that if it's not scheduled it's either + // Not processed (condition is nil) + // Or Reason is empty (not schedulerError, terminated, ...etc) _, condition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled) if condition == nil || (condition.Status == apiv1.ConditionFalse && condition.Reason == "") { - unknownPods = append(unknownPods, pod) + unprocessedPods = append(unprocessedPods, pod) } } - return unknownPods + return unprocessedPods } // UnschedulablePods is a helper method that returns all unschedulable pods from given pod list. 
func UnschedulablePods(allPods []*apiv1.Pod) []*apiv1.Pod { var unschedulablePods []*apiv1.Pod for _, pod := range allPods { - if pod.Spec.NodeName == "" { - _, condition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled) - if condition != nil && condition.Status == apiv1.ConditionFalse && condition.Reason == apiv1.PodReasonUnschedulable { - if pod.GetDeletionTimestamp() == nil { - unschedulablePods = append(unschedulablePods, pod) - } - } + if !isUnschedulable(pod) { + continue } + unschedulablePods = append(unschedulablePods, pod) } return unschedulablePods } From cfbfaa271a3dc29fa83a11037ba6c906f2192c4a Mon Sep 17 00:00:00 2001 From: Mahmoud Atwa Date: Tue, 7 Nov 2023 19:26:52 +0000 Subject: [PATCH 3/6] Add new test for new behaviour and revert changes made to other tests --- .../core/static_autoscaler_test.go | 153 ++++++++++++++---- 1 file changed, 124 insertions(+), 29 deletions(-) diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index f3ebc1d4412b..c8e07145bb35 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -171,14 +171,11 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { n2 := BuildTestNode("n2", 1000, 1000) SetNodeReadyState(n2, true, time.Now()) n3 := BuildTestNode("n3", 1000, 1000) - SetNodeReadyState(n3, true, time.Now()) n4 := BuildTestNode("n4", 1000, 1000) - n5 := BuildTestNode("n5", 1000, 1000) p1 := BuildTestPod("p1", 600, 100) p1.Spec.NodeName = "n1" p2 := BuildTestPod("p2", 600, 100, MarkUnschedulable()) - p3 := BuildTestPod("p3", 600, 100) // Not yet processed by scheduler tn := BuildTestNode("tn", 1000, 1000) tni := schedulerframework.NewNodeInfo() @@ -251,7 +248,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { // MaxNodesTotal reached. readyNodeLister.SetNodes([]*apiv1.Node{n1}) allNodeLister.SetNodes([]*apiv1.Node{n1}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() @@ -262,10 +259,10 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { // Scale up. readyNodeLister.SetNodes([]*apiv1.Node{n1}) allNodeLister.SetNodes([]*apiv1.Node{n1}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() - onScaleUpMock.On("ScaleUp", "ng1", 2).Return(nil).Once() + onScaleUpMock.On("ScaleUp", "ng1", 1).Return(nil).Once() context.MaxNodesTotal = 10 err = autoscaler.RunOnce(time.Now().Add(time.Hour)) @@ -305,12 +302,12 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { // Mark unregistered nodes. 
readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() provider.AddNodeGroup("ng2", 0, 10, 1) - provider.AddNode("ng2", n4) + provider.AddNode("ng2", n3) err = autoscaler.RunOnce(time.Now().Add(4 * time.Hour)) assert.NoError(t, err) @@ -318,11 +315,11 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) // Remove unregistered nodes. - readyNodeLister.SetNodes([]*apiv1.Node{n1, n2, n3}) - allNodeLister.SetNodes([]*apiv1.Node{n1, n2, n3}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice() + readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) + allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() - onScaleDownMock.On("ScaleDown", "ng2", "n4").Return(nil).Once() + onScaleDownMock.On("ScaleDown", "ng2", "n3").Return(nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() err = autoscaler.RunOnce(time.Now().Add(5 * time.Hour)) @@ -332,15 +329,15 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) // Scale up to node gorup min size. - readyNodeLister.SetNodes([]*apiv1.Node{n5}) - allNodeLister.SetNodes([]*apiv1.Node{n5}) + readyNodeLister.SetNodes([]*apiv1.Node{n4}) + allNodeLister.SetNodes([]*apiv1.Node{n4}) allPodListerMock.On("List").Return([]*apiv1.Pod{}, nil).Twice() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil) podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil) onScaleUpMock.On("ScaleUp", "ng3", 2).Return(nil).Once() // 2 new nodes are supposed to be scaled up. provider.AddNodeGroup("ng3", 3, 10, 1) - provider.AddNode("ng3", n5) + provider.AddNode("ng3", n4) err = autoscaler.RunOnce(time.Now().Add(5 * time.Hour)) assert.NoError(t, err) @@ -369,7 +366,6 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { p1 := BuildTestPod("p1", 100, 100) p1.Spec.NodeName = "n1" p2 := BuildTestPod("p2", 600, 100, MarkUnschedulable()) - p3 := BuildTestPod("p3", 600, 100) // Not yet processed by scheduler tn1 := BuildTestNode("tn1", 100, 1000) SetNodeReadyState(tn1, true, time.Now()) @@ -461,11 +457,11 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { // Scale up. 
readyNodeLister.SetNodes([]*apiv1.Node{n1}) allNodeLister.SetNodes([]*apiv1.Node{n1}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() onNodeGroupCreateMock.On("Create", "autoprovisioned-TN2").Return(nil).Once() - onScaleUpMock.On("ScaleUp", "autoprovisioned-TN2", 2).Return(nil).Once() + onScaleUpMock.On("ScaleUp", "autoprovisioned-TN2", 1).Return(nil).Once() err = autoscaler.RunOnce(time.Now().Add(time.Hour)) assert.NoError(t, err) @@ -525,13 +521,10 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { SetNodeReadyState(n1, true, time.Now()) n2 := BuildTestNode("n2", 1000, 1000) SetNodeReadyState(n2, true, time.Now()) - n3 := BuildTestNode("n3", 1000, 1000) - SetNodeReadyState(n3, true, time.Now()) p1 := BuildTestPod("p1", 600, 100) p1.Spec.NodeName = "n1" p2 := BuildTestPod("p2", 600, 100, MarkUnschedulable()) - p3 := BuildTestPod("p3", 600, 100) provider := testprovider.NewTestCloudProvider( func(id string, delta int) error { @@ -541,7 +534,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { deleteFinished <- true return ret }) - provider.AddNodeGroup("ng1", 3, 10, 2) + provider.AddNodeGroup("ng1", 2, 10, 2) provider.AddNode("ng1", n1) // broken node, that will be just hanging out there during @@ -613,10 +606,10 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { // Scale up. readyNodeLister.SetNodes([]*apiv1.Node{n1}) allNodeLister.SetNodes([]*apiv1.Node{n1}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() - onScaleUpMock.On("ScaleUp", "ng1", 2).Return(nil).Once() + onScaleUpMock.On("ScaleUp", "ng1", 1).Return(nil).Once() err = autoscaler.RunOnce(later.Add(time.Hour)) assert.NoError(t, err) @@ -625,12 +618,11 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { // Remove broken node after going over min size provider.AddNode("ng1", n2) - provider.AddNode("ng1", n3) - ng1.SetTargetSize(4) + ng1.SetTargetSize(3) - readyNodeLister.SetNodes([]*apiv1.Node{n1, n2, n3}) - allNodeLister.SetNodes([]*apiv1.Node{n1, n2, n3}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice() + readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) + allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice() onScaleDownMock.On("ScaleDown", "ng1", "broken").Return(nil).Once() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() @@ -999,6 +991,109 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t * podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) } +func TestStaticAutoscalerRunOnceWithSchedulerProcessingIgnored(t *testing.T) { + readyNodeLister := kubernetes.NewTestNodeLister(nil) + allNodeLister := kubernetes.NewTestNodeLister(nil) + allPodListerMock := &podListerMock{} + 
podDisruptionBudgetListerMock := &podDisruptionBudgetListerMock{} + daemonSetListerMock := &daemonSetListerMock{} + onScaleUpMock := &onScaleUpMock{} + onScaleDownMock := &onScaleDownMock{} + deleteFinished := make(chan bool, 1) + + now := time.Now() + later := now.Add(1 * time.Minute) + + n1 := BuildTestNode("n1", 1000, 1000) + SetNodeReadyState(n1, true, time.Now()) + + p1 := BuildTestPod("p1", 600, 100) + p1.Spec.NodeName = "n1" + p2 := BuildTestPod("p2", 600, 100, MarkUnschedulable()) + p3 := BuildTestPod("p3", 600, 100) // Not yet processed by scheduler + + provider := testprovider.NewTestCloudProvider( + func(id string, delta int) error { + return onScaleUpMock.ScaleUp(id, delta) + }, func(id string, name string) error { + ret := onScaleDownMock.ScaleDown(id, name) + deleteFinished <- true + return ret + }) + provider.AddNodeGroup("ng1", 1, 10, 1) + provider.AddNode("ng1", n1) + + ng1 := reflect.ValueOf(provider.GetNodeGroup("ng1")).Interface().(*testprovider.TestNodeGroup) + assert.NotNil(t, ng1) + assert.NotNil(t, provider) + + // Create context with mocked lister registry. + options := config.AutoscalingOptions{ + NodeGroupDefaults: config.NodeGroupAutoscalingOptions{ + ScaleDownUnneededTime: time.Minute, + ScaleDownUnreadyTime: time.Minute, + ScaleDownUtilizationThreshold: 0.5, + MaxNodeProvisionTime: 10 * time.Second, + }, + EstimatorName: estimator.BinpackingEstimatorName, + ScaleDownEnabled: true, + MaxNodesTotal: 10, + MaxCoresTotal: 10, + MaxMemoryTotal: 100000, + IgnoreSchedulerProcessing: true, + } + processorCallbacks := newStaticAutoscalerProcessorCallbacks() + + context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil) + assert.NoError(t, err) + + setUpScaleDownActuator(&context, options) + + listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, allPodListerMock, + podDisruptionBudgetListerMock, daemonSetListerMock, + nil, nil, nil, nil) + context.ListerRegistry = listerRegistry + + clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ + OkTotalUnreadyCount: 1, + } + clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) + + nodes := []*apiv1.Node{n1} + clusterState.UpdateNodes(nodes, nil, now) + + processors := NewTestProcessors(&context) + + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) + suOrchestrator := orchestrator.New() + suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{}) + + autoscaler := &StaticAutoscaler{ + AutoscalingContext: &context, + clusterStateRegistry: clusterState, + lastScaleUpTime: time.Now(), + lastScaleDownFailTime: time.Now(), + scaleDownPlanner: sdPlanner, + scaleDownActuator: sdActuator, + scaleUpOrchestrator: suOrchestrator, + processors: processors, + processorCallbacks: processorCallbacks, + } + + // Scale up. 
+ readyNodeLister.SetNodes([]*apiv1.Node{n1}) + allNodeLister.SetNodes([]*apiv1.Node{n1}) + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice() + daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() + podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() + onScaleUpMock.On("ScaleUp", "ng1", 2).Return(nil).Once() + + err = autoscaler.RunOnce(later.Add(time.Hour)) + assert.NoError(t, err) + mock.AssertExpectationsForObjects(t, allPodListerMock, + podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) +} + func TestStaticAutoscalerInstanceCreationErrors(t *testing.T) { // setup provider := &mockprovider.CloudProvider{} From 4635a6dc04c4dbede06326c563d209c39b442dfc Mon Sep 17 00:00:00 2001 From: Mahmoud Atwa Date: Wed, 8 Nov 2023 10:15:22 +0000 Subject: [PATCH 4/6] Allow users to specify which schedulers to ignore --- .../config/autoscaling_options.go | 3 +++ cluster-autoscaler/core/static_autoscaler.go | 2 +- .../core/static_autoscaler_test.go | 15 ++++++++--- cluster-autoscaler/main.go | 3 +++ .../utils/kubernetes/listers.go | 25 +++++++++++++++++-- .../utils/scheduler/scheduler.go | 9 +++++++ cluster-autoscaler/utils/test/test_utils.go | 7 ++++++ 7 files changed, 58 insertions(+), 6 deletions(-) diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index 40441206a0f7..66cd06113a55 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -281,4 +281,7 @@ type AutoscalingOptions struct { //for scheduler to mark pods as unschedulable and will process both marked & non-marked pods //it will also signal whether we enable/disable waiting for pod time buffers before triggering a scale-up. 
IgnoreSchedulerProcessing bool + //IgnoredSchedulers are used to specify which schedulers to ignore their processing + //if IgnoreSchedulerProcessing is set to true + IgnoredSchedulers map[string]bool } diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index d9a09d34ff54..f5f7634bdddf 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -311,7 +311,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr originalScheduledPods, unschedulablePods := kube_util.ScheduledPods(pods), kube_util.UnschedulablePods(pods) schedulerUnprocessed := make([]*apiv1.Pod, 0, 0) if a.IgnoreSchedulerProcessing { - schedulerUnprocessed = kube_util.SchedulerUnprocessedPods(pods) + schedulerUnprocessed = kube_util.SchedulerUnprocessedPods(pods, a.IgnoredSchedulers) } // Update cluster resource usage metrics diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index c8e07145bb35..2649f69be95e 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -992,6 +992,9 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t * } func TestStaticAutoscalerRunOnceWithSchedulerProcessingIgnored(t *testing.T) { + ignoredScheduler := "ignored-scheduler" + nonIgnoredScheduler := "non-ignored-scheduler" + readyNodeLister := kubernetes.NewTestNodeLister(nil) allNodeLister := kubernetes.NewTestNodeLister(nil) allPodListerMock := &podListerMock{} @@ -1010,7 +1013,9 @@ func TestStaticAutoscalerRunOnceWithSchedulerProcessingIgnored(t *testing.T) { p1 := BuildTestPod("p1", 600, 100) p1.Spec.NodeName = "n1" p2 := BuildTestPod("p2", 600, 100, MarkUnschedulable()) - p3 := BuildTestPod("p3", 600, 100) // Not yet processed by scheduler + p3 := BuildTestPod("p3", 600, 100, AddSchedulerName(apiv1.DefaultSchedulerName)) // Not yet processed by scheduler, default scheduler is ignored + p4 := BuildTestPod("p4", 600, 100, AddSchedulerName(ignoredScheduler)) // non-default scheduler & ignored, expects a scale-up + p5 := BuildTestPod("p5", 600, 100, AddSchedulerName(nonIgnoredScheduler)) // non-default scheduler & not ignored, shouldn't cause a scale-up provider := testprovider.NewTestCloudProvider( func(id string, delta int) error { @@ -1041,6 +1046,10 @@ func TestStaticAutoscalerRunOnceWithSchedulerProcessingIgnored(t *testing.T) { MaxCoresTotal: 10, MaxMemoryTotal: 100000, IgnoreSchedulerProcessing: true, + IgnoredSchedulers: map[string]bool{ + apiv1.DefaultSchedulerName: true, + ignoredScheduler: true, + }, } processorCallbacks := newStaticAutoscalerProcessorCallbacks() @@ -1083,10 +1092,10 @@ func TestStaticAutoscalerRunOnceWithSchedulerProcessingIgnored(t *testing.T) { // Scale up. 
readyNodeLister.SetNodes([]*apiv1.Node{n1}) allNodeLister.SetNodes([]*apiv1.Node{n1}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3, p4, p5}, nil).Twice() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() - onScaleUpMock.On("ScaleUp", "ng1", 2).Return(nil).Once() + onScaleUpMock.On("ScaleUp", "ng1", 3).Return(nil).Once() err = autoscaler.RunOnce(later.Add(time.Hour)) assert.NoError(t, err) diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 8429cb06d9aa..abf28a5dde40 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -31,6 +31,7 @@ import ( "github.com/spf13/pflag" + apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apiserver/pkg/server/mux" @@ -244,6 +245,7 @@ var ( forceDaemonSets = flag.Bool("force-ds", false, "Blocks scale-up of node groups too small for all suitable Daemon Sets pods.") dynamicNodeDeleteDelayAfterTaintEnabled = flag.Bool("dynamic-node-delete-delay-after-taint-enabled", false, "Enables dynamic adjustment of NodeDeleteDelayAfterTaint based of the latency between CA and api-server") ignoreSchedulerProcessing = flag.Bool("ignore-scheduler-processing", false, "If true, cluster autoscaler will not wait for scheduler to mark pods as unschedulable and will process both marked & non-marked pods (Schedulable pods will be filtered before scaling-up) it will also disable waiting for pod time buffers before triggering a scale-up.") + ignoredSchedulers = pflag.StringSlice("ignore-schedulers", []string{apiv1.DefaultSchedulerName}, fmt.Sprintf("Names of schedulers to be ignored if '--ignore-scheduler-processing' is set to true. default value '%s' is used", apiv1.DefaultSchedulerName)) ) func isFlagPassed(name string) bool { @@ -392,6 +394,7 @@ func createAutoscalingOptions() config.AutoscalingOptions { }, DynamicNodeDeleteDelayAfterTaintEnabled: *dynamicNodeDeleteDelayAfterTaintEnabled, IgnoreSchedulerProcessing: *ignoreSchedulerProcessing, + IgnoredSchedulers: scheduler_util.GetIgnoredSchedulersMap(*ignoredSchedulers), } } diff --git a/cluster-autoscaler/utils/kubernetes/listers.go b/cluster-autoscaler/utils/kubernetes/listers.go index 4a96344cada6..5e0eea234a55 100644 --- a/cluster-autoscaler/utils/kubernetes/listers.go +++ b/cluster-autoscaler/utils/kubernetes/listers.go @@ -144,18 +144,23 @@ type PodLister interface { List() ([]*apiv1.Pod, error) } +// isScheduled checks whether a pod is scheduled on a node or not func isScheduled(pod *apiv1.Pod) bool { if pod == nil { return false } return pod.Spec.NodeName != "" } + +// isDeleted checks whether a pod is deleted not func isDeleted(pod *apiv1.Pod) bool { if pod == nil { return false } return pod.GetDeletionTimestamp() != nil } + +// isUnschedulable checks whether a pod is unschedulable or not func isUnschedulable(pod *apiv1.Pod) bool { if pod == nil { return false @@ -170,6 +175,12 @@ func isUnschedulable(pod *apiv1.Pod) bool { return true } +// getIsDefaultSchedulerIgnored checks if the default scheduler should be ignored or not +func getIsDefaultSchedulerIgnored(ignoredSchedulers map[string]bool) bool { + ignored, ok := ignoredSchedulers[apiv1.DefaultSchedulerName] + return ignored && ok +} + // ScheduledPods is a helper method that returns all scheduled pods from given pod list. 
func ScheduledPods(allPods []*apiv1.Pod) []*apiv1.Pod { var scheduledPods []*apiv1.Pod @@ -182,10 +193,20 @@ func ScheduledPods(allPods []*apiv1.Pod) []*apiv1.Pod { return scheduledPods } -// SchedulerUnprocessedPods is a helper method that returns all pods which are not yet processed by the scheduler -func SchedulerUnprocessedPods(allPods []*apiv1.Pod) []*apiv1.Pod { +// SchedulerUnprocessedPods is a helper method that returns all pods which are not yet processed by the specified ignored schedulers +func SchedulerUnprocessedPods(allPods []*apiv1.Pod, ignoredSchedulers map[string]bool) []*apiv1.Pod { var unprocessedPods []*apiv1.Pod + + isDefaultSchedulerIgnored := getIsDefaultSchedulerIgnored(ignoredSchedulers) + for _, pod := range allPods { + // Don't add a pod with a scheduler that isn't specified by the user + if !isDefaultSchedulerIgnored && pod.Spec.SchedulerName == "" { + continue + } + if isIgnored, found := ignoredSchedulers[pod.Spec.SchedulerName]; !found || !isIgnored { + continue + } // Make sure it's not scheduled or deleted if isScheduled(pod) || isDeleted(pod) || isUnschedulable(pod) { continue diff --git a/cluster-autoscaler/utils/scheduler/scheduler.go b/cluster-autoscaler/utils/scheduler/scheduler.go index 59d008db64d8..7b31fa77c47a 100644 --- a/cluster-autoscaler/utils/scheduler/scheduler.go +++ b/cluster-autoscaler/utils/scheduler/scheduler.go @@ -147,3 +147,12 @@ func ConfigFromPath(path string) (*scheduler_config.KubeSchedulerConfiguration, return cfgObj, nil } + +// GetIgnoredSchedulersMap returns a map of scheduler names that should be ignored as keys, and values are set to true +func GetIgnoredSchedulersMap(ignoredSchedulers []string) map[string]bool { + ignoredSchedulersMap := make(map[string]bool, len(ignoredSchedulers)) + for _, scheduler := range ignoredSchedulers { + ignoredSchedulersMap[scheduler] = true + } + return ignoredSchedulersMap +} diff --git a/cluster-autoscaler/utils/test/test_utils.go b/cluster-autoscaler/utils/test/test_utils.go index c984c0a839ae..1b76b24b8bfb 100644 --- a/cluster-autoscaler/utils/test/test_utils.go +++ b/cluster-autoscaler/utils/test/test_utils.go @@ -82,6 +82,13 @@ func MarkUnschedulable() func(*apiv1.Pod) { } } +// AddSchedulerName adds scheduler name to a pod. +func AddSchedulerName(schedulerName string) func(*apiv1.Pod) { + return func(pod *apiv1.Pod) { + pod.Spec.SchedulerName = schedulerName + } +} + // BuildDSTestPod creates a DaemonSet pod with cpu and memory. 
func BuildDSTestPod(name string, cpu int64, mem int64) *apiv1.Pod { From a1ae4d3b57f71eb3ea2aeac3622dda9b4459d5a3 Mon Sep 17 00:00:00 2001 From: Mahmoud Atwa Date: Fri, 17 Nov 2023 19:30:04 +0000 Subject: [PATCH 5/6] Update flags, Improve tests readability & use Bypass instead of ignore in naming --- .../config/autoscaling_options.go | 9 +- .../podlistprocessor/filter_out_expendable.go | 2 + cluster-autoscaler/core/static_autoscaler.go | 9 +- .../core/static_autoscaler_test.go | 313 ++++++++++++------ cluster-autoscaler/main.go | 7 +- .../utils/kubernetes/listers.go | 30 +- .../utils/scheduler/scheduler.go | 16 +- 7 files changed, 246 insertions(+), 140 deletions(-) diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index 66cd06113a55..a4a42f4ae357 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -277,11 +277,6 @@ type AutoscalingOptions struct { // dynamicNodeDeleteDelayAfterTaintEnabled is used to enable/disable dynamic adjustment of NodeDeleteDelayAfterTaint // based on the latency between the CA and the api-server DynamicNodeDeleteDelayAfterTaintEnabled bool - //IgnoreSchedulerProcessing is used to signal whether CA will/won't wait - //for scheduler to mark pods as unschedulable and will process both marked & non-marked pods - //it will also signal whether we enable/disable waiting for pod time buffers before triggering a scale-up. - IgnoreSchedulerProcessing bool - //IgnoredSchedulers are used to specify which schedulers to ignore their processing - //if IgnoreSchedulerProcessing is set to true - IgnoredSchedulers map[string]bool + // BypassedSchedulers are used to specify which schedulers to bypass their processing + BypassedSchedulers map[string]bool } diff --git a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go index 14b17b445831..d075044394f3 100644 --- a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go +++ b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go @@ -36,12 +36,14 @@ func NewFilterOutExpendablePodListProcessor() *filterOutExpendable { func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) { nodes, err := context.AllNodeLister().List() if err != nil { + klog.Warningf("Failed to list all nodes while filtering expendable: %v", err) return nil, err } expendablePodsPriorityCutoff := context.AutoscalingOptions.ExpendablePodsPriorityCutoff unschedulablePods, waitingForLowerPriorityPreemption := core_utils.FilterOutExpendableAndSplit(pods, nodes, expendablePodsPriorityCutoff) if err = p.addPreemptingPodsToSnapshot(waitingForLowerPriorityPreemption, context); err != nil { + klog.Warningf("Failed to add preempting pods to snapshot: %v", err) return nil, err } diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index f5f7634bdddf..a638789cf7e9 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -310,8 +310,9 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr } originalScheduledPods, unschedulablePods := kube_util.ScheduledPods(pods), kube_util.UnschedulablePods(pods) schedulerUnprocessed := make([]*apiv1.Pod, 0, 0) - if a.IgnoreSchedulerProcessing { - schedulerUnprocessed = kube_util.SchedulerUnprocessedPods(pods, 
a.IgnoredSchedulers) + isSchedulerProcessingIgnored := len(a.BypassedSchedulers) > 0 + if isSchedulerProcessingIgnored { + schedulerUnprocessed = kube_util.SchedulerUnprocessedPods(pods, a.BypassedSchedulers) } // Update cluster resource usage metrics @@ -456,7 +457,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr // SchedulerUnprocessed might be zero here if it was disabled metrics.UpdateUnschedulablePodsCount(len(unschedulablePods), len(schedulerUnprocessed)) - if a.IgnoreSchedulerProcessing { + if isSchedulerProcessingIgnored { // Treat unknown pods as unschedulable, pod list processor will remove schedulable pods unschedulablePods = append(unschedulablePods, schedulerUnprocessed...) } @@ -543,7 +544,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr } else if a.MaxNodesTotal > 0 && len(readyNodes) >= a.MaxNodesTotal { scaleUpStatus.Result = status.ScaleUpNoOptionsAvailable klog.V(1).Info("Max total nodes in cluster reached") - } else if !a.IgnoreSchedulerProcessing && allPodsAreNew(unschedulablePodsToHelp, currentTime) { + } else if !isSchedulerProcessingIgnored && allPodsAreNew(unschedulablePodsToHelp, currentTime) { // The assumption here is that these pods have been created very recently and probably there // is more pods to come. In theory we could check the newest pod time but then if pod were created // slowly but at the pace of 1 every 2 seconds then no scale up would be triggered for long time. diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index 2649f69be95e..55906cf94350 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -51,6 +51,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" "k8s.io/autoscaler/cluster-autoscaler/utils/taints" . 
"k8s.io/autoscaler/cluster-autoscaler/utils/test" kube_record "k8s.io/client-go/tools/record" @@ -156,6 +157,122 @@ func setUpScaleDownActuator(ctx *context.AutoscalingContext, autoscalingOptions ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions, rules.Default(deleteOptions), NewTestProcessors(ctx).NodeGroupConfigProcessor) } +type nodeGroup struct { + name string + nodes []*apiv1.Node + min int + max int +} +type scaleCall struct { + ng string + delta int +} +type testCase struct { + nodeGroups []*nodeGroup + pods []*apiv1.Pod + podListerCallTimes int + daemonSets []*appsv1.DaemonSet + daemonSetListerCallTimes int + pdbs []*policyv1.PodDisruptionBudget + pdbListerCallTimes int + expectedScaleUps []*scaleCall + expectedScaleDowns []*scaleCall + now time.Time + lastScaleupTime time.Time + lastScaleDownFailTime time.Time + runAutoscalerAt time.Time + autoscalingOptions config.AutoscalingOptions + OkTotalUnreadyCount int +} + +func testAutoscaler(t *testing.T, tc testCase) { + readyNodeLister := kubernetes.NewTestNodeLister(nil) + allNodeLister := kubernetes.NewTestNodeLister(nil) + allPodListerMock := &podListerMock{} + podDisruptionBudgetListerMock := &podDisruptionBudgetListerMock{} + daemonSetListerMock := &daemonSetListerMock{} + onScaleUpMock := &onScaleUpMock{} + onScaleDownMock := &onScaleDownMock{} + deleteFinished := make(chan bool, 1) + + provider := testprovider.NewTestCloudProvider( + func(id string, delta int) error { + return onScaleUpMock.ScaleUp(id, delta) + }, func(id string, name string) error { + ret := onScaleDownMock.ScaleDown(id, name) + deleteFinished <- true + return ret + }) + + allNodes := make([]*apiv1.Node, 0) + for _, ng := range tc.nodeGroups { + provider.AddNodeGroup(ng.name, ng.min, ng.max, len(ng.nodes)) + for _, node := range ng.nodes { + allNodes = append(allNodes, node) + provider.AddNode(ng.name, node) + } + reflectedNg := reflect.ValueOf(provider.GetNodeGroup(ng.name)).Interface().(*testprovider.TestNodeGroup) + assert.NotNil(t, reflectedNg) + } + // Create context with mocked lister registry. 
+ processorCallbacks := newStaticAutoscalerProcessorCallbacks() + + context, err := NewScaleTestAutoscalingContext(tc.autoscalingOptions, &fake.Clientset{}, nil, provider, processorCallbacks, nil) + assert.NoError(t, err) + + setUpScaleDownActuator(&context, tc.autoscalingOptions) + + listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, allPodListerMock, + podDisruptionBudgetListerMock, daemonSetListerMock, + nil, nil, nil, nil) + context.ListerRegistry = listerRegistry + + clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ + OkTotalUnreadyCount: tc.OkTotalUnreadyCount, + } + clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(tc.autoscalingOptions.NodeGroupDefaults)) + + clusterState.UpdateNodes(allNodes, nil, tc.now) + processors := NewTestProcessors(&context) + + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) + suOrchestrator := orchestrator.New() + suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{}) + + autoscaler := &StaticAutoscaler{ + AutoscalingContext: &context, + clusterStateRegistry: clusterState, + lastScaleUpTime: tc.lastScaleupTime, + lastScaleDownFailTime: tc.lastScaleDownFailTime, + scaleDownPlanner: sdPlanner, + scaleDownActuator: sdActuator, + scaleUpOrchestrator: suOrchestrator, + processors: processors, + processorCallbacks: processorCallbacks, + } + + // Assummes all nodes are ready, to be updated when used in tests which needs non-ready nodes + readyNodeLister.SetNodes(allNodes) + allNodeLister.SetNodes(allNodes) + allPodListerMock.On("List").Return(tc.pods, nil).Times(tc.podListerCallTimes) + daemonSetListerMock.On("List", labels.Everything()).Return(tc.daemonSets, nil).Times(tc.daemonSetListerCallTimes) + podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Times(tc.pdbListerCallTimes) + + for _, scaleUpCall := range tc.expectedScaleUps { + onScaleUpMock.On("ScaleUp", scaleUpCall.ng, scaleUpCall.delta).Return(nil).Once() + } + for _, scaleDownCall := range tc.expectedScaleDowns { + onScaleDownMock.On("ScaleDown", scaleDownCall.ng, scaleDownCall.delta).Return(nil).Once() + } + + err = autoscaler.RunOnce(tc.runAutoscalerAt) + assert.NoError(t, err) + mock.AssertExpectationsForObjects(t, allPodListerMock, + podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) +} + +// TODO: Refactor tests to use testAutoscaler + func TestStaticAutoscalerRunOnce(t *testing.T) { readyNodeLister := kubernetes.NewTestNodeLister(nil) allNodeLister := kubernetes.NewTestNodeLister(nil) @@ -992,47 +1109,8 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t * } func TestStaticAutoscalerRunOnceWithSchedulerProcessingIgnored(t *testing.T) { - ignoredScheduler := "ignored-scheduler" - nonIgnoredScheduler := "non-ignored-scheduler" - - readyNodeLister := kubernetes.NewTestNodeLister(nil) - allNodeLister := kubernetes.NewTestNodeLister(nil) - allPodListerMock := &podListerMock{} - podDisruptionBudgetListerMock := &podDisruptionBudgetListerMock{} - daemonSetListerMock := &daemonSetListerMock{} - onScaleUpMock := &onScaleUpMock{} - onScaleDownMock := &onScaleDownMock{} - deleteFinished := make(chan bool, 1) - - now := time.Now() - later := now.Add(1 * time.Minute) - - n1 := BuildTestNode("n1", 1000, 1000) - SetNodeReadyState(n1, true, time.Now()) - - p1 := BuildTestPod("p1", 
600, 100) - p1.Spec.NodeName = "n1" - p2 := BuildTestPod("p2", 600, 100, MarkUnschedulable()) - p3 := BuildTestPod("p3", 600, 100, AddSchedulerName(apiv1.DefaultSchedulerName)) // Not yet processed by scheduler, default scheduler is ignored - p4 := BuildTestPod("p4", 600, 100, AddSchedulerName(ignoredScheduler)) // non-default scheduler & ignored, expects a scale-up - p5 := BuildTestPod("p5", 600, 100, AddSchedulerName(nonIgnoredScheduler)) // non-default scheduler & not ignored, shouldn't cause a scale-up - - provider := testprovider.NewTestCloudProvider( - func(id string, delta int) error { - return onScaleUpMock.ScaleUp(id, delta) - }, func(id string, name string) error { - ret := onScaleDownMock.ScaleDown(id, name) - deleteFinished <- true - return ret - }) - provider.AddNodeGroup("ng1", 1, 10, 1) - provider.AddNode("ng1", n1) - - ng1 := reflect.ValueOf(provider.GetNodeGroup("ng1")).Interface().(*testprovider.TestNodeGroup) - assert.NotNil(t, ng1) - assert.NotNil(t, provider) - - // Create context with mocked lister registry. + bypassedScheduler := "bypassed-scheduler" + nonBypassedScheduler := "non-bypassed-scheduler" options := config.AutoscalingOptions{ NodeGroupDefaults: config.NodeGroupAutoscalingOptions{ ScaleDownUnneededTime: time.Minute, @@ -1040,67 +1118,114 @@ func TestStaticAutoscalerRunOnceWithSchedulerProcessingIgnored(t *testing.T) { ScaleDownUtilizationThreshold: 0.5, MaxNodeProvisionTime: 10 * time.Second, }, - EstimatorName: estimator.BinpackingEstimatorName, - ScaleDownEnabled: true, - MaxNodesTotal: 10, - MaxCoresTotal: 10, - MaxMemoryTotal: 100000, - IgnoreSchedulerProcessing: true, - IgnoredSchedulers: map[string]bool{ - apiv1.DefaultSchedulerName: true, - ignoredScheduler: true, - }, + EstimatorName: estimator.BinpackingEstimatorName, + ScaleDownEnabled: true, + MaxNodesTotal: 10, + MaxCoresTotal: 10, + MaxMemoryTotal: 100000, + BypassedSchedulers: scheduler.GetBypassedSchedulersMap([]string{ + apiv1.DefaultSchedulerName, + bypassedScheduler, + }), } - processorCallbacks := newStaticAutoscalerProcessorCallbacks() - context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, nil, provider, processorCallbacks, nil) - assert.NoError(t, err) - - setUpScaleDownActuator(&context, options) + n1 := BuildTestNode("n1", 1000, 1000) + SetNodeReadyState(n1, true, time.Now()) - listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, allPodListerMock, - podDisruptionBudgetListerMock, daemonSetListerMock, - nil, nil, nil, nil) - context.ListerRegistry = listerRegistry + p1 := BuildTestPod("p1", 600, 100) + p1.Spec.NodeName = "n1" + p2 := BuildTestPod("p2", 100, 100, AddSchedulerName(bypassedScheduler)) + p3 := BuildTestPod("p3", 600, 100, AddSchedulerName(apiv1.DefaultSchedulerName)) // Not yet processed by scheduler, default scheduler is ignored + p4 := BuildTestPod("p4", 600, 100, AddSchedulerName(bypassedScheduler)) // non-default scheduler & ignored, expects a scale-up + p5 := BuildTestPod("p5", 600, 100, AddSchedulerName(nonBypassedScheduler)) - clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ - OkTotalUnreadyCount: 1, + testCases := map[string]testCase{ + "Unprocessed pod with bypassed scheduler doesn't cause a scale-up when there's capacity": { + pods: []*apiv1.Pod{p1, p2}, + podListerCallTimes: 2, + nodeGroups: []*nodeGroup{{ + name: "ng1", + min: 1, + max: 10, + nodes: []*apiv1.Node{n1}, + }}, + daemonSetListerCallTimes: 1, + pdbListerCallTimes: 1, + expectedScaleUps: []*scaleCall{}, + now: time.Now(), + 
lastScaleupTime: time.Now(), + lastScaleDownFailTime: time.Now(), + runAutoscalerAt: time.Now().Add(time.Hour), + autoscalingOptions: options, + }, + "Unprocessed pod with bypassed scheduler causes a scale-up when there's no capacity - Default Scheduler": { + pods: []*apiv1.Pod{p1, p3}, + podListerCallTimes: 2, + nodeGroups: []*nodeGroup{{ + name: "ng1", + min: 1, + max: 10, + nodes: []*apiv1.Node{n1}, + }}, + daemonSetListerCallTimes: 1, + pdbListerCallTimes: 1, + expectedScaleUps: []*scaleCall{{ + ng: "ng1", + delta: 1, + }}, + now: time.Now(), + lastScaleupTime: time.Now(), + lastScaleDownFailTime: time.Now(), + runAutoscalerAt: time.Now().Add(time.Hour), + autoscalingOptions: options, + }, + "Unprocessed pod with bypassed scheduler causes a scale-up when there's no capacity - Non-default Scheduler": { + pods: []*apiv1.Pod{p1, p4}, + podListerCallTimes: 2, + nodeGroups: []*nodeGroup{{ + name: "ng1", + min: 1, + max: 10, + nodes: []*apiv1.Node{n1}, + }}, + daemonSetListerCallTimes: 1, + pdbListerCallTimes: 1, + expectedScaleUps: []*scaleCall{{ + ng: "ng1", + delta: 1, + }}, + now: time.Now(), + lastScaleupTime: time.Now(), + lastScaleDownFailTime: time.Now(), + runAutoscalerAt: time.Now().Add(time.Hour), + autoscalingOptions: options, + }, + "Unprocessed pod with non-bypassed scheduler doesn't cause a scale-up when there's no capacity": { + pods: []*apiv1.Pod{p1, p5}, + podListerCallTimes: 2, + nodeGroups: []*nodeGroup{{ + name: "ng1", + min: 1, + max: 10, + nodes: []*apiv1.Node{n1}, + }}, + daemonSetListerCallTimes: 1, + pdbListerCallTimes: 1, + expectedScaleUps: []*scaleCall{}, + now: time.Now(), + lastScaleupTime: time.Now(), + lastScaleDownFailTime: time.Now(), + runAutoscalerAt: time.Now().Add(time.Hour), + autoscalingOptions: options, + }, } - clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) - - nodes := []*apiv1.Node{n1} - clusterState.UpdateNodes(nodes, nil, now) - - processors := NewTestProcessors(&context) - - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) - suOrchestrator := orchestrator.New() - suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{}) - autoscaler := &StaticAutoscaler{ - AutoscalingContext: &context, - clusterStateRegistry: clusterState, - lastScaleUpTime: time.Now(), - lastScaleDownFailTime: time.Now(), - scaleDownPlanner: sdPlanner, - scaleDownActuator: sdActuator, - scaleUpOrchestrator: suOrchestrator, - processors: processors, - processorCallbacks: processorCallbacks, + for tcName, tc := range testCases { + t.Run(tcName, func(t *testing.T) { + testAutoscaler(t, tc) + }) } - // Scale up. 
- readyNodeLister.SetNodes([]*apiv1.Node{n1}) - allNodeLister.SetNodes([]*apiv1.Node{n1}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3, p4, p5}, nil).Twice() - daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() - podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() - onScaleUpMock.On("ScaleUp", "ng1", 3).Return(nil).Once() - - err = autoscaler.RunOnce(later.Add(time.Hour)) - assert.NoError(t, err) - mock.AssertExpectationsForObjects(t, allPodListerMock, - podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) } func TestStaticAutoscalerInstanceCreationErrors(t *testing.T) { diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index abf28a5dde40..01f30fb3bc91 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -31,7 +31,6 @@ import ( "github.com/spf13/pflag" - apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apiserver/pkg/server/mux" @@ -244,8 +243,7 @@ var ( maxAllocatableDifferenceRatio = flag.Float64("max-allocatable-difference-ratio", config.DefaultMaxAllocatableDifferenceRatio, "Maximum difference in allocatable resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's allocatable resource.") forceDaemonSets = flag.Bool("force-ds", false, "Blocks scale-up of node groups too small for all suitable Daemon Sets pods.") dynamicNodeDeleteDelayAfterTaintEnabled = flag.Bool("dynamic-node-delete-delay-after-taint-enabled", false, "Enables dynamic adjustment of NodeDeleteDelayAfterTaint based of the latency between CA and api-server") - ignoreSchedulerProcessing = flag.Bool("ignore-scheduler-processing", false, "If true, cluster autoscaler will not wait for scheduler to mark pods as unschedulable and will process both marked & non-marked pods (Schedulable pods will be filtered before scaling-up) it will also disable waiting for pod time buffers before triggering a scale-up.") - ignoredSchedulers = pflag.StringSlice("ignore-schedulers", []string{apiv1.DefaultSchedulerName}, fmt.Sprintf("Names of schedulers to be ignored if '--ignore-scheduler-processing' is set to true. default value '%s' is used", apiv1.DefaultSchedulerName)) + bypassedSchedulers = pflag.StringSlice("bypassed-scheduler-names", []string{}, fmt.Sprintf("Names of schedulers to bypass. 
If set to non-empty value, CA will not wait for pods to reach a certain age before triggering a scale-up.")) ) func isFlagPassed(name string) bool { @@ -393,8 +391,7 @@ func createAutoscalingOptions() config.AutoscalingOptions { MaxFreeDifferenceRatio: *maxFreeDifferenceRatio, }, DynamicNodeDeleteDelayAfterTaintEnabled: *dynamicNodeDeleteDelayAfterTaintEnabled, - IgnoreSchedulerProcessing: *ignoreSchedulerProcessing, - IgnoredSchedulers: scheduler_util.GetIgnoredSchedulersMap(*ignoredSchedulers), + BypassedSchedulers: scheduler_util.GetBypassedSchedulersMap(*bypassedSchedulers), } } diff --git a/cluster-autoscaler/utils/kubernetes/listers.go b/cluster-autoscaler/utils/kubernetes/listers.go index 5e0eea234a55..b9be94b6e665 100644 --- a/cluster-autoscaler/utils/kubernetes/listers.go +++ b/cluster-autoscaler/utils/kubernetes/listers.go @@ -145,26 +145,20 @@ type PodLister interface { } // isScheduled checks whether a pod is scheduled on a node or not +// This method doesn't check for nil ptr, it's the responsibility of the caller func isScheduled(pod *apiv1.Pod) bool { - if pod == nil { - return false - } return pod.Spec.NodeName != "" } // isDeleted checks whether a pod is deleted not +// This method doesn't check for nil ptr, it's the responsibility of the caller func isDeleted(pod *apiv1.Pod) bool { - if pod == nil { - return false - } return pod.GetDeletionTimestamp() != nil } // isUnschedulable checks whether a pod is unschedulable or not +// This method doesn't check for nil ptr, it's the responsibility of the caller func isUnschedulable(pod *apiv1.Pod) bool { - if pod == nil { - return false - } if isScheduled(pod) || isDeleted(pod) { return false } @@ -175,12 +169,6 @@ func isUnschedulable(pod *apiv1.Pod) bool { return true } -// getIsDefaultSchedulerIgnored checks if the default scheduler should be ignored or not -func getIsDefaultSchedulerIgnored(ignoredSchedulers map[string]bool) bool { - ignored, ok := ignoredSchedulers[apiv1.DefaultSchedulerName] - return ignored && ok -} - // ScheduledPods is a helper method that returns all scheduled pods from given pod list. 
func ScheduledPods(allPods []*apiv1.Pod) []*apiv1.Pod { var scheduledPods []*apiv1.Pod @@ -193,18 +181,12 @@ func ScheduledPods(allPods []*apiv1.Pod) []*apiv1.Pod { return scheduledPods } -// SchedulerUnprocessedPods is a helper method that returns all pods which are not yet processed by the specified ignored schedulers -func SchedulerUnprocessedPods(allPods []*apiv1.Pod, ignoredSchedulers map[string]bool) []*apiv1.Pod { +// SchedulerUnprocessedPods is a helper method that returns all pods which are not yet processed by the specified bypassed schedulers +func SchedulerUnprocessedPods(allPods []*apiv1.Pod, bypassedSchedulers map[string]bool) []*apiv1.Pod { var unprocessedPods []*apiv1.Pod - isDefaultSchedulerIgnored := getIsDefaultSchedulerIgnored(ignoredSchedulers) - for _, pod := range allPods { - // Don't add a pod with a scheduler that isn't specified by the user - if !isDefaultSchedulerIgnored && pod.Spec.SchedulerName == "" { - continue - } - if isIgnored, found := ignoredSchedulers[pod.Spec.SchedulerName]; !found || !isIgnored { + if canBypass := bypassedSchedulers[pod.Spec.SchedulerName]; !canBypass { continue } // Make sure it's not scheduled or deleted diff --git a/cluster-autoscaler/utils/scheduler/scheduler.go b/cluster-autoscaler/utils/scheduler/scheduler.go index 7b31fa77c47a..cd981aa72fbf 100644 --- a/cluster-autoscaler/utils/scheduler/scheduler.go +++ b/cluster-autoscaler/utils/scheduler/scheduler.go @@ -148,11 +148,15 @@ func ConfigFromPath(path string) (*scheduler_config.KubeSchedulerConfiguration, return cfgObj, nil } -// GetIgnoredSchedulersMap returns a map of scheduler names that should be ignored as keys, and values are set to true -func GetIgnoredSchedulersMap(ignoredSchedulers []string) map[string]bool { - ignoredSchedulersMap := make(map[string]bool, len(ignoredSchedulers)) - for _, scheduler := range ignoredSchedulers { - ignoredSchedulersMap[scheduler] = true +// GetBypassedSchedulersMap returns a map of scheduler names that should be bypassed as keys, and values are set to true +// Also sets "" (empty string) to true if default scheduler is bypassed +func GetBypassedSchedulersMap(bypassedSchedulers []string) map[string]bool { + bypassedSchedulersMap := make(map[string]bool, len(bypassedSchedulers)) + for _, scheduler := range bypassedSchedulers { + bypassedSchedulersMap[scheduler] = true } - return ignoredSchedulersMap + if canBypass := bypassedSchedulersMap[apiv1.DefaultSchedulerName]; canBypass { + bypassedSchedulersMap[""] = true + } + return bypassedSchedulersMap } From 5115f1263e44fefd164607c4276c852099ed6fcd Mon Sep 17 00:00:00 2001 From: Mahmoud Atwa Date: Tue, 21 Nov 2023 19:03:31 +0000 Subject: [PATCH 6/6] Update static_autoscaler tests & handle pod list processors errors as warnings --- .../podlistprocessor/filter_out_expendable.go | 5 +- cluster-autoscaler/core/static_autoscaler.go | 6 +- .../core/static_autoscaler_test.go | 294 +++++++++--------- 3 files changed, 152 insertions(+), 153 deletions(-) diff --git a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go index d075044394f3..0ec929814a1a 100644 --- a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go +++ b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go @@ -17,6 +17,8 @@ limitations under the License. 
package podlistprocessor import ( + "fmt" + apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/context" core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils" @@ -36,8 +38,7 @@ func NewFilterOutExpendablePodListProcessor() *filterOutExpendable { func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) { nodes, err := context.AllNodeLister().List() if err != nil { - klog.Warningf("Failed to list all nodes while filtering expendable: %v", err) - return nil, err + return nil, fmt.Errorf("Failed to list all nodes while filtering expendable pods: %v", err) } expendablePodsPriorityCutoff := context.AutoscalingOptions.ExpendablePodsPriorityCutoff diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index a638789cf7e9..bdfad667ec7f 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -506,7 +506,11 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr a.AutoscalingContext.DebuggingSnapshotter.SetClusterNodes(l) } - unschedulablePodsToHelp, _ := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods) + unschedulablePodsToHelp, err := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods) + + if err != nil { + klog.Warningf("Failed to process unschedulable pods: %v", err) + } // finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable) unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime) diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index 55906cf94350..dd63e6d90c5b 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -42,6 +42,7 @@ import ( core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils" "k8s.io/autoscaler/cluster-autoscaler/estimator" ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" + "k8s.io/autoscaler/cluster-autoscaler/processors/callbacks" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" @@ -167,111 +168,122 @@ type scaleCall struct { ng string delta int } -type testCase struct { - nodeGroups []*nodeGroup - pods []*apiv1.Pod - podListerCallTimes int - daemonSets []*appsv1.DaemonSet - daemonSetListerCallTimes int - pdbs []*policyv1.PodDisruptionBudget - pdbListerCallTimes int - expectedScaleUps []*scaleCall - expectedScaleDowns []*scaleCall - now time.Time - lastScaleupTime time.Time - lastScaleDownFailTime time.Time - runAutoscalerAt time.Time - autoscalingOptions config.AutoscalingOptions - OkTotalUnreadyCount int + +type commonMocks struct { + readyNodeLister *kube_util.TestNodeLister + allNodeLister *kube_util.TestNodeLister + allPodLister *podListerMock + podDisruptionBudgetLister *podDisruptionBudgetListerMock + daemonSetLister *daemonSetListerMock + + onScaleUp *onScaleUpMock + onScaleDown *onScaleDownMock } -func testAutoscaler(t *testing.T, tc testCase) { - readyNodeLister := kubernetes.NewTestNodeLister(nil) - allNodeLister := kubernetes.NewTestNodeLister(nil) - allPodListerMock := &podListerMock{} - podDisruptionBudgetListerMock := &podDisruptionBudgetListerMock{} - daemonSetListerMock := &daemonSetListerMock{} - onScaleUpMock := &onScaleUpMock{} - 
onScaleDownMock := &onScaleDownMock{} - deleteFinished := make(chan bool, 1) +func newCommonMocks() *commonMocks { + return &commonMocks{ + readyNodeLister: kubernetes.NewTestNodeLister(nil), + allNodeLister: kubernetes.NewTestNodeLister(nil), + allPodLister: &podListerMock{}, + podDisruptionBudgetLister: &podDisruptionBudgetListerMock{}, + daemonSetLister: &daemonSetListerMock{}, + onScaleUp: &onScaleUpMock{}, + onScaleDown: &onScaleDownMock{}, + } +} + +type autoscalerSetupConfig struct { + nodeGroups []*nodeGroup + nodeStateUpdateTime time.Time + autoscalingOptions config.AutoscalingOptions + clusterStateConfig clusterstate.ClusterStateRegistryConfig + mocks *commonMocks + nodesDeleted chan bool +} +func setupCloudProvider(config *autoscalerSetupConfig) (*testprovider.TestCloudProvider, error) { provider := testprovider.NewTestCloudProvider( func(id string, delta int) error { - return onScaleUpMock.ScaleUp(id, delta) + return config.mocks.onScaleUp.ScaleUp(id, delta) }, func(id string, name string) error { - ret := onScaleDownMock.ScaleDown(id, name) - deleteFinished <- true + ret := config.mocks.onScaleDown.ScaleDown(id, name) + config.nodesDeleted <- true return ret }) - allNodes := make([]*apiv1.Node, 0) - for _, ng := range tc.nodeGroups { + for _, ng := range config.nodeGroups { provider.AddNodeGroup(ng.name, ng.min, ng.max, len(ng.nodes)) for _, node := range ng.nodes { - allNodes = append(allNodes, node) provider.AddNode(ng.name, node) } reflectedNg := reflect.ValueOf(provider.GetNodeGroup(ng.name)).Interface().(*testprovider.TestNodeGroup) - assert.NotNil(t, reflectedNg) + if reflectedNg == nil { + return nil, fmt.Errorf("Nodegroup '%v' found as nil after setting up cloud provider", ng.name) + } + } + return provider, nil +} + +func setupAutoscalingContext(opts config.AutoscalingOptions, provider cloudprovider.CloudProvider, processorCallbacks callbacks.ProcessorCallbacks) (context.AutoscalingContext, error) { + context, err := NewScaleTestAutoscalingContext(opts, &fake.Clientset{}, nil, provider, processorCallbacks, nil) + if err != nil { + return context, err + } + return context, nil +} + +func setupAutoscaler(config *autoscalerSetupConfig) (*StaticAutoscaler, error) { + provider, err := setupCloudProvider(config) + if err != nil { + return nil, err } + + allNodes := make([]*apiv1.Node, 0) + for _, ng := range config.nodeGroups { + allNodes = append(allNodes, ng.nodes...) + } + // Create context with mocked lister registry. 
processorCallbacks := newStaticAutoscalerProcessorCallbacks() - context, err := NewScaleTestAutoscalingContext(tc.autoscalingOptions, &fake.Clientset{}, nil, provider, processorCallbacks, nil) - assert.NoError(t, err) + context, err := setupAutoscalingContext(config.autoscalingOptions, provider, processorCallbacks) - setUpScaleDownActuator(&context, tc.autoscalingOptions) + if err != nil { + return nil, err + } - listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, allPodListerMock, - podDisruptionBudgetListerMock, daemonSetListerMock, + setUpScaleDownActuator(&context, config.autoscalingOptions) + + listerRegistry := kube_util.NewListerRegistry(config.mocks.allNodeLister, config.mocks.readyNodeLister, config.mocks.allPodLister, + config.mocks.podDisruptionBudgetLister, config.mocks.daemonSetLister, nil, nil, nil, nil) context.ListerRegistry = listerRegistry - clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ - OkTotalUnreadyCount: tc.OkTotalUnreadyCount, - } - clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(tc.autoscalingOptions.NodeGroupDefaults)) + ngConfigProcesssor := nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.autoscalingOptions.NodeGroupDefaults) + clusterState := clusterstate.NewClusterStateRegistry(provider, config.clusterStateConfig, context.LogRecorder, NewBackoff(), ngConfigProcesssor) + + clusterState.UpdateNodes(allNodes, nil, config.nodeStateUpdateTime) - clusterState.UpdateNodes(allNodes, nil, tc.now) processors := NewTestProcessors(&context) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{}) autoscaler := &StaticAutoscaler{ - AutoscalingContext: &context, - clusterStateRegistry: clusterState, - lastScaleUpTime: tc.lastScaleupTime, - lastScaleDownFailTime: tc.lastScaleDownFailTime, - scaleDownPlanner: sdPlanner, - scaleDownActuator: sdActuator, - scaleUpOrchestrator: suOrchestrator, - processors: processors, - processorCallbacks: processorCallbacks, - } - - // Assummes all nodes are ready, to be updated when used in tests which needs non-ready nodes - readyNodeLister.SetNodes(allNodes) - allNodeLister.SetNodes(allNodes) - allPodListerMock.On("List").Return(tc.pods, nil).Times(tc.podListerCallTimes) - daemonSetListerMock.On("List", labels.Everything()).Return(tc.daemonSets, nil).Times(tc.daemonSetListerCallTimes) - podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Times(tc.pdbListerCallTimes) - - for _, scaleUpCall := range tc.expectedScaleUps { - onScaleUpMock.On("ScaleUp", scaleUpCall.ng, scaleUpCall.delta).Return(nil).Once() - } - for _, scaleDownCall := range tc.expectedScaleDowns { - onScaleDownMock.On("ScaleDown", scaleDownCall.ng, scaleDownCall.delta).Return(nil).Once() + AutoscalingContext: &context, + clusterStateRegistry: clusterState, + scaleDownPlanner: sdPlanner, + scaleDownActuator: sdActuator, + scaleUpOrchestrator: suOrchestrator, + processors: processors, + processorCallbacks: processorCallbacks, } - err = autoscaler.RunOnce(tc.runAutoscalerAt) - assert.NoError(t, err) - mock.AssertExpectationsForObjects(t, allPodListerMock, - podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, 
onScaleDownMock) + return autoscaler, nil } -// TODO: Refactor tests to use testAutoscaler +// TODO: Refactor tests to use setupAutoscaler func TestStaticAutoscalerRunOnce(t *testing.T) { readyNodeLister := kubernetes.NewTestNodeLister(nil) @@ -345,7 +357,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { } processors := NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{}) @@ -554,7 +566,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { } clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{}) @@ -704,7 +716,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { processors := NewTestProcessors(&context) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{}) @@ -852,7 +864,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { processors := NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{}) @@ -983,7 +995,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T) processors := NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) autoscaler := &StaticAutoscaler{ AutoscalingContext: &context, @@ -1081,7 +1093,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t * processors := NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, 
&context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) autoscaler := &StaticAutoscaler{ AutoscalingContext: &context, @@ -1108,7 +1120,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t * podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) } -func TestStaticAutoscalerRunOnceWithSchedulerProcessingIgnored(t *testing.T) { +func TestStaticAutoscalerRunOnceWithBypassedSchedulers(t *testing.T) { bypassedScheduler := "bypassed-scheduler" nonBypassedScheduler := "non-bypassed-scheduler" options := config.AutoscalingOptions{ @@ -1128,101 +1140,83 @@ func TestStaticAutoscalerRunOnceWithSchedulerProcessingIgnored(t *testing.T) { bypassedScheduler, }), } + now := time.Now() n1 := BuildTestNode("n1", 1000, 1000) - SetNodeReadyState(n1, true, time.Now()) + SetNodeReadyState(n1, true, now) + + ngs := []*nodeGroup{{ + name: "ng1", + min: 1, + max: 10, + nodes: []*apiv1.Node{n1}, + }} p1 := BuildTestPod("p1", 600, 100) p1.Spec.NodeName = "n1" p2 := BuildTestPod("p2", 100, 100, AddSchedulerName(bypassedScheduler)) - p3 := BuildTestPod("p3", 600, 100, AddSchedulerName(apiv1.DefaultSchedulerName)) // Not yet processed by scheduler, default scheduler is ignored - p4 := BuildTestPod("p4", 600, 100, AddSchedulerName(bypassedScheduler)) // non-default scheduler & ignored, expects a scale-up + p3 := BuildTestPod("p3", 600, 100) // Not yet processed by scheduler, default scheduler is ignored + p4 := BuildTestPod("p4", 600, 100, AddSchedulerName(bypassedScheduler)) // non-default scheduler & ignored, expects a scale-up p5 := BuildTestPod("p5", 600, 100, AddSchedulerName(nonBypassedScheduler)) - testCases := map[string]testCase{ + testSetupConfig := &autoscalerSetupConfig{ + autoscalingOptions: options, + nodeGroups: ngs, + nodeStateUpdateTime: now, + mocks: newCommonMocks(), + clusterStateConfig: clusterstate.ClusterStateRegistryConfig{ + OkTotalUnreadyCount: 1, + }, + } + + testCases := map[string]struct { + setupConfig *autoscalerSetupConfig + pods []*apiv1.Pod + expectedScaleUp *scaleCall + }{ "Unprocessed pod with bypassed scheduler doesn't cause a scale-up when there's capacity": { - pods: []*apiv1.Pod{p1, p2}, - podListerCallTimes: 2, - nodeGroups: []*nodeGroup{{ - name: "ng1", - min: 1, - max: 10, - nodes: []*apiv1.Node{n1}, - }}, - daemonSetListerCallTimes: 1, - pdbListerCallTimes: 1, - expectedScaleUps: []*scaleCall{}, - now: time.Now(), - lastScaleupTime: time.Now(), - lastScaleDownFailTime: time.Now(), - runAutoscalerAt: time.Now().Add(time.Hour), - autoscalingOptions: options, + pods: []*apiv1.Pod{p1, p2}, + setupConfig: testSetupConfig, }, "Unprocessed pod with bypassed scheduler causes a scale-up when there's no capacity - Default Scheduler": { - pods: []*apiv1.Pod{p1, p3}, - podListerCallTimes: 2, - nodeGroups: []*nodeGroup{{ - name: "ng1", - min: 1, - max: 10, - nodes: []*apiv1.Node{n1}, - }}, - daemonSetListerCallTimes: 1, - pdbListerCallTimes: 1, - expectedScaleUps: []*scaleCall{{ + pods: []*apiv1.Pod{p1, p3}, + expectedScaleUp: &scaleCall{ ng: "ng1", delta: 1, - }}, - now: time.Now(), - lastScaleupTime: time.Now(), - lastScaleDownFailTime: time.Now(), - runAutoscalerAt: time.Now().Add(time.Hour), - autoscalingOptions: options, + }, + setupConfig: testSetupConfig, }, "Unprocessed pod with bypassed scheduler causes a scale-up when there's no capacity - Non-default Scheduler": { - pods: []*apiv1.Pod{p1, p4}, - podListerCallTimes: 2, - nodeGroups: []*nodeGroup{{ 
- name: "ng1", - min: 1, - max: 10, - nodes: []*apiv1.Node{n1}, - }}, - daemonSetListerCallTimes: 1, - pdbListerCallTimes: 1, - expectedScaleUps: []*scaleCall{{ + pods: []*apiv1.Pod{p1, p4}, + setupConfig: testSetupConfig, + expectedScaleUp: &scaleCall{ ng: "ng1", delta: 1, - }}, - now: time.Now(), - lastScaleupTime: time.Now(), - lastScaleDownFailTime: time.Now(), - runAutoscalerAt: time.Now().Add(time.Hour), - autoscalingOptions: options, + }, }, "Unprocessed pod with non-bypassed scheduler doesn't cause a scale-up when there's no capacity": { - pods: []*apiv1.Pod{p1, p5}, - podListerCallTimes: 2, - nodeGroups: []*nodeGroup{{ - name: "ng1", - min: 1, - max: 10, - nodes: []*apiv1.Node{n1}, - }}, - daemonSetListerCallTimes: 1, - pdbListerCallTimes: 1, - expectedScaleUps: []*scaleCall{}, - now: time.Now(), - lastScaleupTime: time.Now(), - lastScaleDownFailTime: time.Now(), - runAutoscalerAt: time.Now().Add(time.Hour), - autoscalingOptions: options, + pods: []*apiv1.Pod{p1, p5}, + setupConfig: testSetupConfig, }, } for tcName, tc := range testCases { t.Run(tcName, func(t *testing.T) { - testAutoscaler(t, tc) + autoscaler, err := setupAutoscaler(tc.setupConfig) + assert.NoError(t, err) + + tc.setupConfig.mocks.readyNodeLister.SetNodes([]*apiv1.Node{n1}) + tc.setupConfig.mocks.allNodeLister.SetNodes([]*apiv1.Node{n1}) + tc.setupConfig.mocks.allPodLister.On("List").Return(tc.pods, nil).Twice() + tc.setupConfig.mocks.daemonSetLister.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() + tc.setupConfig.mocks.podDisruptionBudgetLister.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() + if tc.expectedScaleUp != nil { + tc.setupConfig.mocks.onScaleUp.On("ScaleUp", tc.expectedScaleUp.ng, tc.expectedScaleUp.delta).Return(nil).Once() + } + err = autoscaler.RunOnce(now.Add(time.Hour)) + assert.NoError(t, err) + mock.AssertExpectationsForObjects(t, tc.setupConfig.mocks.allPodLister, + tc.setupConfig.mocks.podDisruptionBudgetLister, tc.setupConfig.mocks.daemonSetLister, tc.setupConfig.mocks.onScaleUp, tc.setupConfig.mocks.onScaleDown) }) } @@ -2081,7 +2075,7 @@ func waitForDeleteToFinish(t *testing.T, deleteFinished <-chan bool) { } } -func newScaleDownPlannerAndActuator(t *testing.T, ctx *context.AutoscalingContext, p *ca_processors.AutoscalingProcessors, cs *clusterstate.ClusterStateRegistry) (scaledown.Planner, scaledown.Actuator) { +func newScaleDownPlannerAndActuator(ctx *context.AutoscalingContext, p *ca_processors.AutoscalingProcessors, cs *clusterstate.ClusterStateRegistry) (scaledown.Planner, scaledown.Actuator) { ctx.MaxScaleDownParallelism = 10 ctx.MaxDrainParallelism = 1 ctx.NodeDeletionBatcherInterval = 0 * time.Second
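
For reference, a minimal sketch of how the pieces introduced in this series compose once applied, assuming the helper signatures shown in the diffs (scheduler_util.GetBypassedSchedulersMap and kube_util.SchedulerUnprocessedPods). The main function, sample pods, and scheduler names below are illustrative only and are not part of the patches.

package main

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
	scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
)

func main() {
	// Roughly equivalent to running CA with --bypassed-scheduler-names=default-scheduler,my-scheduler.
	// Per GetBypassedSchedulersMap, bypassing the default scheduler also maps the empty
	// scheduler name to true.
	bypassed := scheduler_util.GetBypassedSchedulersMap([]string{apiv1.DefaultSchedulerName, "my-scheduler"})

	// p1 is pending under a bypassed scheduler, p2 under one that is not bypassed,
	// so only p1 is returned as scheduler-unprocessed and treated as unschedulable
	// when deciding on a scale-up.
	p1 := &apiv1.Pod{}
	p1.Name = "p1"
	p1.Spec.SchedulerName = "my-scheduler"

	p2 := &apiv1.Pod{}
	p2.Name = "p2"
	p2.Spec.SchedulerName = "other-scheduler"

	for _, p := range kube_util.SchedulerUnprocessedPods([]*apiv1.Pod{p1, p2}, bypassed) {
		fmt.Println(p.Name) // prints: p1
	}
}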