From da2c2f5594a3704b2c67b5161205e50f037b2eb2 Mon Sep 17 00:00:00 2001 From: Nick Tran <10810510+njtran@users.noreply.github.com> Date: Wed, 16 Aug 2023 05:15:41 -0700 Subject: [PATCH] perf: Implement timeouts and Max-batch size for Consolidation (#472) --- .../deprovisioning/emptiness_test.go | 261 +++++++++++ pkg/controllers/deprovisioning/helpers.go | 1 - pkg/controllers/deprovisioning/metrics.go | 13 + .../multimachineconsolidation.go | 23 +- .../singlemachineconsolidation.go | 14 +- pkg/controllers/deprovisioning/suite_test.go | 428 +++++++++--------- pkg/controllers/provisioning/metrics.go | 2 +- .../provisioning/scheduling/metrics.go | 36 ++ .../provisioning/scheduling/scheduler.go | 2 + pkg/metrics/constants.go | 4 +- 10 files changed, 553 insertions(+), 231 deletions(-) create mode 100644 pkg/controllers/deprovisioning/emptiness_test.go create mode 100644 pkg/controllers/provisioning/scheduling/metrics.go diff --git a/pkg/controllers/deprovisioning/emptiness_test.go b/pkg/controllers/deprovisioning/emptiness_test.go new file mode 100644 index 000000000000..7afea7a401e2 --- /dev/null +++ b/pkg/controllers/deprovisioning/emptiness_test.go @@ -0,0 +1,261 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// nolint:gosec +package deprovisioning_test + +import ( + "sort" + "sync" + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "github.com/samber/lo" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "knative.dev/pkg/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/aws/karpenter-core/pkg/apis/v1alpha5" + "github.com/aws/karpenter-core/pkg/cloudprovider" + "github.com/aws/karpenter-core/pkg/test" + . "github.com/aws/karpenter-core/pkg/test/expectations" +) + +var _ = Describe("Empty Nodes (TTLSecondsAfterEmpty)", func() { + var prov *v1alpha5.Provisioner + var machine *v1alpha5.Machine + var node *v1.Node + + BeforeEach(func() { + prov = test.Provisioner(test.ProvisionerOptions{ + TTLSecondsAfterEmpty: ptr.Int64(30), + }) + machine, node = test.MachineAndNode(v1alpha5.Machine{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: prov.Name, + v1.LabelInstanceTypeStable: mostExpensiveInstance.Name, + v1alpha5.LabelCapacityType: mostExpensiveOffering.CapacityType, + v1.LabelTopologyZone: mostExpensiveOffering.Zone, + }, + }, + Status: v1alpha5.MachineStatus{ + ProviderID: test.RandomProviderID(), + Allocatable: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourcePods: resource.MustParse("100"), + }, + }, + }) + machine.StatusConditions().MarkTrue(v1alpha5.MachineEmpty) + }) + It("can delete empty nodes with TTLSecondsAfterEmpty", func() { + ExpectApplied(ctx, env.Client, prov, machine, node) + + // inform cluster state about nodes and machines + ExpectMakeInitializedAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node}, []*v1alpha5.Machine{machine}) + + fakeClock.Step(10 * time.Minute) + wg := sync.WaitGroup{} + ExpectTriggerVerifyAction(&wg) + ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) + + // Cascade any deletion of the machine to the 
node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine) + + // we should delete the empty node + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0)) + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) + ExpectNotFound(ctx, env.Client, machine, node) + }) + It("should ignore TTLSecondsAfterEmpty nodes without the empty status condition", func() { + _ = machine.StatusConditions().ClearCondition(v1alpha5.MachineEmpty) + ExpectApplied(ctx, env.Client, machine, node, prov) + + // inform cluster state about nodes and machines + ExpectMakeInitializedAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node}, []*v1alpha5.Machine{machine}) + + ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) + + // Expect to not create or delete more machines + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, machine) + }) + It("should ignore TTLSecondsAfterEmpty nodes with the empty status condition set to false", func() { + machine.StatusConditions().MarkFalse(v1alpha5.MachineEmpty, "", "") + ExpectApplied(ctx, env.Client, machine, node, prov) + + // inform cluster state about nodes and machines + ExpectMakeInitializedAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node}, []*v1alpha5.Machine{machine}) + + fakeClock.Step(10 * time.Minute) + + ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) + + // Expect to not create or delete more machines + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, machine) + }) +}) + +var _ = Describe("Empty Nodes (Consolidation)", func() { + var prov *v1alpha5.Provisioner + var machine1, machine2 *v1alpha5.Machine + var node1, node2 *v1.Node + + BeforeEach(func() { + prov = test.Provisioner(test.ProvisionerOptions{Consolidation: &v1alpha5.Consolidation{Enabled: ptr.Bool(true)}}) + 
machine1, node1 = test.MachineAndNode(v1alpha5.Machine{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: prov.Name, + v1.LabelInstanceTypeStable: mostExpensiveInstance.Name, + v1alpha5.LabelCapacityType: mostExpensiveOffering.CapacityType, + v1.LabelTopologyZone: mostExpensiveOffering.Zone, + }, + }, + Status: v1alpha5.MachineStatus{ + ProviderID: test.RandomProviderID(), + Allocatable: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourcePods: resource.MustParse("100"), + }, + }, + }) + machine1.StatusConditions().MarkTrue(v1alpha5.MachineEmpty) + machine2, node2 = test.MachineAndNode(v1alpha5.Machine{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: prov.Name, + v1.LabelInstanceTypeStable: mostExpensiveInstance.Name, + v1alpha5.LabelCapacityType: mostExpensiveOffering.CapacityType, + v1.LabelTopologyZone: mostExpensiveOffering.Zone, + }, + }, + Status: v1alpha5.MachineStatus{ + ProviderID: test.RandomProviderID(), + Allocatable: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourcePods: resource.MustParse("100"), + }, + }, + }) + machine2.StatusConditions().MarkTrue(v1alpha5.MachineEmpty) + }) + It("can delete empty nodes with consolidation", func() { + ExpectApplied(ctx, env.Client, machine1, node1, prov) + + // inform cluster state about nodes and machines + ExpectMakeInitializedAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node1}, []*v1alpha5.Machine{machine1}) + + fakeClock.Step(10 * time.Minute) + + var wg sync.WaitGroup + ExpectTriggerVerifyAction(&wg) + ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) + wg.Wait() + + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1) + + // we should delete the empty node + Expect(ExpectMachines(ctx, 
env.Client)).To(HaveLen(0)) + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) + ExpectNotFound(ctx, env.Client, machine1, node1) + }) + It("can delete multiple empty nodes with consolidation", func() { + ExpectApplied(ctx, env.Client, machine1, node1, machine2, node2, prov) + + // inform cluster state about nodes and machines + ExpectMakeInitializedAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node1, node2}, []*v1alpha5.Machine{machine1, machine2}) + + fakeClock.Step(10 * time.Minute) + wg := sync.WaitGroup{} + ExpectTriggerVerifyAction(&wg) + ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) + + // Cascade any deletion of the machine to the node + ExpectMachinesCascadeDeletion(ctx, env.Client, machine1, machine2) + + // we should delete the empty nodes + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0)) + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) + ExpectNotFound(ctx, env.Client, machine1) + ExpectNotFound(ctx, env.Client, machine2) + }) + It("considers pending pods when consolidating", func() { + largeTypes := lo.Filter(cloudProvider.InstanceTypes, func(item *cloudprovider.InstanceType, index int) bool { + return item.Capacity.Cpu().Cmp(resource.MustParse("64")) >= 0 + }) + sort.Slice(largeTypes, func(i, j int) bool { + return largeTypes[i].Offerings[0].Price < largeTypes[j].Offerings[0].Price + }) + + largeCheapType := largeTypes[0] + machine1, node1 = test.MachineAndNode(v1alpha5.Machine{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: prov.Name, + v1.LabelInstanceTypeStable: largeCheapType.Name, + v1alpha5.LabelCapacityType: largeCheapType.Offerings[0].CapacityType, + v1.LabelTopologyZone: largeCheapType.Offerings[0].Zone, + }, + }, + Status: v1alpha5.MachineStatus{ + ProviderID: test.RandomProviderID(), + Allocatable: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: *largeCheapType.Capacity.Cpu(), + 
v1.ResourcePods: *largeCheapType.Capacity.Pods(), + }, + }, + }) + + // there is a pending pod that should land on the node + pod := test.UnschedulablePod(test.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("1"), + }, + }, + }) + unsched := test.UnschedulablePod(test.PodOptions{ + ResourceRequirements: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("62"), + }, + }, + }) + + ExpectApplied(ctx, env.Client, machine1, node1, pod, unsched, prov) + + // bind one of the pods to the node + ExpectManualBinding(ctx, env.Client, pod, node1) + + // inform cluster state about nodes and machines + ExpectMakeInitializedAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node1}, []*v1alpha5.Machine{machine1}) + + ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) + + // we don't need any new nodes and consolidation should notice the huge pending pod that needs the large + // node to schedule, which prevents the large expensive node from being replaced + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) + Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) + ExpectExists(ctx, env.Client, machine1) + }) +}) diff --git a/pkg/controllers/deprovisioning/helpers.go b/pkg/controllers/deprovisioning/helpers.go index d1078b80ef61..9081880c7002 100644 --- a/pkg/controllers/deprovisioning/helpers.go +++ b/pkg/controllers/deprovisioning/helpers.go @@ -68,7 +68,6 @@ func filterCandidates(ctx context.Context, kubeClient client.Client, recorder ev //nolint:gocyclo func simulateScheduling(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner, candidates ...*Candidate) (*pscheduling.Results, error) { - candidateNames := sets.NewString(lo.Map(candidates, func(t *Candidate, i int) string { return t.Name() })...) 
nodes := cluster.Nodes() deletingNodes := nodes.Deleting() diff --git a/pkg/controllers/deprovisioning/metrics.go b/pkg/controllers/deprovisioning/metrics.go index 77f31528f2f2..8399922d5358 100644 --- a/pkg/controllers/deprovisioning/metrics.go +++ b/pkg/controllers/deprovisioning/metrics.go @@ -32,6 +32,10 @@ const ( deprovisioningSubsystem = "deprovisioning" deprovisionerLabel = "deprovisioner" actionLabel = "action" + consolidationType = "consolidationType" + + multiMachineConsolidationLabelValue = "multi-machine" + singleMachineConsolidationLabelValue = "single-machine" ) var ( @@ -70,4 +74,13 @@ var ( }, []string{deprovisionerLabel}, ) + deprovisioningConsolidationTimeoutsCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: metrics.Namespace, + Subsystem: deprovisioningSubsystem, + Name: "consolidation_timeouts", + Help: "Number of times the Consolidation algorithm has reached a timeout. Labeled by consolidationType.", + }, + []string{consolidationType}, + ) ) diff --git a/pkg/controllers/deprovisioning/multimachineconsolidation.go b/pkg/controllers/deprovisioning/multimachineconsolidation.go index e7134af02cfe..a9e1a56e4092 100644 --- a/pkg/controllers/deprovisioning/multimachineconsolidation.go +++ b/pkg/controllers/deprovisioning/multimachineconsolidation.go @@ -18,12 +18,15 @@ import ( "context" "fmt" "math" + "time" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/utils/clock" "knative.dev/pkg/logging" "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/samber/lo" + "github.com/aws/karpenter-core/pkg/cloudprovider" "github.com/aws/karpenter-core/pkg/controllers/provisioning" "github.com/aws/karpenter-core/pkg/controllers/provisioning/scheduling" @@ -31,6 +34,8 @@ import ( "github.com/aws/karpenter-core/pkg/events" ) +const MultiMachineConsolidationTimeoutDuration = 1 * time.Minute + type MultiMachineConsolidation struct { consolidation } @@ -50,8 +55,10 @@ func (m *MultiMachineConsolidation) 
ComputeCommand(ctx context.Context, candidat } deprovisioningEligibleMachinesGauge.WithLabelValues(m.String()).Set(float64(len(candidates))) - // For now, we will consider up to every machine in the cluster, might be configurable in the future. - maxParallel := len(candidates) + // Only consider a maximum batch of 100 machines to save on computation. + // This could be further configurable in the future. + maxParallel := lo.Clamp(len(candidates), 0, 100) + cmd, err := m.firstNMachineConsolidationOption(ctx, candidates, maxParallel) if err != nil { return Command{}, err @@ -89,10 +96,20 @@ func (m *MultiMachineConsolidation) firstNMachineConsolidationOption(ctx context } lastSavedCommand := Command{} + // Set a timeout + timeout := m.clock.Now().Add(MultiMachineConsolidationTimeoutDuration) // binary search to find the maximum number of machines we can terminate for min <= max { + if m.clock.Now().After(timeout) { + deprovisioningConsolidationTimeoutsCounter.WithLabelValues(multiMachineConsolidationLabelValue).Inc() + if lastSavedCommand.candidates == nil { + logging.FromContext(ctx).Debugf("failed to find a multi-machine consolidation after timeout, last considered batch had %d machines", (min+max)/2) + } else { + logging.FromContext(ctx).Debugf("stopping multi-machine consolidation after timeout, returning last valid command %s", lastSavedCommand) + } + return lastSavedCommand, nil + } mid := (min + max) / 2 - candidatesToConsolidate := candidates[0 : mid+1] cmd, err := m.computeConsolidation(ctx, candidatesToConsolidate...) 
diff --git a/pkg/controllers/deprovisioning/singlemachineconsolidation.go b/pkg/controllers/deprovisioning/singlemachineconsolidation.go index 2491b223886e..396a2ea5be2e 100644 --- a/pkg/controllers/deprovisioning/singlemachineconsolidation.go +++ b/pkg/controllers/deprovisioning/singlemachineconsolidation.go @@ -17,6 +17,7 @@ package deprovisioning import ( "context" "fmt" + "time" "k8s.io/utils/clock" "knative.dev/pkg/logging" @@ -28,6 +29,8 @@ import ( "github.com/aws/karpenter-core/pkg/events" ) +const SingleMachineConsolidationTimeoutDuration = 3 * time.Minute + // SingleMachineConsolidation is the consolidation controller that performs single machine consolidation. type SingleMachineConsolidation struct { consolidation @@ -51,7 +54,16 @@ func (c *SingleMachineConsolidation) ComputeCommand(ctx context.Context, candida deprovisioningEligibleMachinesGauge.WithLabelValues(c.String()).Set(float64(len(candidates))) v := NewValidation(consolidationTTL, c.clock, c.cluster, c.kubeClient, c.provisioner, c.cloudProvider, c.recorder) - for _, candidate := range candidates { + + // Set a timeout + timeout := c.clock.Now().Add(SingleMachineConsolidationTimeoutDuration) + // iterate candidates in order, abandoning consolidation if the timeout elapses + for i, candidate := range candidates { + if c.clock.Now().After(timeout) { + deprovisioningConsolidationTimeoutsCounter.WithLabelValues(singleMachineConsolidationLabelValue).Inc() + logging.FromContext(ctx).Debugf("abandoning single-machine consolidation due to timeout after evaluating %d candidates", i) + return Command{}, nil + } // compute a possible consolidation option cmd, err := c.computeConsolidation(ctx, candidate) if err != nil { diff --git a/pkg/controllers/deprovisioning/suite_test.go b/pkg/controllers/deprovisioning/suite_test.go index f1ad961373f0..a2f13467f63f 100644 --- a/pkg/controllers/deprovisioning/suite_test.go +++ b/pkg/controllers/deprovisioning/suite_test.go @@ -1838,230 +1838,6 @@ var _ =
Describe("Topology Consideration", func() { }) }) -var _ = Describe("Empty Nodes (Consolidation)", func() { - var prov *v1alpha5.Provisioner - var machine1, machine2 *v1alpha5.Machine - var node1, node2 *v1.Node - - BeforeEach(func() { - prov = test.Provisioner(test.ProvisionerOptions{Consolidation: &v1alpha5.Consolidation{Enabled: ptr.Bool(true)}}) - machine1, node1 = test.MachineAndNode(v1alpha5.Machine{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - v1alpha5.ProvisionerNameLabelKey: prov.Name, - v1.LabelInstanceTypeStable: mostExpensiveInstance.Name, - v1alpha5.LabelCapacityType: mostExpensiveOffering.CapacityType, - v1.LabelTopologyZone: mostExpensiveOffering.Zone, - }, - }, - Status: v1alpha5.MachineStatus{ - ProviderID: test.RandomProviderID(), - Allocatable: map[v1.ResourceName]resource.Quantity{ - v1.ResourceCPU: resource.MustParse("32"), - v1.ResourcePods: resource.MustParse("100"), - }, - }, - }) - machine1.StatusConditions().MarkTrue(v1alpha5.MachineEmpty) - machine2, node2 = test.MachineAndNode(v1alpha5.Machine{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - v1alpha5.ProvisionerNameLabelKey: prov.Name, - v1.LabelInstanceTypeStable: mostExpensiveInstance.Name, - v1alpha5.LabelCapacityType: mostExpensiveOffering.CapacityType, - v1.LabelTopologyZone: mostExpensiveOffering.Zone, - }, - }, - Status: v1alpha5.MachineStatus{ - ProviderID: test.RandomProviderID(), - Allocatable: map[v1.ResourceName]resource.Quantity{ - v1.ResourceCPU: resource.MustParse("32"), - v1.ResourcePods: resource.MustParse("100"), - }, - }, - }) - machine2.StatusConditions().MarkTrue(v1alpha5.MachineEmpty) - }) - It("can delete empty nodes with consolidation", func() { - ExpectApplied(ctx, env.Client, machine1, node1, prov) - - // inform cluster state about nodes and machines - ExpectMakeInitializedAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node1}, []*v1alpha5.Machine{machine1}) - - fakeClock.Step(10 * 
time.Minute) - - var wg sync.WaitGroup - ExpectTriggerVerifyAction(&wg) - ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) - wg.Wait() - - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine1) - - // we should delete the empty node - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0)) - Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - ExpectNotFound(ctx, env.Client, machine1, node1) - }) - It("can delete multiple empty nodes with consolidation", func() { - ExpectApplied(ctx, env.Client, machine1, node1, machine2, node2, prov) - - // inform cluster state about nodes and machines - ExpectMakeInitializedAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node1, node2}, []*v1alpha5.Machine{machine1, machine2}) - - fakeClock.Step(10 * time.Minute) - wg := sync.WaitGroup{} - ExpectTriggerVerifyAction(&wg) - ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) - - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine1, machine2) - - // we should delete the empty nodes - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0)) - Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - ExpectNotFound(ctx, env.Client, machine1) - ExpectNotFound(ctx, env.Client, machine2) - }) - It("considers pending pods when consolidating", func() { - largeTypes := lo.Filter(cloudProvider.InstanceTypes, func(item *cloudprovider.InstanceType, index int) bool { - return item.Capacity.Cpu().Cmp(resource.MustParse("64")) >= 0 - }) - sort.Slice(largeTypes, func(i, j int) bool { - return largeTypes[i].Offerings[0].Price < largeTypes[j].Offerings[0].Price - }) - - largeCheapType := largeTypes[0] - machine1, node1 = test.MachineAndNode(v1alpha5.Machine{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - v1alpha5.ProvisionerNameLabelKey: prov.Name, - v1.LabelInstanceTypeStable: 
largeCheapType.Name, - v1alpha5.LabelCapacityType: largeCheapType.Offerings[0].CapacityType, - v1.LabelTopologyZone: largeCheapType.Offerings[0].Zone, - }, - }, - Status: v1alpha5.MachineStatus{ - ProviderID: test.RandomProviderID(), - Allocatable: map[v1.ResourceName]resource.Quantity{ - v1.ResourceCPU: *largeCheapType.Capacity.Cpu(), - v1.ResourcePods: *largeCheapType.Capacity.Pods(), - }, - }, - }) - - // there is a pending pod that should land on the node - pod := test.UnschedulablePod(test.PodOptions{ - ResourceRequirements: v1.ResourceRequirements{ - Requests: map[v1.ResourceName]resource.Quantity{ - v1.ResourceCPU: resource.MustParse("1"), - }, - }, - }) - unsched := test.UnschedulablePod(test.PodOptions{ - ResourceRequirements: v1.ResourceRequirements{ - Requests: map[v1.ResourceName]resource.Quantity{ - v1.ResourceCPU: resource.MustParse("62"), - }, - }, - }) - - ExpectApplied(ctx, env.Client, machine1, node1, pod, unsched, prov) - - // bind one of the pods to the node - ExpectManualBinding(ctx, env.Client, pod, node1) - - // inform cluster state about nodes and machines - ExpectMakeInitializedAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node1}, []*v1alpha5.Machine{machine1}) - - ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) - - // we don't need any new nodes and consolidation should notice the huge pending pod that needs the large - // node to schedule, which prevents the large expensive node from being replaced - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) - Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, machine1) - }) -}) - -var _ = Describe("Empty Nodes (TTLSecondsAfterEmpty)", func() { - var prov *v1alpha5.Provisioner - var machine *v1alpha5.Machine - var node *v1.Node - - BeforeEach(func() { - prov = test.Provisioner(test.ProvisionerOptions{ - TTLSecondsAfterEmpty: ptr.Int64(30), - }) - machine, node = 
test.MachineAndNode(v1alpha5.Machine{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - v1alpha5.ProvisionerNameLabelKey: prov.Name, - v1.LabelInstanceTypeStable: mostExpensiveInstance.Name, - v1alpha5.LabelCapacityType: mostExpensiveOffering.CapacityType, - v1.LabelTopologyZone: mostExpensiveOffering.Zone, - }, - }, - Status: v1alpha5.MachineStatus{ - ProviderID: test.RandomProviderID(), - Allocatable: map[v1.ResourceName]resource.Quantity{ - v1.ResourceCPU: resource.MustParse("32"), - v1.ResourcePods: resource.MustParse("100"), - }, - }, - }) - machine.StatusConditions().MarkTrue(v1alpha5.MachineEmpty) - }) - It("can delete empty nodes with TTLSecondsAfterEmpty", func() { - ExpectApplied(ctx, env.Client, prov, machine, node) - - // inform cluster state about nodes and machines - ExpectMakeInitializedAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node}, []*v1alpha5.Machine{machine}) - - fakeClock.Step(10 * time.Minute) - wg := sync.WaitGroup{} - ExpectTriggerVerifyAction(&wg) - ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) - - // Cascade any deletion of the machine to the node - ExpectMachinesCascadeDeletion(ctx, env.Client, machine) - - // we should delete the empty node - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0)) - Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0)) - ExpectNotFound(ctx, env.Client, machine, node) - }) - It("should ignore TTLSecondsAfterEmpty nodes without the empty status condition", func() { - _ = machine.StatusConditions().ClearCondition(v1alpha5.MachineEmpty) - ExpectApplied(ctx, env.Client, machine, node, prov) - - // inform cluster state about nodes and machines - ExpectMakeInitializedAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node}, []*v1alpha5.Machine{machine}) - - ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) - - // Expect to not create or delete more 
machines - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) - Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, machine) - }) - It("should ignore TTLSecondsAfterEmpty nodes with the empty status condition set to false", func() { - machine.StatusConditions().MarkFalse(v1alpha5.MachineEmpty, "", "") - ExpectApplied(ctx, env.Client, machine, node, prov) - - // inform cluster state about nodes and machines - ExpectMakeInitializedAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node}, []*v1alpha5.Machine{machine}) - - fakeClock.Step(10 * time.Minute) - - ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{}) - - // Expect to not create or delete more machines - Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1)) - ExpectExists(ctx, env.Client, machine) - }) -}) - var _ = Describe("Consolidation TTL", func() { var prov *v1alpha5.Provisioner var machine1, machine2 *v1alpha5.Machine @@ -2171,6 +1947,7 @@ var _ = Describe("Consolidation TTL", func() { Eventually(fakeClock.HasWaiters, time.Second*10).Should(BeTrue()) // controller should be blocking during the timeout Expect(finished.Load()).To(BeFalse()) + // and the node should not be deleted yet ExpectExists(ctx, env.Client, node1) @@ -2298,6 +2075,7 @@ var _ = Describe("Consolidation TTL", func() { // advance the clock so that the timeout expires fakeClock.Step(31 * time.Second) + // controller should finish Eventually(finished.Load, 10*time.Second).Should(BeTrue()) wg.Wait() @@ -2352,6 +2130,192 @@ var _ = Describe("Consolidation TTL", func() { }) }) +var _ = Describe("Consolidation Timeout", func() { + var prov *v1alpha5.Provisioner + BeforeEach(func() { + prov = test.Provisioner(test.ProvisionerOptions{Consolidation: &v1alpha5.Consolidation{Enabled: ptr.Bool(true)}}) + ctx = settings.ToContext(ctx, test.Settings(settings.Settings{DriftEnabled: false})) + }) + It("should return the last valid command when 
multi-machine consolidation times out", func() { + numNodes := 20 + labels := map[string]string{ + "app": "test", + } + machines, nodes := test.MachinesAndNodes(numNodes, v1alpha5.Machine{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: prov.Name, + v1.LabelInstanceTypeStable: leastExpensiveInstance.Name, + v1alpha5.LabelCapacityType: leastExpensiveOffering.CapacityType, + v1.LabelTopologyZone: leastExpensiveOffering.Zone, + }, + }, + Status: v1alpha5.MachineStatus{ + Allocatable: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourcePods: resource.MustParse("100"), + }, + }}, + ) + // create our RS so we can link a pod to it + rs := test.ReplicaSet() + ExpectApplied(ctx, env.Client, rs) + pods := test.Pods(numNodes, test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: rs.Name, + UID: rs.UID, + Controller: ptr.Bool(true), + BlockOwnerDeletion: ptr.Bool(true), + }, + }}, + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + // Make the resource requests small so that many nodes can be consolidated at once. 
+ v1.ResourceCPU: resource.MustParse("10m"), + }, + }, + }) + + ExpectApplied(ctx, env.Client, rs, prov) + for _, machine := range machines { + ExpectApplied(ctx, env.Client, machine) + } + for _, node := range nodes { + ExpectApplied(ctx, env.Client, node) + } + for i, pod := range pods { + ExpectApplied(ctx, env.Client, pod) + ExpectManualBinding(ctx, env.Client, pod, nodes[i]) + } + + // inform cluster state about nodes and machines + ExpectMakeInitializedAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, nodes, machines) + + var wg sync.WaitGroup + wg.Add(1) + finished := atomic.Bool{} + go func() { + defer GinkgoRecover() + defer wg.Done() + defer finished.Store(true) + ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) + }() + + ExpectTriggerVerifyAction(&wg) + + // advance the clock so that the timeout expires + fakeClock.Step(deprovisioning.MultiMachineConsolidationTimeoutDuration) + + // wait for the controller to block on the validation timeout + Eventually(fakeClock.HasWaiters, time.Second*10).Should(BeTrue()) + // controller should be blocking during the timeout + Expect(finished.Load()).To(BeFalse()) + + // and the node should not be deleted yet + for i := range machines { + ExpectExists(ctx, env.Client, machines[i]) + } + + // controller should finish + Eventually(finished.Load, 10*time.Second).Should(BeTrue()) + wg.Wait() + + // should have at least two nodes deleted from multi machine consolidation + Expect(len(ExpectMachines(ctx, env.Client))).To(BeNumerically("<=", numNodes-2)) + }) + It("should exit single-machine consolidation if it times out", func() { + numNodes := 25 + labels := map[string]string{ + "app": "test", + } + machines, nodes := test.MachinesAndNodes(numNodes, v1alpha5.Machine{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + v1alpha5.ProvisionerNameLabelKey: prov.Name, + v1.LabelInstanceTypeStable: leastExpensiveInstance.Name, + v1alpha5.LabelCapacityType: 
leastExpensiveOffering.CapacityType, + v1.LabelTopologyZone: leastExpensiveOffering.Zone, + }, + }, + Status: v1alpha5.MachineStatus{ + Allocatable: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("32"), + v1.ResourcePods: resource.MustParse("100"), + }, + }}, + ) + // create our RS so we can link a pod to it + rs := test.ReplicaSet() + ExpectApplied(ctx, env.Client, rs) + pods := test.Pods(numNodes, test.PodOptions{ + ObjectMeta: metav1.ObjectMeta{Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: rs.Name, + UID: rs.UID, + Controller: ptr.Bool(true), + BlockOwnerDeletion: ptr.Bool(true), + }, + }}, + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + // Make the pods more than half of the allocatable so that only one machine can be done at any time + v1.ResourceCPU: resource.MustParse("20"), + }, + }, + }) + + ExpectApplied(ctx, env.Client, rs, prov) + for _, machine := range machines { + ExpectApplied(ctx, env.Client, machine) + } + for _, node := range nodes { + ExpectApplied(ctx, env.Client, node) + } + for i, pod := range pods { + ExpectApplied(ctx, env.Client, pod) + ExpectManualBinding(ctx, env.Client, pod, nodes[i]) + } + + // inform cluster state about nodes and machines + ExpectMakeInitializedAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, nodes, machines) + + var wg sync.WaitGroup + wg.Add(1) + finished := atomic.Bool{} + go func() { + defer GinkgoRecover() + defer wg.Done() + defer finished.Store(true) + ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{}) + }() + + // ExpectTriggerVerifyAction so that we can try to compute some deprovisioning actions. 
+ ExpectTimeoutCompleted(&wg) + + // advance the clock so that the timeout expires for multi-machine + fakeClock.Step(deprovisioning.MultiMachineConsolidationTimeoutDuration) + // advance the clock so that the timeout expires for single-machine + fakeClock.Step(deprovisioning.SingleMachineConsolidationTimeoutDuration) + + // advance the clock so that the timeout expires + fakeClock.Step(31 * time.Second) + + // controller should finish + Eventually(finished.Load, 10*time.Second).Should(BeTrue()) + wg.Wait() + + // should have no machines deleted from multi machine consolidation + Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(numNodes)) + }) +}) + var _ = Describe("Parallelization", func() { var prov *v1alpha5.Provisioner var machine *v1alpha5.Machine @@ -2747,6 +2711,7 @@ var _ = Describe("Multi-Node Consolidation", func() { // advance the clock so that the timeout expires fakeClock.Step(31 * time.Second) + // controller should finish Eventually(finished.Load, 10*time.Second).Should(BeTrue()) wg.Wait() @@ -2873,6 +2838,7 @@ var _ = Describe("Multi-Node Consolidation", func() { Eventually(fakeClock.HasWaiters, time.Second*5).Should(BeTrue()) // controller should be blocking during the timeout Expect(finished.Load()).To(BeFalse()) + // and the node should not be deleted yet ExpectExists(ctx, env.Client, machine1) ExpectExists(ctx, env.Client, machine2) @@ -2969,6 +2935,20 @@ func ExpectTriggerVerifyAction(wg *sync.WaitGroup) { }() } +func ExpectTimeoutCompleted(wg *sync.WaitGroup) { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 10; i++ { + time.Sleep(250 * time.Millisecond) + if fakeClock.HasWaiters() { + break + } + } + fakeClock.Step(3 * time.Minute) + }() +} + // ExpectNewMachinesDeleted simulates the machines being created and then removed, similar to what would happen // during an ICE error on the created machine func ExpectNewMachinesDeleted(ctx context.Context, c client.Client, wg *sync.WaitGroup, numNewMachines int) { diff --git 
a/pkg/controllers/provisioning/metrics.go b/pkg/controllers/provisioning/metrics.go index dab1c3f88e7d..af0d97ab71fa 100644 --- a/pkg/controllers/provisioning/metrics.go +++ b/pkg/controllers/provisioning/metrics.go @@ -30,7 +30,7 @@ var schedulingDuration = prometheus.NewHistogram( Namespace: metrics.Namespace, Subsystem: "provisioner", Name: "scheduling_duration_seconds", - Help: "Duration of scheduling process in seconds. Broken down by provisioner and error.", + Help: "Duration of scheduling process in seconds.", Buckets: metrics.DurationBuckets(), }, ) diff --git a/pkg/controllers/provisioning/scheduling/metrics.go b/pkg/controllers/provisioning/scheduling/metrics.go new file mode 100644 index 000000000000..fee2f61f3ef7 --- /dev/null +++ b/pkg/controllers/provisioning/scheduling/metrics.go @@ -0,0 +1,36 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package scheduling + +import ( + "github.com/prometheus/client_golang/prometheus" + crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" + + "github.com/aws/karpenter-core/pkg/metrics" +) + +func init() { + crmetrics.Registry.MustRegister(schedulingSimulationDuration) +} + +var schedulingSimulationDuration = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: metrics.Namespace, + Subsystem: "provisioner", + Name: "scheduling_simulation_duration_seconds", + Help: "Duration of scheduling simulations used for deprovisioning and provisioning in seconds.", + Buckets: metrics.DurationBuckets(), + }, +) diff --git a/pkg/controllers/provisioning/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go index fa91166eacba..d792dbcf2825 100644 --- a/pkg/controllers/provisioning/scheduling/scheduler.go +++ b/pkg/controllers/provisioning/scheduling/scheduler.go @@ -31,6 +31,7 @@ import ( schedulingevents "github.com/aws/karpenter-core/pkg/controllers/provisioning/scheduling/events" "github.com/aws/karpenter-core/pkg/controllers/state" "github.com/aws/karpenter-core/pkg/events" + "github.com/aws/karpenter-core/pkg/metrics" "github.com/aws/karpenter-core/pkg/scheduling" "github.com/aws/karpenter-core/pkg/utils/pod" "github.com/aws/karpenter-core/pkg/utils/resources" @@ -136,6 +137,7 @@ func (r Results) PodSchedulingErrors() string { } func (s *Scheduler) Solve(ctx context.Context, pods []*v1.Pod) (*Results, error) { + defer metrics.Measure(schedulingSimulationDuration)() // We loop trying to schedule unschedulable pods as long as we are making progress. This solves a few // issues including pods with affinity to another pod in the batch. We could topo-sort to solve this, but it wouldn't // solve the problem of scheduling pods where a particular order is needed to prevent a max-skew violation. E.g. 
if we diff --git a/pkg/metrics/constants.go b/pkg/metrics/constants.go index b1c739a70f4f..8dbc25dca497 100644 --- a/pkg/metrics/constants.go +++ b/pkg/metrics/constants.go @@ -42,8 +42,10 @@ const ( func DurationBuckets() []float64 { // Use same bucket thresholds as controller-runtime. // https://github.com/kubernetes-sigs/controller-runtime/blob/v0.10.0/pkg/internal/controller/metrics/metrics.go#L47-L48 + // Add in values larger than 60 for singleton controllers that do not have a timeout. return []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, - 1.25, 1.5, 1.75, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30, 40, 50, 60} + 1.25, 1.5, 1.75, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30, 40, 50, 60, 120, 150, 300, 450, 600, + } } // Returns a map of summary objectives (quantile-error pairs)