fix: add more visibility for when we can't consolidate (aws#292)
tzneal authored Apr 13, 2023
1 parent 31975c4 commit efcfedb
Showing 10 changed files with 152 additions and 62 deletions.
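
Every file below moves from the old (newMachines, allPodsScheduled, err) return values of simulateScheduling to a single scheduling results value. The Results type itself lives in pkg/controllers/provisioning/scheduling and is not part of this diff, so the following is only a rough sketch of the shape the call sites below assume: the field and method names come from this commit, while the placeholder types and method bodies are guesses rather than the actual source.

// Sketch only: approximates the Results value returned by scheduler.Solve and
// simulateScheduling after this commit. Machine and ExistingNode are stand-ins
// for the real scheduler types; the method bodies are assumptions.
package scheduling

import (
	"fmt"
	"strings"

	v1 "k8s.io/api/core/v1"
)

type Machine struct{ Pods []*v1.Pod }      // placeholder: the real type also carries InstanceTypeOptions, Requirements, etc.
type ExistingNode struct{ Pods []*v1.Pod } // placeholder for an in-flight node tracked in cluster state

type Results struct {
	NewMachines   []*Machine        // machines that would have to be launched
	ExistingNodes []*ExistingNode   // existing or in-flight nodes the pods were packed onto
	PodErrors     map[*v1.Pod]error // pods that could not schedule, keyed to the reason
}

// AllPodsScheduled reports whether the simulation found a home for every pod.
func (r Results) AllPodsScheduled() bool { return len(r.PodErrors) == 0 }

// PodSchedulingErrors folds PodErrors into one human-readable string for events and logs.
func (r Results) PodSchedulingErrors() string {
	if len(r.PodErrors) == 0 {
		return ""
	}
	var sb strings.Builder
	sb.WriteString("not all pods would schedule, ")
	for pod, err := range r.PodErrors {
		fmt.Fprintf(&sb, "%s/%s => %s; ", pod.Namespace, pod.Name, err)
	}
	return strings.TrimSuffix(sb.String(), "; ")
}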
24 changes: 12 additions & 12 deletions pkg/controllers/deprovisioning/consolidation.go
@@ -106,7 +106,7 @@ func (c *consolidation) ShouldDeprovision(_ context.Context, cn *Candidate) bool
func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...*Candidate) (Command, error) {
defer metrics.Measure(deprovisioningDurationHistogram.WithLabelValues("Replace/Delete"))()
// Run scheduling simulation to compute consolidation option
- newMachines, allPodsScheduled, err := simulateScheduling(ctx, c.kubeClient, c.cluster, c.provisioner, candidates...)
+ results, err := simulateScheduling(ctx, c.kubeClient, c.cluster, c.provisioner, candidates...)
if err != nil {
// if a candidate node is now deleting, just retry
if errors.Is(err, errCandidateDeleting) {
@@ -116,26 +116,26 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
}

// if not all of the pods were scheduled, we can't do anything
- if !allPodsScheduled {
+ if !results.AllPodsScheduled() {
// This method is used by multi-node consolidation as well, so we'll only report in the single node case
if len(candidates) == 1 {
- c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, "not all pods would schedule")...)
+ c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, results.PodSchedulingErrors())...)
}
return Command{action: actionDoNothing}, nil
}

// were we able to schedule all the pods on the inflight candidates?
- if len(newMachines) == 0 {
+ if len(results.NewMachines) == 0 {
return Command{
candidates: candidates,
action: actionDelete,
}, nil
}

// we're not going to turn a single node into multiple candidates
- if len(newMachines) != 1 {
+ if len(results.NewMachines) != 1 {
if len(candidates) == 1 {
- c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, fmt.Sprintf("can't remove without creating %d candidates", len(newMachines)))...)
+ c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, fmt.Sprintf("can't remove without creating %d candidates", len(results.NewMachines)))...)
}
return Command{action: actionDoNothing}, nil
}
@@ -146,8 +146,8 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
if err != nil {
return Command{}, fmt.Errorf("getting offering price from candidate node, %w", err)
}
- newMachines[0].InstanceTypeOptions = filterByPrice(newMachines[0].InstanceTypeOptions, newMachines[0].Requirements, nodesPrice)
- if len(newMachines[0].InstanceTypeOptions) == 0 {
+ results.NewMachines[0].InstanceTypeOptions = filterByPrice(results.NewMachines[0].InstanceTypeOptions, results.NewMachines[0].Requirements, nodesPrice)
+ if len(results.NewMachines[0].InstanceTypeOptions) == 0 {
if len(candidates) == 1 {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, "can't replace with a cheaper node")...)
}
@@ -166,7 +166,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
}

if allExistingAreSpot &&
- newMachines[0].Requirements.Get(v1alpha5.LabelCapacityType).Has(v1alpha5.CapacityTypeSpot) {
+ results.NewMachines[0].Requirements.Get(v1alpha5.LabelCapacityType).Has(v1alpha5.CapacityTypeSpot) {
if len(candidates) == 1 {
c.recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, "can't replace a spot node with a spot node")...)
}
@@ -177,15 +177,15 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
// assumption, that the spot variant will launch. We also need to add a requirement to the node to ensure that if
// spot capacity is insufficient we don't replace the node with a more expensive on-demand node. Instead the launch
// should fail and we'll just leave the node alone.
- ctReq := newMachines[0].Requirements.Get(v1alpha5.LabelCapacityType)
+ ctReq := results.NewMachines[0].Requirements.Get(v1alpha5.LabelCapacityType)
if ctReq.Has(v1alpha5.CapacityTypeSpot) && ctReq.Has(v1alpha5.CapacityTypeOnDemand) {
- newMachines[0].Requirements.Add(scheduling.NewRequirement(v1alpha5.LabelCapacityType, v1.NodeSelectorOpIn, v1alpha5.CapacityTypeSpot))
+ results.NewMachines[0].Requirements.Add(scheduling.NewRequirement(v1alpha5.LabelCapacityType, v1.NodeSelectorOpIn, v1alpha5.CapacityTypeSpot))
}

return Command{
candidates: candidates,
action: actionReplace,
- replacements: newMachines,
+ replacements: results.NewMachines,
}, nil
}

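The net effect of the consolidation changes above is that, in the single-candidate case, the Unconsolidatable event now carries the concrete per-pod reasons returned by the simulation instead of the fixed "not all pods would schedule" string. A minimal sketch of the reporting pattern shared by the call sites above follows; the helper itself is hypothetical and not part of the commit.

// Hypothetical helper (not in the commit) illustrating the single-candidate reporting
// pattern used above; reason is either results.PodSchedulingErrors() or a fixed string
// such as "can't replace with a cheaper node".
func publishUnconsolidatable(recorder events.Recorder, candidates []*Candidate, reason string) {
	// This path is shared with multi-node consolidation, so only report for a single candidate.
	if len(candidates) != 1 {
		return
	}
	recorder.Publish(deprovisioningevents.Unconsolidatable(candidates[0].Node, reason)...)
}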
10 changes: 5 additions & 5 deletions pkg/controllers/deprovisioning/drift.go
@@ -65,7 +65,7 @@ func (d *Drift) ComputeCommand(ctx context.Context, nodes ...*Candidate) (Comman

for _, candidate := range candidates {
// Check if we need to create any machines.
- newMachines, allPodsScheduled, err := simulateScheduling(ctx, d.kubeClient, d.cluster, d.provisioner, candidate)
+ results, err := simulateScheduling(ctx, d.kubeClient, d.cluster, d.provisioner, candidate)
if err != nil {
// if a candidate machine is now deleting, just retry
if errors.Is(err, errCandidateDeleting) {
@@ -74,10 +74,10 @@ func (d *Drift) ComputeCommand(ctx context.Context, nodes ...*Candidate) (Comman
return Command{}, err
}
// Log when all pods can't schedule, as the command will get executed immediately.
- if !allPodsScheduled {
- logging.FromContext(ctx).With("node", candidate.Node.Name).Debug("Continuing to terminate drifted machine after scheduling simulation failed to schedule all pods")
+ if !results.AllPodsScheduled() {
+ logging.FromContext(ctx).With("node", candidate.Node.Name).Debugf("Continuing to terminate drifted machine after scheduling simulation failed to schedule all pods, %s", results.PodSchedulingErrors())
}
- if len(newMachines) == 0 {
+ if len(results.NewMachines) == 0 {
return Command{
candidates: []*Candidate{candidate},
action: actionDelete,
@@ -86,7 +86,7 @@ func (d *Drift) ComputeCommand(ctx context.Context, nodes ...*Candidate) (Comman
return Command{
candidates: []*Candidate{candidate},
action: actionReplace,
- replacements: newMachines,
+ replacements: results.NewMachines,
}, nil
}
return Command{action: actionDoNothing}, nil
8 changes: 4 additions & 4 deletions pkg/controllers/deprovisioning/expiration.go
@@ -86,7 +86,7 @@ func (e *Expiration) ComputeCommand(ctx context.Context, nodes ...*Candidate) (C

for _, candidate := range candidates {
// Check if we need to create any nodes.
- newMachines, allPodsScheduled, err := simulateScheduling(ctx, e.kubeClient, e.cluster, e.provisioner, candidate)
+ results, err := simulateScheduling(ctx, e.kubeClient, e.cluster, e.provisioner, candidate)
if err != nil {
// if a candidate node is now deleting, just retry
if errors.Is(err, errCandidateDeleting) {
@@ -95,16 +95,16 @@ func (e *Expiration) ComputeCommand(ctx context.Context, nodes ...*Candidate) (C
return Command{}, err
}
// Log when all pods can't schedule, as the command will get executed immediately.
- if !allPodsScheduled {
- logging.FromContext(ctx).With("node", candidate.Name).Debugf("continuing to expire node after scheduling simulation failed to schedule all pods")
+ if !results.AllPodsScheduled() {
+ logging.FromContext(ctx).With("node", candidate.Name).Debugf("continuing to expire node after scheduling simulation failed to schedule all pods, %s", results.PodSchedulingErrors())
}

logging.FromContext(ctx).With("ttl", time.Duration(ptr.Int64Value(candidates[0].provisioner.Spec.TTLSecondsUntilExpired))*time.Second).
With("delay", time.Since(node.GetExpirationTime(candidates[0].Node, candidates[0].provisioner))).Infof("triggering termination for expired node after TTL")
return Command{
candidates: []*Candidate{candidate},
action: actionReplace,
- replacements: newMachines,
+ replacements: results.NewMachines,
}, nil
}
return Command{action: actionDoNothing}, nil
31 changes: 13 additions & 18 deletions pkg/controllers/deprovisioning/helpers.go
@@ -66,7 +66,7 @@ func filterCandidates(ctx context.Context, kubeClient client.Client, recorder ev

//nolint:gocyclo
func simulateScheduling(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner,
- candidates ...*Candidate) (newMachines []*pscheduling.Machine, allPodsScheduled bool, err error) {
+ candidates ...*Candidate) (*pscheduling.Results, error) {

candidateNames := sets.NewString(lo.Map(candidates, func(t *Candidate, i int) string { return t.Name() })...)
nodes := cluster.Nodes()
@@ -81,18 +81,18 @@ func simulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
if _, ok := lo.Find(deletingNodes, func(n *state.StateNode) bool {
return candidateNames.Has(n.Name())
}); ok {
- return nil, false, errCandidateDeleting
+ return nil, errCandidateDeleting
}

// We get the pods that are on nodes that are deleting
deletingNodePods, err := deletingNodes.Pods(ctx, kubeClient)
if err != nil {
- return nil, false, fmt.Errorf("failed to get pods from deleting nodes, %w", err)
+ return nil, fmt.Errorf("failed to get pods from deleting nodes, %w", err)
}
// start by getting all pending pods
pods, err := provisioner.GetPendingPods(ctx)
if err != nil {
- return nil, false, fmt.Errorf("determining pending pods, %w", err)
+ return nil, fmt.Errorf("determining pending pods, %w", err)
}

for _, n := range candidates {
Expand All @@ -104,31 +104,26 @@ func simulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
})

if err != nil {
- return nil, false, fmt.Errorf("creating scheduler, %w", err)
+ return nil, fmt.Errorf("creating scheduler, %w", err)
}

- newMachines, ifn, err := scheduler.Solve(ctx, pods)
+ results, err := scheduler.Solve(ctx, pods)
if err != nil {
- return nil, false, fmt.Errorf("simulating scheduling, %w", err)
- }
-
- podsScheduled := 0
- for _, n := range newMachines {
- podsScheduled += len(n.Pods)
- }
- for _, n := range ifn {
- podsScheduled += len(n.Pods)
+ return nil, fmt.Errorf("simulating scheduling, %w", err)
}

// check if the scheduling relied on an existing node that isn't ready yet, if so we fail
// to schedule since we want to assume that we can delete a node and its pods will immediately
// move to an existing node which won't occur if that node isn't ready.
- for _, n := range ifn {
+ for _, n := range results.ExistingNodes {
if !n.Initialized() {
- return nil, false, nil
+ for _, p := range n.Pods {
+ results.PodErrors[p] = fmt.Errorf("would schedule against a non-initialized node %s", n.Name())
+ }
}
}
- return newMachines, podsScheduled == len(pods), nil
+
+ return results, nil
}

// instanceTypesAreSubset returns true if the lhs slice of instance types are a subset of the rhs.
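The last hunk above also changes behavior, not just the return type: previously a pod that would only fit on a not-yet-initialized in-flight node caused simulateScheduling to report failure with no detail, whereas now each such pod gets an entry in results.PodErrors, so AllPodsScheduled() still returns false but the reason can be surfaced. Below is a caller-side sketch of consuming that detail, assuming the imports already present in helpers.go; the helper is illustrative and not part of the commit.

// logSimulationFailures is an illustrative helper (not in the commit) that walks the
// per-pod errors recorded by simulateScheduling, e.g. the
// "would schedule against a non-initialized node ..." entries added above.
func logSimulationFailures(ctx context.Context, results *pscheduling.Results) {
	for pod, podErr := range results.PodErrors {
		logging.FromContext(ctx).
			With("pod", fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)).
			Debugf("simulation could not schedule pod, %s", podErr)
	}
}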
51 changes: 49 additions & 2 deletions pkg/controllers/deprovisioning/suite_test.go
@@ -19,6 +19,7 @@ import (
"fmt"
"math"
"sort"
+ "strings"
"sync"
"sync/atomic"
"testing"
@@ -67,6 +68,7 @@ var cloudProvider *fake.CloudProvider
var nodeStateController controller.Controller
var machineStateController controller.Controller
var fakeClock *clock.FakeClock
+ var fakeRecorder *record.FakeRecorder
var onDemandInstances []*cloudprovider.InstanceType
var mostExpensiveInstance *cloudprovider.InstanceType
var mostExpensiveOffering cloudprovider.Offering
@@ -84,10 +86,11 @@ var _ = BeforeSuite(func() {
ctx = settings.ToContext(ctx, test.Settings(settings.Settings{DriftEnabled: true}))
cloudProvider = fake.NewCloudProvider()
fakeClock = clock.NewFakeClock(time.Now())
+ fakeRecorder = record.NewFakeRecorder(500)
cluster = state.NewCluster(fakeClock, env.Client, cloudProvider)
nodeStateController = informer.NewNodeController(env.Client, cluster)
machineStateController = informer.NewMachineController(env.Client, cluster)
- provisioner = provisioning.NewProvisioner(env.Client, env.KubernetesInterface.CoreV1(), events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster)
+ provisioner = provisioning.NewProvisioner(env.Client, env.KubernetesInterface.CoreV1(), events.NewRecorder(fakeRecorder), cloudProvider, cluster)
})

var _ = AfterSuite(func() {
@@ -98,6 +101,7 @@ var _ = BeforeEach(func() {
cloudProvider.CreateCalls = nil
cloudProvider.InstanceTypes = fake.InstanceTypesAssorted()
cloudProvider.AllowedCreateCalls = math.MaxInt
+ fakeRecorder.Events = make(chan string, 500) // clear the events
onDemandInstances = lo.Filter(cloudProvider.InstanceTypes, func(i *cloudprovider.InstanceType, _ int) bool {
for _, o := range i.Offerings.Available() {
if o.CapacityType == v1alpha5.CapacityTypeOnDemand {
@@ -121,7 +125,7 @@ var _ = BeforeEach(func() {
}
fakeClock.SetTime(time.Now())
cluster.SetConsolidated(false)
- deprovisioningController = deprovisioning.NewController(fakeClock, env.Client, provisioner, cloudProvider, events.NewRecorder(&record.FakeRecorder{}), cluster)
+ deprovisioningController = deprovisioning.NewController(fakeClock, env.Client, provisioner, cloudProvider, events.NewRecorder(fakeRecorder), cluster)

// Reset Feature Flags to test defaults
ctx = settings.ToContext(ctx, test.Settings(settings.Settings{DriftEnabled: true}))
@@ -1052,6 +1056,49 @@ var _ = Describe("Delete Node", func() {
// and will not be recreated
ExpectNotFound(ctx, env.Client, node2)
})
+ It("won't delete node if it would require pods to schedule on an un-initialized node", func() {
+ labels := map[string]string{
+ "app": "test",
+ }
+ // create our RS so we can link a pod to it
+ rs := test.ReplicaSet()
+ ExpectApplied(ctx, env.Client, rs)
+ pods := test.Pods(3, test.PodOptions{
+ ObjectMeta: metav1.ObjectMeta{Labels: labels,
+ OwnerReferences: []metav1.OwnerReference{
+ {
+ APIVersion: "apps/v1",
+ Kind: "ReplicaSet",
+ Name: rs.Name,
+ UID: rs.UID,
+ Controller: ptr.Bool(true),
+ BlockOwnerDeletion: ptr.Bool(true),
+ },
+ }}})
+ ExpectApplied(ctx, env.Client, rs, pods[0], pods[1], pods[2], machine1, node1, machine2, node2, prov)
+
+ // bind pods to node
+ ExpectManualBinding(ctx, env.Client, pods[0], node1)
+ ExpectManualBinding(ctx, env.Client, pods[1], node1)
+ ExpectManualBinding(ctx, env.Client, pods[2], node2)
+
+ // inform cluster state about nodes and machines, intentionally leaving node1 as not ready
+ ExpectMakeReadyAndStateUpdated(ctx, env.Client, nodeStateController, machineStateController, []*v1.Node{node2}, []*v1alpha5.Machine{machine1, machine2})
+
+ fakeClock.Step(10 * time.Minute)
+
+ var wg sync.WaitGroup
+ ExpectTriggerVerifyAction(&wg)
+ ExpectReconcileSucceeded(ctx, deprovisioningController, client.ObjectKey{})
+ wg.Wait()
+
+ // shouldn't delete the node
+ Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(2))
+ Expect(fakeRecorder.Events).To(HaveLen(1))
+ event := <-fakeRecorder.Events
+ Expect(strings.Contains(event, "not all pods would schedule")).To(BeTrue())
+ Expect(strings.Contains(event, "would schedule against a non-initialized node")).To(BeTrue())
+ })
})

var _ = Describe("Node Lifetime Consideration", func() {
10 changes: 5 additions & 5 deletions pkg/controllers/deprovisioning/validation.go
@@ -124,11 +124,11 @@ func (v *Validation) ValidateCommand(ctx context.Context, cmd Command, candidate
return false, nil
}

- newMachines, allPodsScheduled, err := simulateScheduling(ctx, v.kubeClient, v.cluster, v.provisioner, candidates...)
+ results, err := simulateScheduling(ctx, v.kubeClient, v.cluster, v.provisioner, candidates...)
if err != nil {
return false, fmt.Errorf("simluating scheduling, %w", err)
}
- if !allPodsScheduled {
+ if !results.AllPodsScheduled() {
return false, nil
}

@@ -138,7 +138,7 @@ func (v *Validation) ValidateCommand(ctx context.Context, cmd Command, candidate
// len(newMachines) > 1, something in the cluster changed so that the candidates we were going to delete can no longer
// be deleted without producing more than one machine
// len(newMachines) == 1, as long as the machine looks like what we were expecting, this is valid
- if len(newMachines) == 0 {
+ if len(results.NewMachines) == 0 {
if len(cmd.replacements) == 0 {
// scheduling produced zero new machines and we weren't expecting any, so this is valid.
return true, nil
@@ -149,7 +149,7 @@ func (v *Validation) ValidateCommand(ctx context.Context, cmd Command, candidate
}

// we need more than one replacement machine which is never valid currently (all of our node replacement is m->1, never m->n)
- if len(newMachines) > 1 {
+ if len(results.NewMachines) > 1 {
return false, nil
}

@@ -170,7 +170,7 @@ func (v *Validation) ValidateCommand(ctx context.Context, cmd Command, candidate
// a 4xlarge and replace it with a 2xlarge. If things have changed and the scheduling simulation we just performed
// now says that we need to launch a 4xlarge. It's still launching the correct number of machines, but it's just
// as expensive or possibly more so we shouldn't validate.
- if !instanceTypesAreSubset(cmd.replacements[0].InstanceTypeOptions, newMachines[0].InstanceTypeOptions) {
+ if !instanceTypesAreSubset(cmd.replacements[0].InstanceTypeOptions, results.NewMachines[0].InstanceTypeOptions) {
return false, nil
}

