Skip to content

Commit

Permalink
Merge pull request #746 from cybozu-go/delete-daemonset-ondelete-pods
Browse files Browse the repository at this point in the history
Delete 'OnDelete' DaemonSet pods
  • Loading branch information
tkna authored Jun 27, 2024
2 parents 8f02dde + 6c9df33 commit 72df03b
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 0 deletions.
103 changes: 103 additions & 0 deletions op/reboot.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,109 @@ func (c rebootDrainStartCommand) Run(ctx context.Context, inf cke.Infrastructure

//

type rebootDeleteDaemonSetPodOp struct {
finished bool

entries []*cke.RebootQueueEntry
config *cke.Reboot
apiserver *cke.Node

mu sync.Mutex
failedNodes []string
}

func RebootDeleteDaemonSetPodOp(apiserver *cke.Node, entries []*cke.RebootQueueEntry, config *cke.Reboot) cke.InfoOperator {
return &rebootDeleteDaemonSetPodOp{
entries: entries,
config: config,
apiserver: apiserver,
}
}

type rebootDeleteDaemonSetPodCommand struct {
entries []*cke.RebootQueueEntry
apiserver *cke.Node

notifyFailedNode func(string)
}

func (o *rebootDeleteDaemonSetPodOp) Name() string {
return "reboot-delete-daemonset-pod"
}

func (o *rebootDeleteDaemonSetPodOp) notifyFailedNode(node string) {
o.mu.Lock()
o.failedNodes = append(o.failedNodes, node)
o.mu.Unlock()
}

func (o *rebootDeleteDaemonSetPodOp) Targets() []string {
ipAddresses := make([]string, len(o.entries))
for i, entry := range o.entries {
ipAddresses[i] = entry.Node
}
return ipAddresses
}

func (o *rebootDeleteDaemonSetPodOp) Info() string {
if len(o.failedNodes) == 0 {
return ""
}
return fmt.Sprintf("failed to delete DaemonSet pods on some nodes: %v", o.failedNodes)
}

func (o *rebootDeleteDaemonSetPodOp) NextCommand() cke.Commander {
if o.finished {
return nil
}
o.finished = true

return rebootDeleteDaemonSetPodCommand{
entries: o.entries,
apiserver: o.apiserver,
notifyFailedNode: o.notifyFailedNode,
}
}

func (c rebootDeleteDaemonSetPodCommand) Command() cke.Command {
ipAddresses := make([]string, len(c.entries))
for i, entry := range c.entries {
ipAddresses[i] = entry.Node
}
return cke.Command{
Name: "rebootDeleteDaemonSetPodCommand",
Target: strings.Join(ipAddresses, ","),
}
}

func (c rebootDeleteDaemonSetPodCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error {
cs, err := inf.K8sClient(ctx, c.apiserver)
if err != nil {
return err
}

// delete DaemonSet pod on each node
for _, entry := range c.entries {
// keep entry.Status as RebootStatusDraining and don't update it here.

log.Info("start deletion of DaemonSet pod", map[string]interface{}{
"name": entry.Node,
})
err := deleteOnDeleteDaemonSetPod(ctx, cs, entry.Node)
if err != nil {
log.Warn("deletion of DaemonSet pod failed", map[string]interface{}{
"name": entry.Node,
log.FnError: err,
})
c.notifyFailedNode(entry.Node)
}
}

return nil
}

//

type rebootRebootOp struct {
finished bool

Expand Down
57 changes: 57 additions & 0 deletions op/reboot_decide.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/cybozu-go/cke"
"github.com/cybozu-go/log"
"github.com/cybozu-go/well"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -61,6 +62,42 @@ func enumeratePods(ctx context.Context, cs *kubernetes.Clientset, node string,
return nil
}

// enumerateOnDeleteDaemonSetPods enumerates Pods on a specified node that are owned by "updateStrategy:OnDelete" DaemonSets.
// It calls podHandler for each target pods.
// If the handler returns error, this function returns the error immediately.
// Note: This function does not distinguish API errors and state evaluation returned from subfunction.
func enumerateOnDeleteDaemonSetPods(ctx context.Context, cs *kubernetes.Clientset, node string,
podHandler func(pod *corev1.Pod) error) error {

daemonSets, err := cs.AppsV1().DaemonSets(corev1.NamespaceAll).List(ctx, metav1.ListOptions{})
if err != nil {
return err
}

for _, ds := range daemonSets.Items {
if ds.Spec.UpdateStrategy.Type == appsv1.OnDeleteDaemonSetStrategyType {
labelSelector := metav1.FormatLabelSelector(ds.Spec.Selector)
pods, err := cs.CoreV1().Pods(ds.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
return err
}

for _, pod := range pods.Items {
if pod.Spec.NodeName == node {
err = podHandler(&pod)
if err != nil {
return err
}
}
}
}
}

return nil
}

// dryRunEvictOrDeleteNodePod checks eviction or deletion of Pods on the specified Node can proceed.
// It returns an error if a running Pod exists or an eviction of the Pod in protected namespace failed.
func dryRunEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool) error {
Expand All @@ -73,6 +110,11 @@ func evictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node st
return doEvictOrDeleteNodePod(ctx, cs, node, protected, attempts, interval, false)
}

// deleteOnDeleteDaemonSetPod evicts or delete Pods on the specified Node that are owned by "updateStrategy:OnDelete" DaemonSets.
func deleteOnDeleteDaemonSetPod(ctx context.Context, cs *kubernetes.Clientset, node string) error {
return doDeleteOnDeleteDaemonSetPod(ctx, cs, node)
}

// doEvictOrDeleteNodePod evicts or delete Pods on the specified Node.
// It first tries eviction.
// If the eviction failed and the Pod's namespace is not protected, it deletes the Pod.
Expand Down Expand Up @@ -163,6 +205,21 @@ func doEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node
})
}

// doDeleteOnDeleteDaemonSetPod deletes 'OnDelete' DaemonSet pods on the specified Node.
func doDeleteOnDeleteDaemonSetPod(ctx context.Context, cs *kubernetes.Clientset, node string) error {
return enumerateOnDeleteDaemonSetPods(ctx, cs, node, func(pod *corev1.Pod) error {
err := cs.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return err
}
log.Info("deleted daemonset pod", map[string]interface{}{
"namespace": pod.Namespace,
"name": pod.Name,
})
return nil
})
}

// checkPodDeletion checks whether the evicted or deleted Pods are eventually deleted.
// If those pods still exist, this function returns an error.
func checkPodDeletion(ctx context.Context, cs *kubernetes.Clientset, node string) error {
Expand Down
2 changes: 2 additions & 0 deletions server/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,8 @@ func rebootOps(c *cke.Cluster, constraints *cke.Constraints, rebootArgs DecideOp
}

if len(rebootArgs.DrainCompleted) > 0 {
// After eviction of normal pods, evict "OnDelete" daemonset pods.
ops = append(ops, op.RebootDeleteDaemonSetPodOp(nf.HealthyAPIServer(), rebootArgs.DrainCompleted, &c.Reboot))
ops = append(ops, op.RebootRebootOp(nf.HealthyAPIServer(), rebootArgs.DrainCompleted, &c.Reboot))
}
if len(rebootArgs.NewlyDrained) > 0 {
Expand Down
1 change: 1 addition & 0 deletions server/strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2637,6 +2637,7 @@ func TestDecideOps(t *testing.T) {
},
}),
ExpectedOps: []opData{
{"reboot-delete-daemonset-pod", 1},
{"reboot-reboot", 1},
},
},
Expand Down

0 comments on commit 72df03b

Please sign in to comment.