Skip to content

Commit

Permalink
*: support two-node rolling update radondb#121
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Jul 27, 2021
1 parent 54e334d commit a622540
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 67 deletions.
10 changes: 10 additions & 0 deletions api/v1alpha1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,16 @@ type NodeCondition struct {
LastTransitionTime metav1.Time `json:"lastTransitionTime"`
}

// NodeConditionsIndex is the fixed position of a condition inside
// NodeStatus.Conditions: the slice is addressed by these constants
// rather than searched by condition type.
type NodeConditionsIndex uint8

// Positions of the known conditions in NodeStatus.Conditions. The order
// here must match the order in which the conditions slice is populated.
const (
	IndexLagged NodeConditionsIndex = iota
	IndexLeader
	IndexReadOnly
	IndexReplicating
)

// NodeConditionType defines type for node condition type.
type NodeConditionType string

Expand Down
163 changes: 133 additions & 30 deletions cluster/syncer/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ package syncer

import (
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"net/http"
"time"

"github.com/iancoleman/strcase"
Expand All @@ -39,6 +42,7 @@ import (

"github.com/radondb/radondb-mysql-kubernetes/cluster"
"github.com/radondb/radondb-mysql-kubernetes/cluster/container"
"github.com/radondb/radondb-mysql-kubernetes/internal"
"github.com/radondb/radondb-mysql-kubernetes/utils"
)

Expand Down Expand Up @@ -93,6 +97,7 @@ func (s *StatefulSetSyncer) ObjectOwner() runtime.Object { return s.Unwrap() }
func (s *StatefulSetSyncer) GetOwner() runtime.Object { return s.Unwrap() }

// Sync persists data into the external store.
// See https://github.com/presslabs/controller-util/blob/master/syncer/object.go#L68
func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) {
var err error
var kind string
Expand All @@ -108,20 +113,21 @@ func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error)
kind = gvk.String()
}

if errors.Is(err, syncer.ErrOwnerDeleted) {
switch {
case errors.Is(err, syncer.ErrOwnerDeleted):
log.Info(string(result.Operation), "key", key, "kind", kind, "error", err)
err = nil
} else if errors.Is(err, syncer.ErrIgnore) {
log.V(1).Info("syncer skipped", "key", key, "kind", kind, "error", err)
case errors.Is(err, syncer.ErrIgnore):
log.Info("syncer skipped", "key", key, "kind", kind, "error", err)
err = nil
} else if err != nil {
case err != nil:
result.SetEventData("Warning", basicEventReason(s.Name, err),
fmt.Sprintf("%s %s failed syncing: %s", kind, key, err))
log.Error(err, string(result.Operation), "key", key, "kind", kind)
} else {
default:
result.SetEventData("Normal", basicEventReason(s.Name, err),
fmt.Sprintf("%s %s %s successfully", kind, key, result.Operation))
log.V(1).Info(string(result.Operation), "key", key, "kind", kind)
log.Info(string(result.Operation), "key", key, "kind", kind)
}
return result, err
}
Expand Down Expand Up @@ -159,22 +165,22 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.
return controllerutil.OperationResultNone, err
}

if err := s.updatePod(ctx, s.sfs); err != nil {
if err := s.updatePod(ctx); err != nil {
return controllerutil.OperationResultNone, err
}

return controllerutil.OperationResultUpdated, nil
}

