test: Alter tracking bindings in testing (ExpectProvisioned, etc.) (aws#193)

* Alter tracking bindings in testing

* Update provisioning in testing across all files
jonathan-innis authored Feb 1, 2023
1 parent ad440c7 commit 9766dd1
Showing 21 changed files with 1,037 additions and 1,098 deletions.
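The test-helper signatures themselves are not shown in this commit, but the call sites in the diffs below imply their new shape. A minimal sketch of the updated flow, assuming those call sites are representative (pod, prov, cluster, and provisioner are the usual suite-level test fixtures):

pod := test.UnschedulablePod()
ExpectApplied(ctx, env.Client, prov, pod)

// ExpectProvisioned now takes the cluster and provisioner directly (no
// provisioning controller) and binds the pods it provisions capacity for.
ExpectProvisioned(ctx, env.Client, cluster, provisioner, pod)
ExpectScheduled(ctx, env.Client, pod)

// ExpectProvisionedNoBinding likewise drops the controller argument and
// leaves the pods unbound.
ExpectProvisionedNoBinding(ctx, env.Client, provisioner, pod)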
6 changes: 3 additions & 3 deletions pkg/controllers/deprovisioning/helpers.go
@@ -39,7 +39,7 @@ import (

//nolint:gocyclo
func simulateScheduling(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner,
candidateNodes ...CandidateNode) (newNodes []*pscheduling.Node, allPodsScheduled bool, err error) {
candidateNodes ...CandidateNode) (newNodes []*pscheduling.Machine, allPodsScheduled bool, err error) {

candidateNodeNames := sets.NewString(lo.Map(candidateNodes, func(t CandidateNode, i int) string { return t.Name })...)
nodes := cluster.Nodes()
@@ -198,7 +198,7 @@ func candidateNodes(ctx context.Context, cluster *state.Cluster, kubeClient clie
}

// Skip nodes that aren't initialized
// This also means that the real Node doesn't exist for it
// This also means that the real Machine doesn't exist for it
if !n.Initialized() {
return true
}
@@ -313,7 +313,7 @@ func clamp(min, val, max float64) float64 {
return val
}

// mapNodes maps from a list of *v1.Node to candidateNode
// mapNodes maps from a list of *v1.Machine to candidateNode
func mapNodes(nodes []*v1.Node, candidateNodes []CandidateNode) []CandidateNode {
verifyNodeNames := sets.NewString(lo.Map(nodes, func(t *v1.Node, i int) string { return t.Name })...)
var ret []CandidateNode
2 changes: 1 addition & 1 deletion pkg/controllers/deprovisioning/multinodeconsolidation.go
@@ -129,7 +129,7 @@ func (m *MultiNodeConsolidation) firstNNodeConsolidationOption(ctx context.Conte
// This code sees that t3a.small is the cheapest type in both lists and filters it and anything more expensive out
// leaving the valid consolidation:
// nodes=[t3a.2xlarge, t3a.2xlarge, t3a.small] -> 1 of t3a.nano
func filterOutSameType(newNode *scheduling.Node, consolidate []CandidateNode) []*cloudprovider.InstanceType {
func filterOutSameType(newNode *scheduling.Machine, consolidate []CandidateNode) []*cloudprovider.InstanceType {
existingInstanceTypes := sets.NewString()
nodePricesByInstanceType := map[string]float64{}

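The comment above filterOutSameType describes price-based pruning: find the cheapest instance type already running on the candidate nodes, then drop that type and anything more expensive from the replacement node's options. A rough stand-alone illustration of that idea (hypothetical helper working on plain price maps rather than the real cloudprovider.InstanceType and CandidateNode values):

import "math"

// filterCheaperOnly mirrors the intent: keep only instance types strictly
// cheaper than the cheapest type already present among the candidates, so a
// consolidation can never relaunch the same or a pricier type.
func filterCheaperOnly(options, candidatePrices map[string]float64) []string {
	cheapest := math.MaxFloat64
	for _, price := range candidatePrices {
		cheapest = math.Min(cheapest, price)
	}
	var kept []string
	for name, price := range options {
		if price < cheapest {
			kept = append(kept, name)
		}
	}
	return kept
}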
23 changes: 10 additions & 13 deletions pkg/controllers/deprovisioning/suite_test.go
@@ -32,6 +32,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
clock "k8s.io/utils/clock/testing"
. "knative.dev/pkg/logging/testing"
"knative.dev/pkg/ptr"
@@ -47,6 +48,7 @@ import (
"github.com/aws/karpenter-core/pkg/controllers/provisioning"
"github.com/aws/karpenter-core/pkg/controllers/state"
"github.com/aws/karpenter-core/pkg/controllers/state/informer"
"github.com/aws/karpenter-core/pkg/events"
"github.com/aws/karpenter-core/pkg/operator/controller"
"github.com/aws/karpenter-core/pkg/operator/scheme"
"github.com/aws/karpenter-core/pkg/test"
@@ -60,7 +62,6 @@ var deprovisioningController *deprovisioning.Controller
var provisioningController controller.Controller
var provisioner *provisioning.Provisioner
var cloudProvider *fake.CloudProvider
var recorder *test.EventRecorder
var nodeStateController controller.Controller
var fakeClock *clock.FakeClock
var onDemandInstances []*cloudprovider.InstanceType
@@ -82,9 +83,8 @@ var _ = BeforeSuite(func() {
fakeClock = clock.NewFakeClock(time.Now())
cluster = state.NewCluster(fakeClock, env.Client, cloudProvider)
nodeStateController = informer.NewNodeController(env.Client, cluster)
recorder = test.NewEventRecorder()
provisioner = provisioning.NewProvisioner(ctx, env.Client, env.KubernetesInterface.CoreV1(), recorder, cloudProvider, cluster)
provisioningController = provisioning.NewController(env.Client, provisioner, recorder)
provisioner = provisioning.NewProvisioner(ctx, env.Client, env.KubernetesInterface.CoreV1(), events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster)
provisioningController = provisioning.NewController(env.Client, provisioner, events.NewRecorder(&record.FakeRecorder{}))
})

var _ = AfterSuite(func() {
@@ -122,13 +122,12 @@ var _ = BeforeEach(func() {
mostExpensiveInstance = onDemandInstances[len(onDemandInstances)-1]
mostExpensiveOffering = mostExpensiveInstance.Offerings[0]

recorder.Reset()
// ensure any waiters on our clock are allowed to proceed before resetting our clock time
for fakeClock.HasWaiters() {
fakeClock.Step(1 * time.Minute)
}
fakeClock.SetTime(time.Now())
deprovisioningController = deprovisioning.NewController(fakeClock, env.Client, provisioner, cloudProvider, recorder, cluster)
deprovisioningController = deprovisioning.NewController(fakeClock, env.Client, provisioner, cloudProvider, events.NewRecorder(&record.FakeRecorder{}), cluster)
// Reset Feature Flags to test defaults
ctx = settings.ToContext(ctx, test.Settings(test.SettingsOptions{DriftEnabled: true}))
})
@@ -1793,11 +1792,12 @@ var _ = Describe("Parallelization", func() {
}, time.Second*10).Should(Succeed())
wg.Wait()
// Add a new pending pod that should schedule while node is not yet deleted
pods := ExpectProvisionedNoBinding(ctx, env.Client, provisioningController, provisioner, test.UnschedulablePod())
pod = test.UnschedulablePod()
ExpectProvisioned(ctx, env.Client, cluster, provisioner, pod)
nodes := &v1.NodeList{}
Expect(env.Client.List(ctx, nodes)).To(Succeed())
Expect(len(nodes.Items)).To(Equal(2))
Expect(pods[0].Spec.NodeName).NotTo(Equal(node.Name))
ExpectScheduled(ctx, env.Client, pod)
})
It("should not consolidate a node that is launched for pods on a deleting node", func() {
labels := map[string]string{
@@ -1838,7 +1838,7 @@ var _ = Describe("Parallelization", func() {
pods = append(pods, pod)
}
ExpectApplied(ctx, env.Client, rs, prov)
ExpectProvisionedNoBinding(ctx, env.Client, provisioningController, provisioner, lo.Map(pods, func(p *v1.Pod, _ int) *v1.Pod { return p.DeepCopy() })...)
ExpectProvisionedNoBinding(ctx, env.Client, provisioner, lo.Map(pods, func(p *v1.Pod, _ int) *v1.Pod { return p.DeepCopy() })...)

nodeList := &v1.NodeList{}
Expect(env.Client.List(ctx, nodeList)).To(Succeed())
@@ -1847,13 +1847,10 @@ var _ = Describe("Parallelization", func() {
// Update cluster state with new node
ExpectReconcileSucceeded(ctx, nodeStateController, client.ObjectKeyFromObject(&nodeList.Items[0]))

// Reset the bindings so we can re-record bindings
recorder.ResetBindings()

// Mark the node for deletion and re-trigger reconciliation
oldNodeName := nodeList.Items[0].Name
cluster.MarkForDeletion(nodeList.Items[0].Name)
ExpectProvisionedNoBinding(ctx, env.Client, provisioningController, provisioner, lo.Map(pods, func(p *v1.Pod, _ int) *v1.Pod { return p.DeepCopy() })...)
ExpectProvisionedNoBinding(ctx, env.Client, provisioner, lo.Map(pods, func(p *v1.Pod, _ int) *v1.Pod { return p.DeepCopy() })...)

// Make sure that the cluster state is aware of the current node state
Expect(env.Client.List(ctx, nodeList)).To(Succeed())
2 changes: 1 addition & 1 deletion pkg/controllers/deprovisioning/types.go
@@ -63,7 +63,7 @@ func (a action) String() string {
type Command struct {
nodesToRemove []*v1.Node
action action
replacementNodes []*scheduling.Node
replacementNodes []*scheduling.Machine
}

func (o Command) String() string {
48 changes: 46 additions & 2 deletions pkg/controllers/inflightchecks/suite_test.go
@@ -17,6 +17,7 @@ package inflightchecks_test
import (
"context"
"fmt"
"sync"
"testing"
"time"

@@ -47,7 +48,7 @@ var inflightController controller.Controller
var env *test.Environment
var fakeClock *clock.FakeClock
var cp *fake.CloudProvider
var recorder *test.EventRecorder
var recorder *FakeEventRecorder

func TestAPIs(t *testing.T) {
ctx = TestContextWithLogger(t)
@@ -60,7 +61,7 @@ var _ = BeforeSuite(func() {
env = test.NewEnvironment(scheme.Scheme, test.WithCRDs(apis.CRDs...))
ctx = settings.ToContext(ctx, test.Settings())
cp = &fake.CloudProvider{}
recorder = test.NewEventRecorder()
recorder = NewFakeEventRecorder()
inflightController = inflightchecks.NewController(fakeClock, env.Client, recorder, cp)
})

@@ -183,6 +184,49 @@ var _ = Describe("Controller", func() {
})
})

var _ events.Recorder = (*FakeEventRecorder)(nil)

// FakeEventRecorder is a mock event recorder that is used to facilitate testing.
type FakeEventRecorder struct {
mu sync.RWMutex
calls map[string]int
events []events.Event
}

func NewFakeEventRecorder() *FakeEventRecorder {
return &FakeEventRecorder{
calls: map[string]int{},
}
}

func (e *FakeEventRecorder) Publish(evt events.Event) {
e.mu.Lock()
defer e.mu.Unlock()
e.events = append(e.events, evt)
e.calls[evt.Reason]++
}

func (e *FakeEventRecorder) Calls(reason string) int {
e.mu.RLock()
defer e.mu.RUnlock()
return e.calls[reason]
}

func (e *FakeEventRecorder) Reset() {
e.mu.Lock()
defer e.mu.Unlock()
e.events = nil
e.calls = map[string]int{}
}

func (e *FakeEventRecorder) ForEachEvent(f func(evt events.Event)) {
e.mu.RLock()
defer e.mu.RUnlock()
for _, e := range e.events {
f(e)
}
}

func ExpectDetectedEvent(msg string) {
foundEvent := false
recorder.ForEachEvent(func(evt events.Event) {
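A short usage sketch for the FakeEventRecorder added above. Only Reason is exercised by the recorder itself; the Message field on events.Event is assumed here for illustration:

recorder := NewFakeEventRecorder()
recorder.Publish(events.Event{Reason: "FailedInflightCheck", Message: "node failed an inflight check"})

// Calls counts publishes per reason; ForEachEvent iterates everything recorded.
Expect(recorder.Calls("FailedInflightCheck")).To(Equal(1))
recorder.ForEachEvent(func(evt events.Event) {
	fmt.Println(evt.Reason, evt.Message)
})

// Reset clears both the event list and the per-reason counters between tests.
recorder.Reset()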
4 changes: 2 additions & 2 deletions pkg/controllers/machine/registration.go
@@ -43,7 +43,7 @@ func (r *Registration) Reconcile(ctx context.Context, machine *v1alpha5.Machine)
node, err := nodeForMachine(ctx, r.kubeClient, machine)
if err != nil {
if IsNodeNotFoundError(err) {
machine.StatusConditions().MarkFalse(v1alpha5.MachineRegistered, "NodeNotFound", "Node not registered with cluster")
machine.StatusConditions().MarkFalse(v1alpha5.MachineRegistered, "NodeNotFound", "Machine not registered with cluster")
return reconcile.Result{RequeueAfter: registrationTTL}, nil // Requeue later to check up to the registration timeout
}
if IsDuplicateNodeError(err) {
@@ -66,7 +66,7 @@ func (r *Registration) syncNode(ctx context.Context, machine *v1alpha5.Machine,
node.Labels = lo.Assign(node.Labels, machine.Labels)
node.Annotations = lo.Assign(node.Annotations, machine.Annotations)

// Sync all taints inside of Machine into the Node taints
// Sync all taints inside of Machine into the Machine taints
node.Spec.Taints = scheduling.Taints(node.Spec.Taints).Merge(machine.Spec.Taints)
if !machine.StatusConditions().GetCondition(v1alpha5.MachineRegistered).IsTrue() {
node.Spec.Taints = scheduling.Taints(node.Spec.Taints).Merge(machine.Spec.StartupTaints)
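The syncNode hunk above merges the Machine's taints (and, before registration, its startup taints) into the Node. The Merge implementation is not part of this diff, so the following is only a hypothetical sketch of the intended behavior, de-duplicating by taint key and effect:

import v1 "k8s.io/api/core/v1"

// mergeTaints is illustrative only: append machine taints that the node does
// not already carry, keyed by taint key plus effect.
func mergeTaints(nodeTaints, machineTaints []v1.Taint) []v1.Taint {
	seen := map[string]bool{}
	for _, t := range nodeTaints {
		seen[t.Key+"/"+string(t.Effect)] = true
	}
	for _, t := range machineTaints {
		if !seen[t.Key+"/"+string(t.Effect)] {
			nodeTaints = append(nodeTaints, t)
		}
	}
	return nodeTaints
}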
7 changes: 4 additions & 3 deletions pkg/controllers/machine/suite_test.go
@@ -25,6 +25,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
clock "k8s.io/utils/clock/testing"
. "knative.dev/pkg/logging/testing"
"sigs.k8s.io/controller-runtime/pkg/cache"
@@ -36,6 +37,7 @@ import (
"github.com/aws/karpenter-core/pkg/cloudprovider/fake"
"github.com/aws/karpenter-core/pkg/controllers/machine"
"github.com/aws/karpenter-core/pkg/controllers/machine/terminator"
"github.com/aws/karpenter-core/pkg/events"
"github.com/aws/karpenter-core/pkg/operator/controller"
"github.com/aws/karpenter-core/pkg/operator/scheme"
. "github.com/aws/karpenter-core/pkg/test/expectations"
@@ -65,9 +67,8 @@ var _ = BeforeSuite(func() {
ctx = settings.ToContext(ctx, test.Settings())

cloudProvider = fake.NewCloudProvider()
recorder := test.NewEventRecorder()
terminator := terminator.NewTerminator(fakeClock, env.Client, cloudProvider, terminator.NewEvictionQueue(ctx, env.KubernetesInterface.CoreV1(), recorder))
machineController = machine.NewController(fakeClock, env.Client, cloudProvider, terminator, recorder)
terminator := terminator.NewTerminator(fakeClock, env.Client, cloudProvider, terminator.NewEvictionQueue(ctx, env.KubernetesInterface.CoreV1(), events.NewRecorder(&record.FakeRecorder{})))
machineController = machine.NewController(fakeClock, env.Client, cloudProvider, terminator, events.NewRecorder(&record.FakeRecorder{}))
})

var _ = AfterSuite(func() {
83 changes: 39 additions & 44 deletions pkg/controllers/provisioning/provisioner.go
@@ -108,56 +108,26 @@ func (p *Provisioner) Reconcile(ctx context.Context, _ reconcile.Request) (resul
return reconcile.Result{}, nil
}

// We collect the nodes with their used capacities before we get the list of pending pods. This ensures that
// the node capacities we schedule against are always >= what the actual capacity is at any given instance. This
// prevents over-provisioning at the cost of potentially under-provisioning which will self-heal during the next
// scheduling loop when we launch a new node. When this order is reversed, our node capacity may be reduced by pods
// that have bound which we then provision new un-needed capacity for.
// -------
// We don't consider the nodes that are MarkedForDeletion since this capacity shouldn't be considered
// as persistent capacity for the cluster (since it will soon be removed). Additionally, we are scheduling for
// the pods that are on these nodes so the MarkedForDeletion node capacity can't be considered.
nodes := p.cluster.Nodes()

// Get pods, exit if nothing to do
pendingPods, err := p.GetPendingPods(ctx)
if err != nil {
return reconcile.Result{}, err
}
// Get pods from nodes that are preparing for deletion
// We do this after getting the pending pods so that we undershoot if pods are
// actively migrating from a node that is being deleted
// NOTE: The assumption is that these nodes are cordoned and no additional pods will schedule to them
deletingNodePods, err := nodes.Deleting().Pods(ctx, p.kubeClient)
if err != nil {
return reconcile.Result{}, err
}
pods := append(pendingPods, deletingNodePods...)
if len(pods) == 0 {
return reconcile.Result{}, nil
}

// Schedule pods to potential nodes, exit if nothing to do
machines, err := p.schedule(ctx, pods, nodes.Active())
machines, _, err := p.Schedule(ctx)
if err != nil {
return reconcile.Result{}, err
}
if len(machines) == 0 {
return reconcile.Result{}, nil
}

nodeNames, err := p.LaunchMachines(ctx, machines, RecordPodNomination)
machineNames, err := p.LaunchMachines(ctx, machines, RecordPodNomination)

// Any successfully created node is going to have the nodeName value filled in the slice
successfullyCreatedNodeCount := lo.CountBy(nodeNames, func(name string) bool { return name != "" })
successfullyCreatedNodeCount := lo.CountBy(machineNames, func(name string) bool { return name != "" })
metrics.NodesCreatedCounter.WithLabelValues(metrics.ProvisioningReason).Add(float64(successfullyCreatedNodeCount))

return reconcile.Result{}, err
}

// LaunchMachines launches nodes passed into the function in parallel. It returns a slice of the successfully created node
// names as well as a multierr of any errors that occurred while launching nodes
func (p *Provisioner) LaunchMachines(ctx context.Context, machines []*scheduler.Node, opts ...functional.Option[LaunchOptions]) ([]string, error) {
func (p *Provisioner) LaunchMachines(ctx context.Context, machines []*scheduler.Machine, opts ...functional.Option[LaunchOptions]) ([]string, error) {
// Launch capacity and bind pods
errs := make([]error, len(machines))
machineNames := make([]string, len(machines))
Expand All @@ -167,7 +137,7 @@ func (p *Provisioner) LaunchMachines(ctx context.Context, machines []*scheduler.
// register the provisioner on the context so we can pull it off for tagging purposes
// TODO: rethink this, maybe just pass the provisioner down instead of hiding it in the context?
ctx = injection.WithNamespacedName(ctx, types.NamespacedName{Name: machines[i].Labels[v1alpha5.ProvisionerNameLabelKey]})
if machineName, err := p.launch(ctx, machines[i], opts...); err != nil {
if machineName, err := p.Launch(ctx, machines[i], opts...); err != nil {
errs[i] = fmt.Errorf("launching machine, %w", err)
} else {
machineNames[i] = machineName
@@ -224,7 +194,7 @@ func (p *Provisioner) consolidationWarnings(ctx context.Context, po v1.Pod) {
}
}

// nolint: gocyclo
//nolint:gocyclo
func (p *Provisioner) NewScheduler(ctx context.Context, pods []*v1.Pod, stateNodes []*state.Node, opts scheduler.SchedulerOptions) (*scheduler.Scheduler, error) {
// Build node templates
var machines []*scheduler.MachineTemplate
@@ -293,20 +263,45 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*v1.Pod, stateNod
return scheduler.NewScheduler(ctx, p.kubeClient, machines, provisionerList.Items, p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder, opts), nil
}

func (p *Provisioner) schedule(ctx context.Context, pods []*v1.Pod, stateNodes []*state.Node) ([]*scheduler.Node, error) {
func (p *Provisioner) Schedule(ctx context.Context) ([]*scheduler.Machine, []*scheduler.ExistingNode, error) {
defer metrics.Measure(schedulingDuration.WithLabelValues(injection.GetNamespacedName(ctx).Name))()

scheduler, err := p.NewScheduler(ctx, pods, stateNodes, scheduler.SchedulerOptions{})
// We collect the nodes with their used capacities before we get the list of pending pods. This ensures that
// the node capacities we schedule against are always >= what the actual capacity is at any given instance. This
// prevents over-provisioning at the cost of potentially under-provisioning which will self-heal during the next
// scheduling loop when we Launch a new node. When this order is reversed, our node capacity may be reduced by pods
// that have bound which we then provision new un-needed capacity for.
// -------
// We don't consider the nodes that are MarkedForDeletion since this capacity shouldn't be considered
// as persistent capacity for the cluster (since it will soon be removed). Additionally, we are scheduling for
// the pods that are on these nodes so the MarkedForDeletion node capacity can't be considered.
nodes := p.cluster.Nodes()

// Get pods, exit if nothing to do
pendingPods, err := p.GetPendingPods(ctx)
if err != nil {
return nil, fmt.Errorf("creating scheduler, %w", err)
return nil, nil, err
}

// don't care about inflight scheduling results in this context
nodes, _, err := scheduler.Solve(ctx, pods)
return nodes, err
// Get pods from nodes that are preparing for deletion
// We do this after getting the pending pods so that we undershoot if pods are
// actively migrating from a node that is being deleted
// NOTE: The assumption is that these nodes are cordoned and no additional pods will schedule to them
deletingNodePods, err := nodes.Deleting().Pods(ctx, p.kubeClient)
if err != nil {
return nil, nil, err
}
pods := append(pendingPods, deletingNodePods...)
if len(pods) == 0 {
return nil, nil, nil
}
scheduler, err := p.NewScheduler(ctx, pods, nodes.Active(), scheduler.SchedulerOptions{})
if err != nil {
return nil, nil, fmt.Errorf("creating scheduler, %w", err)
}
return scheduler.Solve(ctx, pods)
}

func (p *Provisioner) launch(ctx context.Context, machine *scheduler.Node, opts ...functional.Option[LaunchOptions]) (string, error) {
func (p *Provisioner) Launch(ctx context.Context, machine *scheduler.Machine, opts ...functional.Option[LaunchOptions]) (string, error) {
// Check limits
latest := &v1alpha5.Provisioner{}
if err := p.kubeClient.Get(ctx, types.NamespacedName{Name: machine.ProvisionerName}, latest); err != nil {
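With Schedule and Launch now exported, callers outside the Reconcile loop (for example the deprovisioning simulation and the reworked test helpers) can drive them directly. A hedged sketch based only on the signatures visible in this diff, assuming the standard context, fmt, and karpenter-core provisioning imports:

func provisionOnce(ctx context.Context, prov *provisioning.Provisioner) error {
	// Schedule returns the machines to create plus the existing nodes it
	// scheduled pods onto; the second result is ignored here.
	machines, _, err := prov.Schedule(ctx)
	if err != nil {
		return err
	}
	// LaunchMachines launches in parallel; entries in names are empty for
	// machines that failed to launch (see the CountBy in Reconcile above).
	names, err := prov.LaunchMachines(ctx, machines, provisioning.RecordPodNomination)
	fmt.Println("launched:", names)
	return err
}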