Skip to content

Commit

Permalink
*: support two nodes roll update radondb#121
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Jul 27, 2021
1 parent 54e334d commit 647aacb
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 61 deletions.
10 changes: 10 additions & 0 deletions api/v1alpha1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,16 @@ type NodeCondition struct {
LastTransitionTime metav1.Time `json:"lastTransitionTime"`
}

// The index of the NodeStatus.Conditions.
type NodeConditionsIndex uint8

const (
IndexLagged NodeConditionsIndex = iota
IndexLeader
IndexReadOnly
IndexReplicating
)

// NodeConditionType defines type for node condition type.
type NodeConditionType string

Expand Down
150 changes: 126 additions & 24 deletions cluster/syncer/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ package syncer

import (
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"net/http"
"time"

"github.com/iancoleman/strcase"
Expand All @@ -39,6 +42,7 @@ import (

"github.com/radondb/radondb-mysql-kubernetes/cluster"
"github.com/radondb/radondb-mysql-kubernetes/cluster/container"
"github.com/radondb/radondb-mysql-kubernetes/internal"
"github.com/radondb/radondb-mysql-kubernetes/utils"
)

Expand Down Expand Up @@ -93,6 +97,7 @@ func (s *StatefulSetSyncer) ObjectOwner() runtime.Object { return s.Unwrap() }
func (s *StatefulSetSyncer) GetOwner() runtime.Object { return s.Unwrap() }

// Sync persists data into the external store.
// See https://github.com/presslabs/controller-util/blob/master/syncer/object.go#L68
func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) {
var err error
var kind string
Expand Down Expand Up @@ -159,22 +164,22 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.
return controllerutil.OperationResultNone, err
}

if err := s.updatePod(ctx, s.sfs); err != nil {
if err := s.updatePod(ctx); err != nil {
return controllerutil.OperationResultNone, err
}

return controllerutil.OperationResultUpdated, nil
}

// updatePod update the pods.
func (s *StatefulSetSyncer) updatePod(ctx context.Context, sfs *appsv1.StatefulSet) error {
if sfs.Status.UpdatedReplicas >= sfs.Status.Replicas {
func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
if s.sfs.Status.UpdatedReplicas >= s.sfs.Status.Replicas {
return nil
}

log.Info("statefulSet was changed, run update")

if sfs.Status.ReadyReplicas < sfs.Status.Replicas {
if s.sfs.Status.ReadyReplicas < s.sfs.Status.Replicas {
log.Info("can't start/continue 'update': waiting for all replicas are ready")
return nil
}
Expand All @@ -183,39 +188,127 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context, sfs *appsv1.StatefulS
if err := s.cli.List(ctx,
&pods,
&client.ListOptions{
Namespace: sfs.Namespace,
Namespace: s.sfs.Namespace,
LabelSelector: s.GetLabels().AsSelector(),
},
); err != nil {
return err
}

// get the leader pod.
var leaderPod corev1.Pod
var followerPods []corev1.Pod
for _, pod := range pods.Items {
if pod.ObjectMeta.Labels["healthy"] != "yes" {
return fmt.Errorf("can't start/continue 'update': pod[%s] is unhealthy", pod.Name)
}

if pod.ObjectMeta.Labels["role"] == "leader" && leaderPod.Name == "" {
leaderPod = pod
log.Info("get leader pod", "pod name", leaderPod.Name)
continue
}

followerPods = append(followerPods, pod)
if err := s.applyNWait(ctx, &pod); err != nil {
return err
}
}

if leaderPod.Name != "" {
log.Info("apply changes to leader pod", "pod name", leaderPod.Name)
if len(followerPods) == 1 {
if err := s.preUpdate(ctx, leaderPod.Name, followerPods[0].Name); err != nil {
return err
}
}
if err := s.applyNWait(ctx, &leaderPod); err != nil {
return err
}
}

log.Info("update finished")
return nil
}

// preUpdate run before update the leader pod when replica is 2.
func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower string) error {
if s.sfs.Status.Replicas != 2 {
return nil
}

sctName := s.GetNameForResource(utils.Secret)
svcName := s.GetNameForResource(utils.HeadlessSVC)
port := utils.MysqlPort
nameSpace := s.Namespace

// get secrets.
secret := &corev1.Secret{}
if err := s.cli.Get(context.TODO(),
types.NamespacedName{
Namespace: nameSpace,
Name: sctName,
},
secret,
); err != nil {
return fmt.Errorf("failed to get the secret: %s", sctName)
}
user, ok := secret.Data["operator-user"]
if !ok {
return fmt.Errorf("failed to get the user: %s", user)
}
password, ok := secret.Data["operator-password"]
if !ok {
return fmt.Errorf("failed to get the password: %s", password)
}
rootPasswd, ok := secret.Data["root-password"]
if !ok {
return fmt.Errorf("failed to get the root password: %s", rootPasswd)
}

leaderHost := fmt.Sprintf("%s.%s.%s", leader, svcName, nameSpace)
leaderRunner, err := internal.NewSQLRunner(utils.BytesToString(user), utils.BytesToString(password), leaderHost, port)
if err != nil {
log.Error(err, "failed to connect the mysql", "node", leader)
return err
}
defer leaderRunner.Close()

if err = retry(time.Second*2, time.Duration(waitLimit)*time.Second, func() (bool, error) {
// set leader read only.
if err = leaderRunner.RunQuery("SET GLOBAL super_read_only=on;"); err != nil {
log.Error(err, "failed to set leader read only", "node", leader)
return false, err
}

// make sure the master has sent all binlog to slave.
success, err := leaderRunner.CheckProcesslist()
if err != nil {
return false, err
}
if success {
return true, nil
}
return false, nil
}); err != nil {
return err
}

followerHost := fmt.Sprintf("%s.%s.%s", follower, svcName, nameSpace)
if err = retry(time.Second*5, time.Second*60, func() (bool, error) {
// check whether is leader.
status, err := checkRole(followerHost, rootPasswd)
if err != nil {
log.Error(err, "failed to check role", "pod", follower)
return false, nil
}
if status == corev1.ConditionTrue {
return true, nil
}

// If not leader, try to leader.
xenonHttpRequest(followerHost, "POST", "/v1/raft/trytoleader", rootPasswd, nil)
return false, nil
}); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -312,31 +405,19 @@ func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) err
}
}

