diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go
index 05e6cc2fb6e8..a4a42f4ae357 100644
--- a/cluster-autoscaler/config/autoscaling_options.go
+++ b/cluster-autoscaler/config/autoscaling_options.go
@@ -277,4 +277,6 @@ type AutoscalingOptions struct {
 	// dynamicNodeDeleteDelayAfterTaintEnabled is used to enable/disable dynamic adjustment of NodeDeleteDelayAfterTaint
 	// based on the latency between the CA and the api-server
 	DynamicNodeDeleteDelayAfterTaintEnabled bool
+	// BypassedSchedulers is a set of scheduler names; CA does not wait for these schedulers to process pods before considering the pods for scale-up
+	BypassedSchedulers map[string]bool
 }
diff --git a/cluster-autoscaler/core/podlistprocessor/clear_tpu_request.go b/cluster-autoscaler/core/podlistprocessor/clear_tpu_request.go
new file mode 100644
index 000000000000..f26104347eaa
--- /dev/null
+++ b/cluster-autoscaler/core/podlistprocessor/clear_tpu_request.go
@@ -0,0 +1,39 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podlistprocessor
+
+import (
+	apiv1 "k8s.io/api/core/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/tpu"
+)
+
+type clearTpuRequests struct {
+}
+
+// NewClearTPURequestsPodListProcessor creates a PodListProcessor which clears TPU requests in pods
+func NewClearTPURequestsPodListProcessor() *clearTpuRequests {
+	return &clearTpuRequests{}
+}
+
+// Process removes pods' TPU requests
+func (p *clearTpuRequests) Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) {
+	return tpu.ClearTPURequests(pods), nil
+}
+
+func (p *clearTpuRequests) CleanUp() {
+}
diff --git a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go
new file mode 100644
index 000000000000..0ec929814a1a
--- /dev/null
+++ b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go
@@ -0,0 +1,68 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podlistprocessor
+
+import (
+	"fmt"
+
+	apiv1 "k8s.io/api/core/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
+	caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
+	klog "k8s.io/klog/v2"
+)
+
+type filterOutExpendable struct {
+}
+
+// NewFilterOutExpendablePodListProcessor creates a PodListProcessor filtering out expendable pods
+func NewFilterOutExpendablePodListProcessor() *filterOutExpendable {
+	return &filterOutExpendable{}
+}
+
+// Process filters out expendable pods and adds pods that are waiting for lower-priority pod preemption to the cluster snapshot
+func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) {
+	nodes, err := context.AllNodeLister().List()
+	if err != nil {
+		return nil, fmt.Errorf("Failed to list all nodes while filtering expendable pods: %v", err)
+	}
+	expendablePodsPriorityCutoff := context.AutoscalingOptions.ExpendablePodsPriorityCutoff
+
+	unschedulablePods, waitingForLowerPriorityPreemption := core_utils.FilterOutExpendableAndSplit(pods, nodes, expendablePodsPriorityCutoff)
+	if err = p.addPreemptingPodsToSnapshot(waitingForLowerPriorityPreemption, context); err != nil {
+		klog.Warningf("Failed to add preempting pods to snapshot: %v", err)
+		return nil, err
+	}
+
+	return unschedulablePods, nil
+}
+
+// addPreemptingPodsToSnapshot modifies the snapshot by simulating scheduling of the pods waiting for preemption.
+// This is not strictly correct, as we are not simulating preemption itself, but it matches
+// CA logic from before the migration to the scheduler framework, so let's keep it for now.
+func (p *filterOutExpendable) addPreemptingPodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error {
+	for _, p := range pods {
+		if err := ctx.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil {
+			klog.Errorf("Failed to update snapshot with pod %s/%s waiting for preemption: %v", p.Namespace, p.Name, err)
+			return caerrors.ToAutoscalerError(caerrors.InternalError, err)
+		}
+	}
+	return nil
+}
+
+func (p *filterOutExpendable) CleanUp() {
+}
diff --git a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go
new file mode 100644
index 000000000000..5a286b4276de
--- /dev/null
+++ b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go
@@ -0,0 +1,179 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package podlistprocessor + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/config" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + "k8s.io/autoscaler/cluster-autoscaler/utils/test" +) + +func TestFilterOutExpendable(t *testing.T) { + testCases := []struct { + name string + pods []*apiv1.Pod + wantPods []*apiv1.Pod + wantPodsInSnapshot []*apiv1.Pod + priorityCutoff int + nodes []*apiv1.Node + }{ + { + name: "no pods", + }, + { + name: "single non-expendable pod", + pods: []*apiv1.Pod{ + test.BuildTestPod("p", 1000, 1), + }, + wantPods: []*apiv1.Pod{ + test.BuildTestPod("p", 1000, 1), + }, + }, + { + name: "non-expendable pods with priority >= to cutoff priority", + pods: []*apiv1.Pod{ + test.BuildTestPod("p1", 1000, 1, priority(2)), + test.BuildTestPod("p2", 1000, 1, priority(3)), + }, + wantPods: []*apiv1.Pod{ + test.BuildTestPod("p1", 1000, 1, priority(2)), + test.BuildTestPod("p2", 1000, 1, priority(3)), + }, + priorityCutoff: 2, + }, + { + name: "single expednable pod", + pods: []*apiv1.Pod{ + test.BuildTestPod("p", 1000, 1, priority(2)), + }, + priorityCutoff: 3, + }, + { + name: "single waiting-for-low-priority-preemption pod", + pods: []*apiv1.Pod{ + test.BuildTestPod("p", 1000, 1, nominatedNodeName("node-1")), + }, + nodes: []*apiv1.Node{ + test.BuildTestNode("node-1", 2400, 2400), + }, + wantPodsInSnapshot: []*apiv1.Pod{ + test.BuildTestPod("p", 1000, 1, nominatedNodeName("node-1")), + }, + }, + { + name: "mixed expendable, non-expendable & waiting-for-low-priority-preemption pods", + pods: []*apiv1.Pod{ + test.BuildTestPod("p1", 1000, 1, priority(3)), + test.BuildTestPod("p2", 1000, 1, priority(4)), + test.BuildTestPod("p3", 1000, 1, priority(1)), + test.BuildTestPod("p4", 1000, 1), + test.BuildTestPod("p5", 1000, 1, nominatedNodeName("node-1")), + }, + priorityCutoff: 2, + wantPods: []*apiv1.Pod{ + test.BuildTestPod("p1", 1000, 1, priority(3)), + test.BuildTestPod("p2", 1000, 1, priority(4)), + test.BuildTestPod("p4", 1000, 1), + }, + wantPodsInSnapshot: []*apiv1.Pod{ + test.BuildTestPod("p5", 1000, 1, nominatedNodeName("node-1")), + }, + nodes: []*apiv1.Node{ + test.BuildTestNode("node-1", 2400, 2400), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + processor := NewFilterOutExpendablePodListProcessor() + snapshot := clustersnapshot.NewBasicClusterSnapshot() + snapshot.AddNodes(tc.nodes) + + pods, err := processor.Process(&context.AutoscalingContext{ + ClusterSnapshot: snapshot, + AutoscalingOptions: config.AutoscalingOptions{ + ExpendablePodsPriorityCutoff: tc.priorityCutoff, + }, + AutoscalingKubeClients: context.AutoscalingKubeClients{ + ListerRegistry: newMockListerRegistry(tc.nodes), + }, + }, tc.pods) + + assert.NoError(t, err) + assert.ElementsMatch(t, tc.wantPods, pods) + + var podsInSnapshot []*apiv1.Pod + nodeInfoLister := snapshot.NodeInfos() + // Get pods in snapshot + for _, n := range tc.nodes { + nodeInfo, err := nodeInfoLister.Get(n.Name) + assert.NoError(t, err) + assert.NotEqual(t, nodeInfo.Pods, nil) + for _, podInfo := range nodeInfo.Pods { + podsInSnapshot = append(podsInSnapshot, podInfo.Pod) + } + } + + assert.ElementsMatch(t, tc.wantPodsInSnapshot, podsInSnapshot) + }) + } +} + +func priority(priority int32) func(*apiv1.Pod) { + return func(pod *apiv1.Pod) { + 
pod.Spec.Priority = &priority + } +} +func nominatedNodeName(nodeName string) func(*apiv1.Pod) { + return func(pod *apiv1.Pod) { + pod.Status.NominatedNodeName = nodeName + } +} + +type mockListerRegistry struct { + kube_util.ListerRegistry + nodes []*apiv1.Node +} + +func newMockListerRegistry(nodes []*apiv1.Node) *mockListerRegistry { + return &mockListerRegistry{ + nodes: nodes, + } +} + +func (mlr mockListerRegistry) AllNodeLister() kube_util.NodeLister { + return &mockNodeLister{nodes: mlr.nodes} +} + +type mockNodeLister struct { + nodes []*apiv1.Node +} + +func (mnl *mockNodeLister) List() ([]*apiv1.Node, error) { + return mnl.nodes, nil +} +func (mnl *mockNodeLister) Get(name string) (*apiv1.Node, error) { + return nil, fmt.Errorf("Unsupported operation") +} diff --git a/cluster-autoscaler/core/podlistprocessor/pod_list_processor.go b/cluster-autoscaler/core/podlistprocessor/pod_list_processor.go index d357d00bde18..ccbc5dfc8183 100644 --- a/cluster-autoscaler/core/podlistprocessor/pod_list_processor.go +++ b/cluster-autoscaler/core/podlistprocessor/pod_list_processor.go @@ -32,6 +32,8 @@ type defaultPodListProcessor struct { func NewDefaultPodListProcessor(predicateChecker predicatechecker.PredicateChecker) *defaultPodListProcessor { return &defaultPodListProcessor{ processors: []pods.PodListProcessor{ + NewClearTPURequestsPodListProcessor(), + NewFilterOutExpendablePodListProcessor(), NewCurrentlyDrainedNodesPodListProcessor(), NewFilterOutSchedulablePodListProcessor(predicateChecker), NewFilterOutDaemonSetPodListProcessor(), diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index ac3103973931..bdfad667ec7f 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -57,7 +57,6 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/utils/backoff" caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors" scheduler_utils "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" - "k8s.io/autoscaler/cluster-autoscaler/utils/tpu" "k8s.io/utils/integer" klog "k8s.io/klog/v2" @@ -310,6 +309,11 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr return caerrors.ToAutoscalerError(caerrors.ApiCallError, err) } originalScheduledPods, unschedulablePods := kube_util.ScheduledPods(pods), kube_util.UnschedulablePods(pods) + schedulerUnprocessed := make([]*apiv1.Pod, 0, 0) + isSchedulerProcessingIgnored := len(a.BypassedSchedulers) > 0 + if isSchedulerProcessingIgnored { + schedulerUnprocessed = kube_util.SchedulerUnprocessedPods(pods, a.BypassedSchedulers) + } // Update cluster resource usage metrics coresTotal, memoryTotal := calculateCoresMemoryTotal(allNodes, currentTime) @@ -451,25 +455,12 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr metrics.UpdateLastTime(metrics.Autoscaling, time.Now()) - metrics.UpdateUnschedulablePodsCount(len(unschedulablePods)) - - unschedulablePods = tpu.ClearTPURequests(unschedulablePods) - - // todo: move split and append below to separate PodListProcessor - // Some unschedulable pods can be waiting for lower priority pods preemption so they have nominated node to run. - // Such pods don't require scale up but should be considered during scale down. 
- unschedulablePods, unschedulableWaitingForLowerPriorityPreemption := core_utils.FilterOutExpendableAndSplit(unschedulablePods, allNodes, a.ExpendablePodsPriorityCutoff) - - // modify the snapshot simulating scheduling of pods waiting for preemption. - // this is not strictly correct as we are not simulating preemption itself but it matches - // CA logic from before migration to scheduler framework. So let's keep it for now - for _, p := range unschedulableWaitingForLowerPriorityPreemption { - if err := a.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil { - klog.Errorf("Failed to update snapshot with pod %s waiting for preemption", err) - return caerrors.ToAutoscalerError(caerrors.InternalError, err) - } + // SchedulerUnprocessed might be zero here if it was disabled + metrics.UpdateUnschedulablePodsCount(len(unschedulablePods), len(schedulerUnprocessed)) + if isSchedulerProcessingIgnored { + // Treat unknown pods as unschedulable, pod list processor will remove schedulable pods + unschedulablePods = append(unschedulablePods, schedulerUnprocessed...) } - // Upcoming nodes are recently created nodes that haven't registered in the cluster yet, or haven't become ready yet. upcomingCounts, registeredUpcoming := a.clusterStateRegistry.GetUpcomingNodes() // For each upcoming node we inject a placeholder node faked to appear ready into the cluster snapshot, so that we can pack unschedulable pods on @@ -515,7 +506,11 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr a.AutoscalingContext.DebuggingSnapshotter.SetClusterNodes(l) } - unschedulablePodsToHelp, _ := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods) + unschedulablePodsToHelp, err := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods) + + if err != nil { + klog.Warningf("Failed to process unschedulable pods: %v", err) + } // finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable) unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime) @@ -553,7 +548,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr } else if a.MaxNodesTotal > 0 && len(readyNodes) >= a.MaxNodesTotal { scaleUpStatus.Result = status.ScaleUpNoOptionsAvailable klog.V(1).Info("Max total nodes in cluster reached") - } else if allPodsAreNew(unschedulablePodsToHelp, currentTime) { + } else if !isSchedulerProcessingIgnored && allPodsAreNew(unschedulablePodsToHelp, currentTime) { // The assumption here is that these pods have been created very recently and probably there // is more pods to come. In theory we could check the newest pod time but then if pod were created // slowly but at the pace of 1 every 2 seconds then no scale up would be triggered for long time. 
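Taken together, the RunOnce changes above treat pods that a bypassed scheduler has not yet looked at as additional unschedulable input for scale-up, instead of waiting for the scheduler to mark them unschedulable. A minimal sketch of how the new helpers compose (illustrative only, not part of the patch; it relies only on the helpers introduced in this diff, and the pod/scheduler names are made up):

// Illustrative sketch: classifying a not-yet-processed pod when
// --bypassed-scheduler-names=default-scheduler,custom-scheduler is set.
package main

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
	scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
)

func main() {
	bypassed := scheduler_util.GetBypassedSchedulersMap([]string{apiv1.DefaultSchedulerName, "custom-scheduler"})

	// A pod its scheduler has not looked at yet: no NodeName, no PodScheduled condition.
	pending := &apiv1.Pod{}
	pending.Name = "pending"
	pending.Spec.SchedulerName = "custom-scheduler"

	pods := []*apiv1.Pod{pending}

	// SchedulerUnprocessedPods picks the pod up because its scheduler is bypassed and it has
	// neither been scheduled nor been marked unschedulable; UnschedulablePods does not.
	fmt.Println(len(kube_util.SchedulerUnprocessedPods(pods, bypassed))) // 1
	fmt.Println(len(kube_util.UnschedulablePods(pods)))                  // 0
}

When --bypassed-scheduler-names is left empty, GetBypassedSchedulersMap returns an empty map, len(a.BypassedSchedulers) is zero, and the pod selection in RunOnce stays as it was before this change.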
diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index 75b0db264d50..dd63e6d90c5b 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -42,6 +42,7 @@ import ( core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils" "k8s.io/autoscaler/cluster-autoscaler/estimator" ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" + "k8s.io/autoscaler/cluster-autoscaler/processors/callbacks" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" @@ -51,6 +52,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" "k8s.io/autoscaler/cluster-autoscaler/utils/taints" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" kube_record "k8s.io/client-go/tools/record" @@ -156,6 +158,133 @@ func setUpScaleDownActuator(ctx *context.AutoscalingContext, autoscalingOptions ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions, rules.Default(deleteOptions), NewTestProcessors(ctx).NodeGroupConfigProcessor) } +type nodeGroup struct { + name string + nodes []*apiv1.Node + min int + max int +} +type scaleCall struct { + ng string + delta int +} + +type commonMocks struct { + readyNodeLister *kube_util.TestNodeLister + allNodeLister *kube_util.TestNodeLister + allPodLister *podListerMock + podDisruptionBudgetLister *podDisruptionBudgetListerMock + daemonSetLister *daemonSetListerMock + + onScaleUp *onScaleUpMock + onScaleDown *onScaleDownMock +} + +func newCommonMocks() *commonMocks { + return &commonMocks{ + readyNodeLister: kubernetes.NewTestNodeLister(nil), + allNodeLister: kubernetes.NewTestNodeLister(nil), + allPodLister: &podListerMock{}, + podDisruptionBudgetLister: &podDisruptionBudgetListerMock{}, + daemonSetLister: &daemonSetListerMock{}, + onScaleUp: &onScaleUpMock{}, + onScaleDown: &onScaleDownMock{}, + } +} + +type autoscalerSetupConfig struct { + nodeGroups []*nodeGroup + nodeStateUpdateTime time.Time + autoscalingOptions config.AutoscalingOptions + clusterStateConfig clusterstate.ClusterStateRegistryConfig + mocks *commonMocks + nodesDeleted chan bool +} + +func setupCloudProvider(config *autoscalerSetupConfig) (*testprovider.TestCloudProvider, error) { + provider := testprovider.NewTestCloudProvider( + func(id string, delta int) error { + return config.mocks.onScaleUp.ScaleUp(id, delta) + }, func(id string, name string) error { + ret := config.mocks.onScaleDown.ScaleDown(id, name) + config.nodesDeleted <- true + return ret + }) + + for _, ng := range config.nodeGroups { + provider.AddNodeGroup(ng.name, ng.min, ng.max, len(ng.nodes)) + for _, node := range ng.nodes { + provider.AddNode(ng.name, node) + } + reflectedNg := reflect.ValueOf(provider.GetNodeGroup(ng.name)).Interface().(*testprovider.TestNodeGroup) + if reflectedNg == nil { + return nil, fmt.Errorf("Nodegroup '%v' found as nil after setting up cloud provider", ng.name) + } + } + return provider, nil +} + +func setupAutoscalingContext(opts config.AutoscalingOptions, provider cloudprovider.CloudProvider, processorCallbacks callbacks.ProcessorCallbacks) (context.AutoscalingContext, error) { + context, err := 
NewScaleTestAutoscalingContext(opts, &fake.Clientset{}, nil, provider, processorCallbacks, nil) + if err != nil { + return context, err + } + return context, nil +} + +func setupAutoscaler(config *autoscalerSetupConfig) (*StaticAutoscaler, error) { + provider, err := setupCloudProvider(config) + if err != nil { + return nil, err + } + + allNodes := make([]*apiv1.Node, 0) + for _, ng := range config.nodeGroups { + allNodes = append(allNodes, ng.nodes...) + } + + // Create context with mocked lister registry. + processorCallbacks := newStaticAutoscalerProcessorCallbacks() + + context, err := setupAutoscalingContext(config.autoscalingOptions, provider, processorCallbacks) + + if err != nil { + return nil, err + } + + setUpScaleDownActuator(&context, config.autoscalingOptions) + + listerRegistry := kube_util.NewListerRegistry(config.mocks.allNodeLister, config.mocks.readyNodeLister, config.mocks.allPodLister, + config.mocks.podDisruptionBudgetLister, config.mocks.daemonSetLister, + nil, nil, nil, nil) + context.ListerRegistry = listerRegistry + + ngConfigProcesssor := nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.autoscalingOptions.NodeGroupDefaults) + clusterState := clusterstate.NewClusterStateRegistry(provider, config.clusterStateConfig, context.LogRecorder, NewBackoff(), ngConfigProcesssor) + + clusterState.UpdateNodes(allNodes, nil, config.nodeStateUpdateTime) + + processors := NewTestProcessors(&context) + + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) + suOrchestrator := orchestrator.New() + suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{}) + + autoscaler := &StaticAutoscaler{ + AutoscalingContext: &context, + clusterStateRegistry: clusterState, + scaleDownPlanner: sdPlanner, + scaleDownActuator: sdActuator, + scaleUpOrchestrator: suOrchestrator, + processors: processors, + processorCallbacks: processorCallbacks, + } + + return autoscaler, nil +} + +// TODO: Refactor tests to use setupAutoscaler + func TestStaticAutoscalerRunOnce(t *testing.T) { readyNodeLister := kubernetes.NewTestNodeLister(nil) allNodeLister := kubernetes.NewTestNodeLister(nil) @@ -228,7 +357,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { } processors := NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{}) @@ -437,7 +566,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { } clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{}) @@ -587,7 +716,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { processors := NewTestProcessors(&context) - 
sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{}) @@ -735,7 +864,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { processors := NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{}) @@ -866,7 +995,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T) processors := NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) autoscaler := &StaticAutoscaler{ AutoscalingContext: &context, @@ -964,7 +1093,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t * processors := NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) autoscaler := &StaticAutoscaler{ AutoscalingContext: &context, @@ -991,6 +1120,108 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t * podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) } +func TestStaticAutoscalerRunOnceWithBypassedSchedulers(t *testing.T) { + bypassedScheduler := "bypassed-scheduler" + nonBypassedScheduler := "non-bypassed-scheduler" + options := config.AutoscalingOptions{ + NodeGroupDefaults: config.NodeGroupAutoscalingOptions{ + ScaleDownUnneededTime: time.Minute, + ScaleDownUnreadyTime: time.Minute, + ScaleDownUtilizationThreshold: 0.5, + MaxNodeProvisionTime: 10 * time.Second, + }, + EstimatorName: estimator.BinpackingEstimatorName, + ScaleDownEnabled: true, + MaxNodesTotal: 10, + MaxCoresTotal: 10, + MaxMemoryTotal: 100000, + BypassedSchedulers: scheduler.GetBypassedSchedulersMap([]string{ + apiv1.DefaultSchedulerName, + bypassedScheduler, + }), + } + now := time.Now() + + n1 := BuildTestNode("n1", 1000, 1000) + SetNodeReadyState(n1, true, now) + + ngs := []*nodeGroup{{ + name: "ng1", + min: 1, + max: 10, + nodes: []*apiv1.Node{n1}, + }} + + p1 := BuildTestPod("p1", 600, 100) + p1.Spec.NodeName = "n1" + p2 := BuildTestPod("p2", 100, 100, AddSchedulerName(bypassedScheduler)) + p3 := BuildTestPod("p3", 600, 100) // Not yet processed by scheduler, default scheduler is ignored + p4 := BuildTestPod("p4", 600, 100, AddSchedulerName(bypassedScheduler)) // non-default 
scheduler & ignored, expects a scale-up + p5 := BuildTestPod("p5", 600, 100, AddSchedulerName(nonBypassedScheduler)) + + testSetupConfig := &autoscalerSetupConfig{ + autoscalingOptions: options, + nodeGroups: ngs, + nodeStateUpdateTime: now, + mocks: newCommonMocks(), + clusterStateConfig: clusterstate.ClusterStateRegistryConfig{ + OkTotalUnreadyCount: 1, + }, + } + + testCases := map[string]struct { + setupConfig *autoscalerSetupConfig + pods []*apiv1.Pod + expectedScaleUp *scaleCall + }{ + "Unprocessed pod with bypassed scheduler doesn't cause a scale-up when there's capacity": { + pods: []*apiv1.Pod{p1, p2}, + setupConfig: testSetupConfig, + }, + "Unprocessed pod with bypassed scheduler causes a scale-up when there's no capacity - Default Scheduler": { + pods: []*apiv1.Pod{p1, p3}, + expectedScaleUp: &scaleCall{ + ng: "ng1", + delta: 1, + }, + setupConfig: testSetupConfig, + }, + "Unprocessed pod with bypassed scheduler causes a scale-up when there's no capacity - Non-default Scheduler": { + pods: []*apiv1.Pod{p1, p4}, + setupConfig: testSetupConfig, + expectedScaleUp: &scaleCall{ + ng: "ng1", + delta: 1, + }, + }, + "Unprocessed pod with non-bypassed scheduler doesn't cause a scale-up when there's no capacity": { + pods: []*apiv1.Pod{p1, p5}, + setupConfig: testSetupConfig, + }, + } + + for tcName, tc := range testCases { + t.Run(tcName, func(t *testing.T) { + autoscaler, err := setupAutoscaler(tc.setupConfig) + assert.NoError(t, err) + + tc.setupConfig.mocks.readyNodeLister.SetNodes([]*apiv1.Node{n1}) + tc.setupConfig.mocks.allNodeLister.SetNodes([]*apiv1.Node{n1}) + tc.setupConfig.mocks.allPodLister.On("List").Return(tc.pods, nil).Twice() + tc.setupConfig.mocks.daemonSetLister.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() + tc.setupConfig.mocks.podDisruptionBudgetLister.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() + if tc.expectedScaleUp != nil { + tc.setupConfig.mocks.onScaleUp.On("ScaleUp", tc.expectedScaleUp.ng, tc.expectedScaleUp.delta).Return(nil).Once() + } + err = autoscaler.RunOnce(now.Add(time.Hour)) + assert.NoError(t, err) + mock.AssertExpectationsForObjects(t, tc.setupConfig.mocks.allPodLister, + tc.setupConfig.mocks.podDisruptionBudgetLister, tc.setupConfig.mocks.daemonSetLister, tc.setupConfig.mocks.onScaleUp, tc.setupConfig.mocks.onScaleDown) + }) + } + +} + func TestStaticAutoscalerInstanceCreationErrors(t *testing.T) { // setup provider := &mockprovider.CloudProvider{} @@ -1844,7 +2075,7 @@ func waitForDeleteToFinish(t *testing.T, deleteFinished <-chan bool) { } } -func newScaleDownPlannerAndActuator(t *testing.T, ctx *context.AutoscalingContext, p *ca_processors.AutoscalingProcessors, cs *clusterstate.ClusterStateRegistry) (scaledown.Planner, scaledown.Actuator) { +func newScaleDownPlannerAndActuator(ctx *context.AutoscalingContext, p *ca_processors.AutoscalingProcessors, cs *clusterstate.ClusterStateRegistry) (scaledown.Planner, scaledown.Actuator) { ctx.MaxScaleDownParallelism = 10 ctx.MaxDrainParallelism = 1 ctx.NodeDeletionBatcherInterval = 0 * time.Second diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 5276b97c5658..01f30fb3bc91 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -243,6 +243,7 @@ var ( maxAllocatableDifferenceRatio = flag.Float64("max-allocatable-difference-ratio", config.DefaultMaxAllocatableDifferenceRatio, "Maximum difference in allocatable resources between two similar node groups to be considered for balancing. 
Value is a ratio of the smaller node group's allocatable resource.")
 	forceDaemonSets = flag.Bool("force-ds", false, "Blocks scale-up of node groups too small for all suitable Daemon Sets pods.")
 	dynamicNodeDeleteDelayAfterTaintEnabled = flag.Bool("dynamic-node-delete-delay-after-taint-enabled", false, "Enables dynamic adjustment of NodeDeleteDelayAfterTaint based of the latency between CA and api-server")
+	bypassedSchedulers = pflag.StringSlice("bypassed-scheduler-names", []string{}, fmt.Sprintf("Names of schedulers to bypass. If set to non-empty value, CA will not wait for pods to reach a certain age before triggering a scale-up."))
 )
 
 func isFlagPassed(name string) bool {
@@ -390,6 +391,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
 			MaxFreeDifferenceRatio:          *maxFreeDifferenceRatio,
 		},
 		DynamicNodeDeleteDelayAfterTaintEnabled: *dynamicNodeDeleteDelayAfterTaintEnabled,
+		BypassedSchedulers: scheduler_util.GetBypassedSchedulersMap(*bypassedSchedulers),
 	}
 }
diff --git a/cluster-autoscaler/metrics/metrics.go b/cluster-autoscaler/metrics/metrics.go
index 44939c46e17d..dfa1d4f7daa7 100644
--- a/cluster-autoscaler/metrics/metrics.go
+++ b/cluster-autoscaler/metrics/metrics.go
@@ -135,12 +135,13 @@ var (
 		}, []string{"node_group_type"},
 	)
 
-	unschedulablePodsCount = k8smetrics.NewGauge(
+	// Unschedulable pod count can come from scheduler-marked-unschedulable pods or from not-yet-processed (unknown) pods
+	unschedulablePodsCount = k8smetrics.NewGaugeVec(
 		&k8smetrics.GaugeOpts{
 			Namespace: caNamespace,
 			Name:      "unschedulable_pods_count",
 			Help:      "Number of unschedulable pods in the cluster.",
-		},
+		}, []string{"type"},
 	)
 
 	maxNodesCount = k8smetrics.NewGauge(
@@ -472,8 +473,14 @@ func UpdateNodeGroupsCount(autoscaled, autoprovisioned int) {
 }
 
 // UpdateUnschedulablePodsCount records number of currently unschedulable pods
-func UpdateUnschedulablePodsCount(podsCount int) {
-	unschedulablePodsCount.Set(float64(podsCount))
+func UpdateUnschedulablePodsCount(unschedulableCount, schedulerUnprocessedCount int) {
+	UpdateUnschedulablePodsCountWithLabel(unschedulableCount, "unschedulable")
+	UpdateUnschedulablePodsCountWithLabel(schedulerUnprocessedCount, "scheduler_unprocessed")
+}
+
+// UpdateUnschedulablePodsCountWithLabel records the number of currently unschedulable pods with the "type" label set to the given label value
+func UpdateUnschedulablePodsCountWithLabel(podsCount int, label string) {
+	unschedulablePodsCount.WithLabelValues(label).Set(float64(podsCount))
 }
 
 // UpdateMaxNodesCount records the current maximum number of nodes being set for all node groups
diff --git a/cluster-autoscaler/utils/kubernetes/listers.go b/cluster-autoscaler/utils/kubernetes/listers.go
index cbba21180682..b9be94b6e665 100644
--- a/cluster-autoscaler/utils/kubernetes/listers.go
+++ b/cluster-autoscaler/utils/kubernetes/listers.go
@@ -144,11 +144,36 @@ type PodLister interface {
 	List() ([]*apiv1.Pod, error)
 }
 
+// isScheduled checks whether a pod is scheduled on a node or not.
+// This method doesn't check for nil ptr; that is the caller's responsibility.
+func isScheduled(pod *apiv1.Pod) bool {
+	return pod.Spec.NodeName != ""
+}
+
+// isDeleted checks whether a pod is deleted or not.
+// This method doesn't check for nil ptr; that is the caller's responsibility.
+func isDeleted(pod *apiv1.Pod) bool {
+	return pod.GetDeletionTimestamp() != nil
+}
+
+// isUnschedulable checks whether a pod is unschedulable or not.
+// This method doesn't check for nil ptr; that is the caller's responsibility.
+func
isUnschedulable(pod *apiv1.Pod) bool { + if isScheduled(pod) || isDeleted(pod) { + return false + } + _, condition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled) + if condition == nil || condition.Status != apiv1.ConditionFalse || condition.Reason != apiv1.PodReasonUnschedulable { + return false + } + return true +} + // ScheduledPods is a helper method that returns all scheduled pods from given pod list. func ScheduledPods(allPods []*apiv1.Pod) []*apiv1.Pod { var scheduledPods []*apiv1.Pod for _, pod := range allPods { - if pod.Spec.NodeName != "" { + if isScheduled(pod) { scheduledPods = append(scheduledPods, pod) continue } @@ -156,18 +181,37 @@ func ScheduledPods(allPods []*apiv1.Pod) []*apiv1.Pod { return scheduledPods } +// SchedulerUnprocessedPods is a helper method that returns all pods which are not yet processed by the specified bypassed schedulers +func SchedulerUnprocessedPods(allPods []*apiv1.Pod, bypassedSchedulers map[string]bool) []*apiv1.Pod { + var unprocessedPods []*apiv1.Pod + + for _, pod := range allPods { + if canBypass := bypassedSchedulers[pod.Spec.SchedulerName]; !canBypass { + continue + } + // Make sure it's not scheduled or deleted + if isScheduled(pod) || isDeleted(pod) || isUnschedulable(pod) { + continue + } + // Make sure that if it's not scheduled it's either + // Not processed (condition is nil) + // Or Reason is empty (not schedulerError, terminated, ...etc) + _, condition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled) + if condition == nil || (condition.Status == apiv1.ConditionFalse && condition.Reason == "") { + unprocessedPods = append(unprocessedPods, pod) + } + } + return unprocessedPods +} + // UnschedulablePods is a helper method that returns all unschedulable pods from given pod list. 
func UnschedulablePods(allPods []*apiv1.Pod) []*apiv1.Pod { var unschedulablePods []*apiv1.Pod for _, pod := range allPods { - if pod.Spec.NodeName == "" { - _, condition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled) - if condition != nil && condition.Status == apiv1.ConditionFalse && condition.Reason == apiv1.PodReasonUnschedulable { - if pod.GetDeletionTimestamp() == nil { - unschedulablePods = append(unschedulablePods, pod) - } - } + if !isUnschedulable(pod) { + continue } + unschedulablePods = append(unschedulablePods, pod) } return unschedulablePods } diff --git a/cluster-autoscaler/utils/scheduler/scheduler.go b/cluster-autoscaler/utils/scheduler/scheduler.go index 59d008db64d8..cd981aa72fbf 100644 --- a/cluster-autoscaler/utils/scheduler/scheduler.go +++ b/cluster-autoscaler/utils/scheduler/scheduler.go @@ -147,3 +147,16 @@ func ConfigFromPath(path string) (*scheduler_config.KubeSchedulerConfiguration, return cfgObj, nil } + +// GetBypassedSchedulersMap returns a map of scheduler names that should be bypassed as keys, and values are set to true +// Also sets "" (empty string) to true if default scheduler is bypassed +func GetBypassedSchedulersMap(bypassedSchedulers []string) map[string]bool { + bypassedSchedulersMap := make(map[string]bool, len(bypassedSchedulers)) + for _, scheduler := range bypassedSchedulers { + bypassedSchedulersMap[scheduler] = true + } + if canBypass := bypassedSchedulersMap[apiv1.DefaultSchedulerName]; canBypass { + bypassedSchedulersMap[""] = true + } + return bypassedSchedulersMap +} diff --git a/cluster-autoscaler/utils/test/test_utils.go b/cluster-autoscaler/utils/test/test_utils.go index c984c0a839ae..1b76b24b8bfb 100644 --- a/cluster-autoscaler/utils/test/test_utils.go +++ b/cluster-autoscaler/utils/test/test_utils.go @@ -82,6 +82,13 @@ func MarkUnschedulable() func(*apiv1.Pod) { } } +// AddSchedulerName adds scheduler name to a pod. +func AddSchedulerName(schedulerName string) func(*apiv1.Pod) { + return func(pod *apiv1.Pod) { + pod.Spec.SchedulerName = schedulerName + } +} + // BuildDSTestPod creates a DaemonSet pod with cpu and memory. func BuildDSTestPod(name string, cpu int64, mem int64) *apiv1.Pod {