// updatePod update the pods.
func (s *StatefulSetSyncer) updatePod(ctx context.Context, sfs *appsv1.StatefulSet) error {
if sfs.Status.UpdatedReplicas >= sfs.Status.Replicas {
func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
if s.sfs.Status.UpdatedReplicas >= s.sfs.Status.Replicas {
return nil
}

log.Info("statefulSet was changed, run update")

if sfs.Status.ReadyReplicas < sfs.Status.Replicas {
if s.sfs.Status.ReadyReplicas < s.sfs.Status.Replicas {
log.Info("can't start/continue 'update': waiting for all replicas are ready")
return nil
}
Expand All @@ -183,39 +189,127 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context, sfs *appsv1.StatefulS
if err := s.cli.List(ctx,
&pods,
&client.ListOptions{
Namespace: sfs.Namespace,
Namespace: s.sfs.Namespace,
LabelSelector: s.GetLabels().AsSelector(),
},
); err != nil {
return err
}

// get the leader pod.
var leaderPod corev1.Pod
var followerPods []corev1.Pod
for _, pod := range pods.Items {
if pod.ObjectMeta.Labels["healthy"] != "yes" {
return fmt.Errorf("can't start/continue 'update': pod[%s] is unhealthy", pod.Name)
}

if pod.ObjectMeta.Labels["role"] == "leader" && leaderPod.Name == "" {
leaderPod = pod
log.Info("get leader pod", "pod name", leaderPod.Name)
continue
}

followerPods = append(followerPods, pod)
if err := s.applyNWait(ctx, &pod); err != nil {
return err
}
}

if leaderPod.Name != "" {
log.Info("apply changes to leader pod", "pod name", leaderPod.Name)
if len(followerPods) == 1 {
if err := s.preUpdate(ctx, leaderPod.Name, followerPods[0].Name); err != nil {
return err
}
}
if err := s.applyNWait(ctx, &leaderPod); err != nil {
return err
}
}

log.Info("update finished")
return nil
}

// preUpdate runs before the leader pod is updated, and only when the
// cluster has exactly two replicas. With a single follower there is no
// quorum-based automatic failover, so leadership must be handed over
// explicitly before the leader restarts:
//  1. set the leader super_read_only and wait until the follower has
//     received all binlogs (no pending events in the processlist);
//  2. ask the follower, through the xenon HTTP API, to take over
//     leadership, retrying until it reports itself as leader.
// leader and follower are pod names, resolved through the headless
// service. Returns nil when the cluster does not have two replicas.
func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower string) error {
	if s.sfs.Status.Replicas != 2 {
		return nil
	}

	sctName := s.GetNameForResource(utils.Secret)
	svcName := s.GetNameForResource(utils.HeadlessSVC)
	port := utils.MysqlPort
	nameSpace := s.Namespace

	// Fetch the operator credentials from the cluster secret.
	secret := &corev1.Secret{}
	if err := s.cli.Get(ctx,
		types.NamespacedName{
			Namespace: nameSpace,
			Name:      sctName,
		},
		secret,
	); err != nil {
		return fmt.Errorf("getting secret %s: %w", sctName, err)
	}
	user, ok := secret.Data["operator-user"]
	if !ok {
		return fmt.Errorf("secret %s is missing key operator-user", sctName)
	}
	password, ok := secret.Data["operator-password"]
	if !ok {
		return fmt.Errorf("secret %s is missing key operator-password", sctName)
	}
	rootPasswd, ok := secret.Data["root-password"]
	if !ok {
		return fmt.Errorf("secret %s is missing key root-password", sctName)
	}

	leaderHost := fmt.Sprintf("%s.%s.%s", leader, svcName, nameSpace)
	leaderRunner, err := internal.NewSQLRunner(utils.BytesToString(user), utils.BytesToString(password), leaderHost, port)
	if err != nil {
		log.Error(err, "failed to connect the mysql", "node", leader)
		return err
	}
	defer leaderRunner.Close()

	// Freeze writes on the leader, then wait until the follower has
	// caught up with every binlog event already produced.
	if err = retry(time.Second*2, time.Duration(waitLimit)*time.Second, func() (bool, error) {
		// Set the leader read only so no new binlog events are produced.
		if err = leaderRunner.RunQuery("SET GLOBAL super_read_only=on;"); err != nil {
			log.Error(err, "failed to set leader read only", "node", leader)
			return false, err
		}

		// Make sure the master has sent all binlog to slave.
		success, err := leaderRunner.CheckProcesslist()
		if err != nil {
			return false, err
		}
		return success, nil
	}); err != nil {
		return err
	}

	// Promote the follower, retrying until it reports the leader role.
	followerHost := fmt.Sprintf("%s.%s.%s", follower, svcName, nameSpace)
	if err = retry(time.Second*5, time.Second*60, func() (bool, error) {
		// Check whether the follower has already become the leader.
		status, err := checkRole(followerHost, rootPasswd)
		if err != nil {
			log.Error(err, "failed to check role", "pod", follower)
			return false, nil
		}
		if status == corev1.ConditionTrue {
			return true, nil
		}

		// Not leader yet: ask xenon to attempt a leadership takeover.
		// Best effort — the next tick's checkRole decides; but surface
		// the error instead of swallowing it, and close the body so the
		// HTTP connection is not leaked on every retry.
		resp, err := xenonHttpRequest(followerHost, "POST", "/v1/raft/trytoleader", rootPasswd, nil)
		if err != nil {
			log.Error(err, "failed to request trytoleader", "pod", follower)
			return false, nil
		}
		resp.Close()
		return false, nil
	}); err != nil {
		return err
	}

	return nil
}

Expand Down Expand Up @@ -312,31 +406,19 @@ func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) err
}
}

// wait the pod restart.
// wait the pod restart and healthy.
return retry(time.Second*10, time.Duration(waitLimit)*time.Second, func() (bool, error) {
err := s.cli.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod)
if err != nil && !k8serrors.IsNotFound(err) {
return false, err
}

var xenonReady, mysqlReady bool
for _, container := range pod.Status.ContainerStatuses {
if container.Name == "xenon" {
xenonReady = container.Ready
}
if container.Name == "mysql" {
mysqlReady = container.Ready
}
}

if pod.Status.Phase == corev1.PodFailed {
return false, fmt.Errorf("pod %s is in failed phase", pod.Name)
}

if pod.Status.Phase == corev1.PodRunning && xenonReady && mysqlReady &&
pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision &&
pod.ObjectMeta.Labels["healthy"] == "yes" {
log.Info("pod is running", "pod name", pod.Name)
if pod.ObjectMeta.Labels["healthy"] == "yes" &&
pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision {
return true, nil
}

Expand Down Expand Up @@ -384,3 +466,24 @@ func basicEventReason(objKindName string, err error) string {

return fmt.Sprintf("%sSyncSuccessfull", strcase.ToCamel(objKindName))
}

// xenonHttpRequest performs an authenticated call against the xenon
// peer API on the given host, e.g. method "POST", url
// "/v1/raft/trytoleader". rootPasswd is sent as HTTP basic auth for
// user root. On success the caller owns the returned body and must
// Close it; on any error the body is already closed.
func xenonHttpRequest(host, method, url string, rootPasswd []byte, body io.Reader) (io.ReadCloser, error) {
	req, err := http.NewRequest(method, fmt.Sprintf("http://%s:%d%s", host, utils.XenonPeerPort, url), body)
	if err != nil {
		return nil, err
	}
	encoded := base64.StdEncoding.EncodeToString(append([]byte("root:"), rootPasswd...))
	req.Header.Set("Authorization", "Basic "+encoded)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}

	if resp.StatusCode != http.StatusOK {
		// The caller only receives the body on success: close it here
		// so the underlying connection is not leaked.
		resp.Body.Close()
		return nil, fmt.Errorf("xenon request %s %s: unexpected status code %d", method, url, resp.StatusCode)
	}

	return resp.Body, nil
}
Loading

0 comments on commit a622540

Please sign in to comment.