From c5da4f09ecb30d740653ef7c8631f4ba16869f92 Mon Sep 17 00:00:00 2001 From: runkecheng <1131648942@qq.com> Date: Wed, 4 Aug 2021 10:29:22 +0800 Subject: [PATCH] syncer: add rolling update feature code annotation #165 --- cluster/syncer/statefulset.go | 70 +++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 28 deletions(-) diff --git a/cluster/syncer/statefulset.go b/cluster/syncer/statefulset.go index 4f0b8df29..c4c726f1b 100644 --- a/cluster/syncer/statefulset.go +++ b/cluster/syncer/statefulset.go @@ -57,10 +57,10 @@ type StatefulSetSyncer struct { sfs *appsv1.StatefulSet - // configmap resourceVersion. + // Configmap resourceVersion. cmRev string - // secret resourceVersion. + // Secret resourceVersion. sctRev string } @@ -97,6 +97,7 @@ func (s *StatefulSetSyncer) ObjectOwner() runtime.Object { return s.Unwrap() } func (s *StatefulSetSyncer) GetOwner() runtime.Object { return s.Unwrap() } // Sync persists data into the external store. +// It's called by cluster controller, when return error, retry Reconcile(),when return nil, exit this cycle. // See https://github.com/presslabs/controller-util/blob/master/syncer/object.go#L68 func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) { var err error @@ -105,14 +106,19 @@ func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) result.Operation, err = s.createOrUpdate(ctx) + // Get namespace and name. key := client.ObjectKeyFromObject(s.sfs) + // Get groupVersionKind. gvk, gvkErr := apiutil.GVKForObject(s.sfs, s.cli.Scheme()) if gvkErr != nil { kind = fmt.Sprintf("%T", s.sfs) } else { kind = gvk.String() } - + // Print log. + // Info: owner is deleted or ignored error. + // Waring: other errors. + // Normal: no error. switch { case errors.Is(err, syncer.ErrOwnerDeleted): log.Info(string(result.Operation), "key", key, "kind", kind, "error", err) @@ -133,9 +139,10 @@ func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) } // createOrUpdate creates or updates the statefulset in the Kubernetes cluster. -// see https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.9.2/pkg/controller/controllerutil?utm_source=gopls#CreateOrUpdate +// See https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.9.2/pkg/controller/controllerutil?utm_source=gopls#CreateOrUpdate func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.OperationResult, error) { var err error + // Check if sfs exist. if err = s.cli.Get(ctx, client.ObjectKeyFromObject(s.sfs), s.sfs); err != nil { if !k8serrors.IsNotFound(err) { return controllerutil.OperationResultNone, err @@ -151,32 +158,33 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil. return controllerutil.OperationResultCreated, nil } } - + // Deep copy the old statefulset from StatefulSetSyncer. existing := s.sfs.DeepCopyObject() + // Sync data from cluster.spec to statefulset. if err = s.mutate(); err != nil { return controllerutil.OperationResultNone, err } - + // Check if statefulset changed. if equality.Semantic.DeepEqual(existing, s.sfs) { return controllerutil.OperationResultNone, nil } - + // If changed, update statefulset. if err := s.cli.Update(ctx, s.sfs); err != nil { return controllerutil.OperationResultNone, err } - + // Update every pods of statefulset. if err := s.updatePod(ctx); err != nil { return controllerutil.OperationResultNone, err } - + // Update pvc. if err := s.updatePVC(ctx); err != nil { return controllerutil.OperationResultNone, err } - return controllerutil.OperationResultUpdated, nil } -// updatePod update the pods. +// updatePod update the pods, update follower nodes first. +// This can reduce the number of master-slave switching during the update process. func (s *StatefulSetSyncer) updatePod(ctx context.Context) error { if s.sfs.Status.UpdatedReplicas >= s.sfs.Status.Replicas { return nil @@ -188,7 +196,7 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error { log.Info("can't start/continue 'update': waiting for all replicas are ready") return nil } - + // Get all pods. pods := corev1.PodList{} if err := s.cli.List(ctx, &pods, @@ -199,31 +207,33 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error { ); err != nil { return err } - var leaderPod corev1.Pod var followerPods []corev1.Pod for _, pod := range pods.Items { + // Check if the pod is healthy. if pod.ObjectMeta.Labels["healthy"] != "yes" { return fmt.Errorf("can't start/continue 'update': pod[%s] is unhealthy", pod.Name) } - + // Skip if pod is leader. if pod.ObjectMeta.Labels["role"] == "leader" && leaderPod.Name == "" { leaderPod = pod continue } - followerPods = append(followerPods, pod) + // If pod is not leader, direct update. if err := s.applyNWait(ctx, &pod); err != nil { return err } } - + // All followers have been updated now, then update leader. if leaderPod.Name != "" { + // When replicas is two (one leader and one follower). if len(followerPods) == 1 { if err := s.preUpdate(ctx, leaderPod.Name, followerPods[0].Name); err != nil { return err } } + // Replicas greater than two. if err := s.applyNWait(ctx, &leaderPod); err != nil { return err } @@ -232,7 +242,10 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error { return nil } -// preUpdate run before update the leader pod when replica is 2. +// preUpdate run before update the leader pod when replicas is 2. +// For a two-node cluster, we need to adjust the parameter rpl_semi_sync_master_timeout +// before upgrading from the node. +// Otherwise, the leader node will hang due to semi-sync. func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower string) error { if s.sfs.Status.Replicas != 2 { return nil @@ -243,7 +256,7 @@ func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower stri port := utils.MysqlPort nameSpace := s.Namespace - // get secrets. + // Get secrets. secret := &corev1.Secret{} if err := s.cli.Get(context.TODO(), types.NamespacedName{ @@ -276,13 +289,13 @@ func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower stri defer leaderRunner.Close() if err = retry(time.Second*2, time.Duration(waitLimit)*time.Second, func() (bool, error) { - // set leader read only. + // Set leader read only. if err = leaderRunner.RunQuery("SET GLOBAL super_read_only=on;"); err != nil { log.Error(err, "failed to set leader read only", "node", leader) return false, err } - // make sure the master has sent all binlog to slave. + // Make sure the master has sent all binlog to slave. success, err := leaderRunner.CheckProcesslist() if err != nil { return false, err @@ -297,7 +310,7 @@ func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower stri followerHost := fmt.Sprintf("%s.%s.%s", follower, svcName, nameSpace) if err = retry(time.Second*5, time.Second*60, func() (bool, error) { - // check whether is leader. + // Check whether is leader. status, err := checkRole(followerHost, rootPasswd) if err != nil { log.Error(err, "failed to check role", "pod", follower) @@ -356,15 +369,15 @@ func (s *StatefulSetSyncer) mutate() error { } } - // set owner reference only if owner resource is not being deleted, otherwise the owner + // Set owner reference only if owner resource is not being deleted, otherwise the owner // reference will be reset in case of deleting with cascade=false. if s.Unwrap().GetDeletionTimestamp().IsZero() { if err := controllerutil.SetControllerReference(s.Unwrap(), s.sfs, s.cli.Scheme()); err != nil { return err } } else if ctime := s.Unwrap().GetCreationTimestamp(); ctime.IsZero() { - // the owner is deleted, don't recreate the resource if does not exist, because gc - // will not delete it again because has no owner reference set + // The owner is deleted, don't recreate the resource if does not exist, because gc + // will not delete it again because has no owner reference set. return fmt.Errorf("owner is deleted") } return nil @@ -438,6 +451,7 @@ func (s *StatefulSetSyncer) updatePVC(ctx context.Context) error { } func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) error { + // Check version, if not latest, delete node. if pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision { log.Info("pod is already updated", "pod name", pod.Name) } else { @@ -451,7 +465,7 @@ func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) err } } - // wait the pod restart and healthy. + // Wait the pod restart and healthy. return retry(time.Second*10, time.Duration(waitLimit)*time.Second, func() (bool, error) { err := s.cli.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod) if err != nil && !k8serrors.IsNotFound(err) { @@ -480,9 +494,9 @@ func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) err }) } -// retry runs func "f" every "in" time until "limit" is reached -// it also doesn't have an extra tail wait after the limit is reached -// and f func runs first time instantly +// Retry runs func "f" every "in" time until "limit" is reached. +// It also doesn't have an extra tail wait after the limit is reached +// and f func runs first time instantly. func retry(in, limit time.Duration, f func() (bool, error)) error { fdone, err := f() if err != nil {