
syncer: add rolling update feature code annotation radondb#165
runkecheng committed Aug 4, 2021
1 parent 7521423 commit 990f346
Showing 1 changed file with 41 additions and 28 deletions.
69 changes: 41 additions & 28 deletions cluster/syncer/statefulset.go
@@ -57,10 +57,10 @@ type StatefulSetSyncer struct {

sfs *appsv1.StatefulSet

-// configmap resourceVersion.
+// Configmap resourceVersion.
cmRev string

-// secret resourceVersion.
+// Secret resourceVersion.
sctRev string
}

@@ -97,6 +97,7 @@ func (s *StatefulSetSyncer) ObjectOwner() runtime.Object { return s.Unwrap() }
func (s *StatefulSetSyncer) GetOwner() runtime.Object { return s.Unwrap() }

// Sync persists data into the external store.
// It is called by the cluster controller: when it returns an error, Reconcile() is retried; when it returns nil, the current sync cycle exits.
// See https://github.com/presslabs/controller-util/blob/master/syncer/object.go#L68
func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) {
var err error
@@ -105,14 +106,19 @@ func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error)

result.Operation, err = s.createOrUpdate(ctx)

// Get namespace and name.
key := client.ObjectKeyFromObject(s.sfs)
// Get groupVersionKind.
gvk, gvkErr := apiutil.GVKForObject(s.sfs, s.cli.Scheme())
if gvkErr != nil {
kind = fmt.Sprintf("%T", s.sfs)
} else {
kind = gvk.String()
}
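// key and kind identify the object in the log messages below.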

// Log the result:
// Info: the owner was deleted or the error is ignorable.
// Warning: any other error.
// Normal: no error.
switch {
case errors.Is(err, syncer.ErrOwnerDeleted):
log.Info(string(result.Operation), "key", key, "kind", kind, "error", err)
@@ -133,9 +139,10 @@ func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error)
}

// createOrUpdate creates or updates the statefulset in the Kubernetes cluster.
-// see https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/controller/controllerutil?utm_source=gopls#CreateOrUpdate
+// See https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/controller/controllerutil?utm_source=gopls#CreateOrUpdate
func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.OperationResult, error) {
var err error
// Check if the statefulset exists.
if err = s.cli.Get(ctx, client.ObjectKeyFromObject(s.sfs), s.sfs); err != nil {
if !k8serrors.IsNotFound(err) {
return controllerutil.OperationResultNone, err
@@ -151,32 +158,33 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.
return controllerutil.OperationResultCreated, nil
}
}

// Deep copy the old statefulset from StatefulSetSyncer.
existing := s.sfs.DeepCopyObject()
// Sync data from cluster.spec to statefulset.
if err = s.mutate(); err != nil {
return controllerutil.OperationResultNone, err
}

// Check if statefulset changed.
if equality.Semantic.DeepEqual(existing, s.sfs) {
return controllerutil.OperationResultNone, nil
}

// If changed, update statefulset.
if err := s.cli.Update(ctx, s.sfs); err != nil {
return controllerutil.OperationResultNone, err
}

// Update every pod of the statefulset.
if err := s.updatePod(ctx); err != nil {
return controllerutil.OperationResultNone, err
}

// Update the PVCs.
if err := s.updatePVC(ctx); err != nil {
return controllerutil.OperationResultNone, err
}

return controllerutil.OperationResultUpdated, nil
}

-// updatePod update the pods.
+// updatePod updates the pods, updating the follower nodes first.
+// This reduces the number of leader-follower switches during the update.
func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
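// Nothing to do if every replica is already at the latest revision.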
if s.sfs.Status.UpdatedReplicas >= s.sfs.Status.Replicas {
return nil
@@ -188,7 +196,7 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
log.Info("can't start/continue 'update': waiting for all replicas to be ready")
return nil
}

// Get all pods.
pods := corev1.PodList{}
if err := s.cli.List(ctx,
&pods,
@@ -199,31 +207,33 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
); err != nil {
return err
}

var leaderPod corev1.Pod
var followerPods []corev1.Pod
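// Walk the pods: update each follower right away and keep the leader for last.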
for _, pod := range pods.Items {
// Check if the pod is healthy.
if pod.ObjectMeta.Labels["healthy"] != "yes" {
return fmt.Errorf("can't start/continue 'update': pod[%s] is unhealthy", pod.Name)
}

// Skip if pod is leader.
if pod.ObjectMeta.Labels["role"] == "leader" && leaderPod.Name == "" {
leaderPod = pod
continue
}

followerPods = append(followerPods, pod)
// If pod is not leader, direct update.
if err := s.applyNWait(ctx, &pod); err != nil {
return err
}
}

// All followers have been updated; now update the leader.
if leaderPod.Name != "" {
// When replicas is two (one leader and one follower).
if len(followerPods) == 1 {
if err := s.preUpdate(ctx, leaderPod.Name, followerPods[0].Name); err != nil {
return err
}
}
// Update the leader.
if err := s.applyNWait(ctx, &leaderPod); err != nil {
return err
}
@@ -232,7 +242,10 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
return nil
}

-// preUpdate run before update the leader pod when replica is 2.
+// preUpdate runs before the leader pod is updated when replicas is 2.
+// For a two-node cluster, the parameter rpl_semi_sync_master_timeout needs to be
+// adjusted before the upgrade; otherwise the leader node will hang on semi-sync
+// replication while its only follower is down.
func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower string) error {
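// preUpdate is a no-op unless the cluster has exactly two replicas.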
if s.sfs.Status.Replicas != 2 {
return nil
@@ -243,7 +256,7 @@ func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower stri
port := utils.MysqlPort
nameSpace := s.Namespace

-// get secrets.
+// Get the secret.
secret := &corev1.Secret{}
if err := s.cli.Get(context.TODO(),
types.NamespacedName{
@@ -276,13 +289,13 @@ func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower stri
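// Close the leader's SQL connection when preUpdate returns.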
defer leaderRunner.Close()

if err = retry(time.Second*2, time.Duration(waitLimit)*time.Second, func() (bool, error) {
-// set leader read only.
+// Set the leader to read-only.
if err = leaderRunner.RunQuery("SET GLOBAL super_read_only=on;"); err != nil {
log.Error(err, "failed to set leader read only", "node", leader)
return false, err
}

-// make sure the master has sent all binlog to slave.
+// Make sure the master has sent all binlogs to the slave.
success, err := leaderRunner.CheckProcesslist()
if err != nil {
return false, err
@@ -297,7 +310,7 @@ func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower stri

followerHost := fmt.Sprintf("%s.%s.%s", follower, svcName, nameSpace)
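// followerHost is the follower pod's in-cluster DNS name: <pod>.<service>.<namespace>.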
if err = retry(time.Second*5, time.Second*60, func() (bool, error) {
-// check whether is leader.
+// Check whether the follower has become the leader.
status, err := checkRole(followerHost, rootPasswd)
if err != nil {
log.Error(err, "failed to check role", "pod", follower)
@@ -356,15 +369,15 @@ func (s *StatefulSetSyncer) mutate() error {
}
}

-// set owner reference only if owner resource is not being deleted, otherwise the owner
+// Set owner reference only if owner resource is not being deleted, otherwise the owner
// reference will be reset in case of deleting with cascade=false.
if s.Unwrap().GetDeletionTimestamp().IsZero() {
if err := controllerutil.SetControllerReference(s.Unwrap(), s.sfs, s.cli.Scheme()); err != nil {
return err
}
} else if ctime := s.Unwrap().GetCreationTimestamp(); ctime.IsZero() {
-// the owner is deleted, don't recreate the resource if does not exist, because gc
-// will not delete it again because has no owner reference set
+// The owner was deleted; don't recreate the resource if it does not exist, because
+// the garbage collector will not delete it again since it has no owner reference set.
return fmt.Errorf("owner is deleted")
}
return nil
@@ -438,6 +451,7 @@ func (s *StatefulSetSyncer) updatePVC(ctx context.Context) error {
}

func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) error {
// Check the revision; if the pod is not at the latest, delete it so it is recreated.
if pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision {
log.Info("pod is already updated", "pod name", pod.Name)
} else {
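// The pod is on an old revision: delete it so the statefulset controller recreates it from the new template.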
@@ -451,7 +465,7 @@ func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) err
}
}

-// wait the pod restart and healthy.
+// Wait for the pod to restart and become healthy.
return retry(time.Second*10, time.Duration(waitLimit)*time.Second, func() (bool, error) {
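// Re-fetch the pod on each attempt to observe its latest state.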
err := s.cli.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod)
if err != nil && !k8serrors.IsNotFound(err) {
@@ -480,9 +494,8 @@ func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) err
})
}

-// retry runs func "f" every "in" time until "limit" is reached
-// it also doesn't have an extra tail wait after the limit is reached
-// and f func runs first time instantly
+// retry runs func "f" every "in" time until "limit" is reached.
+// f runs once immediately, and there is no extra tail wait after the limit is reached.
func retry(in, limit time.Duration, f func() (bool, error)) error {
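// First attempt runs immediately.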
fdone, err := f()
if err != nil {
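
The rest of retry is collapsed in this view. For reference, a minimal self-contained sketch of a loop satisfying the contract documented above (run f immediately, re-run it every "in", and give up once "limit" elapses with no tail wait) could look like the following; retrySketch and its error text are illustrative, not the repository's actual implementation:

package main

import (
	"fmt"
	"time"
)

// retrySketch calls f immediately, then every "in", until f reports done,
// f returns an error, or "limit" elapses.
func retrySketch(in, limit time.Duration, f func() (bool, error)) error {
	// First run happens instantly, before any ticker fires.
	done, err := f()
	if err != nil {
		return err
	}
	if done {
		return nil
	}
	ticker := time.NewTicker(in)
	defer ticker.Stop()
	timeout := time.After(limit)
	for {
		select {
		case <-ticker.C:
			done, err = f()
			if err != nil {
				return err
			}
			if done {
				return nil
			}
		case <-timeout:
			// No extra tail wait: fail as soon as the limit is reached.
			return fmt.Errorf("retry timed out after %s", limit)
		}
	}
}

func main() {
	attempts := 0
	err := retrySketch(200*time.Millisecond, time.Second, func() (bool, error) {
		attempts++
		return attempts >= 3, nil // succeed on the third attempt
	})
	fmt.Println("attempts:", attempts, "err:", err)
}

Because the timeout case fires as soon as the limit elapses, the loop never sleeps through a final full interval before giving up.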

