From 24ae6d4e9818b5fc5ff93bd1d0b205403a3d3a39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kuba=20Tu=C5=BCnik?= Date: Mon, 30 Sep 2024 19:08:24 +0200 Subject: [PATCH] TMP squash: #7466 DONOTSUBMIT --- .../podlistprocessor/filter_out_expendable.go | 4 +- .../filter_out_expendable_test.go | 4 +- .../filter_out_schedulable_test.go | 16 +-- .../core/scaledown/actuation/actuator.go | 19 +-- .../core/scaledown/actuation/actuator_test.go | 5 +- .../core/scaledown/actuation/drain_test.go | 3 +- .../core/scaleup/orchestrator/orchestrator.go | 6 +- cluster-autoscaler/core/static_autoscaler.go | 44 ++----- .../estimator/binpacking_estimator.go | 10 +- .../estimator/binpacking_estimator_test.go | 6 +- .../pod_injection_processor_test.go | 7 +- cluster-autoscaler/simulator/cluster.go | 4 +- .../simulator/clustersnapshot/basic.go | 61 ++++----- .../clustersnapshot/clustersnapshot.go | 28 ++-- .../clustersnapshot_benchmark_test.go | 63 ++++----- .../clustersnapshot/clustersnapshot_test.go | 121 +++++++++--------- .../simulator/clustersnapshot/delta.go | 64 ++++----- .../simulator/clustersnapshot/test_utils.go | 10 +- .../predicatechecker/schedulerbased_test.go | 9 +- .../simulator/scheduling/hinting_simulator.go | 2 +- 20 files changed, 199 insertions(+), 287 deletions(-) diff --git a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go index 0ec929814a1a..550f8a10520f 100644 --- a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go +++ b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go @@ -23,7 +23,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/context" core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils" caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors" - klog "k8s.io/klog/v2" + "k8s.io/klog/v2" ) type filterOutExpendable struct { @@ -56,7 +56,7 @@ func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods // CA logic from before migration to scheduler framework. 
So let's keep it for now func (p *filterOutExpendable) addPreemptingPodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error { for _, p := range pods { - if err := ctx.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil { + if err := ctx.ClusterSnapshot.ForceAddPod(p, p.Status.NominatedNodeName); err != nil { klog.Errorf("Failed to update snapshot with pod %s/%s waiting for preemption: %v", p.Namespace, p.Name, err) return caerrors.ToAutoscalerError(caerrors.InternalError, err) } diff --git a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go index 458f633c7152..94f6915e3028 100644 --- a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go +++ b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/config" "k8s.io/autoscaler/cluster-autoscaler/context" @@ -109,7 +110,8 @@ func TestFilterOutExpendable(t *testing.T) { t.Run(tc.name, func(t *testing.T) { processor := NewFilterOutExpendablePodListProcessor() snapshot := clustersnapshot.NewBasicClusterSnapshot() - snapshot.AddNodes(tc.nodes) + err := snapshot.SetClusterState(tc.nodes, nil) + assert.NoError(t, err) pods, err := processor.Process(&context.AutoscalingContext{ ClusterSnapshot: snapshot, diff --git a/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go b/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go index e02e0b9c0bb6..7b0054f9a2f2 100644 --- a/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go +++ b/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/stretchr/testify/assert" + apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" @@ -183,16 +184,12 @@ func TestFilterOutSchedulable(t *testing.T) { allExpectedScheduledPods = append(allExpectedScheduledPods, tc.expectedScheduledPods...) 
for node, pods := range tc.nodesWithPods { - err := clusterSnapshot.AddNode(node) - assert.NoError(t, err) - for _, pod := range pods { pod.Spec.NodeName = node.Name - err = clusterSnapshot.AddPod(pod, node.Name) - assert.NoError(t, err) - allExpectedScheduledPods = append(allExpectedScheduledPods, pod) } + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pods...)) + assert.NoError(t, err) } clusterSnapshot.Fork() @@ -286,15 +283,10 @@ func BenchmarkFilterOutSchedulable(b *testing.B) { assert.NoError(b, err) clusterSnapshot := snapshotFactory() - if err := clusterSnapshot.AddNodes(nodes); err != nil { + if err := clusterSnapshot.SetClusterState(nodes, scheduledPods); err != nil { assert.NoError(b, err) } - for _, pod := range scheduledPods { - if err := clusterSnapshot.AddPod(pod, pod.Spec.NodeName); err != nil { - assert.NoError(b, err) - } - } b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator.go b/cluster-autoscaler/core/scaledown/actuation/actuator.go index b02e2016aaab..a85410172684 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator.go @@ -356,7 +356,6 @@ func (a *Actuator) taintNode(node *apiv1.Node) error { } func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterSnapshot, error) { - knownNodes := make(map[string]bool) snapshot := clustersnapshot.NewBasicClusterSnapshot() pods, err := a.ctx.AllPodLister().List() if err != nil { @@ -366,22 +365,10 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterS scheduledPods := kube_util.ScheduledPods(pods) nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff) - for _, node := range nodes { - if err := snapshot.AddNode(node); err != nil { - return nil, err - } - - knownNodes[node.Name] = true - } - - for _, pod := range nonExpendableScheduledPods { - if knownNodes[pod.Spec.NodeName] { - if err := snapshot.AddPod(pod, pod.Spec.NodeName); err != nil { - return nil, err - } - } + err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods) + if err != nil { + return nil, err } - return snapshot, nil } diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go index 2f48e498c8ee..6f44abaf06b6 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go @@ -43,6 +43,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups" + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/autoscaler/cluster-autoscaler/simulator/utilization" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" "k8s.io/autoscaler/cluster-autoscaler/utils/taints" @@ -1159,7 +1160,7 @@ func TestStartDeletion(t *testing.T) { csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) for _, bucket := range emptyNodeGroupViews { for _, node := range bucket.Nodes { - err := ctx.ClusterSnapshot.AddNodeWithPods(node, 
tc.pods[node.Name]) + err := ctx.ClusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.pods[node.Name]...)) if err != nil { t.Fatalf("Couldn't add node %q to snapshot: %v", node.Name, err) } @@ -1171,7 +1172,7 @@ func TestStartDeletion(t *testing.T) { if !found { t.Fatalf("Drain node %q doesn't have pods defined in the test case.", node.Name) } - err := ctx.ClusterSnapshot.AddNodeWithPods(node, pods) + err := ctx.ClusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pods...)) if err != nil { t.Fatalf("Couldn't add node %q to snapshot: %v", node.Name, err) } diff --git a/cluster-autoscaler/core/scaledown/actuation/drain_test.go b/cluster-autoscaler/core/scaledown/actuation/drain_test.go index 7205afe4a975..6ba905761db5 100644 --- a/cluster-autoscaler/core/scaledown/actuation/drain_test.go +++ b/cluster-autoscaler/core/scaledown/actuation/drain_test.go @@ -37,6 +37,7 @@ import ( . "k8s.io/autoscaler/cluster-autoscaler/core/test" "k8s.io/autoscaler/cluster-autoscaler/core/utils" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/autoscaler/cluster-autoscaler/utils/daemonset" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" @@ -612,7 +613,7 @@ func TestPodsToEvict(t *testing.T) { t.Run(tn, func(t *testing.T) { snapshot := clustersnapshot.NewBasicClusterSnapshot() node := BuildTestNode("test-node", 1000, 1000) - err := snapshot.AddNodeWithPods(node, tc.pods) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.pods...)) if err != nil { - t.Errorf("AddNodeWithPods unexpected error: %v", err) + t.Errorf("AddNodeInfo unexpected error: %v", err) } diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go index dd4b53a241ac..8eb316c594b2 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go @@ -569,11 +569,7 @@ func (o *ScaleUpOrchestrator) SchedulablePodGroups( defer o.autoscalingContext.ClusterSnapshot.Revert() // Add test node to snapshot.
- var allPods []*apiv1.Pod - for _, podInfo := range nodeInfo.Pods() { - allPods = append(allPods, podInfo.Pod) - } - if err := o.autoscalingContext.ClusterSnapshot.AddNodeWithPods(nodeInfo.Node(), allPods); err != nil { + if err := o.autoscalingContext.ClusterSnapshot.AddNodeInfo(nodeInfo); err != nil { klog.Errorf("Error while adding test Node: %v", err) return []estimator.PodEquivalenceGroup{} } diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index f14e8db7f681..6902b13df35c 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -34,7 +34,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/planner" scaledownstatus "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status" "k8s.io/autoscaler/cluster-autoscaler/core/scaleup" - orchestrator "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator" + "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator" core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils" "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" "k8s.io/autoscaler/cluster-autoscaler/estimator" @@ -58,7 +58,7 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" - klog "k8s.io/klog/v2" + "k8s.io/klog/v2" ) const ( @@ -242,28 +242,6 @@ func (a *StaticAutoscaler) cleanUpIfRequired() { a.initialized = true } -func (a *StaticAutoscaler) initializeClusterSnapshot(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) caerrors.AutoscalerError { - a.ClusterSnapshot.Clear() - - knownNodes := make(map[string]bool) - for _, node := range nodes { - if err := a.ClusterSnapshot.AddNode(node); err != nil { - klog.Errorf("Failed to add node %s to cluster snapshot: %v", node.Name, err) - return caerrors.ToAutoscalerError(caerrors.InternalError, err) - } - knownNodes[node.Name] = true - } - for _, pod := range scheduledPods { - if knownNodes[pod.Spec.NodeName] { - if err := a.ClusterSnapshot.AddPod(pod, pod.Spec.NodeName); err != nil { - klog.Errorf("Failed to add pod %s scheduled to node %s to cluster snapshot: %v", pod.Name, pod.Spec.NodeName, err) - return caerrors.ToAutoscalerError(caerrors.InternalError, err) - } - } - } - return nil -} - func (a *StaticAutoscaler) initializeRemainingPdbTracker() caerrors.AutoscalerError { a.RemainingPdbTracker.Clear() @@ -361,8 +339,8 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr } nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff) // Initialize cluster state to ClusterSnapshot - if typedErr := a.initializeClusterSnapshot(allNodes, nonExpendableScheduledPods); typedErr != nil { - return typedErr.AddPrefix("failed to initialize ClusterSnapshot: ") + if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods); err != nil { + return caerrors.ToAutoscalerError(caerrors.InternalError, err).AddPrefix("failed to initialize ClusterSnapshot: ") } // Initialize Pod Disruption Budget tracking if typedErr := a.initializeRemainingPdbTracker(); typedErr != nil { @@ -486,7 +464,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr allNodes = subtractNodesByName(allNodes, allRegisteredUpcoming) // Remove the nodes from the snapshot as well so that the state is consistent. 
for _, notStartedNodeName := range allRegisteredUpcoming { - err := a.ClusterSnapshot.RemoveNode(notStartedNodeName) + err := a.ClusterSnapshot.RemoveNodeInfo(notStartedNodeName) if err != nil { klog.Errorf("Failed to remove NotStarted node %s from cluster snapshot: %v", notStartedNodeName, err) // ErrNodeNotFound shouldn't happen (so it needs to be logged above if it does), but what we care about here is that the @@ -682,20 +660,16 @@ func (a *StaticAutoscaler) addUpcomingNodesToClusterSnapshot(upcomingCounts map[ nodeGroups := a.nodeGroupsById() upcomingNodeGroups := make(map[string]int) upcomingNodesFromUpcomingNodeGroups := 0 - for nodeGroupName, upcomingNodes := range getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups) { + for nodeGroupName, upcomingNodeInfos := range getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups) { nodeGroup := nodeGroups[nodeGroupName] if nodeGroup == nil { return fmt.Errorf("failed to find node group: %s", nodeGroupName) } isUpcomingNodeGroup := a.processors.AsyncNodeGroupStateChecker.IsUpcoming(nodeGroup) - for _, upcomingNode := range upcomingNodes { - var pods []*apiv1.Pod - for _, podInfo := range upcomingNode.Pods() { - pods = append(pods, podInfo.Pod) - } - err := a.ClusterSnapshot.AddNodeWithPods(upcomingNode.Node(), pods) + for _, upcomingNodeInfo := range upcomingNodeInfos { + err := a.ClusterSnapshot.AddNodeInfo(upcomingNodeInfo) if err != nil { - return fmt.Errorf("Failed to add upcoming node %s to cluster snapshot: %w", upcomingNode.Node().Name, err) + return fmt.Errorf("failed to add upcoming node %s to cluster snapshot: %w", upcomingNodeInfo.Node().Name, err) } if isUpcomingNodeGroup { upcomingNodesFromUpcomingNodeGroups++ diff --git a/cluster-autoscaler/estimator/binpacking_estimator.go b/cluster-autoscaler/estimator/binpacking_estimator.go index 6ffad3800df6..55e1de431997 100644 --- a/cluster-autoscaler/estimator/binpacking_estimator.go +++ b/cluster-autoscaler/estimator/binpacking_estimator.go @@ -25,7 +25,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" - klog "k8s.io/klog/v2" + "k8s.io/klog/v2" ) // BinpackingNodeEstimator estimates the number of needed nodes to handle the given amount of pods. 
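Note on the migration pattern: the static_autoscaler hunks above replace the removed initializeClusterSnapshot helper and the per-node AddNodeWithPods loop with the new bulk API. A minimal sketch of the resulting flow, assuming only the method set introduced by this patch (the buildSimulationSnapshot helper and its wiring are illustrative, not part of the diff):

package example

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
)

// buildSimulationSnapshot shows the intended call order: one SetClusterState
// call loads the real nodes and correlates scheduledPods via pod.Spec.NodeName,
// then upcoming/template nodes are added whole (pods included) with AddNodeInfo.
func buildSimulationSnapshot(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod, upcoming []*framework.NodeInfo) (clustersnapshot.ClusterSnapshot, error) {
	snapshot := clustersnapshot.NewBasicClusterSnapshot()
	if err := snapshot.SetClusterState(nodes, scheduledPods); err != nil {
		return nil, err
	}
	for _, nodeInfo := range upcoming {
		if err := snapshot.AddNodeInfo(nodeInfo); err != nil {
			return nil, err
		}
	}
	return snapshot, nil
}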
@@ -211,11 +211,7 @@ func (e *BinpackingNodeEstimator) addNewNodeToSnapshot( template *framework.NodeInfo, ) error { newNodeInfo := scheduler.DeepCopyTemplateNode(template, fmt.Sprintf("e-%d", estimationState.newNodeNameIndex)) - var pods []*apiv1.Pod - for _, podInfo := range newNodeInfo.Pods() { - pods = append(pods, podInfo.Pod) - } - if err := e.clusterSnapshot.AddNodeWithPods(newNodeInfo.Node(), pods); err != nil { + if err := e.clusterSnapshot.AddNodeInfo(newNodeInfo); err != nil { return err } estimationState.newNodeNameIndex++ @@ -229,7 +225,7 @@ func (e *BinpackingNodeEstimator) tryToAddNode( pod *apiv1.Pod, nodeName string, ) error { - if err := e.clusterSnapshot.AddPod(pod, nodeName); err != nil { + if err := e.clusterSnapshot.ForceAddPod(pod, nodeName); err != nil { return fmt.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", pod.Namespace, pod.Name, nodeName, err) } estimationState.newNodesWithPods[nodeName] = true diff --git a/cluster-autoscaler/estimator/binpacking_estimator_test.go b/cluster-autoscaler/estimator/binpacking_estimator_test.go index e0fa48aeda10..e0205ffdc854 100644 --- a/cluster-autoscaler/estimator/binpacking_estimator_test.go +++ b/cluster-autoscaler/estimator/binpacking_estimator_test.go @@ -214,7 +214,8 @@ func TestBinpackingEstimate(t *testing.T) { t.Run(tc.name, func(t *testing.T) { clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() // Add one node in different zone to trigger topology spread constraints - clusterSnapshot.AddNode(makeNode(100, 100, 10, "oldnode", "zone-jupiter")) + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(makeNode(100, 100, 10, "oldnode", "zone-jupiter"))) + assert.NoError(t, err) predicateChecker, err := predicatechecker.NewTestPredicateChecker() assert.NoError(t, err) @@ -268,7 +269,8 @@ func BenchmarkBinpackingEstimate(b *testing.B) { for i := 0; i < b.N; i++ { clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() - clusterSnapshot.AddNode(makeNode(100, 100, 10, "oldnode", "zone-jupiter")) + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(makeNode(100, 100, 10, "oldnode", "zone-jupiter"))) + assert.NoError(b, err) predicateChecker, err := predicatechecker.NewTestPredicateChecker() assert.NoError(b, err) diff --git a/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go b/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go index d2f96b244585..13a98c8d78c8 100644 --- a/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go +++ b/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/context" podinjectionbackoff "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection/backoff" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" . 
"k8s.io/autoscaler/cluster-autoscaler/utils/test" ) @@ -112,10 +113,8 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) { t.Run(tc.name, func(t *testing.T) { p := NewPodInjectionPodListProcessor(podinjectionbackoff.NewFakePodControllerRegistry()) clusterSnapshot := clustersnapshot.NewDeltaClusterSnapshot() - clusterSnapshot.AddNode(node) - for _, pod := range tc.scheduledPods { - clusterSnapshot.AddPod(pod, node.Name) - } + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.scheduledPods...)) + assert.NoError(t, err) ctx := context.AutoscalingContext{ AutoscalingKubeClients: context.AutoscalingKubeClients{ ListerRegistry: kubernetes.NewListerRegistry(nil, nil, nil, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister), diff --git a/cluster-autoscaler/simulator/cluster.go b/cluster-autoscaler/simulator/cluster.go index 6855ae5efb95..e81d9ecea0ff 100644 --- a/cluster-autoscaler/simulator/cluster.go +++ b/cluster-autoscaler/simulator/cluster.go @@ -32,7 +32,7 @@ import ( kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" "k8s.io/autoscaler/cluster-autoscaler/utils/tpu" - klog "k8s.io/klog/v2" + "k8s.io/klog/v2" ) // NodeToBeRemoved contain information about a node that can be removed. @@ -223,7 +223,7 @@ func (r *RemovalSimulator) findPlaceFor(removedNode string, pods []*apiv1.Pod, n // remove pods from clusterSnapshot first for _, pod := range pods { - if err := r.clusterSnapshot.RemovePod(pod.Namespace, pod.Name, removedNode); err != nil { + if err := r.clusterSnapshot.ForceRemovePod(pod.Namespace, pod.Name, removedNode); err != nil { // just log error klog.Errorf("Simulating removal of %s/%s return error; %v", pod.Namespace, pod.Name, err) } diff --git a/cluster-autoscaler/simulator/clustersnapshot/basic.go b/cluster-autoscaler/simulator/clustersnapshot/basic.go index ce083894f64e..be8388c5ce8b 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/basic.go +++ b/cluster-autoscaler/simulator/clustersnapshot/basic.go @@ -153,16 +153,7 @@ func (data *internalBasicSnapshotData) addNode(node *apiv1.Node) error { return nil } -func (data *internalBasicSnapshotData) addNodes(nodes []*apiv1.Node) error { - for _, node := range nodes { - if err := data.addNode(node); err != nil { - return err - } - } - return nil -} - -func (data *internalBasicSnapshotData) removeNode(nodeName string) error { +func (data *internalBasicSnapshotData) removeNodeInfo(nodeName string) error { if _, found := data.nodeInfoMap[nodeName]; !found { return ErrNodeNotFound } @@ -205,7 +196,7 @@ func (data *internalBasicSnapshotData) removePod(namespace, podName, nodeName st // NewBasicClusterSnapshot creates instances of BasicClusterSnapshot. func NewBasicClusterSnapshot() *BasicClusterSnapshot { snapshot := &BasicClusterSnapshot{} - snapshot.Clear() + snapshot.clear() return snapshot } @@ -241,41 +232,39 @@ func (snapshot *BasicClusterSnapshot) AddNodeInfo(nodeInfo *framework.NodeInfo) return nil } -// AddNode adds node to the snapshot. -func (snapshot *BasicClusterSnapshot) AddNode(node *apiv1.Node) error { - return snapshot.getInternalData().addNode(node) -} - -// AddNodes adds nodes in batch to the snapshot. -func (snapshot *BasicClusterSnapshot) AddNodes(nodes []*apiv1.Node) error { - return snapshot.getInternalData().addNodes(nodes) -} +// SetClusterState sets the cluster state. 
+func (snapshot *BasicClusterSnapshot) SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error { + snapshot.clear() -// AddNodeWithPods adds a node and set of pods to be scheduled to this node to the snapshot. -func (snapshot *BasicClusterSnapshot) AddNodeWithPods(node *apiv1.Node, pods []*apiv1.Pod) error { - if err := snapshot.AddNode(node); err != nil { - return err - } - for _, pod := range pods { - if err := snapshot.AddPod(pod, node.Name); err != nil { + knownNodes := make(map[string]bool) + for _, node := range nodes { + if err := snapshot.getInternalData().addNode(node); err != nil { return err } + knownNodes[node.Name] = true + } + for _, pod := range scheduledPods { + if knownNodes[pod.Spec.NodeName] { + if err := snapshot.getInternalData().addPod(pod, pod.Spec.NodeName); err != nil { + return err + } + } } return nil } -// RemoveNode removes nodes (and pods scheduled to it) from the snapshot. -func (snapshot *BasicClusterSnapshot) RemoveNode(nodeName string) error { - return snapshot.getInternalData().removeNode(nodeName) +// RemoveNodeInfo removes the given node (and any pods scheduled to it) from the snapshot. +func (snapshot *BasicClusterSnapshot) RemoveNodeInfo(nodeName string) error { - return snapshot.getInternalData().removeNodeInfo(nodeName) } -// AddPod adds pod to the snapshot and schedules it to given node. -func (snapshot *BasicClusterSnapshot) AddPod(pod *apiv1.Pod, nodeName string) error { +// ForceAddPod adds the given pod to the snapshot and schedules it to the given node. +func (snapshot *BasicClusterSnapshot) ForceAddPod(pod *apiv1.Pod, nodeName string) error { return snapshot.getInternalData().addPod(pod, nodeName) } -// RemovePod removes pod from the snapshot. -func (snapshot *BasicClusterSnapshot) RemovePod(namespace, podName, nodeName string) error { +// ForceRemovePod removes the given pod from the snapshot. +func (snapshot *BasicClusterSnapshot) ForceRemovePod(namespace, podName, nodeName string) error { return snapshot.getInternalData().removePod(namespace, podName, nodeName) } @@ -308,8 +297,8 @@ func (snapshot *BasicClusterSnapshot) Commit() error { return nil } -// Clear reset cluster snapshot to empty, unforked state -func (snapshot *BasicClusterSnapshot) Clear() { +// clear resets the cluster snapshot to an empty, unforked state +func (snapshot *BasicClusterSnapshot) clear() { baseData := newInternalBasicSnapshotData() snapshot.data = []*internalBasicSnapshotData{baseData} } diff --git a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go index a80c85c22d22..1c60fcc0b730 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go +++ b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go @@ -29,24 +29,22 @@ import ( // It exposes mutation methods and can be viewed as scheduler's SharedLister. type ClusterSnapshot interface { schedulerframework.SharedLister - // AddNode adds node to the snapshot. - AddNode(node *apiv1.Node) error - // AddNodes adds nodes to the snapshot. - AddNodes(nodes []*apiv1.Node) error - // RemoveNode removes nodes (and pods scheduled to it) from the snapshot. - RemoveNode(nodeName string) error - // AddPod adds pod to the snapshot and schedules it to given node. - AddPod(pod *apiv1.Pod, nodeName string) error - // RemovePod removes pod from the snapshot. - RemovePod(namespace string, podName string, nodeName string) error - // AddNodeWithPods adds a node and set of pods to be scheduled to this node to the snapshot.
- AddNodeWithPods(node *apiv1.Node, pods []*apiv1.Pod) error - // IsPVCUsedByPods returns if the pvc is used by any pod, key = <namespace>/<name> - IsPVCUsedByPods(key string) bool + + // SetClusterState resets the snapshot to an unforked state and replaces the contents of the snapshot + // with the provided data. scheduledPods are correlated to their Nodes based on spec.NodeName. + SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error + + // ForceAddPod adds the given Pod to the Node with the given nodeName inside the snapshot. + ForceAddPod(pod *apiv1.Pod, nodeName string) error + // ForceRemovePod removes the given Pod (and all DRA objects it owns) from the snapshot. + ForceRemovePod(namespace string, podName string, nodeName string) error // AddNodeInfo adds the given NodeInfo to the snapshot. The Node and the Pods are added, as well as // any DRA objects passed along them. AddNodeInfo(nodeInfo *framework.NodeInfo) error + // RemoveNodeInfo removes the given NodeInfo from the snapshot. The Node and the Pods are removed, as well as + // any DRA objects owned by them. + RemoveNodeInfo(nodeName string) error // GetNodeInfo returns an internal NodeInfo for a given Node - all information about the Node tracked in the snapshot. // This means the Node itself, its scheduled Pods, as well as all relevant DRA objects. The internal NodeInfos // obtained via this method should always be used in CA code instead of directly using *schedulerframework.NodeInfo. @@ -61,8 +59,6 @@ type ClusterSnapshot interface { Revert() // Commit commits changes done after forking. Commit() error - // Clear reset cluster snapshot to empty, unforked state. - Clear() } // ErrNodeNotFound means that a node wasn't found in the snapshot. diff --git a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_benchmark_test.go b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_benchmark_test.go index cf851773537e..fb6468adad6f 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_benchmark_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_benchmark_test.go @@ -22,6 +22,8 @@ import ( "time" "github.com/stretchr/testify/assert" + + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" .
"k8s.io/autoscaler/cluster-autoscaler/utils/test" apiv1 "k8s.io/api/core/v1" @@ -67,7 +69,7 @@ func assignPodsToNodes(pods []*apiv1.Pod, nodes []*apiv1.Node) { } } -func BenchmarkAddNodes(b *testing.B) { +func BenchmarkAddNodeInfo(b *testing.B) { testCases := []int{1, 10, 100, 1000, 5000, 15000, 100000} for snapshotName, snapshotFactory := range snapshots { @@ -75,13 +77,13 @@ func BenchmarkAddNodes(b *testing.B) { nodes := createTestNodes(tc) clusterSnapshot := snapshotFactory() b.ResetTimer() - b.Run(fmt.Sprintf("%s: AddNode() %d", snapshotName, tc), func(b *testing.B) { + b.Run(fmt.Sprintf("%s: AddNodeInfo() %d", snapshotName, tc), func(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() - clusterSnapshot.Clear() + assert.NoError(b, clusterSnapshot.SetClusterState(nil, nil)) b.StartTimer() for _, node := range nodes { - err := clusterSnapshot.AddNode(node) + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) if err != nil { assert.NoError(b, err) } @@ -90,24 +92,6 @@ func BenchmarkAddNodes(b *testing.B) { }) } } - for snapshotName, snapshotFactory := range snapshots { - for _, tc := range testCases { - nodes := createTestNodes(tc) - clusterSnapshot := snapshotFactory() - b.ResetTimer() - b.Run(fmt.Sprintf("%s: AddNodes() %d", snapshotName, tc), func(b *testing.B) { - for i := 0; i < b.N; i++ { - b.StopTimer() - clusterSnapshot.Clear() - b.StartTimer() - err := clusterSnapshot.AddNodes(nodes) - if err != nil { - assert.NoError(b, err) - } - } - }) - } - } } func BenchmarkListNodeInfos(b *testing.B) { @@ -117,7 +101,7 @@ func BenchmarkListNodeInfos(b *testing.B) { for _, tc := range testCases { nodes := createTestNodes(tc) clusterSnapshot := snapshotFactory() - err := clusterSnapshot.AddNodes(nodes) + err := clusterSnapshot.SetClusterState(nodes, nil) if err != nil { assert.NoError(b, err) } @@ -142,25 +126,24 @@ func BenchmarkAddPods(b *testing.B) { for snapshotName, snapshotFactory := range snapshots { for _, tc := range testCases { - clusterSnapshot := snapshotFactory() nodes := createTestNodes(tc) - err := clusterSnapshot.AddNodes(nodes) - assert.NoError(b, err) pods := createTestPods(tc * 30) assignPodsToNodes(pods, nodes) + clusterSnapshot := snapshotFactory() + err := clusterSnapshot.SetClusterState(nodes, nil) + assert.NoError(b, err) b.ResetTimer() - b.Run(fmt.Sprintf("%s: AddPod() 30*%d", snapshotName, tc), func(b *testing.B) { + b.Run(fmt.Sprintf("%s: ForceAddPod() 30*%d", snapshotName, tc), func(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() - clusterSnapshot.Clear() - err = clusterSnapshot.AddNodes(nodes) + err = clusterSnapshot.SetClusterState(nodes, nil) if err != nil { assert.NoError(b, err) } b.StartTimer() for _, pod := range pods { - err = clusterSnapshot.AddPod(pod, pod.Spec.NodeName) + err = clusterSnapshot.ForceAddPod(pod, pod.Spec.NodeName) if err != nil { assert.NoError(b, err) } @@ -182,24 +165,20 @@ func BenchmarkForkAddRevert(b *testing.B) { pods := createTestPods(ntc * ptc) assignPodsToNodes(pods, nodes) clusterSnapshot := snapshotFactory() - err := clusterSnapshot.AddNodes(nodes) + err := clusterSnapshot.SetClusterState(nodes, pods) assert.NoError(b, err) - for _, pod := range pods { - err = clusterSnapshot.AddPod(pod, pod.Spec.NodeName) - assert.NoError(b, err) - } tmpNode1 := BuildTestNode("tmp-1", 2000, 2000000) tmpNode2 := BuildTestNode("tmp-2", 2000, 2000000) b.ResetTimer() b.Run(fmt.Sprintf("%s: ForkAddRevert (%d nodes, %d pods)", snapshotName, ntc, ptc), func(b *testing.B) { for i := 0; i < b.N; i++ { 
clusterSnapshot.Fork() - err = clusterSnapshot.AddNode(tmpNode1) + err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(tmpNode1)) if err != nil { assert.NoError(b, err) } clusterSnapshot.Fork() - err = clusterSnapshot.AddNode(tmpNode2) + err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(tmpNode2)) if err != nil { assert.NoError(b, err) } @@ -234,12 +213,14 @@ func BenchmarkBuildNodeInfoList(b *testing.B) { b.Run(fmt.Sprintf("fork add 1000 to %d", tc.nodeCount), func(b *testing.B) { nodes := createTestNodes(tc.nodeCount + 1000) snapshot := NewDeltaClusterSnapshot() - if err := snapshot.AddNodes(nodes[:tc.nodeCount]); err != nil { + if err := snapshot.SetClusterState(nodes[:tc.nodeCount], nil); err != nil { assert.NoError(b, err) } snapshot.Fork() - if err := snapshot.AddNodes(nodes[tc.nodeCount:]); err != nil { - assert.NoError(b, err) + for _, node := range nodes[tc.nodeCount:] { + if err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)); err != nil { + assert.NoError(b, err) + } } b.ResetTimer() for i := 0; i < b.N; i++ { @@ -254,7 +235,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) { b.Run(fmt.Sprintf("base %d", tc.nodeCount), func(b *testing.B) { nodes := createTestNodes(tc.nodeCount) snapshot := NewDeltaClusterSnapshot() - if err := snapshot.AddNodes(nodes); err != nil { + if err := snapshot.SetClusterState(nodes, nil); err != nil { assert.NoError(b, err) } b.ResetTimer() diff --git a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_test.go b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_test.go index f9ce65162580..4eeb67253558 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_test.go @@ -75,12 +75,8 @@ func getSnapshotState(t *testing.T, snapshot ClusterSnapshot) snapshotState { func startSnapshot(t *testing.T, snapshotFactory func() ClusterSnapshot, state snapshotState) ClusterSnapshot { snapshot := snapshotFactory() - err := snapshot.AddNodes(state.nodes) + err := snapshot.SetClusterState(state.nodes, state.pods) assert.NoError(t, err) - for _, pod := range state.pods { - err := snapshot.AddPod(pod, pod.Spec.NodeName) - assert.NoError(t, err) - } return snapshot } @@ -98,9 +94,9 @@ func validTestCases(t *testing.T) []modificationTestCase { testCases := []modificationTestCase{ { - name: "add node", + name: "add empty nodeInfo", op: func(snapshot ClusterSnapshot) { - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) }, modifiedState: snapshotState{ @@ -108,9 +104,9 @@ func validTestCases(t *testing.T) []modificationTestCase { }, }, { - name: "add node with pods", + name: "add nodeInfo", op: func(snapshot ClusterSnapshot) { - err := snapshot.AddNodeWithPods(node, []*apiv1.Pod{pod}) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pod)) assert.NoError(t, err) }, modifiedState: snapshotState{ @@ -119,25 +115,25 @@ func validTestCases(t *testing.T) []modificationTestCase { }, }, { - name: "remove node", + name: "remove nodeInfo", state: snapshotState{ nodes: []*apiv1.Node{node}, }, op: func(snapshot ClusterSnapshot) { - err := snapshot.RemoveNode(node.Name) + err := snapshot.RemoveNodeInfo(node.Name) assert.NoError(t, err) }, }, { - name: "remove node, then add it back", + name: "remove nodeInfo, then add it back", state: snapshotState{ nodes: []*apiv1.Node{node}, }, op: func(snapshot ClusterSnapshot) { - err := snapshot.RemoveNode(node.Name) + err := 
snapshot.RemoveNodeInfo(node.Name) assert.NoError(t, err) - err = snapshot.AddNode(node) + err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) }, modifiedState: snapshotState{ @@ -145,14 +141,14 @@ func validTestCases(t *testing.T) []modificationTestCase { }, }, { - name: "add pod, then remove node", + name: "add pod, then remove nodeInfo", state: snapshotState{ nodes: []*apiv1.Node{node}, }, op: func(snapshot ClusterSnapshot) { - err := snapshot.AddPod(pod, node.Name) + err := snapshot.ForceAddPod(pod, node.Name) assert.NoError(t, err) - err = snapshot.RemoveNode(node.Name) + err = snapshot.RemoveNodeInfo(node.Name) assert.NoError(t, err) }, }, @@ -203,7 +199,7 @@ func TestForking(t *testing.T) { tc.op(snapshot) snapshot.Fork() - snapshot.AddNode(node) + snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) snapshot.Revert() snapshot.Revert() @@ -247,7 +243,7 @@ func TestForking(t *testing.T) { snapshot.Fork() tc.op(snapshot) snapshot.Fork() - snapshot.AddNode(node) + snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) snapshot.Revert() err := snapshot.Commit() assert.NoError(t, err) @@ -279,7 +275,7 @@ func TestForking(t *testing.T) { } } -func TestClear(t *testing.T) { +func TestSetClusterState(t *testing.T) { // Run with -count=1 to avoid caching. localRand := rand.New(rand.NewSource(time.Now().Unix())) @@ -313,10 +309,21 @@ func TestClear(t *testing.T) { snapshot := startSnapshot(t, snapshotFactory, state) compareStates(t, state, getSnapshotState(t, snapshot)) - snapshot.Clear() + assert.NoError(t, snapshot.SetClusterState(nil, nil)) compareStates(t, snapshotState{}, getSnapshotState(t, snapshot)) }) + t.Run(fmt.Sprintf("%s: clear base %d nodes %d pods and set a new state", name, nodeCount, podCount), + func(t *testing.T) { + snapshot := startSnapshot(t, snapshotFactory, state) + compareStates(t, state, getSnapshotState(t, snapshot)) + + newNodes, newPods := createTestNodes(13), createTestPods(37) + assignPodsToNodes(newPods, newNodes) + assert.NoError(t, snapshot.SetClusterState(newNodes, newPods)) + + compareStates(t, snapshotState{nodes: newNodes, pods: newPods}, getSnapshotState(t, snapshot)) + }) t.Run(fmt.Sprintf("%s: clear fork %d nodes %d pods %d extra nodes %d extra pods", name, nodeCount, podCount, extraNodeCount, extraPodCount), func(t *testing.T) { snapshot := startSnapshot(t, snapshotFactory, state) @@ -324,23 +331,24 @@ func TestClear(t *testing.T) { snapshot.Fork() - err := snapshot.AddNodes(extraNodes) - assert.NoError(t, err) + for _, node := range extraNodes { + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) + assert.NoError(t, err) + } for _, pod := range extraPods { - err := snapshot.AddPod(pod, pod.Spec.NodeName) + err := snapshot.ForceAddPod(pod, pod.Spec.NodeName) assert.NoError(t, err) } compareStates(t, snapshotState{allNodes, allPods}, getSnapshotState(t, snapshot)) - snapshot.Clear() + assert.NoError(t, snapshot.SetClusterState(nil, nil)) compareStates(t, snapshotState{}, getSnapshotState(t, snapshot)) - // Clear() should break out of forked state. + // SetClusterState() should break out of forked state. 
snapshot.Fork() - assert.NoError(t, err) }) } } @@ -352,17 +360,17 @@ func TestNode404(t *testing.T) { op func(ClusterSnapshot) error }{ {"add pod", func(snapshot ClusterSnapshot) error { - return snapshot.AddPod(BuildTestPod("p1", 0, 0), "node") + return snapshot.ForceAddPod(BuildTestPod("p1", 0, 0), "node") }}, {"remove pod", func(snapshot ClusterSnapshot) error { - return snapshot.RemovePod("default", "p1", "node") + return snapshot.ForceRemovePod("default", "p1", "node") }}, {"get node", func(snapshot ClusterSnapshot) error { _, err := snapshot.NodeInfos().Get("node") return err }}, - {"remove node", func(snapshot ClusterSnapshot) error { - return snapshot.RemoveNode("node") + {"remove nodeInfo", func(snapshot ClusterSnapshot) error { + return snapshot.RemoveNodeInfo("node") }}, } @@ -382,13 +390,13 @@ func TestNode404(t *testing.T) { snapshot := snapshotFactory() node := BuildTestNode("node", 10, 100) - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) snapshot.Fork() assert.NoError(t, err) - err = snapshot.RemoveNode("node") + err = snapshot.RemoveNodeInfo("node") assert.NoError(t, err) // Node deleted after fork - shouldn't be able to operate on it. @@ -408,10 +416,10 @@ func TestNode404(t *testing.T) { snapshot := snapshotFactory() node := BuildTestNode("node", 10, 100) - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) - err = snapshot.RemoveNode("node") + err = snapshot.RemoveNodeInfo("node") assert.NoError(t, err) // Node deleted from base - shouldn't be able to operate on it. @@ -431,11 +439,8 @@ func TestNodeAlreadyExists(t *testing.T) { name string op func(ClusterSnapshot) error }{ - {"add node", func(snapshot ClusterSnapshot) error { - return snapshot.AddNode(node) - }}, - {"add node with pod", func(snapshot ClusterSnapshot) error { - return snapshot.AddNodeWithPods(node, []*apiv1.Pod{pod}) + {"add nodeInfo", func(snapshot ClusterSnapshot) error { + return snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pod)) }}, } @@ -445,7 +450,7 @@ func TestNodeAlreadyExists(t *testing.T) { func(t *testing.T) { snapshot := snapshotFactory() - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) // Node already in base. @@ -457,7 +462,7 @@ func TestNodeAlreadyExists(t *testing.T) { func(t *testing.T) { snapshot := snapshotFactory() - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) snapshot.Fork() @@ -474,7 +479,7 @@ func TestNodeAlreadyExists(t *testing.T) { snapshot.Fork() - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) // Node already in fork. 
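For reference, the duplicate-node semantics the TestNodeAlreadyExists cases above and below pin down, condensed into a standalone sketch (the test name is hypothetical; BuildTestNode and NewTestNodeInfo are the helpers these files already import):

func TestAddNodeInfoTwice(t *testing.T) {
	snapshot := clustersnapshot.NewBasicClusterSnapshot()
	node := BuildTestNode("node", 10, 100)
	// The first AddNodeInfo for a given node succeeds...
	assert.NoError(t, snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)))
	// ...and a second AddNodeInfo for the same node is expected to fail,
	// whether the duplicate lands in the base snapshot or in a fork.
	assert.Error(t, snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)))
}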
@@ -487,7 +492,7 @@ func TestNodeAlreadyExists(t *testing.T) { snapshot.Fork() - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) err = snapshot.Commit() @@ -624,17 +629,17 @@ func TestPVCUsedByPods(t *testing.T) { for _, tc := range testcase { t.Run(fmt.Sprintf("%s with snapshot (%s)", tc.desc, snapshotName), func(t *testing.T) { snapshot := snapshotFactory() - err := snapshot.AddNodeWithPods(tc.node, tc.pods) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(tc.node, tc.pods...)) assert.NoError(t, err) - volumeExists := snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", tc.claimName)) + volumeExists := snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", tc.claimName)) assert.Equal(t, tc.exists, volumeExists) if tc.removePod != "" { - err = snapshot.RemovePod("default", tc.removePod, "node") + err = snapshot.ForceRemovePod("default", tc.removePod, "node") assert.NoError(t, err) - volumeExists = snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", tc.claimName)) + volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", tc.claimName)) assert.Equal(t, tc.existsAfterRemove, volumeExists) } }) @@ -694,38 +699,38 @@ func TestPVCClearAndFork(t *testing.T) { for snapshotName, snapshotFactory := range snapshots { t.Run(fmt.Sprintf("fork and revert snapshot with pvc pods with snapshot: %s", snapshotName), func(t *testing.T) { snapshot := snapshotFactory() - err := snapshot.AddNodeWithPods(node, []*apiv1.Pod{pod1}) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pod1)) assert.NoError(t, err) - volumeExists := snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim1")) + volumeExists := snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim1")) assert.Equal(t, true, volumeExists) snapshot.Fork() assert.NoError(t, err) - volumeExists = snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim1")) + volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim1")) assert.Equal(t, true, volumeExists) - err = snapshot.AddPod(pod2, "node") + err = snapshot.ForceAddPod(pod2, "node") assert.NoError(t, err) - volumeExists = snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim2")) + volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim2")) assert.Equal(t, true, volumeExists) snapshot.Revert() - volumeExists = snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim2")) + volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim2")) assert.Equal(t, false, volumeExists) }) t.Run(fmt.Sprintf("clear snapshot with pvc pods with snapshot: %s", snapshotName), func(t *testing.T) { snapshot := snapshotFactory() - err := snapshot.AddNodeWithPods(node, []*apiv1.Pod{pod1}) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pod1)) assert.NoError(t, err) - volumeExists := snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim1")) + volumeExists := snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim1")) assert.Equal(t, true, volumeExists) - snapshot.Clear() - volumeExists = snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", 
"claim1")) + assert.NoError(t, snapshot.SetClusterState(nil, nil)) + volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim1")) assert.Equal(t, false, volumeExists) }) diff --git a/cluster-autoscaler/simulator/clustersnapshot/delta.go b/cluster-autoscaler/simulator/clustersnapshot/delta.go index 9559b43dbb78..869e494e0226 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/delta.go +++ b/cluster-autoscaler/simulator/clustersnapshot/delta.go @@ -136,16 +136,6 @@ func (data *internalDeltaSnapshotData) buildNodeInfoList() []*schedulerframework return nodeInfoList } -// Convenience method to avoid writing loop for adding nodes. -func (data *internalDeltaSnapshotData) addNodes(nodes []*apiv1.Node) error { - for _, node := range nodes { - if err := data.addNode(node); err != nil { - return err - } - } - return nil -} - func (data *internalDeltaSnapshotData) addNode(node *apiv1.Node) error { nodeInfo := schedulerframework.NewNodeInfo() nodeInfo.SetNode(node) @@ -187,7 +177,7 @@ func (data *internalDeltaSnapshotData) clearPodCaches() { data.pvcNamespaceMap = nil } -func (data *internalDeltaSnapshotData) removeNode(nodeName string) error { +func (data *internalDeltaSnapshotData) removeNodeInfo(nodeName string) error { _, foundInDelta := data.addedNodeInfoMap[nodeName] if foundInDelta { // If node was added within this delta, delete this change. @@ -306,12 +296,12 @@ func (data *internalDeltaSnapshotData) commit() (*internalDeltaSnapshotData, err return data, nil } for node := range data.deletedNodeInfos { - if err := data.baseData.removeNode(node); err != nil { + if err := data.baseData.removeNodeInfo(node); err != nil { return nil, err } } for _, node := range data.modifiedNodeInfoMap { - if err := data.baseData.removeNode(node.Node().Name); err != nil { + if err := data.baseData.removeNodeInfo(node.Node().Name); err != nil { return nil, err } if err := data.baseData.addNodeInfo(node); err != nil { @@ -399,7 +389,7 @@ func (snapshot *DeltaClusterSnapshot) StorageInfos() schedulerframework.StorageI // NewDeltaClusterSnapshot creates instances of DeltaClusterSnapshot. func NewDeltaClusterSnapshot() *DeltaClusterSnapshot { snapshot := &DeltaClusterSnapshot{} - snapshot.Clear() + snapshot.clear() return snapshot } @@ -431,41 +421,39 @@ func (snapshot *DeltaClusterSnapshot) AddNodeInfo(nodeInfo *framework.NodeInfo) return nil } -// AddNode adds node to the snapshot. -func (snapshot *DeltaClusterSnapshot) AddNode(node *apiv1.Node) error { - return snapshot.data.addNode(node) -} - -// AddNodes adds nodes in batch to the snapshot. -func (snapshot *DeltaClusterSnapshot) AddNodes(nodes []*apiv1.Node) error { - return snapshot.data.addNodes(nodes) -} +// SetClusterState sets the cluster state. +func (snapshot *DeltaClusterSnapshot) SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error { + snapshot.clear() -// AddNodeWithPods adds a node and set of pods to be scheduled to this node to the snapshot. 
-func (snapshot *DeltaClusterSnapshot) AddNodeWithPods(node *apiv1.Node, pods []*apiv1.Pod) error { - if err := snapshot.AddNode(node); err != nil { - return err - } - for _, pod := range pods { - if err := snapshot.AddPod(pod, node.Name); err != nil { + knownNodes := make(map[string]bool) + for _, node := range nodes { + if err := snapshot.data.addNode(node); err != nil { return err } + knownNodes[node.Name] = true + } + for _, pod := range scheduledPods { + if knownNodes[pod.Spec.NodeName] { + if err := snapshot.data.addPod(pod, pod.Spec.NodeName); err != nil { + return err + } + } } return nil } -// RemoveNode removes nodes (and pods scheduled to it) from the snapshot. -func (snapshot *DeltaClusterSnapshot) RemoveNode(nodeName string) error { - return snapshot.data.removeNode(nodeName) +// RemoveNodeInfo removes the given node (and any pods scheduled to it) from the snapshot. +func (snapshot *DeltaClusterSnapshot) RemoveNodeInfo(nodeName string) error { + return snapshot.data.removeNodeInfo(nodeName) } -// AddPod adds pod to the snapshot and schedules it to given node. -func (snapshot *DeltaClusterSnapshot) AddPod(pod *apiv1.Pod, nodeName string) error { +// ForceAddPod adds the given pod to the snapshot and schedules it to the given node. +func (snapshot *DeltaClusterSnapshot) ForceAddPod(pod *apiv1.Pod, nodeName string) error { return snapshot.data.addPod(pod, nodeName) } -// RemovePod removes pod from the snapshot. -func (snapshot *DeltaClusterSnapshot) RemovePod(namespace, podName, nodeName string) error { +// ForceRemovePod removes the given pod from the snapshot. +func (snapshot *DeltaClusterSnapshot) ForceRemovePod(namespace, podName, nodeName string) error { return snapshot.data.removePod(namespace, podName, nodeName) } @@ -501,6 +489,6 @@ func (snapshot *DeltaClusterSnapshot) Commit() error { -// Clear reset cluster snapshot to empty, unforked state +// clear resets the cluster snapshot to an empty, unforked state // Time: O(1) -func (snapshot *DeltaClusterSnapshot) Clear() { +func (snapshot *DeltaClusterSnapshot) clear() { snapshot.data = newInternalDeltaSnapshotData() } diff --git a/cluster-autoscaler/simulator/clustersnapshot/test_utils.go b/cluster-autoscaler/simulator/clustersnapshot/test_utils.go index 501756fe2438..f0cd8c67546e 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/test_utils.go +++ b/cluster-autoscaler/simulator/clustersnapshot/test_utils.go @@ -20,7 +20,9 @@ import ( "testing" "github.com/stretchr/testify/assert" + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" ) // InitializeClusterSnapshotOrDie clears cluster snapshot and then initializes it with given set of nodes and pods.
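Both snapshot implementations keep the Fork/Revert/Commit contract that the delta.go hunks above preserve; roughly, as a sketch (simulateOnTemporaryNode is a hypothetical caller, the snapshot methods are the ones from this patch):

func simulateOnTemporaryNode(snapshot clustersnapshot.ClusterSnapshot, tmpNode *framework.NodeInfo) error {
	snapshot.Fork()
	// Revert drops everything added after Fork; Commit would fold it into the base instead.
	defer snapshot.Revert()
	if err := snapshot.AddNodeInfo(tmpNode); err != nil {
		return err
	}
	// ... run scheduling simulations against the forked state ...
	return nil
}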
@@ -32,19 +34,19 @@ func InitializeClusterSnapshotOrDie( pods []*apiv1.Pod) { var err error - snapshot.Clear() + assert.NoError(t, snapshot.SetClusterState(nil, nil)) for _, node := range nodes { - err = snapshot.AddNode(node) + err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err, "error while adding node %s", node.Name) } for _, pod := range pods { if pod.Spec.NodeName != "" { - err = snapshot.AddPod(pod, pod.Spec.NodeName) + err = snapshot.ForceAddPod(pod, pod.Spec.NodeName) assert.NoError(t, err, "error while adding pod %s/%s to node %s", pod.Namespace, pod.Name, pod.Spec.NodeName) } else if pod.Status.NominatedNodeName != "" { - err = snapshot.AddPod(pod, pod.Status.NominatedNodeName) + err = snapshot.ForceAddPod(pod, pod.Status.NominatedNodeName) assert.NoError(t, err, "error while adding pod %s/%s to nominated node %s", pod.Namespace, pod.Name, pod.Status.NominatedNodeName) } else { assert.Fail(t, "pod %s/%s does not have Spec.NodeName nor Status.NominatedNodeName set", pod.Namespace, pod.Name) diff --git a/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go b/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go index 44b7ebf60d00..d5423777f711 100644 --- a/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go +++ b/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go @@ -24,6 +24,7 @@ import ( testconfig "k8s.io/autoscaler/cluster-autoscaler/config/test" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" scheduler "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" schedulermetrics "k8s.io/kubernetes/pkg/scheduler/metrics" @@ -147,7 +148,7 @@ func TestCheckPredicate(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var err error clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() - err = clusterSnapshot.AddNodeWithPods(tt.node, tt.scheduledPods) + err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(tt.node, tt.scheduledPods...)) assert.NoError(t, err) predicateError := tt.predicateChecker.CheckPredicates(clusterSnapshot, tt.testPod, tt.node.Name) @@ -247,9 +248,9 @@ func TestFitsAnyNode(t *testing.T) { } clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() - err = clusterSnapshot.AddNode(n1000) + err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(n1000)) assert.NoError(t, err) - err = clusterSnapshot.AddNode(n2000) + err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(n2000)) assert.NoError(t, err) for _, tc := range testCases { @@ -285,7 +286,7 @@ func TestDebugInfo(t *testing.T) { clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() - err := clusterSnapshot.AddNode(node1) + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1)) assert.NoError(t, err) // with default predicate checker diff --git a/cluster-autoscaler/simulator/scheduling/hinting_simulator.go b/cluster-autoscaler/simulator/scheduling/hinting_simulator.go index 2287d28810e4..2f24bb8bf4ba 100644 --- a/cluster-autoscaler/simulator/scheduling/hinting_simulator.go +++ b/cluster-autoscaler/simulator/scheduling/hinting_simulator.go @@ -73,7 +73,7 @@ func (s *HintingSimulator) TrySchedulePods(clusterSnapshot clustersnapshot.Clust if nodeName != "" { klogx.V(4).UpTo(loggingQuota).Infof("Pod %s/%s can be moved to %s", pod.Namespace, pod.Name, nodeName) - if err := clusterSnapshot.AddPod(pod, nodeName); err != nil { + if err := 
clusterSnapshot.ForceAddPod(pod, nodeName); err != nil { return nil, 0, fmt.Errorf("simulating scheduling of %s/%s to %s return error; %v", pod.Namespace, pod.Name, nodeName, err) } statuses = append(statuses, Status{Pod: pod, NodeName: nodeName})
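The hinting simulator above places a pod with ForceAddPod once a candidate node is found, and RemovalSimulator.findPlaceFor uses ForceRemovePod symmetrically. A compact sketch of the combined move pattern (movePod is a hypothetical helper; the two mutators are the renamed methods from this patch):

func movePod(snapshot clustersnapshot.ClusterSnapshot, pod *apiv1.Pod, fromNode, toNode string) error {
	// The Force* mutators operate on a single pod and trust the caller about
	// placement; no scheduling predicates are evaluated by the snapshot itself.
	if err := snapshot.ForceRemovePod(pod.Namespace, pod.Name, fromNode); err != nil {
		return err
	}
	return snapshot.ForceAddPod(pod, toNode)
}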