// wait the pod restart.
// wait the pod restart and healthy.
return retry(time.Second*10, time.Duration(waitLimit)*time.Second, func() (bool, error) {
err := s.cli.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod)
if err != nil && !k8serrors.IsNotFound(err) {
return false, err
}

var xenonReady, mysqlReady bool
for _, container := range pod.Status.ContainerStatuses {
if container.Name == "xenon" {
xenonReady = container.Ready
}
if container.Name == "mysql" {
mysqlReady = container.Ready
}
}

if pod.Status.Phase == corev1.PodFailed {
return false, fmt.Errorf("pod %s is in failed phase", pod.Name)
}

if pod.Status.Phase == corev1.PodRunning && xenonReady && mysqlReady &&
pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision &&
pod.ObjectMeta.Labels["healthy"] == "yes" {
log.Info("pod is running", "pod name", pod.Name)
if pod.ObjectMeta.Labels["healthy"] == "yes" &&
pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision {
return true, nil
}

Expand Down Expand Up @@ -384,3 +465,24 @@ func basicEventReason(objKindName string, err error) string {

return fmt.Sprintf("%sSyncSuccessfull", strcase.ToCamel(objKindName))
}

func xenonHttpRequest(host, method, url string, rootPasswd []byte, body io.Reader) (io.ReadCloser, error) {
req, err := http.NewRequest(method, fmt.Sprintf("http://%s:%d%s", host, utils.XenonPeerPort, url), body)
if err != nil {
return nil, err
}
encoded := base64.StdEncoding.EncodeToString(append([]byte("root:"), rootPasswd...))
req.Header.Set("Authorization", "Basic "+encoded)

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}

if resp.StatusCode != 200 {
return nil, fmt.Errorf("get raft status failed, status code is %d", resp.StatusCode)
}

return resp.Body, nil
}
54 changes: 19 additions & 35 deletions cluster/syncer/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ package syncer

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"

"github.com/presslabs/controller-util/syncer"
Expand Down Expand Up @@ -186,7 +184,7 @@ func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client,
node.Message = err.Error()
}
// update apiv1alpha1.NodeConditionLeader.
s.updateNodeCondition(node, 1, isLeader)
s.updateNodeCondition(node, int(apiv1alpha1.IndexLeader), isLeader)

isLagged, isReplicating, isReadOnly := corev1.ConditionUnknown, corev1.ConditionUnknown, corev1.ConditionUnknown
runner, err := internal.NewSQLRunner(utils.BytesToString(user), utils.BytesToString(password), host, port)
Expand Down Expand Up @@ -218,13 +216,13 @@ func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client,
}

