From 5115f1263e44fefd164607c4276c852099ed6fcd Mon Sep 17 00:00:00 2001
From: Mahmoud Atwa
Date: Tue, 21 Nov 2023 19:03:31 +0000
Subject: [PATCH] Update static_autoscaler tests & handle pod list processor errors as warnings

---
 .../podlistprocessor/filter_out_expendable.go |   5 +-
 cluster-autoscaler/core/static_autoscaler.go  |   6 +-
 .../core/static_autoscaler_test.go            | 294 +++++++++---------
 3 files changed, 152 insertions(+), 153 deletions(-)

diff --git a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go
index d075044394f3..0ec929814a1a 100644
--- a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go
+++ b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go
@@ -17,6 +17,8 @@ limitations under the License.
 package podlistprocessor
 
 import (
+	"fmt"
+
 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
 	core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
@@ -36,8 +38,7 @@ func NewFilterOutExpendablePodListProcessor() *filterOutExpendable {
 func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) {
 	nodes, err := context.AllNodeLister().List()
 	if err != nil {
-		klog.Warningf("Failed to list all nodes while filtering expendable: %v", err)
-		return nil, err
+		return nil, fmt.Errorf("failed to list all nodes while filtering expendable pods: %v", err)
 	}
 	expendablePodsPriorityCutoff := context.AutoscalingOptions.ExpendablePodsPriorityCutoff
 
diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go
index a638789cf7e9..bdfad667ec7f 100644
--- a/cluster-autoscaler/core/static_autoscaler.go
+++ b/cluster-autoscaler/core/static_autoscaler.go
@@ -506,7 +506,11 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
 		a.AutoscalingContext.DebuggingSnapshotter.SetClusterNodes(l)
 	}
 
-	unschedulablePodsToHelp, _ := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods)
+	unschedulablePodsToHelp, err := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods)
+
+	if err != nil {
+		klog.Warningf("Failed to process unschedulable pods: %v", err)
+	}
 
 	// finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable)
 	unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime)
diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go
index 55906cf94350..dd63e6d90c5b 100644
--- a/cluster-autoscaler/core/static_autoscaler_test.go
+++ b/cluster-autoscaler/core/static_autoscaler_test.go
@@ -42,6 +42,7 @@ import (
 	core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
 	"k8s.io/autoscaler/cluster-autoscaler/estimator"
 	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/callbacks"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
@@ -167,111 +168,122 @@ type scaleCall struct {
 	ng    string
 	delta int
 }
-type testCase struct {
-	nodeGroups               []*nodeGroup
-	pods                     []*apiv1.Pod
-	podListerCallTimes       int
-	daemonSets               []*appsv1.DaemonSet
-	daemonSetListerCallTimes int
-	pdbs                     []*policyv1.PodDisruptionBudget
-	pdbListerCallTimes       int
-	expectedScaleUps         []*scaleCall
-	expectedScaleDowns       []*scaleCall
-	now 
time.Time - lastScaleupTime time.Time - lastScaleDownFailTime time.Time - runAutoscalerAt time.Time - autoscalingOptions config.AutoscalingOptions - OkTotalUnreadyCount int + +type commonMocks struct { + readyNodeLister *kube_util.TestNodeLister + allNodeLister *kube_util.TestNodeLister + allPodLister *podListerMock + podDisruptionBudgetLister *podDisruptionBudgetListerMock + daemonSetLister *daemonSetListerMock + + onScaleUp *onScaleUpMock + onScaleDown *onScaleDownMock } -func testAutoscaler(t *testing.T, tc testCase) { - readyNodeLister := kubernetes.NewTestNodeLister(nil) - allNodeLister := kubernetes.NewTestNodeLister(nil) - allPodListerMock := &podListerMock{} - podDisruptionBudgetListerMock := &podDisruptionBudgetListerMock{} - daemonSetListerMock := &daemonSetListerMock{} - onScaleUpMock := &onScaleUpMock{} - onScaleDownMock := &onScaleDownMock{} - deleteFinished := make(chan bool, 1) +func newCommonMocks() *commonMocks { + return &commonMocks{ + readyNodeLister: kubernetes.NewTestNodeLister(nil), + allNodeLister: kubernetes.NewTestNodeLister(nil), + allPodLister: &podListerMock{}, + podDisruptionBudgetLister: &podDisruptionBudgetListerMock{}, + daemonSetLister: &daemonSetListerMock{}, + onScaleUp: &onScaleUpMock{}, + onScaleDown: &onScaleDownMock{}, + } +} + +type autoscalerSetupConfig struct { + nodeGroups []*nodeGroup + nodeStateUpdateTime time.Time + autoscalingOptions config.AutoscalingOptions + clusterStateConfig clusterstate.ClusterStateRegistryConfig + mocks *commonMocks + nodesDeleted chan bool +} +func setupCloudProvider(config *autoscalerSetupConfig) (*testprovider.TestCloudProvider, error) { provider := testprovider.NewTestCloudProvider( func(id string, delta int) error { - return onScaleUpMock.ScaleUp(id, delta) + return config.mocks.onScaleUp.ScaleUp(id, delta) }, func(id string, name string) error { - ret := onScaleDownMock.ScaleDown(id, name) - deleteFinished <- true + ret := config.mocks.onScaleDown.ScaleDown(id, name) + config.nodesDeleted <- true return ret }) - allNodes := make([]*apiv1.Node, 0) - for _, ng := range tc.nodeGroups { + for _, ng := range config.nodeGroups { provider.AddNodeGroup(ng.name, ng.min, ng.max, len(ng.nodes)) for _, node := range ng.nodes { - allNodes = append(allNodes, node) provider.AddNode(ng.name, node) } reflectedNg := reflect.ValueOf(provider.GetNodeGroup(ng.name)).Interface().(*testprovider.TestNodeGroup) - assert.NotNil(t, reflectedNg) + if reflectedNg == nil { + return nil, fmt.Errorf("Nodegroup '%v' found as nil after setting up cloud provider", ng.name) + } + } + return provider, nil +} + +func setupAutoscalingContext(opts config.AutoscalingOptions, provider cloudprovider.CloudProvider, processorCallbacks callbacks.ProcessorCallbacks) (context.AutoscalingContext, error) { + context, err := NewScaleTestAutoscalingContext(opts, &fake.Clientset{}, nil, provider, processorCallbacks, nil) + if err != nil { + return context, err + } + return context, nil +} + +func setupAutoscaler(config *autoscalerSetupConfig) (*StaticAutoscaler, error) { + provider, err := setupCloudProvider(config) + if err != nil { + return nil, err } + + allNodes := make([]*apiv1.Node, 0) + for _, ng := range config.nodeGroups { + allNodes = append(allNodes, ng.nodes...) + } + // Create context with mocked lister registry. 
processorCallbacks := newStaticAutoscalerProcessorCallbacks() - context, err := NewScaleTestAutoscalingContext(tc.autoscalingOptions, &fake.Clientset{}, nil, provider, processorCallbacks, nil) - assert.NoError(t, err) + context, err := setupAutoscalingContext(config.autoscalingOptions, provider, processorCallbacks) - setUpScaleDownActuator(&context, tc.autoscalingOptions) + if err != nil { + return nil, err + } - listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, allPodListerMock, - podDisruptionBudgetListerMock, daemonSetListerMock, + setUpScaleDownActuator(&context, config.autoscalingOptions) + + listerRegistry := kube_util.NewListerRegistry(config.mocks.allNodeLister, config.mocks.readyNodeLister, config.mocks.allPodLister, + config.mocks.podDisruptionBudgetLister, config.mocks.daemonSetLister, nil, nil, nil, nil) context.ListerRegistry = listerRegistry - clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ - OkTotalUnreadyCount: tc.OkTotalUnreadyCount, - } - clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(tc.autoscalingOptions.NodeGroupDefaults)) + ngConfigProcesssor := nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.autoscalingOptions.NodeGroupDefaults) + clusterState := clusterstate.NewClusterStateRegistry(provider, config.clusterStateConfig, context.LogRecorder, NewBackoff(), ngConfigProcesssor) + + clusterState.UpdateNodes(allNodes, nil, config.nodeStateUpdateTime) - clusterState.UpdateNodes(allNodes, nil, tc.now) processors := NewTestProcessors(&context) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{}) autoscaler := &StaticAutoscaler{ - AutoscalingContext: &context, - clusterStateRegistry: clusterState, - lastScaleUpTime: tc.lastScaleupTime, - lastScaleDownFailTime: tc.lastScaleDownFailTime, - scaleDownPlanner: sdPlanner, - scaleDownActuator: sdActuator, - scaleUpOrchestrator: suOrchestrator, - processors: processors, - processorCallbacks: processorCallbacks, - } - - // Assummes all nodes are ready, to be updated when used in tests which needs non-ready nodes - readyNodeLister.SetNodes(allNodes) - allNodeLister.SetNodes(allNodes) - allPodListerMock.On("List").Return(tc.pods, nil).Times(tc.podListerCallTimes) - daemonSetListerMock.On("List", labels.Everything()).Return(tc.daemonSets, nil).Times(tc.daemonSetListerCallTimes) - podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Times(tc.pdbListerCallTimes) - - for _, scaleUpCall := range tc.expectedScaleUps { - onScaleUpMock.On("ScaleUp", scaleUpCall.ng, scaleUpCall.delta).Return(nil).Once() - } - for _, scaleDownCall := range tc.expectedScaleDowns { - onScaleDownMock.On("ScaleDown", scaleDownCall.ng, scaleDownCall.delta).Return(nil).Once() + AutoscalingContext: &context, + clusterStateRegistry: clusterState, + scaleDownPlanner: sdPlanner, + scaleDownActuator: sdActuator, + scaleUpOrchestrator: suOrchestrator, + processors: processors, + processorCallbacks: processorCallbacks, } - err = autoscaler.RunOnce(tc.runAutoscalerAt) - assert.NoError(t, err) - mock.AssertExpectationsForObjects(t, allPodListerMock, - podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, 
onScaleDownMock) + return autoscaler, nil } -// TODO: Refactor tests to use testAutoscaler +// TODO: Refactor tests to use setupAutoscaler func TestStaticAutoscalerRunOnce(t *testing.T) { readyNodeLister := kubernetes.NewTestNodeLister(nil) @@ -345,7 +357,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { } processors := NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{}) @@ -554,7 +566,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { } clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{}) @@ -704,7 +716,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { processors := NewTestProcessors(&context) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{}) @@ -852,7 +864,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { processors := NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, taints.TaintConfig{}) @@ -983,7 +995,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T) processors := NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, &context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) autoscaler := &StaticAutoscaler{ AutoscalingContext: &context, @@ -1081,7 +1093,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t * processors := NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(t, 
&context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) autoscaler := &StaticAutoscaler{ AutoscalingContext: &context, @@ -1108,7 +1120,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t * podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) } -func TestStaticAutoscalerRunOnceWithSchedulerProcessingIgnored(t *testing.T) { +func TestStaticAutoscalerRunOnceWithBypassedSchedulers(t *testing.T) { bypassedScheduler := "bypassed-scheduler" nonBypassedScheduler := "non-bypassed-scheduler" options := config.AutoscalingOptions{ @@ -1128,101 +1140,83 @@ func TestStaticAutoscalerRunOnceWithSchedulerProcessingIgnored(t *testing.T) { bypassedScheduler, }), } + now := time.Now() n1 := BuildTestNode("n1", 1000, 1000) - SetNodeReadyState(n1, true, time.Now()) + SetNodeReadyState(n1, true, now) + + ngs := []*nodeGroup{{ + name: "ng1", + min: 1, + max: 10, + nodes: []*apiv1.Node{n1}, + }} p1 := BuildTestPod("p1", 600, 100) p1.Spec.NodeName = "n1" p2 := BuildTestPod("p2", 100, 100, AddSchedulerName(bypassedScheduler)) - p3 := BuildTestPod("p3", 600, 100, AddSchedulerName(apiv1.DefaultSchedulerName)) // Not yet processed by scheduler, default scheduler is ignored - p4 := BuildTestPod("p4", 600, 100, AddSchedulerName(bypassedScheduler)) // non-default scheduler & ignored, expects a scale-up + p3 := BuildTestPod("p3", 600, 100) // Not yet processed by scheduler, default scheduler is ignored + p4 := BuildTestPod("p4", 600, 100, AddSchedulerName(bypassedScheduler)) // non-default scheduler & ignored, expects a scale-up p5 := BuildTestPod("p5", 600, 100, AddSchedulerName(nonBypassedScheduler)) - testCases := map[string]testCase{ + testSetupConfig := &autoscalerSetupConfig{ + autoscalingOptions: options, + nodeGroups: ngs, + nodeStateUpdateTime: now, + mocks: newCommonMocks(), + clusterStateConfig: clusterstate.ClusterStateRegistryConfig{ + OkTotalUnreadyCount: 1, + }, + } + + testCases := map[string]struct { + setupConfig *autoscalerSetupConfig + pods []*apiv1.Pod + expectedScaleUp *scaleCall + }{ "Unprocessed pod with bypassed scheduler doesn't cause a scale-up when there's capacity": { - pods: []*apiv1.Pod{p1, p2}, - podListerCallTimes: 2, - nodeGroups: []*nodeGroup{{ - name: "ng1", - min: 1, - max: 10, - nodes: []*apiv1.Node{n1}, - }}, - daemonSetListerCallTimes: 1, - pdbListerCallTimes: 1, - expectedScaleUps: []*scaleCall{}, - now: time.Now(), - lastScaleupTime: time.Now(), - lastScaleDownFailTime: time.Now(), - runAutoscalerAt: time.Now().Add(time.Hour), - autoscalingOptions: options, + pods: []*apiv1.Pod{p1, p2}, + setupConfig: testSetupConfig, }, "Unprocessed pod with bypassed scheduler causes a scale-up when there's no capacity - Default Scheduler": { - pods: []*apiv1.Pod{p1, p3}, - podListerCallTimes: 2, - nodeGroups: []*nodeGroup{{ - name: "ng1", - min: 1, - max: 10, - nodes: []*apiv1.Node{n1}, - }}, - daemonSetListerCallTimes: 1, - pdbListerCallTimes: 1, - expectedScaleUps: []*scaleCall{{ + pods: []*apiv1.Pod{p1, p3}, + expectedScaleUp: &scaleCall{ ng: "ng1", delta: 1, - }}, - now: time.Now(), - lastScaleupTime: time.Now(), - lastScaleDownFailTime: time.Now(), - runAutoscalerAt: time.Now().Add(time.Hour), - autoscalingOptions: options, + }, + setupConfig: testSetupConfig, }, "Unprocessed pod with bypassed scheduler causes a scale-up when there's no capacity - Non-default Scheduler": { - pods: []*apiv1.Pod{p1, p4}, - podListerCallTimes: 2, - nodeGroups: []*nodeGroup{{ 
- name: "ng1", - min: 1, - max: 10, - nodes: []*apiv1.Node{n1}, - }}, - daemonSetListerCallTimes: 1, - pdbListerCallTimes: 1, - expectedScaleUps: []*scaleCall{{ + pods: []*apiv1.Pod{p1, p4}, + setupConfig: testSetupConfig, + expectedScaleUp: &scaleCall{ ng: "ng1", delta: 1, - }}, - now: time.Now(), - lastScaleupTime: time.Now(), - lastScaleDownFailTime: time.Now(), - runAutoscalerAt: time.Now().Add(time.Hour), - autoscalingOptions: options, + }, }, "Unprocessed pod with non-bypassed scheduler doesn't cause a scale-up when there's no capacity": { - pods: []*apiv1.Pod{p1, p5}, - podListerCallTimes: 2, - nodeGroups: []*nodeGroup{{ - name: "ng1", - min: 1, - max: 10, - nodes: []*apiv1.Node{n1}, - }}, - daemonSetListerCallTimes: 1, - pdbListerCallTimes: 1, - expectedScaleUps: []*scaleCall{}, - now: time.Now(), - lastScaleupTime: time.Now(), - lastScaleDownFailTime: time.Now(), - runAutoscalerAt: time.Now().Add(time.Hour), - autoscalingOptions: options, + pods: []*apiv1.Pod{p1, p5}, + setupConfig: testSetupConfig, }, } for tcName, tc := range testCases { t.Run(tcName, func(t *testing.T) { - testAutoscaler(t, tc) + autoscaler, err := setupAutoscaler(tc.setupConfig) + assert.NoError(t, err) + + tc.setupConfig.mocks.readyNodeLister.SetNodes([]*apiv1.Node{n1}) + tc.setupConfig.mocks.allNodeLister.SetNodes([]*apiv1.Node{n1}) + tc.setupConfig.mocks.allPodLister.On("List").Return(tc.pods, nil).Twice() + tc.setupConfig.mocks.daemonSetLister.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() + tc.setupConfig.mocks.podDisruptionBudgetLister.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() + if tc.expectedScaleUp != nil { + tc.setupConfig.mocks.onScaleUp.On("ScaleUp", tc.expectedScaleUp.ng, tc.expectedScaleUp.delta).Return(nil).Once() + } + err = autoscaler.RunOnce(now.Add(time.Hour)) + assert.NoError(t, err) + mock.AssertExpectationsForObjects(t, tc.setupConfig.mocks.allPodLister, + tc.setupConfig.mocks.podDisruptionBudgetLister, tc.setupConfig.mocks.daemonSetLister, tc.setupConfig.mocks.onScaleUp, tc.setupConfig.mocks.onScaleDown) }) } @@ -2081,7 +2075,7 @@ func waitForDeleteToFinish(t *testing.T, deleteFinished <-chan bool) { } } -func newScaleDownPlannerAndActuator(t *testing.T, ctx *context.AutoscalingContext, p *ca_processors.AutoscalingProcessors, cs *clusterstate.ClusterStateRegistry) (scaledown.Planner, scaledown.Actuator) { +func newScaleDownPlannerAndActuator(ctx *context.AutoscalingContext, p *ca_processors.AutoscalingProcessors, cs *clusterstate.ClusterStateRegistry) (scaledown.Planner, scaledown.Actuator) { ctx.MaxScaleDownParallelism = 10 ctx.MaxDrainParallelism = 1 ctx.NodeDeletionBatcherInterval = 0 * time.Second
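Note on the behavioral change in this patch: pod list processors now return their errors instead of swallowing them (filter_out_expendable no longer logs and drops the node-lister failure), and RunOnce downgrades any processor error to a warning and continues with whatever pod list the processors produced. The sketch below mirrors that pattern in a self-contained form; the pod and processor types here are illustrative stand-ins, not the cluster-autoscaler's real PodListProcessor interface.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

type pod struct{ name string }

// podListProcessor mirrors the shape the patch relies on: processors hand back
// both a (possibly filtered) pod list and an error.
type podListProcessor interface {
	Process(pods []pod) ([]pod, error)
}

// flakyProcessor stands in for filter_out_expendable when listing nodes fails:
// it returns the input unchanged together with the error, rather than logging
// the failure locally and dropping it.
type flakyProcessor struct{}

func (flakyProcessor) Process(pods []pod) ([]pod, error) {
	return pods, fmt.Errorf("failed to list all nodes while filtering expendable pods: %w", errors.New("node lister not synced"))
}

func main() {
	var processor podListProcessor = flakyProcessor{}
	unschedulable := []pod{{name: "p1"}, {name: "p2"}}

	// RunOnce-style handling: a processor error is logged as a warning and the
	// loop keeps going with whatever list the processor returned.
	toHelp, err := processor.Process(unschedulable)
	if err != nil {
		log.Printf("Warning: failed to process unschedulable pods: %v", err)
	}
	fmt.Printf("continuing scale-up evaluation with %d pods\n", len(toHelp))
}
```

Returning the list alongside the error is what lets the caller degrade gracefully: a failing processor no longer aborts the whole RunOnce iteration, it just means scale-up may be evaluated on a less-filtered set of pods.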