diff --git a/charts/karpenter-core/templates/clusterrole.yaml b/charts/karpenter-core/templates/clusterrole.yaml index 39ec5e92a6..bb62a26c3d 100644 --- a/charts/karpenter-core/templates/clusterrole.yaml +++ b/charts/karpenter-core/templates/clusterrole.yaml @@ -30,7 +30,7 @@ metadata: rules: # Read - apiGroups: ["karpenter.sh"] - resources: ["provisioners", "provisioners/status"] + resources: ["provisioners", "provisioners/status", "machines", "machines/status"] verbs: ["get", "list", "watch"] - apiGroups: [""] resources: ["pods", "nodes", "persistentvolumes", "persistentvolumeclaims", "replicationcontrollers", "namespaces"] @@ -49,7 +49,7 @@ rules: verbs: [ "get", "list", "watch" ] # Write - apiGroups: ["karpenter.sh"] - resources: ["provisioners/status"] + resources: ["provisioners/status", "machines", "machines/status"] verbs: ["create", "delete", "patch"] - apiGroups: [""] resources: ["events"] diff --git a/pkg/apis/crds/karpenter.sh_machines.yaml b/pkg/apis/crds/karpenter.sh_machines.yaml index 3f220eec7b..84d5298c07 100644 --- a/pkg/apis/crds/karpenter.sh_machines.yaml +++ b/pkg/apis/crds/karpenter.sh_machines.yaml @@ -17,7 +17,14 @@ spec: singular: machine scope: Cluster versions: - - name: v1alpha5 + - additionalPrinterColumns: + - jsonPath: .status.conditions[?(@.type=="Ready")].status + name: Ready + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + name: v1alpha5 schema: openAPIV3Schema: description: Machine is the Schema for the Machines API diff --git a/pkg/apis/v1alpha5/machine.go b/pkg/apis/v1alpha5/machine.go index 465760f1f0..61b2bff20f 100644 --- a/pkg/apis/v1alpha5/machine.go +++ b/pkg/apis/v1alpha5/machine.go @@ -125,6 +125,8 @@ type ResourceRequirements struct { // +kubebuilder:object:root=true // +kubebuilder:resource:path=machines,scope=Cluster,categories=karpenter // +kubebuilder:subresource:status +// +kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type==\"Ready\")].status",description="" +// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp",description="" type Machine struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go index 4a66cfd501..7ba4b7b0fc 100644 --- a/pkg/cloudprovider/fake/cloudprovider.go +++ b/pkg/cloudprovider/fake/cloudprovider.go @@ -42,17 +42,17 @@ type CloudProvider struct { InstanceTypes []*cloudprovider.InstanceType // CreateCalls contains the arguments for every create call that was made since it was cleared - mu sync.Mutex + mu sync.RWMutex CreateCalls []*v1alpha5.Machine AllowedCreateCalls int + CreatedMachines map[string]*v1alpha5.Machine Drifted bool } -var _ cloudprovider.CloudProvider = (*CloudProvider)(nil) - func NewCloudProvider() *CloudProvider { return &CloudProvider{ AllowedCreateCalls: math.MaxInt, + CreatedMachines: map[string]*v1alpha5.Machine{}, } } @@ -61,26 +61,29 @@ func (c *CloudProvider) Reset() { c.mu.Lock() defer c.mu.Unlock() c.CreateCalls = []*v1alpha5.Machine{} + c.CreatedMachines = map[string]*v1alpha5.Machine{} c.AllowedCreateCalls = math.MaxInt } func (c *CloudProvider) Create(ctx context.Context, machine *v1alpha5.Machine) (*v1alpha5.Machine, error) { c.mu.Lock() + defer c.mu.Unlock() + c.CreateCalls = append(c.CreateCalls, machine) if len(c.CreateCalls) > c.AllowedCreateCalls { - c.mu.Unlock() return &v1alpha5.Machine{}, fmt.Errorf("erroring as number of AllowedCreateCalls has 
been exceeded") } - c.mu.Unlock() - requirements := scheduling.NewNodeSelectorRequirements(machine.Spec.Requirements...) + reqs := scheduling.NewNodeSelectorRequirements(machine.Spec.Requirements...) instanceTypes := lo.Filter(lo.Must(c.GetInstanceTypes(ctx, &v1alpha5.Provisioner{})), func(i *cloudprovider.InstanceType, _ int) bool { - return requirements.Get(v1.LabelInstanceTypeStable).Has(i.Name) + return reqs.Compatible(i.Requirements) == nil && + len(i.Offerings.Requirements(reqs).Available()) > 0 && + resources.Fits(resources.Merge(machine.Spec.Resources.Requests, i.Overhead.Total()), i.Capacity) }) // Order instance types so that we get the cheapest instance types of the available offerings sort.Slice(instanceTypes, func(i, j int) bool { - iOfferings := instanceTypes[i].Offerings.Available().Requirements(requirements) - jOfferings := instanceTypes[j].Offerings.Available().Requirements(requirements) + iOfferings := instanceTypes[i].Offerings.Available().Requirements(reqs) + jOfferings := instanceTypes[j].Offerings.Available().Requirements(reqs) return iOfferings.Cheapest().Price < jOfferings.Cheapest().Price }) instanceType := instanceTypes[0] @@ -93,7 +96,7 @@ func (c *CloudProvider) Create(ctx context.Context, machine *v1alpha5.Machine) ( } // Find Offering for _, o := range instanceType.Offerings.Available() { - if requirements.Compatible(scheduling.NewRequirements( + if reqs.Compatible(scheduling.NewRequirements( scheduling.NewRequirement(v1.LabelTopologyZone, v1.NodeSelectorOpIn, o.Zone), scheduling.NewRequirement(v1alpha5.LabelCapacityType, v1.NodeSelectorOpIn, o.CapacityType), )) == nil { @@ -103,22 +106,30 @@ func (c *CloudProvider) Create(ctx context.Context, machine *v1alpha5.Machine) ( } } name := test.RandomName() - return &v1alpha5.Machine{ + created := &v1alpha5.Machine{ ObjectMeta: metav1.ObjectMeta{ Name: name, Labels: labels, }, Spec: *machine.Spec.DeepCopy(), Status: v1alpha5.MachineStatus{ - ProviderID: fmt.Sprintf("fake://%s", name), + ProviderID: test.ProviderID(name), Capacity: functional.FilterMap(instanceType.Capacity, func(_ v1.ResourceName, v resource.Quantity) bool { return !resources.IsZero(v) }), Allocatable: functional.FilterMap(instanceType.Allocatable(), func(_ v1.ResourceName, v resource.Quantity) bool { return !resources.IsZero(v) }), }, - }, nil + } + c.CreatedMachines[machine.Name] = created + return created, nil } -func (c *CloudProvider) Get(context.Context, string, string) (*v1alpha5.Machine, error) { - return nil, nil +func (c *CloudProvider) Get(_ context.Context, machineName string, _ string) (*v1alpha5.Machine, error) { + c.mu.RLock() + defer c.mu.RUnlock() + + if machine, ok := c.CreatedMachines[machineName]; ok { + return machine.DeepCopy(), nil + } + return nil, cloudprovider.NewMachineNotFoundError(fmt.Errorf("no machine exists with name '%s'", machineName)) } func (c *CloudProvider) GetInstanceTypes(_ context.Context, _ *v1alpha5.Provisioner) ([]*cloudprovider.InstanceType, error) { @@ -165,8 +176,15 @@ func (c *CloudProvider) GetInstanceTypes(_ context.Context, _ *v1alpha5.Provisio }, nil } -func (c *CloudProvider) Delete(context.Context, *v1alpha5.Machine) error { - return nil +func (c *CloudProvider) Delete(_ context.Context, m *v1alpha5.Machine) error { + c.mu.Lock() + defer c.mu.Unlock() + + if _, ok := c.CreatedMachines[m.Name]; ok { + delete(c.CreatedMachines, m.Name) + return nil + } + return cloudprovider.NewMachineNotFoundError(fmt.Errorf("no machine exists with name '%s'", m.Name)) } func (c *CloudProvider) 
IsMachineDrifted(context.Context, *v1alpha5.Machine) (bool, error) { diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index d6b13d6775..096c4e82be 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -25,6 +25,7 @@ import ( "github.com/aws/karpenter-core/pkg/controllers/counter" "github.com/aws/karpenter-core/pkg/controllers/deprovisioning" "github.com/aws/karpenter-core/pkg/controllers/inflightchecks" + "github.com/aws/karpenter-core/pkg/controllers/machine/terminator" metricspod "github.com/aws/karpenter-core/pkg/controllers/metrics/pod" metricsprovisioner "github.com/aws/karpenter-core/pkg/controllers/metrics/provisioner" metricsstate "github.com/aws/karpenter-core/pkg/controllers/metrics/state" @@ -36,7 +37,6 @@ import ( "github.com/aws/karpenter-core/pkg/events" "github.com/aws/karpenter-core/pkg/metrics" "github.com/aws/karpenter-core/pkg/operator/controller" - "github.com/aws/karpenter-core/pkg/operator/settingsstore" ) func init() { @@ -49,25 +49,25 @@ func NewControllers( kubeClient client.Client, kubernetesInterface kubernetes.Interface, cluster *state.Cluster, - eventRecorder events.Recorder, - settingsStore settingsstore.Store, + recorder events.Recorder, cloudProvider cloudprovider.CloudProvider, ) []controller.Controller { - provisioner := provisioning.NewProvisioner(ctx, kubeClient, kubernetesInterface.CoreV1(), eventRecorder, cloudProvider, cluster) + provisioner := provisioning.NewProvisioner(ctx, kubeClient, kubernetesInterface.CoreV1(), recorder, cloudProvider, cluster) + terminator := terminator.NewTerminator(clock, kubeClient, cloudProvider, terminator.NewEvictionQueue(ctx, kubernetesInterface.CoreV1(), recorder)) return []controller.Controller{ provisioner, metricsstate.NewController(cluster), - deprovisioning.NewController(clock, kubeClient, provisioner, cloudProvider, eventRecorder, cluster), - provisioning.NewController(kubeClient, provisioner, eventRecorder), + deprovisioning.NewController(clock, kubeClient, provisioner, cloudProvider, recorder, cluster), + provisioning.NewController(kubeClient, provisioner, recorder), informer.NewNodeController(kubeClient, cluster), informer.NewPodController(kubeClient, cluster), informer.NewProvisionerController(kubeClient, cluster), node.NewController(clock, kubeClient, cloudProvider, cluster), - termination.NewController(clock, kubeClient, termination.NewEvictionQueue(ctx, kubernetesInterface.CoreV1(), eventRecorder), eventRecorder, cloudProvider), + termination.NewController(kubeClient, terminator, recorder), metricspod.NewController(kubeClient), metricsprovisioner.NewController(kubeClient), counter.NewController(kubeClient, cluster), - inflightchecks.NewController(clock, kubeClient, eventRecorder, cloudProvider), + inflightchecks.NewController(clock, kubeClient, recorder, cloudProvider), } } diff --git a/pkg/controllers/machine/controller.go b/pkg/controllers/machine/controller.go new file mode 100644 index 0000000000..14308e2f91 --- /dev/null +++ b/pkg/controllers/machine/controller.go @@ -0,0 +1,203 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package machine + +import ( + "context" + "fmt" + + "github.com/samber/lo" + "go.uber.org/multierr" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/utils/clock" + "knative.dev/pkg/logging" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + + "github.com/aws/karpenter-core/pkg/apis/v1alpha5" + "github.com/aws/karpenter-core/pkg/cloudprovider" + "github.com/aws/karpenter-core/pkg/controllers/machine/terminator" + "github.com/aws/karpenter-core/pkg/events" + corecontroller "github.com/aws/karpenter-core/pkg/operator/controller" + "github.com/aws/karpenter-core/pkg/utils/result" +) + +type machineReconciler interface { + Reconcile(context.Context, *v1alpha5.Machine) (reconcile.Result, error) +} + +var _ corecontroller.FinalizingTypedController[*v1alpha5.Machine] = (*Controller)(nil) + +// Controller is a Machine Controller +type Controller struct { + kubeClient client.Client + cloudProvider cloudprovider.CloudProvider + recorder events.Recorder + terminator *terminator.Terminator + + launch *Launch + registration *Registration + initialization *Initialization + liveness *Liveness +} + +// NewController is a constructor for the Machine Controller +func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, + terminator *terminator.Terminator, recorder events.Recorder) corecontroller.Controller { + return corecontroller.Typed[*v1alpha5.Machine](kubeClient, &Controller{ + kubeClient: kubeClient, + cloudProvider: cloudProvider, + recorder: recorder, + terminator: terminator, + + launch: &Launch{kubeClient: kubeClient, cloudProvider: cloudProvider}, + registration: &Registration{kubeClient: kubeClient}, + initialization: &Initialization{kubeClient: kubeClient}, + liveness: &Liveness{clock: clk, kubeClient: kubeClient}, + }) +} + +func (*Controller) Name() string { + return "machine" +} + +func (c *Controller) Reconcile(ctx context.Context, machine *v1alpha5.Machine) (reconcile.Result, error) { + // Add the finalizer immediately since we shouldn't launch if we don't yet have the finalizer. 
+ // Otherwise, we could leak resources + stored := machine.DeepCopy() + controllerutil.AddFinalizer(machine, v1alpha5.TerminationFinalizer) + if !equality.Semantic.DeepEqual(machine, stored) { + if err := c.kubeClient.Patch(ctx, machine, client.MergeFrom(stored)); err != nil { + return reconcile.Result{}, client.IgnoreNotFound(err) + } + } + + stored = machine.DeepCopy() + var results []reconcile.Result + var errs error + for _, reconciler := range []machineReconciler{ + c.launch, + c.registration, + c.initialization, + c.liveness, + } { + res, err := reconciler.Reconcile(ctx, machine) + errs = multierr.Append(errs, err) + results = append(results, res) + } + if !equality.Semantic.DeepEqual(stored, machine) { + statusCopy := machine.DeepCopy() + if err := c.kubeClient.Patch(ctx, machine, client.MergeFrom(stored)); err != nil { + return reconcile.Result{}, client.IgnoreNotFound(multierr.Append(errs, err)) + } + if err := c.kubeClient.Status().Patch(ctx, statusCopy, client.MergeFrom(stored)); err != nil { + return reconcile.Result{}, client.IgnoreNotFound(multierr.Append(errs, err)) + } + } + return result.Min(results...), errs +} + +func (c *Controller) Finalize(ctx context.Context, machine *v1alpha5.Machine) (reconcile.Result, error) { + stored := machine.DeepCopy() + if !controllerutil.ContainsFinalizer(machine, v1alpha5.TerminationFinalizer) { + return reconcile.Result{}, nil + } + if err := c.cleanupNodeForMachine(ctx, machine); err != nil { + if terminator.IsNodeDrainError(err) { + return reconcile.Result{Requeue: true}, nil + } + return reconcile.Result{}, err + } + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("provider-id", machine.Status.ProviderID)) + if err := c.cloudProvider.Delete(ctx, machine); cloudprovider.IgnoreMachineNotFoundError(err) != nil { + return reconcile.Result{}, fmt.Errorf("terminating cloudprovider instance, %w", err) + } + controllerutil.RemoveFinalizer(machine, v1alpha5.TerminationFinalizer) + if !equality.Semantic.DeepEqual(stored, machine) { + if err := c.kubeClient.Patch(ctx, machine, client.MergeFrom(stored)); err != nil { + return reconcile.Result{}, client.IgnoreNotFound(fmt.Errorf("removing machine termination finalizer, %w", err)) + } + logging.FromContext(ctx).Infof("deleted machine") + } + return reconcile.Result{}, nil +} + +func (c *Controller) Builder(ctx context.Context, m manager.Manager) corecontroller.Builder { + return corecontroller.Adapt(controllerruntime. + NewControllerManagedBy(m). + For(&v1alpha5.Machine{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). + Watches( + &source.Kind{Type: &v1.Node{}}, + handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request { + node := o.(*v1.Node) + machineList := &v1alpha5.MachineList{} + if err := c.kubeClient.List(ctx, machineList, client.MatchingFields{"status.providerID": node.Spec.ProviderID}); err != nil { + return []reconcile.Request{} + } + return lo.Map(machineList.Items, func(m v1alpha5.Machine, _ int) reconcile.Request { + return reconcile.Request{ + NamespacedName: client.ObjectKeyFromObject(&m), + } + }) + }), + ).
+ WithOptions(controller.Options{MaxConcurrentReconciles: 50})) // higher concurrency limit since we want fast reaction to node syncing and launch +} + +func (c *Controller) cleanupNodeForMachine(ctx context.Context, machine *v1alpha5.Machine) error { + node, err := nodeForMachine(ctx, c.kubeClient, machine) + if err != nil { + // We don't clean up the node if we either don't find a node or have violated the single machine to single node invariant + if IsNodeNotFoundError(err) || IsDuplicateNodeError(err) { + return nil + } + return err + } + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("node", node.Name)) + if err = c.terminator.Cordon(ctx, node); err != nil { + if terminator.IsNodeDrainError(err) { + c.recorder.Publish(events.NodeFailedToDrain(node, err)) + } + return fmt.Errorf("cordoning node, %w", err) + } + if err = c.terminator.Drain(ctx, node); err != nil { + return fmt.Errorf("draining node, %w", err) + } + return nil +} + +func nodeForMachine(ctx context.Context, c client.Client, machine *v1alpha5.Machine) (*v1.Node, error) { + nodeList := v1.NodeList{} + if err := c.List(ctx, &nodeList, client.MatchingFields{"spec.providerID": machine.Status.ProviderID}, client.Limit(2)); err != nil { + return nil, fmt.Errorf("listing nodes, %w", err) + } + if len(nodeList.Items) > 1 { + return nil, &DuplicateNodeError{ProviderID: machine.Status.ProviderID} + } + if len(nodeList.Items) == 0 { + return nil, &NodeNotFoundError{ProviderID: machine.Status.ProviderID} + } + return &nodeList.Items[0], nil +} diff --git a/pkg/controllers/machine/errors.go b/pkg/controllers/machine/errors.go new file mode 100644 index 0000000000..91f97e65b7 --- /dev/null +++ b/pkg/controllers/machine/errors.go @@ -0,0 +1,52 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package machine + +import ( + "errors" + "fmt" +) + +type NodeNotFoundError struct { + ProviderID string +} + +func (e *NodeNotFoundError) Error() string { + return fmt.Sprintf("no nodes found for provider id '%s'", e.ProviderID) +} + +func IsNodeNotFoundError(err error) bool { + if err == nil { + return false + } + nnfErr := &NodeNotFoundError{} + return errors.As(err, &nnfErr) +} + +type DuplicateNodeError struct { + ProviderID string +} + +func (e *DuplicateNodeError) Error() string { + return fmt.Sprintf("multiple nodes found for provider id '%s'", e.ProviderID) +} + +func IsDuplicateNodeError(err error) bool { + if err == nil { + return false + } + dnErr := &DuplicateNodeError{} + return errors.As(err, &dnErr) +} diff --git a/pkg/controllers/machine/initialization.go b/pkg/controllers/machine/initialization.go new file mode 100644 index 0000000000..8e11e0e06e --- /dev/null +++ b/pkg/controllers/machine/initialization.go @@ -0,0 +1,119 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License.
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package machine + +import ( + "context" + "fmt" + + "github.com/samber/lo" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + "knative.dev/pkg/logging" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/aws/karpenter-core/pkg/apis/v1alpha5" + nodeutil "github.com/aws/karpenter-core/pkg/utils/node" + "github.com/aws/karpenter-core/pkg/utils/resources" +) + +type Initialization struct { + kubeClient client.Client +} + +// Reconcile checks for initialization based on if: +// a) its current status is set to Ready +// b) all the startup taints have been removed from the node +// c) all extended resources have been registered +// This method handles both nil provisioners and nodes without extended resources gracefully. +func (i *Initialization) Reconcile(ctx context.Context, machine *v1alpha5.Machine) (reconcile.Result, error) { + if machine.Status.ProviderID == "" { + return reconcile.Result{}, nil + } + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("provider-id", machine.Status.ProviderID)) + node, err := nodeForMachine(ctx, i.kubeClient, machine) + if err != nil { + machine.StatusConditions().MarkFalse(v1alpha5.MachineInitialized, "NodeNotFound", "Node not registered with cluster") + return reconcile.Result{}, nil //nolint:nilerr + } + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("node", node.Name)) + if nodeutil.GetCondition(node, v1.NodeReady).Status != v1.ConditionTrue { + machine.StatusConditions().MarkFalse(v1alpha5.MachineInitialized, "NodeNotReady", "Node status is NotReady") + return reconcile.Result{}, nil + } + if taint, ok := isStartupTaintRemoved(node, machine); !ok { + machine.StatusConditions().MarkFalse(v1alpha5.MachineInitialized, "StartupTaintsExist", "StartupTaint '%s' still exists", formatTaint(taint)) + return reconcile.Result{}, nil + } + if name, ok := requestedResourcesRegistered(node, machine); !ok { + machine.StatusConditions().MarkFalse(v1alpha5.MachineInitialized, "ResourceNotRegistered", "Resource '%s' was requested but not registered", name) + return reconcile.Result{}, nil + } + stored := node.DeepCopy() + node.Labels = lo.Assign(node.Labels, map[string]string{v1alpha5.LabelNodeInitialized: "true"}) + if !equality.Semantic.DeepEqual(stored, node) { + if err = i.kubeClient.Patch(ctx, node, client.MergeFrom(stored)); err != nil { + return reconcile.Result{}, err + } + logging.FromContext(ctx).Debugf("node initialized") + } + machine.StatusConditions().MarkTrue(v1alpha5.MachineInitialized) + return reconcile.Result{}, nil +} + +// isStartupTaintRemoved returns true if there are no startup taints registered for the provisioner, or if all startup +// taints have been removed from the node +func isStartupTaintRemoved(node *v1.Node, machine *v1alpha5.Machine) (*v1.Taint, bool) { + if machine != nil { + for _, startupTaint := range machine.Spec.StartupTaints { + for i := range node.Spec.Taints { + // if the node still has a startup taint applied, it's not ready + if startupTaint.MatchTaint(&node.Spec.Taints[i]) { + return 
&node.Spec.Taints[i], false + } + } + } + } + return nil, true +} + +// requestedResourcesRegistered returns true if there are no extended resources on the node, or they have all been +// registered by device plugins +func requestedResourcesRegistered(node *v1.Node, machine *v1alpha5.Machine) (v1.ResourceName, bool) { + for resourceName, quantity := range machine.Spec.Resources.Requests { + if quantity.IsZero() { + continue + } + // kubelet will zero out both the capacity and allocatable for an extended resource on startup, so if our + // annotation says the resource should be there, but it's zero'd in both then the device plugin hasn't + // registered it yet. + // We wait on allocatable since this is the value that is used in scheduling + if resources.IsZero(node.Status.Allocatable[resourceName]) { + return resourceName, false + } + } + return "", true +} + +func formatTaint(taint *v1.Taint) string { + if taint == nil { + return "" + } + if taint.Value == "" { + return fmt.Sprintf("%s:%s", taint.Key, taint.Effect) + } + return fmt.Sprintf("%s=%s:%s", taint.Key, taint.Value, taint.Effect) +} diff --git a/pkg/controllers/machine/launch.go b/pkg/controllers/machine/launch.go new file mode 100644 index 0000000000..7db13a364e --- /dev/null +++ b/pkg/controllers/machine/launch.go @@ -0,0 +1,64 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package machine + +import ( + "context" + "fmt" + + "github.com/samber/lo" + "knative.dev/pkg/logging" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/aws/karpenter-core/pkg/apis/v1alpha5" + "github.com/aws/karpenter-core/pkg/cloudprovider" +) + +type Launch struct { + kubeClient client.Client + cloudProvider cloudprovider.CloudProvider +} + +func (l *Launch) Reconcile(ctx context.Context, machine *v1alpha5.Machine) (reconcile.Result, error) { + if machine.Status.ProviderID != "" { + return reconcile.Result{}, nil + } + retrieved, err := l.cloudProvider.Get(ctx, machine.Name, machine.Labels[v1alpha5.ProvisionerNameLabelKey]) + if err != nil { + if cloudprovider.IsMachineNotFoundError(err) { + logging.FromContext(ctx).Debugf("creating machine") + retrieved, err = l.cloudProvider.Create(ctx, machine) + if err != nil { + return reconcile.Result{}, fmt.Errorf("creating machine, %w", err) + } + } else { + return reconcile.Result{}, fmt.Errorf("getting machine, %w", err) + } + } + populateMachineDetails(machine, retrieved) + machine.StatusConditions().MarkTrue(v1alpha5.MachineCreated) + return reconcile.Result{}, nil +} + +func populateMachineDetails(machine, retrieved *v1alpha5.Machine) { + machine.Labels = lo.Assign(machine.Labels, retrieved.Labels, map[string]string{ + v1alpha5.MachineNameLabelKey: machine.Name, + }) + machine.Annotations = lo.Assign(machine.Annotations, retrieved.Annotations) + machine.Status.ProviderID = retrieved.Status.ProviderID + machine.Status.Allocatable = retrieved.Status.Allocatable + machine.Status.Capacity = retrieved.Status.Capacity +} diff --git a/pkg/controllers/machine/liveness.go b/pkg/controllers/machine/liveness.go new file mode 100644 index 0000000000..fed84fc427 --- /dev/null +++ b/pkg/controllers/machine/liveness.go @@ -0,0 +1,43 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package machine + +import ( + "context" + "time" + + "k8s.io/utils/clock" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/aws/karpenter-core/pkg/apis/v1alpha5" +) + +const registrationTTL = 10 * time.Minute + +type Liveness struct { + clock clock.Clock + kubeClient client.Client +} + +func (l *Liveness) Reconcile(ctx context.Context, machine *v1alpha5.Machine) (reconcile.Result, error) { + // Delete the machine if we believe the machine won't register since we haven't seen the node + if !machine.StatusConditions().GetCondition(v1alpha5.MachineRegistered).IsTrue() && !machine.CreationTimestamp.IsZero() && + machine.CreationTimestamp.Add(registrationTTL).Before(l.clock.Now()) { + + return reconcile.Result{}, client.IgnoreNotFound(l.kubeClient.Delete(ctx, machine)) + } + return reconcile.Result{}, nil +} diff --git a/pkg/controllers/machine/registration.go b/pkg/controllers/machine/registration.go new file mode 100644 index 0000000000..b4540cad92 --- /dev/null +++ b/pkg/controllers/machine/registration.go @@ -0,0 +1,82 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package machine + +import ( + "context" + "fmt" + + "github.com/samber/lo" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + "knative.dev/pkg/logging" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/aws/karpenter-core/pkg/apis/v1alpha5" + "github.com/aws/karpenter-core/pkg/operator/scheme" + "github.com/aws/karpenter-core/pkg/scheduling" +) + +type Registration struct { + kubeClient client.Client +} + +func (r *Registration) Reconcile(ctx context.Context, machine *v1alpha5.Machine) (reconcile.Result, error) { + if machine.Status.ProviderID == "" { + return reconcile.Result{}, nil + } + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("provider-id", machine.Status.ProviderID)) + node, err := nodeForMachine(ctx, r.kubeClient, machine) + if err != nil { + if IsNodeNotFoundError(err) { + machine.StatusConditions().MarkFalse(v1alpha5.MachineRegistered, "NodeNotFound", "Node not registered with cluster") + return reconcile.Result{RequeueAfter: registrationTTL}, nil // Requeue later to check up to the registration timeout + } + if IsDuplicateNodeError(err) { + machine.StatusConditions().MarkFalse(v1alpha5.MachineRegistered, "MultipleNodesFound", "Invariant violated, machine matched multiple nodes") + return reconcile.Result{}, nil + } + return reconcile.Result{}, fmt.Errorf("getting node for machine, %w", err) + } + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("node", node.Name)) + if err = r.syncNode(ctx, machine, node); err != nil { + return reconcile.Result{}, fmt.Errorf("syncing node, %w", err) + } + machine.StatusConditions().MarkTrue(v1alpha5.MachineRegistered) + return reconcile.Result{}, nil +} + +func (r *Registration) syncNode(ctx context.Context, machine
*v1alpha5.Machine, node *v1.Node) error { + stored := node.DeepCopy() + controllerutil.AddFinalizer(node, v1alpha5.TerminationFinalizer) + node.Labels = lo.Assign(node.Labels, machine.Labels) + node.Annotations = lo.Assign(node.Annotations, machine.Annotations) + + // Sync all taints inside of Machine into the Node taints + node.Spec.Taints = scheduling.Taints(node.Spec.Taints).Merge(machine.Spec.Taints) + if !machine.StatusConditions().GetCondition(v1alpha5.MachineRegistered).IsTrue() { + node.Spec.Taints = scheduling.Taints(node.Spec.Taints).Merge(machine.Spec.StartupTaints) + } + lo.Must0(controllerutil.SetOwnerReference(machine, node, scheme.Scheme)) + if !equality.Semantic.DeepEqual(stored, node) { + if err := r.kubeClient.Patch(ctx, node, client.MergeFrom(stored)); err != nil { + return fmt.Errorf("syncing node labels, %w", err) + } + logging.FromContext(ctx).Debugf("synced node") + } + return nil +} diff --git a/pkg/controllers/machine/suite_test.go b/pkg/controllers/machine/suite_test.go new file mode 100644 index 0000000000..6c8eab8e9d --- /dev/null +++ b/pkg/controllers/machine/suite_test.go @@ -0,0 +1,814 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package machine_test + +import ( + "context" + "testing" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/samber/lo" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clock "k8s.io/utils/clock/testing" + . "knative.dev/pkg/logging/testing" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/aws/karpenter-core/pkg/apis" + "github.com/aws/karpenter-core/pkg/apis/config/settings" + "github.com/aws/karpenter-core/pkg/apis/v1alpha5" + "github.com/aws/karpenter-core/pkg/cloudprovider/fake" + "github.com/aws/karpenter-core/pkg/controllers/machine" + "github.com/aws/karpenter-core/pkg/controllers/machine/terminator" + "github.com/aws/karpenter-core/pkg/operator/controller" + "github.com/aws/karpenter-core/pkg/operator/scheme" + . 
"github.com/aws/karpenter-core/pkg/test/expectations" + + "github.com/aws/karpenter-core/pkg/test" +) + +var ctx context.Context +var machineController controller.Controller +var env *test.Environment +var fakeClock *clock.FakeClock +var cloudProvider *fake.CloudProvider + +func TestAPIs(t *testing.T) { + ctx = TestContextWithLogger(t) + RegisterFailHandler(Fail) + RunSpecs(t, "Node") +} + +var _ = BeforeSuite(func() { + fakeClock = clock.NewFakeClock(time.Now()) + env = test.NewEnvironment(scheme.Scheme, test.WithCRDs(apis.CRDs...), test.WithFieldIndexers(func(c cache.Cache) error { + return c.IndexField(ctx, &v1.Node{}, "spec.providerID", func(obj client.Object) []string { + return []string{obj.(*v1.Node).Spec.ProviderID} + }) + })) + ctx = settings.ToContext(ctx, test.Settings()) + + cloudProvider = fake.NewCloudProvider() + recorder := test.NewEventRecorder() + terminator := terminator.NewTerminator(fakeClock, env.Client, cloudProvider, terminator.NewEvictionQueue(ctx, env.KubernetesInterface.CoreV1(), recorder)) + machineController = machine.NewController(fakeClock, env.Client, cloudProvider, terminator, recorder) +}) + +var _ = AfterSuite(func() { + Expect(env.Stop()).To(Succeed(), "Failed to stop environment") +}) + +var _ = Describe("Controller", func() { + AfterEach(func() { + fakeClock.SetTime(time.Now()) + ExpectCleanedUp(ctx, env.Client) + cloudProvider.Reset() + }) + + Context("Finalizer", func() { + It("should add the finalizer if it doesn't exist", func() { + machine := test.Machine() + ExpectApplied(ctx, env.Client, machine) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + + machine = ExpectExists(ctx, env.Client, machine) + _, ok := lo.Find(machine.Finalizers, func(f string) bool { + return f == v1alpha5.TerminationFinalizer + }) + Expect(ok).To(BeTrue()) + }) + }) + Context("Launch", func() { + It("should launch an instance when a new Machine is created", func() { + machine := test.Machine() + ExpectApplied(ctx, env.Client, machine) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + + Expect(cloudProvider.CreateCalls).To(HaveLen(1)) + Expect(cloudProvider.CreatedMachines).To(HaveLen(1)) + _, err := cloudProvider.Get(ctx, machine.Name, "") + Expect(err).ToNot(HaveOccurred()) + }) + It("should get an instance and hydrate the Machine when the Machine is already created", func() { + machine := test.Machine() + cloudProviderMachine := &v1alpha5.Machine{ + ObjectMeta: metav1.ObjectMeta{ + Name: machine.Name, + Labels: map[string]string{ + v1.LabelInstanceTypeStable: "small-instance-type", + v1.LabelTopologyZone: "test-zone-1a", + v1.LabelTopologyRegion: "test-zone", + v1alpha5.LabelCapacityType: v1alpha5.CapacityTypeSpot, + }, + }, + Status: v1alpha5.MachineStatus{ + ProviderID: test.RandomProviderID(), + Capacity: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("10"), + v1.ResourceMemory: resource.MustParse("100Mi"), + v1.ResourceEphemeralStorage: resource.MustParse("20Gi"), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("8"), + v1.ResourceMemory: resource.MustParse("80Mi"), + v1.ResourceEphemeralStorage: resource.MustParse("18Gi"), + }, + }, + } + cloudProvider.CreatedMachines[machine.Name] = cloudProviderMachine + ExpectApplied(ctx, env.Client, machine) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + + machine = ExpectExists(ctx, env.Client, machine) + + 
Expect(machine.Status.ProviderID).To(Equal(cloudProviderMachine.Status.ProviderID)) + ExpectResources(machine.Status.Capacity, cloudProviderMachine.Status.Capacity) + ExpectResources(machine.Status.Allocatable, cloudProviderMachine.Status.Allocatable) + + Expect(machine.Labels).To(HaveKeyWithValue(v1.LabelInstanceTypeStable, "small-instance-type")) + Expect(machine.Labels).To(HaveKeyWithValue(v1.LabelTopologyZone, "test-zone-1a")) + Expect(machine.Labels).To(HaveKeyWithValue(v1.LabelTopologyRegion, "test-zone")) + Expect(machine.Labels).To(HaveKeyWithValue(v1alpha5.LabelCapacityType, v1alpha5.CapacityTypeSpot)) + }) + It("should add the MachineCreated status condition after creating the Machine", func() { + machine := test.Machine() + ExpectApplied(ctx, env.Client, machine) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + + machine = ExpectExists(ctx, env.Client, machine) + Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineCreated).Status).To(Equal(v1.ConditionTrue)) + }) + }) + Context("Registration", func() { + It("should match the Machine to the Node when the Node comes online", func() { + machine := test.Machine() + ExpectApplied(ctx, env.Client, machine) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + machine = ExpectExists(ctx, env.Client, machine) + + node := test.Node(test.NodeOptions{ProviderID: machine.Status.ProviderID}) + ExpectApplied(ctx, env.Client, node) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + + machine = ExpectExists(ctx, env.Client, machine) + Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineRegistered).Status).To(Equal(v1.ConditionTrue)) + }) + It("should add the owner reference to the Node when the Node comes online", func() { + machine := test.Machine() + ExpectApplied(ctx, env.Client, machine) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + machine = ExpectExists(ctx, env.Client, machine) + + node := test.Node(test.NodeOptions{ProviderID: machine.Status.ProviderID}) + ExpectApplied(ctx, env.Client, node) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + + node = ExpectExists(ctx, env.Client, node) + ExpectOwnerReferenceExists(node, machine) + }) + It("should sync the labels to the Node when the Node comes online", func() { + machine := test.Machine(v1alpha5.Machine{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "custom-label": "custom-value", + "other-custom-label": "other-custom-value", + }, + }, + }) + ExpectApplied(ctx, env.Client, machine) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + machine = ExpectExists(ctx, env.Client, machine) + Expect(machine.Labels).To(HaveKeyWithValue("custom-label", "custom-value")) + Expect(machine.Labels).To(HaveKeyWithValue("other-custom-label", "other-custom-value")) + + node := test.Node(test.NodeOptions{ProviderID: machine.Status.ProviderID}) + ExpectApplied(ctx, env.Client, node) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + node = ExpectExists(ctx, env.Client, node) + + // Expect Node to have all the labels that the Machine has + for k, v := range machine.Labels { + Expect(node.Labels).To(HaveKeyWithValue(k, v)) + } + }) + It("should sync the annotations to the Node when the Node comes online", func() { + machine := test.Machine(v1alpha5.Machine{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: 
map[string]string{ + v1alpha5.DoNotConsolidateNodeAnnotationKey: "true", + "my-custom-annotation": "my-custom-value", + }, + }, + }) + ExpectApplied(ctx, env.Client, machine) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + machine = ExpectExists(ctx, env.Client, machine) + Expect(machine.Annotations).To(HaveKeyWithValue(v1alpha5.DoNotConsolidateNodeAnnotationKey, "true")) + Expect(machine.Annotations).To(HaveKeyWithValue("my-custom-annotation", "my-custom-value")) + + node := test.Node(test.NodeOptions{ProviderID: machine.Status.ProviderID}) + ExpectApplied(ctx, env.Client, node) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + node = ExpectExists(ctx, env.Client, node) + + // Expect Node to have all the annotations that the Machine has + for k, v := range machine.Annotations { + Expect(node.Annotations).To(HaveKeyWithValue(k, v)) + } + }) + It("should sync the taints to the Node when the Node comes online", func() { + machine := test.Machine(v1alpha5.Machine{ + Spec: v1alpha5.MachineSpec{ + Taints: []v1.Taint{ + { + Key: "custom-taint", + Effect: v1.TaintEffectNoSchedule, + Value: "custom-value", + }, + { + Key: "other-custom-taint", + Effect: v1.TaintEffectNoExecute, + Value: "other-custom-value", + }, + }, + }, + }) + ExpectApplied(ctx, env.Client, machine) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + machine = ExpectExists(ctx, env.Client, machine) + Expect(machine.Spec.Taints).To(ContainElements( + v1.Taint{ + Key: "custom-taint", + Effect: v1.TaintEffectNoSchedule, + Value: "custom-value", + }, + v1.Taint{ + Key: "other-custom-taint", + Effect: v1.TaintEffectNoExecute, + Value: "other-custom-value", + }, + )) + + node := test.Node(test.NodeOptions{ProviderID: machine.Status.ProviderID}) + ExpectApplied(ctx, env.Client, node) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + node = ExpectExists(ctx, env.Client, node) + + Expect(node.Spec.Taints).To(ContainElements( + v1.Taint{ + Key: "custom-taint", + Effect: v1.TaintEffectNoSchedule, + Value: "custom-value", + }, + v1.Taint{ + Key: "other-custom-taint", + Effect: v1.TaintEffectNoExecute, + Value: "other-custom-value", + }, + )) + }) + It("should sync the startupTaints to the Node when the Node comes online", func() { + machine := test.Machine(v1alpha5.Machine{ + Spec: v1alpha5.MachineSpec{ + Taints: []v1.Taint{ + { + Key: "custom-taint", + Effect: v1.TaintEffectNoSchedule, + Value: "custom-value", + }, + { + Key: "other-custom-taint", + Effect: v1.TaintEffectNoExecute, + Value: "other-custom-value", + }, + }, + StartupTaints: []v1.Taint{ + { + Key: "custom-startup-taint", + Effect: v1.TaintEffectNoSchedule, + Value: "custom-startup-value", + }, + { + Key: "other-custom-startup-taint", + Effect: v1.TaintEffectNoExecute, + Value: "other-custom-startup-value", + }, + }, + }, + }) + ExpectApplied(ctx, env.Client, machine) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + machine = ExpectExists(ctx, env.Client, machine) + Expect(machine.Spec.StartupTaints).To(ContainElements( + v1.Taint{ + Key: "custom-startup-taint", + Effect: v1.TaintEffectNoSchedule, + Value: "custom-startup-value", + }, + v1.Taint{ + Key: "other-custom-startup-taint", + Effect: v1.TaintEffectNoExecute, + Value: "other-custom-startup-value", + }, + )) + + node := test.Node(test.NodeOptions{ProviderID: machine.Status.ProviderID}) + ExpectApplied(ctx, env.Client, 
node) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + node = ExpectExists(ctx, env.Client, node) + + Expect(node.Spec.Taints).To(ContainElements( + v1.Taint{ + Key: "custom-taint", + Effect: v1.TaintEffectNoSchedule, + Value: "custom-value", + }, + v1.Taint{ + Key: "other-custom-taint", + Effect: v1.TaintEffectNoExecute, + Value: "other-custom-value", + }, + v1.Taint{ + Key: "custom-startup-taint", + Effect: v1.TaintEffectNoSchedule, + Value: "custom-startup-value", + }, + v1.Taint{ + Key: "other-custom-startup-taint", + Effect: v1.TaintEffectNoExecute, + Value: "other-custom-startup-value", + }, + )) + }) + It("should not re-sync the startupTaints to the Node when the startupTaints are removed", func() { + machine := test.Machine(v1alpha5.Machine{ + Spec: v1alpha5.MachineSpec{ + StartupTaints: []v1.Taint{ + { + Key: "custom-startup-taint", + Effect: v1.TaintEffectNoSchedule, + Value: "custom-startup-value", + }, + { + Key: "other-custom-startup-taint", + Effect: v1.TaintEffectNoExecute, + Value: "other-custom-startup-value", + }, + }, + }, + }) + ExpectApplied(ctx, env.Client, machine) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + machine = ExpectExists(ctx, env.Client, machine) + + node := test.Node(test.NodeOptions{ProviderID: machine.Status.ProviderID}) + ExpectApplied(ctx, env.Client, node) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + node = ExpectExists(ctx, env.Client, node) + + Expect(node.Spec.Taints).To(ContainElements( + v1.Taint{ + Key: "custom-startup-taint", + Effect: v1.TaintEffectNoSchedule, + Value: "custom-startup-value", + }, + v1.Taint{ + Key: "other-custom-startup-taint", + Effect: v1.TaintEffectNoExecute, + Value: "other-custom-startup-value", + }, + )) + node.Spec.Taints = []v1.Taint{} + ExpectApplied(ctx, env.Client, node) + + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + node = ExpectExists(ctx, env.Client, node) + Expect(node.Spec.Taints).To(HaveLen(0)) + }) + }) + Context("Initialization", func() { + It("should consider the Machine initialized when all initialization conditions are met", func() { + machine := test.Machine(v1alpha5.Machine{ + Spec: v1alpha5.MachineSpec{ + Resources: v1alpha5.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("2"), + v1.ResourceMemory: resource.MustParse("50Mi"), + v1.ResourcePods: resource.MustParse("5"), + }, + }, + }, + }) + ExpectApplied(ctx, env.Client, machine) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + machine = ExpectExists(ctx, env.Client, machine) + + node := test.Node(test.NodeOptions{ + ProviderID: machine.Status.ProviderID, + }) + ExpectApplied(ctx, env.Client, node) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + + machine = ExpectExists(ctx, env.Client, machine) + Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineRegistered).Status).To(Equal(v1.ConditionTrue)) + Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineInitialized).Status).To(Equal(v1.ConditionFalse)) + + node = ExpectExists(ctx, env.Client, node) + node.Status.Capacity = v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("10"), + v1.ResourceMemory: resource.MustParse("100Mi"), + v1.ResourcePods: resource.MustParse("110"), + } + node.Status.Allocatable = v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("8"), + v1.ResourceMemory: 
resource.MustParse("80Mi"), + v1.ResourcePods: resource.MustParse("110"), + } + ExpectApplied(ctx, env.Client, node) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + + machine = ExpectExists(ctx, env.Client, machine) + Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineRegistered).Status).To(Equal(v1.ConditionTrue)) + Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineInitialized).Status).To(Equal(v1.ConditionTrue)) + }) + It("should add the initialization label to the node when the Machine is initialized", func() { + machine := test.Machine(v1alpha5.Machine{ + Spec: v1alpha5.MachineSpec{ + Resources: v1alpha5.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("2"), + v1.ResourceMemory: resource.MustParse("50Mi"), + v1.ResourcePods: resource.MustParse("5"), + }, + }, + }, + }) + ExpectApplied(ctx, env.Client, machine) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + machine = ExpectExists(ctx, env.Client, machine) + + node := test.Node(test.NodeOptions{ + ProviderID: machine.Status.ProviderID, + Capacity: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("10"), + v1.ResourceMemory: resource.MustParse("100Mi"), + v1.ResourcePods: resource.MustParse("110"), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("8"), + v1.ResourceMemory: resource.MustParse("80Mi"), + v1.ResourcePods: resource.MustParse("110"), + }, + }) + ExpectApplied(ctx, env.Client, node) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + + node = ExpectExists(ctx, env.Client, node) + Expect(node.Labels).To(HaveKeyWithValue(v1alpha5.LabelNodeInitialized, "true")) + }) + It("should not consider the Node to be initialized when the status of the Node is NotReady", func() { + machine := test.Machine(v1alpha5.Machine{ + Spec: v1alpha5.MachineSpec{ + Resources: v1alpha5.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("2"), + v1.ResourceMemory: resource.MustParse("50Mi"), + v1.ResourcePods: resource.MustParse("5"), + }, + }, + }, + }) + ExpectApplied(ctx, env.Client, machine) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + machine = ExpectExists(ctx, env.Client, machine) + + node := test.Node(test.NodeOptions{ + ProviderID: machine.Status.ProviderID, + Capacity: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("10"), + v1.ResourceMemory: resource.MustParse("100Mi"), + v1.ResourcePods: resource.MustParse("110"), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("8"), + v1.ResourceMemory: resource.MustParse("80Mi"), + v1.ResourcePods: resource.MustParse("110"), + }, + ReadyStatus: v1.ConditionFalse, + }) + ExpectApplied(ctx, env.Client, node) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + + machine = ExpectExists(ctx, env.Client, machine) + Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineRegistered).Status).To(Equal(v1.ConditionTrue)) + Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineInitialized).Status).To(Equal(v1.ConditionFalse)) + }) + It("should not consider the Node to be initialized when all requested resources aren't registered", func() { + machine := test.Machine(v1alpha5.Machine{ + Spec: v1alpha5.MachineSpec{ + Resources: v1alpha5.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("2"), + v1.ResourceMemory: 
resource.MustParse("50Mi"), + v1.ResourcePods: resource.MustParse("5"), + fake.ResourceGPUVendorA: resource.MustParse("1"), + }, + }, + }, + }) + ExpectApplied(ctx, env.Client, machine) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + machine = ExpectExists(ctx, env.Client, machine) + + // Update the machine to add mock the instance type having an extended resource + machine.Status.Capacity[fake.ResourceGPUVendorA] = resource.MustParse("2") + machine.Status.Allocatable[fake.ResourceGPUVendorA] = resource.MustParse("2") + ExpectApplied(ctx, env.Client, machine) + + // Extended resource hasn't registered yet by the daemonset + node := test.Node(test.NodeOptions{ + ProviderID: machine.Status.ProviderID, + Capacity: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("10"), + v1.ResourceMemory: resource.MustParse("100Mi"), + v1.ResourcePods: resource.MustParse("110"), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("8"), + v1.ResourceMemory: resource.MustParse("80Mi"), + v1.ResourcePods: resource.MustParse("110"), + }, + }) + ExpectApplied(ctx, env.Client, node) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + + machine = ExpectExists(ctx, env.Client, machine) + Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineRegistered).Status).To(Equal(v1.ConditionTrue)) + Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineInitialized).Status).To(Equal(v1.ConditionFalse)) + }) + It("should consider the node to be initialized once all the resources are registered", func() { + machine := test.Machine(v1alpha5.Machine{ + Spec: v1alpha5.MachineSpec{ + Resources: v1alpha5.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("2"), + v1.ResourceMemory: resource.MustParse("50Mi"), + v1.ResourcePods: resource.MustParse("5"), + fake.ResourceGPUVendorA: resource.MustParse("1"), + }, + }, + }, + }) + ExpectApplied(ctx, env.Client, machine) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + machine = ExpectExists(ctx, env.Client, machine) + + // Update the machine to add mock the instance type having an extended resource + machine.Status.Capacity[fake.ResourceGPUVendorA] = resource.MustParse("2") + machine.Status.Allocatable[fake.ResourceGPUVendorA] = resource.MustParse("2") + ExpectApplied(ctx, env.Client, machine) + + // Extended resource hasn't registered yet by the daemonset + node := test.Node(test.NodeOptions{ + ProviderID: machine.Status.ProviderID, + Capacity: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("10"), + v1.ResourceMemory: resource.MustParse("100Mi"), + v1.ResourcePods: resource.MustParse("110"), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("8"), + v1.ResourceMemory: resource.MustParse("80Mi"), + v1.ResourcePods: resource.MustParse("110"), + }, + }) + ExpectApplied(ctx, env.Client, node) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + + machine = ExpectExists(ctx, env.Client, machine) + Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineRegistered).Status).To(Equal(v1.ConditionTrue)) + Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineInitialized).Status).To(Equal(v1.ConditionFalse)) + + // Node now registers the resource + node = ExpectExists(ctx, env.Client, node) + node.Status.Capacity[fake.ResourceGPUVendorA] = resource.MustParse("2") + node.Status.Allocatable[fake.ResourceGPUVendorA] = 
resource.MustParse("2") + ExpectApplied(ctx, env.Client, node) + + // Reconcile the machine and the Machine/Node should now be initilized + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + machine = ExpectExists(ctx, env.Client, machine) + Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineRegistered).Status).To(Equal(v1.ConditionTrue)) + Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineInitialized).Status).To(Equal(v1.ConditionTrue)) + }) + It("should not consider the Node to be initialized when all startupTaints aren't removed", func() { + machine := test.Machine(v1alpha5.Machine{ + Spec: v1alpha5.MachineSpec{ + Resources: v1alpha5.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("2"), + v1.ResourceMemory: resource.MustParse("50Mi"), + v1.ResourcePods: resource.MustParse("5"), + }, + }, + StartupTaints: []v1.Taint{ + { + Key: "custom-startup-taint", + Effect: v1.TaintEffectNoSchedule, + Value: "custom-startup-value", + }, + { + Key: "other-custom-startup-taint", + Effect: v1.TaintEffectNoExecute, + Value: "other-custom-startup-value", + }, + }, + }, + }) + ExpectApplied(ctx, env.Client, machine) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + machine = ExpectExists(ctx, env.Client, machine) + + node := test.Node(test.NodeOptions{ + ProviderID: machine.Status.ProviderID, + Capacity: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("10"), + v1.ResourceMemory: resource.MustParse("100Mi"), + v1.ResourcePods: resource.MustParse("110"), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("8"), + v1.ResourceMemory: resource.MustParse("80Mi"), + v1.ResourcePods: resource.MustParse("110"), + }, + }) + ExpectApplied(ctx, env.Client, node) + + // Should add the startup taints to the node + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + node = ExpectExists(ctx, env.Client, node) + Expect(node.Spec.Taints).To(ContainElements( + v1.Taint{ + Key: "custom-startup-taint", + Effect: v1.TaintEffectNoSchedule, + Value: "custom-startup-value", + }, + v1.Taint{ + Key: "other-custom-startup-taint", + Effect: v1.TaintEffectNoExecute, + Value: "other-custom-startup-value", + }, + )) + + // Shouldn't consider the node ready since the startup taints still exist + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + machine = ExpectExists(ctx, env.Client, machine) + Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineRegistered).Status).To(Equal(v1.ConditionTrue)) + Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineInitialized).Status).To(Equal(v1.ConditionFalse)) + }) + It("should consider the Node to be initialized once the startup taints are removed", func() { + machine := test.Machine(v1alpha5.Machine{ + Spec: v1alpha5.MachineSpec{ + Resources: v1alpha5.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("2"), + v1.ResourceMemory: resource.MustParse("50Mi"), + v1.ResourcePods: resource.MustParse("5"), + }, + }, + StartupTaints: []v1.Taint{ + { + Key: "custom-startup-taint", + Effect: v1.TaintEffectNoSchedule, + Value: "custom-startup-value", + }, + { + Key: "other-custom-startup-taint", + Effect: v1.TaintEffectNoExecute, + Value: "other-custom-startup-value", + }, + }, + }, + }) + ExpectApplied(ctx, env.Client, machine) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + machine = 
ExpectExists(ctx, env.Client, machine) + + node := test.Node(test.NodeOptions{ + ProviderID: machine.Status.ProviderID, + Capacity: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("10"), + v1.ResourceMemory: resource.MustParse("100Mi"), + v1.ResourcePods: resource.MustParse("110"), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("8"), + v1.ResourceMemory: resource.MustParse("80Mi"), + v1.ResourcePods: resource.MustParse("110"), + }, + }) + ExpectApplied(ctx, env.Client, node) + + // Shouldn't consider the node ready since the startup taints still exist + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + machine = ExpectExists(ctx, env.Client, machine) + Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineRegistered).Status).To(Equal(v1.ConditionTrue)) + Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineInitialized).Status).To(Equal(v1.ConditionFalse)) + + node = ExpectExists(ctx, env.Client, node) + node.Spec.Taints = []v1.Taint{} + ExpectApplied(ctx, env.Client, node) + + // Machine should now be ready since all startup taints are removed + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + machine = ExpectExists(ctx, env.Client, machine) + Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineRegistered).Status).To(Equal(v1.ConditionTrue)) + Expect(ExpectStatusConditionExists(machine, v1alpha5.MachineInitialized).Status).To(Equal(v1.ConditionTrue)) + }) + }) + Context("Liveness", func() { + It("should delete the Machine when the Node hasn't registered to the Machine past the liveness TTL", func() { + machine := test.Machine(v1alpha5.Machine{ + Spec: v1alpha5.MachineSpec{ + Resources: v1alpha5.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("2"), + v1.ResourceMemory: resource.MustParse("50Mi"), + v1.ResourcePods: resource.MustParse("5"), + fake.ResourceGPUVendorA: resource.MustParse("1"), + }, + }, + }, + }) + ExpectApplied(ctx, env.Client, machine) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + machine = ExpectExists(ctx, env.Client, machine) + + // If the node hasn't registered in the liveness timeframe, then we deprovision the Machine + fakeClock.Step(time.Minute * 11) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) // Reconcile again to handle termination flow + ExpectNotFound(ctx, env.Client, machine) + }) + }) + Context("Termination", func() { + It("should cordon, drain, and delete the Machine on terminate", func() { + machine := test.Machine(v1alpha5.Machine{ + Spec: v1alpha5.MachineSpec{ + Resources: v1alpha5.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("2"), + v1.ResourceMemory: resource.MustParse("50Mi"), + v1.ResourcePods: resource.MustParse("5"), + fake.ResourceGPUVendorA: resource.MustParse("1"), + }, + }, + }, + }) + ExpectApplied(ctx, env.Client, machine) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + machine = ExpectExists(ctx, env.Client, machine) + + node := test.Node(test.NodeOptions{ + ProviderID: machine.Status.ProviderID, + Capacity: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("10"), + v1.ResourceMemory: resource.MustParse("100Mi"), + v1.ResourcePods: resource.MustParse("110"), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: 
resource.MustParse("8"), + v1.ResourceMemory: resource.MustParse("80Mi"), + v1.ResourcePods: resource.MustParse("110"), + }, + }) + ExpectApplied(ctx, env.Client, node) + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + + Expect(cloudProvider.CreatedMachines).To(HaveLen(1)) + + // Kickoff the deletion flow for the machine + Expect(env.Client.Delete(ctx, machine)).To(Succeed()) + + // Machine should delete and the Node deletion should cascade shortly after + ExpectReconcileSucceeded(ctx, machineController, client.ObjectKeyFromObject(machine)) + ExpectNotFound(ctx, env.Client, machine) + }) + }) +}) diff --git a/pkg/controllers/termination/eviction.go b/pkg/controllers/machine/terminator/eviction.go similarity index 88% rename from pkg/controllers/termination/eviction.go rename to pkg/controllers/machine/terminator/eviction.go index 60a773242a..740064dee5 100644 --- a/pkg/controllers/termination/eviction.go +++ b/pkg/controllers/machine/terminator/eviction.go @@ -12,17 +12,18 @@ See the License for the specific language governing permissions and limitations under the License. */ -package termination +package terminator import ( "context" + "errors" "fmt" "time" set "github.com/deckarep/golang-set" v1 "k8s.io/api/core/v1" "k8s.io/api/policy/v1beta1" - "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -38,6 +39,22 @@ const ( evictionQueueMaxDelay = 10 * time.Second ) +type NodeDrainError struct { + error +} + +func NewNodeDrainError(err error) *NodeDrainError { + return &NodeDrainError{error: err} +} + +func IsNodeDrainError(err error) bool { + if err == nil { + return false + } + var nodeDrainErr *NodeDrainError + return errors.As(err, &nodeDrainErr) +} + type EvictionQueue struct { workqueue.RateLimitingInterface set.Set @@ -97,10 +114,10 @@ func (e *EvictionQueue) evict(ctx context.Context, nn types.NamespacedName) bool }) // status codes for the eviction API are defined here: // https://kubernetes.io/docs/concepts/scheduling-eviction/api-eviction/#how-api-initiated-eviction-works - if errors.IsNotFound(err) { // 404 + if apierrors.IsNotFound(err) { // 404 return true } - if errors.IsTooManyRequests(err) { // 429 - PDB violation + if apierrors.IsTooManyRequests(err) { // 429 - PDB violation e.recorder.Publish(events.NodeFailedToDrain(&v1.Node{ObjectMeta: metav1.ObjectMeta{ Name: nn.Name, Namespace: nn.Namespace, diff --git a/pkg/controllers/machine/terminator/metrics.go b/pkg/controllers/machine/terminator/metrics.go new file mode 100644 index 0000000000..1d531eb2b9 --- /dev/null +++ b/pkg/controllers/machine/terminator/metrics.go @@ -0,0 +1,38 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package terminator + +import ( + "github.com/prometheus/client_golang/prometheus" + crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" + + "github.com/aws/karpenter-core/pkg/metrics" +) + +var ( + terminationSummary = prometheus.NewSummary( + prometheus.SummaryOpts{ + Namespace: "karpenter", + Subsystem: "nodes", + Name: "termination_time_seconds", + Help: "The time taken between a node's deletion request and the removal of its finalizer", + Objectives: metrics.SummaryObjectives(), + }, + ) +) + +func init() { + crmetrics.Registry.MustRegister(terminationSummary) +} diff --git a/pkg/controllers/termination/terminate.go b/pkg/controllers/machine/terminator/terminator.go similarity index 71% rename from pkg/controllers/termination/terminate.go rename to pkg/controllers/machine/terminator/terminator.go index e30ac247fa..e8aa87a7da 100644 --- a/pkg/controllers/termination/terminate.go +++ b/pkg/controllers/machine/terminator/terminator.go @@ -12,23 +12,20 @@ See the License for the specific language governing permissions and limitations under the License. */ -package termination +package terminator import ( "context" - "errors" "fmt" "time" + "github.com/samber/lo" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/utils/clock" - "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - - v1 "k8s.io/api/core/v1" "knative.dev/pkg/logging" "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/samber/lo" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/cloudprovider" @@ -37,38 +34,40 @@ import ( ) type Terminator struct { - EvictionQueue *EvictionQueue - KubeClient client.Client - CloudProvider cloudprovider.CloudProvider - Clock clock.Clock + clock clock.Clock + kubeClient client.Client + cloudProvider cloudprovider.CloudProvider + evictionQueue *EvictionQueue } -type NodeDrainErr error - -func IsNodeDrainErr(err error) bool { - var nodeDrainErr NodeDrainErr - return errors.As(err, &nodeDrainErr) +func NewTerminator(clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, eq *EvictionQueue) *Terminator { + return &Terminator{ + clock: clk, + kubeClient: kubeClient, + cloudProvider: cloudProvider, + evictionQueue: eq, + } } -// cordon cordons a node -func (t *Terminator) cordon(ctx context.Context, node *v1.Node) error { +// Cordon cordons a node +func (t *Terminator) Cordon(ctx context.Context, node *v1.Node) error { stored := node.DeepCopy() node.Spec.Unschedulable = true node.Labels = lo.Assign(node.Labels, map[string]string{ v1.LabelNodeExcludeBalancers: "karpenter", }) if !equality.Semantic.DeepEqual(node, stored) { - if err := t.KubeClient.Patch(ctx, node, client.MergeFrom(stored)); err != nil { - return client.IgnoreNotFound(err) - } logging.FromContext(ctx).Infof("cordoned node") + if err := t.kubeClient.Patch(ctx, node, client.MergeFrom(stored)); err != nil { + return err + } } return nil } -// drain evicts pods from the node and returns true when all pods are evicted +// Drain evicts pods from the node and returns true when all pods are evicted // https://kubernetes.io/docs/concepts/architecture/nodes/#graceful-node-shutdown -func (t *Terminator) drain(ctx context.Context, node *v1.Node) error { +func (t *Terminator) Drain(ctx context.Context, node *v1.Node) error { // Get evictable pods pods, err := t.getPods(ctx, node) if err != nil { @@ -78,7 +77,7 @@ func (t 
*Terminator) drain(ctx context.Context, node *v1.Node) error { // Skip node due to pods that are not able to be evicted for _, p := range pods { if podutil.HasDoNotEvict(p) { - return NodeDrainErr(fmt.Errorf("pod %s/%s has do-not-evict annotation", p.Namespace, p.Name)) + return NewNodeDrainError(fmt.Errorf("pod %s/%s has do-not-evict annotation", p.Namespace, p.Name)) } // Ignore if unschedulable is tolerated, since they will reschedule if podutil.ToleratesUnschedulableTaint(p) { @@ -92,22 +91,27 @@ func (t *Terminator) drain(ctx context.Context, node *v1.Node) error { } // Enqueue for eviction t.evict(podsToEvict) - return lo.Ternary(len(podsToEvict) > 0, NodeDrainErr(fmt.Errorf("%d pods are waiting to be evicted", len(podsToEvict))), nil) + + if len(podsToEvict) > 0 { + return NewNodeDrainError(fmt.Errorf("%d pods are waiting to be evicted", len(podsToEvict))) + } + return nil } -// terminate calls cloud provider delete then removes the finalizer to delete the node -func (t *Terminator) terminate(ctx context.Context, node *v1.Node) error { +// TerminateNode calls cloud provider delete then removes the finalizer to delete the node +func (t *Terminator) TerminateNode(ctx context.Context, node *v1.Node) error { stored := node.DeepCopy() // Delete the instance associated with node - if err := t.CloudProvider.Delete(ctx, machineutil.NewFromNode(node)); cloudprovider.IgnoreMachineNotFoundError(err) != nil { + if err := t.cloudProvider.Delete(ctx, machineutil.NewFromNode(node)); cloudprovider.IgnoreMachineNotFoundError(err) != nil { return fmt.Errorf("terminating cloudprovider instance, %w", err) } controllerutil.RemoveFinalizer(node, v1alpha5.TerminationFinalizer) if !equality.Semantic.DeepEqual(node, stored) { - if err := t.KubeClient.Patch(ctx, node, client.MergeFrom(stored)); err != nil { - return client.IgnoreNotFound(err) - } logging.FromContext(ctx).Infof("deleted node") + if err := t.kubeClient.Patch(ctx, node, client.MergeFrom(stored)); err != nil { + return err + } + terminationSummary.Observe(time.Since(node.DeletionTimestamp.Time).Seconds()) } return nil } @@ -115,7 +119,7 @@ func (t *Terminator) terminate(ctx context.Context, node *v1.Node) error { // getPods returns a list of evictable pods for the node func (t *Terminator) getPods(ctx context.Context, node *v1.Node) ([]*v1.Pod, error) { podList := &v1.PodList{} - if err := t.KubeClient.List(ctx, podList, client.MatchingFields{"spec.nodeName": node.Name}); err != nil { + if err := t.kubeClient.List(ctx, podList, client.MatchingFields{"spec.nodeName": node.Name}); err != nil { return nil, fmt.Errorf("listing pods on node, %w", err) } var pods []*v1.Pod @@ -149,9 +153,9 @@ func (t *Terminator) evict(pods []*v1.Pod) { } // 2. 
Evict critical pods if all noncritical are evicted if len(nonCritical) == 0 { - t.EvictionQueue.Add(critical) + t.evictionQueue.Add(critical) } else { - t.EvictionQueue.Add(nonCritical) + t.evictionQueue.Add(nonCritical) } } @@ -159,5 +163,5 @@ func (t *Terminator) isStuckTerminating(pod *v1.Pod) bool { if pod.DeletionTimestamp == nil { return false } - return t.Clock.Now().After(pod.DeletionTimestamp.Time.Add(1 * time.Minute)) + return t.clock.Now().After(pod.DeletionTimestamp.Time.Add(1 * time.Minute)) } diff --git a/pkg/controllers/state/node.go b/pkg/controllers/state/node.go index 11d6c9d757..b1db420405 100644 --- a/pkg/controllers/state/node.go +++ b/pkg/controllers/state/node.go @@ -180,8 +180,7 @@ func (in *Node) Taints() []v1.Taint { // by a Machine inside of cluster state func (in *Node) Initialized() bool { if in.Machine != nil { - if in.Node != nil && in.Machine.StatusConditions().GetCondition(v1alpha5.MachineInitialized) != nil && - in.Machine.StatusConditions().GetCondition(v1alpha5.MachineInitialized).Status == v1.ConditionTrue { + if in.Node != nil && in.Machine.StatusConditions().GetCondition(v1alpha5.MachineInitialized).IsTrue() { return true } return false diff --git a/pkg/controllers/state/suite_test.go b/pkg/controllers/state/suite_test.go index ad4d028a33..f38efbaaea 100644 --- a/pkg/controllers/state/suite_test.go +++ b/pkg/controllers/state/suite_test.go @@ -220,7 +220,7 @@ var _ = Describe("Inflight Nodes", func() { }, }, Status: v1alpha5.MachineStatus{ - ProviderID: test.ProviderID(), + ProviderID: test.RandomProviderID(), }, }) ExpectApplied(ctx, env.Client, machine) @@ -258,7 +258,7 @@ var _ = Describe("Inflight Nodes", func() { }, }, Status: v1alpha5.MachineStatus{ - ProviderID: test.ProviderID(), + ProviderID: test.RandomProviderID(), Capacity: v1.ResourceList{ v1.ResourceCPU: resource.MustParse("2"), v1.ResourceMemory: resource.MustParse("32Gi"), @@ -306,7 +306,7 @@ var _ = Describe("Inflight Nodes", func() { }, }, Status: v1alpha5.MachineStatus{ - ProviderID: test.ProviderID(), + ProviderID: test.RandomProviderID(), Capacity: v1.ResourceList{ v1.ResourceCPU: resource.MustParse("2"), v1.ResourceMemory: resource.MustParse("32Gi"), @@ -390,7 +390,7 @@ var _ = Describe("Inflight Nodes", func() { }, }, Status: v1alpha5.MachineStatus{ - ProviderID: test.ProviderID(), + ProviderID: test.RandomProviderID(), Capacity: v1.ResourceList{ v1.ResourceCPU: resource.MustParse("2"), v1.ResourceMemory: resource.MustParse("32Gi"), @@ -439,7 +439,7 @@ var _ = Describe("Inflight Nodes", func() { It("should continue node nomination when an inflight node becomes a real node", func() { machine := test.Machine(v1alpha5.Machine{ Status: v1alpha5.MachineStatus{ - ProviderID: test.ProviderID(), + ProviderID: test.RandomProviderID(), }, }) ExpectApplied(ctx, env.Client, machine) @@ -460,7 +460,7 @@ var _ = Describe("Inflight Nodes", func() { It("should continue MarkedForDeletion when an inflight node becomes a real node", func() { machine := test.Machine(v1alpha5.Machine{ Status: v1alpha5.MachineStatus{ - ProviderID: test.ProviderID(), + ProviderID: test.RandomProviderID(), }, }) ExpectApplied(ctx, env.Client, machine) @@ -961,7 +961,7 @@ var _ = Describe("Node Resource Level", func() { }, }, Status: v1alpha5.MachineStatus{ - ProviderID: test.ProviderID(), + ProviderID: test.RandomProviderID(), Capacity: v1.ResourceList{ v1.ResourceCPU: resource.MustParse("2"), v1.ResourceMemory: resource.MustParse("32Gi"), @@ -1232,7 +1232,7 @@ var _ = Describe("Cluster State Sync", func() { // Deploy 
1000 nodes and sync them all with the cluster for i := 0; i < 1000; i++ { node := test.Node(test.NodeOptions{ - ProviderID: test.ProviderID(), + ProviderID: test.RandomProviderID(), }) ExpectApplied(ctx, env.Client, node) ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) @@ -1258,7 +1258,7 @@ var _ = Describe("Cluster State Sync", func() { } Expect(cluster.Synced(ctx)).To(BeTrue()) for i := 0; i < 1000; i++ { - nodes[i].Spec.ProviderID = test.ProviderID() + nodes[i].Spec.ProviderID = test.RandomProviderID() ExpectApplied(ctx, env.Client, nodes[i]) ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(nodes[i])) } @@ -1269,7 +1269,7 @@ var _ = Describe("Cluster State Sync", func() { for i := 0; i < 1000; i++ { machine := test.Machine(v1alpha5.Machine{ Status: v1alpha5.MachineStatus{ - ProviderID: test.ProviderID(), + ProviderID: test.RandomProviderID(), }, }) ExpectApplied(ctx, env.Client, machine) @@ -1281,7 +1281,7 @@ var _ = Describe("Cluster State Sync", func() { // Deploy 250 nodes to the cluster that also have machines for i := 0; i < 250; i++ { node := test.Node(test.NodeOptions{ - ProviderID: test.ProviderID(), + ProviderID: test.RandomProviderID(), }) machine := test.Machine(v1alpha5.Machine{ Status: v1alpha5.MachineStatus{ @@ -1295,7 +1295,7 @@ var _ = Describe("Cluster State Sync", func() { // Deploy 250 nodes to the cluster for i := 0; i < 250; i++ { node := test.Node(test.NodeOptions{ - ProviderID: test.ProviderID(), + ProviderID: test.RandomProviderID(), }) ExpectApplied(ctx, env.Client, node) ExpectReconcileSucceeded(ctx, nodeController, client.ObjectKeyFromObject(node)) @@ -1304,7 +1304,7 @@ var _ = Describe("Cluster State Sync", func() { for i := 0; i < 500; i++ { machine := test.Machine(v1alpha5.Machine{ Status: v1alpha5.MachineStatus{ - ProviderID: test.ProviderID(), + ProviderID: test.RandomProviderID(), }, }) ExpectApplied(ctx, env.Client, machine) @@ -1317,7 +1317,7 @@ var _ = Describe("Cluster State Sync", func() { for i := 0; i < 500; i++ { machine := test.Machine(v1alpha5.Machine{ Status: v1alpha5.MachineStatus{ - ProviderID: test.ProviderID(), + ProviderID: test.RandomProviderID(), }, }) node := test.Node(test.NodeOptions{ @@ -1334,7 +1334,7 @@ var _ = Describe("Cluster State Sync", func() { for i := 0; i < 1000; i++ { machine := test.Machine(v1alpha5.Machine{ Status: v1alpha5.MachineStatus{ - ProviderID: test.ProviderID(), + ProviderID: test.RandomProviderID(), }, }) // One of them doesn't have its providerID @@ -1351,7 +1351,7 @@ var _ = Describe("Cluster State Sync", func() { for i := 0; i < 1000; i++ { machine := test.Machine(v1alpha5.Machine{ Status: v1alpha5.MachineStatus{ - ProviderID: test.ProviderID(), + ProviderID: test.RandomProviderID(), }, }) ExpectApplied(ctx, env.Client, machine) @@ -1367,7 +1367,7 @@ var _ = Describe("Cluster State Sync", func() { // Deploy 1000 nodes and sync them all with the cluster for i := 0; i < 1000; i++ { node := test.Node(test.NodeOptions{ - ProviderID: test.ProviderID(), + ProviderID: test.RandomProviderID(), }) ExpectApplied(ctx, env.Client, node) diff --git a/pkg/controllers/termination/controller.go b/pkg/controllers/termination/controller.go index 3fd8b8684f..cc5fe75ed9 100644 --- a/pkg/controllers/termination/controller.go +++ b/pkg/controllers/termination/controller.go @@ -22,62 +22,34 @@ import ( "golang.org/x/time/rate" v1 "k8s.io/api/core/v1" "k8s.io/client-go/util/workqueue" - "k8s.io/utils/clock" controllerruntime "sigs.k8s.io/controller-runtime" 
"sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/manager" - crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "github.com/prometheus/client_golang/prometheus" - "github.com/aws/karpenter-core/pkg/apis/v1alpha5" - "github.com/aws/karpenter-core/pkg/cloudprovider" + "github.com/aws/karpenter-core/pkg/controllers/machine/terminator" "github.com/aws/karpenter-core/pkg/events" - "github.com/aws/karpenter-core/pkg/metrics" corecontroller "github.com/aws/karpenter-core/pkg/operator/controller" ) -var ( - terminationSummary = prometheus.NewSummary( - prometheus.SummaryOpts{ - Namespace: "karpenter", - Subsystem: "nodes", - Name: "termination_time_seconds", - Help: "The time taken between a node's deletion request and the removal of its finalizer", - Objectives: metrics.SummaryObjectives(), - }, - ) -) - -func init() { - crmetrics.Registry.MustRegister(terminationSummary) -} - var _ corecontroller.FinalizingTypedController[*v1.Node] = (*Controller)(nil) // Controller for the resource type Controller struct { - Terminator *Terminator - KubeClient client.Client - Recorder events.Recorder + terminator *terminator.Terminator + kubeClient client.Client + recorder events.Recorder } -// NewController constructs a terminationController instance -func NewController(clk clock.Clock, kubeClient client.Client, evictionQueue *EvictionQueue, - recorder events.Recorder, cloudProvider cloudprovider.CloudProvider) corecontroller.Controller { - +// NewController constructs a controller instance +func NewController(kubeClient client.Client, terminator *terminator.Terminator, recorder events.Recorder) corecontroller.Controller { return corecontroller.Typed[*v1.Node](kubeClient, &Controller{ - KubeClient: kubeClient, - Terminator: &Terminator{ - KubeClient: kubeClient, - CloudProvider: cloudProvider, - EvictionQueue: evictionQueue, - Clock: clk, - }, - Recorder: recorder, + kubeClient: kubeClient, + terminator: terminator, + recorder: recorder, }) } @@ -93,20 +65,19 @@ func (c *Controller) Finalize(ctx context.Context, node *v1.Node) (reconcile.Res if !controllerutil.ContainsFinalizer(node, v1alpha5.TerminationFinalizer) { return reconcile.Result{}, nil } - if err := c.Terminator.cordon(ctx, node); err != nil { + if err := c.terminator.Cordon(ctx, node); err != nil { return reconcile.Result{}, fmt.Errorf("cordoning node, %w", err) } - if err := c.Terminator.drain(ctx, node); err != nil { - if IsNodeDrainErr(err) { - c.Recorder.Publish(events.NodeFailedToDrain(node, err)) + if err := c.terminator.Drain(ctx, node); err != nil { + if terminator.IsNodeDrainError(err) { + c.recorder.Publish(events.NodeFailedToDrain(node, err)) return reconcile.Result{Requeue: true}, nil } return reconcile.Result{}, fmt.Errorf("draining node, %w", err) } - if err := c.Terminator.terminate(ctx, node); err != nil { + if err := c.terminator.TerminateNode(ctx, node); err != nil { return reconcile.Result{}, fmt.Errorf("terminating node, %w", err) } - terminationSummary.Observe(time.Since(node.DeletionTimestamp.Time).Seconds()) return reconcile.Result{}, nil } diff --git a/pkg/controllers/termination/suite_test.go b/pkg/controllers/termination/suite_test.go index 6511985496..0e050915f1 100644 --- a/pkg/controllers/termination/suite_test.go +++ b/pkg/controllers/termination/suite_test.go @@ -29,6 
+29,7 @@ import ( "github.com/aws/karpenter-core/pkg/apis" "github.com/aws/karpenter-core/pkg/apis/v1alpha5" "github.com/aws/karpenter-core/pkg/cloudprovider/fake" + "github.com/aws/karpenter-core/pkg/controllers/machine/terminator" "github.com/aws/karpenter-core/pkg/controllers/termination" "github.com/aws/karpenter-core/pkg/operator/controller" "github.com/aws/karpenter-core/pkg/operator/scheme" @@ -47,7 +48,7 @@ import ( var ctx context.Context var terminationController controller.Controller -var evictionQueue *termination.EvictionQueue +var evictionQueue *terminator.EvictionQueue var env *test.Environment var defaultOwnerRefs = []metav1.OwnerReference{{Kind: "ReplicaSet", APIVersion: "appsv1", Name: "rs", UID: "1234567890"}} var fakeClock *clock.FakeClock @@ -64,8 +65,8 @@ var _ = BeforeSuite(func() { cloudProvider := fake.NewCloudProvider() eventRecorder := test.NewEventRecorder() - evictionQueue = termination.NewEvictionQueue(ctx, env.KubernetesInterface.CoreV1(), eventRecorder) - terminationController = termination.NewController(fakeClock, env.Client, evictionQueue, eventRecorder, cloudProvider) + evictionQueue = terminator.NewEvictionQueue(ctx, env.KubernetesInterface.CoreV1(), eventRecorder) + terminationController = termination.NewController(env.Client, terminator.NewTerminator(fakeClock, env.Client, cloudProvider, evictionQueue), eventRecorder) }) var _ = AfterSuite(func() { @@ -598,7 +599,7 @@ var _ = Describe("Termination", func() { }) }) -func ExpectNotEnqueuedForEviction(e *termination.EvictionQueue, pods ...*v1.Pod) { +func ExpectNotEnqueuedForEviction(e *terminator.EvictionQueue, pods ...*v1.Pod) { for _, pod := range pods { ExpectWithOffset(1, e.Contains(client.ObjectKeyFromObject(pod))).To(BeFalse()) } diff --git a/pkg/scheduling/taints.go b/pkg/scheduling/taints.go index 104e253fe6..1eeb08a690 100644 --- a/pkg/scheduling/taints.go +++ b/pkg/scheduling/taints.go @@ -17,6 +17,7 @@ package scheduling import ( "fmt" + "github.com/samber/lo" "go.uber.org/multierr" v1 "k8s.io/api/core/v1" ) @@ -38,3 +39,18 @@ func (ts Taints) Tolerates(pod *v1.Pod) (errs error) { } return errs } + +// Merge returns the receiver's taints with the passed-in taints appended, skipping any that match an existing taint.
+func (ts Taints) Merge(with Taints) Taints { + res := lo.Map(ts, func(t v1.Taint, _ int) v1.Taint { + return t + }) + for _, taint := range with { + if _, ok := lo.Find(res, func(t v1.Taint) bool { + return taint.MatchTaint(&t) + }); !ok { + res = append(res, taint) + } + } + return res +} diff --git a/pkg/test/environment.go b/pkg/test/environment.go index 0f361e24b8..33e5fd9c6c 100644 --- a/pkg/test/environment.go +++ b/pkg/test/environment.go @@ -16,6 +16,7 @@ package test import ( "context" + "log" "os" "strings" @@ -102,6 +103,9 @@ func NewEnvironment(scheme *runtime.Scheme, options ...functional.Option[Environ go func() { lo.Must0(cache.Start(ctx)) }() + if !cache.WaitForCacheSync(ctx) { + log.Fatalf("cache failed to sync") + } } return &Environment{ Environment: environment, diff --git a/pkg/test/expectations/expectations.go b/pkg/test/expectations/expectations.go index 17fe82fcb8..fe8b0ad533 100644 --- a/pkg/test/expectations/expectations.go +++ b/pkg/test/expectations/expectations.go @@ -36,6 +36,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "knative.dev/pkg/apis" "knative.dev/pkg/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -267,21 +268,25 @@ func ExpectReconcileSucceeded(ctx context.Context, reconciler reconcile.Reconcil } func ExpectReconcileFailed(ctx context.Context, reconciler reconcile.Reconciler, key client.ObjectKey) { - result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) - ExpectWithOffset(1, err).ToNot(Succeed(), fmt.Sprintf("got result, %v", result)) + _, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) + ExpectWithOffset(1, err).To(HaveOccurred()) } -func ExpectMetric(prefix string) *prometheus.MetricFamily { - metrics, err := metrics.Registry.Gather() - ExpectWithOffset(1, err).To(BeNil()) - var selected *prometheus.MetricFamily - for _, mf := range metrics { - if mf.GetName() == prefix { - selected = mf - } - } - ExpectWithOffset(1, selected).ToNot(BeNil(), fmt.Sprintf("expected to find a '%s' metric", prefix)) - return selected +func ExpectStatusConditionExists(obj apis.ConditionsAccessor, t apis.ConditionType) apis.Condition { + conds := obj.GetConditions() + cond, ok := lo.Find(conds, func(c apis.Condition) bool { + return c.Type == t + }) + ExpectWithOffset(1, ok).To(BeTrue()) + return cond +} + +func ExpectOwnerReferenceExists(obj, owner client.Object) metav1.OwnerReference { + or, found := lo.Find(obj.GetOwnerReferences(), func(o metav1.OwnerReference) bool { + return o.UID == owner.GetUID() + }) + Expect(found).To(BeTrue()) + return or } // FindMetricWithLabelValues attempts to find a metric with a name with a set of label values diff --git a/pkg/test/metadata.go b/pkg/test/metadata.go index dc268d4e27..8cd0f9e7c8 100644 --- a/pkg/test/metadata.go +++ b/pkg/test/metadata.go @@ -65,6 +65,10 @@ func MustMerge[T interface{}](dest T, srcs ...T) T { return dest } -func ProviderID() string { - return fmt.Sprintf("fake://%s", randomdata.Alphanumeric(17)) +func RandomProviderID() string { + return ProviderID(randomdata.Alphanumeric(17)) +} + +func ProviderID(base string) string { + return fmt.Sprintf("fake:///%s", base) } diff --git a/pkg/utils/machine/machine.go b/pkg/utils/machine/machine.go index 55abd5e93e..1e093ab28b 100644 --- a/pkg/utils/machine/machine.go +++ b/pkg/utils/machine/machine.go @@ -29,14 +29,14 @@ import ( // Deprecated: This Machine generator 
function can be removed when v1alpha6 migration has completed. func New(node *v1.Node, provisioner *v1alpha5.Provisioner) *v1alpha5.Machine { machine := NewFromNode(node) - machine.Annotations = lo.Assign(machine.Annotations, v1alpha5.ProviderAnnotation(provisioner.Spec.Provider)) + machine.Annotations = lo.Assign(provisioner.Annotations, v1alpha5.ProviderAnnotation(provisioner.Spec.Provider)) + machine.Labels = lo.Assign(provisioner.Labels, map[string]string{v1alpha5.ProvisionerNameLabelKey: provisioner.Name}) machine.Spec.Kubelet = provisioner.Spec.KubeletConfiguration machine.Spec.Taints = provisioner.Spec.Taints - machine.Spec.Requirements = provisioner.Spec.Requirements machine.Spec.StartupTaints = provisioner.Spec.StartupTaints + machine.Spec.Requirements = provisioner.Spec.Requirements machine.Spec.MachineTemplateRef = provisioner.Spec.ProviderRef lo.Must0(controllerutil.SetOwnerReference(provisioner, machine, scheme.Scheme)) - return machine }
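
A minimal usage sketch of the new scheduling.Taints.Merge helper added in pkg/scheduling/taints.go above (not part of the patch): the receiver's taints are kept, and taints from the argument are appended only when they don't already match an existing taint by key and effect (via MatchTaint). The package main wrapper and the example taint values are illustrative assumptions.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"

	"github.com/aws/karpenter-core/pkg/scheduling"
)

func main() {
	existing := scheduling.Taints{
		{Key: "custom-startup-taint", Value: "custom-startup-value", Effect: v1.TaintEffectNoSchedule},
	}
	incoming := scheduling.Taints{
		// Same key/effect as an existing taint, so Merge skips it even though the value differs.
		{Key: "custom-startup-taint", Value: "different-value", Effect: v1.TaintEffectNoSchedule},
		// New key, so Merge appends it.
		{Key: "other-custom-startup-taint", Value: "other-custom-startup-value", Effect: v1.TaintEffectNoExecute},
	}
	merged := existing.Merge(incoming)
	fmt.Println(len(merged)) // 2: the original taint plus the one genuinely new taint
}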
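
Similarly, a small illustrative sketch (again an assumption-laden example, not part of the patch) of why the terminator package replaces the old NodeDrainErr type alias with a *NodeDrainError struct checked via errors.As: detection keeps working even if a caller wraps the error with fmt.Errorf("... %w", err) before inspecting it, so callers like the termination controller can requeue drain failures instead of treating them as hard errors.

package main

import (
	"fmt"

	"github.com/aws/karpenter-core/pkg/controllers/machine/terminator"
)

func main() {
	// A drain error as the Terminator would produce it.
	drainErr := terminator.NewNodeDrainError(fmt.Errorf("3 pods are waiting to be evicted"))

	// A caller may wrap it with additional context before checking it.
	wrapped := fmt.Errorf("draining node, %w", drainErr)

	fmt.Println(terminator.IsNodeDrainError(drainErr)) // true
	fmt.Println(terminator.IsNodeDrainError(wrapped))  // true: errors.As unwraps the chain
	fmt.Println(terminator.IsNodeDrainError(nil))      // false
}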