Skip to content

Commit

Permalink
feat: drain: add fallback to remove pods with grace period 0
Browse files Browse the repository at this point in the history
  • Loading branch information
aldor007 committed Sep 27, 2022
1 parent e526c49 commit dd070f8
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 40 deletions.
34 changes: 23 additions & 11 deletions actions/drain_node_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func newDrainNodeHandler(log logrus.FieldLogger, clientset kubernetes.Interface)
log: log,
clientset: clientset,
cfg: drainNodeConfig{
podsDeleteTimeout: 5 * time.Minute,
podsDeleteTimeout: 2 * time.Minute,
podDeleteRetries: 5,
podDeleteRetryDelay: 5 * time.Second,
podEvictRetryDelay: 5 * time.Second,
Expand Down Expand Up @@ -86,9 +86,20 @@ func (h *drainNodeHandler) Handle(ctx context.Context, data interface{}) error {
// If force is set and the eviction timeout has been exceeded, delete the pods.
deleteCtx, deleteCancel := context.WithTimeout(ctx, h.cfg.podsDeleteTimeout)
defer deleteCancel()
if err := h.deleteNodePods(deleteCtx, log, node); err != nil {
err = h.deleteNodePods(deleteCtx, log, metav1.DeleteOptions{}, node)
if err == nil {
log.Info("node drained")
return nil
}
if !errors.Is(err, context.DeadlineExceeded) {
return fmt.Errorf("forcefully deleting pods: %w", err)
}

deleteForceCtx, deleteForceCancel := context.WithTimeout(ctx, h.cfg.podsDeleteTimeout)
defer deleteForceCancel()
if err = h.deleteNodePods(deleteForceCtx, log, *metav1.NewDeleteOptions(0), node); err != nil {
return fmt.Errorf("deleting gracePeriod=0 pods: %w", err)
}
}

log.Info("node drained")
Expand Down Expand Up @@ -119,36 +130,36 @@ func (h *drainNodeHandler) evictNodePods(ctx context.Context, log logrus.FieldLo

log.Infof("evicting %d pods", len(pods))

if err := h.sendPodsRequests(ctx, pods, h.evictPod); err != nil {
if err := h.sendPodsRequests(ctx, metav1.DeleteOptions{}, pods, h.evictPod); err != nil {
return fmt.Errorf("sending evict pods requests: %w", err)
}

return h.waitNodePodsTerminated(ctx, node)
}

func (h *drainNodeHandler) deleteNodePods(ctx context.Context, log logrus.FieldLogger, node *v1.Node) error {
func (h *drainNodeHandler) deleteNodePods(ctx context.Context, log logrus.FieldLogger, options metav1.DeleteOptions, node *v1.Node) error {
pods, err := h.listNodePodsToEvict(ctx, node)
if err != nil {
return err
}

log.Infof("forcefully deleting %d pods", len(pods))
log.Infof("forcefully deleting %d pods with options %+v", len(pods), options)

if err := h.sendPodsRequests(ctx, pods, h.deletePod); err != nil {
if err := h.sendPodsRequests(ctx, options, pods, h.deletePod); err != nil {
return fmt.Errorf("sending delete pods requests: %w", err)
}

return h.waitNodePodsTerminated(ctx, node)
}

func (h *drainNodeHandler) sendPodsRequests(ctx context.Context, pods []v1.Pod, f func(context.Context, v1.Pod) error) error {
func (h *drainNodeHandler) sendPodsRequests(ctx context.Context, options metav1.DeleteOptions, pods []v1.Pod, f func(context.Context, metav1.DeleteOptions, v1.Pod) error) error {
const batchSize = 5

for _, batch := range lo.Chunk(pods, batchSize) {
g, ctx := errgroup.WithContext(ctx)
for _, pod := range batch {
pod := pod
g.Go(func() error { return f(ctx, pod) })
g.Go(func() error { return f(ctx, options, pod) })
}
if err := g.Wait(); err != nil {
return err
Expand Down Expand Up @@ -195,7 +206,7 @@ func (h *drainNodeHandler) waitNodePodsTerminated(ctx context.Context, node *v1.

// evictPod from the k8s node. Error handling is based on eviction api documentation:
// https://kubernetes.io/docs/tasks/administer-cluster/safely-drain-node/#the-eviction-api
func (h *drainNodeHandler) evictPod(ctx context.Context, pod v1.Pod) error {
func (h *drainNodeHandler) evictPod(ctx context.Context, options metav1.DeleteOptions, pod v1.Pod) error {
b := backoff.WithContext(backoff.NewConstantBackOff(h.cfg.podEvictRetryDelay), ctx) // nolint:gomnd
action := func() error {
err := h.clientset.CoreV1().Pods(pod.Namespace).Evict(ctx, &v1beta1.Eviction{
Expand All @@ -207,6 +218,7 @@ func (h *drainNodeHandler) evictPod(ctx context.Context, pod v1.Pod) error {
Name: pod.Name,
Namespace: pod.Namespace,
},
DeleteOptions: &options,
})

if err != nil {
Expand All @@ -230,10 +242,10 @@ func (h *drainNodeHandler) evictPod(ctx context.Context, pod v1.Pod) error {
return nil
}

func (h *drainNodeHandler) deletePod(ctx context.Context, pod v1.Pod) error {
func (h *drainNodeHandler) deletePod(ctx context.Context, options metav1.DeleteOptions, pod v1.Pod) error {
b := backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(h.cfg.podDeleteRetryDelay), h.cfg.podDeleteRetries), ctx) // nolint:gomnd
action := func() error {
err := h.clientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
err := h.clientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, options)
if err != nil {
// Pod is not found - ignore.
if apierrors.IsNotFound(err) {
Expand Down
51 changes: 51 additions & 0 deletions actions/drain_node_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package actions

import (
"context"
"fmt"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -112,6 +114,55 @@ func TestDrainNodeHandler(t *testing.T) {
_, err = clientset.CoreV1().Pods("default").Get(context.Background(), podName, metav1.GetOptions{})
r.NoError(err)
})

// Subtest for the forced-drain path: the request sets Force with a 1-second
// drain timeout, and pod delete calls are intercepted by a reactor whose answer
// depends on whether the delete carried an explicit grace period.
t.Run("eviction timeout - force remove pods", func(t *testing.T) {
r := require.New(t)
nodeName := "node1"
podName := "pod1"
clientset := setupFakeClientWithNodePodEviction(nodeName, podName)

// Handler is built directly (not via newDrainNodeHandler) so the subtest can
// use its own timings, e.g. a 7-second podsDeleteTimeout.
h := drainNodeHandler{
log: log,
clientset: clientset,
cfg: drainNodeConfig{
podsDeleteTimeout: 7 * time.Second,
podDeleteRetries: 5,
podDeleteRetryDelay: 5 * time.Second,
podEvictRetryDelay: 5 * time.Second,
podsTerminationWaitRetryDelay: 10 * time.Second,
}}

req := &castai.ActionDrainNode{
NodeName: "node1",
DrainTimeoutSeconds: 1,
Force: true,
}

// Intercept pod deletes. For podName, a delete without GracePeriodSeconds is
// answered with context.DeadlineExceeded, while a delete that sets a grace
// period is reported as handled without error. Deletes of any other pod are
// left unhandled by this reactor.
clientset.PrependReactor("delete", "pods", func(action ktest.Action) (handled bool, ret runtime.Object, err error) {
deleteAction := action.(ktest.DeleteActionImpl)
// NOTE(review): debug output on stdout here and below; consider t.Log or
// removing it before merge.
fmt.Println("--->", deleteAction, deleteAction.DeleteOptions)
if deleteAction.Name == podName {
if deleteAction.DeleteOptions.GracePeriodSeconds == nil {
fmt.Println("error should be")
return true, nil, context.DeadlineExceeded
}
fmt.Println("all good")
// NOTE(review): handled=true with a nil error presumably keeps the fake
// clientset from actually removing the pod, which would match the final
// Get below succeeding; confirm against client-go testing semantics.
return true, nil, nil
}
return false, nil, nil
})

err := h.Handle(context.Background(), req)
// NOTE(review): the expected message describes an eviction failure rather
// than the forced delete this subtest is named for; confirm this is the
// intended outcome.
r.EqualError(err, "eviciting node pods: sending evict pods requests: evicting pod pod1 in namespace default: internal")

// The node is expected to have been marked unschedulable.
n, err := clientset.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
r.NoError(err)
r.True(n.Spec.Unschedulable)

// The pod is expected to still be retrievable from the fake clientset.
_, err = clientset.CoreV1().Pods("default").Get(context.Background(), podName, metav1.GetOptions{})
r.NoError(err)

})
}

func prependEvictionReaction(t *testing.T, c *fake.Clientset, success bool) {
Expand Down
25 changes: 13 additions & 12 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ require (
go.uber.org/goleak v1.1.12
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f
helm.sh/helm/v3 v3.9.2
k8s.io/api v0.24.2
k8s.io/apimachinery v0.24.2
k8s.io/api v0.25.2
k8s.io/apimachinery v0.25.2
k8s.io/apiserver v0.24.2
k8s.io/cli-runtime v0.24.2
k8s.io/client-go v0.24.2
k8s.io/client-go v0.25.2
sigs.k8s.io/controller-runtime v0.8.3
sigs.k8s.io/yaml v1.3.0
)
Expand Down Expand Up @@ -49,6 +49,7 @@ require (
github.com/docker/go-metrics v0.0.1 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/emicklei/go-restful v2.9.5+incompatible // indirect
github.com/emicklei/go-restful/v3 v3.8.0 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d // indirect
github.com/fatih/color v1.13.0 // indirect
Expand All @@ -64,7 +65,7 @@ require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.5.6 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/gorilla/mux v1.8.0 // indirect
Expand Down Expand Up @@ -126,29 +127,29 @@ require (
go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 // indirect
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220107163113-42d7afdf6368 // indirect
google.golang.org/grpc v1.43.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.66.2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.24.2 // indirect
k8s.io/component-base v0.24.2 // indirect
k8s.io/klog/v2 v2.60.1 // indirect
k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42 // indirect
k8s.io/klog/v2 v2.70.1 // indirect
k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 // indirect
k8s.io/kubectl v0.24.2 // indirect
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9 // indirect
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect
oras.land/oras-go v1.2.0 // indirect
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/kustomize/api v0.11.4 // indirect
sigs.k8s.io/kustomize/kyaml v0.13.6 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
)
Loading

0 comments on commit dd070f8

Please sign in to comment.