Delete 'OnDelete' DaemonSet pods #746

Merged (2 commits) on Jun 27, 2024

Changes from 1 commit
131 changes: 131 additions & 0 deletions op/reboot.go
@@ -203,6 +203,137 @@ func (c rebootDrainStartCommand) Run(ctx context.Context, inf cke.Infrastructure

//

type rebootEvictDaemonSetPodOp struct {
finished bool

entries []*cke.RebootQueueEntry
config *cke.Reboot
apiserver *cke.Node

mu sync.Mutex
failedNodes []string
}

func RebootEvictDaemonSetPodOp(apiserver *cke.Node, entries []*cke.RebootQueueEntry, config *cke.Reboot) cke.InfoOperator {
return &rebootEvictDaemonSetPodOp{
entries: entries,
config: config,
apiserver: apiserver,
}
}

type rebootEvictDaemonSetPodCommand struct {
entries []*cke.RebootQueueEntry
protectedNamespaces *metav1.LabelSelector
apiserver *cke.Node
evictAttempts int
evictInterval time.Duration

notifyFailedNode func(string)
}

func (o *rebootEvictDaemonSetPodOp) Name() string {
return "reboot-evict-daemonset-pod"
}

func (o *rebootEvictDaemonSetPodOp) notifyFailedNode(node string) {
o.mu.Lock()
o.failedNodes = append(o.failedNodes, node)
o.mu.Unlock()
}

func (o *rebootEvictDaemonSetPodOp) Targets() []string {
ipAddresses := make([]string, len(o.entries))
for i, entry := range o.entries {
ipAddresses[i] = entry.Node
}
return ipAddresses
}

func (o *rebootEvictDaemonSetPodOp) Info() string {
if len(o.failedNodes) == 0 {
return ""
}
return fmt.Sprintf("failed to evict DaemonSet pods on some nodes: %v", o.failedNodes)
}

func (o *rebootEvictDaemonSetPodOp) NextCommand() cke.Commander {
if o.finished {
return nil
}
o.finished = true

attempts := 1
if o.config.EvictRetries != nil {
attempts = *o.config.EvictRetries + 1
}
interval := 0 * time.Second
if o.config.EvictInterval != nil {
interval = time.Second * time.Duration(*o.config.EvictInterval)
}

return rebootEvictDaemonSetPodCommand{
entries: o.entries,
protectedNamespaces: o.config.ProtectedNamespaces,
apiserver: o.apiserver,
notifyFailedNode: o.notifyFailedNode,
evictAttempts: attempts,
evictInterval: interval,
}
}

func (c rebootEvictDaemonSetPodCommand) Command() cke.Command {
ipAddresses := make([]string, len(c.entries))
for i, entry := range c.entries {
ipAddresses[i] = entry.Node
}
return cke.Command{
Name: "rebootEvictDaemonSetPodCommand",
Target: strings.Join(ipAddresses, ","),
}
}

func (c rebootEvictDaemonSetPodCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error {
cs, err := inf.K8sClient(ctx, c.apiserver)
if err != nil {
return err
}

protected, err := listProtectedNamespaces(ctx, cs, c.protectedNamespaces)
if err != nil {
return err
}

// evict DaemonSet pod on each node
// cordon is unnecessary for DaemonSet pods, so dry-run eviction is also skipped.
for _, entry := range c.entries {
// keep entry.Status as RebootStatusDraining and don't update it here.

log.Info("start eviction of DaemonSet pod", map[string]interface{}{
"name": entry.Node,
})
err := evictOrDeleteOnDeleteDaemonSetPod(ctx, cs, entry.Node, protected, c.evictAttempts, c.evictInterval)
if err != nil {
log.Warn("eviction of DaemonSet pod failed", map[string]interface{}{
"name": entry.Node,
log.FnError: err,
})
c.notifyFailedNode(entry.Node)
		err = drainBackOff(ctx, inf, entry, err)
		if err != nil {
			return err
		}
		continue
	}
	log.Info("eviction of DaemonSet pod succeeded", map[string]interface{}{
		"name": entry.Node,
	})
}

return nil
}

//

type rebootRebootOp struct {
finished bool

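For context, the new op follows cke's operator protocol: the server repeatedly calls NextCommand and runs each returned commander until NextCommand returns nil. Because rebootEvictDaemonSetPodOp sets finished on the first call, it yields exactly one command. Below is a minimal sketch of such a driver loop, assuming only the cke.Operator and cke.Commander interfaces used in this diff; it is illustrative, not cke's actual server loop.

// Illustrative driver only: drain an operator's commands until it reports
// completion. cke's real server wraps this core loop with leader election,
// record bookkeeping, and timeouts.
func runOperator(ctx context.Context, inf cke.Infrastructure, op cke.Operator) error {
	for {
		cmd := op.NextCommand()
		if cmd == nil {
			// rebootEvictDaemonSetPodOp reaches this branch after its single
			// rebootEvictDaemonSetPodCommand has been returned once.
			return nil
		}
		if err := cmd.Run(ctx, inf, ""); err != nil {
			return err
		}
	}
}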
67 changes: 58 additions & 9 deletions op/reboot_decide.go
@@ -9,6 +9,7 @@ import (
"github.com/cybozu-go/cke"
"github.com/cybozu-go/log"
"github.com/cybozu-go/well"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -61,33 +62,83 @@ func enumeratePods(ctx context.Context, cs *kubernetes.Clientset, node string,
return nil
}

// enumerateOnDeleteDaemonSetPods enumerates Pods on a specified node that are owned by "updateStrategy:OnDelete" DaemonSets.
// It calls podHandler for each target Pod.
// If the handler returns an error, this function returns that error immediately.
// Note: this function does not distinguish between Kubernetes API errors and errors returned by podHandler.
func enumerateOnDeleteDaemonSetPods(ctx context.Context, cs *kubernetes.Clientset, node string,
podHandler func(pod *corev1.Pod) error) error {

daemonSets, err := cs.AppsV1().DaemonSets(corev1.NamespaceAll).List(ctx, metav1.ListOptions{})
if err != nil {
return err
}

for _, ds := range daemonSets.Items {
if ds.Spec.UpdateStrategy.Type == appsv1.OnDeleteDaemonSetStrategyType {
labelSelector := metav1.FormatLabelSelector(ds.Spec.Selector)
pods, err := cs.CoreV1().Pods(ds.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
return err
}

for _, pod := range pods.Items {
if pod.Spec.NodeName == node {
err = podHandler(&pod)
if err != nil {
return err
}
}
}
}
}

return nil
}

// dryRunEvictOrDeleteNodePod checks whether eviction or deletion of Pods on the specified Node can proceed.
// It returns an error if a running Job-managed Pod exists or if a dry-run eviction of a Pod in a protected namespace fails.
func dryRunEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool) error {
-	return doEvictOrDeleteNodePod(ctx, cs, node, protected, 0, 0, true)
+	return enumeratePods(ctx, cs, node,
+		doEvictOrDeleteNodePod(ctx, cs, node, protected, 0, time.Duration(0), true),
+		func(pod *corev1.Pod) error {
+			return fmt.Errorf("job-managed pod exists: %s/%s, phase=%s", pod.Namespace, pod.Name, pod.Status.Phase)
+		},
+	)
}

// evictOrDeleteNodePod evicts or deletes Pods on the specified Node.
// If a running Job Pod exists, this function returns an error.
func evictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool, attempts int, interval time.Duration) error {
-	return doEvictOrDeleteNodePod(ctx, cs, node, protected, attempts, interval, false)
+	return enumeratePods(ctx, cs, node,
+		doEvictOrDeleteNodePod(ctx, cs, node, protected, attempts, interval, false),
+		func(pod *corev1.Pod) error {
+			return fmt.Errorf("job-managed pod exists: %s/%s, phase=%s", pod.Namespace, pod.Name, pod.Status.Phase)
+		},
+	)
}

// evictOrDeleteOnDeleteDaemonSetPod evicts or deletes Pods on the specified Node that are owned by "updateStrategy:OnDelete" DaemonSets.
func evictOrDeleteOnDeleteDaemonSetPod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool, attempts int, interval time.Duration) error {
return enumerateOnDeleteDaemonSetPods(ctx, cs, node, doEvictOrDeleteNodePod(ctx, cs, node, protected, attempts, interval, false))
}

-// doEvictOrDeleteNodePod evicts or deletes Pods on the specified Node.
+// doEvictOrDeleteNodePod returns a pod handler that evicts or deletes Pods on the specified Node.
// It first tries eviction.
// If the eviction failed and the Pod's namespace is not protected, it deletes the Pod.
// If the eviction failed and the Pod's namespace is protected, it retries after `interval` at most `attempts` times.
// If a running Job Pod exists, this function returns an error.
// If `dry` is true, it performs a dry run and `attempts` and `interval` are ignored.
-func doEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool, attempts int, interval time.Duration, dry bool) error {
+func doEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool, attempts int, interval time.Duration, dry bool) func(pod *corev1.Pod) error {
var deleteOptions *metav1.DeleteOptions
if dry {
deleteOptions = &metav1.DeleteOptions{
DryRun: []string{"All"},
}
}

-	return enumeratePods(ctx, cs, node, func(pod *corev1.Pod) error {
+	return func(pod *corev1.Pod) error {
if dry && !protected[pod.Namespace] {
// in case of dry-run for Pods in non-protected namespace,
// return immediately because their "eviction or deletion" never fails
@@ -158,9 +209,7 @@ func doEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node
return fmt.Errorf("failed to evict pod %s/%s due to PDB: %w", pod.Namespace, pod.Name, err)
}
return nil
-	}, func(pod *corev1.Pod) error {
-		return fmt.Errorf("job-managed pod exists: %s/%s, phase=%s", pod.Namespace, pod.Name, pod.Status.Phase)
-	})
+	}
}

// checkPodDeletion checks whether the evicted or deleted Pods are eventually deleted.
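As a usage note, enumerateOnDeleteDaemonSetPods composes with any per-Pod handler, not just the evicting one returned by doEvictOrDeleteNodePod. A minimal sketch, not part of this PR, with a hypothetical handler that only counts the matching Pods on a node:

// countOnDeleteDaemonSetPods is a hypothetical helper demonstrating the
// enumerator with a non-destructive handler.
func countOnDeleteDaemonSetPods(ctx context.Context, cs *kubernetes.Clientset, node string) (int, error) {
	count := 0
	err := enumerateOnDeleteDaemonSetPods(ctx, cs, node, func(pod *corev1.Pod) error {
		count++ // the real op evicts or deletes the Pod here
		return nil
	})
	return count, err
}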
2 changes: 2 additions & 0 deletions server/strategy.go
@@ -898,6 +898,8 @@ func rebootOps(c *cke.Cluster, constraints *cke.Constraints, rebootArgs DecideOp
}

if len(rebootArgs.DrainCompleted) > 0 {
// After eviction of normal pods, evict "OnDelete" DaemonSet pods.
ops = append(ops, op.RebootEvictDaemonSetPodOp(nf.HealthyAPIServer(), rebootArgs.DrainCompleted, &c.Reboot))
ops = append(ops, op.RebootRebootOp(nf.HealthyAPIServer(), rebootArgs.DrainCompleted, &c.Reboot))
}
if len(rebootArgs.NewlyDrained) > 0 {
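The op receives &c.Reboot, so the eviction retry behavior computed in NextCommand comes straight from the cluster's reboot configuration. Below is a sketch with hypothetical values; the field names are the ones read in the diff above.

// With this configuration, rebootEvictDaemonSetPodOp attempts each eviction
// *EvictRetries+1 = 4 times, waiting EvictInterval = 10 seconds between tries.
retries := 3
interval := 10
reboot := cke.Reboot{
	EvictRetries:  &retries,
	EvictInterval: &interval,
}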
1 change: 1 addition & 0 deletions server/strategy_test.go
@@ -2637,6 +2637,7 @@ func TestDecideOps(t *testing.T) {
},
}),
ExpectedOps: []opData{
{"reboot-evict-daemonset-pod", 1},
{"reboot-reboot", 1},
},
},