Skip to content

Commit

Permalink
pkg/k8s: Move drain lease into separate file
Browse files Browse the repository at this point in the history
Restructure code to move lease functionality into separate file.

Signed-off-by: Heathcliff <[email protected]>
  • Loading branch information
heathcliff26 committed Nov 17, 2024
1 parent 11879d8 commit b133e65
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 46 deletions.
53 changes: 7 additions & 46 deletions pkg/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@ package k8s
import (
"context"
"log/slog"
"time"

"github.com/heathcliff26/fleetlock/pkg/k8s/utils"
systemdutils "github.com/heathcliff26/fleetlock/pkg/systemd-utils"

coordv1 "k8s.io/api/coordination/v1"
v1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -57,43 +54,18 @@ func NewFakeClient() (*Client, *fake.Clientset) {
// Drain a node from all pods and set it to unschedulable.
// Status will be tracked in lease, only one drain will be run at a time.
func (c *Client) DrainNode(node string) error {
lease, err := c.client.CoordinationV1().Leases(c.namespace).Get(context.Background(), drainLeaseName(node), metav1.GetOptions{})
if errors.IsNotFound(err) {
lease = &coordv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Namespace: c.namespace,
Name: drainLeaseName(node),
},
Spec: coordv1.LeaseSpec{
HolderIdentity: utils.Pointer("draining"),
LeaseDurationSeconds: utils.Pointer(int32(300)),
AcquireTime: &metav1.MicroTime{Time: time.Now()},
},
}

lease, err = c.client.CoordinationV1().Leases(c.namespace).Create(context.Background(), lease, metav1.CreateOptions{})
if err != nil {
return err
}
} else if err != nil {
lease := NewLease(drainLeaseName(node), c.client.CoordinationV1().Leases(c.namespace))
err := lease.Lock(context.Background(), 300)
if err != nil {
return err
} else if lease.Spec.AcquireTime != nil && time.Now().After(lease.Spec.AcquireTime.Time.Add(5*time.Minute)) {
lease.Spec.AcquireTime = &metav1.MicroTime{Time: time.Now()}
lease, err = c.client.CoordinationV1().Leases(c.namespace).Update(context.Background(), lease, metav1.UpdateOptions{})
if err != nil {
return err
}
} else {
return NewErrorDrainIsLocked()
}

err = c.drainNode(node)
if err != nil {
return err
}

_, err = c.client.CoordinationV1().Leases(c.namespace).Patch(context.Background(), lease.GetName(), types.MergePatchType, []byte("{\"spec\":{\"holderIdentity\":\"done\"}}"), metav1.PatchOptions{})
return err
return lease.Done(context.Background())
}

// Drain a node of all pods, skipping daemonsets
Expand Down Expand Up @@ -173,22 +145,11 @@ func (c *Client) UncordonNode(node string) error {
if err != nil {
return err
}
err = c.client.CoordinationV1().Leases(c.namespace).Delete(context.Background(), drainLeaseName(node), metav1.DeleteOptions{})
if errors.IsNotFound(err) {
return nil
} else {
return err
}

return NewLease(drainLeaseName(node), c.client.CoordinationV1().Leases(c.namespace)).Delete(context.Background())
}

// Check if a node has been drained
func (c *Client) IsDrained(node string) (bool, error) {
lease, err := c.client.CoordinationV1().Leases(c.namespace).Get(context.Background(), drainLeaseName(node), metav1.GetOptions{})
if errors.IsNotFound(err) {
return false, nil
} else if err != nil {
return false, err
}

return *lease.Spec.HolderIdentity == "done", nil
return NewLease(drainLeaseName(node), c.client.CoordinationV1().Leases(c.namespace)).IsDone(context.Background())
}
18 changes: 18 additions & 0 deletions pkg/k8s/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,24 @@ func TestDrainNode(t *testing.T) {
err := c.DrainNode(testNodeName)
assert.Equal(t, NewErrorDrainIsLocked(), err, "Should return an error signaling that a drain is already in progress")
})
t.Run("LeaseInvalid", func(t *testing.T) {
c, client := NewFakeClient()
initTestCluster(client)

lease := &coordv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Namespace: c.namespace,
Name: drainLeaseName(testNodeName),
},
Spec: coordv1.LeaseSpec{
HolderIdentity: utils.Pointer("draining"),
},
}
_, _ = client.CoordinationV1().Leases(testNamespace).Create(context.Background(), lease, metav1.CreateOptions{})

