From a557b9b95a5d0964d26fd0821f95f578bdb962ff Mon Sep 17 00:00:00 2001 From: naoki-take Date: Tue, 25 Jun 2024 08:54:43 +0000 Subject: [PATCH] Evict 'OnDelete' DaemonSet pods Signed-off-by: naoki-take --- op/reboot.go | 129 ++++++++++++++++++++++++++++++++++++++++ op/reboot_decide.go | 67 ++++++++++++++++++--- server/strategy.go | 2 + server/strategy_test.go | 1 + 4 files changed, 190 insertions(+), 9 deletions(-) diff --git a/op/reboot.go b/op/reboot.go index 5cc207e4..a69336ba 100644 --- a/op/reboot.go +++ b/op/reboot.go @@ -201,6 +201,135 @@ func (c rebootDrainStartCommand) Run(ctx context.Context, inf cke.Infrastructure return nil } +type rebootEvictDaemonSetPodOp struct { + finished bool + + entries []*cke.RebootQueueEntry + config *cke.Reboot + apiserver *cke.Node + + mu sync.Mutex + failedNodes []string +} + +func RebootEvictDaemonSetPodOp(apiserver *cke.Node, entries []*cke.RebootQueueEntry, config *cke.Reboot) cke.InfoOperator { + return &rebootEvictDaemonSetPodOp{ + entries: entries, + config: config, + apiserver: apiserver, + } +} + +type rebootEvictDaemonSetPodCommand struct { + entries []*cke.RebootQueueEntry + protectedNamespaces *metav1.LabelSelector + apiserver *cke.Node + evictAttempts int + evictInterval time.Duration + + notifyFailedNode func(string) +} + +func (o *rebootEvictDaemonSetPodOp) Name() string { + return "reboot-evict-daemonset-pod" +} + +func (o *rebootEvictDaemonSetPodOp) notifyFailedNode(node string) { + o.mu.Lock() + o.failedNodes = append(o.failedNodes, node) + o.mu.Unlock() +} + +func (o *rebootEvictDaemonSetPodOp) Targets() []string { + ipAddresses := make([]string, len(o.entries)) + for i, entry := range o.entries { + ipAddresses[i] = entry.Node + } + return ipAddresses +} + +func (o *rebootEvictDaemonSetPodOp) Info() string { + if len(o.failedNodes) == 0 { + return "" + } + return fmt.Sprintf("failed to evict DaemonSet pods on some nodes: %v", o.failedNodes) +} + +func (o *rebootEvictDaemonSetPodOp) NextCommand() cke.Commander { + if o.finished { + return nil + } + o.finished = true + + attempts := 1 + if o.config.EvictRetries != nil { + attempts = *o.config.EvictRetries + 1 + } + interval := 0 * time.Second + if o.config.EvictInterval != nil { + interval = time.Second * time.Duration(*o.config.EvictInterval) + } + + return rebootEvictDaemonSetPodCommand{ + entries: o.entries, + protectedNamespaces: o.config.ProtectedNamespaces, + apiserver: o.apiserver, + notifyFailedNode: o.notifyFailedNode, + evictAttempts: attempts, + evictInterval: interval, + } +} + +func (c rebootEvictDaemonSetPodCommand) Command() cke.Command { + ipAddresses := make([]string, len(c.entries)) + for i, entry := range c.entries { + ipAddresses[i] = entry.Node + } + return cke.Command{ + Name: "rebootEvictDaemonSetPodCommand", + Target: strings.Join(ipAddresses, ","), + } +} + +func (c rebootEvictDaemonSetPodCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error { + cs, err := inf.K8sClient(ctx, c.apiserver) + if err != nil { + return err + } + + protected, err := listProtectedNamespaces(ctx, cs, c.protectedNamespaces) + if err != nil { + return err + } + + // evict DaemonSet pod on each node + // cordon is unnecessary for DaemonSet pods, so dry-run eviction is also skipped. + for _, entry := range c.entries { + // keep entry.Status as RebootStatusDraining and don't update it here. + + log.Info("start eviction of DaemonSet pod", map[string]interface{}{ + "name": entry.Node, + }) + err := evictOrDeleteOnDeleteDaemonSetPod(ctx, cs, entry.Node, protected, c.evictAttempts, c.evictInterval) + if err != nil { + log.Warn("eviction of DaemonSet pod failed", map[string]interface{}{ + "name": entry.Node, + log.FnError: err, + }) + c.notifyFailedNode(entry.Node) + err = drainBackOff(ctx, inf, entry, err) + if err != nil { + return err + } + log.Info("eviction of DaemonSet pod succeeded", map[string]interface{}{ + "name": entry.Node, + }) + } + } + + return nil +} + // type rebootRebootOp struct { diff --git a/op/reboot_decide.go b/op/reboot_decide.go index c630470d..434a010b 100644 --- a/op/reboot_decide.go +++ b/op/reboot_decide.go @@ -9,6 +9,7 @@ import ( "github.com/cybozu-go/cke" "github.com/cybozu-go/log" "github.com/cybozu-go/well" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -61,25 +62,75 @@ func enumeratePods(ctx context.Context, cs *kubernetes.Clientset, node string, return nil } +// enumerateOnDeleteDaemonSetPods enumerates Pods on a specified node that are owned by "updateStrategy:OnDelete" DaemonSets. +// It calls podHandler for each target pods. +// If the handler returns error, this function returns the error immediately. +// Note: This function does not distinguish API errors and state evaluation returned from subfunction. +func enumerateOnDeleteDaemonSetPods(ctx context.Context, cs *kubernetes.Clientset, node string, + podHandler func(pod *corev1.Pod) error) error { + + daemonSets, err := cs.AppsV1().DaemonSets(corev1.NamespaceAll).List(ctx, metav1.ListOptions{}) + if err != nil { + return err + } + + for _, ds := range daemonSets.Items { + if ds.Spec.UpdateStrategy.Type == appsv1.OnDeleteDaemonSetStrategyType { + labelSelector := metav1.FormatLabelSelector(ds.Spec.Selector) + pods, err := cs.CoreV1().Pods(ds.Namespace).List(ctx, metav1.ListOptions{ + LabelSelector: labelSelector, + }) + if err != nil { + return err + } + + for _, pod := range pods.Items { + if pod.Spec.NodeName == node { + err = podHandler(&pod) + if err != nil { + return err + } + } + } + } + } + + return nil +} + // dryRunEvictOrDeleteNodePod checks eviction or deletion of Pods on the specified Node can proceed. // It returns an error if a running Pod exists or an eviction of the Pod in protected namespace failed. func dryRunEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool) error { - return doEvictOrDeleteNodePod(ctx, cs, node, protected, 0, 0, true) + return enumeratePods(ctx, cs, node, + doEvictOrDeleteNodePod(ctx, cs, node, protected, 0, time.Duration(0), true), + func(pod *corev1.Pod) error { + return fmt.Errorf("job-managed pod exists: %s/%s, phase=%s", pod.Namespace, pod.Name, pod.Status.Phase) + }, + ) } // evictOrDeleteNodePod evicts or delete Pods on the specified Node. // If a running Job Pod exists, this function returns an error. func evictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool, attempts int, interval time.Duration) error { - return doEvictOrDeleteNodePod(ctx, cs, node, protected, attempts, interval, false) + return enumeratePods(ctx, cs, node, + doEvictOrDeleteNodePod(ctx, cs, node, protected, attempts, interval, false), + func(pod *corev1.Pod) error { + return fmt.Errorf("job-managed pod exists: %s/%s, phase=%s", pod.Namespace, pod.Name, pod.Status.Phase) + }, + ) +} + +// evictOrDeleteOnDeleteDaemonSetPod evicts or delete Pods on the specified Node that are owned by "updateStrategy:OnDelete" DaemonSets. +func evictOrDeleteOnDeleteDaemonSetPod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool, attempts int, interval time.Duration) error { + return enumerateOnDeleteDaemonSetPods(ctx, cs, node, doEvictOrDeleteNodePod(ctx, cs, node, protected, attempts, interval, false)) } -// doEvictOrDeleteNodePod evicts or delete Pods on the specified Node. +// doEvictOrDeleteNodePod returns a pod handler that evicts or delete Pods on the specified Node. // It first tries eviction. // If the eviction failed and the Pod's namespace is not protected, it deletes the Pod. // If the eviction failed and the Pod's namespace is protected, it retries after `interval` interval at most `attempts` times. -// If a running Job Pod exists, this function returns an error. // If `dry` is true, it performs dry run and `attempts` and `interval` are ignored. -func doEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool, attempts int, interval time.Duration, dry bool) error { +func doEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool, attempts int, interval time.Duration, dry bool) func(pod *corev1.Pod) error { var deleteOptions *metav1.DeleteOptions if dry { deleteOptions = &metav1.DeleteOptions{ @@ -87,7 +138,7 @@ func doEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node } } - return enumeratePods(ctx, cs, node, func(pod *corev1.Pod) error { + return func(pod *corev1.Pod) error { if dry && !protected[pod.Namespace] { // in case of dry-run for Pods in non-protected namespace, // return immediately because its "eviction or deletion" never fails @@ -158,9 +209,7 @@ func doEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node return fmt.Errorf("failed to evict pod %s/%s due to PDB: %w", pod.Namespace, pod.Name, err) } return nil - }, func(pod *corev1.Pod) error { - return fmt.Errorf("job-managed pod exists: %s/%s, phase=%s", pod.Namespace, pod.Name, pod.Status.Phase) - }) + } } // checkPodDeletion checks whether the evicted or deleted Pods are eventually deleted. diff --git a/server/strategy.go b/server/strategy.go index c24fbd05..ca3e0a2e 100644 --- a/server/strategy.go +++ b/server/strategy.go @@ -898,6 +898,8 @@ func rebootOps(c *cke.Cluster, constraints *cke.Constraints, rebootArgs DecideOp } if len(rebootArgs.DrainCompleted) > 0 { + // After eviction of normal pods, evict "OnDelete" daemonset pods. + ops = append(ops, op.RebootEvictDaemonSetPodOp(nf.HealthyAPIServer(), rebootArgs.DrainCompleted, &c.Reboot)) ops = append(ops, op.RebootRebootOp(nf.HealthyAPIServer(), rebootArgs.DrainCompleted, &c.Reboot)) } if len(rebootArgs.NewlyDrained) > 0 { diff --git a/server/strategy_test.go b/server/strategy_test.go index c09a684a..69bd6b5c 100644 --- a/server/strategy_test.go +++ b/server/strategy_test.go @@ -2637,6 +2637,7 @@ func TestDecideOps(t *testing.T) { }, }), ExpectedOps: []opData{ + {"reboot-evict-daemonset-pod", 1}, {"reboot-reboot", 1}, }, },