Skip to content

Commit

Permalink
*: support two nodes roll update radondb#121
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Jul 15, 2021
1 parent 5914a55 commit cd8a830
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 39 deletions.
135 changes: 124 additions & 11 deletions cluster/syncer/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ package syncer

import (
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"net/http"
"time"

"github.com/iancoleman/strcase"
Expand All @@ -39,6 +42,7 @@ import (

"github.com/radondb/radondb-mysql-kubernetes/cluster"
"github.com/radondb/radondb-mysql-kubernetes/cluster/container"
"github.com/radondb/radondb-mysql-kubernetes/internal"
"github.com/radondb/radondb-mysql-kubernetes/utils"
)

Expand Down Expand Up @@ -93,6 +97,7 @@ func (s *StatefulSetSyncer) ObjectOwner() runtime.Object { return s.Unwrap() }
func (s *StatefulSetSyncer) GetOwner() runtime.Object { return s.Unwrap() }

// Sync persists data into the external store.
// See https://github.com/presslabs/controller-util/blob/master/syncer/object.go#L68
func (s *StatefulSetSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) {
var err error
var kind string
Expand Down Expand Up @@ -159,22 +164,22 @@ func (s *StatefulSetSyncer) createOrUpdate(ctx context.Context) (controllerutil.
return controllerutil.OperationResultNone, err
}

if err := s.updatePod(ctx, s.sfs); err != nil {
if err := s.updatePod(ctx); err != nil {
return controllerutil.OperationResultNone, err
}

return controllerutil.OperationResultUpdated, nil
}

// updatePod update the pods.
func (s *StatefulSetSyncer) updatePod(ctx context.Context, sfs *appsv1.StatefulSet) error {
if sfs.Status.UpdatedReplicas >= sfs.Status.Replicas {
func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
if s.sfs.Status.UpdatedReplicas >= s.sfs.Status.Replicas {
return nil
}

log.Info("statefulSet was changed, run update")

if sfs.Status.ReadyReplicas < sfs.Status.Replicas {
if s.sfs.Status.ReadyReplicas < s.sfs.Status.Replicas {
log.Info("can't start/continue 'update': waiting for all replicas are ready")
return nil
}
Expand All @@ -183,39 +188,127 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context, sfs *appsv1.StatefulS
if err := s.cli.List(ctx,
&pods,
&client.ListOptions{
Namespace: sfs.Namespace,
Namespace: s.sfs.Namespace,
LabelSelector: s.GetLabels().AsSelector(),
},
); err != nil {
return err
}

// get the leader pod.
var leaderPod corev1.Pod
var followerPods []corev1.Pod
for _, pod := range pods.Items {
if pod.ObjectMeta.Labels["healthy"] != "yes" {
return fmt.Errorf("can't start/continue 'update': pod[%s] is unhealthy", pod.Name)
}

if pod.ObjectMeta.Labels["role"] == "leader" && leaderPod.Name == "" {
leaderPod = pod
log.Info("get leader pod", "pod name", leaderPod.Name)
continue
}

followerPods = append(followerPods, pod)
if err := s.applyNWait(ctx, &pod); err != nil {
return err
}
}

if leaderPod.Name != "" {
log.Info("apply changes to leader pod", "pod name", leaderPod.Name)
if len(followerPods) == 1 {
if err := s.preUpdate(ctx, leaderPod.Name, followerPods[0].Name); err != nil {
return err
}
}
if err := s.applyNWait(ctx, &leaderPod); err != nil {
return err
}
}

log.Info("update finished")
return nil
}

// preUpdate run before update the leader pod when replica is 2.
func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower string) error {
if s.sfs.Status.Replicas != 2 {
return nil
}

sctName := s.GetNameForResource(utils.Secret)
svcName := s.GetNameForResource(utils.HeadlessSVC)
port := utils.MysqlPort
nameSpace := s.Namespace

// get secrets.
secret := &corev1.Secret{}
if err := s.cli.Get(context.TODO(),
types.NamespacedName{
Namespace: nameSpace,
Name: sctName,
},
secret,
); err != nil {
return fmt.Errorf("failed to get the secret: %s", sctName)
}
user, ok := secret.Data["operator-user"]
if !ok {
return fmt.Errorf("failed to get the user: %s", user)
}
password, ok := secret.Data["operator-password"]
if !ok {
return fmt.Errorf("failed to get the password: %s", password)
}
rootPasswd, ok := secret.Data["root-password"]
if !ok {
return fmt.Errorf("failed to get the root password: %s", rootPasswd)
}

leaderHost := fmt.Sprintf("%s.%s.%s", leader, svcName, nameSpace)
leaderRunner, err := internal.NewSQLRunner(utils.BytesToString(user), utils.BytesToString(password), leaderHost, port)
if err != nil {
log.Error(err, "failed to connect the mysql", "node", leader)
return err
}
defer leaderRunner.Close()

if err = retry(time.Second*2, time.Duration(waitLimit)*time.Second, func() (bool, error) {
// set leader read only.
if err = leaderRunner.RunQuery("SET GLOBAL super_read_only=on;"); err != nil {
log.Error(err, "failed to set leader read only", "node", leader)
return false, err
}

// make sure the master has sent all binlog to slave.
success, err := leaderRunner.CheckProcesslist()
if err != nil {
return false, err
}
if success {
return true, nil
}
return false, nil
}); err != nil {
return err
}

followerHost := fmt.Sprintf("%s.%s.%s", follower, svcName, nameSpace)
if err = retry(time.Second*5, time.Second*60, func() (bool, error) {
// check whether is leader.
status, err := checkRole(followerHost, rootPasswd)
if err != nil {
log.Error(err, "failed to check role", "pod", follower)
return false, nil
}
if status == corev1.ConditionTrue {
return true, nil
}

// If not leader, try to leader.
xenonHttpRequest(followerHost, "POST", "/v1/raft/trytoleader", rootPasswd, nil)
return false, nil
}); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -312,7 +405,7 @@ func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) err
}
}

// wait the pod restart.
// wait the pod restart and healthy.
return retry(time.Second*10, time.Duration(waitLimit)*time.Second, func() (bool, error) {
err := s.cli.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, pod)
if err != nil && !k8serrors.IsNotFound(err) {
Expand All @@ -336,7 +429,6 @@ func (s *StatefulSetSyncer) applyNWait(ctx context.Context, pod *corev1.Pod) err
if pod.Status.Phase == corev1.PodRunning && xenonReady && mysqlReady &&
pod.ObjectMeta.Labels["controller-revision-hash"] == s.sfs.Status.UpdateRevision &&
pod.ObjectMeta.Labels["healthy"] == "yes" {
log.Info("pod is running", "pod name", pod.Name)
return true, nil
}

Expand Down Expand Up @@ -384,3 +476,24 @@ func basicEventReason(objKindName string, err error) string {

return fmt.Sprintf("%sSyncSuccessfull", strcase.ToCamel(objKindName))
}

func xenonHttpRequest(host, method, url string, rootPasswd []byte, body io.Reader) (io.ReadCloser, error) {
req, err := http.NewRequest(method, fmt.Sprintf("http://%s:%d%s", host, utils.XenonPeerPort, url), body)
if err != nil {
return nil, err
}
encoded := base64.StdEncoding.EncodeToString(append([]byte("root:"), rootPasswd...))
req.Header.Set("Authorization", "Basic "+encoded)

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}

if resp.StatusCode != 200 {
return nil, fmt.Errorf("get raft status failed, status code is %d", resp.StatusCode)
}

return resp.Body, nil
}
37 changes: 11 additions & 26 deletions cluster/syncer/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ package syncer

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"

"github.com/presslabs/controller-util/syncer"
Expand Down Expand Up @@ -224,7 +222,7 @@ func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client,
// update apiv1alpha1.NodeConditionReadOnly.
s.updateNodeCondition(node, 2, isReadOnly)

if err = setPodHealthy(ctx, cli, &pod, node); err != nil {
if err = s.setPodHealthy(ctx, &pod, node); err != nil {
log.Error(err, "cannot update pod", "name", podName, "namespace", pod.Namespace)
}
}
Expand Down Expand Up @@ -284,28 +282,14 @@ func (s *StatusSyncer) updateNodeCondition(node *apiv1alpha1.NodeStatus, idx int

// checkRole used to check whether the mysql role is leader.
func checkRole(host string, rootPasswd []byte) (corev1.ConditionStatus, error) {
status := corev1.ConditionUnknown

req, err := http.NewRequest("GET", fmt.Sprintf("http://%s:%d/v1/raft/status", host, utils.XenonPeerPort), nil)
if err != nil {
return status, err
}
encoded := base64.StdEncoding.EncodeToString(append([]byte("root:"), rootPasswd...))
req.Header.Set("Authorization", "Basic "+encoded)

client := &http.Client{}
resp, err := client.Do(req)
body, err := xenonHttpRequest(host, "GET", "/v1/raft/status", rootPasswd, nil)
if err != nil {
return status, err
}

if resp.StatusCode != 200 {
return status, fmt.Errorf("get raft status failed, status code is %d", resp.StatusCode)
return corev1.ConditionUnknown, err
}

var out map[string]interface{}
if err = unmarshalJSON(resp.Body, &out); err != nil {
return status, err
if err = unmarshalJSON(body, &out); err != nil {
return corev1.ConditionUnknown, err
}

if out["state"] == "LEADER" {
Expand All @@ -316,27 +300,28 @@ func checkRole(host string, rootPasswd []byte) (corev1.ConditionStatus, error) {
return corev1.ConditionFalse, nil
}

return status, nil
return corev1.ConditionUnknown, nil
}

// setPodHealthy set the pod lable healthy.
func setPodHealthy(ctx context.Context, cli client.Client, pod *corev1.Pod, node *apiv1alpha1.NodeStatus) error {
func (s *StatusSyncer) setPodHealthy(ctx context.Context, pod *corev1.Pod, node *apiv1alpha1.NodeStatus) error {
healthy := "no"
if node.Conditions[0].Status == corev1.ConditionFalse {
if node.Conditions[1].Status == corev1.ConditionFalse &&
node.Conditions[2].Status == corev1.ConditionTrue &&
node.Conditions[3].Status == corev1.ConditionTrue {
healthy = "yes"
} else if node.Conditions[1].Status == corev1.ConditionTrue &&
node.Conditions[2].Status == corev1.ConditionFalse &&
node.Conditions[3].Status == corev1.ConditionFalse {
node.Conditions[3].Status == corev1.ConditionFalse &&
(node.Conditions[2].Status == corev1.ConditionFalse ||
node.Conditions[2].Status == corev1.ConditionTrue) {
healthy = "yes"
}
}

if pod.Labels["healthy"] != healthy {
pod.Labels["healthy"] = healthy
if err := cli.Update(ctx, pod); client.IgnoreNotFound(err) != nil {
if err := s.cli.Update(ctx, pod); client.IgnoreNotFound(err) != nil {
return err
}
}
Expand Down
37 changes: 35 additions & 2 deletions internal/sql_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,42 @@ func (s *SQLRunner) RunQuery(query string, args ...interface{}) error {
}

// GetGlobalVariable used to get the global variable by param.
func (sr *SQLRunner) GetGlobalVariable(param string, val interface{}) error {
func (s *SQLRunner) GetGlobalVariable(param string, val interface{}) error {
query := fmt.Sprintf("select @@global.%s", param)
return sr.db.QueryRow(query).Scan(val)
return s.db.QueryRow(query).Scan(val)
}

func (s *SQLRunner) CheckProcesslist() (bool, error) {
var rows *sql.Rows
rows, err := s.db.Query("show processlist;")
if err != nil {
return false, err
}

defer rows.Close()

var cols []string
cols, err = rows.Columns()
if err != nil {
return false, err
}

scanArgs := make([]interface{}, len(cols))
for i := range scanArgs {
scanArgs[i] = &sql.RawBytes{}
}

for rows.Next() {
if err = rows.Scan(scanArgs...); err != nil {
return false, err
}

state := columnValue(scanArgs, cols, "State")
if strings.Contains(state, "Master has sent all binlog to slave") {
return true, nil
}
}
return false, nil
}

// Close closes the database and prevents new queries from starting.
Expand Down

0 comments on commit cd8a830

Please sign in to comment.