err := c.DrainNode(testNodeName)
assert.Equal(t, NewErrorInvalidLease(), err, "Should return an error signaling that the lease is invalid")
})
t.Run("LeaseExpired", func(t *testing.T) {
c, client := NewFakeClient()
initTestCluster(client)
Expand Down
20 changes: 20 additions & 0 deletions pkg/k8s/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,23 @@ func NewErrorDrainIsLocked() error {
func (e ErrorDrainIsLocked) Error() string {
return "Can't drain node, as another drain is already in progress"
}

type ErrorInvalidLease struct{}

func NewErrorInvalidLease() error {
return ErrorInvalidLease{}
}

func (e ErrorInvalidLease) Error() string {
return "Invalid lease, either AcquireTime or LeaseDurationSeconds are nil"
}

type ErrorLeaseNil struct{}

func NewErrorLeaseNil() error {
return ErrorLeaseNil{}
}

func (e ErrorLeaseNil) Error() string {
return "Tried changing lease, but lease status on cluster is unknown"
}
109 changes: 109 additions & 0 deletions pkg/k8s/lease.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package k8s

import (
"context"
"time"

"github.com/heathcliff26/fleetlock/pkg/k8s/utils"
coordv1 "k8s.io/api/coordination/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
client "k8s.io/client-go/kubernetes/typed/coordination/v1"
)

type lease struct {
name string
lease *coordv1.Lease
client client.LeaseInterface
}

func NewLease(name string, client client.LeaseInterface) *lease {
return &lease{
name: name,
client: client,
}
}

func (l *lease) Lock(ctx context.Context, duration int32) error {
lease, err := l.client.Get(ctx, l.name, metav1.GetOptions{})
if errors.IsNotFound(err) {
return l.create(ctx, duration)
} else if err != nil {
return err
}

if lease.Spec.AcquireTime == nil || lease.Spec.LeaseDurationSeconds == nil {
return NewErrorInvalidLease()
}

validUntil := lease.Spec.AcquireTime.Time.Add(time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second)

if time.Now().After(validUntil) {
lease.Spec.AcquireTime = &metav1.MicroTime{Time: time.Now()}
lease, err = l.client.Update(ctx, lease, metav1.UpdateOptions{})
if err != nil {
return err
}
} else {
return NewErrorDrainIsLocked()
}

l.lease = lease
return nil
}

func (l *lease) create(ctx context.Context, duration int32) error {
lease := &coordv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: l.name,
},
Spec: coordv1.LeaseSpec{
HolderIdentity: utils.Pointer("draining"),
LeaseDurationSeconds: utils.Pointer(duration),
AcquireTime: &metav1.MicroTime{Time: time.Now()},
},
}

lease, err := l.client.Create(ctx, lease, metav1.CreateOptions{})
if err != nil {
return err
}

l.lease = lease
return nil
}

func (l *lease) Done(ctx context.Context) error {
if l.lease == nil {
return NewErrorLeaseNil()
}

*l.lease.Spec.HolderIdentity = "done"
lease, err := l.client.Update(ctx, l.lease, metav1.UpdateOptions{})
if err != nil {
return err
}

l.lease = lease
return nil
}

func (l *lease) Delete(ctx context.Context) error {
err := l.client.Delete(ctx, l.name, metav1.DeleteOptions{})
if errors.IsNotFound(err) {
return nil
} else {
return err
}
}

func (l *lease) IsDone(ctx context.Context) (bool, error) {
lease, err := l.client.Get(context.Background(), l.name, metav1.GetOptions{})
if errors.IsNotFound(err) {
return false, nil
} else if err != nil {
return false, err
}

return *lease.Spec.HolderIdentity == "done", nil
}

0 comments on commit b133e65

Please sign in to comment.