diff --git a/hack/kwok/requirements.sh b/hack/kwok/requirements.sh index 0dccc4ec35..10fb877fa3 100755 --- a/hack/kwok/requirements.sh +++ b/hack/kwok/requirements.sh @@ -15,3 +15,7 @@ for Version in $(seq 0 1); do yqVersion="$Version" yq eval '.spec.versions[env(yqVersion)].schema.openAPIV3Schema.properties.spec.properties.template.properties.spec.properties.requirements.items.properties.key.x-kubernetes-validations += [ {"message": "label domain \"karpenter.kwok.sh\" is restricted", "rule": "self in [\"karpenter.kwok.sh/instance-cpu\", \"karpenter.kwok.sh/instance-memory\", \"karpenter.kwok.sh/instance-family\", \"karpenter.kwok.sh/instance-size\"] || !self.find(\"^([^/]+)\").endsWith(\"karpenter.kwok.sh\")"}]' -i kwok/charts/crds/karpenter.sh_nodepools.yaml done + + +# Add ExampleReason to the Kwok CloudProvider NodePool CRD +yq eval '.spec.versions[0].schema.openAPIV3Schema.properties.spec.properties.disruption.properties.budgets.items.properties.reasons.items.enum += [ "ExampleReason" ]' -i kwok/charts/crds/karpenter.sh_nodepools.yaml \ No newline at end of file diff --git a/kwok/charts/crds/karpenter.sh_nodepools.yaml b/kwok/charts/crds/karpenter.sh_nodepools.yaml index cbfcf0ab42..e589b9c672 100644 --- a/kwok/charts/crds/karpenter.sh_nodepools.yaml +++ b/kwok/charts/crds/karpenter.sh_nodepools.yaml @@ -112,13 +112,16 @@ spec: description: |- Reasons is a list of disruption methods that this budget applies to. If Reasons is not set, this budget applies to all methods. Otherwise, this will apply to each reason defined. - allowed reasons are Underutilized, Empty, and Drifted. + allowed reasons are Underutilized, Empty, Drifted, and additional CloudProvider-specific reasons. items: - description: DisruptionReason defines valid reasons for disruption budgets. + description: |- + DisruptionReason defines valid reasons for disruption budgets. + CloudProviders will need to append to this enum when implementing cloud provider-specific disruption reasons. enum: - Underutilized - Empty - Drifted + - ExampleReason type: string type: array schedule: diff --git a/pkg/apis/crds/karpenter.sh_nodepools.yaml b/pkg/apis/crds/karpenter.sh_nodepools.yaml index cd2b99ed11..ec337f277a 100644 --- a/pkg/apis/crds/karpenter.sh_nodepools.yaml +++ b/pkg/apis/crds/karpenter.sh_nodepools.yaml @@ -112,9 +112,11 @@ spec: description: |- Reasons is a list of disruption methods that this budget applies to. If Reasons is not set, this budget applies to all methods. Otherwise, this will apply to each reason defined. - allowed reasons are Underutilized, Empty, and Drifted. + allowed reasons are Underutilized, Empty, Drifted, and additional CloudProvider-specific reasons. items: - description: DisruptionReason defines valid reasons for disruption budgets. + description: |- + DisruptionReason defines valid reasons for disruption budgets. + CloudProviders will need to append to this enum when implementing cloud provider-specific disruption reasons. enum: - Underutilized - Empty - Drifted diff --git a/pkg/apis/v1/nodepool.go b/pkg/apis/v1/nodepool.go index 2e2de7a38a..9558d590e4 100644 --- a/pkg/apis/v1/nodepool.go +++ b/pkg/apis/v1/nodepool.go @@ -89,7 +89,7 @@ type Disruption struct { type Budget struct { // Reasons is a list of disruption methods that this budget applies to. If Reasons is not set, this budget applies to all methods. // Otherwise, this will apply to each reason defined. - // allowed reasons are Underutilized, Empty, and Drifted. 
+ // allowed reasons are Underutilized, Empty, Drifted, and additional CloudProvider-specific reasons. // +optional Reasons []DisruptionReason `json:"reasons,omitempty"` // Nodes dictates the maximum number of NodeClaims owned by this NodePool @@ -129,6 +129,7 @@ const ( ) // DisruptionReason defines valid reasons for disruption budgets. +// CloudProviders will need to append to this enum when implementing cloud provider-specific disruption reasons. // +kubebuilder:validation:Enum={Underutilized,Empty,Drifted} type DisruptionReason string diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go index bf5e80fcf4..b37802d646 100644 --- a/pkg/cloudprovider/fake/cloudprovider.go +++ b/pkg/cloudprovider/fake/cloudprovider.go @@ -61,6 +61,8 @@ type CloudProvider struct { CreatedNodeClaims map[string]*v1.NodeClaim Drifted cloudprovider.DriftReason NodeClassGroupVersionKind []schema.GroupVersionKind + + disruptionReasons []v1.DisruptionReason } func NewCloudProvider() *CloudProvider { @@ -95,6 +97,8 @@ func (c *CloudProvider) Reset() { Kind: "", }, } + + c.disruptionReasons = []v1.DisruptionReason{"ExampleReason"} } func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v1.NodeClaim, error) { @@ -236,7 +240,7 @@ func (c *CloudProvider) GetInstanceTypes(_ context.Context, np *v1.NodePool) ([] } func (c *CloudProvider) DisruptionReasons() []v1.DisruptionReason { - return nil + return c.disruptionReasons } func (c *CloudProvider) Delete(_ context.Context, nc *v1.NodeClaim) error { diff --git a/pkg/controllers/disruption/cloudprovider.go b/pkg/controllers/disruption/cloudprovider.go new file mode 100644 index 0000000000..371fbceeff --- /dev/null +++ b/pkg/controllers/disruption/cloudprovider.go @@ -0,0 +1,128 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package disruption + +import ( + "context" + "errors" + + "sigs.k8s.io/controller-runtime/pkg/client" + + v1 "sigs.k8s.io/karpenter/pkg/apis/v1" + "sigs.k8s.io/karpenter/pkg/cloudprovider" + disruptionevents "sigs.k8s.io/karpenter/pkg/controllers/disruption/events" + "sigs.k8s.io/karpenter/pkg/controllers/provisioning" + "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling" + "sigs.k8s.io/karpenter/pkg/controllers/state" + "sigs.k8s.io/karpenter/pkg/events" +) + +// CloudProvider is a subreconciler that deletes candidates according to cloud provider-specific reasons.
+// These reasons correspond to NodeClaim status conditions that the cloud provider sets to true when a node should be disrupted. +type CloudProvider struct { + cloudprovider.CloudProvider + kubeClient client.Client + recorder events.Recorder + cluster *state.Cluster + provisioner *provisioning.Provisioner +} + +func NewCloudProvider(kubeClient client.Client, cluster *state.Cluster, cloudProvider cloudprovider.CloudProvider, recorder events.Recorder, provisioner *provisioning.Provisioner) *CloudProvider { + return &CloudProvider{ + CloudProvider: cloudProvider, + kubeClient: kubeClient, + recorder: recorder, + cluster: cluster, + provisioner: provisioner, + } +} + +// ShouldDisrupt is a predicate used to filter candidates +func (cp *CloudProvider) ShouldDisrupt(ctx context.Context, c *Candidate) bool { + for _, reason := range cp.DisruptionReasons() { + if c.NodeClaim.StatusConditions().Get(string(reason)).IsTrue() { + return true + } + } + return false +} + +// ComputeCommand generates a disruption command given candidates +func (cp *CloudProvider) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]map[v1.DisruptionReason]int, candidates ...*Candidate) (Command, scheduling.Results, error) { + // Do a quick check through the candidates to see if they're empty. + // For each candidate that is empty with a nodePool allowing its disruption, + // add it to the existing command. + empty := make([]*Candidate, 0, len(candidates)) + for _, candidate := range candidates { + if len(candidate.reschedulablePods) > 0 { + continue + } + // If disruptions are allowed for the candidate's nodepool, + // add it to the list of candidates, and decrement the budget. + if disruptionBudgetMapping[candidate.nodePool.Name][cp.Reason()] > 0 { + empty = append(empty, candidate) + disruptionBudgetMapping[candidate.nodePool.Name][cp.Reason()]-- + } + } + // Disrupt all empty cloud-provider-disrupted candidates, as they require no scheduling simulations. + if len(empty) > 0 { + return Command{ + candidates: empty, + }, scheduling.Results{}, nil + } + + for _, candidate := range candidates { + // If the disruption budget doesn't allow this candidate to be disrupted, + // continue to the next candidate. We don't need to decrement any budget + // counter since CloudProvider commands can only have one candidate. + if disruptionBudgetMapping[candidate.nodePool.Name][cp.Reason()] == 0 { + continue + } + // Check if we need to create any NodeClaims. + results, err := SimulateScheduling(ctx, cp.kubeClient, cp.cluster, cp.provisioner, candidate) + if err != nil { + // if a candidate is now deleting, just retry + if errors.Is(err, errCandidateDeleting) { + continue + } + return Command{}, scheduling.Results{}, err + } + // Emit an event that we couldn't reschedule the pods on the node. + if !results.AllNonPendingPodsScheduled() { + cp.recorder.Publish(disruptionevents.Blocked(candidate.Node, candidate.NodeClaim, results.NonPendingPodSchedulingErrors())...) 
+ continue + } + + return Command{ + candidates: []*Candidate{candidate}, + replacements: results.NewNodeClaims, + }, results, nil + } + return Command{}, scheduling.Results{}, nil +} + +func (cp *CloudProvider) Reason() v1.DisruptionReason { + return "CloudProviderReason" +} + +func (cp *CloudProvider) Class() string { + return EventualDisruptionClass +} + +func (cp *CloudProvider) ConsolidationType() string { + return "" +} diff --git a/pkg/controllers/disruption/cloudprovider_test.go b/pkg/controllers/disruption/cloudprovider_test.go new file mode 100644 index 0000000000..da8e5335f3 --- /dev/null +++ b/pkg/controllers/disruption/cloudprovider_test.go @@ -0,0 +1,962 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package disruption_test + +import ( + "sync" + "time" + + "sigs.k8s.io/karpenter/pkg/metrics" + + "github.com/awslabs/operatorpkg/status" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/samber/lo" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + v1 "sigs.k8s.io/karpenter/pkg/apis/v1" + "sigs.k8s.io/karpenter/pkg/cloudprovider" + "sigs.k8s.io/karpenter/pkg/cloudprovider/fake" + "sigs.k8s.io/karpenter/pkg/controllers/disruption" + "sigs.k8s.io/karpenter/pkg/scheduling" + "sigs.k8s.io/karpenter/pkg/test" + . "sigs.k8s.io/karpenter/pkg/test/expectations" +) + +var _ = FDescribe("CloudProvider", func() { + var nodePool *v1.NodePool + var nodeClaim *v1.NodeClaim + var node *corev1.Node + + BeforeEach(func() { + nodePool = test.NodePool(v1.NodePool{ + Spec: v1.NodePoolSpec{ + Disruption: v1.Disruption{ + ConsolidateAfter: v1.MustParseNillableDuration("Never"), + // Disrupt away! 
+ Budgets: []v1.Budget{{ + Nodes: "100%", + }}, + }, + }, + }) + nodeClaim, node = test.NodeClaimAndNode(v1.NodeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.NodePoolLabelKey: nodePool.Name, + corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name, + v1.CapacityTypeLabelKey: mostExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(), + corev1.LabelTopologyZone: mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(), + }, + }, + Status: v1.NodeClaimStatus{ + ProviderID: test.RandomProviderID(), + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("32"), + corev1.ResourcePods: resource.MustParse("100"), + }, + }, + }) + nodeClaim.StatusConditions().SetTrue(string(cloudProvider.DisruptionReasons()[0])) + }) + Context("Metrics", func() { + var eligibleNodesLabels = map[string]string{ + metrics.ReasonLabel: "CloudProviderReason", + } + FIt("should correctly report eligible nodes", func() { + pod := test.Pod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + v1.DoNotDisruptAnnotationKey: "true", + }, + }, + }) + ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node, pod) + ExpectManualBinding(ctx, env.Client, pod, node) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + fakeClock.Step(10 * time.Minute) + ExpectSingletonReconciled(ctx, disruptionController) + ExpectMetricGaugeValue(disruption.EligibleNodes, 0, eligibleNodesLabels) + + // remove the do-not-disrupt annotation to make the node eligible for cloudprovider disruption and update cluster state + pod.SetAnnotations(map[string]string{}) + ExpectApplied(ctx, env.Client, pod) + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + fakeClock.Step(10 * time.Minute) + ExpectSingletonReconciled(ctx, disruptionController) + ExpectMetricGaugeValue(disruption.EligibleNodes, 1, eligibleNodesLabels) + }) + }) + Context("Budgets", func() { + var numNodes = 10 + var nodeClaims []*v1.NodeClaim + var nodes []*corev1.Node + var rs *appsv1.ReplicaSet + labels := map[string]string{ + "app": "test", + } + BeforeEach(func() { + // create our RS so we can link a pod to it + rs = test.ReplicaSet() + ExpectApplied(ctx, env.Client, rs) + Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed()) + }) + It("should allow all nodes to be disrupted", func() { + nodeClaims, nodes = test.NodeClaimsAndNodes(numNodes, v1.NodeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.NodePoolLabelKey: nodePool.Name, + corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name, + v1.CapacityTypeLabelKey: mostExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(), + corev1.LabelTopologyZone: mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(), + }, + }, + Status: v1.NodeClaimStatus{ + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("32"), + corev1.ResourcePods: resource.MustParse("100"), + }, + }, + }) + + nodePool.Spec.Disruption.Budgets = []v1.Budget{{Nodes: "100%"}} + + ExpectApplied(ctx, env.Client, nodePool) + for i := 0; i < numNodes; i++ { + 
nodeClaims[i].StatusConditions().SetTrue(string(cloudProvider.DisruptionReasons()[0])) + ExpectApplied(ctx, env.Client, nodeClaims[i], nodes[i]) + } + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims) + ExpectSingletonReconciled(ctx, disruptionController) + + metric, found := FindMetricWithLabelValues("karpenter_nodepools_allowed_disruptions", map[string]string{ + "nodepool": nodePool.Name, + }) + Expect(found).To(BeTrue()) + Expect(metric.GetGauge().GetValue()).To(BeNumerically("==", 10)) + + // Execute command, thus deleting all nodes + ExpectSingletonReconciled(ctx, queue) + Expect(len(ExpectNodeClaims(ctx, env.Client))).To(Equal(0)) + }) + It("should allow no nodes to be disrupted", func() { + nodeClaims, nodes = test.NodeClaimsAndNodes(numNodes, v1.NodeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.NodePoolLabelKey: nodePool.Name, + corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name, + v1.CapacityTypeLabelKey: mostExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(), + corev1.LabelTopologyZone: mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(), + }, + }, + Status: v1.NodeClaimStatus{ + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("32"), + corev1.ResourcePods: resource.MustParse("100"), + }, + }, + }) + + nodePool.Spec.Disruption.Budgets = []v1.Budget{{Nodes: "0%"}} + + ExpectApplied(ctx, env.Client, nodePool) + for i := 0; i < numNodes; i++ { + nodeClaims[i].StatusConditions().SetTrue(string(cloudProvider.DisruptionReasons()[0])) + ExpectApplied(ctx, env.Client, nodeClaims[i], nodes[i]) + } + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims) + ExpectSingletonReconciled(ctx, disruptionController) + + metric, found := FindMetricWithLabelValues("karpenter_nodepools_allowed_disruptions", map[string]string{ + "nodepool": nodePool.Name, + }) + Expect(found).To(BeTrue()) + Expect(metric.GetGauge().GetValue()).To(BeNumerically("==", 0)) + + // Execute command, thus deleting no nodes + ExpectSingletonReconciled(ctx, queue) + Expect(len(ExpectNodeClaims(ctx, env.Client))).To(Equal(numNodes)) + }) + It("should only allow 3 empty nodes to be disrupted", func() { + nodeClaims, nodes = test.NodeClaimsAndNodes(numNodes, v1.NodeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.NodePoolLabelKey: nodePool.Name, + corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name, + v1.CapacityTypeLabelKey: mostExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(), + corev1.LabelTopologyZone: mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(), + }, + }, + Status: v1.NodeClaimStatus{ + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("32"), + corev1.ResourcePods: resource.MustParse("100"), + }, + }, + }) + + nodePool.Spec.Disruption.Budgets = []v1.Budget{{Nodes: "30%"}} + + ExpectApplied(ctx, env.Client, nodePool) + for i := 0; i < numNodes; i++ { + nodeClaims[i].StatusConditions().SetTrue(string(cloudProvider.DisruptionReasons()[0])) + ExpectApplied(ctx, env.Client, nodeClaims[i], nodes[i]) + } + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, 
env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims) + ExpectSingletonReconciled(ctx, disruptionController) + metric, found := FindMetricWithLabelValues("karpenter_nodepools_allowed_disruptions", map[string]string{ + "nodepool": nodePool.Name, + }) + Expect(found).To(BeTrue()) + Expect(metric.GetGauge().GetValue()).To(BeNumerically("==", 3)) + + // Execute command, thus deleting 3 nodes + ExpectSingletonReconciled(ctx, queue) + Expect(len(ExpectNodeClaims(ctx, env.Client))).To(Equal(7)) + }) + It("should disrupt 3 nodes, taking into account commands in progress", func() { + nodeClaims, nodes = test.NodeClaimsAndNodes(numNodes, v1.NodeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.NodePoolLabelKey: nodePool.Name, + corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name, + v1.CapacityTypeLabelKey: mostExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(), + corev1.LabelTopologyZone: mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(), + }, + }, + Status: v1.NodeClaimStatus{ + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("32"), + corev1.ResourcePods: resource.MustParse("100"), + }, + }, + }) + nodePool.Spec.Disruption.Budgets = []v1.Budget{{Nodes: "30%"}} + + ExpectApplied(ctx, env.Client, nodePool) + + // Mark the first five as disrupted + for i := range lo.Range(5) { + nodeClaims[i].StatusConditions().SetTrue(string(cloudProvider.DisruptionReasons()[0])) + } + + for i := 0; i < numNodes; i++ { + ExpectApplied(ctx, env.Client, nodeClaims[i], nodes[i]) + } + // 3 pods to fit on 3 nodes that will be disrupted so that they're not empty + // and have to be in 3 different commands + pods := test.Pods(5, test.PodOptions{ + ResourceRequirements: corev1.ResourceRequirements{ + Requests: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("1"), + }, + }, + ObjectMeta: metav1.ObjectMeta{Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: rs.Name, + UID: rs.UID, + Controller: lo.ToPtr(true), + BlockOwnerDeletion: lo.ToPtr(true), + }, + }}}) + // Bind the pods to the first n nodes. + for i := 0; i < len(pods); i++ { + ExpectApplied(ctx, env.Client, pods[i]) + ExpectManualBinding(ctx, env.Client, pods[i], nodes[i]) + } + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims) + + // Reconcile 5 times, enqueuing 3 commands total. + for i := 0; i < 5; i++ { + ExpectSingletonReconciled(ctx, disruptionController) + } + + nodes = ExpectNodes(ctx, env.Client) + Expect(len(lo.Filter(nodes, func(nc *corev1.Node, _ int) bool { + return lo.Contains(nc.Spec.Taints, v1.DisruptedNoScheduleTaint) + }))).To(Equal(3)) + // Execute all commands in the queue, only deleting 3 nodes + for i := 0; i < 5; i++ { + ExpectSingletonReconciled(ctx, queue) + } + Expect(len(ExpectNodeClaims(ctx, env.Client))).To(Equal(7)) + }) + It("should allow 2 nodes from each nodePool to be deleted", func() { + // Create 10 nodepools + nps := test.NodePools(10, v1.NodePool{ + Spec: v1.NodePoolSpec{ + Disruption: v1.Disruption{ + ConsolidateAfter: v1.MustParseNillableDuration("Never"), + Budgets: []v1.Budget{{ + // 1/2 of 3 nodes == 1.5 nodes. This should round up to 2. 
+ Nodes: "50%", + }}, + }, + Template: v1.NodeClaimTemplate{ + Spec: v1.NodeClaimTemplateSpec{ + ExpireAfter: v1.MustParseNillableDuration("Never"), + }, + }, + }, + }) + ExpectApplied(ctx, env.Client, nodePool) + for i := 0; i < len(nps); i++ { + ExpectApplied(ctx, env.Client, nps[i]) + } + nodeClaims = make([]*v1.NodeClaim, 0, 30) + nodes = make([]*corev1.Node, 0, 30) + // Create 3 nodes for each nodePool + for _, np := range nps { + ncs, ns := test.NodeClaimsAndNodes(3, v1.NodeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.NodePoolLabelKey: np.Name, + corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name, + v1.CapacityTypeLabelKey: mostExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(), + corev1.LabelTopologyZone: mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(), + }, + }, + Status: v1.NodeClaimStatus{ + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("32"), + corev1.ResourcePods: resource.MustParse("100"), + }, + }, + }) + nodeClaims = append(nodeClaims, ncs...) + nodes = append(nodes, ns...) + } + ExpectApplied(ctx, env.Client, nodePool) + for i := 0; i < len(nodeClaims); i++ { + nodeClaims[i].StatusConditions().SetTrue(string(cloudProvider.DisruptionReasons()[0])) + ExpectApplied(ctx, env.Client, nodeClaims[i], nodes[i]) + } + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims) + ExpectSingletonReconciled(ctx, disruptionController) + + for _, np := range nps { + metric, found := FindMetricWithLabelValues("karpenter_nodepools_allowed_disruptions", map[string]string{ + "nodepool": np.Name, + }) + Expect(found).To(BeTrue()) + Expect(metric.GetGauge().GetValue()).To(BeNumerically("==", 2)) + } + + // Execute the command in the queue, only deleting 20 nodes + ExpectSingletonReconciled(ctx, queue) + Expect(len(ExpectNodeClaims(ctx, env.Client))).To(Equal(10)) + }) + FIt("should allow all nodes from each nodePool to be deleted", func() { + // Create 10 nodepools + nps := test.NodePools(10, v1.NodePool{ + Spec: v1.NodePoolSpec{ + Disruption: v1.Disruption{ + ConsolidateAfter: v1.MustParseNillableDuration("Never"), + Budgets: []v1.Budget{{ + Nodes: "100%", + }}, + }, + }, + }) + ExpectApplied(ctx, env.Client, nodePool) + for i := 0; i < len(nps); i++ { + ExpectApplied(ctx, env.Client, nps[i]) + } + nodeClaims = make([]*v1.NodeClaim, 0, 30) + nodes = make([]*corev1.Node, 0, 30) + // Create 3 nodes for each nodePool + for _, np := range nps { + ncs, ns := test.NodeClaimsAndNodes(3, v1.NodeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.NodePoolLabelKey: np.Name, + corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name, + v1.CapacityTypeLabelKey: mostExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(), + corev1.LabelTopologyZone: mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(), + }, + }, + Status: v1.NodeClaimStatus{ + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("32"), + corev1.ResourcePods: resource.MustParse("100"), + }, + }, + }) + nodeClaims = append(nodeClaims, ncs...) + nodes = append(nodes, ns...) 
+ } + ExpectApplied(ctx, env.Client, nodePool) + for i := 0; i < len(nodeClaims); i++ { + nodeClaims[i].StatusConditions().SetTrue(string(cloudProvider.DisruptionReasons()[0])) + ExpectApplied(ctx, env.Client, nodeClaims[i], nodes[i]) + } + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims) + ExpectSingletonReconciled(ctx, disruptionController) + + for _, np := range nps { + metric, found := FindMetricWithLabelValues("karpenter_nodepools_allowed_disruptions", map[string]string{ + "nodepool": np.Name, + }) + Expect(found).To(BeTrue()) + Expect(metric.GetGauge().GetValue()).To(BeNumerically("==", 3)) + } + + // Execute the command in the queue, deleting all nodes + ExpectSingletonReconciled(ctx, queue) + Expect(len(ExpectNodeClaims(ctx, env.Client))).To(Equal(0)) + }) + }) + Context("CP Disruption Reasons", func() { + It("should continue to the next disrupted node if the first cannot reschedule all pods", func() { + pod := test.Pod(test.PodOptions{ + ResourceRequirements: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("150"), + }, + }, + }) + podToExpire := test.Pod(test.PodOptions{ + ResourceRequirements: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + }, + }, + }) + ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool, pod) + ExpectManualBinding(ctx, env.Client, pod, node) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + nodeClaim2, node2 := test.NodeClaimAndNode(v1.NodeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.NodePoolLabelKey: nodePool.Name, + corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name, + v1.CapacityTypeLabelKey: mostExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(), + corev1.LabelTopologyZone: mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(), + }, + }, + Status: v1.NodeClaimStatus{ + ProviderID: test.RandomProviderID(), + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourcePods: resource.MustParse("100"), + }, + }, + }) + nodeClaim2.StatusConditions().SetTrue(string(cloudProvider.DisruptionReasons()[0])) + ExpectApplied(ctx, env.Client, nodeClaim2, node2, podToExpire) + ExpectManualBinding(ctx, env.Client, podToExpire, node2) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node2}, []*v1.NodeClaim{nodeClaim2}) + + // disruption won't delete the old node until the new node is ready + var wg sync.WaitGroup + ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) + ExpectSingletonReconciled(ctx, disruptionController) + wg.Wait() + + // Process the item so that the nodes can be deleted. 
+ ExpectSingletonReconciled(ctx, queue) + ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim, nodeClaim2) + + Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(2)) + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(2)) + ExpectExists(ctx, env.Client, nodeClaim) + ExpectNotFound(ctx, env.Client, nodeClaim2) + }) + It("should ignore nodes without the cloudprovider disrupted status condition", func() { + _ = nodeClaim.StatusConditions().Clear(string(cloudProvider.DisruptionReasons()[0])) + ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + fakeClock.Step(10 * time.Minute) + + ExpectSingletonReconciled(ctx, disruptionController) + + // Expect to not create or delete more nodeclaims + Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, nodeClaim) + }) + It("should ignore nodes with the karpenter.sh/do-not-disrupt annotation", func() { + node.Annotations = lo.Assign(node.Annotations, map[string]string{v1.DoNotDisruptAnnotationKey: "true"}) + ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + ExpectSingletonReconciled(ctx, disruptionController) + + // Expect to not create or delete more nodeclaims + Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1)) + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, nodeClaim) + }) + It("should ignore nodes that have pods with the karpenter.sh/do-not-disrupt annotation", func() { + pod := test.Pod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + v1.DoNotDisruptAnnotationKey: "true", + }, + }, + }) + ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool, pod) + ExpectManualBinding(ctx, env.Client, pod, node) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + ExpectSingletonReconciled(ctx, disruptionController) + + // Expect to not create or delete more nodeclaims + Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1)) + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, nodeClaim) + }) + It("should disrupt nodes that have pods with the karpenter.sh/do-not-disrupt annotation when the NodePool's TerminationGracePeriod is not nil", func() { + nodeClaim.Spec.TerminationGracePeriod = &metav1.Duration{Duration: time.Second * 300} + pod := test.Pod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + v1.DoNotDisruptAnnotationKey: "true", + }, + }, + }) + ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool, pod) + ExpectManualBinding(ctx, env.Client, pod, node) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + ExpectSingletonReconciled(ctx, disruptionController) + + // Expect to create a replacement but not delete the old nodeclaim + 
Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(2)) // new nodeclaim is created for cloud provider disruption + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, nodeClaim) + }) + It("should disrupt nodes that have pods with the blocking PDBs when the NodePool's TerminationGracePeriod is not nil", func() { + nodeClaim.Spec.TerminationGracePeriod = &metav1.Duration{Duration: time.Second * 300} + podLabels := map[string]string{"test": "value"} + pod := test.Pod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: podLabels, + }, + }) + budget := test.PodDisruptionBudget(test.PDBOptions{ + Labels: podLabels, + MaxUnavailable: fromInt(0), + }) + ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool, pod, budget) + ExpectManualBinding(ctx, env.Client, pod, node) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + ExpectSingletonReconciled(ctx, disruptionController) + + // Expect to create a replacement but not delete the old nodeclaim + Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(2)) // new nodeclaim is created for cloud provider disruption + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, nodeClaim) + }) + It("should ignore nodes with the cloud provider status condition set to false", func() { + nodeClaim.StatusConditions().SetFalse(string(cloudProvider.DisruptionReasons()[0]), "NotDisrupted", "NotDisrupted") + ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + fakeClock.Step(10 * time.Minute) + + ExpectSingletonReconciled(ctx, disruptionController) + + // Expect to not create or delete more nodeclaims + Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, nodeClaim) + }) + It("can delete disrupted nodes", func() { + ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + fakeClock.Step(10 * time.Minute) + ExpectSingletonReconciled(ctx, disruptionController) + // Process the item so that the nodes can be deleted. 
+ ExpectSingletonReconciled(ctx, queue) + // Cascade any deletion of the nodeClaim to the node + ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim) + + // We should delete the nodeClaim that has disrupted + Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(0)) + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) + ExpectNotFound(ctx, env.Client, nodeClaim, node) + }) + It("should disrupt all empty disrupted nodes in parallel", func() { + nodeClaims, nodes := test.NodeClaimsAndNodes(100, v1.NodeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.NodePoolLabelKey: nodePool.Name, + corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name, + v1.CapacityTypeLabelKey: mostExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(), + corev1.LabelTopologyZone: mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(), + }, + }, + Status: v1.NodeClaimStatus{ + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("32"), + corev1.ResourcePods: resource.MustParse("100"), + }, + }, + }) + for _, m := range nodeClaims { + m.StatusConditions().SetTrue(string(cloudProvider.DisruptionReasons()[0])) + ExpectApplied(ctx, env.Client, m) + } + for _, n := range nodes { + ExpectApplied(ctx, env.Client, n) + } + ExpectApplied(ctx, env.Client, nodePool) + + // inform cluster state about nodes and nodeClaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims) + ExpectSingletonReconciled(ctx, disruptionController) + + // Process the item so that the nodes can be deleted. + ExpectSingletonReconciled(ctx, queue) + // Cascade any deletion of the nodeClaim to the node + ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaims...) + + // Expect that the disrupted nodeClaims are gone + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) + Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(0)) + }) + It("can replace disrupted nodes", func() { + labels := map[string]string{ + "app": "test", + } + // create our RS so we can link a pod to it + rs := test.ReplicaSet() + ExpectApplied(ctx, env.Client, rs) + Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed()) + + pod := test.Pod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: rs.Name, + UID: rs.UID, + Controller: lo.ToPtr(true), + BlockOwnerDeletion: lo.ToPtr(true), + }, + }}}) + + ExpectApplied(ctx, env.Client, rs, pod, nodeClaim, node, nodePool) + + // bind the pods to the node + ExpectManualBinding(ctx, env.Client, pod, node) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + fakeClock.Step(10 * time.Minute) + + // disruption won't delete the old nodeClaim until the new nodeClaim is ready + var wg sync.WaitGroup + ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) + ExpectSingletonReconciled(ctx, disruptionController) + wg.Wait() + + // Process the item so that the nodes can be deleted. 
+ ExpectSingletonReconciled(ctx, queue) + // Cascade any deletion of the nodeClaim to the node + ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim) + + ExpectNotFound(ctx, env.Client, nodeClaim, node) + + // Expect that the new nodeClaim was created and its different than the original + nodeclaims := ExpectNodeClaims(ctx, env.Client) + nodes := ExpectNodes(ctx, env.Client) + Expect(nodeclaims).To(HaveLen(1)) + Expect(nodes).To(HaveLen(1)) + Expect(nodeclaims[0].Name).ToNot(Equal(nodeClaim.Name)) + Expect(nodes[0].Name).ToNot(Equal(node.Name)) + }) + It("should untaint nodes when disruption replacement fails", func() { + cloudProvider.AllowedCreateCalls = 0 // fail the replacement and expect it to untaint + + labels := map[string]string{ + "app": "test", + } + // create our RS so we can link a pod to it + rs := test.ReplicaSet() + ExpectApplied(ctx, env.Client, rs) + Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed()) + + pod := test.Pod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: rs.Name, + UID: rs.UID, + Controller: lo.ToPtr(true), + BlockOwnerDeletion: lo.ToPtr(true), + }, + }, + }, + }) + ExpectApplied(ctx, env.Client, rs, nodeClaim, node, nodePool, pod) + + // bind pods to node + ExpectManualBinding(ctx, env.Client, pod, node) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + var wg sync.WaitGroup + ExpectNewNodeClaimsDeleted(ctx, env.Client, &wg, 1) + ExpectSingletonReconciled(ctx, disruptionController) + wg.Wait() + + // Wait > 5 seconds for eventual consistency hack in orchestration.Queue + fakeClock.Step(5*time.Second + time.Nanosecond*1) + ExpectSingletonReconciled(ctx, queue) + // We should have tried to create a new nodeClaim but failed to do so; therefore, we untainted the existing node + node = ExpectExists(ctx, env.Client, node) + Expect(node.Spec.Taints).ToNot(ContainElement(v1.DisruptedNoScheduleTaint)) + }) + It("can replace disrupted nodes with multiple nodes", func() { + currentInstance := fake.NewInstanceType(fake.InstanceTypeOptions{ + Name: "current-on-demand", + Offerings: []cloudprovider.Offering{ + { + Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}), + Price: 0.5, + Available: false, + }, + }, + }) + replacementInstance := fake.NewInstanceType(fake.InstanceTypeOptions{ + Name: "replacement-on-demand", + Offerings: []cloudprovider.Offering{ + { + Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}), + Price: 0.3, + Available: true, + }, + }, + Resources: map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("3")}, + }) + cloudProvider.InstanceTypes = []*cloudprovider.InstanceType{ + currentInstance, + replacementInstance, + } + + labels := map[string]string{ + "app": "test", + } + // create our RS so we can link a pod to it + rs := test.ReplicaSet() + ExpectApplied(ctx, env.Client, rs) + Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed()) + + pods := test.Pods(3, test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + 
{ + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: rs.Name, + UID: rs.UID, + Controller: lo.ToPtr(true), + BlockOwnerDeletion: lo.ToPtr(true), + }, + }}, + // Make each pod request about a third of the allocatable on the node + ResourceRequirements: corev1.ResourceRequirements{ + Requests: map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("2")}, + }, + }) + + nodeClaim.Labels = lo.Assign(nodeClaim.Labels, map[string]string{ + corev1.LabelInstanceTypeStable: currentInstance.Name, + v1.CapacityTypeLabelKey: currentInstance.Offerings[0].Requirements.Get(v1.CapacityTypeLabelKey).Any(), + corev1.LabelTopologyZone: currentInstance.Offerings[0].Requirements.Get(corev1.LabelTopologyZone).Any(), + }) + nodeClaim.Status.Allocatable = map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("8")} + node.Labels = lo.Assign(node.Labels, map[string]string{ + corev1.LabelInstanceTypeStable: currentInstance.Name, + v1.CapacityTypeLabelKey: currentInstance.Offerings[0].Requirements.Get(v1.CapacityTypeLabelKey).Any(), + corev1.LabelTopologyZone: currentInstance.Offerings[0].Requirements.Get(corev1.LabelTopologyZone).Any(), + }) + node.Status.Allocatable = map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("8")} + + ExpectApplied(ctx, env.Client, rs, nodeClaim, node, nodePool, pods[0], pods[1], pods[2]) + + // bind the pods to the node + ExpectManualBinding(ctx, env.Client, pods[0], node) + ExpectManualBinding(ctx, env.Client, pods[1], node) + ExpectManualBinding(ctx, env.Client, pods[2], node) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + fakeClock.Step(10 * time.Minute) + + // disruption won't delete the old node until the new node is ready + var wg sync.WaitGroup + ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 3) + ExpectSingletonReconciled(ctx, disruptionController) + wg.Wait() + + // Process the item so that the nodes can be deleted. 
+ ExpectSingletonReconciled(ctx, queue) + // Cascade any deletion of the nodeClaim to the node + ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim) + + // expect that cloud provider disruption provisioned three nodes, one for each pod + ExpectNotFound(ctx, env.Client, nodeClaim, node) + Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(3)) + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(3)) + }) + It("should disrupt one non-empty node at a time, starting with the earliest drift", func() { + Skip("this isn't ordered for now") + labels := map[string]string{ + "app": "test", + } + + // create our RS so we can link a pod to it + rs := test.ReplicaSet() + ExpectApplied(ctx, env.Client, rs) + + pods := test.Pods(2, test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: rs.Name, + UID: rs.UID, + Controller: lo.ToPtr(true), + BlockOwnerDeletion: lo.ToPtr(true), + }, + }, + }, + // Make each pod request only fit on a single node + ResourceRequirements: corev1.ResourceRequirements{ + Requests: map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("30")}, + }, + }) + + nodeClaim2, node2 := test.NodeClaimAndNode(v1.NodeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.NodePoolLabelKey: nodePool.Name, + corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name, + v1.CapacityTypeLabelKey: mostExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(), + corev1.LabelTopologyZone: mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(), + }, + }, + Status: v1.NodeClaimStatus{ + ProviderID: test.RandomProviderID(), + Allocatable: map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("32")}, + }, + }) + nodeClaim2.Status.Conditions = append(nodeClaim2.Status.Conditions, status.Condition{ + Type: string(cloudProvider.DisruptionReasons()[0]), + Status: metav1.ConditionTrue, + Reason: string(cloudProvider.DisruptionReasons()[0]), + Message: string(cloudProvider.DisruptionReasons()[0]), + LastTransitionTime: metav1.Time{Time: time.Now().Add(-time.Hour)}, + }) + + ExpectApplied(ctx, env.Client, rs, pods[0], pods[1], nodeClaim, node, nodeClaim2, node2, nodePool) + + // bind pods to node so that they're not empty and don't disrupt in parallel. + ExpectManualBinding(ctx, env.Client, pods[0], node) + ExpectManualBinding(ctx, env.Client, pods[1], node2) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node, node2}, []*v1.NodeClaim{nodeClaim, nodeClaim2}) + + // disruption won't delete the old node until the new node is ready + var wg sync.WaitGroup + ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) + ExpectSingletonReconciled(ctx, disruptionController) + wg.Wait() + + // Process the item so that the nodes can be deleted. 
+ ExpectSingletonReconciled(ctx, queue) + // Cascade any deletion of the nodeClaim to the node + ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim, nodeClaim2) + + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(2)) + Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(2)) + ExpectNotFound(ctx, env.Client, nodeClaim2, node2) + ExpectExists(ctx, env.Client, nodeClaim) + ExpectExists(ctx, env.Client, node) + }) + It("should delete nodes with the karpenter.sh/do-not-disrupt annotation set to false", func() { + node.Annotations = lo.Assign(node.Annotations, map[string]string{v1.DoNotDisruptAnnotationKey: "false"}) + ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + fakeClock.Step(10 * time.Minute) + ExpectSingletonReconciled(ctx, disruptionController) + // Process the item so that the nodes can be deleted. + ExpectSingletonReconciled(ctx, queue) + // Cascade any deletion of the nodeClaim to the node + ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim) + + // We should delete the nodeClaim that has been disrupted + Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(0)) + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) + ExpectNotFound(ctx, env.Client, nodeClaim, node) + }) + }) +}) diff --git a/pkg/controllers/disruption/controller.go b/pkg/controllers/disruption/controller.go index 4dd30c25ce..78f17f149c 100644 --- a/pkg/controllers/disruption/controller.go +++ b/pkg/controllers/disruption/controller.go @@ -79,6 +79,8 @@ func NewController(clk clock.Clock, kubeClient client.Client, provisioner *provi cloudProvider: cp, lastRun: map[string]time.Time{}, methods: []Method{ + // Terminate any NodeClaims with StatusConditions specific to the CloudProvider implementation. + NewCloudProvider(kubeClient, cluster, c.cloudProvider, recorder, provisioner), // Terminate any NodeClaims that have drifted from provisioning specifications, allowing the pods to reschedule. NewDrift(kubeClient, cluster, provisioner, recorder), // Delete any empty NodeClaims as there is zero cost in terms of disruption.
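For context on how the hooks in this diff are meant to be consumed, the following is a minimal, illustrative sketch (not part of the diff) of how a downstream cloud provider might plug in its own disruption reason. The package name, the `ExampleCloudProvider` type, the `SpotInterruption` reason, and the `markInterrupted` helper are all hypothetical; the only pieces assumed from the diff are the `DisruptionReasons() []v1.DisruptionReason` method and the convention, exercised by the tests above, of surfacing a reason as a true status condition on the NodeClaim.

```go
package example

import (
	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// SpotInterruption is a hypothetical cloud-provider-specific disruption reason.
// It would also need to be appended to the DisruptionReason enum in the NodePool
// CRDs, mirroring what requirements.sh does for "ExampleReason".
const SpotInterruption v1.DisruptionReason = "SpotInterruption"

// ExampleCloudProvider stands in for a real cloudprovider.CloudProvider implementation.
type ExampleCloudProvider struct{}

// DisruptionReasons reports the provider-specific reasons that the disruption
// controller's CloudProvider method should act on.
func (c *ExampleCloudProvider) DisruptionReasons() []v1.DisruptionReason {
	return []v1.DisruptionReason{SpotInterruption}
}

// markInterrupted signals a candidate the same way the tests above do: the reason
// is set as a true status condition on the NodeClaim, which ShouldDisrupt later
// reads via NodeClaim.StatusConditions().Get(string(reason)).IsTrue().
func markInterrupted(nodeClaim *v1.NodeClaim) {
	nodeClaim.StatusConditions().SetTrue(string(SpotInterruption))
}
```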