diff --git a/pkg/apis/provisioning/v1alpha1/provisioner.go b/pkg/apis/provisioning/v1alpha1/provisioner.go index af499a2e1035..add10963cfaf 100644 --- a/pkg/apis/provisioning/v1alpha1/provisioner.go +++ b/pkg/apis/provisioning/v1alpha1/provisioner.go @@ -100,9 +100,9 @@ var ( OperatingSystemLabelKey = "kubernetes.io/os" // Reserved labels - ProvisionerNameLabelKey = SchemeGroupVersion.Group + "/name" - ProvisionerNamespaceLabelKey = SchemeGroupVersion.Group + "/namespace" - ProvisionerPhaseLabel = SchemeGroupVersion.Group + "/lifecycle-phase" + ProvisionerNameLabelKey = SchemeGroupVersion.Group + "/name" + ProvisionerNamespaceLabelKey = SchemeGroupVersion.Group + "/namespace" + ProvisionerUnderutilizedLabelKey = SchemeGroupVersion.Group + "/underutilized" // Reserved annotations ProvisionerTTLKey = SchemeGroupVersion.Group + "/ttl" @@ -113,9 +113,7 @@ var ( ) const ( - ProvisionerUnderutilizedPhase = "underutilized" - ProvisionerTerminablePhase = "terminable" - ProvisionerDrainingPhase = "draining" + KarpenterFinalizer = "karpenter.sh/termination" ) // Provisioner is the Schema for the Provisioners API diff --git a/pkg/apis/provisioning/v1alpha1/provisioner_validation.go b/pkg/apis/provisioning/v1alpha1/provisioner_validation.go index 78f06bafb15d..262015da833b 100644 --- a/pkg/apis/provisioning/v1alpha1/provisioner_validation.go +++ b/pkg/apis/provisioning/v1alpha1/provisioner_validation.go @@ -30,7 +30,7 @@ var ( OperatingSystemLabelKey, ProvisionerNameLabelKey, ProvisionerNamespaceLabelKey, - ProvisionerPhaseLabel, + ProvisionerUnderutilizedLabelKey, ProvisionerTTLKey, ZoneLabelKey, InstanceTypeLabelKey, diff --git a/pkg/apis/provisioning/v1alpha1/provisioner_validation_test.go b/pkg/apis/provisioning/v1alpha1/provisioner_validation_test.go index 69e6ec3dcd39..0b2269109235 100644 --- a/pkg/apis/provisioning/v1alpha1/provisioner_validation_test.go +++ b/pkg/apis/provisioning/v1alpha1/provisioner_validation_test.go @@ -80,7 +80,7 @@ var _ = Describe("Validation", 
func() { OperatingSystemLabelKey, ProvisionerNameLabelKey, ProvisionerNamespaceLabelKey, - ProvisionerPhaseLabel, + ProvisionerUnderutilizedLabelKey, ZoneLabelKey, InstanceTypeLabelKey, } { diff --git a/pkg/cloudprovider/aws/cloudprovider.go b/pkg/cloudprovider/aws/cloudprovider.go index 5e37ab83fbc6..39df68fbeb32 100644 --- a/pkg/cloudprovider/aws/cloudprovider.go +++ b/pkg/cloudprovider/aws/cloudprovider.go @@ -154,8 +154,8 @@ func (c *CloudProvider) GetInstanceTypes(ctx context.Context) ([]cloudprovider.I return c.instanceTypeProvider.Get(ctx) } -func (c *CloudProvider) Terminate(ctx context.Context, nodes []*v1.Node) error { - return c.instanceProvider.Terminate(ctx, nodes) +func (c *CloudProvider) Terminate(ctx context.Context, node *v1.Node) error { + return c.instanceProvider.Terminate(ctx, node) } // Validate cloud provider specific components of the cluster spec diff --git a/pkg/cloudprovider/aws/instance.go b/pkg/cloudprovider/aws/instance.go index d7dbb0cac747..6f005a9a9f46 100644 --- a/pkg/cloudprovider/aws/instance.go +++ b/pkg/cloudprovider/aws/instance.go @@ -26,6 +26,7 @@ import ( "go.uber.org/zap" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" ) type InstanceProvider struct { @@ -108,31 +109,23 @@ func (p *InstanceProvider) Create(ctx context.Context, return createFleetOutput.Instances[0].InstanceIds[0], nil } -func (p *InstanceProvider) Terminate(ctx context.Context, nodes []*v1.Node) error { - if len(nodes) == 0 { - return nil - } - ids := p.getInstanceIDs(nodes) - - _, err := p.ec2api.TerminateInstancesWithContext(ctx, &ec2.TerminateInstancesInput{ - InstanceIds: ids, - }) +func (p *InstanceProvider) Terminate(ctx context.Context, node *v1.Node) error { + id, err := p.getInstanceID(node) if err != nil { - return fmt.Errorf("terminating %d instances, %w", len(ids), err) + return fmt.Errorf("getting instance ID for node %s, %w", node.Name, err) + } + if _, err = p.ec2api.TerminateInstancesWithContext(ctx, 
&ec2.TerminateInstancesInput{ + InstanceIds: []*string{id}, + }); err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("terminating instance %s, %w", node.Name, err) } - return nil } -func (p *InstanceProvider) getInstanceIDs(nodes []*v1.Node) []*string { - ids := []*string{} - for _, node := range nodes { - id := strings.Split(node.Spec.ProviderID, "/") - if len(id) < 5 { - zap.S().Debugf("Continuing after failure to parse instance id, %s has invalid format", node.Name) - continue - } - ids = append(ids, aws.String(id[4])) +func (p *InstanceProvider) getInstanceID(node *v1.Node) (*string, error) { + id := strings.Split(node.Spec.ProviderID, "/") + if len(id) < 5 { + return nil, fmt.Errorf("parsing instance id %s", node.Spec.ProviderID) } - return ids + return aws.String(id[4]), nil } diff --git a/pkg/cloudprovider/aws/suite_test.go b/pkg/cloudprovider/aws/suite_test.go index f502f079f96d..a5d6f2518ac9 100644 --- a/pkg/cloudprovider/aws/suite_test.go +++ b/pkg/cloudprovider/aws/suite_test.go @@ -31,6 +31,7 @@ import ( "github.com/awslabs/karpenter/pkg/cloudprovider/aws/fake" "github.com/awslabs/karpenter/pkg/cloudprovider/registry" "github.com/awslabs/karpenter/pkg/controllers/provisioning/v1alpha1/allocation" + "github.com/awslabs/karpenter/pkg/test" . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go index 36667fb04eb2..b5de3f371ad6 100644 --- a/pkg/cloudprovider/fake/cloudprovider.go +++ b/pkg/cloudprovider/fake/cloudprovider.go @@ -106,6 +106,6 @@ func (c *CloudProvider) Validate(context.Context, *v1alpha1.Constraints) *apis.F return nil } -func (c *CloudProvider) Terminate(context.Context, []*v1.Node) error { +func (c *CloudProvider) Terminate(ctx context.Context, node *v1.Node) error { return nil } diff --git a/pkg/cloudprovider/types.go b/pkg/cloudprovider/types.go index 5bb5868e32d7..6df901c132f7 100644 --- a/pkg/cloudprovider/types.go +++ b/pkg/cloudprovider/types.go @@ -33,8 +33,8 @@ type CloudProvider interface { GetInstanceTypes(context.Context) ([]InstanceType, error) // Validate is a hook for additional constraint validation logic specific to the cloud provider Validate(context.Context, *v1alpha1.Constraints) *apis.FieldError - // Terminate nodes in cloudprovider - Terminate(context.Context, []*v1.Node) error + // Terminate node in cloudprovider + Terminate(context.Context, *v1.Node) error } // Packing is a binpacking solution of equivalently schedulable pods to a set of diff --git a/pkg/controllers/provisioning/v1alpha1/allocation/bind.go b/pkg/controllers/provisioning/v1alpha1/allocation/bind.go index ddb21569746a..2d3f44772402 100644 --- a/pkg/controllers/provisioning/v1alpha1/allocation/bind.go +++ b/pkg/controllers/provisioning/v1alpha1/allocation/bind.go @@ -18,6 +18,7 @@ import ( "context" "fmt" + "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1" "go.uber.org/zap" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -32,7 +33,9 @@ type Binder struct { } func (b *Binder) Bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error { - // 1. Mark NodeReady=Unknown + // 1. 
Add the Karpenter finalizer to the node to enable the termination workflow + node.Finalizers = append(node.Finalizers, v1alpha1.KarpenterFinalizer) + // 2. Mark NodeReady=Unknown // Unfortunately, this detail is necessary to prevent kube-scheduler from // scheduling pods to nodes before they're created. Node Lifecycle // Controller will attach a Effect=NoSchedule taint in response to this @@ -44,7 +47,7 @@ func (b *Binder) Bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error Type: v1.NodeReady, Status: v1.ConditionUnknown, }} - // 2. Idempotently create a node. In rare cases, nodes can come online and + // 3. Idempotently create a node. In rare cases, nodes can come online and // self register before the controller is able to register a node object // with the API server. In the common case, we create the node object // ourselves to enforce the binding decision and enable images to be pulled @@ -55,7 +58,7 @@ func (b *Binder) Bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) error } } - // 3. Bind pods + // 4. Bind pods for _, pod := range pods { if err := b.bind(ctx, node, pod); err != nil { zap.S().Errorf("Continuing after failing to bind, %s", err.Error()) diff --git a/pkg/controllers/provisioning/v1alpha1/reallocation/controller.go b/pkg/controllers/provisioning/v1alpha1/reallocation/controller.go index 434a542046b4..15c90c350a3a 100644 --- a/pkg/controllers/provisioning/v1alpha1/reallocation/controller.go +++ b/pkg/controllers/provisioning/v1alpha1/reallocation/controller.go @@ -77,8 +77,8 @@ func (c *Controller) Reconcile(ctx context.Context, object client.Object) (recon return reconcile.Result{}, fmt.Errorf("removing ttl from node, %w", err) } - // 3. Mark any Node past TTL as expired - if err := c.utilization.markTerminable(ctx, provisioner); err != nil { + // 3. 
Delete any node past its TTL + if err := c.utilization.terminateExpired(ctx, provisioner); err != nil { return reconcile.Result{}, fmt.Errorf("marking nodes terminable, %w", err) } return reconcile.Result{}, nil diff --git a/pkg/controllers/provisioning/v1alpha1/reallocation/suite_test.go b/pkg/controllers/provisioning/v1alpha1/reallocation/suite_test.go index a016d04373a1..79f8289a55da 100644 --- a/pkg/controllers/provisioning/v1alpha1/reallocation/suite_test.go +++ b/pkg/controllers/provisioning/v1alpha1/reallocation/suite_test.go @@ -95,15 +95,15 @@ var _ = Describe("Reallocation", func() { updatedNode := &v1.Node{} Expect(env.Client.Get(ctx, client.ObjectKey{Name: node.Name}, updatedNode)).To(Succeed()) - Expect(updatedNode.Labels).To(HaveKeyWithValue(v1alpha1.ProvisionerPhaseLabel, v1alpha1.ProvisionerUnderutilizedPhase)) + Expect(updatedNode.Labels).To(HaveKey(v1alpha1.ProvisionerUnderutilizedLabelKey)) Expect(updatedNode.Annotations).To(HaveKey(v1alpha1.ProvisionerTTLKey)) }) It("should remove labels from utilized nodes", func() { node := test.NodeWith(test.NodeOptions{ Labels: map[string]string{ - v1alpha1.ProvisionerNameLabelKey: provisioner.Name, - v1alpha1.ProvisionerNamespaceLabelKey: provisioner.Namespace, - v1alpha1.ProvisionerPhaseLabel: v1alpha1.ProvisionerUnderutilizedPhase, + v1alpha1.ProvisionerNameLabelKey: provisioner.Name, + v1alpha1.ProvisionerNamespaceLabelKey: provisioner.Namespace, + v1alpha1.ProvisionerUnderutilizedLabelKey: "true", }, Annotations: map[string]string{ v1alpha1.ProvisionerTTLKey: time.Now().Add(time.Duration(100) * time.Second).Format(time.RFC3339), @@ -124,7 +124,7 @@ var _ = Describe("Reallocation", func() { updatedNode := &v1.Node{} Expect(env.Client.Get(ctx, client.ObjectKey{Name: node.Name}, updatedNode)).To(Succeed()) - Expect(updatedNode.Labels).ToNot(HaveKey(v1alpha1.ProvisionerPhaseLabel)) + Expect(updatedNode.Labels).ToNot(HaveKey(v1alpha1.ProvisionerUnderutilizedLabelKey)) 
Expect(updatedNode.Annotations).ToNot(HaveKey(v1alpha1.ProvisionerTTLKey)) }) }) diff --git a/pkg/controllers/provisioning/v1alpha1/reallocation/utilization.go b/pkg/controllers/provisioning/v1alpha1/reallocation/utilization.go index 995b8b360bbb..97aee73e4b95 100644 --- a/pkg/controllers/provisioning/v1alpha1/reallocation/utilization.go +++ b/pkg/controllers/provisioning/v1alpha1/reallocation/utilization.go @@ -44,7 +44,7 @@ func (u *Utilization) Reconcile(ctx context.Context, provisioner *v1alpha1.Provi } // 3. Mark any Node past TTL as expired - if err := u.markTerminable(ctx, provisioner); err != nil { + if err := u.terminateExpired(ctx, provisioner); err != nil { return fmt.Errorf("marking nodes terminable, %w", err) } return nil @@ -77,7 +77,7 @@ func (u *Utilization) markUnderutilized(ctx context.Context, provisioner *v1alph persisted := node.DeepCopy() node.Labels = functional.UnionStringMaps( node.Labels, - map[string]string{v1alpha1.ProvisionerPhaseLabel: v1alpha1.ProvisionerUnderutilizedPhase}, + map[string]string{v1alpha1.ProvisionerUnderutilizedLabelKey: "true"}, ) node.Annotations = functional.UnionStringMaps( node.Annotations, @@ -94,9 +94,7 @@ func (u *Utilization) markUnderutilized(ctx context.Context, provisioner *v1alph // clearUnderutilized removes the TTL on underutilized nodes if there is sufficient resource usage func (u *Utilization) clearUnderutilized(ctx context.Context, provisioner *v1alpha1.Provisioner) error { // 1. 
Get underutilized nodes - nodes, err := u.getNodes(ctx, provisioner, map[string]string{ - v1alpha1.ProvisionerPhaseLabel: v1alpha1.ProvisionerUnderutilizedPhase, - }) + nodes, err := u.getNodes(ctx, provisioner, map[string]string{v1alpha1.ProvisionerUnderutilizedLabelKey: "true"}) if err != nil { return fmt.Errorf("listing labeled underutilized nodes, %w", err) } @@ -110,7 +108,7 @@ func (u *Utilization) clearUnderutilized(ctx context.Context, provisioner *v1alp if !utilsnode.IsUnderutilized(node, pods) { persisted := node.DeepCopy() - delete(node.Labels, v1alpha1.ProvisionerPhaseLabel) + delete(node.Labels, v1alpha1.ProvisionerUnderutilizedLabelKey) delete(node.Annotations, v1alpha1.ProvisionerTTLKey) if err := u.kubeClient.Patch(ctx, node, client.MergeFrom(persisted)); err != nil { zap.S().Debugf("Could not remove underutilized labels on node %s, %w", node.Name, err) @@ -122,24 +120,19 @@ func (u *Utilization) clearUnderutilized(ctx context.Context, provisioner *v1alp return nil } -// markTerminable checks if a node is past its ttl and marks it -func (u *Utilization) markTerminable(ctx context.Context, provisioner *v1alpha1.Provisioner) error { +// terminateExpired checks if a node is past its TTL and deletes it +func (u *Utilization) terminateExpired(ctx context.Context, provisioner *v1alpha1.Provisioner) error { // 1. Get underutilized nodes - nodes, err := u.getNodes(ctx, provisioner, map[string]string{v1alpha1.ProvisionerPhaseLabel: "underutilized"}) + nodes, err := u.getNodes(ctx, provisioner, map[string]string{v1alpha1.ProvisionerUnderutilizedLabelKey: "true"}) if err != nil { return fmt.Errorf("listing underutilized nodes, %w", err) } - // 2. Check if node is past TTL + // 2.
Delete node if past TTL for _, node := range nodes { if utilsnode.IsPastTTL(node) { - persisted := node.DeepCopy() - node.Labels = functional.UnionStringMaps( - node.Labels, - map[string]string{v1alpha1.ProvisionerPhaseLabel: v1alpha1.ProvisionerTerminablePhase}, - ) - if err := u.kubeClient.Patch(ctx, node, client.MergeFrom(persisted)); err != nil { - return fmt.Errorf("patching node %s, %w", node.Name, err) + if err := u.kubeClient.Delete(ctx, node); err != nil { + return fmt.Errorf("sending delete for node %s, %w", node.Name, err) } } } diff --git a/pkg/controllers/terminating/v1alpha1/controller.go b/pkg/controllers/terminating/v1alpha1/controller.go index 3ea268551d91..3b5f6590064b 100644 --- a/pkg/controllers/terminating/v1alpha1/controller.go +++ b/pkg/controllers/terminating/v1alpha1/controller.go @@ -19,7 +19,9 @@ import ( "fmt" "time" + provisioning "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1" "github.com/awslabs/karpenter/pkg/cloudprovider" + "github.com/awslabs/karpenter/pkg/utils/functional" v1 "k8s.io/api/core/v1" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -63,18 +65,29 @@ func NewController(kubeClient client.Client, coreV1Client corev1.CoreV1Interface } } -// Reconcile executes a reallocation control loop for the resource +// Reconcile executes a termination control loop for the resource func (c *Controller) Reconcile(ctx context.Context, object client.Object) (reconcile.Result, error) { - // 1. Cordon terminable nodes - if err := c.terminator.cordonNodes(ctx); err != nil { - return reconcile.Result{}, fmt.Errorf("cordoning terminable nodes, %w", err) + node := object.(*v1.Node) + // 1. Check if node is terminable + if node.DeletionTimestamp == nil || !functional.ContainsString(node.Finalizers, provisioning.KarpenterFinalizer) { + return reconcile.Result{}, nil } - - // 2. 
Drain and delete nodes - if err := c.terminator.terminateNodes(ctx); err != nil { - return reconcile.Result{}, fmt.Errorf("terminating nodes, %w", err) + // 2. Cordon node + if err := c.terminator.cordonNode(ctx, node); err != nil { + return reconcile.Result{}, fmt.Errorf("cordoning node %s, %w", node.Name, err) + } + // 3. Drain node + drained, err := c.terminator.drainNode(ctx, node) + if err != nil { + return reconcile.Result{}, fmt.Errorf("draining node %s, %w", node.Name, err) + } + // 4. If fully drained, terminate the node + if drained { + if err := c.terminator.terminateNode(ctx, node); err != nil { + return reconcile.Result{}, fmt.Errorf("terminating node %s, %w", node.Name, err) + } } - return reconcile.Result{}, nil + return reconcile.Result{Requeue: !drained}, nil } func (c *Controller) Watches(context.Context) (source.Source, handler.EventHandler, builder.WatchesOption) { diff --git a/pkg/controllers/terminating/v1alpha1/suite_test.go b/pkg/controllers/terminating/v1alpha1/suite_test.go index 63d43a270f19..00fca6fccc43 100644 --- a/pkg/controllers/terminating/v1alpha1/suite_test.go +++ b/pkg/controllers/terminating/v1alpha1/suite_test.go @@ -15,16 +15,13 @@ limitations under the License. package v1alpha1 import ( - "strings" + "context" "testing" - "time" - "github.com/Pallinder/go-randomdata" "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1" "github.com/awslabs/karpenter/pkg/cloudprovider/fake" "github.com/awslabs/karpenter/pkg/cloudprovider/registry" "github.com/awslabs/karpenter/pkg/test" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" . 
"github.com/awslabs/karpenter/pkg/test/expectations" @@ -58,18 +55,9 @@ var _ = AfterSuite(func() { }) var _ = Describe("Reallocation", func() { - var provisioner *v1alpha1.Provisioner - + var ctx context.Context BeforeEach(func() { - // Create Provisioner to give some time for the node to reconcile - provisioner = &v1alpha1.Provisioner{ - ObjectMeta: metav1.ObjectMeta{Name: strings.ToLower(randomdata.SillyName()), - Namespace: "default", - }, - Spec: v1alpha1.ProvisionerSpec{ - Cluster: &v1alpha1.ClusterSpec{Name: "test-cluster", Endpoint: "http://test-cluster", CABundle: "dGVzdC1jbHVzdGVyCg=="}, - }, - } + ctx = context.Background() }) AfterEach(func() { @@ -77,19 +65,16 @@ var _ = Describe("Reallocation", func() { }) Context("Reconciliation", func() { - It("should terminate nodes marked terminable", func() { + It("should terminate deleted nodes", func() { node := test.NodeWith(test.NodeOptions{ + Finalizers: []string{v1alpha1.KarpenterFinalizer}, Labels: map[string]string{ - v1alpha1.ProvisionerNameLabelKey: provisioner.Name, - v1alpha1.ProvisionerNamespaceLabelKey: provisioner.Namespace, - v1alpha1.ProvisionerPhaseLabel: v1alpha1.ProvisionerTerminablePhase, - }, - Annotations: map[string]string{ - v1alpha1.ProvisionerTTLKey: time.Now().Add(time.Duration(-100) * time.Second).Format(time.RFC3339), + v1alpha1.ProvisionerNameLabelKey: "default", + v1alpha1.ProvisionerNamespaceLabelKey: "default", }, }) ExpectCreatedWithStatus(env.Client, node) - ExpectCreated(env.Client, provisioner) + Expect(env.Client.Delete(ctx, node)).To(Succeed()) ExpectNotFound(env.Client, node) }) }) diff --git a/pkg/controllers/terminating/v1alpha1/terminate.go b/pkg/controllers/terminating/v1alpha1/terminate.go index cf83997975ca..1146456e0339 100644 --- a/pkg/controllers/terminating/v1alpha1/terminate.go +++ b/pkg/controllers/terminating/v1alpha1/terminate.go @@ -38,105 +38,61 @@ type Terminator struct { coreV1Client corev1.CoreV1Interface } -// cordonNodes takes in a list of 
expired nodes as input and cordons them -func (t *Terminator) cordonNodes(ctx context.Context) error { - // 1. Get terminable nodes - nodeList, err := t.getLabeledNodes(ctx, provisioning.ProvisionerTerminablePhase) - if err != nil { - return err +// cordonNode cordons a node +func (t *Terminator) cordonNode(ctx context.Context, node *v1.Node) error { + if node.Spec.Unschedulable { + return nil } - // 2. Cordon nodes - for _, node := range nodeList { - persisted := node.DeepCopy() - node.Spec.Unschedulable = true - node.Labels = functional.UnionStringMaps( - node.Labels, - map[string]string{provisioning.ProvisionerPhaseLabel: provisioning.ProvisionerDrainingPhase}, - ) - if err := t.kubeClient.Patch(ctx, node, client.MergeFrom(persisted)); err != nil { - return fmt.Errorf("patching node %s, %w", node.Name, err) - } - zap.S().Debugf("Cordoned node %s", node.Name) + persisted := node.DeepCopy() + node.Spec.Unschedulable = true + if err := t.kubeClient.Patch(ctx, node, client.MergeFrom(persisted)); err != nil { + return fmt.Errorf("patching node %s, %w", node.Name, err) } + zap.S().Debugf("Cordoned node %s", node.Name) return nil } -// terminateNodes takes in a list of expired non-drained nodes and calls drain on them -func (t *Terminator) terminateNodes(ctx context.Context) error { - // 1. Get draining nodes - draining, err := t.getLabeledNodes(ctx, provisioning.ProvisionerDrainingPhase) +// drainNode evicts pods from the node and returns true when fully drained +func (t *Terminator) drainNode(ctx context.Context, node *v1.Node) (bool, error) { + // 1. Get pods on node + pods, err := t.getPods(ctx, node) if err != nil { - return fmt.Errorf("listing draining nodes, %w", err) + return false, fmt.Errorf("listing pods for node %s, %w", node.Name, err) } - // 2. 
Drain nodes - drained := []*v1.Node{} - for _, node := range draining { - // TODO: Check if Node should be drained - // - Disrupts PDB - // - Pods owned by controller object - // - Pod on Node can't be rescheduled elsewhere - - // 2a. Get pods on node - pods, err := t.getPods(ctx, node) - if err != nil { - return fmt.Errorf("listing pods for node %s, %w", node.Name, err) - } - // 2b. Evict pods on node - empty := true - for _, p := range pods { - if !pod.IsOwnedByDaemonSet(p) { - empty = false - if err := t.coreV1Client.Pods(p.Namespace).Evict(ctx, &v1beta1.Eviction{ - ObjectMeta: metav1.ObjectMeta{ - Name: p.Name, - }, - }); err != nil { - zap.S().Debugf("Continuing after failing to evict pods from node %s, %s", node.Name, err.Error()) - } + // 2. Evict pods on node + empty := true + for _, p := range pods { + if !pod.IsOwnedByDaemonSet(p) { + empty = false + if err := t.coreV1Client.Pods(p.Namespace).Evict(ctx, &v1beta1.Eviction{ + ObjectMeta: metav1.ObjectMeta{ + Name: p.Name, + }, + }); err != nil { + zap.S().Debugf("Continuing after failing to evict pods from node %s, %s", node.Name, err.Error()) } } - // 2c. If node is empty, add to list of nodes to delete - if empty { - drained = append(drained, node) - } - } - // 3. Delete empty nodes - if err := t.deleteNodes(ctx, drained); err != nil { - return fmt.Errorf("deleting %d nodes, %w", len(drained), err) } - return nil + return empty, nil } -// deleteNode uses a cloudprovider-specific delete to delete a set of nodes -func (t *Terminator) deleteNodes(ctx context.Context, nodes []*v1.Node) error { - // 1. Delete node in cloudprovider's instanceprovider - if err := t.cloudProvider.Terminate(ctx, nodes); err != nil { +// terminateNode terminates the node then removes the finalizer to delete the node +func (t *Terminator) terminateNode(ctx context.Context, node *v1.Node) error { + // 1. 
Terminate instance associated with node + if err := t.cloudProvider.Terminate(ctx, node); err != nil { return fmt.Errorf("terminating cloudprovider instance, %w", err) } - // 2. Delete node in APIServer - for _, node := range nodes { - if err := t.kubeClient.Delete(ctx, node); err != nil { - zap.S().Debugf("Continuing after failing to delete node %s, %s", node.Name, err.Error()) - } - zap.S().Infof("Terminated node %s", node.Name) + zap.S().Infof("Terminated instance %s", node.Name) + // 2. Remove finalizer from node in APIServer + persisted := node.DeepCopy() + node.Finalizers = functional.StringSliceWithout(node.Finalizers, provisioning.KarpenterFinalizer) + if err := t.kubeClient.Patch(ctx, node, client.MergeFrom(persisted)); err != nil { + return fmt.Errorf("removing finalizer from node %s, %w", node.Name, err) } + zap.S().Debugf("Deleted node %s", node.Name) return nil } -// getLabeledNodes returns a list of nodes with the provisioner's labels and given labels -func (t *Terminator) getLabeledNodes(ctx context.Context, phaseLabel string) ([]*v1.Node, error) { - nodes := &v1.NodeList{} - if err := t.kubeClient.List(ctx, nodes, client.HasLabels([]string{ - provisioning.ProvisionerNameLabelKey, - provisioning.ProvisionerNamespaceLabelKey, - }), client.MatchingLabels(map[string]string{ - provisioning.ProvisionerPhaseLabel: phaseLabel, - })); err != nil { - return nil, fmt.Errorf("listing nodes, %w", err) - } - return ptr.NodeListToSlice(nodes), nil -} - // getPods returns a list of pods scheduled to a node func (t *Terminator) getPods(ctx context.Context, node *v1.Node) ([]*v1.Pod, error) { pods := &v1.PodList{} diff --git a/pkg/test/expectations/expectations.go b/pkg/test/expectations/expectations.go index 3f370ba69f47..6c41c1c2a816 100644 --- a/pkg/test/expectations/expectations.go +++ b/pkg/test/expectations/expectations.go @@ -22,10 +22,12 @@ import ( "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1" 
"github.com/awslabs/karpenter/pkg/utils/conditions" "github.com/awslabs/karpenter/pkg/utils/log" + . "github.com/onsi/gomega" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" + "knative.dev/pkg/ptr" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -74,9 +76,19 @@ func ExpectCreatedWithStatus(c client.Client, objects ...client.Object) { } } +func ExpectDeletedNode(c client.Client, n *v1.Node) { + persisted := n.DeepCopy() + n.Finalizers = []string{} + Expect(c.Patch(context.Background(), n, client.MergeFrom(persisted))).To(Succeed()) + ExpectDeleted(c, n) +} + func ExpectDeleted(c client.Client, objects ...client.Object) { for _, object := range objects { - Expect(c.Delete(context.Background(), object)).To(Succeed()) + Expect(c.Delete(context.Background(), object, &client.DeleteOptions{GracePeriodSeconds: ptr.Int64(0)})).To(Succeed()) + } + for _, object := range objects { + ExpectNotFound(c, object) } } @@ -110,7 +122,7 @@ func ExpectCleanedUp(c client.Client) { nodes := v1.NodeList{} Expect(c.List(ctx, &nodes)).To(Succeed()) for _, node := range nodes.Items { - ExpectDeleted(c, &node) + ExpectDeletedNode(c, &node) } provisioners := v1alpha1.ProvisionerList{} Expect(c.List(ctx, &provisioners)).To(Succeed()) diff --git a/pkg/test/nodes.go b/pkg/test/nodes.go index ef581c4241b1..de4c3498901d 100644 --- a/pkg/test/nodes.go +++ b/pkg/test/nodes.go @@ -29,6 +29,7 @@ type NodeOptions struct { ReadyStatus v1.ConditionStatus Unschedulable bool Allocatable v1.ResourceList + Finalizers []string } func NodeWith(options NodeOptions) *v1.Node { @@ -44,12 +45,16 @@ func NodeWith(options NodeOptions) *v1.Node { if options.Annotations == nil { options.Annotations = map[string]string{} } + if options.Finalizers == nil { + options.Finalizers = []string{} + } return &v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: options.Name, Labels: options.Labels, Annotations: options.Annotations, + Finalizers: options.Finalizers, 
}, Spec: v1.NodeSpec{ Unschedulable: options.Unschedulable, diff --git a/pkg/utils/functional/functional.go b/pkg/utils/functional/functional.go index 9a27d8e45b3a..d71c711c7521 100644 --- a/pkg/utils/functional/functional.go +++ b/pkg/utils/functional/functional.go @@ -31,6 +31,17 @@ func UnionStringMaps(maps ...map[string]string) map[string]string { return result } +func StringSliceWithout(vals []string, remove string) []string { + without := []string{} + for _, val := range vals { + if val == remove { + continue + } + without = append(without, val) + } + return without +} + // IntersectStringSlice takes the intersection of all string slices func IntersectStringSlice(slices ...[]string) []string { // count occurrences diff --git a/pkg/utils/node/predicates.go b/pkg/utils/node/predicates.go index bf86d616809f..9057094370f8 100644 --- a/pkg/utils/node/predicates.go +++ b/pkg/utils/node/predicates.go @@ -15,10 +15,11 @@ limitations under the License. package node import ( + "time" + "github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha1" "github.com/awslabs/karpenter/pkg/utils/pod" v1 "k8s.io/api/core/v1" - "time" ) func IsReadyAndSchedulable(node v1.Node) bool {