diff --git a/kwok/apis/crds/karpenter.kwok.sh_kwoknodeclasses.yaml b/kwok/apis/crds/karpenter.kwok.sh_kwoknodeclasses.yaml index 2ed31b19e9..d02b382821 100644 --- a/kwok/apis/crds/karpenter.kwok.sh_kwoknodeclasses.yaml +++ b/kwok/apis/crds/karpenter.kwok.sh_kwoknodeclasses.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.1 name: kwoknodeclasses.karpenter.kwok.sh spec: group: karpenter.kwok.sh @@ -96,12 +96,7 @@ spec: - Unknown type: string type: - description: |- - type of condition in CamelCase or in foo.example.com/CamelCase. - --- - Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be - useful (see .node.status.conditions), the ability to deconflict is important. - The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + description: type of condition in CamelCase or in foo.example.com/CamelCase. maxLength: 316 pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ type: string diff --git a/kwok/apis/v1alpha1/kwoknodeclass.go b/kwok/apis/v1alpha1/kwoknodeclass.go index ecc08fcf48..6e2b248307 100644 --- a/kwok/apis/v1alpha1/kwoknodeclass.go +++ b/kwok/apis/v1alpha1/kwoknodeclass.go @@ -18,6 +18,8 @@ package v1alpha1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + v1 "sigs.k8s.io/karpenter/pkg/apis/v1" ) // KWOKNodeClass is the Schema for the KWOKNodeClass API @@ -38,3 +40,5 @@ type KWOKNodeClassList struct { metav1.ListMeta `json:"metadata,omitempty"` Items []KWOKNodeClass `json:"items"` } + +const DisruptionReasonExampleReason v1.DisruptionReason = "ExampleReason" diff --git a/kwok/charts/crds/karpenter.sh_nodeclaims.yaml b/kwok/charts/crds/karpenter.sh_nodeclaims.yaml index e10563ab7b..2ab24b2688 100644 --- a/kwok/charts/crds/karpenter.sh_nodeclaims.yaml +++ b/kwok/charts/crds/karpenter.sh_nodeclaims.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.1 name: nodeclaims.karpenter.sh spec: group: karpenter.sh @@ -262,19 +262,15 @@ spec: description: |- TerminationGracePeriod is the maximum duration the controller will wait before forcefully deleting the pods on a node, measured from when deletion is first initiated. - Warning: this feature takes precedence over a Pod's terminationGracePeriodSeconds value, and bypasses any blocked PDBs or the karpenter.sh/do-not-disrupt annotation. - This field is intended to be used by cluster administrators to enforce that nodes can be cycled within a given time period. When set, drifted nodes will begin draining even if there are pods blocking eviction. Draining will respect PDBs and the do-not-disrupt annotation until the TGP is reached. - Karpenter will preemptively delete pods so their terminationGracePeriodSeconds align with the node's terminationGracePeriod. If a pod would be terminated without being granted its full terminationGracePeriodSeconds prior to the node timeout, that pod will be deleted at T = node timeout - pod terminationGracePeriodSeconds. - The feature can also be used to allow maximum time limits for long-running jobs which can delay node termination with preStop hooks. If left undefined, the controller will wait indefinitely for pods to be drained. 
pattern: ^([0-9]+(s|m|h))+$ @@ -350,12 +346,7 @@ spec: - Unknown type: string type: - description: |- - type of condition in CamelCase or in foo.example.com/CamelCase. - --- - Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be - useful (see .node.status.conditions), the ability to deconflict is important. - The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + description: type of condition in CamelCase or in foo.example.com/CamelCase. maxLength: 316 pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ type: string @@ -798,12 +789,7 @@ spec: - Unknown type: string type: - description: |- - type of condition in CamelCase or in foo.example.com/CamelCase. - --- - Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be - useful (see .node.status.conditions), the ability to deconflict is important. - The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + description: type of condition in CamelCase or in foo.example.com/CamelCase. maxLength: 316 pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ type: string diff --git a/kwok/charts/crds/karpenter.sh_nodepools.yaml b/kwok/charts/crds/karpenter.sh_nodepools.yaml index e589b9c672..333d794504 100644 --- a/kwok/charts/crds/karpenter.sh_nodepools.yaml +++ b/kwok/charts/crds/karpenter.sh_nodepools.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.1 name: nodepools.karpenter.sh spec: group: karpenter.sh @@ -393,19 +393,15 @@ spec: description: |- TerminationGracePeriod is the maximum duration the controller will wait before forcefully deleting the pods on a node, measured from when deletion is first initiated. - Warning: this feature takes precedence over a Pod's terminationGracePeriodSeconds value, and bypasses any blocked PDBs or the karpenter.sh/do-not-disrupt annotation. - This field is intended to be used by cluster administrators to enforce that nodes can be cycled within a given time period. When set, drifted nodes will begin draining even if there are pods blocking eviction. Draining will respect PDBs and the do-not-disrupt annotation until the TGP is reached. - Karpenter will preemptively delete pods so their terminationGracePeriodSeconds align with the node's terminationGracePeriod. If a pod would be terminated without being granted its full terminationGracePeriodSeconds prior to the node timeout, that pod will be deleted at T = node timeout - pod terminationGracePeriodSeconds. - The feature can also be used to allow maximum time limits for long-running jobs which can delay node termination with preStop hooks. If left undefined, the controller will wait indefinitely for pods to be drained. pattern: ^([0-9]+(s|m|h))+$ @@ -477,12 +473,7 @@ spec: - Unknown type: string type: - description: |- - type of condition in CamelCase or in foo.example.com/CamelCase. - --- - Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be - useful (see .node.status.conditions), the ability to deconflict is important. - The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + description: type of condition in CamelCase or in foo.example.com/CamelCase. 
maxLength: 316 pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ type: string @@ -1046,12 +1037,7 @@ spec: - Unknown type: string type: - description: |- - type of condition in CamelCase or in foo.example.com/CamelCase. - --- - Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be - useful (see .node.status.conditions), the ability to deconflict is important. - The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + description: type of condition in CamelCase or in foo.example.com/CamelCase. maxLength: 316 pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ type: string diff --git a/kwok/cloudprovider/cloudprovider.go b/kwok/cloudprovider/cloudprovider.go index 85537ecdb6..9eb5fbde0d 100644 --- a/kwok/cloudprovider/cloudprovider.go +++ b/kwok/cloudprovider/cloudprovider.go @@ -65,7 +65,7 @@ func (c CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v1 } func (c CloudProvider) DisruptionReasons() []v1.DisruptionReason { - return nil + return []v1.DisruptionReason{v1alpha1.DisruptionReasonExampleReason} } func (c CloudProvider) Delete(ctx context.Context, nodeClaim *v1.NodeClaim) error { diff --git a/pkg/apis/crds/karpenter.sh_nodeclaims.yaml b/pkg/apis/crds/karpenter.sh_nodeclaims.yaml index 5f5bff9ae0..01234b06e1 100644 --- a/pkg/apis/crds/karpenter.sh_nodeclaims.yaml +++ b/pkg/apis/crds/karpenter.sh_nodeclaims.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.1 name: nodeclaims.karpenter.sh spec: group: karpenter.sh @@ -260,19 +260,15 @@ spec: description: |- TerminationGracePeriod is the maximum duration the controller will wait before forcefully deleting the pods on a node, measured from when deletion is first initiated. - Warning: this feature takes precedence over a Pod's terminationGracePeriodSeconds value, and bypasses any blocked PDBs or the karpenter.sh/do-not-disrupt annotation. - This field is intended to be used by cluster administrators to enforce that nodes can be cycled within a given time period. When set, drifted nodes will begin draining even if there are pods blocking eviction. Draining will respect PDBs and the do-not-disrupt annotation until the TGP is reached. - Karpenter will preemptively delete pods so their terminationGracePeriodSeconds align with the node's terminationGracePeriod. If a pod would be terminated without being granted its full terminationGracePeriodSeconds prior to the node timeout, that pod will be deleted at T = node timeout - pod terminationGracePeriodSeconds. - The feature can also be used to allow maximum time limits for long-running jobs which can delay node termination with preStop hooks. If left undefined, the controller will wait indefinitely for pods to be drained. pattern: ^([0-9]+(s|m|h))+$ @@ -348,12 +344,7 @@ spec: - Unknown type: string type: - description: |- - type of condition in CamelCase or in foo.example.com/CamelCase. - --- - Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be - useful (see .node.status.conditions), the ability to deconflict is important. 
- The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + description: type of condition in CamelCase or in foo.example.com/CamelCase. maxLength: 316 pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ type: string @@ -794,12 +785,7 @@ spec: - Unknown type: string type: - description: |- - type of condition in CamelCase or in foo.example.com/CamelCase. - --- - Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be - useful (see .node.status.conditions), the ability to deconflict is important. - The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + description: type of condition in CamelCase or in foo.example.com/CamelCase. maxLength: 316 pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ type: string diff --git a/pkg/apis/crds/karpenter.sh_nodepools.yaml b/pkg/apis/crds/karpenter.sh_nodepools.yaml index ec337f277a..c6efbb5179 100644 --- a/pkg/apis/crds/karpenter.sh_nodepools.yaml +++ b/pkg/apis/crds/karpenter.sh_nodepools.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.15.0 + controller-gen.kubebuilder.io/version: v0.16.1 name: nodepools.karpenter.sh spec: group: karpenter.sh @@ -390,19 +390,15 @@ spec: description: |- TerminationGracePeriod is the maximum duration the controller will wait before forcefully deleting the pods on a node, measured from when deletion is first initiated. - Warning: this feature takes precedence over a Pod's terminationGracePeriodSeconds value, and bypasses any blocked PDBs or the karpenter.sh/do-not-disrupt annotation. - This field is intended to be used by cluster administrators to enforce that nodes can be cycled within a given time period. When set, drifted nodes will begin draining even if there are pods blocking eviction. Draining will respect PDBs and the do-not-disrupt annotation until the TGP is reached. - Karpenter will preemptively delete pods so their terminationGracePeriodSeconds align with the node's terminationGracePeriod. If a pod would be terminated without being granted its full terminationGracePeriodSeconds prior to the node timeout, that pod will be deleted at T = node timeout - pod terminationGracePeriodSeconds. - The feature can also be used to allow maximum time limits for long-running jobs which can delay node termination with preStop hooks. If left undefined, the controller will wait indefinitely for pods to be drained. pattern: ^([0-9]+(s|m|h))+$ @@ -474,12 +470,7 @@ spec: - Unknown type: string type: - description: |- - type of condition in CamelCase or in foo.example.com/CamelCase. - --- - Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be - useful (see .node.status.conditions), the ability to deconflict is important. - The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + description: type of condition in CamelCase or in foo.example.com/CamelCase. maxLength: 316 pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ type: string @@ -1041,12 +1032,7 @@ spec: - Unknown type: string type: - description: |- - type of condition in CamelCase or in foo.example.com/CamelCase. 
- --- - Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be - useful (see .node.status.conditions), the ability to deconflict is important. - The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + description: type of condition in CamelCase or in foo.example.com/CamelCase. maxLength: 316 pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ type: string diff --git a/pkg/apis/v1/nodepool.go b/pkg/apis/v1/nodepool.go index 381ab5c234..91aef57f39 100644 --- a/pkg/apis/v1/nodepool.go +++ b/pkg/apis/v1/nodepool.go @@ -30,7 +30,6 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/utils/clock" ) @@ -135,20 +134,12 @@ const ( type DisruptionReason string const ( + DisruptionReasonAll DisruptionReason = "All" DisruptionReasonUnderutilized DisruptionReason = "Underutilized" DisruptionReasonEmpty DisruptionReason = "Empty" DisruptionReasonDrifted DisruptionReason = "Drifted" ) -var ( - // WellKnownDisruptionReasons is a list of all valid reasons for disruption budgets. - WellKnownDisruptionReasons = sets.New( - DisruptionReasonUnderutilized, - DisruptionReasonEmpty, - DisruptionReasonDrifted, - ) -) - type Limits v1.ResourceList func (l Limits) ExceededBy(resources v1.ResourceList) error { @@ -321,7 +312,7 @@ func (nl *NodePoolList) OrderByWeight() { // amount of state that the disruption controller must reconcile, while allowing the GetAllowedDisruptionsByReason() // to bubble up any errors in validation. func (in *NodePool) MustGetAllowedDisruptions(ctx context.Context, c clock.Clock, numNodes int) map[DisruptionReason]int { - allowedDisruptions, err := in.GetAllowedDisruptionsByReason(ctx, c, numNodes) + allowedDisruptions, err := in.GetAllowedDisruptionsByReason(c, numNodes) if err != nil { return map[DisruptionReason]int{} } @@ -329,24 +320,35 @@ func (in *NodePool) MustGetAllowedDisruptions(ctx context.Context, c clock.Clock } // GetAllowedDisruptionsByReason returns the minimum allowed disruptions across all disruption budgets, for all disruption methods for a given nodepool -func (in *NodePool) GetAllowedDisruptionsByReason(ctx context.Context, c clock.Clock, numNodes int) (map[DisruptionReason]int, error) { +func (in *NodePool) GetAllowedDisruptionsByReason(c clock.Clock, numNodes int) (map[DisruptionReason]int, error) { var multiErr error allowedDisruptions := map[DisruptionReason]int{} - for reason := range WellKnownDisruptionReasons { - allowedDisruptions[reason] = math.MaxInt32 - } + allowedDisruptions[DisruptionReasonAll] = math.MaxInt32 for _, budget := range in.Spec.Disruption.Budgets { val, err := budget.GetAllowedDisruptions(c, numNodes) if err != nil { multiErr = multierr.Append(multiErr, err) } - // If reasons is nil, it applies to all well known disruption reasons - for _, reason := range lo.Ternary(budget.Reasons == nil, WellKnownDisruptionReasons.UnsortedList(), budget.Reasons) { - allowedDisruptions[reason] = lo.Min([]int{allowedDisruptions[reason], val}) + + if budget.Reasons == nil { + allowedDisruptions[DisruptionReasonAll] = lo.Min([]int{allowedDisruptions[DisruptionReasonAll], val}) + continue + } + for _, reason := range budget.Reasons { + if reasonVal, found := allowedDisruptions[reason]; found { + allowedDisruptions[reason] = lo.Min([]int{reasonVal, val}) + continue + } + allowedDisruptions[reason] = val } } + // 
The allowed disruption count for a specific reason must be less than or equal to the count allowed for all disruption reasons (DisruptionReasonAll) + for _, reason := range lo.Keys(allowedDisruptions) { + allowedDisruptions[reason] = lo.Min([]int{allowedDisruptions[reason], allowedDisruptions[DisruptionReasonAll]}) + } + return allowedDisruptions, multiErr } diff --git a/pkg/apis/v1/nodepool_budgets_test.go b/pkg/apis/v1/nodepool_budgets_test.go index a9965d5f7e..4d80cd7ca9 100644 --- a/pkg/apis/v1/nodepool_budgets_test.go +++ b/pkg/apis/v1/nodepool_budgets_test.go @@ -17,7 +17,7 @@ limitations under the License. package v1_test import ( - "context" + "fmt" "math" "strings" "time" @@ -36,7 +36,6 @@ var _ = Describe("Budgets", func() { var nodePool *NodePool var budgets []Budget var fakeClock *clock.FakeClock - var ctx = context.Background() BeforeEach(func() { // Set the time to the middle of the year of 2000, the best year ever @@ -79,6 +78,7 @@ var _ = Describe("Budgets", func() { DisruptionReasonUnderutilized, DisruptionReasonDrifted, DisruptionReasonEmpty, + "CloudProviderDisruptionReason", }, Nodes: "0", Schedule: lo.ToPtr("@weekly"), @@ -100,11 +100,12 @@ budgets[5].Schedule = lo.ToPtr("* * * * *") budgets[5].Duration = lo.ToPtr(metav1.Duration{Duration: lo.Must(time.ParseDuration("1h"))}) - disruptionsByReason, err := nodePool.GetAllowedDisruptionsByReason(ctx, fakeClock, 100) + disruptionsByReason, err := nodePool.GetAllowedDisruptionsByReason(fakeClock, 100) Expect(err).To(BeNil()) Expect(disruptionsByReason[DisruptionReasonUnderutilized]).To(Equal(0)) Expect(disruptionsByReason[DisruptionReasonDrifted]).To(Equal(0)) Expect(disruptionsByReason[DisruptionReasonEmpty]).To(Equal(0)) + Expect(disruptionsByReason["CloudProviderDisruptionReason"]).To(Equal(0)) }) It("should return MaxInt32 for all reasons when there are no active budgets", func() { @@ -112,9 +113,10 @@ budgets[i].Schedule = lo.ToPtr("@yearly") budgets[i].Duration = lo.ToPtr(metav1.Duration{Duration: lo.Must(time.ParseDuration("1h"))}) } - disruptionsByReason, err := nodePool.GetAllowedDisruptionsByReason(ctx, fakeClock, 100) + disruptionsByReason, err := nodePool.GetAllowedDisruptionsByReason(fakeClock, 100) Expect(err).To(BeNil()) + fmt.Println(disruptionsByReason) // All budgets should have unbounded disruptions when inactive for _, disruptions := range disruptionsByReason { Expect(disruptions).To(Equal(math.MaxInt32)) @@ -124,7 +126,7 @@ It("should ignore reason-defined budgets when inactive", func() { budgets[3].Schedule = lo.ToPtr("@yearly") budgets[4].Schedule = lo.ToPtr("@yearly") - disruptionsByReason, err := nodePool.GetAllowedDisruptionsByReason(ctx, fakeClock, 100) + disruptionsByReason, err := nodePool.GetAllowedDisruptionsByReason(fakeClock, 100) Expect(err).To(BeNil()) for _, disruptions := range disruptionsByReason { Expect(disruptions).To(Equal(10)) @@ -134,7 +136,7 @@ It("should return the budget for all disruption reasons when undefined", func() { nodePool.Spec.Disruption.Budgets = budgets[:1] Expect(len(nodePool.Spec.Disruption.Budgets)).To(Equal(1)) - disruptionsByReason, err := nodePool.GetAllowedDisruptionsByReason(ctx, fakeClock, 100) + disruptionsByReason, err := nodePool.GetAllowedDisruptionsByReason(fakeClock, 100) Expect(err).To(BeNil()) Expect(len(budgets[0].Reasons)).To(Equal(0)) for _, disruptions := range disruptionsByReason { Expect(disruptions).To(Equal(10)) @@ -155,7 +157,7 @@ var _ = Describe("Budgets", 
func() { }, }, }...) - disruptionsByReason, err := nodePool.GetAllowedDisruptionsByReason(ctx, fakeClock, 100) + disruptionsByReason, err := nodePool.GetAllowedDisruptionsByReason(fakeClock, 100) Expect(err).To(BeNil()) Expect(disruptionsByReason[DisruptionReasonEmpty]).To(Equal(4)) diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go index bf5e80fcf4..2aabc75fa4 100644 --- a/pkg/cloudprovider/fake/cloudprovider.go +++ b/pkg/cloudprovider/fake/cloudprovider.go @@ -236,7 +236,7 @@ func (c *CloudProvider) GetInstanceTypes(_ context.Context, np *v1.NodePool) ([] } func (c *CloudProvider) DisruptionReasons() []v1.DisruptionReason { - return nil + return []v1.DisruptionReason{"CloudProviderDisruptionReason"} } func (c *CloudProvider) Delete(_ context.Context, nc *v1.NodeClaim) error { diff --git a/pkg/controllers/disruption/controller.go b/pkg/controllers/disruption/controller.go index b2d4222f6e..ba5ea9f0a8 100644 --- a/pkg/controllers/disruption/controller.go +++ b/pkg/controllers/disruption/controller.go @@ -25,9 +25,12 @@ import ( "sync" "time" + disruptionevents "sigs.k8s.io/karpenter/pkg/controllers/disruption/events" + "github.com/awslabs/operatorpkg/singleton" "github.com/samber/lo" "go.uber.org/multierr" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/utils/clock" controllerruntime "sigs.k8s.io/controller-runtime" @@ -71,9 +74,9 @@ func NewController(clk clock.Clock, kubeClient client.Client, provisioner *provi c := MakeConsolidation(clk, cluster, kubeClient, provisioner, cp, recorder, queue) // Generate eventually disruptable reason based on a combination of drift and cloudprovider disruption reason - EventualDisruption := []Method{} + eventualDisruptionMethods := []Method{} for _, reason := range append([]v1.DisruptionReason{v1.DisruptionReasonDrifted}, cp.DisruptionReasons()...) { - EventualDisruption = append(EventualDisruption, NewEventualDisruption(kubeClient, cluster, provisioner, recorder, reason)) + eventualDisruptionMethods = append(eventualDisruptionMethods, NewEventualDisruption(kubeClient, cluster, provisioner, recorder, reason)) } return &Controller{ @@ -87,7 +90,7 @@ func NewController(clk clock.Clock, kubeClient client.Client, provisioner *provi lastRun: map[string]time.Time{}, methods: append( // Terminate any NodeClaims that have need to be eventually disrupted from provisioning specifications, allowing the pods to reschedule. - EventualDisruption, + eventualDisruptionMethods, []Method{ // Delete any empty NodeClaims as there is zero cost in terms of disruption. 
NewEmptiness(c), @@ -171,6 +174,21 @@ func (c *Controller) disrupt(ctx context.Context, disruption Method) (bool, erro if err != nil { return false, fmt.Errorf("building disruption budgets, %w", err) } + // Emit metric for every nodepool + for _, nodePoolName := range lo.Keys(disruptionBudgetMapping) { + allowedDisruption, exists := disruptionBudgetMapping[nodePoolName][disruption.Reason()] + NodePoolAllowedDisruptions.With(map[string]string{ + metrics.NodePoolLabel: nodePoolName, metrics.ReasonLabel: string(disruption.Reason()), + }).Set(float64(lo.Ternary(exists, allowedDisruption, disruptionBudgetMapping[nodePoolName][v1.DisruptionReasonAll]))) + if allowedDisruption == 0 { + np := &v1.NodePool{} + if err := c.kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, np); err != nil { + return false, fmt.Errorf("getting nodepool, %w", err) + } + c.recorder.Publish(disruptionevents.NodePoolBlockedForDisruptionReason(np, disruption.Reason())) + } + } + // Determine the disruption action cmd, schedulingResults, err := disruption.ComputeCommand(ctx, disruptionBudgetMapping, candidates...) if err != nil { @@ -283,7 +301,7 @@ func (c *Controller) logInvalidBudgets(ctx context.Context) { var buf bytes.Buffer for _, np := range nodePoolList.Items { // Use a dummy value of 100 since we only care if this errors. - if _, err := np.GetAllowedDisruptionsByReason(ctx, c.clock, 100); err != nil { + if _, err := np.GetAllowedDisruptionsByReason(c.clock, 100); err != nil { fmt.Fprintf(&buf, "invalid disruption budgets in nodepool %s, %s", np.Name, err) } } diff --git a/pkg/controllers/disruption/emptiness.go b/pkg/controllers/disruption/emptiness.go index 31954c738a..a6281f596b 100644 --- a/pkg/controllers/disruption/emptiness.go +++ b/pkg/controllers/disruption/emptiness.go @@ -65,10 +65,12 @@ func (e *Emptiness) ComputeCommand(ctx context.Context, disruptionBudgetMapping empty := make([]*Candidate, 0, len(candidates)) constrainedByBudgets := false for _, candidate := range candidates { + _, found := disruptionBudgetMapping[candidate.nodePool.Name][e.Reason()] + reason := lo.Ternary(found, e.Reason(), v1.DisruptionReasonAll) if len(candidate.reschedulablePods) > 0 { continue } - if disruptionBudgetMapping[candidate.nodePool.Name][e.Reason()] == 0 { + if disruptionBudgetMapping[candidate.nodePool.Name][reason] == 0 { // set constrainedByBudgets to true if any node was a candidate but was constrained by a budget constrainedByBudgets = true continue @@ -76,7 +78,7 @@ func (e *Emptiness) ComputeCommand(ctx context.Context, disruptionBudgetMapping // If there's disruptions allowed for the candidate's nodepool, // add it to the list of candidates, and decrement the budget. 
empty = append(empty, candidate) - disruptionBudgetMapping[candidate.nodePool.Name][e.Reason()]-- + disruptionBudgetMapping[candidate.nodePool.Name][reason]-- } // none empty, so do nothing if len(empty) == 0 { diff --git a/pkg/controllers/disruption/eventual.go b/pkg/controllers/disruption/eventual.go index 55d184c226..c1bf92eb3a 100644 --- a/pkg/controllers/disruption/eventual.go +++ b/pkg/controllers/disruption/eventual.go @@ -21,10 +21,10 @@ import ( "errors" "sort" + "github.com/samber/lo" "sigs.k8s.io/controller-runtime/pkg/client" v1 "sigs.k8s.io/karpenter/pkg/apis/v1" - "sigs.k8s.io/karpenter/pkg/cloudprovider" disruptionevents "sigs.k8s.io/karpenter/pkg/controllers/disruption/events" "sigs.k8s.io/karpenter/pkg/controllers/provisioning" "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling" @@ -34,34 +34,33 @@ import ( // Drift is a subreconciler that deletes drifted candidates. type EventualDisruption struct { - kubeClient client.Client - cluster *state.Cluster - provisioner *provisioning.Provisioner - recorder events.Recorder - cloudProvider cloudprovider.CloudProvider - disruptionReason v1.DisruptionReason + kubeClient client.Client + cluster *state.Cluster + provisioner *provisioning.Provisioner + recorder events.Recorder + reason v1.DisruptionReason } -func NewEventualDisruption(kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner, recorder events.Recorder, disruptionReason v1.DisruptionReason) *EventualDisruption { +func NewEventualDisruption(kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner, recorder events.Recorder, reason v1.DisruptionReason) *EventualDisruption { return &EventualDisruption{ - kubeClient: kubeClient, - cluster: cluster, - provisioner: provisioner, - recorder: recorder, - disruptionReason: disruptionReason, + kubeClient: kubeClient, + cluster: cluster, + provisioner: provisioner, + recorder: recorder, + reason: reason, } } // ShouldDisrupt is a predicate used to filter candidates -func (ed *EventualDisruption) ShouldDisrupt(ctx context.Context, c *Candidate) bool { - return c.NodeClaim.StatusConditions().Get(string(ed.disruptionReason)).IsTrue() +func (d *EventualDisruption) ShouldDisrupt(ctx context.Context, c *Candidate) bool { + return c.NodeClaim.StatusConditions().Get(string(d.Reason())).IsTrue() } // ComputeCommand generates a disruption command given candidates -func (ed *EventualDisruption) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]map[v1.DisruptionReason]int, candidates ...*Candidate) (Command, scheduling.Results, error) { +func (d *EventualDisruption) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]map[v1.DisruptionReason]int, candidates ...*Candidate) (Command, scheduling.Results, error) { sort.Slice(candidates, func(i int, j int) bool { - return candidates[i].NodeClaim.StatusConditions().Get(string(ed.disruptionReason)).LastTransitionTime.Time.Before( - candidates[j].NodeClaim.StatusConditions().Get(string(ed.disruptionReason)).LastTransitionTime.Time) + return candidates[i].NodeClaim.StatusConditions().Get(string(d.Reason())).LastTransitionTime.Time.Before( + candidates[j].NodeClaim.StatusConditions().Get(string(d.Reason())).LastTransitionTime.Time) }) // Do a quick check through the candidates to see if they're empty. @@ -69,14 +68,16 @@ func (ed *EventualDisruption) ComputeCommand(ctx context.Context, disruptionBudg // add it to the existing command. 
empty := make([]*Candidate, 0, len(candidates)) for _, candidate := range candidates { + _, found := disruptionBudgetMapping[candidate.nodePool.Name][d.Reason()] + reason := lo.Ternary(found, d.Reason(), v1.DisruptionReasonAll) if len(candidate.reschedulablePods) > 0 { continue } // If there's disruptions allowed for the candidate's nodepool, // add it to the list of candidates, and decrement the budget. - if disruptionBudgetMapping[candidate.nodePool.Name][ed.Reason()] > 0 { + if disruptionBudgetMapping[candidate.nodePool.Name][reason] > 0 { empty = append(empty, candidate) - disruptionBudgetMapping[candidate.nodePool.Name][ed.Reason()]-- + disruptionBudgetMapping[candidate.nodePool.Name][reason]-- } } // Disrupt all empty drifted candidates, as they require no scheduling simulations. @@ -90,11 +91,11 @@ func (ed *EventualDisruption) ComputeCommand(ctx context.Context, disruptionBudg // If the disruption budget doesn't allow this candidate to be disrupted, // continue to the next candidate. We don't need to decrement any budget // counter since drift commands can only have one candidate. - if disruptionBudgetMapping[candidate.nodePool.Name][ed.Reason()] == 0 { + if disruptionBudgetMapping[candidate.nodePool.Name][d.Reason()] == 0 { continue } // Check if we need to create any NodeClaims. - results, err := SimulateScheduling(ctx, ed.kubeClient, ed.cluster, ed.provisioner, candidate) + results, err := SimulateScheduling(ctx, d.kubeClient, d.cluster, d.provisioner, candidate) if err != nil { // if a candidate is now deleting, just retry if errors.Is(err, errCandidateDeleting) { @@ -104,7 +105,7 @@ func (ed *EventualDisruption) ComputeCommand(ctx context.Context, disruptionBudg } // Emit an event that we couldn't reschedule the pods on the node. if !results.AllNonPendingPodsScheduled() { - ed.recorder.Publish(disruptionevents.Blocked(candidate.Node, candidate.NodeClaim, results.NonPendingPodSchedulingErrors())...) + d.recorder.Publish(disruptionevents.Blocked(candidate.Node, candidate.NodeClaim, results.NonPendingPodSchedulingErrors())...) 
continue } @@ -116,14 +117,14 @@ func (ed *EventualDisruption) ComputeCommand(ctx context.Context, disruptionBudg return Command{}, scheduling.Results{}, nil } -func (ed *EventualDisruption) Reason() v1.DisruptionReason { - return ed.disruptionReason +func (d *EventualDisruption) Reason() v1.DisruptionReason { + return d.reason } -func (ed *EventualDisruption) Class() string { +func (d *EventualDisruption) Class() string { return EventualDisruptionClass } -func (ed *EventualDisruption) ConsolidationType() string { +func (d *EventualDisruption) ConsolidationType() string { return "" } diff --git a/pkg/controllers/disruption/eventual_test.go b/pkg/controllers/disruption/eventual_test.go index 6dc3dfa30e..f5ff8e6bc7 100644 --- a/pkg/controllers/disruption/eventual_test.go +++ b/pkg/controllers/disruption/eventual_test.go @@ -75,13 +75,43 @@ var _ = Describe("Drift", func() { }, }, }) - nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDrifted) }) Context("Metrics", func() { - var eligibleNodesLabels = map[string]string{ - metrics.ReasonLabel: "drifted", - } - It("should correctly report eligible nodes", func() { + It("should correctly report eligible drifted nodes", func() { + eligibleNodesLabels := map[string]string{ + metrics.ReasonLabel: "drifted", + } + pod := test.Pod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + v1.DoNotDisruptAnnotationKey: "true", + }, + }, + }) + ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node, pod) + ExpectManualBinding(ctx, env.Client, pod, node) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + fakeClock.Step(10 * time.Minute) + ExpectSingletonReconciled(ctx, disruptionController) + ExpectMetricGaugeValue(disruption.EligibleNodes, 0, eligibleNodesLabels) + + // remove the do-not-disrupt annotation to make the node eligible for drift and update cluster state + pod.SetAnnotations(map[string]string{}) + nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDrifted) + ExpectApplied(ctx, env.Client, pod, nodeClaim) + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + fakeClock.Step(10 * time.Minute) + ExpectSingletonReconciled(ctx, disruptionController) + ExpectMetricGaugeValue(disruption.EligibleNodes, 1, eligibleNodesLabels) + }) + It("should correctly report eligible cloudprovider specific node", func() { + var eligibleNodesLabels = map[string]string{ + metrics.ReasonLabel: "cloudproviderdisruptionreason", + } pod := test.Pod(test.PodOptions{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ @@ -101,7 +131,8 @@ var _ = Describe("Drift", func() { // remove the do-not-disrupt annotation to make the node eligible for drift and update cluster state pod.SetAnnotations(map[string]string{}) - ExpectApplied(ctx, env.Client, pod) + nodeClaim.StatusConditions().SetTrue("CloudProviderDisruptionReason") + ExpectApplied(ctx, env.Client, pod, nodeClaim) ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) fakeClock.Step(10 * time.Minute) @@ -441,6 +472,9 @@ var _ = Describe("Drift", func() { }) }) Context("Drift", func() { + BeforeEach(func() { + nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDrifted) 
+ }) It("should continue to the next drifted node if the first cannot reschedule all pods", func() { pod := test.Pod(test.PodOptions{ ResourceRequirements: corev1.ResourceRequirements{ @@ -952,6 +986,529 @@ var _ = Describe("Drift", func() { // Cascade any deletion of the nodeClaim to the node ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim) + // We should delete the nodeClaim that has drifted + Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(0)) + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) + ExpectNotFound(ctx, env.Client, nodeClaim, node) + }) + }) + Context("CloudProvider Disruption", func() { + var cloudproviderDisruptionReason v1.DisruptionReason + BeforeEach(func() { + cloudproviderDisruptionReason = cloudProvider.DisruptionReasons()[0] + nodeClaim.StatusConditions().SetTrue(string(cloudproviderDisruptionReason)) + }) + It("should continue to the next cloudprovider disruption node if the first cannot reschedule all pods", func() { + pod := test.Pod(test.PodOptions{ + ResourceRequirements: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("150"), + }, + }, + }) + podToExpire := test.Pod(test.PodOptions{ + ResourceRequirements: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + }, + }, + }) + ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool, pod) + ExpectManualBinding(ctx, env.Client, pod, node) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + nodeClaim2, node2 := test.NodeClaimAndNode(v1.NodeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.NodePoolLabelKey: nodePool.Name, + corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name, + v1.CapacityTypeLabelKey: mostExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(), + corev1.LabelTopologyZone: mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(), + }, + }, + Status: v1.NodeClaimStatus{ + ProviderID: test.RandomProviderID(), + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourcePods: resource.MustParse("100"), + }, + }, + }) + nodeClaim2.StatusConditions().SetTrue(string(cloudproviderDisruptionReason)) + ExpectApplied(ctx, env.Client, nodeClaim2, node2, podToExpire) + ExpectManualBinding(ctx, env.Client, podToExpire, node2) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node2}, []*v1.NodeClaim{nodeClaim2}) + + // disruption won't delete the old node until the new node is ready + var wg sync.WaitGroup + ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) + ExpectSingletonReconciled(ctx, disruptionController) + wg.Wait() + + // Process the item so that the nodes can be deleted. 
+ ExpectSingletonReconciled(ctx, queue) + ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim, nodeClaim2) + + Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(2)) + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(2)) + ExpectExists(ctx, env.Client, nodeClaim) + ExpectNotFound(ctx, env.Client, nodeClaim2) + }) + It("should ignore nodes without the cloudprovider disruption status condition", func() { + _ = nodeClaim.StatusConditions().Clear(string(cloudproviderDisruptionReason)) + ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + fakeClock.Step(10 * time.Minute) + + ExpectSingletonReconciled(ctx, disruptionController) + + // Expect to not create or delete more nodeclaims + Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, nodeClaim) + }) + It("should ignore nodes with the karpenter.sh/do-not-disrupt annotation", func() { + node.Annotations = lo.Assign(node.Annotations, map[string]string{v1.DoNotDisruptAnnotationKey: "true"}) + ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + ExpectSingletonReconciled(ctx, disruptionController) + + // Expect to not create or delete more nodeclaims + Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1)) + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, nodeClaim) + }) + It("should ignore nodes that have pods with the karpenter.sh/do-not-disrupt annotation", func() { + pod := test.Pod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + v1.DoNotDisruptAnnotationKey: "true", + }, + }, + }) + ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool, pod) + ExpectManualBinding(ctx, env.Client, pod, node) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + ExpectSingletonReconciled(ctx, disruptionController) + + // Expect to not create or delete more nodeclaims + Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1)) + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, nodeClaim) + }) + It("should drift nodes that have pods with the karpenter.sh/do-not-disrupt annotation when the NodePool's TerminationGracePeriod is not nil", func() { + nodeClaim.Spec.TerminationGracePeriod = &metav1.Duration{Duration: time.Second * 300} + pod := test.Pod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + v1.DoNotDisruptAnnotationKey: "true", + }, + }, + }) + ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool, pod) + ExpectManualBinding(ctx, env.Client, pod, node) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + ExpectSingletonReconciled(ctx, disruptionController) + + // Expect to create a replacement but not delete the old nodeclaim + Expect(ExpectNodeClaims(ctx, 
env.Client)).To(HaveLen(2)) // new nodeclaim is created for drift + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, nodeClaim) + }) + It("should drift nodes that have pods with the blocking PDBs when the NodePool's TerminationGracePeriod is not nil", func() { + nodeClaim.Spec.TerminationGracePeriod = &metav1.Duration{Duration: time.Second * 300} + podLabels := map[string]string{"test": "value"} + pod := test.Pod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{ + Labels: podLabels, + }, + }) + budget := test.PodDisruptionBudget(test.PDBOptions{ + Labels: podLabels, + MaxUnavailable: fromInt(0), + }) + ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool, pod, budget) + ExpectManualBinding(ctx, env.Client, pod, node) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + ExpectSingletonReconciled(ctx, disruptionController) + + // Expect to create a replacement but not delete the old nodeclaim + Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(2)) // new nodeclaim is created for drift + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, nodeClaim) + }) + It("should ignore nodes with the cloudprovider disrupted status condition set to false", func() { + nodeClaim.StatusConditions().SetFalse(string(cloudproviderDisruptionReason), "NotDisrupted", "NotDisrupted") + ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + fakeClock.Step(10 * time.Minute) + + ExpectSingletonReconciled(ctx, disruptionController) + + // Expect to not create or delete more nodeclaims + Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, nodeClaim) + }) + It("can delete cloudprovider disrupted nodes", func() { + ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + fakeClock.Step(10 * time.Minute) + ExpectSingletonReconciled(ctx, disruptionController) + // Process the item so that the nodes can be deleted. 
+ ExpectSingletonReconciled(ctx, queue) + // Cascade any deletion of the nodeClaim to the node + ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim) + + // We should delete the nodeClaim that has drifted + Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(0)) + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) + ExpectNotFound(ctx, env.Client, nodeClaim, node) + }) + It("should disrupt all empty cloudprovider disrupted nodes in parallel", func() { + nodeClaims, nodes := test.NodeClaimsAndNodes(100, v1.NodeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.NodePoolLabelKey: nodePool.Name, + corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name, + v1.CapacityTypeLabelKey: mostExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(), + corev1.LabelTopologyZone: mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(), + }, + }, + Status: v1.NodeClaimStatus{ + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("32"), + corev1.ResourcePods: resource.MustParse("100"), + }, + }, + }) + for _, m := range nodeClaims { + m.StatusConditions().SetTrue(string(cloudproviderDisruptionReason)) + ExpectApplied(ctx, env.Client, m) + } + for _, n := range nodes { + ExpectApplied(ctx, env.Client, n) + } + ExpectApplied(ctx, env.Client, nodePool) + + // inform cluster state about nodes and nodeClaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims) + ExpectSingletonReconciled(ctx, disruptionController) + + // Process the item so that the nodes can be deleted. + ExpectSingletonReconciled(ctx, queue) + // Cascade any deletion of the nodeClaim to the node + ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaims...) + + // Expect that the drifted nodeClaims are gone + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) + Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(0)) + }) + It("can replace cloudprovider disrupted nodes", func() { + labels := map[string]string{ + "app": "test", + } + // create our RS so we can link a pod to it + rs := test.ReplicaSet() + ExpectApplied(ctx, env.Client, rs) + Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed()) + + pod := test.Pod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: rs.Name, + UID: rs.UID, + Controller: lo.ToPtr(true), + BlockOwnerDeletion: lo.ToPtr(true), + }, + }}}) + + ExpectApplied(ctx, env.Client, rs, pod, nodeClaim, node, nodePool) + + // bind the pods to the node + ExpectManualBinding(ctx, env.Client, pod, node) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + fakeClock.Step(10 * time.Minute) + + // disruption won't delete the old nodeClaim until the new nodeClaim is ready + var wg sync.WaitGroup + ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) + ExpectSingletonReconciled(ctx, disruptionController) + wg.Wait() + + // Process the item so that the nodes can be deleted. 
+ ExpectSingletonReconciled(ctx, queue) + // Cascade any deletion of the nodeClaim to the node + ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim) + + ExpectNotFound(ctx, env.Client, nodeClaim, node) + + // Expect that the new nodeClaim was created and its different than the original + nodeclaims := ExpectNodeClaims(ctx, env.Client) + nodes := ExpectNodes(ctx, env.Client) + Expect(nodeclaims).To(HaveLen(1)) + Expect(nodes).To(HaveLen(1)) + Expect(nodeclaims[0].Name).ToNot(Equal(nodeClaim.Name)) + Expect(nodes[0].Name).ToNot(Equal(node.Name)) + }) + It("should untaint nodes when cloudprovider disrupted replacement fails", func() { + cloudProvider.AllowedCreateCalls = 0 // fail the replacement and expect it to untaint + + labels := map[string]string{ + "app": "test", + } + // create our RS so we can link a pod to it + rs := test.ReplicaSet() + ExpectApplied(ctx, env.Client, rs) + Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed()) + + pod := test.Pod(test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: rs.Name, + UID: rs.UID, + Controller: lo.ToPtr(true), + BlockOwnerDeletion: lo.ToPtr(true), + }, + }, + }, + }) + ExpectApplied(ctx, env.Client, rs, nodeClaim, node, nodePool, pod) + + // bind pods to node + ExpectManualBinding(ctx, env.Client, pod, node) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + var wg sync.WaitGroup + ExpectNewNodeClaimsDeleted(ctx, env.Client, &wg, 1) + ExpectSingletonReconciled(ctx, disruptionController) + wg.Wait() + + // Wait > 5 seconds for eventual consistency hack in orchestration.Queue + fakeClock.Step(5*time.Second + time.Nanosecond*1) + ExpectSingletonReconciled(ctx, queue) + // We should have tried to create a new nodeClaim but failed to do so; therefore, we untainted the existing node + node = ExpectExists(ctx, env.Client, node) + Expect(node.Spec.Taints).ToNot(ContainElement(v1.DisruptedNoScheduleTaint)) + }) + It("can replace cloudprovider disrupted nodes with multiple nodes", func() { + currentInstance := fake.NewInstanceType(fake.InstanceTypeOptions{ + Name: "current-on-demand", + Offerings: []cloudprovider.Offering{ + { + Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}), + Price: 0.5, + Available: false, + }, + }, + }) + replacementInstance := fake.NewInstanceType(fake.InstanceTypeOptions{ + Name: "replacement-on-demand", + Offerings: []cloudprovider.Offering{ + { + Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}), + Price: 0.3, + Available: true, + }, + }, + Resources: map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("3")}, + }) + cloudProvider.InstanceTypes = []*cloudprovider.InstanceType{ + currentInstance, + replacementInstance, + } + + labels := map[string]string{ + "app": "test", + } + // create our RS so we can link a pod to it + rs := test.ReplicaSet() + ExpectApplied(ctx, env.Client, rs) + Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed()) + + pods := test.Pods(3, test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: labels, + OwnerReferences: 
[]metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: rs.Name, + UID: rs.UID, + Controller: lo.ToPtr(true), + BlockOwnerDeletion: lo.ToPtr(true), + }, + }}, + // Make each pod request about a third of the allocatable on the node + ResourceRequirements: corev1.ResourceRequirements{ + Requests: map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("2")}, + }, + }) + + nodeClaim.Labels = lo.Assign(nodeClaim.Labels, map[string]string{ + corev1.LabelInstanceTypeStable: currentInstance.Name, + v1.CapacityTypeLabelKey: currentInstance.Offerings[0].Requirements.Get(v1.CapacityTypeLabelKey).Any(), + corev1.LabelTopologyZone: currentInstance.Offerings[0].Requirements.Get(corev1.LabelTopologyZone).Any(), + }) + nodeClaim.Status.Allocatable = map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("8")} + node.Labels = lo.Assign(node.Labels, map[string]string{ + corev1.LabelInstanceTypeStable: currentInstance.Name, + v1.CapacityTypeLabelKey: currentInstance.Offerings[0].Requirements.Get(v1.CapacityTypeLabelKey).Any(), + corev1.LabelTopologyZone: currentInstance.Offerings[0].Requirements.Get(corev1.LabelTopologyZone).Any(), + }) + node.Status.Allocatable = map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("8")} + + ExpectApplied(ctx, env.Client, rs, nodeClaim, node, nodePool, pods[0], pods[1], pods[2]) + + // bind the pods to the node + ExpectManualBinding(ctx, env.Client, pods[0], node) + ExpectManualBinding(ctx, env.Client, pods[1], node) + ExpectManualBinding(ctx, env.Client, pods[2], node) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + fakeClock.Step(10 * time.Minute) + + // disruption won't delete the old node until the new node is ready + var wg sync.WaitGroup + ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 3) + ExpectSingletonReconciled(ctx, disruptionController) + wg.Wait() + + // Process the item so that the nodes can be deleted. 
+ ExpectSingletonReconciled(ctx, queue) + // Cascade any deletion of the nodeClaim to the node + ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim) + + // expect that drift provisioned three nodes, one for each pod + ExpectNotFound(ctx, env.Client, nodeClaim, node) + Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(3)) + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(3)) + }) + It("should cloudprovider disrupt one non-empty node at a time, starting with the earliest drift", func() { + labels := map[string]string{ + "app": "test", + } + + // create our RS so we can link a pod to it + rs := test.ReplicaSet() + ExpectApplied(ctx, env.Client, rs) + + pods := test.Pods(2, test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: rs.Name, + UID: rs.UID, + Controller: lo.ToPtr(true), + BlockOwnerDeletion: lo.ToPtr(true), + }, + }, + }, + // Make each pod request only fit on a single node + ResourceRequirements: corev1.ResourceRequirements{ + Requests: map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("30")}, + }, + }) + + nodeClaim2, node2 := test.NodeClaimAndNode(v1.NodeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1.NodePoolLabelKey: nodePool.Name, + corev1.LabelInstanceTypeStable: mostExpensiveInstance.Name, + v1.CapacityTypeLabelKey: mostExpensiveOffering.Requirements.Get(v1.CapacityTypeLabelKey).Any(), + corev1.LabelTopologyZone: mostExpensiveOffering.Requirements.Get(corev1.LabelTopologyZone).Any(), + }, + }, + Status: v1.NodeClaimStatus{ + ProviderID: test.RandomProviderID(), + Allocatable: map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("32")}, + }, + }) + nodeClaim2.Status.Conditions = append(nodeClaim2.Status.Conditions, status.Condition{ + Type: string(cloudproviderDisruptionReason), + Status: metav1.ConditionTrue, + Reason: string(cloudproviderDisruptionReason), + Message: string(cloudproviderDisruptionReason), + LastTransitionTime: metav1.Time{Time: time.Now().Add(-time.Hour)}, + }) + + ExpectApplied(ctx, env.Client, rs, pods[0], pods[1], nodeClaim, node, nodeClaim2, node2, nodePool) + + // bind pods to node so that they're not empty and don't disrupt in parallel. + ExpectManualBinding(ctx, env.Client, pods[0], node) + ExpectManualBinding(ctx, env.Client, pods[1], node2) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node, node2}, []*v1.NodeClaim{nodeClaim, nodeClaim2}) + + // disruption won't delete the old node until the new node is ready + var wg sync.WaitGroup + ExpectMakeNewNodeClaimsReady(ctx, env.Client, &wg, cluster, cloudProvider, 1) + ExpectSingletonReconciled(ctx, disruptionController) + wg.Wait() + + // Process the item so that the nodes can be deleted. 
+ ExpectSingletonReconciled(ctx, queue) + // Cascade any deletion of the nodeClaim to the node + ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim, nodeClaim2) + + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(2)) + Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(2)) + ExpectNotFound(ctx, env.Client, nodeClaim2, node2) + ExpectExists(ctx, env.Client, nodeClaim) + ExpectExists(ctx, env.Client, node) + }) + It("should delete nodes with the karpenter.sh/do-not-disrupt annotation set to false", func() { + node.Annotations = lo.Assign(node.Annotations, map[string]string{v1.DoNotDisruptAnnotationKey: "false"}) + ExpectApplied(ctx, env.Client, nodeClaim, node, nodePool) + + // inform cluster state about nodes and nodeclaims + ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{node}, []*v1.NodeClaim{nodeClaim}) + + fakeClock.Step(10 * time.Minute) + ExpectSingletonReconciled(ctx, disruptionController) + // Process the item so that the nodes can be deleted. + ExpectSingletonReconciled(ctx, queue) + // Cascade any deletion of the nodeClaim to the node + ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim) + // We should delete the nodeClaim that has drifted Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(0)) Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) diff --git a/pkg/controllers/disruption/helpers.go b/pkg/controllers/disruption/helpers.go index b6aaa819d6..eb496f87b5 100644 --- a/pkg/controllers/disruption/helpers.go +++ b/pkg/controllers/disruption/helpers.go @@ -41,7 +41,6 @@ import ( pscheduling "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling" "sigs.k8s.io/karpenter/pkg/controllers/state" "sigs.k8s.io/karpenter/pkg/events" - "sigs.k8s.io/karpenter/pkg/metrics" operatorlogging "sigs.k8s.io/karpenter/pkg/operator/logging" ) @@ -237,14 +236,7 @@ func BuildDisruptionBudgets(ctx context.Context, cluster *state.Cluster, clk clo // Allowing this value to be negative breaks assumptions in the code used to calculate how many nodes can be disrupted. 
 			allowedDisruptions := lo.Clamp(minDisruptions-disrupting[nodePool.Name], 0, math.MaxInt32)
 			disruptionBudgetMapping[nodePool.Name][reason] = allowedDisruptions
-			allowedDisruptionsTotal += allowedDisruptions
-			NodePoolAllowedDisruptions.With(map[string]string{
-				metrics.NodePoolLabel: nodePool.Name, metrics.ReasonLabel: string(reason),
-			}).Set(float64(allowedDisruptions))
-			if allowedDisruptions == 0 {
-				recorder.Publish(disruptionevents.NodePoolBlockedForDisruptionReason(lo.ToPtr(nodePool), reason))
-			}
 		}
 		if allowedDisruptionsTotal == 0 {
 			recorder.Publish(disruptionevents.NodePoolBlocked(lo.ToPtr(nodePool)))
diff --git a/pkg/controllers/disruption/multinodeconsolidation.go b/pkg/controllers/disruption/multinodeconsolidation.go
index 71995d69dc..eb1266149c 100644
--- a/pkg/controllers/disruption/multinodeconsolidation.go
+++ b/pkg/controllers/disruption/multinodeconsolidation.go
@@ -59,9 +59,13 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
 	disruptableCandidates := make([]*Candidate, 0, len(candidates))
 	constrainedByBudgets := false
 	for _, candidate := range candidates {
+		// If a disruption reason is not defined, we will use the DisruptionReasonAll that covers all the disruption reasons
+		_, found := disruptionBudgetMapping[candidate.nodePool.Name][m.Reason()]
+		reason := lo.Ternary(found, m.Reason(), v1.DisruptionReasonAll)
+
 		// If there's disruptions allowed for the candidate's nodepool,
 		// add it to the list of candidates, and decrement the budget.
-		if disruptionBudgetMapping[candidate.nodePool.Name][m.Reason()] == 0 {
+		if disruptionBudgetMapping[candidate.nodePool.Name][reason] == 0 {
 			constrainedByBudgets = true
 			continue
 		}
@@ -73,7 +77,7 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
 		}
 		// set constrainedByBudgets to true if any node was a candidate but was constrained by a budget
 		disruptableCandidates = append(disruptableCandidates, candidate)
-		disruptionBudgetMapping[candidate.nodePool.Name][m.Reason()]--
+		disruptionBudgetMapping[candidate.nodePool.Name][reason]--
 	}

 	// Only consider a maximum batch of 100 NodeClaims to save on computation.
diff --git a/pkg/controllers/disruption/singlenodeconsolidation.go b/pkg/controllers/disruption/singlenodeconsolidation.go
index 4c858c6bf2..cb5d1b0a94 100644
--- a/pkg/controllers/disruption/singlenodeconsolidation.go
+++ b/pkg/controllers/disruption/singlenodeconsolidation.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"time"

+	"github.com/samber/lo"
 	"sigs.k8s.io/controller-runtime/pkg/log"

 	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
@@ -51,12 +52,16 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
 	// Set a timeout
 	timeout := s.clock.Now().Add(SingleNodeConsolidationTimeoutDuration)
 	constrainedByBudgets := false
+
 	// binary search to find the maximum number of NodeClaims we can terminate
 	for i, candidate := range candidates {
+		_, found := disruptionBudgetMapping[candidate.nodePool.Name][s.Reason()]
+		reason := lo.Ternary(found, s.Reason(), v1.DisruptionReasonAll)
+
 		// If the disruption budget doesn't allow this candidate to be disrupted,
 		// continue to the next candidate. We don't need to decrement any budget
 		// counter since single node consolidation commands can only have one candidate.
-		if disruptionBudgetMapping[candidate.nodePool.Name][s.Reason()] == 0 {
+		if disruptionBudgetMapping[candidate.nodePool.Name][reason] == 0 {
 			constrainedByBudgets = true
 			continue
 		}
diff --git a/pkg/controllers/disruption/suite_test.go b/pkg/controllers/disruption/suite_test.go
index 50ee9019d6..a97e810163 100644
--- a/pkg/controllers/disruption/suite_test.go
+++ b/pkg/controllers/disruption/suite_test.go
@@ -69,6 +69,7 @@ var nodeClaimStateController *informer.NodeClaimController
 var fakeClock *clock.FakeClock
 var recorder *test.EventRecorder
 var queue *orchestration.Queue
+var wellKnownDisruptionReasons []v1.DisruptionReason

 var onDemandInstances []*cloudprovider.InstanceType
 var spotInstances []*cloudprovider.InstanceType
@@ -147,6 +148,10 @@ var _ = BeforeEach(func() {
 	})
 	leastExpensiveSpotInstance, mostExpensiveSpotInstance = spotInstances[0], spotInstances[len(spotInstances)-1]
 	leastExpensiveSpotOffering, mostExpensiveSpotOffering = leastExpensiveSpotInstance.Offerings[0], mostExpensiveSpotInstance.Offerings[0]
+	wellKnownDisruptionReasons = append([]v1.DisruptionReason{
+		v1.DisruptionReasonEmpty,
+		v1.DisruptionReasonUnderutilized,
+		v1.DisruptionReasonDrifted}, cloudProvider.DisruptionReasons()...)
 })

 var _ = AfterEach(func() {
@@ -632,7 +637,7 @@ var _ = Describe("BuildDisruptionBudgetMapping", func() {
 		budgets, err := disruption.BuildDisruptionBudgets(ctx, cluster, fakeClock, env.Client, recorder)
 		Expect(err).To(Succeed())
 		// This should not bring in the unmanaged node.
-		for reason := range v1.WellKnownDisruptionReasons {
+		for _, reason := range wellKnownDisruptionReasons {
 			Expect(budgets[nodePool.Name][reason]).To(Equal(10))
 		}
 	})
@@ -663,7 +668,7 @@ var _ = Describe("BuildDisruptionBudgetMapping", func() {
 		budgets, err := disruption.BuildDisruptionBudgets(ctx, cluster, fakeClock, env.Client, recorder)
 		Expect(err).To(Succeed())
 		// This should not bring in the uninitialized node.
-		for reason := range v1.WellKnownDisruptionReasons {
+		for _, reason := range wellKnownDisruptionReasons {
 			Expect(budgets[nodePool.Name][reason]).To(Equal(10))
 		}
 	})
@@ -684,7 +689,7 @@ var _ = Describe("BuildDisruptionBudgetMapping", func() {
 		budgets, err := disruption.BuildDisruptionBudgets(ctx, cluster, fakeClock, env.Client, recorder)
 		Expect(err).To(Succeed())
-		for reason := range v1.WellKnownDisruptionReasons {
+		for _, reason := range wellKnownDisruptionReasons {
 			Expect(budgets[nodePool.Name][reason]).To(Equal(0))
 		}
 	})
@@ -709,7 +714,7 @@ var _ = Describe("BuildDisruptionBudgetMapping", func() {
 		budgets, err := disruption.BuildDisruptionBudgets(ctx, cluster, fakeClock, env.Client, recorder)
 		Expect(err).To(Succeed())
-		for reason := range v1.WellKnownDisruptionReasons {
+		for _, reason := range wellKnownDisruptionReasons {
 			Expect(budgets[nodePool.Name][reason]).To(Equal(8))
 		}
 	})
@@ -730,7 +735,7 @@ var _ = Describe("BuildDisruptionBudgetMapping", func() {
 		budgets, err := disruption.BuildDisruptionBudgets(ctx, cluster, fakeClock, env.Client, recorder)
 		Expect(err).To(Succeed())
-		for reason := range v1.WellKnownDisruptionReasons {
+		for _, reason := range wellKnownDisruptionReasons {
 			Expect(budgets[nodePool.Name][reason]).To(Equal(8))
 		}
 	})
diff --git a/pkg/controllers/disruption/validation.go b/pkg/controllers/disruption/validation.go
index 49004b2ecc..4eee248816 100644
--- a/pkg/controllers/disruption/validation.go
+++ b/pkg/controllers/disruption/validation.go
@@ -54,16 +54,17 @@ func IsValidationError(err error) bool {
 // of the commands passed to IsValid were constructed based off of the same consolidation state. This allows it to
 // skip the validation TTL for all but the first command.
 type Validation struct {
-	start         time.Time
-	clock         clock.Clock
-	cluster       *state.Cluster
-	kubeClient    client.Client
-	cloudProvider cloudprovider.CloudProvider
-	provisioner   *provisioning.Provisioner
-	once          sync.Once
-	recorder      events.Recorder
-	queue         *orchestration.Queue
-	reason        v1.DisruptionReason
+	start                     time.Time
+	clock                     clock.Clock
+	cluster                   *state.Cluster
+	kubeClient                client.Client
+	cloudProvider             cloudprovider.CloudProvider
+	provisioner               *provisioning.Provisioner
+	once                      sync.Once
+	recorder                  events.Recorder
+	queue                     *orchestration.Queue
+	reason                    v1.DisruptionReason
+	wellKnownDisruptionReason []v1.DisruptionReason
 }

 func NewValidation(clk clock.Clock, cluster *state.Cluster, kubeClient client.Client, provisioner *provisioning.Provisioner,
@@ -77,6 +78,10 @@ func NewValidation(clk clock.Clock, cluster *state.Cluster, kubeClient client.Cl
 		recorder: recorder,
 		queue: queue,
 		reason: reason,
+		wellKnownDisruptionReason: append([]v1.DisruptionReason{
+			v1.DisruptionReasonEmpty,
+			v1.DisruptionReasonUnderutilized,
+			v1.DisruptionReasonDrifted}, cp.DisruptionReasons()...),
 	}
 }
diff --git a/pkg/test/v1alpha1/crds/karpenter.test.sh_testnodeclasses.yaml b/pkg/test/v1alpha1/crds/karpenter.test.sh_testnodeclasses.yaml
index 3907c2cafd..ae6d14b27d 100644
--- a/pkg/test/v1alpha1/crds/karpenter.test.sh_testnodeclasses.yaml
+++ b/pkg/test/v1alpha1/crds/karpenter.test.sh_testnodeclasses.yaml
@@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
 kind: CustomResourceDefinition
 metadata:
   annotations:
-    controller-gen.kubebuilder.io/version: v0.15.0
+    controller-gen.kubebuilder.io/version: v0.16.1
   name: testnodeclasses.karpenter.test.sh
 spec:
   group: karpenter.test.sh
@@ -84,12 +84,7 @@ spec:
               - Unknown
               type: string
             type:
-              description: |-
-                type of condition in CamelCase or in foo.example.com/CamelCase.
-                ---
-                Many .condition.type values are consistent across resources like Available, but because arbitrary conditions can be
-                useful (see .node.status.conditions), the ability to deconflict is important.
-                The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt)
+              description: type of condition in CamelCase or in foo.example.com/CamelCase.
               maxLength: 316
               pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
               type: string
diff --git a/test/suites/perf/disruption_test.go b/test/suites/perf/disruption_test.go
new file mode 100644
index 0000000000..ef790af044
--- /dev/null
+++ b/test/suites/perf/disruption_test.go
@@ -0,0 +1,74 @@
+/*
+Copyright The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package perf_test
+
+import (
+	"time"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
+	"sigs.k8s.io/karpenter/pkg/test"
+)
+
+var _ = Describe("Disruption", func() {
+	var replicas = 100
+	It("should do simple provisioning and cloudprovider disruption", func() {
+		deployment := test.Deployment(test.DeploymentOptions{
+			Replicas: int32(replicas),
+			PodOptions: test.PodOptions{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: testLabels,
+				},
+				ResourceRequirements: corev1.ResourceRequirements{
+					Requests: corev1.ResourceList{
+						corev1.ResourceCPU: resource.MustParse("1"),
+					},
+				},
+			}})
+		env.ExpectCreated(deployment)
+		env.ExpectCreated(nodePool, nodeClass)
+		env.EventuallyExpectHealthyPodCount(labelSelector, replicas)
+
+		env.TimeIntervalCollector.Start("KWOK Disruption")
+		nodeClaimList := &v1.NodeClaimList{}
+		Expect(env.Client.List(env, nodeClaimList, client.HasLabels{test.DiscoveryLabel})).To(Succeed())
+		for i := range nodeClaimList.Items {
+			Expect(nodeClaimList.Items[i].StatusConditions().SetTrue("ExampleReason")).To(BeTrue())
+			Expect(env.Client.Status().Update(env, &nodeClaimList.Items[i])).To(Succeed())
+		}
+
+		// Eventually expect at least one nodeclaim to have an ExampleReason condition
+		Eventually(func(g Gomega) {
+			nodeClaims := &v1.NodeClaimList{}
+			g.Expect(env.Client.List(env, nodeClaims, client.MatchingFields{"status.conditions[*].type": "ExampleReason"})).To(Succeed())
+			g.Expect(len(nodeClaims.Items)).ToNot(Equal(0))
+		}).WithTimeout(5 * time.Second).Should(Succeed())
+		// Then eventually expect no nodeclaim to have an ExampleReason condition
+		Eventually(func(g Gomega) {
+			nodeClaims := &v1.NodeClaimList{}
+			g.Expect(env.Client.List(env, nodeClaims, client.MatchingFields{"status.conditions[*].type": "ExampleReason"})).To(Succeed())
+			g.Expect(len(nodeClaims.Items)).To(Equal(0))
+		}).WithTimeout(3 * time.Minute).Should(Succeed())
+		env.TimeIntervalCollector.End("KWOK Disruption")
+	})
+})
diff --git a/test/suites/perf/scheduling_test.go b/test/suites/perf/scheduling_test.go
index 3ca77776f1..4a9e84fe39 100644
--- a/test/suites/perf/scheduling_test.go
+++ b/test/suites/perf/scheduling_test.go
@@ -33,9 +33,9 @@ import (
 	"sigs.k8s.io/karpenter/pkg/test"
 )

-var replicas int = 100
-
 var _ = Describe("Performance", func() {
+	var replicas = 100
+
 	Context("Provisioning", func() {
 		It("should do simple provisioning", func() {
 			deployment := test.Deployment(test.DeploymentOptions{
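Note (editor's addition, not part of the change set above): both consolidation hunks apply the same budget-key fallback. If a NodePool's budget map has no entry for the method's own disruption reason, the catch-all v1.DisruptionReasonAll bucket is checked and decremented instead, which is also why the tests now iterate a wellKnownDisruptionReasons slice built from cloudProvider.DisruptionReasons(). The self-contained Go sketch below illustrates that lookup with simplified, hypothetical names (budgetKeyFor and a flattened map type); the real code indexes disruptionBudgetMapping[nodePool][reason] and uses lo.Ternary.

// Editor's sketch under assumed, simplified types; not code from this diff.
package main

import "fmt"

type DisruptionReason string

const (
	DisruptionReasonDrifted DisruptionReason = "Drifted"
	DisruptionReasonAll     DisruptionReason = "All"
)

// budgetKeyFor returns the key whose budget should be checked and decremented
// for a candidate: the method's own reason when the NodePool defines a budget
// for it, otherwise the catch-all bucket that covers every disruption reason.
func budgetKeyFor(budgets map[DisruptionReason]int, reason DisruptionReason) DisruptionReason {
	if _, found := budgets[reason]; found {
		return reason
	}
	return DisruptionReasonAll
}

func main() {
	// A NodePool that only defines a catch-all budget of two nodes.
	budgets := map[DisruptionReason]int{DisruptionReasonAll: 2}

	key := budgetKeyFor(budgets, DisruptionReasonDrifted)
	if budgets[key] > 0 {
		budgets[key]-- // the disruption is charged against the catch-all bucket
	}
	fmt.Println(key, budgets[key]) // prints: All 1
}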