Skip to content

Commit

Permalink
feat: drain add fallback to remove pods with grace period 0 (#43)
Browse files Browse the repository at this point in the history
* feat: drain add fallback to remove pods with grace period 0

* tests: add unit tests for failures

* chore: improve logs

* chore: refactor

* build: update helm package
  • Loading branch information
aldor007 authored Oct 3, 2022
1 parent e526c49 commit fbeae32
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 45 deletions.
49 changes: 36 additions & 13 deletions actions/drain_node_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func newDrainNodeHandler(log logrus.FieldLogger, clientset kubernetes.Interface)
log: log,
clientset: clientset,
cfg: drainNodeConfig{
podsDeleteTimeout: 5 * time.Minute,
podsDeleteTimeout: 2 * time.Minute,
podDeleteRetries: 5,
podDeleteRetryDelay: 5 * time.Second,
podEvictRetryDelay: 5 * time.Second,
Expand All @@ -51,10 +51,10 @@ type drainNodeHandler struct {
func (h *drainNodeHandler) Handle(ctx context.Context, data interface{}) error {
req, ok := data.(*castai.ActionDrainNode)
if !ok {
return fmt.Errorf("unexpected type %T for delete drain handler", data)
return fmt.Errorf("unexpected type %T for drain handler", data)
}

log := h.log.WithField("node_name", req.NodeName)
log := h.log.WithFields(logrus.Fields{"node_name": req.NodeName, "action": "drain"})

node, err := h.clientset.CoreV1().Nodes().Get(ctx, req.NodeName, metav1.GetOptions{})
if err != nil {
Expand Down Expand Up @@ -83,12 +83,27 @@ func (h *drainNodeHandler) Handle(ctx context.Context, data interface{}) error {
if !req.Force {
return err
}
// If force is set and evict timeout exceeded delete pods.
deleteCtx, deleteCancel := context.WithTimeout(ctx, h.cfg.podsDeleteTimeout)
defer deleteCancel()
if err := h.deleteNodePods(deleteCtx, log, node); err != nil {
return fmt.Errorf("forcefully deleting pods: %w", err)
// Try deleting pods gracefully first, then delete with 0 grace period.
options := []metav1.DeleteOptions{
{},
*metav1.NewDeleteOptions(0),
}

var err error
for _, o := range options {
deleteCtx, deleteCancel := context.WithTimeout(ctx, h.cfg.podsDeleteTimeout)
defer deleteCancel()

err = h.deleteNodePods(deleteCtx, log, node, o)
if err == nil {
break
}
if !errors.Is(err, context.DeadlineExceeded) {
return fmt.Errorf("forcefully deleting pods: %w", err)
}
}

return err
}

log.Info("node drained")
Expand Down Expand Up @@ -126,15 +141,23 @@ func (h *drainNodeHandler) evictNodePods(ctx context.Context, log logrus.FieldLo
return h.waitNodePodsTerminated(ctx, node)
}

func (h *drainNodeHandler) deleteNodePods(ctx context.Context, log logrus.FieldLogger, node *v1.Node) error {
func (h *drainNodeHandler) deleteNodePods(ctx context.Context, log logrus.FieldLogger, node *v1.Node, options metav1.DeleteOptions) error {
pods, err := h.listNodePodsToEvict(ctx, node)
if err != nil {
return err
}

log.Infof("forcefully deleting %d pods", len(pods))
if options.GracePeriodSeconds != nil {
log.Infof("forcefully deleting %d pods with gracePeriod %d", len(pods), *options.GracePeriodSeconds)
} else {
log.Infof("forcefully deleting %d pods", len(pods))
}

deletePod := func(ctx context.Context, pod v1.Pod) error {
return h.deletePod(ctx, options, pod)
}

if err := h.sendPodsRequests(ctx, pods, h.deletePod); err != nil {
if err := h.sendPodsRequests(ctx, pods, deletePod); err != nil {
return fmt.Errorf("sending delete pods requests: %w", err)
}

Expand Down Expand Up @@ -230,10 +253,10 @@ func (h *drainNodeHandler) evictPod(ctx context.Context, pod v1.Pod) error {
return nil
}

func (h *drainNodeHandler) deletePod(ctx context.Context, pod v1.Pod) error {
func (h *drainNodeHandler) deletePod(ctx context.Context, options metav1.DeleteOptions, pod v1.Pod) error {
b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(h.cfg.podDeleteRetryDelay), h.cfg.podDeleteRetries), ctx) // nolint:gomnd
action := func() error {
err := h.clientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
err := h.clientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, options)
if err != nil {
// Pod is not found - ignore.
if apierrors.IsNotFound(err) {
Expand Down
125 changes: 125 additions & 0 deletions actions/drain_node_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package actions
import (
"context"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -112,6 +113,130 @@ func TestDrainNodeHandler(t *testing.T) {
_, err = clientset.CoreV1().Pods("default").Get(context.Background(), podName, metav1.GetOptions{})
r.NoError(err)
})

t.Run("eviction timeout - force remove pods", func(t *testing.T) {
r := require.New(t)
nodeName := "node1"
podName := "pod1"
clientset := setupFakeClientWithNodePodEviction(nodeName, podName)

h := drainNodeHandler{
log: log,
clientset: clientset,
cfg: drainNodeConfig{
podsDeleteTimeout: 7 * time.Second,
podDeleteRetries: 5,
podDeleteRetryDelay: 5 * time.Second,
podEvictRetryDelay: 5 * time.Second,
podsTerminationWaitRetryDelay: 10 * time.Second,
}}

req := &castai.ActionDrainNode{
NodeName: "node1",
DrainTimeoutSeconds: 1,
Force: true,
}

clientset.PrependReactor("delete", "pods", func(action ktest.Action) (handled bool, ret runtime.Object, err error) {
deleteAction := action.(ktest.DeleteActionImpl)
if deleteAction.Name == podName {
if deleteAction.DeleteOptions.GracePeriodSeconds == nil {
return true, nil, context.DeadlineExceeded
}
return false, nil, nil
}
return false, nil, nil
})

err := h.Handle(context.Background(), req)
r.NoError(err)

n, err := clientset.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
r.NoError(err)
r.True(n.Spec.Unschedulable)

_, err = clientset.CoreV1().Pods("default").Get(context.Background(), podName, metav1.GetOptions{})
r.True(apierrors.IsNotFound(err))
})

t.Run("eviction timeout - force remove pods - failure", func(t *testing.T) {
r := require.New(t)
nodeName := "node1"
podName := "pod1"
clientset := setupFakeClientWithNodePodEviction(nodeName, podName)

h := drainNodeHandler{
log: log,
clientset: clientset,
cfg: drainNodeConfig{
podsDeleteTimeout: 7 * time.Second,
podDeleteRetries: 5,
podDeleteRetryDelay: 5 * time.Second,
podEvictRetryDelay: 5 * time.Second,
podsTerminationWaitRetryDelay: 10 * time.Second,
}}

req := &castai.ActionDrainNode{
NodeName: "node1",
DrainTimeoutSeconds: 1,
Force: true,
}

clientset.PrependReactor("delete", "pods", func(action ktest.Action) (handled bool, ret runtime.Object, err error) {
deleteAction := action.(ktest.DeleteActionImpl)
if deleteAction.Name == podName {
return true, nil, &apierrors.StatusError{ErrStatus: metav1.Status{Reason: metav1.StatusReasonInternalError, Message: "internal"}}
}
return false, nil, nil
})

err := h.Handle(context.Background(), req)
r.EqualError(err, "forcefully deleting pods: sending delete pods requests: deleting pod pod1 in namespace default: internal")

_, err = clientset.CoreV1().Pods("default").Get(context.Background(), podName, metav1.GetOptions{})
r.NoError(err)
})

t.Run("eviction timeout - force remove pods with grace 0 - failure", func(t *testing.T) {
r := require.New(t)
nodeName := "node1"
podName := "pod1"
clientset := setupFakeClientWithNodePodEviction(nodeName, podName)

h := drainNodeHandler{
log: log,
clientset: clientset,
cfg: drainNodeConfig{
podsDeleteTimeout: 7 * time.Second,
podDeleteRetries: 5,
podDeleteRetryDelay: 5 * time.Second,
podEvictRetryDelay: 5 * time.Second,
podsTerminationWaitRetryDelay: 10 * time.Second,
}}

req := &castai.ActionDrainNode{
NodeName: "node1",
DrainTimeoutSeconds: 1,
Force: true,
}

clientset.PrependReactor("delete", "pods", func(action ktest.Action) (handled bool, ret runtime.Object, err error) {
deleteAction := action.(ktest.DeleteActionImpl)
if deleteAction.Name == podName {
if deleteAction.DeleteOptions.GracePeriodSeconds == nil {
return true, nil, context.DeadlineExceeded
}
return true, nil, &apierrors.StatusError{ErrStatus: metav1.Status{Reason: metav1.StatusReasonInternalError, Message: "internal"}}
}
return false, nil, nil
})

err := h.Handle(context.Background(), req)
r.EqualError(err, "forcefully deleting pods: sending delete pods requests: deleting pod pod1 in namespace default: internal")

_, err = clientset.CoreV1().Pods("default").Get(context.Background(), podName, metav1.GetOptions{})
r.NoError(err)
})
}

func prependEvictionReaction(t *testing.T, c *fake.Clientset, success bool) {
Expand Down
27 changes: 14 additions & 13 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ require (
github.com/stretchr/testify v1.7.2
go.uber.org/goleak v1.1.12
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f
helm.sh/helm/v3 v3.9.2
k8s.io/api v0.24.2
k8s.io/apimachinery v0.24.2
helm.sh/helm/v3 v3.9.4
k8s.io/api v0.25.2
k8s.io/apimachinery v0.25.2
k8s.io/apiserver v0.24.2
k8s.io/cli-runtime v0.24.2
k8s.io/client-go v0.24.2
k8s.io/client-go v0.25.2
sigs.k8s.io/controller-runtime v0.8.3
sigs.k8s.io/yaml v1.3.0
)
Expand Down Expand Up @@ -49,6 +49,7 @@ require (
github.com/docker/go-metrics v0.0.1 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/emicklei/go-restful v2.9.5+incompatible // indirect
github.com/emicklei/go-restful/v3 v3.8.0 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d // indirect
github.com/fatih/color v1.13.0 // indirect
Expand All @@ -64,7 +65,7 @@ require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.5.6 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/gorilla/mux v1.8.0 // indirect
Expand Down Expand Up @@ -126,29 +127,29 @@ require (
go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 // indirect
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220107163113-42d7afdf6368 // indirect
google.golang.org/grpc v1.43.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.66.2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.24.2 // indirect
k8s.io/component-base v0.24.2 // indirect
k8s.io/klog/v2 v2.60.1 // indirect
k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42 // indirect
k8s.io/klog/v2 v2.70.1 // indirect
k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 // indirect
k8s.io/kubectl v0.24.2 // indirect
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9 // indirect
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect
oras.land/oras-go v1.2.0 // indirect
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/kustomize/api v0.11.4 // indirect
sigs.k8s.io/kustomize/kyaml v0.13.6 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
)
Loading

0 comments on commit fbeae32

Please sign in to comment.