// update apiv1alpha1.NodeConditionLagged.
s.updateNodeCondition(node, 0, isLagged)
s.updateNodeCondition(node, int(apiv1alpha1.IndexLagged), isLagged)
// update apiv1alpha1.NodeConditionReplicating.
s.updateNodeCondition(node, 3, isReplicating)
s.updateNodeCondition(node, int(apiv1alpha1.IndexReplicating), isReplicating)
// update apiv1alpha1.NodeConditionReadOnly.
s.updateNodeCondition(node, 2, isReadOnly)
s.updateNodeCondition(node, int(apiv1alpha1.IndexReadOnly), isReadOnly)

if err = setPodHealthy(ctx, cli, &pod, node); err != nil {
if err = s.setPodHealthy(ctx, &pod, node); err != nil {
log.Error(err, "cannot update pod", "name", podName, "namespace", pod.Namespace)
}
}
Expand Down Expand Up @@ -284,28 +282,14 @@ func (s *StatusSyncer) updateNodeCondition(node *apiv1alpha1.NodeStatus, idx int

// checkRole used to check whether the mysql role is leader.
func checkRole(host string, rootPasswd []byte) (corev1.ConditionStatus, error) {
status := corev1.ConditionUnknown

req, err := http.NewRequest("GET", fmt.Sprintf("http://%s:%d/v1/raft/status", host, utils.XenonPeerPort), nil)
if err != nil {
return status, err
}
encoded := base64.StdEncoding.EncodeToString(append([]byte("root:"), rootPasswd...))
req.Header.Set("Authorization", "Basic "+encoded)

client := &http.Client{}
resp, err := client.Do(req)
body, err := xenonHttpRequest(host, "GET", "/v1/raft/status", rootPasswd, nil)
if err != nil {
return status, err
}

if resp.StatusCode != 200 {
return status, fmt.Errorf("get raft status failed, status code is %d", resp.StatusCode)
return corev1.ConditionUnknown, err
}

var out map[string]interface{}
if err = unmarshalJSON(resp.Body, &out); err != nil {
return status, err
if err = unmarshalJSON(body, &out); err != nil {
return corev1.ConditionUnknown, err
}

if out["state"] == "LEADER" {
Expand All @@ -316,27 +300,27 @@ func checkRole(host string, rootPasswd []byte) (corev1.ConditionStatus, error) {
return corev1.ConditionFalse, nil
}

return status, nil
return corev1.ConditionUnknown, nil
}

// setPodHealthy set the pod lable healthy.
func setPodHealthy(ctx context.Context, cli client.Client, pod *corev1.Pod, node *apiv1alpha1.NodeStatus) error {
func (s *StatusSyncer) setPodHealthy(ctx context.Context, pod *corev1.Pod, node *apiv1alpha1.NodeStatus) error {
healthy := "no"
if node.Conditions[0].Status == corev1.ConditionFalse {
if node.Conditions[1].Status == corev1.ConditionFalse &&
node.Conditions[2].Status == corev1.ConditionTrue &&
node.Conditions[3].Status == corev1.ConditionTrue {
if node.Conditions[apiv1alpha1.IndexLagged].Status == corev1.ConditionFalse {
if node.Conditions[apiv1alpha1.IndexLeader].Status == corev1.ConditionFalse &&
node.Conditions[apiv1alpha1.IndexReadOnly].Status == corev1.ConditionTrue &&
node.Conditions[apiv1alpha1.IndexReplicating].Status == corev1.ConditionTrue {
healthy = "yes"
} else if node.Conditions[1].Status == corev1.ConditionTrue &&
node.Conditions[2].Status == corev1.ConditionFalse &&
node.Conditions[3].Status == corev1.ConditionFalse {
} else if node.Conditions[apiv1alpha1.IndexLeader].Status == corev1.ConditionTrue &&
node.Conditions[apiv1alpha1.IndexReplicating].Status == corev1.ConditionFalse &&
node.Conditions[apiv1alpha1.IndexReadOnly].Status == corev1.ConditionFalse {
healthy = "yes"
}
}

if pod.Labels["healthy"] != healthy {
pod.Labels["healthy"] = healthy
if err := cli.Update(ctx, pod); client.IgnoreNotFound(err) != nil {
if err := s.cli.Update(ctx, pod); client.IgnoreNotFound(err) != nil {
return err
}
}
Expand Down
Loading

0 comments on commit 647aacb

Please sign in to comment.