perf: Implement timeouts and Max-batch size for Consolidation (aws#472)
njtran authored Aug 16, 2023
1 parent 291e6b4 commit da2c2f5
Showing 10 changed files with 553 additions and 231 deletions.
261 changes: 261 additions & 0 deletions pkg/controllers/deprovisioning/emptiness_test.go
@@ -0,0 +1,261 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// nolint:gosec
package deprovisioning_test

import (
"sort"
"sync"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"knative.dev/pkg/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/aws/karpenter-core/pkg/apis/v1alpha5"
"github.com/aws/karpenter-core/pkg/cloudprovider"
"github.com/aws/karpenter-core/pkg/test"
. "github.com/aws/karpenter-core/pkg/test/expectations"
)

var _ = Describe("Empty Nodes (TTLSecondsAfterEmpty)", func() {
var prov *v1alpha5.Provisioner
var machine *v1alpha5.Machine
var node *v1.Node

BeforeEach(func() {
prov = test.Provisioner(test.ProvisionerOptions{
TTLSecondsAfterEmpty: ptr.Int64(30),
})
machine, node = test.MachineAndNode(v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1alpha5.ProvisionerNameLabelKey: prov.Name,
v1.LabelInstanceTypeStable: mostExpensiveInstance.Name,
v1alpha5.LabelCapacityType: mostExpensiveOffering.CapacityType,
v1.LabelTopologyZone: mostExpensiveOffering.Zone,
},
},
Status: v1alpha5.MachineStatus{
ProviderID: test.RandomProviderID(),
Allocatable: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("32"),
v1.ResourcePods: resource.MustParse("100"),
},
},
})
machine.StatusConditions().MarkTrue(v1alpha5.MachineEmpty)
})
It("can delete empty nodes with TTLSecondsAfterEmpty", func() {
ExpectApplied(ctx, env.Client, prov, machine, node)

// inform cluster state about nodes and machines
ExpectMakeInitializedAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node}, []*v1alpha5.Machine{machine})

fakeClock.Step(10 * time.Minute)
wg := sync.WaitGroup{}
ExpectTriggerVerifyAction(&wg)
ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{})

// Cascade any deletion of the machine to the node
ExpectMachinesCascadeDeletion(ctx, env.Client, machine)

// we should delete the empty node
Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0))
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0))
ExpectNotFound(ctx, env.Client, machine, node)
})
It("should ignore TTLSecondsAfterEmpty nodes without the empty status condition", func() {
_ = machine.StatusConditions().ClearCondition(v1alpha5.MachineEmpty)
ExpectApplied(ctx, env.Client, machine, node, prov)

// inform cluster state about nodes and machines
ExpectMakeInitializedAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node}, []*v1alpha5.Machine{machine})

ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{})

// Expect to not create or delete more machines
Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1))
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
ExpectExists(ctx, env.Client, machine)
})
It("should ignore TTLSecondsAfterEmpty nodes with the empty status condition set to false", func() {
machine.StatusConditions().MarkFalse(v1alpha5.MachineEmpty, "", "")
ExpectApplied(ctx, env.Client, machine, node, prov)

// inform cluster state about nodes and machines
ExpectMakeInitializedAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node}, []*v1alpha5.Machine{machine})

fakeClock.Step(10 * time.Minute)

ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{})

// Expect to not create or delete more machines
Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1))
ExpectExists(ctx, env.Client, machine)
})
})

var _ = Describe("Empty Nodes (Consolidation)", func() {
var prov *v1alpha5.Provisioner
var machine1, machine2 *v1alpha5.Machine
var node1, node2 *v1.Node

BeforeEach(func() {
prov = test.Provisioner(test.ProvisionerOptions{Consolidation: &v1alpha5.Consolidation{Enabled: ptr.Bool(true)}})
machine1, node1 = test.MachineAndNode(v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1alpha5.ProvisionerNameLabelKey: prov.Name,
v1.LabelInstanceTypeStable: mostExpensiveInstance.Name,
v1alpha5.LabelCapacityType: mostExpensiveOffering.CapacityType,
v1.LabelTopologyZone: mostExpensiveOffering.Zone,
},
},
Status: v1alpha5.MachineStatus{
ProviderID: test.RandomProviderID(),
Allocatable: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("32"),
v1.ResourcePods: resource.MustParse("100"),
},
},
})
machine1.StatusConditions().MarkTrue(v1alpha5.MachineEmpty)
machine2, node2 = test.MachineAndNode(v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1alpha5.ProvisionerNameLabelKey: prov.Name,
v1.LabelInstanceTypeStable: mostExpensiveInstance.Name,
v1alpha5.LabelCapacityType: mostExpensiveOffering.CapacityType,
v1.LabelTopologyZone: mostExpensiveOffering.Zone,
},
},
Status: v1alpha5.MachineStatus{
ProviderID: test.RandomProviderID(),
Allocatable: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("32"),
v1.ResourcePods: resource.MustParse("100"),
},
},
})
machine2.StatusConditions().MarkTrue(v1alpha5.MachineEmpty)
})
It("can delete empty nodes with consolidation", func() {
ExpectApplied(ctx, env.Client, machine1, node1, prov)

// inform cluster state about nodes and machines
ExpectMakeInitializedAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node1}, []*v1alpha5.Machine{machine1})

fakeClock.Step(10 * time.Minute)

var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{})
wg.Wait()

// Cascade any deletion of the machine to the node
ExpectMachinesCascadeDeletion(ctx, env.Client, machine1)

// we should delete the empty node
Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0))
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0))
ExpectNotFound(ctx, env.Client, machine1, node1)
})
It("can delete multiple empty nodes with consolidation", func() {
ExpectApplied(ctx, env.Client, machine1, node1, machine2, node2, prov)

// inform cluster state about nodes and machines
ExpectMakeInitializedAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node1, node2}, []*v1alpha5.Machine{machine1, machine2})

fakeClock.Step(10 * time.Minute)
wg := sync.WaitGroup{}
ExpectTriggerVerifyAction(&wg)
ExpectReconcileSucceeded(ctx, deprovisioningController, types.NamespacedName{})

// Cascade any deletion of the machine to the node
ExpectMachinesCascadeDeletion(ctx, env.Client, machine1, machine2)

// we should delete the empty nodes
Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(0))
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(0))
ExpectNotFound(ctx, env.Client, machine1)
ExpectNotFound(ctx, env.Client, machine2)
})
It("considers pending pods when consolidating", func() {
largeTypes := lo.Filter(cloudProvider.InstanceTypes, func(item *cloudprovider.InstanceType, index int) bool {
return item.Capacity.Cpu().Cmp(resource.MustParse("64")) >= 0
})
sort.Slice(largeTypes, func(i, j int) bool {
return largeTypes[i].Offerings[0].Price < largeTypes[j].Offerings[0].Price
})

largeCheapType := largeTypes[0]
machine1, node1 = test.MachineAndNode(v1alpha5.Machine{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1alpha5.ProvisionerNameLabelKey: prov.Name,
v1.LabelInstanceTypeStable: largeCheapType.Name,
v1alpha5.LabelCapacityType: largeCheapType.Offerings[0].CapacityType,
v1.LabelTopologyZone: largeCheapType.Offerings[0].Zone,
},
},
Status: v1alpha5.MachineStatus{
ProviderID: test.RandomProviderID(),
Allocatable: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: *largeCheapType.Capacity.Cpu(),
v1.ResourcePods: *largeCheapType.Capacity.Pods(),
},
},
})

// there is a pending pod that should land on the node
pod := test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{
Requests: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("1"),
},
},
})
unsched := test.UnschedulablePod(test.PodOptions{
ResourceRequirements: v1.ResourceRequirements{
Requests: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("62"),
},
},
})

ExpectApplied(ctx, env.Client, machine1, node1, pod, unsched, prov)

// bind one of the pods to the node
ExpectManualBinding(ctx, env.Client, pod, node1)

// inform cluster state about nodes and machines
ExpectMakeInitializedAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node1}, []*v1alpha5.Machine{machine1})

ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{})

// we don't need any new nodes and consolidation should notice the huge pending pod that needs the large
// node to schedule, which prevents the large expensive node from being replaced
Expect(ExpectMachines(ctx, env.Client)).To(HaveLen(1))
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
ExpectExists(ctx, env.Client, machine1)
})
})
1 change: 0 additions & 1 deletion pkg/controllers/deprovisioning/helpers.go
@@ -68,7 +68,6 @@ func filterCandidates(ctx context.Context, kubeClient client.Client, recorder ev
//nolint:gocyclo
func simulateScheduling(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner,
candidates ...*Candidate) (*pscheduling.Results, error) {

candidateNames := sets.NewString(lo.Map(candidates, func(t *Candidate, i int) string { return t.Name() })...)
nodes := cluster.Nodes()
deletingNodes := nodes.Deleting()
13 changes: 13 additions & 0 deletions pkg/controllers/deprovisioning/metrics.go
@@ -32,6 +32,10 @@ const (
deprovisioningSubsystem = "deprovisioning"
deprovisionerLabel = "deprovisioner"
actionLabel = "action"
consolidationType = "consolidationType"

multiMachineConsolidationLabelValue = "multi-machine"
singleMachineConsolidationLabelValue = "single-machine"
)

var (
@@ -70,4 +74,13 @@ var (
},
[]string{deprovisionerLabel},
)
deprovisioningConsolidationTimeoutsCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: deprovisioningSubsystem,
Name: "consolidation_timeouts",
Help: "Number of times the Consolidation algorithm has reached a timeout. Labeled by consolidationType.",
},
[]string{consolidationType},
)
)
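
The new counter follows the standard Prometheus client pattern: a CounterVec keyed by a single consolidationType label, registered once and incremented from whichever consolidation path hits its deadline. Below is a minimal, self-contained sketch of that pattern; the Namespace/Subsystem strings and the registry are illustrative stand-ins, not what Karpenter's metrics package actually registers.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// consolidationTimeouts mirrors the shape of the counter added in metrics.go:
// a CounterVec with a single "consolidationType" label. The Namespace and
// Subsystem values here are placeholders, not Karpenter's real ones.
var consolidationTimeouts = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "example",
		Subsystem: "deprovisioning",
		Name:      "consolidation_timeouts",
		Help:      "Number of times the consolidation algorithm has reached a timeout, labeled by consolidationType.",
	},
	[]string{"consolidationType"},
)

func main() {
	// Counters must be registered before they can be gathered or scraped.
	registry := prometheus.NewRegistry()
	registry.MustRegister(consolidationTimeouts)

	// Incrementing with a label value lazily creates that child series.
	consolidationTimeouts.WithLabelValues("multi-machine").Inc()
	consolidationTimeouts.WithLabelValues("single-machine").Inc()
	consolidationTimeouts.WithLabelValues("multi-machine").Inc()

	// Gather and print the series to show the per-label counts.
	families, err := registry.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range families {
		for _, m := range mf.GetMetric() {
			lbl := m.GetLabel()[0]
			fmt.Printf("%s{%s=%q} %v\n", mf.GetName(), lbl.GetName(), lbl.GetValue(), m.GetCounter().GetValue())
		}
	}
}
```

Each distinct label value ("multi-machine", "single-machine") becomes its own time series, which is what allows the two consolidation paths to be graphed and alerted on separately.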
23 changes: 20 additions & 3 deletions pkg/controllers/deprovisioning/multimachineconsolidation.go
Expand Up @@ -18,19 +18,24 @@ import (
"context"
"fmt"
"math"
"time"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/clock"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/samber/lo"

"github.com/aws/karpenter-core/pkg/cloudprovider"
"github.com/aws/karpenter-core/pkg/controllers/provisioning"
"github.com/aws/karpenter-core/pkg/controllers/provisioning/scheduling"
"github.com/aws/karpenter-core/pkg/controllers/state"
"github.com/aws/karpenter-core/pkg/events"
)

const MultiMachineConsolidationTimeoutDuration = 1 * time.Minute

type MultiMachineConsolidation struct {
consolidation
}
@@ -50,8 +55,10 @@ func (m *MultiMachineConsolidation) ComputeCommand(ctx context.Context, candidat
}
deprovisioningEligibleMachinesGauge.WithLabelValues(m.String()).Set(float64(len(candidates)))

-// For now, we will consider up to every machine in the cluster, might be configurable in the future.
-maxParallel := len(candidates)
+// Only consider a maximum batch of 100 machines to save on computation.
+// This could be further configurable in the future.
+maxParallel := lo.Clamp(len(candidates), 0, 100)

cmd, err := m.firstNMachineConsolidationOption(ctx, candidates, maxParallel)
if err != nil {
return Command{}, err
@@ -89,10 +96,20 @@ func (m *MultiMachineConsolidation) firstNMachineConsolidationOption(ctx context
}

lastSavedCommand := Command{}
// Set a timeout
timeout := m.clock.Now().Add(MultiMachineConsolidationTimeoutDuration)
// binary search to find the maximum number of machines we can terminate
for min <= max {
if m.clock.Now().After(timeout) {
deprovisioningConsolidationTimeoutsCounter.WithLabelValues(multiMachineConsolidationLabelValue).Inc()
if lastSavedCommand.candidates == nil {
logging.FromContext(ctx).Debugf("failed to find a multi-machine consolidation after timeout, last considered batch had %d machines", (min+max)/2)
} else {
logging.FromContext(ctx).Debugf("stopping multi-machine consolidation after timeout, returning last valid command %s", lastSavedCommand)
}
return lastSavedCommand, nil
}
mid := (min + max) / 2

candidatesToConsolidate := candidates[0 : mid+1]

cmd, err := m.computeConsolidation(ctx, candidatesToConsolidate...)
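Taken together, the changes above bound multi-machine consolidation in two ways: the candidate list is clamped to a batch of at most 100 machines, and the binary search over batch sizes checks a wall-clock deadline at the top of every iteration, returning the last command that validated instead of running unbounded. The sketch below isolates that deadline-bounded binary search, with the expensive scheduling simulation stubbed out as an evaluate callback; all of the names are illustrative, not Karpenter's.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/utils/clock"
)

// result stands in for the Command a consolidation attempt would return.
type result struct {
	batchSize int
	ok        bool
}

// bestBatch binary-searches for the largest batch size that evaluate accepts,
// but stops and returns the last valid result once the deadline has passed.
func bestBatch(clk clock.Clock, candidates int, budget time.Duration, evaluate func(n int) result) result {
	deadline := clk.Now().Add(budget)
	last := result{}
	min, max := 1, candidates
	for min <= max {
		if clk.Now().After(deadline) {
			// Timed out: surface whatever we found so far (possibly nothing).
			return last
		}
		mid := (min + max) / 2
		if r := evaluate(mid); r.ok {
			last = r
			min = mid + 1 // a larger batch might still be viable
		} else {
			max = mid - 1 // shrink the batch and retry
		}
	}
	return last
}

func main() {
	clk := clock.RealClock{}
	// Pretend batches of up to 37 machines consolidate successfully.
	r := bestBatch(clk, 100, time.Minute, func(n int) result {
		return result{batchSize: n, ok: n <= 37}
	})
	fmt.Println("largest viable batch:", r.batchSize)
}
```

Returning the last saved command on timeout means a long search degrades to a smaller but still valid consolidation rather than to no action at all.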
14 changes: 13 additions & 1 deletion pkg/controllers/deprovisioning/singlemachineconsolidation.go
@@ -17,6 +17,7 @@ package deprovisioning
import (
"context"
"fmt"
"time"

"k8s.io/utils/clock"
"knative.dev/pkg/logging"
@@ -28,6 +29,8 @@ import (
"github.com/aws/karpenter-core/pkg/events"
)

const SingleMachineConsolidationTimeoutDuration = 3 * time.Minute

// SingleMachineConsolidation is the consolidation controller that performs single machine consolidation.
type SingleMachineConsolidation struct {
consolidation
@@ -51,7 +54,16 @@ func (c *SingleMachineConsolidation) ComputeCommand(ctx context.Context, candida
deprovisioningEligibleMachinesGauge.WithLabelValues(c.String()).Set(float64(len(candidates)))

v := NewValidation(consolidationTTL, c.clock, c.cluster, c.kubeClient, c.provisioner, c.cloudProvider, c.recorder)
-for _, candidate := range candidates {
+
+// Set a timeout
+timeout := c.clock.Now().Add(SingleMachineConsolidationTimeoutDuration)
+// iterate through the candidates, evaluating each one until the timeout elapses
+for i, candidate := range candidates {
+if c.clock.Now().After(timeout) {
+deprovisioningConsolidationTimeoutsCounter.WithLabelValues(singleMachineConsolidationLabelValue).Inc()
+logging.FromContext(ctx).Debugf("abandoning single-machine consolidation due to timeout after evaluating %d candidates", i)
+return Command{}, nil
+}
// compute a possible consolidation option
cmd, err := c.computeConsolidation(ctx, candidate)
if err != nil {
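Both deadlines are computed from the injected clock.Clock rather than time.Now(), so the timeout branch can be exercised deterministically by stepping a fake clock past the budget, the same technique the test file above uses via fakeClock.Step. The sketch below shows that idea with the fake clock from k8s.io/utils/clock/testing; the loop body is a hypothetical stand-in for candidate evaluation, not Karpenter's code.

```go
package main

import (
	"fmt"
	"time"

	clocktesting "k8s.io/utils/clock/testing"
)

func main() {
	// A fake clock starts at a fixed instant and only advances when stepped.
	fakeClock := clocktesting.NewFakeClock(time.Now())

	const budget = 3 * time.Minute
	deadline := fakeClock.Now().Add(budget)

	evaluated := 0
	for i := 0; i < 10; i++ {
		if fakeClock.Now().After(deadline) {
			fmt.Printf("timed out after evaluating %d candidates\n", evaluated)
			break
		}
		evaluated++
		// Simulate each candidate evaluation taking a minute of wall time.
		fakeClock.Step(time.Minute)
	}
}
```

With a real clock this loop would only time out under genuinely slow evaluations; the fake clock lets a unit test reach the timeout branch immediately.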